Commit 574a3ba4 authored by Lukas Burgey's avatar Lukas Burgey
Browse files

Restructure the client to support different operation modes

parent 2cb4b749
......@@ -34,7 +34,7 @@ func (c config) Log(formatString string, params ...interface{}) {
}
func (c consumer) Log(formatString string, params ...interface{}) {
log.Printf("%s "+formatString, append([]interface{}{"[Conn]"}, params...)...)
log.Printf("%s "+formatString, append([]interface{}{"[AMQP]"}, params...)...)
}
func (c *config) consumer() (cons *consumer) {
......@@ -106,7 +106,7 @@ func (c *consumer) connect() (deliveries <-chan amqp.Delivery, err error) {
// all the bindings from our queue to the topic exchange
for _, exchange := range c.exchanges {
if *backendDebugging {
log.Printf("[Conn] Binding to exchange %s with routing keys %v", exchange.name, exchange.bindingKeys)
log.Printf("[AMQP] Binding to exchange %s with routing keys %v", exchange.name, exchange.bindingKeys)
}
for _, bindingKey := range exchange.bindingKeys {
if err = c.channel.QueueBind(
......@@ -134,7 +134,7 @@ func (c *consumer) connect() (deliveries <-chan amqp.Delivery, err error) {
return
}
log.Printf("[Conn] Connected to broker")
log.Printf("[AMQP] Connected to broker")
return
}
......@@ -145,15 +145,15 @@ func (c *consumer) reconnect() {
connectionError := <-c.closed
if connectionError.Reason != "reconnect failed" {
log.Printf("[Conn] Connection closed: %s", connectionError)
log.Printf("[AMQP] Connection closed: %s", connectionError)
}
log.Printf("[Conn] Reconnecting in %s", c.reconnectTimeout)
log.Printf("[AMQP] Reconnecting in %s", c.reconnectTimeout)
time.Sleep(c.reconnectTimeout)
log.Printf("[Conn] Reconnecting")
log.Printf("[AMQP] Reconnecting")
if deliveries, err := c.connect(); err != nil {
log.Printf("[Conn] Error reconnecting: %s", err)
log.Printf("[AMQP] Error reconnecting: %s", err)
c.closed = make(chan *amqp.Error)
......@@ -178,7 +178,7 @@ func (c *consumer) reconnect() {
func (c *consumer) startConsuming() (err error) {
deliveries, err := c.connect()
if err != nil {
log.Printf("[Conn] Error connecting: %s", err)
log.Printf("[AMQP] Error connecting: %s", err)
return
}
......@@ -195,9 +195,9 @@ func (c *config) consume(deliveries <-chan amqp.Delivery) {
if *backendDebugging {
indented := new(bytes.Buffer)
if err := json.Indent(indented, d.Body, " ", " "); err != nil {
log.Printf("Received via AMQP (unable to indent: %v):\n%s", err, d.Body)
log.Printf("[AMQP] Received (unable to indent: %v):\n%s", err, d.Body)
} else {
log.Printf("Received via AMQP:\n%s", indented)
log.Printf("[AMQP] Received:\n%s", indented)
}
}
......
......@@ -43,17 +43,17 @@ type (
// ReconnectTimeout gets parsed by time.ParseDuration
ReconnectTimeoutString string `json:"reconnect_timeout"`
NewTasks chan task
DoneTasks chan taskReply
FetchInterval time.Duration
ReconnectTimeout time.Duration
RabbitMQConfig rabbitMQConfig
Site string
NewTasks chan task
DoneTasks chan taskReply
FetchInterval time.Duration
ReconnectTimeout time.Duration
RabbitMQConfig rabbitMQConfig
Site string
}
// strippedConfig is sent to the backend on startup
strippedConfig struct {
GroupToServices map[string]([]service) `json:"group_to_services"`
GroupToServices map[string]([]service) `json:"group_to_services"`
EntitlementToServices map[string]([]service) `json:"entitlement_to_services"`
}
)
......@@ -71,7 +71,7 @@ var (
).Author(
"Lukas Burgey",
).Version(
"0.1.0",
"0.4.0",
)
configFile = app.Arg("config", "Config file to file to use.").Required().String()
scriptDebugging = app.Flag("debug-scripts", "Display debugging info concerning executed scripts").Bool()
......@@ -99,7 +99,7 @@ func (c *config) syncConfig() (err error) {
// we inform the backend which services we provide
strippedConfigBytes, err = json.Marshal(strippedConfig{
GroupToServices: c.GroupToServices,
GroupToServices: c.GroupToServices,
EntitlementToServices: c.EntitlementToServices,
})
if err != nil {
......@@ -184,7 +184,7 @@ func getConfig(configFile string) (c config, err error) {
c.ReconnectTimeout, parseError = time.ParseDuration(c.ReconnectTimeoutString)
if parseError != nil {
log.Printf("[Conf] Error parsing 'reconnect_timeout': %s", parseError)
c.ReconnectTimeout = defaultReconnectTimeout
c.ReconnectTimeout = defaultReconnectTimeout
log.Printf("[Conf] Using default 'reconnect_timeout' of %v", c.ReconnectTimeout)
}
......@@ -205,7 +205,7 @@ func getConfig(configFile string) (c config, err error) {
}
}
c.Log("Groups: %s", c.GroupToServices)
c.Log("Groups: %s", c.GroupToServices)
c.Log("Entitlements: %s", c.EntitlementToServices)
// initialize the task queues
......
......@@ -196,7 +196,7 @@ func (c *config) taskResponder() {
// IMPLEMENTATIONS
func (c *config) fetchTasks() (err error) {
if len(c.EntitlementToServices) == 0 && len(c.GroupToServices) == 0 {
log.Printf("[Fetch] Not fetching because the are no services to fetch")
log.Printf("[HTTP] Not fetching because the are no services to fetch")
return
}
......@@ -227,7 +227,7 @@ func (c *config) fetchTasks() (err error) {
defer resp.Body.Close()
if resp.StatusCode != 200 {
err = fmt.Errorf("Unable to fetch tasks (response: %v)", resp.Status)
err = fmt.Errorf("[HTTP] Unable to fetch tasks (response: %v)", resp.Status)
return
}
......@@ -236,23 +236,23 @@ func (c *config) fetchTasks() (err error) {
err = json.Unmarshal(body, &newTasks)
if err != nil {
err = fmt.Errorf("Error unmarshaling: %s\n%s", err, body)
err = fmt.Errorf("[HTTP] Error unmarshaling: %s\n%s", err, body)
return
}
if *backendDebugging {
indented := new(bytes.Buffer)
if err := json.Indent(indented, body, "", " "); err != nil {
log.Printf("Fetched via HTTP (unable to indent: %v):\n%s", err, body)
log.Printf("[HTTP] Fetched (unable to indent: %v):\n%s", err, body)
} else {
log.Printf("Fetched via HTTP:\n%s", indented)
log.Printf("[HTTP] Fetched:\n%s", indented)
}
}
for _, task := range newTasks {
err = c.scheduleTask(task)
if err != nil {
log.Printf("[scheduleTask] %v", err)
log.Printf("[HTTP] Scheduling: %v", err)
// we execute all tasks even when one fails
err = nil
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment