Commit c6a90f58 authored by Lukas Burgey's avatar Lukas Burgey
Browse files

Restructure the client code

parent 3020e744
package main
import (
"encoding/json"
"fmt"
"log"
"time"
"github.com/streadway/amqp"
)
type (
consumer struct {
conn *amqp.Connection
channel *amqp.Channel
queue amqp.Queue
closed chan *amqp.Error
uri string // uri of the rabbitmq server
exchange string // exchange that we will bind to
exchangeType string // topic, direct, etc...
bindingKeys []string // routing key that we are using
consumer func(<-chan amqp.Delivery) // consumes deliveries
reconnectTimeout time.Duration // timeout for reconnect after a connection closes
}
)
func (c *config) consumer() (cons *consumer) {
var keys []string
for _, serviceName := range c.Services {
keys = append(keys, fmt.Sprintf("service.%s", serviceName))
}
cons = &consumer{
uri: fmt.Sprintf("amqps://%s:%s@%s", c.Username, c.Password, c.Host),
exchange: c.ExchangeName,
exchangeType: "topic",
bindingKeys: keys,
consumer: c.consume,
reconnectTimeout: c.ReconnectTimeout,
}
return
}
func (c *consumer) close() {
c.channel.Close()
c.conn.Close()
}
func (c *consumer) connect() (deliveries <-chan amqp.Delivery, err error) {
// open connection
c.conn, err = amqp.Dial(c.uri)
if err != nil {
return
}
// closed signal a closing of the connection
c.closed = c.conn.NotifyClose(make(chan *amqp.Error))
c.channel, err = c.conn.Channel()
if err != nil {
return
}
// puts this channel into confirm mode
c.channel.Confirm(
false, // noWait
)
// the queue we connect to the topic exchange
c.queue, err = c.channel.QueueDeclare(
"", // name
true, // durable
true, // delete when unused
true, // exclusive
false, // no-wait
nil, // arguments
)
if err != nil {
return
}
// all the bindings from our queue to the topic exchange
for _, bindingKey := range c.bindingKeys {
/*
log.Printf(
"[P] Binding to exchange '%s' with routing key '%s'",
c.exchange,
bindingKey)
*/
if err = c.channel.QueueBind(
c.queue.Name, // queue name
bindingKey, // routing key
c.exchange, // exchange
false, // no-wait
nil,
); err != nil {
return
}
}
deliveries, err = c.channel.Consume(
c.queue.Name, // queue
"", // consumer
false, // auto-ack
true, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
if err != nil {
return
}
log.Printf("[Conn] Connected to broker")
return
}
// report a closing connection and reconnect
// FIXME we have to check if the config changed (maybe after rest fetches?)
func (c *consumer) reconnect() {
// wait till the connection is closed
connectionError := <-c.closed
if connectionError.Reason != "reconnect failed" {
log.Printf("[Conn] Connection closed: %s", connectionError)
}
log.Printf("[Conn] Reconnecting in %s", c.reconnectTimeout)
time.Sleep(c.reconnectTimeout)
log.Printf("[Conn] Reconnecting")
if deliveries, err := c.connect(); err != nil {
log.Printf("[Conn] Error reconnecting: %s", err)
c.closed = make(chan *amqp.Error)
// start another reconnect
go c.reconnect()
// and trigger it
reconnectError := amqp.Error{
Reason: "reconnect failed",
}
c.closed <- &reconnectError
} else {
// start the next consumer
go c.consumer(deliveries)
// start the next reconnect
go c.reconnect()
}
}
func (c *consumer) startConsuming() (err error) {
deliveries, err := c.connect()
if err != nil {
log.Fatalf("[Conn] Error connecting: %s", err)
return
}
// start the first consumer
go c.consumer(deliveries)
go c.reconnect()
return
}
func (c *config) consume(deliveries <-chan amqp.Delivery) {
for d := range deliveries {
// TODO remove: debug output for demonstration
/*
buf := new(bytes.Buffer)
json.Indent(buf, d.Body, "", " ")
log.Printf("DEMONSTRATION MESSAGE: %s", buf)
*/
var newTask task
err := json.Unmarshal(d.Body, &newTask)
if err != nil {
msg := "Failed to read deployment task"
log.Printf("[E] %s: %s", msg, err)
d.Nack(
false, // multiple
false, // signal the server to requeue the message
)
} else {
// enqueue the task
log.Printf("[Task] %v: RECEIVED", newTask)
c.NewTasks <- newTask
d.Ack(
false, // multiple
)
}
}
}
package main
import (
"bytes"
"encoding/json"
"fmt"
"io/ioutil"
......@@ -10,10 +9,37 @@ import (
"os"
"time"
"github.com/streadway/amqp"
"gopkg.in/alecthomas/kingpin.v2"
)
type (
rabbitMQConfig struct {
ExchangeName string `json:"exchange"`
Vhost string `json:"vhost"`
}
fetchedConfig struct {
RabbitMQConfig rabbitMQConfig `json:"rabbitmq_config"`
Services []service `json:"services"`
}
config struct {
Host string `json:"host"`
Username string `json:"username"`
Password string `json:"password"`
Services map[string]service `json:"services"`
FetchIntervalString string `json:"fetch_interval"` // string parsed by time.ParseDuration
ReconnectTimeoutString string `json:"reconnect_timeout"` // string parsed by time.ParseDuration
NewTasks chan task
DoneTasks chan task
FetchInterval time.Duration
ReconnectTimeout time.Duration
ExchangeName string // fetched from backend
Vhost string // fetched from backend
}
)
const (
version = "0.1.0"
)
......@@ -28,6 +54,14 @@ func logError(err error, msg string) {
}
}
func (s service) String() string {
return s.Name
}
func (t task) String() string {
return fmt.Sprintf("%s:%s %8s %s", t.Service, t.User, t.Action, t.Key)
}
func (c *config) fetchConfig() (err error) {
req, err := http.NewRequest("GET", "https://"+c.Host+"/backend/clientapi/config", nil)
if err != nil {
......@@ -45,7 +79,7 @@ func (c *config) fetchConfig() (err error) {
if err != nil {
return
}
var fetchedConfig FetchedConfig
var fetchedConfig fetchedConfig
err = json.Unmarshal(body, &fetchedConfig)
if err != nil {
log.Fatalf("Unable to parse: %s %s", err, body)
......@@ -60,7 +94,6 @@ func (c *config) fetchConfig() (err error) {
if len(c.Services) == 0 {
log.Printf("[Conf] Config specifies no services. Using all permitted services")
c.Services = permittedServices
} else {
// if the config specifies services we check if they are all permitted
c.Services = filterPermitted(permittedServices, c.Services)
......@@ -70,6 +103,21 @@ func (c *config) fetchConfig() (err error) {
return
}
func filterPermitted(permitted []service, wanted map[string]service) (remainder map[string]service) {
remainder = make(map[string]service, len(wanted))
serviceLoop:
for _, service := range wanted {
for _, permittedService := range permitted {
if service.Name == permittedService.Name {
remainder[service.Name] = service
continue serviceLoop
}
}
log.Printf("[Conn] Skipping unpermitted service %s", service)
}
return
}
func getConfig(configFile string) (c config, err error) {
log.Printf("[Conf] Reading config file %s", configFile)
......@@ -127,322 +175,9 @@ func getConfig(configFile string) (c config, err error) {
return
}
func (c *consumer) close() {
c.channel.Close()
c.conn.Close()
}
func (c *consumer) connect() (deliveries <-chan amqp.Delivery, err error) {
// open connection
c.conn, err = amqp.Dial(c.uri)
if err != nil {
return
}
// closed signal a closing of the connection
c.closed = c.conn.NotifyClose(make(chan *amqp.Error))
c.channel, err = c.conn.Channel()
if err != nil {
return
}
// puts this channel into confirm mode
c.channel.Confirm(
false, // noWait
)
// the queue we connect to the topic exchange
c.queue, err = c.channel.QueueDeclare(
"", // name
true, // durable
true, // delete when unused
true, // exclusive
false, // no-wait
nil, // arguments
)
if err != nil {
return
}
// all the bindings from our queue to the topic exchange
for _, bindingKey := range c.bindingKeys {
/*
log.Printf(
"[P] Binding to exchange '%s' with routing key '%s'",
c.exchange,
bindingKey)
*/
if err = c.channel.QueueBind(
c.queue.Name, // queue name
bindingKey, // routing key
c.exchange, // exchange
false, // no-wait
nil,
); err != nil {
return
}
}
deliveries, err = c.channel.Consume(
c.queue.Name, // queue
"", // consumer
false, // auto-ack
true, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
if err != nil {
return
}
log.Printf("[Conn] Connected to broker")
return
}
// report a closing connection and reconnect
// FIXME we have to check if the config changed (maybe after rest fetches?)
func (c *consumer) reconnect() {
// wait till the connection is closed
connectionError := <-c.closed
if connectionError.Reason != "reconnect failed" {
log.Printf("[Conn] Connection closed: %s", connectionError)
}
log.Printf("[Conn] Reconnecting in %s", c.reconnectTimeout)
time.Sleep(c.reconnectTimeout)
log.Printf("[Conn] Reconnecting")
if deliveries, err := c.connect(); err != nil {
log.Printf("[Conn] Error reconnecting: %s", err)
c.closed = make(chan *amqp.Error)
// start another reconnect
go c.reconnect()
// and trigger it
reconnectError := amqp.Error{
Reason: "reconnect failed",
}
c.closed <- &reconnectError
} else {
// start the next consumer
go c.consumer(deliveries)
// start the next reconnect
go c.reconnect()
}
}
func (c *consumer) startConsuming() (err error) {
deliveries, err := c.connect()
if err != nil {
log.Fatalf("[Conn] Error connecting: %s", err)
return
}
// start the first consumer
go c.consumer(deliveries)
go c.reconnect()
return
}
func (c *config) consumer() (cons *consumer) {
var keys []string
for _, serviceName := range c.Services {
keys = append(keys, fmt.Sprintf("service.%s", serviceName))
}
cons = &consumer{
uri: fmt.Sprintf("amqps://%s:%s@%s", c.Username, c.Password, c.Host),
exchange: c.ExchangeName,
exchangeType: "topic",
bindingKeys: keys,
consumer: c.consume,
reconnectTimeout: c.ReconnectTimeout,
}
return
}
func (c *config) consume(deliveries <-chan amqp.Delivery) {
for d := range deliveries {
// TODO remove: debug output for demonstration
buf := new(bytes.Buffer)
json.Indent(buf, d.Body, "", " ")
log.Printf("DEMONSTRATION MESSAGE: %s", buf)
var newTask task
err := json.Unmarshal(d.Body, &newTask)
if err != nil {
msg := "Failed to read deployment task"
log.Printf("[E] %s: %s", msg, err)
d.Nack(
false, // multiple
false, // signal the server to requeue the message
)
} else {
// enqueue the task
log.Printf("[Task] %s: RECEIVED", newTask)
c.NewTasks <- newTask
d.Ack(
false, // multiple
)
}
}
}
func (c *config) handleTask(ti task) (err error) {
time.Sleep(2 * time.Second)
log.Printf("[Task] %s: DONE", ti)
c.DoneTasks <- ti
return
}
// handles either the tasks from rest and from rabbitmq
func (c *config) taskHandler() {
for newTask := range c.NewTasks {
// handle tasks asynchronously
go func(t task) {
if err := c.handleTask(t); err != nil {
log.Printf("[Task] Error handling task: %s", err)
}
}(newTask)
}
}
func (c *config) ackTask(ti task) (ok bool, err error) {
url := fmt.Sprintf(
"https://%s/backend/clientapi/ack/%d/",
c.Host,
ti.ID)
req, err := http.NewRequest("DELETE", url, nil)
if err != nil {
// TODO retransmit ACK
return
}
req.SetBasicAuth(c.Username, c.Password)
resp, err := client.Do(req)
if err != nil {
return
}
defer resp.Body.Close()
respBytes, err := ioutil.ReadAll(resp.Body)
if err != nil {
return
}
var ackResponse response
err = json.Unmarshal(respBytes, &ackResponse)
if err != nil {
log.Printf("[Task] Invalid ACK response: %s", respBytes)
}
ok = ackResponse.OK
return
}
// acks all done tasks
func (c *config) taskAcker() {
for doneTask := range c.DoneTasks {
// handle tasks asynchronously
go func(t task) {
ok, err := c.ackTask(t)
if err != nil {
log.Printf("[Task] %s: ACK-ERROR: %s", t, err)
return
}
if !ok {
log.Printf("[Task] %s: ACK-FAILED", t)
return
}
log.Printf("[Task] %s: ACK", t)
}(doneTask)
}
}
func (c *config) fetchTasks() (err error) {
if len(c.Services) == 0 {
log.Printf("[Fetch] Not fetching because the are no services to fetch")
return
}
// construct a request
uri := "https://" + c.Host + "/backend/clientapi/deployments"
req, err := http.NewRequest("GET", uri, nil)
if err != nil {
return
}
// add query params for the services
q := req.URL.Query()
for _, service := range c.Services {
q.Add("s", service.Name)
}
req.URL.RawQuery = q.Encode()
req.SetBasicAuth(c.Username, c.Password)
// execute the request
resp, err := client.Do(req)
if err != nil {
return
}
defer resp.Body.Close()
var u update
body, err := ioutil.ReadAll(resp.Body)
// TODO remove: debug output for demonstration
buf := new(bytes.Buffer)
json.Indent(buf, body, "", " ")
log.Printf("DEMONSTRATION MESSAGE: %s", buf)
err = json.Unmarshal(body, &u)
log.Printf("[Fetch] %d new tasks", len(u.Tasks))
if err != nil {
return
}
for _, task := range u.Tasks {
// put the in the task queue
c.NewTasks <- task
}
return
}
// fetches tasks (which rabbitmq missed) manually
func (c *config) taskFetcher() {
// start ticker
ticker := time.NewTicker(c.FetchInterval)
go func() {
for {
go func() {
if err := c.fetchTasks(); err != nil {
log.Printf("[Fetch] error fetching: %s", err)
}
}()
// wait for a tick
<-ticker.C
}
}()
}
func main() {
var err error
app := kingpin.New("fumClient", "Client for the 'Federated user management in the Helmholtz Data Federation' project")
app := kingpin.New("FEUDAL Client", "Client for the Federated User Credential Deployment Portal (FEUDAL)")
configFile := app.Arg("config", "Config file to file to use.").Required().String()
// oneshot := app.Flag("oneshot", "Fetch deployments with HTTP. Do not use rabbitmq.").Short('1').Bool()
......@@ -469,7 +204,9 @@ func main() {
go c.taskAcker()
consumer := c.consumer()
defer consumer.close()
defer func() {
consumer.close()
}()
consumer.startConsuming()
......
package main
import (