Commit 613c5183 authored by Lukas Burgey's avatar Lukas Burgey
Browse files

Adapt client to fetch some config parameters from the backend

parent 134d183b
...@@ -51,11 +51,20 @@ type ( ...@@ -51,11 +51,20 @@ type (
OK bool `json:"ok"` OK bool `json:"ok"`
} }
rabbitMQConfig struct {
ExchangeName string `json:"exchange"`
Vhost string `json:"vhost"`
}
FetchedConfig struct {
RabbitMQConfig rabbitMQConfig `json:"rabbitmq_config"`
Services []service `json:"services"`
}
config struct { config struct {
Host string `json:"host"` Host string `json:"host"`
Username string `json:"username"` Username string `json:"username"`
ExchangeName string `json:"exchangeName"` Password string `json:"password"`
Token string `json:"token"`
Services []service `json:"services"` Services []service `json:"services"`
FetchIntervalString string `json:"fetch_interval"` // string parsed by time.ParseDuration FetchIntervalString string `json:"fetch_interval"` // string parsed by time.ParseDuration
ReconnectTimeoutString string `json:"reconnect_timeout"` // string parsed by time.ParseDuration ReconnectTimeoutString string `json:"reconnect_timeout"` // string parsed by time.ParseDuration
...@@ -63,6 +72,9 @@ type ( ...@@ -63,6 +72,9 @@ type (
DoneTasks chan task DoneTasks chan task
FetchInterval time.Duration FetchInterval time.Duration
ReconnectTimeout time.Duration ReconnectTimeout time.Duration
ExchangeName string // fetched from backend
Vhost string // fetched from backend
} }
consumer struct { consumer struct {
...@@ -126,13 +138,13 @@ serviceLoop: ...@@ -126,13 +138,13 @@ serviceLoop:
return return
} }
func (c *config) fetchConfiguredServices() (services []service, err error) { func (c *config) fetchConfig() (err error) {
req, err := http.NewRequest("GET", "https://"+c.Host+"/backend/clientapi/config", nil) req, err := http.NewRequest("GET", "https://"+c.Host+"/backend/clientapi/config", nil)
if err != nil { if err != nil {
return return
} }
req.Header.Add("Authorization", fmt.Sprintf("Token %s", c.Token)) req.SetBasicAuth(c.Username, c.Password)
resp, err := client.Do(req) resp, err := client.Do(req)
if err != nil { if err != nil {
return return
...@@ -143,14 +155,32 @@ func (c *config) fetchConfiguredServices() (services []service, err error) { ...@@ -143,14 +155,32 @@ func (c *config) fetchConfiguredServices() (services []service, err error) {
if err != nil { if err != nil {
return return
} }
err = json.Unmarshal(body, &services) var fetchedConfig FetchedConfig
err = json.Unmarshal(body, &fetchedConfig)
if err != nil { if err != nil {
log.Fatalf("Unable to parse: %s %s", err, body)
return return
} }
c.ExchangeName = fetchedConfig.RabbitMQConfig.ExchangeName
c.Vhost = fetchedConfig.RabbitMQConfig.Vhost
permittedServices := fetchedConfig.Services
log.Printf("[Conf] Permitted services: %s", permittedServices)
if len(c.Services) == 0 {
log.Printf("[Conf] Config specifies no services. Using all permitted services")
c.Services = permittedServices
} else {
// if the config specifies services we check if they are all permitted
c.Services = filterPermitted(permittedServices, c.Services)
}
log.Printf("[Conf] Services: %s", c.Services)
return return
} }
func readConfig(configFile string) (c config, err error) { func getConfig(configFile string) (c config, err error) {
log.Printf("[Conf] Reading config file %s", configFile) log.Printf("[Conf] Reading config file %s", configFile)
...@@ -165,21 +195,10 @@ func readConfig(configFile string) (c config, err error) { ...@@ -165,21 +195,10 @@ func readConfig(configFile string) (c config, err error) {
return return
} }
var permittedServices []service if err = c.fetchConfig(); err != nil {
if permittedServices, err = c.fetchConfiguredServices(); err != nil { log.Printf("[Conf] Error fetching remote config: %s", err)
log.Printf("[Conf] Error fetching configured services: %s", err) return
}
log.Printf("[Conf] Permitted services: %s", permittedServices)
if len(c.Services) == 0 {
log.Printf("[Conf] Config specifies no services. Using all permitted services")
c.Services = permittedServices
} else {
// if the config specifies services we check if they are all permitted
c.Services = filterPermitted(permittedServices, c.Services)
} }
log.Printf("[Conf] Services: %s", c.Services)
if c.FetchInterval, err = time.ParseDuration(c.FetchIntervalString); err != nil { if c.FetchInterval, err = time.ParseDuration(c.FetchIntervalString); err != nil {
log.Printf("[Conf] Error parsing fetch interval: %s", err) log.Printf("[Conf] Error parsing fetch interval: %s", err)
return return
...@@ -314,6 +333,10 @@ func (c *consumer) reconnect() { ...@@ -314,6 +333,10 @@ func (c *consumer) reconnect() {
func (c *consumer) startConsuming() (err error) { func (c *consumer) startConsuming() (err error) {
deliveries, err := c.connect() deliveries, err := c.connect()
if err != nil {
log.Fatalf("[Conn] Error connecting: %s", err)
return
}
// start the first consumer // start the first consumer
go c.consumer(deliveries) go c.consumer(deliveries)
...@@ -328,7 +351,7 @@ func (c *config) consumer() (cons *consumer) { ...@@ -328,7 +351,7 @@ func (c *config) consumer() (cons *consumer) {
keys = append(keys, fmt.Sprintf("service.%s", serviceName)) keys = append(keys, fmt.Sprintf("service.%s", serviceName))
} }
cons = &consumer{ cons = &consumer{
uri: fmt.Sprintf("amqps://%s:%s@%s", c.Username, c.Token, c.Host), uri: fmt.Sprintf("amqps://%s:%s@%s", c.Username, c.Password, c.Host),
exchange: c.ExchangeName, exchange: c.ExchangeName,
exchangeType: "topic", exchangeType: "topic",
bindingKeys: keys, bindingKeys: keys,
...@@ -392,7 +415,7 @@ func (c *config) ackTask(ti task) (ok bool, err error) { ...@@ -392,7 +415,7 @@ func (c *config) ackTask(ti task) (ok bool, err error) {
return return
} }
req.Header.Add("Authorization", fmt.Sprintf("Token %s", c.Token)) req.SetBasicAuth(c.Username, c.Password)
resp, err := client.Do(req) resp, err := client.Do(req)
if err != nil { if err != nil {
return return
...@@ -449,7 +472,7 @@ func (c *config) fetchTasks() (err error) { ...@@ -449,7 +472,7 @@ func (c *config) fetchTasks() (err error) {
} }
req.URL.RawQuery = q.Encode() req.URL.RawQuery = q.Encode()
req.Header.Add("Authorization", fmt.Sprintf("Token %s", c.Token)) req.SetBasicAuth(c.Username, c.Password)
// execute the request // execute the request
resp, err := client.Do(req) resp, err := client.Do(req)
...@@ -506,7 +529,7 @@ func main() { ...@@ -506,7 +529,7 @@ func main() {
kingpin.MustParse(app.Parse(os.Args[1:])) kingpin.MustParse(app.Parse(os.Args[1:]))
// read the config file // read the config file
c, err := readConfig(*configFile) c, err := getConfig(*configFile)
if err != nil { if err != nil {
log.Fatalf("[Err] No valid config. Exiting") log.Fatalf("[Err] No valid config. Exiting")
} }
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment