Commit a1ce4130 authored by Lukas Burgey's avatar Lukas Burgey
Browse files

Implement publish by site

parent d45eeeb6
......@@ -10,17 +10,21 @@ import (
)
type (
exchange struct {
name string
exchangeType string
bindingKeys []string
}
consumer struct {
conn *amqp.Connection
channel *amqp.Channel
queue amqp.Queue
closed chan *amqp.Error
uri string // uri of the rabbitmq server
exchange string // exchange that we will bind to
exchangeType string // topic, direct, etc...
bindingKeys []string // routing key that we are using
uri string // uri of the rabbitmq server
exchanges []exchange
consumer func(<-chan amqp.Delivery) // consumes deliveries
reconnectTimeout time.Duration // timeout for reconnect after a connection closes
rabbitMQConfig rabbitMQConfig
}
)
......@@ -29,13 +33,20 @@ func (c *config) consumer() (cons *consumer) {
for _, serviceName := range c.Services {
keys = append(keys, fmt.Sprintf("service.%s", serviceName))
}
cons = &consumer{
uri: fmt.Sprintf("amqps://%s:%s@%s", c.Username, c.Password, c.Host),
exchange: c.ExchangeName,
exchangeType: "topic",
bindingKeys: keys,
uri: fmt.Sprintf("amqps://%s:%s@%s", c.Username, c.Password, c.Host),
exchanges: []exchange{
exchange{
"deployments", "topic", keys,
},
exchange{
"sites", "topic", []string{c.Site},
},
},
consumer: c.consume,
reconnectTimeout: c.ReconnectTimeout,
rabbitMQConfig: c.RabbitMQConfig,
}
return
}
......@@ -46,7 +57,6 @@ func (c *consumer) close() {
}
func (c *consumer) connect() (deliveries <-chan amqp.Delivery, err error) {
// open connection
c.conn, err = amqp.Dial(c.uri)
if err != nil {
......@@ -80,22 +90,18 @@ func (c *consumer) connect() (deliveries <-chan amqp.Delivery, err error) {
}
// all the bindings from our queue to the topic exchange
for _, bindingKey := range c.bindingKeys {
/*
log.Printf(
"[P] Binding to exchange '%s' with routing key '%s'",
c.exchange,
bindingKey)
*/
if err = c.channel.QueueBind(
c.queue.Name, // queue name
bindingKey, // routing key
c.exchange, // exchange
false, // no-wait
nil,
); err != nil {
return
for _, exchange := range c.exchanges {
log.Printf("[Conn] Binding to exchange %s with routing keys %v", exchange.name, exchange.bindingKeys)
for _, bindingKey := range exchange.bindingKeys {
if err = c.channel.QueueBind(
c.queue.Name, // queue name
bindingKey, // routing key
exchange.name, // exchange
false, // no-wait
nil,
); err != nil {
return
}
}
}
......
......@@ -14,13 +14,14 @@ import (
type (
rabbitMQConfig struct {
ExchangeName string `json:"exchange"`
Vhost string `json:"vhost"`
Exchanges []string `json:"exchanges"`
Vhost string `json:"vhost"`
}
fetchedConfig struct {
RabbitMQConfig rabbitMQConfig `json:"rabbitmq_config"`
Services []service `json:"services"`
Site string `json:"site"`
}
config struct {
......@@ -34,9 +35,8 @@ type (
DoneTasks chan taskExecution
FetchInterval time.Duration
ReconnectTimeout time.Duration
ExchangeName string // fetched from backend
Vhost string // fetched from backend
RabbitMQConfig rabbitMQConfig
Site string
}
)
......@@ -86,8 +86,8 @@ func (c *config) fetchConfig() (err error) {
return
}
c.ExchangeName = fetchedConfig.RabbitMQConfig.ExchangeName
c.Vhost = fetchedConfig.RabbitMQConfig.Vhost
c.RabbitMQConfig = fetchedConfig.RabbitMQConfig
c.Site = fetchedConfig.Site
permittedServices := fetchedConfig.Services
log.Printf("[Conf] Permitted services: %s", permittedServices)
......@@ -215,6 +215,7 @@ func main() {
// run till killed
forever := make(chan bool)
<-forever
//if !*oneshot {
//} else {
// // fetch once and exit
......
......@@ -32,6 +32,9 @@ type (
Action string `json:"action"` // either "deploy" or "withdraw"
User User `json:"user"`
Key SSHKey `json:"key"`
// Questionnaire for the script when it rejected a previous execution
Questionnaire map[string]string `json:"questionnaire"`
}
// Status for Output
......@@ -39,23 +42,25 @@ type (
// Output of the deployment script
Output struct {
// Status: StatusSuccess, StatusFail, StatusReject
// Status: Success, Fail, Reject
Status Status `json:"status"`
// Message for the user
Msg string `json:"message"`
// data for the backend
Data map[string]interface{} `json:"data"`
// Questionnaire requested by the script (Status == Reject).
// Maps a key to a description of a the requested value
Questionnaire map[string]string `json:"questionnaire"`
}
)
const (
// Success value for Output.Status
// Success value for Status
Success Status = "success"
// Fail value for Output.Status
// Fail value for Status
Fail Status = "fail"
// Reject value for Output.Status
// Reject value for Status
Reject Status = "reject"
)
......
......@@ -20,11 +20,12 @@ type (
}
task struct {
ID int `json:"id"`
Action string `json:"action"`
Service service `json:"service"`
User scripts.User `json:"user"`
Key scripts.SSHKey `json:"key"`
ID int `json:"id"`
Action string `json:"action"`
Service service `json:"service"`
User scripts.User `json:"user"`
Key scripts.SSHKey `json:"key"`
Questionnaire map[string]string `json:"questionnaire"`
}
taskExecution struct {
......@@ -117,16 +118,17 @@ func (c *config) fetchTasks() (err error) {
}
defer resp.Body.Close()
var newTasks struct{ Tasks []task }
var newTasks []task
body, err := ioutil.ReadAll(resp.Body)
err = json.Unmarshal(body, &newTasks)
log.Printf("[Fetch] %d new tasks", len(newTasks.Tasks))
if err != nil {
err = fmt.Errorf("Error unmarshaling: %s\n%s", err, body)
return
}
for _, task := range newTasks.Tasks {
log.Printf("[Fetch] %d new tasks", len(newTasks))
for _, task := range newTasks {
// put the in the task queue
c.NewTasks <- task
}
......@@ -137,9 +139,10 @@ func (c *config) handleTask(ti task) (err error) {
// encode input as json
input := scripts.Input{
Action: ti.Action,
User: ti.User,
Key: ti.Key,
Action: ti.Action,
User: ti.User,
Key: ti.Key,
Questionnaire: ti.Questionnaire,
}
iBytes, err := input.Marshal()
if err != nil {
......@@ -148,7 +151,7 @@ func (c *config) handleTask(ti task) (err error) {
// execute the script
commandName := c.Services[ti.Service.Name].Command
log.Println("[Task] Executing: ", commandName)
log.Println("[Task] Executing:", commandName)
log.Printf("[Task] Input: %s", input)
cmd := exec.Command(commandName)
......@@ -199,11 +202,11 @@ func (c *config) handleTask(ti task) (err error) {
return
}
func (c *config) respondToTask(te taskExecution) (err error) {
responseBytes, err := json.Marshal(te)
responseBytes, err := json.MarshalIndent(te, "", " ")
if err != nil {
return
}
log.Printf("[Task] Sending ACK %v:\n%s", te.ID, responseBytes)
log.Printf("[Task] Sending ACK %v", te.ID)
url := fmt.Sprintf("https://%s/backend/clientapi/response", c.Host)
req, err := http.NewRequest("POST", url, bytes.NewReader(responseBytes))
......@@ -229,6 +232,15 @@ func (c *config) respondToTask(te taskExecution) (err error) {
if err != nil {
return
}
log.Printf("[Task] Backend response to ACK %v:\n%s", te.ID, respBytes)
var backendResponse struct {
Okay bool `json:"ok"`
}
err = json.Unmarshal(respBytes, &backendResponse)
if err != nil {
return
}
if !backendResponse.Okay {
log.Printf("[Task] Backend response to ACK %v:\n%s", te.ID, respBytes)
}
return
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment