Commit a7b294cc authored by Lukas Burgey's avatar Lukas Burgey
Browse files

Add a separate handler for acks

parent dc3fa107
...@@ -23,6 +23,7 @@ type ( ...@@ -23,6 +23,7 @@ type (
FetchIntervalString string `json:"fetch_interval"` // string parsed by time.ParseDuration FetchIntervalString string `json:"fetch_interval"` // string parsed by time.ParseDuration
ReconnectTimeoutString string `json:"reconnect_timeout"` // string parsed by time.ParseDuration ReconnectTimeoutString string `json:"reconnect_timeout"` // string parsed by time.ParseDuration
NewTasks chan task NewTasks chan task
DoneTasks chan task
FetchInterval time.Duration FetchInterval time.Duration
ReconnectTimeout time.Duration ReconnectTimeout time.Duration
} }
...@@ -104,6 +105,10 @@ func (k sshKey) String() string { ...@@ -104,6 +105,10 @@ func (k sshKey) String() string {
return k.Name return k.Name
} }
func (t task) String() string {
return fmt.Sprintf("%s:%s %s %s", t.Service, t.User, t.Action, t.Key)
}
func filterPermitted(permitted []service, wanted []service) (remainder []service) { func filterPermitted(permitted []service, wanted []service) (remainder []service) {
serviceLoop: serviceLoop:
for _, service := range wanted { for _, service := range wanted {
...@@ -160,6 +165,10 @@ func readConfig(configFile string) (c config, err error) { ...@@ -160,6 +165,10 @@ func readConfig(configFile string) (c config, err error) {
} }
log.Printf("[Conf] Reconnect timeout set to %s", c.ReconnectTimeout) log.Printf("[Conf] Reconnect timeout set to %s", c.ReconnectTimeout)
// initialize the task queues
c.NewTasks = make(chan task)
c.DoneTasks = make(chan task)
return return
} }
...@@ -295,12 +304,9 @@ func (c *config) handleInitialUpdates(iu update) { ...@@ -295,12 +304,9 @@ func (c *config) handleInitialUpdates(iu update) {
func (c *config) handleTask(ti task) (err error) { func (c *config) handleTask(ti task) (err error) {
ok, err := c.ackTask(ti) time.Sleep(time.Second)
if ok { log.Printf("[Task] %s: DONE", ti)
log.Printf("[Task] %s:%s %s %s ACK", ti.Service, ti.User, ti.Action, ti.Key.Name) c.DoneTasks <- ti
} else {
log.Printf("[Task] %s:%s %s %s ACK-ERROR", ti.Service, ti.User, ti.Action, ti.Key.Name)
}
return return
} }
...@@ -316,6 +322,25 @@ func (c *config) taskHandler() { ...@@ -316,6 +322,25 @@ func (c *config) taskHandler() {
} }
} }
// acks all done tasks
func (c *config) taskAcker() {
for doneTask := range c.DoneTasks {
// handle tasks asynchronously
go func(t task) {
ok, err := c.ackTask(t);
if err != nil {
log.Printf("[Task] %s: ACK-ERROR: %s", t, err)
return
}
if !ok {
log.Printf("[Task] %s: ACK-FAILED", t)
return
}
log.Printf("[Task] %s: ACK", t)
}(doneTask)
}
}
func (c *config) ackTask(ti task) (ok bool, err error) { func (c *config) ackTask(ti task) (ok bool, err error) {
url := fmt.Sprintf( url := fmt.Sprintf(
"https://%s/backend/clientapi/ack/%d/", "https://%s/backend/clientapi/ack/%d/",
...@@ -435,6 +460,7 @@ func (c *config) handler(deliveries <-chan amqp.Delivery) { ...@@ -435,6 +460,7 @@ func (c *config) handler(deliveries <-chan amqp.Delivery) {
) )
} else { } else {
// enqueue the task // enqueue the task
log.Printf("[Task] %s: RECEIVED", newTask)
c.NewTasks <- newTask c.NewTasks <- newTask
d.Ack( d.Ack(
false, // multiple false, // multiple
...@@ -485,9 +511,9 @@ func main() { ...@@ -485,9 +511,9 @@ func main() {
log.Fatalf("[Err] No services. Exiting") log.Fatalf("[Err] No services. Exiting")
} }
// initialize the task queue // start task handler and acker
c.NewTasks = make(chan task)
go c.taskHandler() go c.taskHandler()
go c.taskAcker()
// starting ticker // starting ticker
ticker := time.NewTicker(c.FetchInterval) ticker := time.NewTicker(c.FetchInterval)
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment