Commit bb8f4418 authored by Lukas Burgey's avatar Lukas Burgey
Browse files

Fetch only configured services

parent 0760a249
......@@ -88,15 +88,16 @@ func (du deploymentUpdate) String() string {
}
func filterPermitted(permitted []service, wanted []service) (remainder []service) {
serviceLoop:
for _, service := range wanted {
for _, permittedService := range permitted {
if service.Name == permittedService.Name {
remainder = append(remainder, service)
continue
continue serviceLoop
}
}
log.Printf(
" [C] Configured service %s is not permitted and therefore skipped",
" [C] Skipping unpermitted service %s",
service,
)
}
......@@ -112,7 +113,7 @@ func readConfig(configFile string) (c config, err error) {
log.Printf(" [C] Permitted services: %s", permittedServices)
if len(c.Services) == 0 {
log.Printf(" [C] Config specifies no services. Using all permitted services.")
log.Printf(" [C] Config specifies no services. Using all permitted services")
c.Services = permittedServices
} else {
// if the config specifies services we check if they are all permitted
......@@ -143,18 +144,40 @@ func handleInitialUpdates(initialUpdate deploymentUpdates) {
}
}
func (c *config) fetchInitialUpdates() (updates deploymentUpdates) {
req, err := http.NewRequest("GET", "https://"+c.Host+"/backend/clientapi/deployments", nil)
failOnError(err, "fetching services")
func (c *config) fetchInitialUpdates() (updates deploymentUpdates, err error) {
if len(c.Services) == 0 {
log.Printf(" [I] Not fetching initial updates because the are no services to fetch")
return
}
// construct a request
uri := "https://" + c.Host + "/backend/clientapi/deployments"
req, err := http.NewRequest("GET", uri, nil)
if err != nil {
return
}
// add query params for the services
q := req.URL.Query()
for _, service := range c.Services {
q.Add("s", service.Name)
}
req.URL.RawQuery = q.Encode()
req.Header.Add("Authorization", fmt.Sprintf("Token %s", c.Token))
// execute the request
resp, err := client.Do(req)
failOnError(err, "fetching initial updates")
if err != nil {
return
}
defer resp.Body.Close()
body, err := ioutil.ReadAll(resp.Body)
err = json.Unmarshal(body, &updates)
failOnError(err, "decoding initial updates "+string(body))
if err != nil {
return
}
log.Printf(" [I] Received initial updates:")
......@@ -177,17 +200,21 @@ func (c *config) fetchConfiguredServices() []service {
return services
}
func (c *config) pubSub() {
func (c *config) pubSub() (err error) {
if len(c.Services) == 0 {
log.Printf(" [P] Not starting pubsub because the are no services to subscribe to.")
log.Printf(" [P] Not starting pubsub because the are no services to subscribe to")
return
}
conn, err := amqp.Dial(fmt.Sprintf("amqp://%s:%s@%s", c.Username, c.Token, c.Host))
failOnError(err, "Failed to connect to RabbitMQ")
if err != nil {
return
}
defer conn.Close()
ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
if err != nil {
return
}
defer ch.Close()
q, err := ch.QueueDeclare(
......@@ -198,11 +225,13 @@ func (c *config) pubSub() {
false, // no-wait
nil, // arguments
)
failOnError(err, "Failed to declare a queue")
if err != nil {
return
}
for _, s := range c.Services {
routingKey := "service." + s.Name
log.Printf(" [Q] Binding queue to exchange '%s' with routing key '%s'",
log.Printf(" [P] Binding queue to exchange '%s' with routing key '%s'",
c.ExchangeName, routingKey)
err = ch.QueueBind(
......@@ -211,7 +240,9 @@ func (c *config) pubSub() {
c.ExchangeName, // exchange
false,
nil)
failOnError(err, "Failed to bind a queue")
if err != nil {
return
}
}
msgs, err := ch.Consume(
......@@ -223,7 +254,9 @@ func (c *config) pubSub() {
false, // no-wait
nil, // args
)
failOnError(err, "Failed to register a consumer")
if err != nil {
return
}
forever := make(chan bool)
......@@ -236,15 +269,19 @@ func (c *config) pubSub() {
d.DeliveryTag,
false, // ack multiple (since delivery tag)
)
failOnError(err, "Failed to acknowledge message")
if err != nil {
return
}
}
}()
log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
<-forever
return
}
func main() {
var err error
app := kingpin.New("fumClient", "Client for the 'Federated user management in the Helmholtz Data Federation' project")
configFile := app.Arg("config", "Config file to file to use.").Required().String()
......@@ -261,12 +298,15 @@ func main() {
failOnError(err, "Reading config file "+*configFile)
// we first get the changes which happened while we were offline
initialUpdates := conf.fetchInitialUpdates()
initialUpdates, err := conf.fetchInitialUpdates()
failOnError(err, "Fetching initial updates ")
handleInitialUpdates(initialUpdates)
if !*oneshot {
// and then connect to rabbitmq for live updates for these services
conf.pubSub()
err = conf.pubSub()
failOnError(err, "PubSub System")
}
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment