Commit 412c0ef3 authored by Lukas Burgey's avatar Lukas Burgey
Browse files

Adapt to wide api changes at the backend

parent 0b4cc67f
......@@ -36,23 +36,26 @@ type (
Groups []group `json:"groups"`
}
deploymentUpdate struct {
User user `json:"user"`
Service service `json:"service"`
SSHKeys []sshKey `json:"ssh_keys"`
SSHKeysToWithdraw []sshKey `json:"ssh_keys_to_withdraw"`
service struct {
Name string `json:"name"`
DeployCommand string `json:"deploy"`
WithdrawCommand string `json:"withdraw"`
}
service struct {
Name string `json:"name"`
task struct {
ID int `json:"id"`
Action string `json:"action"`
User user `json:"user"`
Service service `json:"service"`
Key sshKey `json:"key"`
}
serviceUpdate struct {
initialUpdate struct {
Tasks []task `json:"tasks"`
}
// used for initial updates
deploymentUpdates struct {
Services map[string]([]deploymentUpdate) `json:"services"`
response struct {
OK bool `json:"ok"`
}
)
......@@ -64,9 +67,9 @@ var (
client = &http.Client{}
)
func failOnError(err error, msg string) {
func logError(err error, msg string) {
if err != nil {
log.Fatalf(" [E] %s: %s", msg, err)
log.Printf(" [E] %s: %s", msg, err)
}
}
......@@ -74,16 +77,16 @@ func (s service) String() string {
return s.Name
}
func (k sshKey) String() string {
return k.Name
func (u user) String() string {
return u.Email
}
func (du deploymentUpdate) String() string {
return fmt.Sprintf("%s: %s: deploy%s withdraw%s", du.Service, du.User.Email, du.SSHKeys, du.SSHKeysToWithdraw)
func (k sshKey) String() string {
return k.Name
}
func filterPermitted(permitted []service, wanted []service) (remainder []service) {
serviceLoop:
serviceLoop:
for _, service := range wanted {
for _, permittedService := range permitted {
if service.Name == permittedService.Name {
......@@ -102,7 +105,7 @@ func filterPermitted(permitted []service, wanted []service) (remainder []service
func readConfig(configFile string) (c config, err error) {
bs, err := ioutil.ReadFile(configFile)
err = json.Unmarshal(bs, &c)
failOnError(err, "Reading config file")
logError(err, "Reading config file")
permittedServices := c.fetchConfiguredServices()
log.Printf(" [C] Permitted services: %s", permittedServices)
......@@ -119,27 +122,77 @@ func readConfig(configFile string) (c config, err error) {
return
}
func readDeploymentUpdate(body []byte) (update deploymentUpdate) {
func readInitialUpdate(body []byte) (update initialUpdate) {
err := json.Unmarshal(body, &update)
failOnError(err, "Unmarshaling")
logError(err, fmt.Sprintf("Unmarshaling %s", body))
return
}
func handleDeploymentUpdate(update deploymentUpdate) error {
log.Printf(" [U] %s", update)
return nil
func readTask(body []byte) (ti task) {
err := json.Unmarshal(body, &ti)
logError(err, fmt.Sprintf("Unmarshaling %s", body))
return
}
func (c *config) handleInitialUpdates(iu initialUpdate) {
for _, ti := range iu.Tasks {
c.handleTask(ti)
}
}
func handleInitialUpdates(initialUpdate deploymentUpdates) {
for serviceName, updates := range initialUpdate.Services {
log.Printf(" [I] Updates for %s: %v", serviceName, len(updates))
for _, update := range updates {
handleDeploymentUpdate(update)
func (c *config) handleTask(ti task) (err error) {
/*
switch ti.Action {
case "deploy":
log.Printf(" [U] %s:%s +%s", ti.Service, ti.User, ti.Key.Name)
case "withdraw":
log.Printf(" [U] %s:%s -%s", ti.Service, ti.User, ti.Key.Name)
default:
log.Printf(" [E] invalid action %s", ti.Action)
}
*/
if err != nil {
log.Printf(" [U] %s:%s %s %s ERROR", ti.Service, ti.User, ti.Action, ti.Key.Name)
return
}
ok, err := c.ackTask(ti)
if ok {
log.Printf(" [U] %s:%s %s %s ACK", ti.Service, ti.User, ti.Action, ti.Key.Name)
} else {
log.Printf(" [U] %s:%s %s %s ACK-ERROR", ti.Service, ti.User, ti.Action, ti.Key.Name)
}
return
}
func (c *config) fetchInitialUpdates() (updates deploymentUpdates, err error) {
func (c *config) ackTask(ti task) (ok bool, err error) {
url := fmt.Sprintf(
"https://%s/backend/clientapi/ack/%d/",
c.Host,
ti.ID)
req, err := http.NewRequest("DELETE", url, nil)
if err != nil {
return
}
req.Header.Add("Authorization", fmt.Sprintf("Token %s", c.Token))
resp, err := client.Do(req)
if err != nil {
return
}
defer resp.Body.Close()
respBytes, err := ioutil.ReadAll(resp.Body)
if err != nil {
return
}
var ackResponse response
err = json.Unmarshal(respBytes, &ackResponse)
ok = ackResponse.OK
return
}
func (c *config) fetchInitialUpdate() (updates initialUpdate, err error) {
if len(c.Services) == 0 {
log.Printf(" [I] Not fetching initial updates because the are no services to fetch")
return
......@@ -170,37 +223,32 @@ func (c *config) fetchInitialUpdates() (updates deploymentUpdates, err error) {
body, err := ioutil.ReadAll(resp.Body)
err = json.Unmarshal(body, &updates)
if err != nil {
return
}
log.Printf(" [I] Received initial updates:")
log.Printf(" [I] %d new task items", len(updates.Tasks))
return
}
func (c *config) fetchConfiguredServices() []service {
req, err := http.NewRequest("GET", "https://"+c.Host+"/backend/clientapi/config", nil)
failOnError(err, "fetching services")
logError(err, "fetching services")
req.Header.Add("Authorization", fmt.Sprintf("Token %s", c.Token))
resp, err := client.Do(req)
failOnError(err, "fetching services")
logError(err, "fetching services")
defer resp.Body.Close()
body, err := ioutil.ReadAll(resp.Body)
var services []service
err = json.Unmarshal(body, &services)
failOnError(err, fmt.Sprintf("decoding services (%s)", body))
logError(err, fmt.Sprintf("decoding services (%s)", body))
return services
}
func (c *config) pubSub() (err error) {
if len(c.Services) == 0 {
if len(c.Services) < 1 {
log.Printf(" [P] Not starting pubsub because the are no services to subscribe to")
return
}
conn, err := amqp.Dial(fmt.Sprintf("amqp://%s:%s@%s", c.Username, c.Token, c.Host))
conn, err := amqp.Dial(fmt.Sprintf("amqps://%s:%s@%s", c.Username, c.Token, c.Host))
if err != nil {
return
}
......@@ -212,10 +260,16 @@ func (c *config) pubSub() (err error) {
}
defer ch.Close()
// puts this channel into confirm mode
ch.Confirm(
false, // noWait
)
// the queue we connect to the topic exchange
q, err := ch.QueueDeclare(
"", // name
false, // durable
false, // delete when unused
true, // durable
true, // delete when unused
true, // exclusive
false, // no-wait
nil, // arguments
......@@ -224,17 +278,21 @@ func (c *config) pubSub() (err error) {
return
}
// all the bindings from our queue to the topic exchange
for _, s := range c.Services {
routingKey := "service." + s.Name
log.Printf(" [P] Binding queue to exchange '%s' with routing key '%s'",
c.ExchangeName, routingKey)
log.Printf(
" [P] Binding queue '%s' to exchange '%s' with routing key '%s'",
q.Name,
c.ExchangeName,
routingKey)
err = ch.QueueBind(
q.Name, // queue name
routingKey, // routing key
c.ExchangeName, // exchange
false,
nil)
false, // no-wait
nil) // args
if err != nil {
return
}
......@@ -253,25 +311,32 @@ func (c *config) pubSub() (err error) {
return
}
forever := make(chan bool)
go func() {
for d := range msgs {
var err error
err = handleDeploymentUpdate(readDeploymentUpdate(d.Body))
failOnError(err, "Failed to handle message")
err = ch.Ack(
d.DeliveryTag,
false, // ack multiple (since delivery tag)
)
err = c.handleTask(readTask(d.Body))
if err != nil {
return
msg := "Failed to handle deployment update"
log.Printf(" [E] %s: %s", msg, err)
d.Nack(
false, // multiple
false, // signal the server to requeue the message
)
} else {
d.Ack(
false, // multiple
)
}
}
}()
log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
// run till killed
forever := make(chan bool)
<-forever
return
}
......@@ -290,18 +355,18 @@ func main() {
// read the config file
conf, err := readConfig(*configFile)
failOnError(err, "Reading config file "+*configFile)
logError(err, "Reading config file "+*configFile)
// we first get the changes which happened while we were offline
initialUpdates, err := conf.fetchInitialUpdates()
failOnError(err, "Fetching initial updates ")
initialUpdates, err := conf.fetchInitialUpdate()
logError(err, "Fetching initial updates ")
handleInitialUpdates(initialUpdates)
conf.handleInitialUpdates(initialUpdates)
if !*oneshot {
// and then connect to rabbitmq for live updates for these services
err = conf.pubSub()
failOnError(err, "PubSub System")
logError(err, "PubSub System")
}
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment