Commit 134d183b authored by Lukas Burgey's avatar Lukas Burgey
Browse files

Enhance the error handling

parent a7b294cc
......@@ -14,20 +14,6 @@ import (
)
type (
config struct {
Host string `json:"host"`
Username string `json:"username"`
ExchangeName string `json:"exchangeName"`
Token string `json:"token"`
Services []service `json:"services"`
FetchIntervalString string `json:"fetch_interval"` // string parsed by time.ParseDuration
ReconnectTimeoutString string `json:"reconnect_timeout"` // string parsed by time.ParseDuration
NewTasks chan task
DoneTasks chan task
FetchInterval time.Duration
ReconnectTimeout time.Duration
}
sshKey struct {
Name string `json:"name"`
Key string `json:"key"`
......@@ -65,17 +51,31 @@ type (
OK bool `json:"ok"`
}
config struct {
Host string `json:"host"`
Username string `json:"username"`
ExchangeName string `json:"exchangeName"`
Token string `json:"token"`
Services []service `json:"services"`
FetchIntervalString string `json:"fetch_interval"` // string parsed by time.ParseDuration
ReconnectTimeoutString string `json:"reconnect_timeout"` // string parsed by time.ParseDuration
NewTasks chan task
DoneTasks chan task
FetchInterval time.Duration
ReconnectTimeout time.Duration
}
consumer struct {
conn *amqp.Connection
channel *amqp.Channel
queue amqp.Queue
closed chan *amqp.Error
uri string // uri of the rabbitmq server
exchange string // exchange that we will bind to
exchangeType string // topic, direct, etc...
bindingKeys []string // routing key that we are using
handler func(<-chan amqp.Delivery) // handler function for the deliveries
reconnectTimeout time.Duration // timeout for reconnect after a connection closes
conn *amqp.Connection
channel *amqp.Channel
queue amqp.Queue
closed chan *amqp.Error
uri string // uri of the rabbitmq server
exchange string // exchange that we will bind to
exchangeType string // topic, direct, etc...
bindingKeys []string // routing key that we are using
consumer func(<-chan amqp.Delivery) // consumes deliveries
reconnectTimeout time.Duration // timeout for reconnect after a connection closes
}
)
......@@ -126,6 +126,30 @@ serviceLoop:
return
}
func (c *config) fetchConfiguredServices() (services []service, err error) {
req, err := http.NewRequest("GET", "https://"+c.Host+"/backend/clientapi/config", nil)
if err != nil {
return
}
req.Header.Add("Authorization", fmt.Sprintf("Token %s", c.Token))
resp, err := client.Do(req)
if err != nil {
return
}
defer resp.Body.Close()
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
return
}
err = json.Unmarshal(body, &services)
if err != nil {
return
}
return
}
func readConfig(configFile string) (c config, err error) {
log.Printf("[Conf] Reading config file %s", configFile)
......@@ -141,7 +165,10 @@ func readConfig(configFile string) (c config, err error) {
return
}
permittedServices := c.fetchConfiguredServices()
var permittedServices []service
if permittedServices, err = c.fetchConfiguredServices(); err != nil {
log.Printf("[Conf] Error fetching configured services: %s", err)
}
log.Printf("[Conf] Permitted services: %s", permittedServices)
if len(c.Services) == 0 {
......@@ -262,7 +289,6 @@ func (c *consumer) reconnect() {
time.Sleep(c.reconnectTimeout)
log.Printf("[Conn] Reconnecting")
if deliveries, err := c.connect(); err != nil {
log.Printf("[Conn] Error reconnecting: %s", err)
......@@ -278,8 +304,8 @@ func (c *consumer) reconnect() {
c.closed <- &reconnectError
} else {
// start the next handler
go c.handler(deliveries)
// start the next consumer
go c.consumer(deliveries)
// start the next reconnect
go c.reconnect()
......@@ -289,16 +315,50 @@ func (c *consumer) reconnect() {
func (c *consumer) startConsuming() (err error) {
deliveries, err := c.connect()
// start the first handler
go c.handler(deliveries)
// start the first consumer
go c.consumer(deliveries)
go c.reconnect()
return
}
func (c *config) handleInitialUpdates(iu update) {
for _, ti := range iu.Tasks {
c.handleTask(ti)
func (c *config) consumer() (cons *consumer) {
var keys []string
for _, serviceName := range c.Services {
keys = append(keys, fmt.Sprintf("service.%s", serviceName))
}
cons = &consumer{
uri: fmt.Sprintf("amqps://%s:%s@%s", c.Username, c.Token, c.Host),
exchange: c.ExchangeName,
exchangeType: "topic",
bindingKeys: keys,
consumer: c.consume,
reconnectTimeout: c.ReconnectTimeout,
}
return
}
func (c *config) consume(deliveries <-chan amqp.Delivery) {
for d := range deliveries {
var newTask task
err := json.Unmarshal(d.Body, &newTask)
if err != nil {
msg := "Failed to read deployment task"
log.Printf("[E] %s: %s", msg, err)
d.Nack(
false, // multiple
false, // signal the server to requeue the message
)
} else {
// enqueue the task
log.Printf("[Task] %s: RECEIVED", newTask)
c.NewTasks <- newTask
d.Ack(
false, // multiple
)
}
}
}
......@@ -322,25 +382,6 @@ func (c *config) taskHandler() {
}
}
// acks all done tasks
func (c *config) taskAcker() {
for doneTask := range c.DoneTasks {
// handle tasks asynchronously
go func(t task) {
ok, err := c.ackTask(t);
if err != nil {
log.Printf("[Task] %s: ACK-ERROR: %s", t, err)
return
}
if !ok {
log.Printf("[Task] %s: ACK-FAILED", t)
return
}
log.Printf("[Task] %s: ACK", t)
}(doneTask)
}
}
func (c *config) ackTask(ti task) (ok bool, err error) {
url := fmt.Sprintf(
"https://%s/backend/clientapi/ack/%d/",
......@@ -369,6 +410,25 @@ func (c *config) ackTask(ti task) (ok bool, err error) {
return
}
// acks all done tasks
func (c *config) taskAcker() {
for doneTask := range c.DoneTasks {
// handle tasks asynchronously
go func(t task) {
ok, err := c.ackTask(t)
if err != nil {
log.Printf("[Task] %s: ACK-ERROR: %s", t, err)
return
}
if !ok {
log.Printf("[Task] %s: ACK-FAILED", t)
return
}
log.Printf("[Task] %s: ACK", t)
}(doneTask)
}
}
func (c *config) fetchTasks() (err error) {
if len(c.Services) == 0 {
log.Printf("[Fetch] Not fetching because the are no services to fetch")
......@@ -404,7 +464,7 @@ func (c *config) fetchTasks() (err error) {
log.Printf("[Fetch] %d new tasks", len(u.Tasks))
if err != nil {
log.Printf("[Fetch] error fetching: %s", err)
return
}
for _, task := range u.Tasks {
// put the in the task queue
......@@ -413,79 +473,23 @@ func (c *config) fetchTasks() (err error) {
return
}
func (c *config) fetchConfiguredServices() []service {
req, err := http.NewRequest("GET", "https://"+c.Host+"/backend/clientapi/config", nil)
logError(err, "fetching services")
req.Header.Add("Authorization", fmt.Sprintf("Token %s", c.Token))
resp, err := client.Do(req)
logError(err, "fetching services")
defer resp.Body.Close()
body, err := ioutil.ReadAll(resp.Body)
var services []service
err = json.Unmarshal(body, &services)
logError(err, fmt.Sprintf("decoding services (%s)", body))
return services
}
func (c *config) consumer() (cons *consumer) {
var keys []string
for _, serviceName := range c.Services {
keys = append(keys, fmt.Sprintf("service.%s", serviceName))
}
cons = &consumer{
uri: fmt.Sprintf("amqps://%s:%s@%s", c.Username, c.Token, c.Host),
exchange: c.ExchangeName,
exchangeType: "topic",
bindingKeys: keys,
handler: c.handler,
reconnectTimeout: c.ReconnectTimeout,
}
return
}
func (c *config) handler(deliveries <-chan amqp.Delivery) {
for d := range deliveries {
var newTask task
err := json.Unmarshal(d.Body, &newTask)
// fetches tasks (which rabbitmq missed) manually
func (c *config) taskFetcher() {
// start ticker
ticker := time.NewTicker(c.FetchInterval)
if err != nil {
msg := "Failed to read deployment task"
go func() {
for {
go func() {
if err := c.fetchTasks(); err != nil {
log.Printf("[Fetch] error fetching: %s", err)
}
}()
log.Printf("[E] %s: %s", msg, err)
d.Nack(
false, // multiple
false, // signal the server to requeue the message
)
} else {
// enqueue the task
log.Printf("[Task] %s: RECEIVED", newTask)
c.NewTasks <- newTask
d.Ack(
false, // multiple
)
// wait for a tick
<-ticker.C
}
}
}
func (c *config) pubSub() (err error) {
if len(c.Services) < 1 {
log.Printf("[P] Not starting pubsub because the are no services to subscribe to")
return
}
consumer := c.consumer()
defer consumer.close()
consumer.startConsuming()
// run till killed
forever := make(chan bool)
<-forever
return
}()
}
func main() {
......@@ -511,29 +515,24 @@ func main() {
log.Fatalf("[Err] No services. Exiting")
}
// start task handler and acker
// start task handler and acker and fetcher
go c.taskHandler()
go c.taskAcker()
go c.taskFetcher()
// starting ticker
ticker := time.NewTicker(c.FetchInterval)
go c.fetchTasks()
go func() {
for {
// wait for a tick
<-ticker.C
// we first get the changes which happened while we were offline
c.fetchTasks()
if !*oneshot {
if len(c.Services) < 1 {
log.Printf("[P] Not starting pubsub because the are no services to subscribe to")
return
}
}()
if !*oneshot {
consumer := c.consumer()
defer consumer.close()
consumer.startConsuming()
// and then connect to rabbitmq for live updates for these services
err = c.pubSub()
logError(err, "PubSub System")
// run till killed
forever := make(chan bool)
<-forever
}
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment