Commit 7531772d authored by Lukas Burgey's avatar Lukas Burgey
Browse files

Rewrite client architecture

parent 48ca3b7f
package config
import (
"bytes"
"encoding/json"
"fmt"
"io/ioutil"
"log"
"net/http"
"regexp"
"time"
)
type (
// A ServiceID locally identies a provided Service.
ServiceID string
// Service for which we do deployments
Service struct {
Name string `json:"name"`
Command string `json:"command"`
Description string `json:"description"`
}
// RabbitMQConfig is used for the amqp source
RabbitMQConfig struct {
Exchanges []string `json:"exchanges"`
Vhost string `json:"vhost"`
ReconnectTimeout time.Duration
}
fetchedConfig struct {
RabbitMQConfig RabbitMQConfig `json:"rabbitmq_config"`
Site string `json:"site"`
}
// The DebugConfig sets the debuging levels for components of the client.
DebugConfig struct {
// Sequential causes the execution of only one deployment at a time for debugging purposes
Sequential bool `json:"sequential_execution"`
Scripts bool `json:"scripts"`
Backend bool `json:"backend"`
}
// Config is the structure of our config file
Config struct {
Hostname string `json:"feudal_backend_host"`
Username string `json:"username"`
Password string `json:"password"`
// Services maps an (arbitrary) service identifier to service structs
// the service identifiers are referenced in GroupToServiceIDs and EntitlementToServiceIDs
Services map[ServiceID]Service
// GroupToServiceIDs determines which services are provided for users of the
// group
// maps a group name to service identifiers of the services
// services are declared in Config.Services
GroupToServiceIDs map[string][]ServiceID `json:"group_to_service_ids"`
// EntitlementToServiceIDs determines which services are provided for users of the
// entitlement
// maps an entitlement to service identifiers of the services
// services are declared in Config.Services
EntitlementToServiceIDs map[string][]ServiceID `json:"entitlement_to_service_ids"`
// FetchIntervalString gets parsed by time.ParseDuration
FetchIntervalString string `json:"fetch_interval"`
// ReconnectTimeout gets parsed by time.ParseDuration
ReconnectTimeoutString string `json:"reconnect_timeout"`
FetchInterval time.Duration
// ReconnectTimeout time.Duration
RabbitMQConfig RabbitMQConfig
Site string
// debug flags
Debug DebugConfig `json:"debug"`
}
// strippedConfig is sent to the backend on startup
strippedConfig struct {
Services map[ServiceID]Service `json:"services"`
GroupToServiceIDs map[string][]ServiceID `json:"group_to_service_ids"`
EntitlementToServiceIDs map[string][]ServiceID `json:"entitlement_to_service_ids"`
}
)
const (
defaultFetchInterval = 30 * time.Minute
defaultReconnectTimeout = 10 * time.Second
)
func (s Service) String() string {
return s.Name
}
// GetServices resolves service ids into services
func (conf *Config) GetServices(sids []ServiceID) (services []Service, err error) {
var ok bool
services = make([]Service, len(sids))
for i, sid := range sids {
if services[i], ok = conf.Services[sid]; !ok {
services = nil
err = fmt.Errorf("Service with service ID '%s' does no exist", sid)
return
}
}
return
}
// Sync sends our config to the backend to inform it about changes
func (conf *Config) Sync() (err error) {
if conf.Debug.Backend {
log.Printf("[Conf] Synchronising configuration with %v", conf.Hostname)
}
var (
strippedConfigBytes []byte
req *http.Request
resp *http.Response
fetchedConfig fetchedConfig
)
// we inform the backend which services we provide
strippedConfigBytes, err = json.Marshal(strippedConfig{
Services: conf.Services,
GroupToServiceIDs: conf.GroupToServiceIDs,
EntitlementToServiceIDs: conf.EntitlementToServiceIDs,
})
if err != nil {
err = fmt.Errorf("Error syncing config: %s", err)
return
}
// update the services tracked by the backend
req, err = http.NewRequest(
"PUT",
"https://"+conf.Hostname+"/backend/clientapi/config",
bytes.NewReader(strippedConfigBytes),
)
if err != nil {
err = fmt.Errorf("Error requesting remote config: %s", err)
return
}
req.SetBasicAuth(conf.Username, conf.Password)
req.Header.Set("Content-Type", "application/json")
resp, err = (&http.Client{}).Do(req)
if err != nil {
return
}
defer resp.Body.Close()
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
err = fmt.Errorf("Error reading remote config: %s", err)
return
}
if resp.StatusCode != 200 {
err = fmt.Errorf("Error with remote config request: Response was %v", resp.Status)
return
}
err = json.Unmarshal(body, &fetchedConfig)
if err != nil {
err = fmt.Errorf("Error parsing remote configuration: %s %s", err, body)
return
}
conf.RabbitMQConfig = fetchedConfig.RabbitMQConfig
conf.Site = fetchedConfig.Site
log.Printf("[Conf] Synchronised configuration with %v", conf.Hostname)
return
}
func (conf *Config) validateConfig() (err error) {
// check the config values
if conf.Hostname == "" {
return fmt.Errorf("No 'hostname' in config")
}
if conf.Username == "" {
return fmt.Errorf("No 'username' in config")
}
if conf.Password == "" {
return fmt.Errorf("No 'password' in config")
}
// try to parse duration, otherwise use default
if conf.FetchIntervalString == "" {
conf.FetchInterval = defaultFetchInterval
log.Printf("[Conf] Using default fetch_interval of %v", conf.FetchInterval)
} else {
conf.FetchInterval, err = time.ParseDuration(conf.FetchIntervalString)
if err != nil {
log.Printf("[Conf] Error parsing fetch interval: %s", err)
err = nil
conf.FetchInterval = defaultFetchInterval
log.Printf("[Conf] Using default fetch_interval of %v", conf.FetchInterval)
}
}
if conf.ReconnectTimeoutString == "" {
conf.RabbitMQConfig.ReconnectTimeout = defaultReconnectTimeout
log.Printf("[Conf] Using default reconnect_timeout of %v", conf.RabbitMQConfig.ReconnectTimeout)
} else {
conf.RabbitMQConfig.ReconnectTimeout, err = time.ParseDuration(conf.ReconnectTimeoutString)
if err != nil {
log.Printf("[Conf] Error parsing reconnect timeout: %s", err)
err = nil
conf.RabbitMQConfig.ReconnectTimeout = defaultReconnectTimeout
log.Printf("[Conf] Using default reconnect_timeout of %v", conf.RabbitMQConfig.ReconnectTimeout)
}
}
// check that the services names are unique
var exists bool
serviceNames := make(map[string]struct{}, len(conf.Services))
for _, service := range conf.Services {
if _, exists = serviceNames[service.Name]; !exists {
serviceNames[service.Name] = struct{}{}
} else {
return fmt.Errorf("Service name is ambiguous: %s", service.Name)
}
}
// check if referenced service ids exist
for _, sids := range conf.GroupToServiceIDs {
for _, sid := range sids {
if _, exists = conf.Services[sid]; !exists {
return fmt.Errorf("GroupToServiceIDs: service ID '%s' does not exist", sid)
}
}
}
for _, sids := range conf.EntitlementToServiceIDs {
for _, sid := range sids {
if _, exists = conf.Services[sid]; !exists {
return fmt.Errorf("EntitlementToServiceIDs: service ID '%s' does not exist", sid)
}
}
}
return
}
// ReadConfig reads a config file and validates it
func ReadConfig(configFile string) (conf *Config, err error) {
log.Printf("[Conf] Reading config file %s", configFile)
configBytes, err := ioutil.ReadFile(configFile)
if err != nil {
return
}
conf = new(Config)
err = json.Unmarshal(configBytes, conf)
if err != nil {
return
}
err = conf.validateConfig()
if err != nil {
err = fmt.Errorf("Error validating config:\n\t%s", err)
return
}
// strip the group authority from entitlement names
nameExtractor := regexp.MustCompile("^(.*?)#")
for entName, entServices := range conf.EntitlementToServiceIDs {
match := nameExtractor.FindStringSubmatch(entName)
if len(match) == 2 {
delete(conf.EntitlementToServiceIDs, entName)
conf.EntitlementToServiceIDs[match[1]] = entServices
}
}
log.Printf("[Conf] Services: %s", conf.Services)
log.Printf("[Conf] Groups: %s", conf.GroupToServiceIDs)
log.Printf("[Conf] Entitlements: %s", conf.EntitlementToServiceIDs)
return
}
package deps
import (
"fmt"
"log"
"git.scc.kit.edu/feudal/feudalClient/config"
"git.scc.kit.edu/feudal/feudalScripts"
)
type (
// VO virtual organisation
VO struct {
ID int `json:"id"`
Name string `json:"name"`
ResourceType string `json:"resourcetype"`
}
// Dep describes a deployment action
Dep struct {
ID int `json:"id"`
StateTarget scripts.State `json:"state_target"`
User scripts.User `json:"user"`
Credentials map[string][]scripts.Credential `json:"credentials"`
Questionnaire map[string]string `json:"questionnaire"`
// this relates to the polymorphic type added by the polymorphic serializer
ResourceType string `json:"resourcetype"`
// VO is only set if this is a VODeployment
VO VO `json:"vo,omitempty"`
// Service may be filled by scheduleDep
Service config.Service `json:"service,omitempty"`
}
// Reply is sent back to the backend to inform it about executed deployment actions
Reply struct {
// ID of the according Dep (at the backend: DeploymentState)
ID int `json:"id"`
Output scripts.Output `json:"output"`
// Service is the service for which we performed the deploy action
Service config.Service `json:"service"`
}
)
func (dep Dep) String() string {
return fmt.Sprintf("[Dep:%v:%v]", dep.ID, dep.Service.Name)
}
// Log logs a message for a Dep
func (dep Dep) Log(formatString string, params ...interface{}) {
log.Printf("%s "+formatString, append([]interface{}{dep}, params...)...)
}
func (rep Reply) String() string {
return fmt.Sprintf("[Dep:%v:%v]", rep.ID, rep.Service.Name)
}
// Log logs a message for a Reply
func (rep Reply) Log(formatString string, params ...interface{}) {
log.Printf("%s "+formatString, append([]interface{}{rep}, params...)...)
}
package deps
import (
"fmt"
"log"
"git.scc.kit.edu/feudal/feudalClient/config"
"git.scc.kit.edu/feudal/feudalScripts"
)
type (
// VO virtual organisation
VO struct {
ID int `json:"id"`
Name string `json:"name"`
ResourceType string `json:"resourcetype"`
}
// Dep describes a deployment action
Dep struct {
ID int `json:"id"`
StateTarget scripts.State `json:"state_target"`
User scripts.User `json:"user"`
Credentials map[string][]scripts.Credential `json:"credentials"`
Questionnaire map[string]string `json:"questionnaire"`
// this relates to the polymorphic type added by the polymorphic serializer
ResourceType string `json:"resourcetype"`
// VO is only set if this is a VODeployment
VO VO `json:"vo,omitempty"`
// Service may be filled by scheduleDep
Service config.Service `json:"service,omitempty"`
}
// Reply is sent back to the backend to inform it about executed deployment actions
Reply struct {
// ID of the according Dep (at the backend: DeploymentState)
ID int `json:"id"`
Output scripts.Output `json:"output"`
// Service is the service for which we performed the deploy action
Service config.Service `json:"service"`
}
)
func (dep Dep) String() string {
return fmt.Sprintf("[Dep:%v:%v]", dep.ID, dep.Service.Name)
}
// Log logs a message for a Dep
func (dep Dep) Log(formatString string, params ...interface{}) {
log.Printf("%s "+formatString, append([]interface{}{dep}, params...)...)
}
func (rep Reply) String() string {
return fmt.Sprintf("[Dep:%v:%v]", rep.ID, rep.Service.Name)
}
// Log logs a message for a Reply
func (rep Reply) Log(formatString string, params ...interface{}) {
log.Printf("%s "+formatString, append([]interface{}{rep}, params...)...)
}
File added
package main
import (
"bytes"
"encoding/json"
"fmt"
"io/ioutil"
"log"
"net/http"
"os"
"regexp"
"time"
"git.scc.kit.edu/feudal/feudalClient/config"
"git.scc.kit.edu/feudal/feudalClient/deps"
"git.scc.kit.edu/feudal/feudalClient/sink"
"git.scc.kit.edu/feudal/feudalClient/sink/script"
"git.scc.kit.edu/feudal/feudalClient/source"
"git.scc.kit.edu/feudal/feudalClient/source/amqp"
"git.scc.kit.edu/feudal/feudalClient/source/rest"
"gopkg.in/alecthomas/kingpin.v2"
)
type (
rabbitMQConfig struct {
Exchanges []string `json:"exchanges"`
Vhost string `json:"vhost"`
}
fetchedConfig struct {
RabbitMQConfig rabbitMQConfig `json:"rabbitmq_config"`
Site string `json:"site"`
}
serviceID string
config struct {
Hostname string `json:"feudal_backend_host"`
Username string `json:"username"`
Password string `json:"password"`
// Services maps an (arbitrary) service identifier to service structs
// the service identifiers are referenced in GroupToServiceIDs and EntitlementToServiceIDs
Services map[serviceID]service
// GroupToServiceIDs determines which services are provided for users of the
// group
// maps a group name to service identifiers of the services
// services are declared in config.Services
GroupToServiceIDs map[string][]serviceID `json:"group_to_service_ids"`
// EntitlementToServiceIDs determines which services are provided for users of the
// entitlement
// maps an entitlement to service identifiers of the services
// services are declared in config.Services
EntitlementToServiceIDs map[string][]serviceID `json:"entitlement_to_service_ids"`
// FetchIntervalString gets parsed by time.ParseDuration
FetchIntervalString string `json:"fetch_interval"`
// ReconnectTimeout gets parsed by time.ParseDuration
ReconnectTimeoutString string `json:"reconnect_timeout"`
FetchInterval time.Duration
ReconnectTimeout time.Duration
RabbitMQConfig rabbitMQConfig
Site string
NewTasks chan task
DoneTasks chan taskReply
Fetch chan struct{}
Error chan error
}
// strippedConfig is sent to the backend on startup
strippedConfig struct {
Services map[serviceID]service `json:"services"`
GroupToServiceIDs map[string][]serviceID `json:"group_to_service_ids"`
EntitlementToServiceIDs map[string][]serviceID `json:"entitlement_to_service_ids"`
}
)
const (
defaultFetchInterval = 30 * time.Minute
defaultReconnectTimeout = 10 * time.Second
)
var (
client = &http.Client{}
app = kingpin.New(
app = kingpin.New(
"FEUDAL Client",
"Client for the Federated User Credential Deployment Portal (FEUDAL)",
).Author(
......@@ -96,192 +33,7 @@ var (
sequentialExecution = app.Flag("seq", "Execute tasks sequentially").Bool()
)
func logError(err error, msg string) {
if err != nil {
log.Printf("[E] %s: %s", msg, err)
}
}
func (c *config) syncConfig() (err error) {
log.Printf("[Conf] Synchronising configuration with %v", c.Hostname)
var (
strippedConfigBytes []byte
req *http.Request
resp *http.Response
fetchedConfig fetchedConfig
)
// we inform the backend which services we provide
strippedConfigBytes, err = json.Marshal(strippedConfig{
Services: c.Services,
GroupToServiceIDs: c.GroupToServiceIDs,
EntitlementToServiceIDs: c.EntitlementToServiceIDs,
})
if err != nil {
err = fmt.Errorf("Error syncing config: %s", err)
return
}
// update the services tracked by the backend
req, err = http.NewRequest(
"PUT",
"https://"+c.Hostname+"/backend/clientapi/config",
bytes.NewReader(strippedConfigBytes),
)
if err != nil {
err = fmt.Errorf("Error requesting remote config: %s", err)
return
}
req.SetBasicAuth(c.Username, c.Password)
req.Header.Set("Content-Type", "application/json")
resp, err = client.Do(req)
if err != nil {
return
}
defer resp.Body.Close()
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
err = fmt.Errorf("Error reading remote config: %s", err)
return
}
if resp.StatusCode != 200 {
err = fmt.Errorf("Error with remote config request: Response was %v", resp.Status)
return
}
err = json.Unmarshal(body, &fetchedConfig)
if err != nil {
err = fmt.Errorf("Error parsing remote configuration: %s %s", err, body)
return
}
c.RabbitMQConfig = fetchedConfig.RabbitMQConfig
c.Site = fetchedConfig.Site
// initialize the task queues
c.NewTasks = make(chan task)
c.DoneTasks = make(chan taskReply)
c.Fetch = make(chan struct{})
c.Error = make(chan error)
log.Printf("[Conf] Synchronised configuration with %v", c.Hostname)
return
}
func (c *config) validateConfig() (err error) {
// check the config values
if c.Hostname == "" {
return fmt.Errorf("No 'hostname' in config")
}
if c.Username == "" {
return fmt.Errorf("No 'username' in config")
}
if c.Password == "" {
return fmt.Errorf("No 'password' in config")
}
// try to parse duration, otherwise use default
if c.FetchIntervalString == "" {
c.FetchInterval = defaultFetchInterval
log.Printf("[Conf] Using default fetch_interval of %v", c.FetchInterval)
} else {
c.FetchInterval, err = time.ParseDuration(c.FetchIntervalString)
if err != nil {
log.Printf("[Conf] Error parsing fetch interval: %s", err)
err = nil
c.FetchInterval = defaultFetchInterval
log.Printf("[Conf] Using default fetch_interval of %v", c.FetchInterval)
}
}
if c.ReconnectTimeoutString == "" {
c.ReconnectTimeout = defaultReconnectTimeout
log.Printf("[Conf] Using default reconnect_timeout of %v", c.ReconnectTimeout)
} else {