Commit d6488182 authored by Lukas Burgey's avatar Lukas Burgey
Browse files

Change the logging of tasks

parent b5584c93
......@@ -9,6 +9,7 @@ import (
"log"
"net/http"
"os/exec"
"strings"
"time"
s "git.scc.kit.edu/fum/feudalScripts"
......@@ -61,7 +62,23 @@ func (s service) String() string {
}
func (t task) String() string {
return fmt.Sprintf("%s-%s:%s %s", t.Exchange, t.RoutingKey, t.User, t.StateTarget)
return fmt.Sprintf("[Task:%v:%v]", t.ID, t.Service.Name)
}
func (t task) Log(formatString string, params ...interface{}) {
formatString = "%s " + formatString
params = append([]interface{}{t}, params...)
log.Printf(formatString, params...)
}
func (te taskExecution) String() string {
return fmt.Sprintf("[Task:%v:%v]", te.ID, te.Service.Name)
}
func (te taskExecution) Log(formatString string, params ...interface{}) {
formatString = "%s " + formatString
params = append([]interface{}{te}, params...)
log.Printf(formatString, params...)
}
func (c *config) taskServices(t task) (services []service, err error) {
......@@ -116,7 +133,7 @@ func (c *config) taskServices(t task) (services []service, err error) {
}
if len(services) == 0 {
log.Printf("Unable to determine services for task %s", t)
t.Log("Unable to determine services for this task")
}
return
}
......@@ -146,15 +163,16 @@ func (c *config) taskHandler() {
// handle tasks asynchronously
go func(t task) {
if err := c.handleTask(t); err != nil {
log.Printf("[Task] Error handling task: %s", err)
t.Log("Error handling task: %s", err)
c.DoneTasks <- taskExecution{
ID: t.ID,
ID: t.ID,
Service: t.Service,
Output: s.Output{
State: s.Failed,
// TODO maybe another message for the user
Msg: fmt.Sprint(err),
},
Service: t.Service,
}
}
}(newTask)
......@@ -168,7 +186,7 @@ func (c *config) taskResponder() {
go func(te taskExecution) {
err := c.respondToTask(te)
if err != nil {
log.Printf("[Task] %v: ACK-ERROR: %s", te.ID, err)
te.Log("ACK-ERROR: %s", err)
// reschedule failed responses
go func(te taskExecution) {
......@@ -223,46 +241,49 @@ func (c *config) fetchTasks() (err error) {
return
}
log.Printf("[Fetch] %d new tasks:%v", len(newTasks), newTasks)
log.Printf("[Fetch] %d new tasks: %v", len(newTasks), newTasks)
for _, task := range newTasks {
c.scheduleTask(task)
err = c.scheduleTask(task)
if err != nil {
return
}
}
return
}
// scheduleTask puts tasks in the newTasks channel
func (c *config) scheduleTask(ti task) (err error) {
func (c *config) scheduleTask(t task) (err error) {
// determine the services of the task
var services []service
services, err = c.taskServices(ti)
services, err = c.taskServices(t)
if err != nil {
return
}
// schedule one task per service of the task
for _, svc := range services {
var t task
t = ti
t.Service = svc
var serviceTask task
serviceTask = t
serviceTask.Service = svc
log.Printf("[Task:%v] scheduling for %s", t.ID, svc.Name)
serviceTask.Log("Scheduling execution for %s", svc.Name)
// put the in the task queue
c.NewTasks <- t
c.NewTasks <- serviceTask
}
return
}
func (c *config) handleTask(ti task) (err error) {
func (c *config) handleTask(t task) (err error) {
var output s.Output
// encode input as json
input := s.Input{
StateTarget: ti.StateTarget,
User: ti.User,
Key: ti.Key,
Questionnaire: ti.Questionnaire,
StateTarget: t.StateTarget,
User: t.User,
Key: t.Key,
Questionnaire: t.Questionnaire,
}
iBytes, err := input.Marshal()
if err != nil {
......@@ -275,14 +296,14 @@ func (c *config) handleTask(ti task) (err error) {
stdout, stderr io.ReadCloser
)
commandName := ti.Service.Command
// execute the script
if *scriptDebugging {
log.Printf("[Task:%v] Executing: '%s'", ti.ID, commandName)
log.Printf("[Task:%v] Input: %s", ti.ID, input)
t.Log("Executing: '%s'", t.Service.Command)
t.Log("Input: %s", input)
}
cmd := exec.Command(commandName)
commandParts := strings.Split(t.Service.Command, " ")
cmd := exec.Command(commandParts[0], commandParts[1:]...)
stdin, err = cmd.StdinPipe()
if err != nil {
return
......@@ -315,8 +336,10 @@ func (c *config) handleTask(ti task) (err error) {
return
}
if *scriptDebugging {
log.Printf("[Task:%v] Logs:\n%s", ti.ID, logOutputBytes)
log.Printf("[Task:%v] End of Logs", ti.ID)
if len(logOutputBytes) > 0 {
t.Log("Logs:\n%s", logOutputBytes)
t.Log("End of Logs")
}
}
err = cmd.Wait()
......@@ -326,16 +349,19 @@ func (c *config) handleTask(ti task) (err error) {
err = json.Unmarshal(outputBytes, &output)
if err != nil {
if len(outputBytes) > 0 {
t.Log("Raw output: %s", outputBytes)
}
return
}
if *scriptDebugging {
log.Printf("[Task:%v] Output: %s", ti.ID, output)
t.Log("Output: %s", output)
}
c.DoneTasks <- taskExecution{
ID: ti.ID,
ID: t.ID,
Service: t.Service,
Output: output,
Service: ti.Service,
}
return
}
......@@ -362,9 +388,9 @@ func (c *config) respondToTask(te taskExecution) (err error) {
defer resp.Body.Close()
if resp.StatusCode == 200 {
log.Printf("[Task:%v:%v] Successful response", te.ID, te.Service.Name)
te.Log("Successful response")
} else {
log.Printf("[Task:%v:%v] Response Status: %v", te.ID, te.Service.Name, resp.StatusCode)
te.Log("Response Status: %v", resp.StatusCode)
var (
ackResponseBytes []byte
......@@ -378,7 +404,7 @@ func (c *config) respondToTask(te taskExecution) (err error) {
if err != nil {
return
}
log.Printf("[Task:%v:%v] Response to our response: '%s'", te.ID, te.Service.Name, ackResponse.Error)
te.Log("Response to our response: '%s'", ackResponse.Error)
}
return
}
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment