Commit 954696d2 authored by Lukas Burgey's avatar Lukas Burgey
Browse files

Adapt to changes at the backend

parent 04b93779
......@@ -29,20 +29,26 @@ type (
)
func (c *config) consumer() (cons *consumer) {
var keys []string
for _, serviceName := range c.Services {
keys = append(keys, fmt.Sprintf("service.%s", serviceName))
var serviceRoutingKeys, groupRoutingKeys []string
for _, service := range c.Services {
serviceRoutingKeys = append(serviceRoutingKeys, fmt.Sprintf("service.%s", service.Name))
}
for groupName := range c.GroupToServices {
groupRoutingKeys = append(groupRoutingKeys, fmt.Sprintf("%s", groupName))
}
cons = &consumer{
uri: fmt.Sprintf("amqps://%s:%s@%s", c.Username, c.Password, c.Host),
exchanges: []exchange{
exchange{
"deployments", "topic", keys,
"services", "topic", serviceRoutingKeys,
},
exchange{
"sites", "topic", []string{c.Site},
},
exchange{
"groups", "topic", groupRoutingKeys,
},
},
consumer: c.consume,
reconnectTimeout: c.ReconnectTimeout,
......@@ -176,13 +182,6 @@ func (c *consumer) startConsuming() (err error) {
func (c *config) consume(deliveries <-chan amqp.Delivery) {
for d := range deliveries {
// TODO remove: debug output for demonstration
/*
buf := new(bytes.Buffer)
json.Indent(buf, d.Body, "", " ")
log.Printf("DEMONSTRATION MESSAGE: %s", buf)
*/
var newTask task
err := json.Unmarshal(d.Body, &newTask)
......@@ -195,6 +194,10 @@ func (c *config) consume(deliveries <-chan amqp.Delivery) {
false, // signal the server to requeue the message
)
} else {
// extend the task with additional information
newTask.Exchange = d.Exchange
newTask.RoutingKey = d.RoutingKey
// enqueue the task
log.Printf("[Task] %v: RECEIVED", newTask)
c.NewTasks <- newTask
......
......@@ -27,12 +27,20 @@ type (
}
config struct {
Host string `json:"host"`
Username string `json:"username"`
Password string `json:"password"`
Services map[string]service `json:"services"`
FetchIntervalString string `json:"fetch_interval"` // string parsed by time.ParseDuration
ReconnectTimeoutString string `json:"reconnect_timeout"` // string parsed by time.ParseDuration
Host string `json:"host"`
Username string `json:"username"`
Password string `json:"password"`
// Services provided at the site of this client
// this is for deployment per _service_
Services []service `json:"services"`
// GroupToServices maps a group name to services provided for this group
// this is for deployment per _group_
GroupToServices map[string]([]service) `json:"group_to_services"` //
FetchIntervalString string `json:"fetch_interval"` // string parsed by time.ParseDuration
ReconnectTimeoutString string `json:"reconnect_timeout"` // string parsed by time.ParseDuration
NewTasks chan task
DoneTasks chan taskExecution
FetchInterval time.Duration
......@@ -50,7 +58,7 @@ var (
).Author(
"Lukas Burgey",
).Version(
"0.2.0",
"0.3.0",
)
configFile = app.Arg("config", "Config file to file to use.").Required().String()
)
......@@ -99,16 +107,17 @@ func (c *config) fetchConfig() (err error) {
}
log.Printf("[Conf] Services: %s", c.Services)
log.Printf("[Conf] Groups: %s", c.GroupToServices)
return
}
func filterPermitted(permitted []service, wanted map[string]service) (remainder map[string]service) {
remainder = make(map[string]service, len(wanted))
func filterPermitted(permitted []service, wanted []service) (remainder []service) {
serviceLoop:
for _, service := range wanted {
for _, permittedService := range permitted {
if service.Name == permittedService.Name {
remainder[service.Name] = service
remainder = append(remainder, service)
continue serviceLoop
}
}
......@@ -232,7 +241,7 @@ func main() {
log.Fatalf("[Err] No valid config. Exiting")
}
if len(c.Services) < 1 {
if len(c.Services) == 0 && len(c.GroupToServices) == 0 {
log.Printf("[P] Not starting pubsub because the are no services to subscribe to")
return
}
......
......@@ -4,6 +4,7 @@ import (
"bytes"
"encoding/json"
"fmt"
"io"
"io/ioutil"
"log"
"net/http"
......@@ -15,23 +16,29 @@ import (
type (
service struct {
Name string `json:"name"`
Command string `json:"command"`
Name string `json:"name"`
Command string `json:"command"`
Description string `json:"description"`
}
task struct {
ID int `json:"id"`
StateTarget string `json:"state_target"`
Service service `json:"service"`
User scripts.User `json:"user"`
Key scripts.SSHKey `json:"key"`
Questionnaire map[string]string `json:"questionnaire"`
Exchange string
RoutingKey string
}
taskExecution struct {
// ID of the according task
ID int `json:"id"`
Output scripts.Output `json:"output"`
// we need to inform the backend for which service we deployed
Service service `json:"service"`
}
)
......@@ -44,7 +51,36 @@ func (s service) String() string {
}
func (t task) String() string {
return fmt.Sprintf("%s:%s %8s %s", t.Service, t.User, t.StateTarget, t.Key)
return fmt.Sprintf("%s-%s:%s %s", t.Exchange, t.RoutingKey, t.User, t.StateTarget)
}
func (c *config) taskServices(t task) (services []service, err error) {
switch t.Exchange {
case "services":
for _, svc := range c.Services {
if svc.Name == t.RoutingKey {
services = []service{svc}
return
}
}
return
case "groups":
var ok bool
services, ok = c.GroupToServices[t.RoutingKey]
if !ok {
err = fmt.Errorf(
"Group %s has no mapped services! Unable to get service for task %s",
t.RoutingKey,
t,
)
return
}
return
case "sites":
// TODO implement
}
return
}
// fetches tasks (which rabbitmq missed) manually
......@@ -108,7 +144,7 @@ func (c *config) taskResponder() {
// IMPLEMENTATIONS
func (c *config) fetchTasks() (err error) {
if len(c.Services) == 0 {
if len(c.Services) == 0 && len(c.GroupToServices) == 0 {
log.Printf("[Fetch] Not fetching because the are no services to fetch")
return
}
......@@ -125,6 +161,9 @@ func (c *config) fetchTasks() (err error) {
for _, service := range c.Services {
q.Add("s", service.Name)
}
for groupName := range c.GroupToServices {
q.Add("g", groupName)
}
req.URL.RawQuery = q.Encode()
req.SetBasicAuth(c.Username, c.Password)
......@@ -167,59 +206,79 @@ func (c *config) handleTask(ti task) (err error) {
return
}
// execute the script
commandName := c.Services[ti.Service.Name].Command
log.Printf("[Task:%v] Executing: %s", ti.ID, commandName)
if scriptDebugging {
log.Printf("[Task:%v] Input: %s", ti.ID, input)
}
cmd := exec.Command(commandName)
stdin, err := cmd.StdinPipe()
if err != nil {
return
}
stdout, err := cmd.StdoutPipe()
if err != nil {
return
}
stderr, err := cmd.StderrPipe()
// determine the services of the task
var services []service
services, err = c.taskServices(ti)
if err != nil {
return
}
err = cmd.Start()
if err != nil {
return
}
stdin.Write(iBytes)
stdin.Close()
for _, svc := range services {
// TODO execute in parallel
var (
stdin io.WriteCloser
stdout, stderr io.ReadCloser
)
commandName := svc.Command
// execute the script
log.Printf("[Task:%v] Executing: '%s'", ti.ID, commandName)
if scriptDebugging {
log.Printf("[Task:%v] Input: %s", ti.ID, input)
}
// decode json output
outputBytes, err := ioutil.ReadAll(stdout)
if err != nil {
return
}
cmd := exec.Command(commandName)
stdin, err = cmd.StdinPipe()
if err != nil {
return
}
stdout, err = cmd.StdoutPipe()
if err != nil {
return
}
stderr, err = cmd.StderrPipe()
if err != nil {
return
}
logOutputBytes, err := ioutil.ReadAll(stderr)
if err != nil {
return
}
log.Printf("[Task:%v] Logs:\n%s", ti.ID, logOutputBytes)
log.Printf("[Task:%v] End of Logs", ti.ID)
err = cmd.Start()
if err != nil {
return
}
stdin.Write(iBytes)
stdin.Close()
err = cmd.Wait()
if err != nil {
return
}
// decode json output
var outputBytes, logOutputBytes []byte
outputBytes, err = ioutil.ReadAll(stdout)
if err != nil {
return
}
err = json.Unmarshal(outputBytes, &output)
if err != nil {
return
}
log.Printf("[Task:%v] Output: %s", ti.ID, output)
logOutputBytes, err = ioutil.ReadAll(stderr)
if err != nil {
return
}
log.Printf("[Task:%v] Logs:\n%s", ti.ID, logOutputBytes)
log.Printf("[Task:%v] End of Logs", ti.ID)
err = cmd.Wait()
if err != nil {
return
}
err = json.Unmarshal(outputBytes, &output)
if err != nil {
return
}
log.Printf("[Task:%v] Output: %s", ti.ID, output)
c.DoneTasks <- taskExecution{ti.ID, output}
c.DoneTasks <- taskExecution{
ID: ti.ID,
Output: output,
Service: svc,
}
}
return
}
func (c *config) respondToTask(te taskExecution) (err error) {
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment