Commits (2)
......@@ -31,7 +31,10 @@ type (
UserInfo scripts.UserInfo `json:"userinfo"`
// The Service which this Dep is related to.
Service config.Service `json:"service"`
Service config.Service
// ServiceName should be used to identify this deployments servic
ServiceName string `json:"service_name"`
// Optional Answers by the users. Must have the same keys as the related Questionnaire
Answers map[string]interface{} `json:"answers,omitempty"`
......@@ -71,10 +74,7 @@ type (
)
func (dep Dep) String() string {
if dep.Service != (config.Service{}) {
return fmt.Sprintf("Dep[%v]#%v", dep.Service.Name, dep.ID)
}
return fmt.Sprintf("Dep#%v", dep.ID)
return fmt.Sprintf("Dep[%v]#%v", dep.ServiceName, dep.ID)
}
// Log logs a message for a Dep
......
package main
import (
"encoding/json"
"fmt"
"io/ioutil"
"log"
"net/http"
"os"
"os/signal"
"path"
"path/filepath"
"syscall"
"git.scc.kit.edu/feudal/feudalClient/config"
deps "git.scc.kit.edu/feudal/feudalClient/deployments"
......@@ -54,7 +57,7 @@ func deregister(conf *config.Config) {
log.Printf("Deregistering this client at the backend")
req, err = http.NewRequest(
"PUT",
"GET",
"https://"+conf.Hostname+"/client/deregister",
nil,
)
......@@ -68,11 +71,41 @@ func deregister(conf *config.Config) {
log.Fatalf("Unable to deregister: %s", err)
}
defer resp.Body.Close()
body, err := ioutil.ReadAll(resp.Body)
var body []byte
body, err = ioutil.ReadAll(resp.Body)
if err != nil {
log.Fatalf("Cannot read response body")
}
log.Printf("Response: %s", body)
// the body contains deployments we need to remove before shutting down in the same format as the HTTP fetch
//
// start a sink to handle the last deployments
sink := new(script.Sink)
sinkPipe := make(chan deps.Dep)
sink.Init(conf)
err = sink.Connect(sinkPipe)
if err != nil {
log.Printf("Error connecting sink: %s", err)
}
// feed it the last deployments
var deps []deps.Dep
err = json.Unmarshal(body, &deps)
if err != nil {
err = fmt.Errorf("Unmarshaling response body: %s\n%s", err, body)
return
}
for _, dep := range deps {
sinkPipe <- dep
}
// wait for completion and sink shutdown
sink.Disconnect()
log.Printf("If there are no error above this line you can retire this client and don't need to run it again.")
}
func start(conf *config.Config) {
......@@ -104,7 +137,23 @@ func start(conf *config.Config) {
}
}
// TODO rework
// register Ctrl+C to gracefully shutdown the sources and sinks
c := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt, syscall.SIGTERM)
stop := make(chan bool)
go func() {
sig := <-c
log.Printf("Stopping because of signal: %v", sig)
for _, src := range sources {
src.Disconnect()
}
for _, sink := range sinks {
sink.Disconnect()
}
stop <- true
}()
var received deps.Dep
for {
select {
......@@ -112,6 +161,9 @@ func start(conf *config.Config) {
sinkPipe <- received
case received = <-srcPipes[1]:
sinkPipe <- received
case <-stop:
log.Printf("Shutdown completed")
return
}
}
}
......@@ -253,15 +305,15 @@ func main() {
prepareDebugging(conf)
// sync the config with the backend, as the following commands depend on it
if err := conf.Sync(); err != nil {
log.Printf("[Conf] Error synchronizing configuration: %s", err)
return
}
switch cmd {
case "start":
// sync the config with the backend
if err := conf.Sync(); err != nil {
log.Printf("[Conf] Error synchronizing configuration: %s", err)
return
}
start(conf)
case "deregister":
deregister(conf)
}
......
......@@ -10,6 +10,7 @@ import (
"net/http"
"os/exec"
"strings"
"sync"
"time"
"git.scc.kit.edu/feudal/feudalClient/config"
......@@ -26,6 +27,15 @@ type (
// Replies for the backend
Replies chan deps.Reply
// src is the channel connected using Connect
stop chan bool
// wgDeployments tracks deployments which are currently scheduled / handled
wgDeployments *sync.WaitGroup
// wgReplies tracks replies
wgReplies *sync.WaitGroup
}
// ackResponse response of the backend for our response
......@@ -58,44 +68,41 @@ func completeReply(dep deps.Dep, reply deps.Reply) deps.Reply {
return reply
}
func (sink *Sink) depServices(dep deps.Dep) (services []config.Service, err error) {
func (sink *Sink) setService(dep *deps.Dep) (err error) {
// find the matching service
if dep.Service != (config.Service{}) {
for _, s := range sink.Config.Services {
if s.Name == dep.Service.Name {
services = []config.Service{s}
return
}
for _, s := range sink.Config.Services {
if s.Name == dep.ServiceName {
dep.Service = s
return
}
}
err = fmt.Errorf("Service for Dep does not exist: '%s'", dep.Service.Name)
err = fmt.Errorf("Service for Dep does not exist: '%s'", dep.ServiceName)
return
}
func (sink *Sink) scheduleDep(dep deps.Dep) (err error) {
// determine the services of the deployment
var services []config.Service
services, err = sink.depServices(dep)
// determine the services of the deployment and set dep.Service
err = sink.setService(&dep)
if err != nil {
return
}
// execute the deployment for all the services
for _, svc := range services {
dep.Service = svc
if sink.Config.Debug.Scripts {
dep.Log("Scheduling execution for %s", dep.Service.Name)
}
if sink.Config.Debug.Scripts {
dep.Log("Scheduling execution for %s", svc.Name)
}
// track the scheduled deployment in the wait group
sink.wgDeployments.Add(1)
// schedule the deployment
sink.Scheduled <- dep
}
// schedule the deployment
sink.Scheduled <- dep
return
}
func (sink *Sink) handleDep(dep deps.Dep) (output scripts.Output, err error) {
defer sink.wgDeployments.Done()
// encode input as json
input := scripts.Input{
......@@ -217,6 +224,8 @@ func (sink *Sink) depHandler() {
}
go func() {
sink.wgReplies.Add(1)
reply := deps.Reply{
ID: dep.ID,
State: output.State,
......@@ -309,10 +318,14 @@ func (sink *Sink) depResponder() {
if err != nil {
reply.Log("ACK-ERROR: %s", err)
// TODO retrying failed replies can easily create an endless loop. I should rethink the handling of failed replies
// retry failed replies
go sink.retryReply(reply)
return
}
// only mark as done when there was no error
sink.wgReplies.Done()
}(doneDep)
}
}
......@@ -322,10 +335,14 @@ func (sink *Sink) Init(conf *config.Config) {
sink.Config = conf
sink.Scheduled = make(chan deps.Dep)
sink.Replies = make(chan deps.Reply)
sink.stop = make(chan bool)
sink.wgDeployments = new(sync.WaitGroup)
sink.wgReplies = new(sync.WaitGroup)
}
// Connect connects the sink and starts handling incoming deployments
func (sink *Sink) Connect(src <-chan deps.Dep) (err error) {
// the responder finishes when sink.Replies closes
go sink.depResponder()
......@@ -333,28 +350,48 @@ func (sink *Sink) Connect(src <-chan deps.Dep) (err error) {
go sink.depHandler()
go func() {
// schedule until the src runs out
// schedule until sink.stop fires
var err error
for dep := range src {
err = sink.scheduleDep(dep)
if err != nil {
// we reply to the portal that we couldn't schedule this deployment
go func() {
reply := deps.Reply{
ID: dep.ID,
State: scripts.Rejected,
Msg: fmt.Sprintf("Error scheduling execution of this deployment: %s", err),
}
reply = completeReply(dep, reply) // fill in UserCredentialStates
sink.Replies <- reply
}()
log.Printf("[Sink] Error scheduling: %v", err)
for {
select {
case dep := <-src:
err = sink.scheduleDep(dep)
if err != nil {
// we reply to the portal that we couldn't schedule this deployment
go func() {
reply := deps.Reply{
ID: dep.ID,
State: scripts.Rejected,
Msg: fmt.Sprintf("Error scheduling execution of this deployment: %s", err),
}
reply = completeReply(dep, reply) // fill in UserCredentialStates
sink.wgReplies.Add(1)
sink.Replies <- reply
}()
log.Printf("[Sink] Error scheduling: %v", err)
}
case <-sink.stop:
return
}
}
// clean up before exiting
close(sink.Scheduled)
close(sink.Replies)
}()
return
}
func (sink *Sink) Disconnect() (err error) {
log.Printf("[Sink] Disconnecting sink ...")
sink.stop <- true
sink.wgDeployments.Wait()
sink.wgReplies.Wait()
// clean up before exiting
close(sink.Scheduled)
close(sink.Replies)
log.Printf("[Sink] Disconnected sink")
return
}
......@@ -2,7 +2,7 @@ package sink
import (
"git.scc.kit.edu/feudal/feudalClient/config"
"git.scc.kit.edu/feudal/feudalClient/deployments"
deps "git.scc.kit.edu/feudal/feudalClient/deployments"
)
// Sink handles incoming deployments
......@@ -13,4 +13,7 @@ type Sink interface {
// the Sink must close when the src channel closes
// must not block
Connect(src <-chan deps.Dep) (err error)
// Disconnect waits for this sink to finish all its deployments and replies and then cleans up and stops the sink
Disconnect() (err error)
}
......@@ -24,6 +24,7 @@ type (
Config *config.Config
Exchanges []exchange
connection *amqp.Connection
stop chan bool
}
)
......@@ -184,16 +185,18 @@ func (src *Source) Init(conf *config.Config) {
i++
}
src.Exchanges = []exchange{
exchange{
{
"groups", "topic", groupRoutingKeys,
},
exchange{
{
"entitlements", "topic", entitlementRoutingKeys,
},
exchange{
{
"services", "topic", serviceRoutingKeys,
},
}
src.stop = make(chan bool)
return
}
......@@ -204,6 +207,7 @@ func (src *Source) Connect() (sink <-chan deps.Dep, err error) {
// Reconnect loop
go func() {
reconnect:
for {
deliveries, closed, err := src.connect()
if err != nil {
......@@ -215,18 +219,24 @@ func (src *Source) Connect() (sink <-chan deps.Dep, err error) {
select {
case connectionError := <-closed:
log.Printf("[AMQP] Connection/Channel closed: %s", connectionError)
case <-src.stop:
break reconnect
}
}
log.Printf("[AMQP] Reconnecting in %s", src.Config.ReconnectTimeout)
time.Sleep(src.Config.ReconnectTimeout)
}
}()
return sink, nil
}
// Close stops the source
func (src *Source) Close() {
// Disconnect stops the source
func (src *Source) Disconnect() {
log.Printf("[AMQP] Disconnecting source ...")
src.stop <- true
src.connection.Close()
log.Printf("[AMQP] Disconnected source")
}
......@@ -6,6 +6,7 @@ import (
"io/ioutil"
"log"
"net/http"
"sync"
"time"
"git.scc.kit.edu/feudal/feudalClient/config"
......@@ -18,6 +19,8 @@ type (
Source struct {
Config *config.Config
Ticker *time.Ticker
wg *sync.WaitGroup
stop chan bool
}
)
......@@ -26,6 +29,9 @@ var (
)
func (src Source) fetchDeps(sink chan<- deps.Dep, initial bool) (err error) {
defer src.wg.Done()
src.wg.Add(1)
if len(src.Config.EntitlementToServiceIDs) == 0 && len(src.Config.GroupToServiceIDs) == 0 {
log.Printf("[HTTP] Not fetching because the are no services to fetch")
return
......@@ -87,6 +93,8 @@ func (src Source) fetchDeps(sink chan<- deps.Dep, initial bool) (err error) {
// Init initializes the http source
func (src *Source) Init(conf *config.Config) {
src.Config = conf
src.wg = new(sync.WaitGroup)
src.stop = make(chan bool)
}
// Connect starts fetching using the rest interface.
......@@ -104,13 +112,15 @@ func (src *Source) Connect() (<-chan deps.Dep, error) {
select {
case _, ok := <-src.Ticker.C:
if !ok {
log.Printf("[HTTP] Source stopping")
log.Printf("[HTTP] Ticker stopped")
return
}
err = src.fetchDeps(sinkPipe, false)
if err != nil {
log.Printf("[HTTP] Error fetching: %s", err)
}
case <-src.stop:
return
}
}
}()
......@@ -128,6 +138,10 @@ func (src *Source) Connect() (<-chan deps.Dep, error) {
}
// Close stop the fetching
func (src *Source) Close() {
func (src *Source) Disconnect() {
log.Printf("[HTTP] Disconnecting source ...")
src.Ticker.Stop()
src.stop <- true
src.wg.Wait()
log.Printf("[HTTP] Disconnected source")
}
......@@ -2,7 +2,7 @@ package source
import (
"git.scc.kit.edu/feudal/feudalClient/config"
"git.scc.kit.edu/feudal/feudalClient/deployments"
deps "git.scc.kit.edu/feudal/feudalClient/deployments"
)
// Source are sources of deployments like amqp and http
......@@ -12,5 +12,7 @@ type Source interface {
// Connect starts the source
// must not block
Connect() (sink <-chan deps.Dep, err error)
Close()
// Disconnect stop this source
Disconnect()
}