Commit 0760a249 authored by Lukas Burgey's avatar Lukas Burgey
Browse files

Add config handling

parent 07dcda4f
......@@ -14,6 +14,14 @@ import (
)
type (
config struct {
Host string `json:"host"`
Username string `json:"username"`
ExchangeName string `json:"exchangeName"`
Token string `json:"token"`
Services []service `json:"services"`
}
sshKey struct {
Name string `json:"name"`
Key string `json:"key"`
......@@ -53,14 +61,8 @@ const (
version = "0.1.0"
)
// TODO put into config
var (
host = "hdf-portal.data.kit.edu"
username = "client0"
exchangeName = "deployments"
token = "a5bfc91993e9a95d6a04b1a8159f499f73c06b2e"
password = token
// TODO fix certs
tr = &http.Transport{
TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
}
......@@ -85,27 +87,45 @@ func (du deploymentUpdate) String() string {
return fmt.Sprintf("%s: %s: deploy%s withdraw%s", du.Service, du.User.Email, du.SSHKeys, du.SSHKeysToWithdraw)
}
func readDeploymentUpdate(body []byte) (update deploymentUpdate) {
err := json.Unmarshal(body, &update)
failOnError(err, "Unmarshaling")
func filterPermitted(permitted []service, wanted []service) (remainder []service) {
for _, service := range wanted {
for _, permittedService := range permitted {
if service.Name == permittedService.Name {
remainder = append(remainder, service)
continue
}
}
log.Printf(
" [C] Configured service %s is not permitted and therefore skipped",
service,
)
}
return
}
func fetchInitialUpdates() (updates deploymentUpdates) {
req, err := http.NewRequest("GET", "https://"+host+"/backend/clientapi/deployments", nil)
failOnError(err, "fetching services")
func readConfig(configFile string) (c config, err error) {
bs, err := ioutil.ReadFile(configFile)
err = json.Unmarshal(bs, &c)
failOnError(err, "Reading config file")
req.Header.Add("Authorization", fmt.Sprintf("Token %s", token))
resp, err := client.Do(req)
failOnError(err, "fetching initial updates")
defer resp.Body.Close()
permittedServices := c.fetchConfiguredServices()
log.Printf(" [C] Permitted services: %s", permittedServices)
body, err := ioutil.ReadAll(resp.Body)
err = json.Unmarshal(body, &updates)
failOnError(err, "decoding initial updates "+string(body))
if len(c.Services) == 0 {
log.Printf(" [C] Config specifies no services. Using all permitted services.")
c.Services = permittedServices
} else {
// if the config specifies services we check if they are all permitted
c.Services = filterPermitted(permittedServices, c.Services)
}
log.Printf(" [C] Services: %s", c.Services)
log.Printf(" [I] Received initial updates:")
return
}
func readDeploymentUpdate(body []byte) (update deploymentUpdate) {
err := json.Unmarshal(body, &update)
failOnError(err, "Unmarshaling")
return
}
......@@ -123,11 +143,29 @@ func handleInitialUpdates(initialUpdate deploymentUpdates) {
}
}
func fetchConfiguredServices() []service {
req, err := http.NewRequest("GET", "https://"+host+"/backend/clientapi/config", nil)
func (c *config) fetchInitialUpdates() (updates deploymentUpdates) {
req, err := http.NewRequest("GET", "https://"+c.Host+"/backend/clientapi/deployments", nil)
failOnError(err, "fetching services")
req.Header.Add("Authorization", fmt.Sprintf("Token %s", c.Token))
resp, err := client.Do(req)
failOnError(err, "fetching initial updates")
defer resp.Body.Close()
body, err := ioutil.ReadAll(resp.Body)
err = json.Unmarshal(body, &updates)
failOnError(err, "decoding initial updates "+string(body))
log.Printf(" [I] Received initial updates:")
return
}
func (c *config) fetchConfiguredServices() []service {
req, err := http.NewRequest("GET", "https://"+c.Host+"/backend/clientapi/config", nil)
failOnError(err, "fetching services")
req.Header.Add("Authorization", fmt.Sprintf("Token %s", token))
req.Header.Add("Authorization", fmt.Sprintf("Token %s", c.Token))
resp, err := client.Do(req)
failOnError(err, "fetching services")
defer resp.Body.Close()
......@@ -136,13 +174,15 @@ func fetchConfiguredServices() []service {
var services []service
err = json.Unmarshal(body, &services)
failOnError(err, fmt.Sprintf("decoding services (%s)", body))
log.Printf(" [C] Config: %s", services)
return services
}
func pubSub(services []service) {
conn, err := amqp.Dial(fmt.Sprintf("amqp://%s:%s@%s", username, password, host))
func (c *config) pubSub() {
if len(c.Services) == 0 {
log.Printf(" [P] Not starting pubsub because the are no services to subscribe to.")
return
}
conn, err := amqp.Dial(fmt.Sprintf("amqp://%s:%s@%s", c.Username, c.Token, c.Host))
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()
......@@ -160,15 +200,15 @@ func pubSub(services []service) {
)
failOnError(err, "Failed to declare a queue")
for _, s := range services {
for _, s := range c.Services {
routingKey := "service." + s.Name
log.Printf(" [Q] Binding queue to exchange '%s' with routing key '%s'",
exchangeName, routingKey)
c.ExchangeName, routingKey)
err = ch.QueueBind(
q.Name, // queue name
routingKey, // routing key
exchangeName, // exchange
q.Name, // queue name
routingKey, // routing key
c.ExchangeName, // exchange
false,
nil)
failOnError(err, "Failed to bind a queue")
......@@ -207,24 +247,26 @@ func pubSub(services []service) {
func main() {
app := kingpin.New("fumClient", "Client for the 'Federated user management in the Helmholtz Data Federation' project")
configFile := app.Arg("config", "Config file to file to use.").Required().String()
oneshot := app.Flag("oneshot", "Fetch deployments with HTTP. Do not use rabbitmq.").Short('1').Bool()
app.Author("Lukas Burgey")
app.Version(version)
// get input
// get arguments
kingpin.MustParse(app.Parse(os.Args[1:]))
// read the config file
conf, err := readConfig(*configFile)
failOnError(err, "Reading config file "+*configFile)
// we first get the changes which happened while we were offline
handleInitialUpdates(fetchInitialUpdates())
initialUpdates := conf.fetchInitialUpdates()
handleInitialUpdates(initialUpdates)
if !*oneshot {
// fetch the services we are permitted to receive
services := fetchConfiguredServices()
// and then connect to rabbitmq for live updates for these services
pubSub(services)
conf.pubSub()
}
}
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment