Commit bfc1a5b0 authored by Lukas Burgey's avatar Lukas Burgey
Browse files

Fix the task fetch

parent 954696d2
...@@ -43,12 +43,14 @@ func (c *config) consumer() (cons *consumer) { ...@@ -43,12 +43,14 @@ func (c *config) consumer() (cons *consumer) {
exchange{ exchange{
"services", "topic", serviceRoutingKeys, "services", "topic", serviceRoutingKeys,
}, },
exchange{
"sites", "topic", []string{c.Site},
},
exchange{ exchange{
"groups", "topic", groupRoutingKeys, "groups", "topic", groupRoutingKeys,
}, },
/*
exchange{
"sites", "topic", []string{c.Site},
},
*/
}, },
consumer: c.consume, consumer: c.consume,
reconnectTimeout: c.ReconnectTimeout, reconnectTimeout: c.ReconnectTimeout,
...@@ -199,8 +201,8 @@ func (c *config) consume(deliveries <-chan amqp.Delivery) { ...@@ -199,8 +201,8 @@ func (c *config) consume(deliveries <-chan amqp.Delivery) {
newTask.RoutingKey = d.RoutingKey newTask.RoutingKey = d.RoutingKey
// enqueue the task // enqueue the task
log.Printf("[Task] %v: RECEIVED", newTask) log.Printf("[Task] %v: RECEIVED via %v-%v", newTask, d.Exchange, d.RoutingKey)
c.NewTasks <- newTask c.scheduleTask(newTask)
d.Ack( d.Ack(
false, // multiple false, // multiple
) )
......
...@@ -2,13 +2,10 @@ package main ...@@ -2,13 +2,10 @@ package main
import ( import (
"encoding/json" "encoding/json"
"fmt"
"io/ioutil" "io/ioutil"
"log" "log"
"net/http" "net/http"
"os" "os"
"os/signal"
"syscall"
"time" "time"
"gopkg.in/alecthomas/kingpin.v2" "gopkg.in/alecthomas/kingpin.v2"
...@@ -37,7 +34,7 @@ type ( ...@@ -37,7 +34,7 @@ type (
// GroupToServices maps a group name to services provided for this group // GroupToServices maps a group name to services provided for this group
// this is for deployment per _group_ // this is for deployment per _group_
GroupToServices map[string]([]service) `json:"group_to_services"` // GroupToServices map[string]([]service) `json:"group_to_services"`
FetchIntervalString string `json:"fetch_interval"` // string parsed by time.ParseDuration FetchIntervalString string `json:"fetch_interval"` // string parsed by time.ParseDuration
ReconnectTimeoutString string `json:"reconnect_timeout"` // string parsed by time.ParseDuration ReconnectTimeoutString string `json:"reconnect_timeout"` // string parsed by time.ParseDuration
...@@ -60,7 +57,9 @@ var ( ...@@ -60,7 +57,9 @@ var (
).Version( ).Version(
"0.3.0", "0.3.0",
) )
configFile = app.Arg("config", "Config file to file to use.").Required().String() configFile = app.Arg("config", "Config file to file to use.").Required().String()
scriptDebugging = app.Flag("debug-scripts", "Display debugging info concerning executed scripts").Bool()
backendDebugging = app.Flag("debug-backend", "Display debugging info concerning the backend").Bool()
) )
func logError(err error, msg string) { func logError(err error, msg string) {
...@@ -183,55 +182,9 @@ func getConfig(configFile string) (c config, err error) { ...@@ -183,55 +182,9 @@ func getConfig(configFile string) (c config, err error) {
return return
} }
func signalHandler() {
exits := make(chan int)
signals := make(chan os.Signal, 1)
go func(exits chan int) {
for e := range exits {
os.Exit(e)
}
}(exits)
signal.Notify(signals,
syscall.SIGHUP,
syscall.SIGINT,
syscall.SIGTERM,
syscall.SIGQUIT,
)
for signal := range signals {
switch signal {
case syscall.SIGHUP:
fmt.Println("hungup")
exits <- 0
// kill -SIGINT XXXX or Ctrl+c
case syscall.SIGINT:
fmt.Println("Warikomi")
exits <- 0
// kill -SIGTERM XXXX
case syscall.SIGTERM:
fmt.Println("force stop")
exits <- 0
// kill -SIGQUIT XXXX
case syscall.SIGQUIT:
fmt.Println("stop and core dump")
exits <- 1
default:
fmt.Println("Unknown signal.")
}
}
}
func main() { func main() {
var err error var err error
go signalHandler()
// get arguments // get arguments
kingpin.MustParse(app.Parse(os.Args[1:])) kingpin.MustParse(app.Parse(os.Args[1:]))
......
...@@ -21,29 +21,39 @@ type ( ...@@ -21,29 +21,39 @@ type (
Description string `json:"description"` Description string `json:"description"`
} }
group struct {
Name string `json:"name"`
}
task struct { task struct {
ID int `json:"id"` ID int `json:"id"`
StateTarget string `json:"state_target"` StateTarget string `json:"state_target"`
User scripts.User `json:"user"` User scripts.User `json:"user"`
Key scripts.SSHKey `json:"key"` Key scripts.SSHKey `json:"key"`
Questionnaire map[string]string `json:"questionnaire"` Questionnaire map[string]string `json:"questionnaire"`
Group group `json:"group"`
// maybe overwritten by scheduleTask
Service service `json:"service,omitempty"`
// Exchange and RoutingKey are inserted when we receive the task via RabbitMQ
Exchange string Exchange string
RoutingKey string RoutingKey string
} }
taskExecution struct { taskExecution struct {
// ID of the according task // ID of the according task (at the backend: DeploymentState)
ID int `json:"id"` ID int `json:"id"`
Output scripts.Output `json:"output"` Output scripts.Output `json:"output"`
// we need to inform the backend for which service we deployed // we need to inform the backend for which service we deployed
Service service `json:"service"` Service service `json:"service"`
} }
)
var ( // ackResponse response of the backend for our response
scriptDebugging = true ackResponse struct {
Error string `json:"error"`
}
) )
func (s service) String() string { func (s service) String() string {
...@@ -55,6 +65,18 @@ func (t task) String() string { ...@@ -55,6 +65,18 @@ func (t task) String() string {
} }
func (c *config) taskServices(t task) (services []service, err error) { func (c *config) taskServices(t task) (services []service, err error) {
// hacky: we need to determine exchange and routing key when we fetched manually from the rest
// interface
if t.Exchange == "" || t.RoutingKey == "" {
if t.Service != (service{}) {
t.Exchange = "services"
t.RoutingKey = t.Service.Name
} else if t.Group != (group{}) {
t.Exchange = "groups"
t.RoutingKey = t.Group.Name
}
}
switch t.Exchange { switch t.Exchange {
case "services": case "services":
for _, svc := range c.Services { for _, svc := range c.Services {
...@@ -63,7 +85,20 @@ func (c *config) taskServices(t task) (services []service, err error) { ...@@ -63,7 +85,20 @@ func (c *config) taskServices(t task) (services []service, err error) {
return return
} }
} }
return
// try the group based services
// TODO is this desired?
if len(services) == 0 {
for _, groupServices := range c.GroupToServices {
for _, groupService := range groupServices {
if groupService.Name == t.RoutingKey {
services = []service{groupService}
return
}
}
}
}
case "groups": case "groups":
var ok bool var ok bool
services, ok = c.GroupToServices[t.RoutingKey] services, ok = c.GroupToServices[t.RoutingKey]
...@@ -76,10 +111,13 @@ func (c *config) taskServices(t task) (services []service, err error) { ...@@ -76,10 +111,13 @@ func (c *config) taskServices(t task) (services []service, err error) {
return return
} }
return
case "sites": case "sites":
// TODO implement // TODO implement
} }
if len(services) == 0 {
log.Printf("Unable to determine services for task %s", t)
}
return return
} }
...@@ -109,19 +147,21 @@ func (c *config) taskHandler() { ...@@ -109,19 +147,21 @@ func (c *config) taskHandler() {
go func(t task) { go func(t task) {
if err := c.handleTask(t); err != nil { if err := c.handleTask(t); err != nil {
log.Printf("[Task] Error handling task: %s", err) log.Printf("[Task] Error handling task: %s", err)
/* c.DoneTasks <- taskExecution{
TODO ID: t.ID,
go func(t task) { Output: scripts.Output{
time.Sleep(1 * time.Minute) Status: scripts.Failed,
c.NewTasks <- t // TODO maybe another message for the user
}(t) Msg: fmt.Sprint(err),
*/ },
Service: t.Service,
}
} }
}(newTask) }(newTask)
} }
} }
// acks tasks in c.DoneTasks // responds to the backend concering tasks in c.DoneTasks
func (c *config) taskResponder() { func (c *config) taskResponder() {
for doneTask := range c.DoneTasks { for doneTask := range c.DoneTasks {
// ack tasks asynchronously // ack tasks asynchronously
...@@ -131,7 +171,6 @@ func (c *config) taskResponder() { ...@@ -131,7 +171,6 @@ func (c *config) taskResponder() {
log.Printf("[Task] %v: ACK-ERROR: %s", te.ID, err) log.Printf("[Task] %v: ACK-ERROR: %s", te.ID, err)
// reschedule failed responses // reschedule failed responses
// TODO does this work?
go func(te taskExecution) { go func(te taskExecution) {
time.Sleep(time.Minute) time.Sleep(time.Minute)
c.DoneTasks <- te c.DoneTasks <- te
...@@ -184,14 +223,38 @@ func (c *config) fetchTasks() (err error) { ...@@ -184,14 +223,38 @@ func (c *config) fetchTasks() (err error) {
return return
} }
log.Printf("[Fetch] %d new tasks", len(newTasks)) log.Printf("[Fetch] %d new tasks:%v", len(newTasks), newTasks)
for _, task := range newTasks { for _, task := range newTasks {
c.scheduleTask(task)
}
return
}
// scheduleTask puts tasks in the newTasks channel
func (c *config) scheduleTask(ti task) (err error) {
// determine the services of the task
var services []service
services, err = c.taskServices(ti)
if err != nil {
return
}
// schedule one task per service of the task
for _, svc := range services {
var t task
t = ti
t.Service = svc
log.Printf("[Task:%v] scheduling for %s", t.ID, svc.Name)
// put the in the task queue // put the in the task queue
c.NewTasks <- task c.NewTasks <- t
} }
return return
} }
func (c *config) handleTask(ti task) (err error) { func (c *config) handleTask(ti task) (err error) {
var output scripts.Output var output scripts.Output
// encode input as json // encode input as json
...@@ -206,81 +269,77 @@ func (c *config) handleTask(ti task) (err error) { ...@@ -206,81 +269,77 @@ func (c *config) handleTask(ti task) (err error) {
return return
} }
// determine the services of the task // TODO execute in parallel
var services []service var (
services, err = c.taskServices(ti) stdin io.WriteCloser
if err != nil { stdout, stderr io.ReadCloser
return )
}
for _, svc := range services {
// TODO execute in parallel
var (
stdin io.WriteCloser
stdout, stderr io.ReadCloser
)
commandName := svc.Command commandName := ti.Service.Command
// execute the script // execute the script
if *scriptDebugging {
log.Printf("[Task:%v] Executing: '%s'", ti.ID, commandName) log.Printf("[Task:%v] Executing: '%s'", ti.ID, commandName)
if scriptDebugging { log.Printf("[Task:%v] Input: %s", ti.ID, input)
log.Printf("[Task:%v] Input: %s", ti.ID, input) }
}
cmd := exec.Command(commandName) cmd := exec.Command(commandName)
stdin, err = cmd.StdinPipe() stdin, err = cmd.StdinPipe()
if err != nil { if err != nil {
return return
} }
stdout, err = cmd.StdoutPipe() stdout, err = cmd.StdoutPipe()
if err != nil { if err != nil {
return return
} }
stderr, err = cmd.StderrPipe() stderr, err = cmd.StderrPipe()
if err != nil { if err != nil {
return return
} }
err = cmd.Start() err = cmd.Start()
if err != nil { if err != nil {
return return
} }
stdin.Write(iBytes) stdin.Write(iBytes)
stdin.Close() stdin.Close()
// decode json output // decode json output
var outputBytes, logOutputBytes []byte var outputBytes, logOutputBytes []byte
outputBytes, err = ioutil.ReadAll(stdout) outputBytes, err = ioutil.ReadAll(stdout)
if err != nil { if err != nil {
return return
} }
logOutputBytes, err = ioutil.ReadAll(stderr) logOutputBytes, err = ioutil.ReadAll(stderr)
if err != nil { if err != nil {
return return
} }
if *scriptDebugging {
log.Printf("[Task:%v] Logs:\n%s", ti.ID, logOutputBytes) log.Printf("[Task:%v] Logs:\n%s", ti.ID, logOutputBytes)
log.Printf("[Task:%v] End of Logs", ti.ID) log.Printf("[Task:%v] End of Logs", ti.ID)
}
err = cmd.Wait() err = cmd.Wait()
if err != nil { if err != nil {
return return
} }
err = json.Unmarshal(outputBytes, &output) err = json.Unmarshal(outputBytes, &output)
if err != nil { if err != nil {
return return
} }
if *scriptDebugging {
log.Printf("[Task:%v] Output: %s", ti.ID, output) log.Printf("[Task:%v] Output: %s", ti.ID, output)
}
c.DoneTasks <- taskExecution{ c.DoneTasks <- taskExecution{
ID: ti.ID, ID: ti.ID,
Output: output, Output: output,
Service: svc, Service: ti.Service,
}
} }
return return
} }
func (c *config) respondToTask(te taskExecution) (err error) { func (c *config) respondToTask(te taskExecution) (err error) {
taskResponse, err := json.MarshalIndent(te, "", " ") taskResponse, err := json.MarshalIndent(te, "", " ")
if err != nil { if err != nil {
...@@ -303,16 +362,23 @@ func (c *config) respondToTask(te taskExecution) (err error) { ...@@ -303,16 +362,23 @@ func (c *config) respondToTask(te taskExecution) (err error) {
defer resp.Body.Close() defer resp.Body.Close()
if resp.StatusCode == 200 { if resp.StatusCode == 200 {
log.Printf("[Task:%v] Successful ACK", te.ID) log.Printf("[Task:%v:%v] Successful response", te.ID, te.Service.Name)
} else { } else {
var ackResponse []byte log.Printf("[Task:%v:%v] Response Status: %v", te.ID, te.Service.Name, resp.StatusCode)
ackResponse, err = ioutil.ReadAll(resp.Body)
var (
ackResponseBytes []byte
ackResponse ackResponse
)
ackResponseBytes, err = ioutil.ReadAll(resp.Body)
if err != nil {
return
}
err = json.Unmarshal(ackResponseBytes, &ackResponse)
if err != nil { if err != nil {
log.Printf("[Task:%v] ACK Status: %v", te.ID, resp.StatusCode)
return return
} }
log.Printf("[Tsak:%v] ACK Response: %s", te.ID, ackResponse) log.Printf("[Task:%v:%v] Response to our response: '%s'", te.ID, te.Service.Name, ackResponse.Error)
log.Printf("[Task:%v] ACK Status: %v", te.ID, resp.StatusCode)
} }
return return
} }
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment