Commit 32c650e3 authored by Lukas Burgey's avatar Lukas Burgey
Browse files

Switch to using VOs (groups / entitlements)

Also deprecates using deployment per service
parent f07d1048
......@@ -37,26 +37,24 @@ func (c consumer) Log(formatString string, params ...interface{}) {
}
func (c *config) consumer() (cons *consumer) {
var serviceRoutingKeys, groupRoutingKeys []string
for _, service := range c.Services {
serviceRoutingKeys = append(serviceRoutingKeys, service.Name)
}
for groupName, services := range c.GroupToServices {
var groupRoutingKeys []string
for groupName := range c.GroupToServices {
groupRoutingKeys = append(groupRoutingKeys, fmt.Sprintf("%s", groupName))
}
for _, service := range services {
serviceRoutingKeys = append(serviceRoutingKeys, service.Name)
}
var entitlementRoutingKeys []string
for entitlementName := range c.EntitlementToServices {
entitlementRoutingKeys = append(entitlementRoutingKeys, fmt.Sprintf("%s", entitlementName))
}
cons = &consumer{
uri: fmt.Sprintf("amqps://%s:%s@%s", c.Username, c.Password, c.Host),
exchanges: []exchange{
exchange{
"services", "topic", serviceRoutingKeys,
"groups", "topic", groupRoutingKeys,
},
exchange{
"groups", "topic", groupRoutingKeys,
"entitlements", "topic", entitlementRoutingKeys,
},
},
consumer: c.consume,
......
......@@ -29,16 +29,19 @@ type (
Username string `json:"username"`
Password string `json:"password"`
// Services provided at the site of this client
// this is for deployment per _service_
Services []service `json:"services"`
// GroupToServices maps a group name to services provided for this group
// this is for deployment per _group_
GroupToServices map[string]([]service) `json:"group_to_services"`
FetchIntervalString string `json:"fetch_interval"` // string parsed by time.ParseDuration
ReconnectTimeoutString string `json:"reconnect_timeout"` // string parsed by time.ParseDuration
// EntitlementToServices maps a entitlement to services provided for users with this
// entitlement
EntitlementToServices map[string]([]service) `json:"entitlement_to_services"`
// FetchIntervalString gets parsed by time.ParseDuration
FetchIntervalString string `json:"fetch_interval"`
// ReconnectTimeout gets parsed by time.ParseDuration
ReconnectTimeoutString string `json:"reconnect_timeout"`
NewTasks chan task
DoneTasks chan taskReply
FetchInterval time.Duration
......@@ -49,8 +52,8 @@ type (
// strippedConfig is sent to the backend on startup
strippedConfig struct {
Services []service `json:"services"`
GroupToServices map[string]([]service) `json:"group_to_services"`
EntitlementToServices map[string]([]service) `json:"entitlement_to_services"`
}
)
......@@ -96,8 +99,8 @@ func (c *config) syncConfig() (err error) {
// we inform the backend which services we provide
strippedConfigBytes, err = json.Marshal(strippedConfig{
Services: c.Services,
GroupToServices: c.GroupToServices,
EntitlementToServices: c.EntitlementToServices,
})
if err != nil {
return
......@@ -205,8 +208,8 @@ func getConfig(configFile string) (c config, err error) {
return
}
c.Log("Services: %s", c.Services)
c.Log("Groups: %s", c.GroupToServices)
c.Log("Entitlements: %s", c.EntitlementToServices)
// initialize the task queues
c.NewTasks = make(chan task)
......@@ -246,7 +249,7 @@ func main() {
log.Fatalf("[Exit] No valid config. Exiting")
}
if len(c.Services) == 0 && len(c.GroupToServices) == 0 {
if len(c.EntitlementToServices) == 0 && len(c.GroupToServices) == 0 {
log.Printf("[P] Not starting pubsub because the are no services to subscribe to")
return
}
......
......@@ -22,8 +22,10 @@ type (
Description string `json:"description"`
}
group struct {
Name string `json:"name"`
vo struct {
ID int `json:"id"`
Name string `json:"name"`
ResourceType string `json:"resourcetype"`
}
// task describes a deployment action
......@@ -33,7 +35,7 @@ type (
User s.User `json:"user"`
Credentials map[string][]s.Credential `json:"credentials"`
Questionnaire map[string]string `json:"questionnaire"`
Group group `json:"group"`
VO vo `json:"vo"`
// Service may be overwritten by scheduleTask
Service service `json:"service,omitempty"`
......@@ -83,33 +85,16 @@ func (c *config) taskServices(t task) (services []service, err error) {
// hacky: we need to determine exchange and routing key when we fetched manually from the rest
// interface
if t.Exchange == "" || t.RoutingKey == "" {
if t.Service != (service{}) {
t.Exchange = "services"
t.RoutingKey = t.Service.Name
} else if t.Group != (group{}) {
if t.VO.ResourceType == "Group" {
t.Exchange = "groups"
t.RoutingKey = t.Group.Name
t.RoutingKey = t.VO.Name
} else if t.VO.ResourceType == "Entitlement" {
t.Exchange = "entitlements"
t.RoutingKey = t.VO.Name
}
}
switch t.Exchange {
case "services":
for _, svc := range c.Services {
if svc.Name == t.RoutingKey {
services = []service{svc}
return
}
}
for _, groupServices := range c.GroupToServices {
for _, groupService := range groupServices {
if groupService.Name == t.RoutingKey {
services = []service{groupService}
return
}
}
}
case "groups":
var ok bool
services, ok = c.GroupToServices[t.RoutingKey]
......@@ -122,6 +107,17 @@ func (c *config) taskServices(t task) (services []service, err error) {
return
}
case "entitlements":
var ok bool
services, ok = c.EntitlementToServices[t.RoutingKey]
if !ok {
err = fmt.Errorf(
"%s: Entitlement %s has no mapped services",
t,
t.RoutingKey,
)
return
}
case "sites":
// TODO implement
}
......@@ -199,7 +195,7 @@ func (c *config) taskResponder() {
// IMPLEMENTATIONS
func (c *config) fetchTasks() (err error) {
if len(c.Services) == 0 && len(c.GroupToServices) == 0 {
if len(c.EntitlementToServices) == 0 && len(c.GroupToServices) == 0 {
log.Printf("[Fetch] Not fetching because the are no services to fetch")
return
}
......@@ -213,12 +209,12 @@ func (c *config) fetchTasks() (err error) {
// add query params for the services
q := req.URL.Query()
for _, service := range c.Services {
q.Add("s", service.Name)
}
for groupName := range c.GroupToServices {
q.Add("g", groupName)
}
for entitlementName := range c.EntitlementToServices {
q.Add("e", entitlementName)
}
req.URL.RawQuery = q.Encode()
req.SetBasicAuth(c.Username, c.Password)
......@@ -238,6 +234,10 @@ func (c *config) fetchTasks() (err error) {
var newTasks []task
body, err := ioutil.ReadAll(resp.Body)
var indented bytes.Buffer
json.Indent(&indented, body, "", " ")
log.Printf("%s", indented.Bytes())
err = json.Unmarshal(body, &newTasks)
if err != nil {
err = fmt.Errorf("Error unmarshaling: %s\n%s", err, body)
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment