Commit dc3fa107 authored by Lukas Burgey's avatar Lukas Burgey
Browse files

Fix a bug in the handler

parent 3aac9a1d
......@@ -240,6 +240,7 @@ func (c *consumer) connect() (deliveries <-chan amqp.Delivery, err error) {
}
// report a closing connection and reconnect
// FIXME we have to check if the config changed (maybe after rest fetches?)
func (c *consumer) reconnect() {
// wait till the connection is closed
......@@ -293,27 +294,23 @@ func (c *config) handleInitialUpdates(iu update) {
}
func (c *config) handleTask(ti task) (err error) {
if err != nil {
log.Printf("[U] %s:%s %s %s ERROR", ti.Service, ti.User, ti.Action, ti.Key.Name)
return
}
ok, err := c.ackTask(ti)
if ok {
log.Printf("[U] %s:%s %s %s ACK", ti.Service, ti.User, ti.Action, ti.Key.Name)
log.Printf("[Task] %s:%s %s %s ACK", ti.Service, ti.User, ti.Action, ti.Key.Name)
} else {
log.Printf("[U] %s:%s %s %s ACK-ERROR", ti.Service, ti.User, ti.Action, ti.Key.Name)
log.Printf("[Task] %s:%s %s %s ACK-ERROR", ti.Service, ti.User, ti.Action, ti.Key.Name)
}
return
}
// handles either the tasks from rest and from rabbitmq
func (c *config) taskHandler() {
log.Printf("[T] Task handler started")
for newTask := range c.NewTasks {
// handle tasks asynchronously
go func(t task) {
if err := c.handleTask(t); err != nil {
log.Printf("[T] Error handling task: %s", err)
log.Printf("[Task] Error handling task: %s", err)
}
}(newTask)
}
......@@ -347,9 +344,9 @@ func (c *config) ackTask(ti task) (ok bool, err error) {
return
}
func (c *config) fetchTasks() (tasks []task, err error) {
func (c *config) fetchTasks() (err error) {
if len(c.Services) == 0 {
log.Printf("[U] Not fetching initial updates because the are no services to fetch")
log.Printf("[Fetch] Not fetching because the are no services to fetch")
return
}
......@@ -379,12 +376,12 @@ func (c *config) fetchTasks() (tasks []task, err error) {
var u update
body, err := ioutil.ReadAll(resp.Body)
err = json.Unmarshal(body, &u)
log.Printf("[U] %d new task items", len(u.Tasks))
log.Printf("[Fetch] %d new tasks", len(u.Tasks))
if err != nil {
log.Printf("[U] error fetching update: %s", err)
log.Printf("[Fetch] error fetching: %s", err)
}
for _, task := range tasks {
for _, task := range u.Tasks {
// put the in the task queue
c.NewTasks <- task
}
......@@ -484,6 +481,10 @@ func main() {
log.Fatalf("[Err] No valid config. Exiting")
}
if len(c.Services) < 1 {
log.Fatalf("[Err] No services. Exiting")
}
// initialize the task queue
c.NewTasks = make(chan task)
go c.taskHandler()
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment