Commit 23b83a8d authored by Lukas Burgey's avatar Lukas Burgey
Browse files

Fix reconnect timeout

parent bf7a665c
......@@ -24,9 +24,8 @@ type (
// RabbitMQConfig is used for the amqp source
RabbitMQConfig struct {
Exchanges []string `json:"exchanges"`
Vhost string `json:"vhost"`
ReconnectTimeout time.Duration
Exchanges []string `json:"exchanges"`
Vhost string `json:"vhost"`
}
fetchedConfig struct {
......@@ -44,9 +43,13 @@ type (
// Config is the structure of our config file
Config struct {
// The Hostname of the feudal backends host
Hostname string `json:"feudal_backend_host"`
// The Username of this client, registered at the backend.
Username string `json:"username"`
// The Password of this client, registered at the backend.
Password string `json:"password"`
// Services maps an (arbitrary) service identifier to service structs
......@@ -67,15 +70,23 @@ type (
// FetchIntervalString gets parsed by time.ParseDuration
FetchIntervalString string `json:"fetch_interval"`
// ReconnectTimeout gets parsed by time.ParseDuration
ReconnectTimeoutString string `json:"reconnect_timeout"`
// After the duration of FetchInterval the client will fetch updates using the REST
// interface.
FetchInterval time.Duration
// ReconnectTimeout time.Duration
// We reconnect to RabbitMQ after ReconnectTimeout.
ReconnectTimeout time.Duration
RabbitMQConfig RabbitMQConfig
Site string
// debug flags
// The Site is the name of the site of this client. It is fetched from the backend.
Site string
// Debug flags control the log levels of the client.
Debug DebugConfig `json:"debug"`
}
......@@ -89,7 +100,7 @@ type (
const (
defaultFetchInterval = 30 * time.Minute
defaultReconnectTimeout = 10 * time.Second
defaultReconnectTimeout = 30 * time.Second
)
func (s Service) String() string {
......@@ -208,16 +219,16 @@ func (conf *Config) validateConfig() (err error) {
}
if conf.ReconnectTimeoutString == "" {
conf.RabbitMQConfig.ReconnectTimeout = defaultReconnectTimeout
log.Printf("[Conf] Using default reconnect_timeout of %v", conf.RabbitMQConfig.ReconnectTimeout)
conf.ReconnectTimeout = defaultReconnectTimeout
log.Printf("[Conf] Using default reconnect_timeout of %v", conf.ReconnectTimeout)
} else {
conf.RabbitMQConfig.ReconnectTimeout, err = time.ParseDuration(conf.ReconnectTimeoutString)
conf.ReconnectTimeout, err = time.ParseDuration(conf.ReconnectTimeoutString)
if err != nil {
log.Printf("[Conf] Error parsing reconnect timeout: %s", err)
err = nil
conf.RabbitMQConfig.ReconnectTimeout = defaultReconnectTimeout
log.Printf("[Conf] Using default reconnect_timeout of %v", conf.RabbitMQConfig.ReconnectTimeout)
conf.ReconnectTimeout = defaultReconnectTimeout
log.Printf("[Conf] Using default reconnect_timeout of %v", conf.ReconnectTimeout)
}
}
......
......@@ -23,12 +23,11 @@ type (
Source struct {
Config *config.Config
Exchanges []exchange
closed chan *amqp.Error
connection *amqp.Connection
}
)
func (src *Source) connect() (deliveries <-chan amqp.Delivery, err error) {
func (src *Source) connect() (deliveries <-chan amqp.Delivery, closed <-chan *amqp.Error, err error) {
var (
channel *amqp.Channel
queue amqp.Queue
......@@ -40,7 +39,7 @@ func (src *Source) connect() (deliveries <-chan amqp.Delivery, err error) {
}
// closed signals a closing of the connection
src.closed = src.connection.NotifyClose(make(chan *amqp.Error))
closed = src.connection.NotifyClose(make(chan *amqp.Error))
channel, err = src.connection.Channel()
if err != nil {
......@@ -67,7 +66,6 @@ func (src *Source) connect() (deliveries <-chan amqp.Delivery, err error) {
// all the bindings from our queue to the topic exchange
for _, exchange := range src.Exchanges {
//if *backendDebugging {
if src.Config.Debug.Backend {
log.Printf("[AMQP] Binding to '%s' with routing keys %v", exchange.name, exchange.bindingKeys)
}
......@@ -134,9 +132,12 @@ func (src *Source) consume(sink chan<- deps.Dep, deliveries <-chan amqp.Delivery
)
}
}
// log.Printf("[AMQP] Start consuming")
for d := range deliveries {
go consumeWrapper(d)
}
// log.Printf("[AMQP] Done consuming")
}
func (src *Source) uri() string {
......@@ -190,34 +191,28 @@ func (src *Source) Connect() (sink <-chan deps.Dep, err error) {
pipe := make(chan deps.Dep)
sink = (<-chan deps.Dep)(pipe)
// start consumption / reconnect loop
// Reconnect loop
go func() {
var err error
var deliveries <-chan amqp.Delivery
for {
deliveries, err = src.connect()
deliveries, closed, err := src.connect()
if err != nil {
err = fmt.Errorf("[AMQP] Error connecting: %s", err)
log.Printf("[AMQP] Error connecting: %s", err)
} else {
go src.consume(pipe, deliveries)
}
// wait for the connection to break
select {
case connectionError := <-src.closed:
if connectionError.Reason != "reconnect failed" {
// Wait for the connection to close
select {
case connectionError := <-closed:
log.Printf("[AMQP] Connection closed: %s", connectionError)
}
log.Printf("[AMQP] Reconnecting in %s", src.Config.RabbitMQConfig.ReconnectTimeout)
time.Sleep(src.Config.RabbitMQConfig.ReconnectTimeout)
log.Printf("[AMQP] Reconnecting")
}
log.Printf("[AMQP] Reconnecting in %s", src.Config.ReconnectTimeout)
time.Sleep(src.Config.ReconnectTimeout)
}
}()
return
return sink, nil
}
// Close stops the source
......
......@@ -57,7 +57,7 @@ func (src Source) fetchDeps(sink chan<- deps.Dep) (err error) {
defer resp.Body.Close()
if resp.StatusCode != 200 {
err = fmt.Errorf("[HTTP] Unable to fetch tasks (response: %v)", resp.Status)
err = fmt.Errorf("Unable to fetch tasks (response: %v)", resp.Status)
return
}
......@@ -66,7 +66,7 @@ func (src Source) fetchDeps(sink chan<- deps.Dep) (err error) {
err = json.Unmarshal(body, &newDeps)
if err != nil {
err = fmt.Errorf("[HTTP] Error unmarshaling: %s\n%s", err, body)
err = fmt.Errorf("Error unmarshaling: %s\n%s", err, body)
return
}
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment