Commit 9fdfc864 authored by Lukas Burgey's avatar Lukas Burgey
Browse files

Ensure the closing of connections

The problem was that the connection was kept open on channel errors.
We therefore ran into open file count limits.
parent 87ab4acf
......@@ -23,7 +23,7 @@ var (
).Author(
"Lukas Burgey",
).Version(
"v2.1.1",
"v2.1.2",
)
cmdStart = app.Command("start", "Starts the client in its normal operation mode.").Default()
cmdDeregister = app.Command("deregister", "Before disabling the client: Use deregister to inform the backend that the client ceases operation.")
......
......@@ -8,7 +8,7 @@ import (
"time"
"git.scc.kit.edu/feudal/feudalClient/config"
"git.scc.kit.edu/feudal/feudalClient/deployments"
deps "git.scc.kit.edu/feudal/feudalClient/deployments"
"github.com/streadway/amqp"
)
......@@ -32,24 +32,36 @@ func (src *Source) connect() (deliveries <-chan amqp.Delivery, closed <-chan *am
channel *amqp.Channel
queue amqp.Queue
)
closeConn := func() {
log.Printf("[AMQP] Closing connection after an error (logs will follow)")
closeErr := src.connection.Close()
if closeErr != nil {
log.Printf("[AMQP] Error closing connection: %s", closeErr)
}
}
// open connection
src.connection, err = amqp.Dial(src.uri())
if err != nil {
return
}
// closed signals a closing of the connection
closed = src.connection.NotifyClose(make(chan *amqp.Error))
channel, err = src.connection.Channel()
if err != nil {
closeConn()
return
}
// closed signals the closing of the connection / channel
closed = channel.NotifyClose(make(chan *amqp.Error))
// puts this channel into confirm mode
channel.Confirm(
// put the channel into confirm mode
err = channel.Confirm(
false, // noWait
)
if err != nil {
closeConn()
return
}
// the queue we connect to the topic exchange
queue, err = channel.QueueDeclare(
......@@ -61,6 +73,7 @@ func (src *Source) connect() (deliveries <-chan amqp.Delivery, closed <-chan *am
nil, // arguments
)
if err != nil {
closeConn()
return
}
......@@ -70,13 +83,15 @@ func (src *Source) connect() (deliveries <-chan amqp.Delivery, closed <-chan *am
log.Printf("[AMQP] Binding to '%s' with routing keys %v", exchange.name, exchange.bindingKeys)
}
for _, bindingKey := range exchange.bindingKeys {
if err = channel.QueueBind(
err = channel.QueueBind(
queue.Name, // queue name
bindingKey, // routing key
exchange.name, // exchange
false, // no-wait
nil,
); err != nil {
)
if err != nil {
closeConn()
err = fmt.Errorf("Error binding %s:\n\t%s", bindingKey, err)
return
}
......@@ -93,6 +108,7 @@ func (src *Source) connect() (deliveries <-chan amqp.Delivery, closed <-chan *am
nil, // args
)
if err != nil {
closeConn()
return
}
......@@ -203,7 +219,7 @@ func (src *Source) Connect() (sink <-chan deps.Dep, err error) {
// Wait for the connection to close
select {
case connectionError := <-closed:
log.Printf("[AMQP] Connection closed: %s", connectionError)
log.Printf("[AMQP] Connection/Channel closed: %s", connectionError)
}
}
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment