Commit 3aac9a1d authored by Lukas Burgey's avatar Lukas Burgey
Browse files

Fix reconnecting after a failed reconnect

parent 412c0ef3
......@@ -7,6 +7,7 @@ import (
"log"
"net/http"
"os"
"time"
"github.com/streadway/amqp"
"gopkg.in/alecthomas/kingpin.v2"
......@@ -14,11 +15,16 @@ import (
type (
config struct {
Host string `json:"host"`
Username string `json:"username"`
ExchangeName string `json:"exchangeName"`
Token string `json:"token"`
Services []service `json:"services"`
Host string `json:"host"`
Username string `json:"username"`
ExchangeName string `json:"exchangeName"`
Token string `json:"token"`
Services []service `json:"services"`
FetchIntervalString string `json:"fetch_interval"` // string parsed by time.ParseDuration
ReconnectTimeoutString string `json:"reconnect_timeout"` // string parsed by time.ParseDuration
NewTasks chan task
FetchInterval time.Duration
ReconnectTimeout time.Duration
}
sshKey struct {
......@@ -50,13 +56,26 @@ type (
Key sshKey `json:"key"`
}
initialUpdate struct {
update struct {
Tasks []task `json:"tasks"`
}
response struct {
OK bool `json:"ok"`
}
consumer struct {
conn *amqp.Connection
channel *amqp.Channel
queue amqp.Queue
closed chan *amqp.Error
uri string // uri of the rabbitmq server
exchange string // exchange that we will bind to
exchangeType string // topic, direct, etc...
bindingKeys []string // routing key that we are using
handler func(<-chan amqp.Delivery) // handler function for the deliveries
reconnectTimeout time.Duration // timeout for reconnect after a connection closes
}
)
const (
......@@ -69,7 +88,7 @@ var (
func logError(err error, msg string) {
if err != nil {
log.Printf(" [E] %s: %s", msg, err)
log.Printf("[E] %s: %s", msg, err)
}
}
......@@ -95,7 +114,7 @@ serviceLoop:
}
}
log.Printf(
" [C] Skipping unpermitted service %s",
"[Conn] Skipping unpermitted service %s",
service,
)
}
......@@ -103,67 +122,203 @@ serviceLoop:
}
func readConfig(configFile string) (c config, err error) {
log.Printf("[Conf] Reading config file %s", configFile)
bs, err := ioutil.ReadFile(configFile)
err = json.Unmarshal(bs, &c)
logError(err, "Reading config file")
if err != nil {
log.Printf("[Conf] Error reading config file: %s", err)
return
}
if err = json.Unmarshal(bs, &c); err != nil {
log.Printf("[Conf] Error parsing config file: %s", err)
return
}
permittedServices := c.fetchConfiguredServices()
log.Printf(" [C] Permitted services: %s", permittedServices)
log.Printf("[Conf] Permitted services: %s", permittedServices)
if len(c.Services) == 0 {
log.Printf(" [C] Config specifies no services. Using all permitted services")
log.Printf("[Conf] Config specifies no services. Using all permitted services")
c.Services = permittedServices
} else {
// if the config specifies services we check if they are all permitted
c.Services = filterPermitted(permittedServices, c.Services)
}
log.Printf(" [C] Services: %s", c.Services)
log.Printf("[Conf] Services: %s", c.Services)
if c.FetchInterval, err = time.ParseDuration(c.FetchIntervalString); err != nil {
log.Printf("[Conf] Error parsing fetch interval: %s", err)
return
}
log.Printf("[Conf] Fetch interval set to %s", c.FetchInterval)
if c.ReconnectTimeout, err = time.ParseDuration(c.ReconnectTimeoutString); err != nil {
log.Printf("[Conf] Error parsing reconnect timeout: %s", err)
return
}
log.Printf("[Conf] Reconnect timeout set to %s", c.ReconnectTimeout)
return
}
func readInitialUpdate(body []byte) (update initialUpdate) {
err := json.Unmarshal(body, &update)
logError(err, fmt.Sprintf("Unmarshaling %s", body))
func (c *consumer) close() {
c.channel.Close()
c.conn.Close()
}
func (c *consumer) connect() (deliveries <-chan amqp.Delivery, err error) {
// open connection
c.conn, err = amqp.Dial(c.uri)
if err != nil {
return
}
// closed signal a closing of the connection
c.closed = c.conn.NotifyClose(make(chan *amqp.Error))
c.channel, err = c.conn.Channel()
if err != nil {
return
}
// puts this channel into confirm mode
c.channel.Confirm(
false, // noWait
)
// the queue we connect to the topic exchange
c.queue, err = c.channel.QueueDeclare(
"", // name
true, // durable
true, // delete when unused
true, // exclusive
false, // no-wait
nil, // arguments
)
if err != nil {
return
}
// all the bindings from our queue to the topic exchange
for _, bindingKey := range c.bindingKeys {
/*
log.Printf(
"[P] Binding to exchange '%s' with routing key '%s'",
c.exchange,
bindingKey)
*/
if err = c.channel.QueueBind(
c.queue.Name, // queue name
bindingKey, // routing key
c.exchange, // exchange
false, // no-wait
nil,
); err != nil {
return
}
}
deliveries, err = c.channel.Consume(
c.queue.Name, // queue
"", // consumer
false, // auto-ack
true, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
if err != nil {
return
}
log.Printf("[Conn] Connected to broker")
return
}
func readTask(body []byte) (ti task) {
err := json.Unmarshal(body, &ti)
logError(err, fmt.Sprintf("Unmarshaling %s", body))
// report a closing connection and reconnect
func (c *consumer) reconnect() {
// wait till the connection is closed
connectionError := <-c.closed
if connectionError.Reason != "reconnect failed" {
log.Printf("[Conn] Connection closed: %s", connectionError)
}
log.Printf("[Conn] Reconnecting in %s", c.reconnectTimeout)
time.Sleep(c.reconnectTimeout)
log.Printf("[Conn] Reconnecting")
if deliveries, err := c.connect(); err != nil {
log.Printf("[Conn] Error reconnecting: %s", err)
c.closed = make(chan *amqp.Error)
// start another reconnect
go c.reconnect()
// and trigger it
reconnectError := amqp.Error{
Reason: "reconnect failed",
}
c.closed <- &reconnectError
} else {
// start the next handler
go c.handler(deliveries)
// start the next reconnect
go c.reconnect()
}
}
func (c *consumer) startConsuming() (err error) {
deliveries, err := c.connect()
// start the first handler
go c.handler(deliveries)
go c.reconnect()
return
}
func (c *config) handleInitialUpdates(iu initialUpdate) {
func (c *config) handleInitialUpdates(iu update) {
for _, ti := range iu.Tasks {
c.handleTask(ti)
}
}
func (c *config) handleTask(ti task) (err error) {
/*
switch ti.Action {
case "deploy":
log.Printf(" [U] %s:%s +%s", ti.Service, ti.User, ti.Key.Name)
case "withdraw":
log.Printf(" [U] %s:%s -%s", ti.Service, ti.User, ti.Key.Name)
default:
log.Printf(" [E] invalid action %s", ti.Action)
}
*/
if err != nil {
log.Printf(" [U] %s:%s %s %s ERROR", ti.Service, ti.User, ti.Action, ti.Key.Name)
log.Printf("[U] %s:%s %s %s ERROR", ti.Service, ti.User, ti.Action, ti.Key.Name)
return
}
ok, err := c.ackTask(ti)
if ok {
log.Printf(" [U] %s:%s %s %s ACK", ti.Service, ti.User, ti.Action, ti.Key.Name)
log.Printf("[U] %s:%s %s %s ACK", ti.Service, ti.User, ti.Action, ti.Key.Name)
} else {
log.Printf(" [U] %s:%s %s %s ACK-ERROR", ti.Service, ti.User, ti.Action, ti.Key.Name)
log.Printf("[U] %s:%s %s %s ACK-ERROR", ti.Service, ti.User, ti.Action, ti.Key.Name)
}
return
}
// handles either the tasks from rest and from rabbitmq
func (c *config) taskHandler() {
log.Printf("[T] Task handler started")
for newTask := range c.NewTasks {
go func(t task) {
if err := c.handleTask(t); err != nil {
log.Printf("[T] Error handling task: %s", err)
}
}(newTask)
}
}
func (c *config) ackTask(ti task) (ok bool, err error) {
url := fmt.Sprintf(
"https://%s/backend/clientapi/ack/%d/",
......@@ -192,9 +347,9 @@ func (c *config) ackTask(ti task) (ok bool, err error) {
return
}
func (c *config) fetchInitialUpdate() (updates initialUpdate, err error) {
func (c *config) fetchTasks() (tasks []task, err error) {
if len(c.Services) == 0 {
log.Printf(" [I] Not fetching initial updates because the are no services to fetch")
log.Printf("[U] Not fetching initial updates because the are no services to fetch")
return
}
......@@ -221,9 +376,18 @@ func (c *config) fetchInitialUpdate() (updates initialUpdate, err error) {
}
defer resp.Body.Close()
var u update
body, err := ioutil.ReadAll(resp.Body)
err = json.Unmarshal(body, &updates)
log.Printf(" [I] %d new task items", len(updates.Tasks))
err = json.Unmarshal(body, &u)
log.Printf("[U] %d new task items", len(u.Tasks))
if err != nil {
log.Printf("[U] error fetching update: %s", err)
}
for _, task := range tasks {
// put the in the task queue
c.NewTasks <- task
}
return
}
......@@ -243,95 +407,56 @@ func (c *config) fetchConfiguredServices() []service {
return services
}
func (c *config) pubSub() (err error) {
if len(c.Services) < 1 {
log.Printf(" [P] Not starting pubsub because the are no services to subscribe to")
return
func (c *config) consumer() (cons *consumer) {
var keys []string
for _, serviceName := range c.Services {
keys = append(keys, fmt.Sprintf("service.%s", serviceName))
}
conn, err := amqp.Dial(fmt.Sprintf("amqps://%s:%s@%s", c.Username, c.Token, c.Host))
if err != nil {
return
}
defer conn.Close()
ch, err := conn.Channel()
if err != nil {
return
cons = &consumer{
uri: fmt.Sprintf("amqps://%s:%s@%s", c.Username, c.Token, c.Host),
exchange: c.ExchangeName,
exchangeType: "topic",
bindingKeys: keys,
handler: c.handler,
reconnectTimeout: c.ReconnectTimeout,
}
defer ch.Close()
// puts this channel into confirm mode
ch.Confirm(
false, // noWait
)
return
}
// the queue we connect to the topic exchange
q, err := ch.QueueDeclare(
"", // name
true, // durable
true, // delete when unused
true, // exclusive
false, // no-wait
nil, // arguments
)
if err != nil {
return
}
func (c *config) handler(deliveries <-chan amqp.Delivery) {
for d := range deliveries {
var newTask task
err := json.Unmarshal(d.Body, &newTask)
// all the bindings from our queue to the topic exchange
for _, s := range c.Services {
routingKey := "service." + s.Name
log.Printf(
" [P] Binding queue '%s' to exchange '%s' with routing key '%s'",
q.Name,
c.ExchangeName,
routingKey)
err = ch.QueueBind(
q.Name, // queue name
routingKey, // routing key
c.ExchangeName, // exchange
false, // no-wait
nil) // args
if err != nil {
return
msg := "Failed to read deployment task"
log.Printf("[E] %s: %s", msg, err)
d.Nack(
false, // multiple
false, // signal the server to requeue the message
)
} else {
// enqueue the task
c.NewTasks <- newTask
d.Ack(
false, // multiple
)
}
}
}
msgs, err := ch.Consume(
q.Name, // queue
"", // consumer
false, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
if err != nil {
func (c *config) pubSub() (err error) {
if len(c.Services) < 1 {
log.Printf("[P] Not starting pubsub because the are no services to subscribe to")
return
}
go func() {
for d := range msgs {
var err error
err = c.handleTask(readTask(d.Body))
if err != nil {
msg := "Failed to handle deployment update"
log.Printf(" [E] %s: %s", msg, err)
d.Nack(
false, // multiple
false, // signal the server to requeue the message
)
} else {
d.Ack(
false, // multiple
)
}
}
}()
consumer := c.consumer()
defer consumer.close()
log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
consumer.startConsuming()
// run till killed
forever := make(chan bool)
......@@ -354,19 +479,34 @@ func main() {
kingpin.MustParse(app.Parse(os.Args[1:]))
// read the config file
conf, err := readConfig(*configFile)
logError(err, "Reading config file "+*configFile)
c, err := readConfig(*configFile)
if err != nil {
log.Fatalf("[Err] No valid config. Exiting")
}
// initialize the task queue
c.NewTasks = make(chan task)
go c.taskHandler()
// starting ticker
ticker := time.NewTicker(c.FetchInterval)
// we first get the changes which happened while we were offline
initialUpdates, err := conf.fetchInitialUpdate()
logError(err, "Fetching initial updates ")
go c.fetchTasks()
conf.handleInitialUpdates(initialUpdates)
go func() {
for {
// wait for a tick
<-ticker.C
// we first get the changes which happened while we were offline
c.fetchTasks()
}
}()
if !*oneshot {
// and then connect to rabbitmq for live updates for these services
err = conf.pubSub()
err = c.pubSub()
logError(err, "PubSub System")
}
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment