Commit 48ca3b7f authored by Lukas Burgey's avatar Lukas Burgey
Browse files

Rework service configuration

Closes #3
parent 42d23327
......@@ -29,15 +29,6 @@ type (
}
)
func appendIfMissing(slice []string, newElem string) []string {
for _, elem := range slice {
if elem == newElem {
return slice
}
}
return append(slice, newElem)
}
func (c config) Log(formatString string, params ...interface{}) {
log.Printf("%s "+formatString, append([]interface{}{"[Conf]"}, params...)...)
}
......@@ -48,31 +39,32 @@ func (c consumer) Log(formatString string, params ...interface{}) {
func (c *config) consumer() (cons *consumer) {
// calculate all the routing keys for the exchanges
var (
groupRoutingKeys = make([]string, len(c.GroupToServiceIDs))
entitlementRoutingKeys = make([]string, len(c.EntitlementToServiceIDs))
serviceRoutingKeys = make([]string, len(c.Services))
)
groupRoutingKeys := []string{}
entitlementRoutingKeys := []string{}
// all services we provide (either for groups or entitlements)
serviceRoutingKeys := []string{}
for groupName, groupServices := range c.GroupToServices {
groupRoutingKeys = appendIfMissing(groupRoutingKeys, groupName)
for _, groupService := range groupServices {
serviceRoutingKeys = appendIfMissing(serviceRoutingKeys, groupService.Name)
}
i := 0
for groupName := range c.GroupToServiceIDs {
groupRoutingKeys[i] = groupName
i++
}
for entitlementName, entitlementServices := range c.EntitlementToServices {
entitlementRoutingKeys = appendIfMissing(entitlementRoutingKeys, entitlementName)
i = 0
for entitlementName := range c.EntitlementToServiceIDs {
entitlementRoutingKeys[i] = entitlementName
i++
}
for _, entitlementService := range entitlementServices {
serviceRoutingKeys = appendIfMissing(serviceRoutingKeys, entitlementService.Name)
}
i = 0
for _, service := range c.Services {
serviceRoutingKeys[i] = service.Name
i++
}
cons = &consumer{
uri: fmt.Sprintf("amqps://%s:%s@%s", c.Username, c.Password, c.Host),
uri: fmt.Sprintf("amqps://%s:%s@%s", c.Username, c.Password, c.Hostname),
exchanges: []exchange{
exchange{
"groups", "topic", groupRoutingKeys,
......@@ -142,7 +134,7 @@ func (c *consumer) connect() (deliveries <-chan amqp.Delivery, err error) {
false, // no-wait
nil,
); err != nil {
err = fmt.Errorf("Error binding %s: %s", bindingKey, err)
err = fmt.Errorf("Error binding %s:\n\t%s", bindingKey, err)
return
}
}
......@@ -205,7 +197,7 @@ func (c *consumer) reconnect() {
func (c *consumer) startConsuming() (err error) {
deliveries, err := c.connect()
if err != nil {
log.Printf("[AMQP] Error connecting: %s", err)
err = fmt.Errorf("[AMQP] Error connecting: %s", err)
return
}
......
......@@ -2,6 +2,9 @@ module git.scc.kit.edu/feudal/feudalClient
require (
git.scc.kit.edu/feudal/feudalScripts v1.0.1
github.com/koron/iferr v0.0.0-20180615142939-bb332a3b1d91 // indirect
github.com/stamblerre/gocode v0.0.0-20181128172141-22843d89bc5a // indirect
github.com/streadway/amqp v0.0.0-20181107104731-27835f1a64e9
golang.org/x/tools v0.0.0-20181130052023-1c3d964395ce // indirect
gopkg.in/alecthomas/kingpin.v2 v2.2.6
)
......@@ -7,7 +7,13 @@ github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc h1:cAKDfWh5Vpd
github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf h1:qet1QNfXsQxTZqLG4oE62mJzwPIB8+Tee4RNCL9ulrY=
github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
github.com/koron/iferr v0.0.0-20180615142939-bb332a3b1d91 h1:hunjgdb3b21ZdRmzDPXii0EcnHpjH7uCP+kODoE1JH0=
github.com/koron/iferr v0.0.0-20180615142939-bb332a3b1d91/go.mod h1:C2tFh8w3I6i4lnUJfoBx2Hwku3mgu4wPNTtUNp1i5KI=
github.com/stamblerre/gocode v0.0.0-20181128172141-22843d89bc5a h1:XVxDNb6jzFAgDYoLAazPpGEe+KBtjc/gLRPcC7taWEw=
github.com/stamblerre/gocode v0.0.0-20181128172141-22843d89bc5a/go.mod h1:EM2T8YDoTCvGXbEpFHxarbpv7VE26QD1++Cb1Pbh7Gs=
github.com/streadway/amqp v0.0.0-20181107104731-27835f1a64e9 h1:xBuwuVDG/vbGv1b0Dn/06flcq0R6MITax8244EZYaKE=
github.com/streadway/amqp v0.0.0-20181107104731-27835f1a64e9/go.mod h1:1WNBiOZtZQLpVAyu0iTduoJL9hEsMloAK5XWrtW0xdY=
golang.org/x/tools v0.0.0-20181130052023-1c3d964395ce h1:Gi26mRaGtAreZ9IadlBiwSJT1EDsfk4BSHBD9oxXEFY=
golang.org/x/tools v0.0.0-20181130052023-1c3d964395ce/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
gopkg.in/alecthomas/kingpin.v2 v2.2.6 h1:jMFz6MfLP0/4fUyZle81rXUoxOBFi19VUFKVDOQfozc=
gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw=
......@@ -8,6 +8,7 @@ import (
"log"
"net/http"
"os"
"regexp"
"time"
"gopkg.in/alecthomas/kingpin.v2"
......@@ -24,36 +25,51 @@ type (
Site string `json:"site"`
}
serviceID string
config struct {
Host string `json:"host"`
Hostname string `json:"feudal_backend_host"`
Username string `json:"username"`
Password string `json:"password"`
// GroupToServices maps a group name to services provided for this group
// this is for deployment per _group_
GroupToServices map[string]([]service) `json:"group_to_services"`
// Services maps an (arbitrary) service identifier to service structs
// the service identifiers are referenced in GroupToServiceIDs and EntitlementToServiceIDs
Services map[serviceID]service
// EntitlementToServices maps a entitlement to services provided for users with this
// GroupToServiceIDs determines which services are provided for users of the
// group
// maps a group name to service identifiers of the services
// services are declared in config.Services
GroupToServiceIDs map[string][]serviceID `json:"group_to_service_ids"`
// EntitlementToServiceIDs determines which services are provided for users of the
// entitlement
EntitlementToServices map[string]([]service) `json:"entitlement_to_services"`
// maps an entitlement to service identifiers of the services
// services are declared in config.Services
EntitlementToServiceIDs map[string][]serviceID `json:"entitlement_to_service_ids"`
// FetchIntervalString gets parsed by time.ParseDuration
FetchIntervalString string `json:"fetch_interval"`
// ReconnectTimeout gets parsed by time.ParseDuration
ReconnectTimeoutString string `json:"reconnect_timeout"`
NewTasks chan task
DoneTasks chan taskReply
FetchInterval time.Duration
ReconnectTimeout time.Duration
RabbitMQConfig rabbitMQConfig
Site string
NewTasks chan task
DoneTasks chan taskReply
Fetch chan struct{}
Error chan error
}
// strippedConfig is sent to the backend on startup
strippedConfig struct {
GroupToServices map[string]([]service) `json:"group_to_services"`
EntitlementToServices map[string]([]service) `json:"entitlement_to_services"`
Services map[serviceID]service `json:"services"`
GroupToServiceIDs map[string][]serviceID `json:"group_to_service_ids"`
EntitlementToServiceIDs map[string][]serviceID `json:"entitlement_to_service_ids"`
}
)
......@@ -69,8 +85,6 @@ var (
"Client for the Federated User Credential Deployment Portal (FEUDAL)",
).Author(
"Lukas Burgey",
).Version(
"0.4.0",
)
cmdStart = app.Command("start", "Starts the client in its normal operation mode.").Default()
cmdDeregister = app.Command("deregister", "Before disabling the client: Use deregister to inform the backend that the client ceases operation.")
......@@ -90,7 +104,7 @@ func logError(err error, msg string) {
func (c *config) syncConfig() (err error) {
log.Printf("[Conf] Synchronising configuration with %v", c.Host)
log.Printf("[Conf] Synchronising configuration with %v", c.Hostname)
var (
strippedConfigBytes []byte
......@@ -101,20 +115,23 @@ func (c *config) syncConfig() (err error) {
// we inform the backend which services we provide
strippedConfigBytes, err = json.Marshal(strippedConfig{
GroupToServices: c.GroupToServices,
EntitlementToServices: c.EntitlementToServices,
Services: c.Services,
GroupToServiceIDs: c.GroupToServiceIDs,
EntitlementToServiceIDs: c.EntitlementToServiceIDs,
})
if err != nil {
err = fmt.Errorf("Error syncing config: %s", err)
return
}
// update the services tracked by the backend
req, err = http.NewRequest(
"PUT",
"https://"+c.Host+"/backend/clientapi/config",
"https://"+c.Hostname+"/backend/clientapi/config",
bytes.NewReader(strippedConfigBytes),
)
if err != nil {
err = fmt.Errorf("Error requesting remote config: %s", err)
return
}
......@@ -128,53 +145,48 @@ func (c *config) syncConfig() (err error) {
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
err = fmt.Errorf("Error reading remote config: %s", err)
return
}
if resp.StatusCode != 200 {
err = fmt.Errorf("Unable to sync configuration (response: %v)", resp.Status)
err = fmt.Errorf("Error with remote config request: Response was %v", resp.Status)
return
}
err = json.Unmarshal(body, &fetchedConfig)
if err != nil {
err = fmt.Errorf("Unable to parse remote configuration: %s %s", err, body)
err = fmt.Errorf("Error parsing remote configuration: %s %s", err, body)
return
}
c.RabbitMQConfig = fetchedConfig.RabbitMQConfig
c.Site = fetchedConfig.Site
log.Printf("[Conf] Synchronised configuration with %v", c.Host)
// initialize the task queues
c.NewTasks = make(chan task)
c.DoneTasks = make(chan taskReply)
c.Fetch = make(chan struct{})
c.Error = make(chan error)
log.Printf("[Conf] Synchronised configuration with %v", c.Hostname)
return
}
func getConfig(configFile string) (c config, err error) {
c.Log("Reading config file %s", configFile)
bs, err := ioutil.ReadFile(configFile)
if err != nil {
c.Log("Error reading config file: %s", err)
return
}
err = json.Unmarshal(bs, &c)
if err != nil {
c.Log("Error parsing config file: %s", err)
return
}
func (c *config) validateConfig() (err error) {
// check the config values
if c.Host == "" {
log.Fatalf("[Conf] No 'host' in config")
if c.Hostname == "" {
return fmt.Errorf("No 'hostname' in config")
}
if c.Username == "" {
log.Fatalf("[Conf] No 'user' in config")
return fmt.Errorf("No 'username' in config")
}
if c.Password == "" {
log.Fatalf("[Conf] No 'password' in config")
return fmt.Errorf("No 'password' in config")
}
// try to parse duration, otherwise use default
if c.FetchIntervalString == "" {
c.FetchInterval = defaultFetchInterval
log.Printf("[Conf] Using default fetch_interval of %v", c.FetchInterval)
......@@ -203,12 +215,68 @@ func getConfig(configFile string) (c config, err error) {
}
}
c.Log("Groups: %s", c.GroupToServices)
c.Log("Entitlements: %s", c.EntitlementToServices)
// check that the services names are unique
var exists bool
serviceNames := make(map[string]struct{}, len(c.Services))
for _, service := range c.Services {
if _, exists = serviceNames[service.Name]; !exists {
serviceNames[service.Name] = struct{}{}
} else {
return fmt.Errorf("Service name is ambiguous: %s", service.Name)
}
}
// initialize the task queues
c.NewTasks = make(chan task)
c.DoneTasks = make(chan taskReply)
// check if referenced service ids exist
for _, sids := range c.GroupToServiceIDs {
for _, sid := range sids {
if _, exists = c.Services[sid]; !exists {
return fmt.Errorf("GroupToServiceIDs: service ID '%s' does not exist", sid)
}
}
}
for _, sids := range c.EntitlementToServiceIDs {
for _, sid := range sids {
if _, exists = c.Services[sid]; !exists {
return fmt.Errorf("EntitlementToServiceIDs: service ID '%s' does not exist", sid)
}
}
}
return
}
func getConfig(configFile string) (c config, err error) {
c.Log("Reading config file %s", configFile)
configBytes, err := ioutil.ReadFile(configFile)
if err != nil {
return
}
err = json.Unmarshal(configBytes, &c)
if err != nil {
return
}
err = c.validateConfig()
if err != nil {
err = fmt.Errorf("Error validating config:\n\t%s", err)
return
}
// strip the group authority from entitlement names
nameExtractor := regexp.MustCompile("^(.*?)#")
for entName, entServices := range c.EntitlementToServiceIDs {
match := nameExtractor.FindStringSubmatch(entName)
if len(match) == 2 {
delete(c.EntitlementToServiceIDs, entName)
c.EntitlementToServiceIDs[match[1]] = entServices
}
}
c.Log("Services: %s", c.Services)
c.Log("Groups: %s", c.GroupToServiceIDs)
c.Log("Entitlements: %s", c.EntitlementToServiceIDs)
return
}
......@@ -223,7 +291,7 @@ func (c *config) deregister() {
req, err = http.NewRequest(
"PUT",
"https://"+c.Host+"/backend/clientapi/deregister",
"https://"+c.Hostname+"/backend/clientapi/deregister",
nil,
)
if err != nil {
......@@ -244,27 +312,31 @@ func (c *config) deregister() {
}
func (c *config) start() {
if len(c.EntitlementToServices) == 0 && len(c.GroupToServices) == 0 {
if len(c.EntitlementToServiceIDs) == 0 && len(c.GroupToServiceIDs) == 0 {
log.Printf("[P] Not starting pubsub because the are no services to subscribe to")
return
}
// start task handler and responder
go c.taskFetcher()
go c.taskHandler()
go c.taskResponder()
consumer := c.consumer()
defer consumer.close()
consumer.startConsuming()
var err error
err = consumer.startConsuming()
// start the fetcher after the consuming starts
// -> we miss nothing
go c.taskFetcher()
if err == nil {
// fetch (after the connection is opened
c.Fetch <- struct{}{}
// wait until an error occurs, log it and crash
err = <-c.Error
}
// run till killed
forever := make(chan bool)
<-forever
log.Fatalf("Fatal: %s", err)
}
func main() {
......@@ -290,7 +362,7 @@ func main() {
// read the config file
c, err := getConfig(*configFile)
if err != nil {
log.Fatalf("[Exit] No valid config. Exiting")
log.Fatalf("[Conf] %s", err)
}
switch cmd {
......
......@@ -82,48 +82,52 @@ func (te taskReply) Log(formatString string, params ...interface{}) {
log.Printf("%s "+formatString, append([]interface{}{te}, params...)...)
}
// TODO move to main.go
func (c *config) getServices(sids []serviceID) (services []service, err error) {
var ok bool
services = make([]service, len(sids))
for i, sid := range sids {
if services[i], ok = c.Services[sid]; !ok {
services = nil
err = fmt.Errorf("Service with service ID '%s' does no exist", sid)
return
}
}
return
}
// taskServices finds the services which need to be deployed for the given task
func (c *config) taskServices(t task) (services []service, err error) {
// Option 1: VODeployment
if t.ResourceType == "VODeployment" && t.VO != (vo{}) {
var ok bool
var sids []serviceID
if t.VO.ResourceType == "Group" {
var ok bool
services, ok = c.GroupToServices[t.VO.Name]
sids, ok = c.GroupToServiceIDs[t.VO.Name]
if !ok {
err = fmt.Errorf(
"%s: Group %s has no mapped services",
t,
t.VO.Name,
)
err = fmt.Errorf("%s: Group %s does not exist", t, t.VO.Name)
return
}
} else if t.VO.ResourceType == "Entitlement" {
var ok bool
services, ok = c.EntitlementToServices[t.VO.Name]
sids, ok = c.EntitlementToServiceIDs[t.VO.Name]
if !ok {
err = fmt.Errorf(
"%s: Entitlement %s has no mapped services",
t,
t.VO.Name,
)
err = fmt.Errorf("%s: Entitlemnet %s does not exist", t, t.VO.Name)
return
}
}
} else if t.ResourceType == "ServiceDeployment" && t.Service != (service{}) {
// we should hopefully only find one matching service
// See Issue #3
for _, groupServices := range c.GroupToServices {
for _, service := range groupServices {
if service.Name == t.Service.Name {
services = append(services, service)
}
}
services, err = c.getServices(sids)
if err != nil {
return
}
for _, entitlementServices := range c.EntitlementToServices {
for _, service := range entitlementServices {
if service.Name == t.Service.Name {
services = append(services, service)
}
} else if t.ResourceType == "ServiceDeployment" && t.Service != (service{}) {
// TODO assure this
// we return the first service (as the service names are unique)
for _, s := range c.Services {
if s.Name == t.Service.Name {
services = []service{s}
return
}
}
} else {
......@@ -143,15 +147,19 @@ func (c *config) taskFetcher() {
// start ticker
ticker := time.NewTicker(c.FetchInterval)
for {
go func() {
if err := c.fetchTasks(); err != nil {
log.Printf("[Fetch] error fetching: %s", err)
}
}()
fetchWrapper := func() {
if err := c.fetchTasks(); err != nil {
log.Printf("[Fetch] error fetching: %s", err)
}
}
// wait for a tick
<-ticker.C
for {
select {
case <-ticker.C:
fetchWrapper()
case <-c.Fetch:
fetchWrapper()
}
}
}
......@@ -204,13 +212,13 @@ func (c *config) taskResponder() {
}
func (c *config) fetchTasks() (err error) {
if len(c.EntitlementToServices) == 0 && len(c.GroupToServices) == 0 {
if len(c.EntitlementToServiceIDs) == 0 && len(c.GroupToServiceIDs) == 0 {
log.Printf("[HTTP] Not fetching because the are no services to fetch")
return
}
// construct a request
uri := "https://" + c.Host + "/backend/clientapi/deployments"
uri := "https://" + c.Hostname + "/backend/clientapi/deployments"
req, err := http.NewRequest("GET", uri, nil)
if err != nil {
return
......@@ -218,10 +226,10 @@ func (c *config) fetchTasks() (err error) {
// add query params for the services
q := req.URL.Query()
for groupName := range c.GroupToServices {
for groupName := range c.GroupToServiceIDs {
q.Add("g", groupName)
}
for entitlementName := range c.EntitlementToServices {
for entitlementName := range c.EntitlementToServiceIDs {
q.Add("e", entitlementName)
}
req.URL.RawQuery = q.Encode()
......@@ -420,7 +428,7 @@ func (c *config) respondToTask(te taskReply) (err error) {
log.Printf("Task Response:\n%s", taskResponse)
}
url := fmt.Sprintf("https://%s/backend/clientapi/response", c.Host)
url := fmt.Sprintf("https://%s/backend/clientapi/response", c.Hostname)
req, err := http.NewRequest("POST", url, bytes.NewReader(taskResponse))
if err != nil {
return
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment