Commit ff5291c2 authored by Lukas Burgey's avatar Lukas Burgey
Browse files

Initialize repository

parents
package main
import (
"crypto/tls"
"encoding/json"
"fmt"
"io/ioutil"
"log"
"net/http"
"github.com/streadway/amqp"
)
type (
sshKey struct {
Name string `json:"name"`
Key string `json:"key"`
}
group struct {
Name string `json:"name"`
}
user struct {
Email string `json:"email"`
Sub string `json:"sub"`
Groups []group `json:"groups"`
}
deploymentUpdate struct {
User user `json:"user"`
Service service `json:"service"`
SSHKeys []sshKey `json:"ssh_keys"`
SSHKeysToWithdraw []sshKey `json:"ssh_keys_to_withdraw"`
}
service struct {
Name string `json:"name"`
}
serviceUpdate struct {
}
// used for initial updates
deploymentUpdates struct {
Services map[string]([]deploymentUpdate) `json:"services"`
}
)
// TODO put into config
var (
host = "hdf-portal.data.kit.edu"
username = "client0"
exchangeName = "deployments"
token = "a5bfc91993e9a95d6a04b1a8159f499f73c06b2e"
password = token
tr = &http.Transport{
TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
}
client = &http.Client{Transport: tr}
)
func failOnError(err error, msg string) {
if err != nil {
log.Fatalf(" [E] %s: %s", msg, err)
}
}
func (s service) String() string {
return s.Name
}
func (k sshKey) String() string {
return k.Name
}
func (du deploymentUpdate) String() string {
return fmt.Sprintf("%s: %s: deploy%s withdraw%s", du.Service, du.User.Email, du.SSHKeys, du.SSHKeysToWithdraw)
}
func readDeploymentUpdate(body []byte) (update deploymentUpdate) {
err := json.Unmarshal(body, &update)
failOnError(err, "Unmarshaling")
return
}
func fetchInitialUpdates() (updates deploymentUpdates) {
req, err := http.NewRequest("GET", "https://"+host+"/backend/clientapi/deployments", nil)
failOnError(err, "fetching services")
req.Header.Add("Authorization", fmt.Sprintf("Token %s", token))
resp, err := client.Do(req)
failOnError(err, "fetching initial updates")
defer resp.Body.Close()
body, err := ioutil.ReadAll(resp.Body)
err = json.Unmarshal(body, &updates)
failOnError(err, "decoding initial updates "+string(body))
log.Printf(" [I] Received initial updates:")
return
}
func handleDeploymentUpdate(update deploymentUpdate) {
log.Printf(" [U] %s", update)
}
func handleInitialUpdates(initialUpdate deploymentUpdates) {
for serviceName, updates := range initialUpdate.Services {
log.Printf(" [I] Updates for %s: %v", serviceName, len(updates))
for _, update := range updates {
handleDeploymentUpdate(update)
}
}
}
// fetch the services we are permitted to receive
func fetchConfiguredServices() []service {
req, err := http.NewRequest("GET", "https://"+host+"/backend/clientapi/config", nil)
failOnError(err, "fetching services")
req.Header.Add("Authorization", fmt.Sprintf("Token %s", token))
resp, err := client.Do(req)
failOnError(err, "fetching services")
defer resp.Body.Close()
body, err := ioutil.ReadAll(resp.Body)
var services []service
err = json.Unmarshal(body, &services)
failOnError(err, "decoding services")
log.Printf(" [S] Received services: %s", services)
return services
}
func pubSub(services []service) {
conn, err := amqp.Dial(fmt.Sprintf("amqp://%s:%s@%s", username, password, host))
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()
ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()
q, err := ch.QueueDeclare(
"", // name
false, // durable
false, // delete when unused
true, // exclusive
false, // no-wait
nil, // arguments
)
failOnError(err, "Failed to declare a queue")
for _, s := range services {
routingKey := "service." + s.Name
log.Printf(" [Q] Binding queue to exchange '%s' with routing key '%s'",
exchangeName, routingKey)
err = ch.QueueBind(
q.Name, // queue name
routingKey, // routing key
exchangeName, // exchange
false,
nil)
failOnError(err, "Failed to bind a queue")
}
msgs, err := ch.Consume(
q.Name, // queue
"", // consumer
true, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
failOnError(err, "Failed to register a consumer")
forever := make(chan bool)
go func() {
for d := range msgs {
handleDeploymentUpdate(readDeploymentUpdate(d.Body))
}
}()
log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
<-forever
}
func main() {
services := fetchConfiguredServices()
// we first get the changes which happened while we were offline
initialUpdates := fetchInitialUpdates()
handleInitialUpdates(initialUpdates)
// and the connect to the pub sub system for live updates
pubSub(services)
}
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment