Commit 45174c33 authored by Lukas Burgey's avatar Lukas Burgey
Browse files

Add sequential execution flag

parent d6488182
......@@ -28,6 +28,14 @@ type (
}
)
func (c config) Log(formatString string, params ...interface{}) {
log.Printf("%s "+formatString, append([]interface{}{"[Conf]"}, params...)...)
}
func (c consumer) Log(formatString string, params ...interface{}) {
log.Printf("%s "+formatString, append([]interface{}{"[Conn]"}, params...)...)
}
func (c *config) consumer() (cons *consumer) {
var serviceRoutingKeys, groupRoutingKeys []string
for _, service := range c.Services {
......@@ -46,11 +54,6 @@ func (c *config) consumer() (cons *consumer) {
exchange{
"groups", "topic", groupRoutingKeys,
},
/*
exchange{
"sites", "topic", []string{c.Site},
},
*/
},
consumer: c.consume,
reconnectTimeout: c.ReconnectTimeout,
......@@ -203,7 +206,7 @@ func (c *config) consume(deliveries <-chan amqp.Delivery) {
newTask.RoutingKey = d.RoutingKey
// enqueue the task
log.Printf("[Task] %v: RECEIVED via %v-%v", newTask, d.Exchange, d.RoutingKey)
newTask.Log("RECEIVED via %v-%v", d.Exchange, d.RoutingKey)
c.scheduleTask(newTask)
d.Ack(
false, // multiple
......
......@@ -64,11 +64,12 @@ var (
).Version(
"0.3.0",
)
configFile = app.Arg("config", "Config file to file to use.").Required().String()
scriptDebugging = app.Flag("debug-scripts", "Display debugging info concerning executed scripts").Bool()
backendDebugging = app.Flag("debug-backend", "Display debugging info concerning the backend").Bool()
brokerDebugging = app.Flag("debug-broker", "Display debugging info concerning the message broker").Bool()
debugAll = app.Flag("debug", "Display all debugging info").Bool()
configFile = app.Arg("config", "Config file to file to use.").Required().String()
scriptDebugging = app.Flag("debug-scripts", "Display debugging info concerning executed scripts").Bool()
backendDebugging = app.Flag("debug-backend", "Display debugging info concerning the backend").Bool()
brokerDebugging = app.Flag("debug-broker", "Display debugging info concerning the message broker").Bool()
debugAll = app.Flag("debug", "Display all debugging info").Bool()
sequentialExecution = app.Flag("seq", "Execute tasks sequentially").Bool()
)
func logError(err error, msg string) {
......@@ -138,7 +139,7 @@ func (c *config) syncConfig() (err error) {
}
*/
log.Printf("[Conf] Synchronised with backend")
c.Log("Synchronised with backend")
return
}
......@@ -159,17 +160,17 @@ serviceLoop:
func getConfig(configFile string) (c config, err error) {
log.Printf("[Conf] Reading config file %s", configFile)
c.Log("Reading config file %s", configFile)
bs, err := ioutil.ReadFile(configFile)
if err != nil {
log.Printf("[Conf] Error reading config file: %s", err)
c.Log("Error reading config file: %s", err)
return
}
err = json.Unmarshal(bs, &c)
if err != nil {
log.Printf("[Conf] Error parsing config file: %s", err)
c.Log("Error parsing config file: %s", err)
return
}
......@@ -207,8 +208,8 @@ func getConfig(configFile string) (c config, err error) {
return
}
log.Printf("[Conf] Services: %s", c.Services)
log.Printf("[Conf] Groups: %s", c.GroupToServices)
c.Log("Services: %s", c.Services)
c.Log("Groups: %s", c.GroupToServices)
// initialize the task queues
c.NewTasks = make(chan task)
......@@ -238,6 +239,9 @@ func main() {
if *backendDebugging {
log.Printf("[Debug] backend debugging enabled")
}
if *sequentialExecution {
log.Printf("[Debug] Executing tasks sequentially")
}
// read the config file
c, err := getConfig(*configFile)
......
......@@ -12,7 +12,7 @@ import (
"strings"
"time"
s "git.scc.kit.edu/fum/feudalScripts"
s "git.scc.kit.edu/feudal/feudalScripts"
)
type (
......@@ -66,9 +66,7 @@ func (t task) String() string {
}
func (t task) Log(formatString string, params ...interface{}) {
formatString = "%s " + formatString
params = append([]interface{}{t}, params...)
log.Printf(formatString, params...)
log.Printf("%s "+formatString, append([]interface{}{t}, params...)...)
}
func (te taskExecution) String() string {
......@@ -76,9 +74,7 @@ func (te taskExecution) String() string {
}
func (te taskExecution) Log(formatString string, params ...interface{}) {
formatString = "%s " + formatString
params = append([]interface{}{te}, params...)
log.Printf(formatString, params...)
log.Printf("%s "+formatString, append([]interface{}{te}, params...)...)
}
func (c *config) taskServices(t task) (services []service, err error) {
......@@ -160,8 +156,7 @@ func (c *config) taskFetcher() {
// handles tasks in c.NewTasks
func (c *config) taskHandler() {
for newTask := range c.NewTasks {
// handle tasks asynchronously
go func(t task) {
handleWrapper := func(t task) {
if err := c.handleTask(t); err != nil {
t.Log("Error handling task: %s", err)
......@@ -175,7 +170,14 @@ func (c *config) taskHandler() {
},
}
}
}(newTask)
}
if *sequentialExecution {
handleWrapper(newTask)
} else {
// handle tasks asynchronously
go handleWrapper(newTask)
}
}
}
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment