Commit 3020e744 authored by Lukas Burgey's avatar Lukas Burgey
Browse files

Restructure the code

parent f06fff00
package main
import (
"bytes"
"encoding/json"
"fmt"
"io/ioutil"
"log"
"net/http"
"os"
"strings"
"time"
"github.com/streadway/amqp"
"gopkg.in/alecthomas/kingpin.v2"
)
type (
sshKey struct {
Name string `json:"name"`
Key string `json:"key"`
}
group struct {
Name string `json:"name"`
}
user struct {
Email string `json:"email"`
Sub string `json:"sub"`
Groups []group `json:"groups"`
}
service struct {
Name string `json:"name"`
DeployCommand string `json:"deploy"`
WithdrawCommand string `json:"withdraw"`
}
task struct {
ID int `json:"id"`
Action string `json:"action"`
User user `json:"user"`
Service service `json:"service"`
Key sshKey `json:"key"`
}
update struct {
Tasks []task `json:"tasks"`
}
response struct {
OK bool `json:"ok"`
}
rabbitMQConfig struct {
ExchangeName string `json:"exchange"`
Vhost string `json:"vhost"`
}
// FetchedConfig config from backend
FetchedConfig struct {
RabbitMQConfig rabbitMQConfig `json:"rabbitmq_config"`
Services []service `json:"services"`
}
config struct {
Host string `json:"host"`
Username string `json:"username"`
Password string `json:"password"`
Services []service `json:"services"`
FetchIntervalString string `json:"fetch_interval"` // string parsed by time.ParseDuration
ReconnectTimeoutString string `json:"reconnect_timeout"` // string parsed by time.ParseDuration
NewTasks chan task
DoneTasks chan task
FetchInterval time.Duration
ReconnectTimeout time.Duration
ExchangeName string // fetched from backend
Vhost string // fetched from backend
}
consumer struct {
conn *amqp.Connection
channel *amqp.Channel
queue amqp.Queue
closed chan *amqp.Error
uri string // uri of the rabbitmq server
exchange string // exchange that we will bind to
exchangeType string // topic, direct, etc...
bindingKeys []string // routing key that we are using
consumer func(<-chan amqp.Delivery) // consumes deliveries
reconnectTimeout time.Duration // timeout for reconnect after a connection closes
}
)
const (
version = "0.1.0"
)
......@@ -107,39 +28,6 @@ func logError(err error, msg string) {
}
}
func (s service) String() string {
return s.Name
}
func (u user) String() string {
return u.Email
}
func (k sshKey) String() string {
return k.Name
}
func (t task) String() string {
return fmt.Sprintf("%s:%s %8s %s", t.Service, t.User, t.Key, strings.ToUpper(t.Action))
}
func filterPermitted(permitted []service, wanted []service) (remainder []service) {
serviceLoop:
for _, service := range wanted {
for _, permittedService := range permitted {
if service.Name == permittedService.Name {
remainder = append(remainder, service)
continue serviceLoop
}
}
log.Printf(
"[Conn] Skipping unpermitted service %s",
service,
)
}
return
}
func (c *config) fetchConfig() (err error) {
req, err := http.NewRequest("GET", "https://"+c.Host+"/backend/clientapi/config", nil)
if err != nil {
......@@ -383,7 +271,13 @@ func (c *config) consumer() (cons *consumer) {
}
func (c *config) consume(deliveries <-chan amqp.Delivery) {
for d := range deliveries {
// TODO remove: debug output for demonstration
buf := new(bytes.Buffer)
json.Indent(buf, d.Body, "", " ")
log.Printf("DEMONSTRATION MESSAGE: %s", buf)
var newTask task
err := json.Unmarshal(d.Body, &newTask)
......@@ -408,7 +302,7 @@ func (c *config) consume(deliveries <-chan amqp.Delivery) {
func (c *config) handleTask(ti task) (err error) {
time.Sleep(time.Second)
time.Sleep(2 * time.Second)
log.Printf("[Task] %s: DONE", ti)
c.DoneTasks <- ti
return
......@@ -451,6 +345,9 @@ func (c *config) ackTask(ti task) (ok bool, err error) {
var ackResponse response
err = json.Unmarshal(respBytes, &ackResponse)
if err != nil {
log.Printf("[Task] Invalid ACK response: %s", respBytes)
}
ok = ackResponse.OK
return
}
......@@ -505,6 +402,12 @@ func (c *config) fetchTasks() (err error) {
var u update
body, err := ioutil.ReadAll(resp.Body)
// TODO remove: debug output for demonstration
buf := new(bytes.Buffer)
json.Indent(buf, body, "", " ")
log.Printf("DEMONSTRATION MESSAGE: %s", buf)
err = json.Unmarshal(body, &u)
log.Printf("[Fetch] %d new tasks", len(u.Tasks))
......
package main
import (
"log"
"time"
"github.com/streadway/amqp"
)
type (
sshKey struct {
Name string `json:"name"`
Key string `json:"key"`
}
group struct {
Name string `json:"name"`
}
user struct {
Email string `json:"email"`
Sub string `json:"sub"`
Groups []group `json:"groups"`
}
service struct {
Name string `json:"name"`
DeployCommand string `json:"deploy"`
WithdrawCommand string `json:"withdraw"`
}
task struct {
ID int `json:"id"`
Action string `json:"action"`
User user `json:"user"`
Service service `json:"service"`
Key sshKey `json:"key"`
}
update struct {
Tasks []task `json:"tasks"`
}
response struct {
OK bool `json:"ok"`
}
rabbitMQConfig struct {
ExchangeName string `json:"exchange"`
Vhost string `json:"vhost"`
}
// FetchedConfig config from backend
FetchedConfig struct {
RabbitMQConfig rabbitMQConfig `json:"rabbitmq_config"`
Services []service `json:"services"`
}
config struct {
Host string `json:"host"`
Username string `json:"username"`
Password string `json:"password"`
Services []service `json:"services"`
FetchIntervalString string `json:"fetch_interval"` // string parsed by time.ParseDuration
ReconnectTimeoutString string `json:"reconnect_timeout"` // string parsed by time.ParseDuration
NewTasks chan task
DoneTasks chan task
FetchInterval time.Duration
ReconnectTimeout time.Duration
ExchangeName string // fetched from backend
Vhost string // fetched from backend
}
consumer struct {
conn *amqp.Connection
channel *amqp.Channel
queue amqp.Queue
closed chan *amqp.Error
uri string // uri of the rabbitmq server
exchange string // exchange that we will bind to
exchangeType string // topic, direct, etc...
bindingKeys []string // routing key that we are using
consumer func(<-chan amqp.Delivery) // consumes deliveries
reconnectTimeout time.Duration // timeout for reconnect after a connection closes
}
)
func filterPermitted(permitted []service, wanted []service) (remainder []service) {
serviceLoop:
for _, service := range wanted {
for _, permittedService := range permitted {
if service.Name == permittedService.Name {
remainder = append(remainder, service)
continue serviceLoop
}
}
log.Printf(
"[Conn] Skipping unpermitted service %s",
service,
)
}
return
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment