task.go 9.71 KB
Newer Older
Lukas Burgey's avatar
Lukas Burgey committed
1
2
3
package main

import (
4
	"bytes"
Lukas Burgey's avatar
Lukas Burgey committed
5
6
	"encoding/json"
	"fmt"
Lukas Burgey's avatar
Lukas Burgey committed
7
	"io"
Lukas Burgey's avatar
Lukas Burgey committed
8
9
10
11
	"io/ioutil"
	"log"
	"net/http"
	"os/exec"
Lukas Burgey's avatar
Lukas Burgey committed
12
	"strings"
Lukas Burgey's avatar
Lukas Burgey committed
13
14
	"time"

Lukas Burgey's avatar
Lukas Burgey committed
15
	s "git.scc.kit.edu/feudal/feudalScripts"
Lukas Burgey's avatar
Lukas Burgey committed
16
17
18
19
)

type (
	service struct {
Lukas Burgey's avatar
Lukas Burgey committed
20
21
22
		Name        string `json:"name"`
		Command     string `json:"command"`
		Description string `json:"description"`
Lukas Burgey's avatar
Lukas Burgey committed
23
24
	}

25
26
27
28
	vo struct {
		ID           int    `json:"id"`
		Name         string `json:"name"`
		ResourceType string `json:"resourcetype"`
Lukas Burgey's avatar
Lukas Burgey committed
29
30
	}

31
	// task describes a deployment action
Lukas Burgey's avatar
Lukas Burgey committed
32
	task struct {
Lukas Burgey's avatar
Lukas Burgey committed
33
34
35
36
37
		ID            int                       `json:"id"`
		StateTarget   s.State                   `json:"state_target"`
		User          s.User                    `json:"user"`
		Credentials   map[string][]s.Credential `json:"credentials"`
		Questionnaire map[string]string         `json:"questionnaire"`
38
		VO            vo                        `json:"vo"`
Lukas Burgey's avatar
Lukas Burgey committed
39

40
		// Service may be overwritten by scheduleTask
Lukas Burgey's avatar
Lukas Burgey committed
41
42
43
		Service service `json:"service,omitempty"`

		// Exchange and RoutingKey are inserted when we receive the task via RabbitMQ
Lukas Burgey's avatar
Lukas Burgey committed
44
45
		Exchange   string
		RoutingKey string
Lukas Burgey's avatar
Lukas Burgey committed
46
47
	}

48
49
	// taskReply is sent back to the backend to inform it about executed deployment actions
	taskReply struct {
Lukas Burgey's avatar
Lukas Burgey committed
50
		// ID of the according task (at the backend: DeploymentState)
51
52
		ID     int      `json:"id"`
		Output s.Output `json:"output"`
Lukas Burgey's avatar
Lukas Burgey committed
53

54
		// Service is the service for which we performed the deploy action
Lukas Burgey's avatar
Lukas Burgey committed
55
		Service service `json:"service"`
Lukas Burgey's avatar
Lukas Burgey committed
56
57
	}

Lukas Burgey's avatar
Lukas Burgey committed
58
59
60
61
	// ackResponse response of the backend for our response
	ackResponse struct {
		Error string `json:"error"`
	}
Lukas Burgey's avatar
Lukas Burgey committed
62
63
64
65
66
67
68
)

func (s service) String() string {
	return s.Name
}

func (t task) String() string {
Lukas Burgey's avatar
Lukas Burgey committed
69
70
71
72
	return fmt.Sprintf("[Task:%v:%v]", t.ID, t.Service.Name)
}

func (t task) Log(formatString string, params ...interface{}) {
Lukas Burgey's avatar
Lukas Burgey committed
73
	log.Printf("%s "+formatString, append([]interface{}{t}, params...)...)
Lukas Burgey's avatar
Lukas Burgey committed
74
75
}

76
func (te taskReply) String() string {
Lukas Burgey's avatar
Lukas Burgey committed
77
78
79
	return fmt.Sprintf("[Task:%v:%v]", te.ID, te.Service.Name)
}

80
func (te taskReply) Log(formatString string, params ...interface{}) {
Lukas Burgey's avatar
Lukas Burgey committed
81
	log.Printf("%s "+formatString, append([]interface{}{te}, params...)...)
Lukas Burgey's avatar
Lukas Burgey committed
82
83
84
}

func (c *config) taskServices(t task) (services []service, err error) {
Lukas Burgey's avatar
Lukas Burgey committed
85
86
87
	// hacky: we need to determine exchange and routing key when we fetched manually from the rest
	// interface
	if t.Exchange == "" || t.RoutingKey == "" {
88
		if t.VO.ResourceType == "Group" {
Lukas Burgey's avatar
Lukas Burgey committed
89
			t.Exchange = "groups"
90
91
92
93
			t.RoutingKey = t.VO.Name
		} else if t.VO.ResourceType == "Entitlement" {
			t.Exchange = "entitlements"
			t.RoutingKey = t.VO.Name
Lukas Burgey's avatar
Lukas Burgey committed
94
95
96
		}
	}

Lukas Burgey's avatar
Lukas Burgey committed
97
98
99
100
101
102
	switch t.Exchange {
	case "groups":
		var ok bool
		services, ok = c.GroupToServices[t.RoutingKey]
		if !ok {
			err = fmt.Errorf(
Lukas Burgey's avatar
Lukas Burgey committed
103
				"%s: Group %s has no mapped services",
Lukas Burgey's avatar
Lukas Burgey committed
104
				t,
Lukas Burgey's avatar
Lukas Burgey committed
105
				t.RoutingKey,
Lukas Burgey's avatar
Lukas Burgey committed
106
107
108
109
			)
			return
		}

110
111
112
113
114
115
116
117
118
119
120
	case "entitlements":
		var ok bool
		services, ok = c.EntitlementToServices[t.RoutingKey]
		if !ok {
			err = fmt.Errorf(
				"%s: Entitlement %s has no mapped services",
				t,
				t.RoutingKey,
			)
			return
		}
Lukas Burgey's avatar
Lukas Burgey committed
121
122
123
	case "sites":
		// TODO implement
	}
Lukas Burgey's avatar
Lukas Burgey committed
124
125

	if len(services) == 0 {
Lukas Burgey's avatar
Lukas Burgey committed
126
		t.Log("Unable to determine services for this task")
Lukas Burgey's avatar
Lukas Burgey committed
127
	}
Lukas Burgey's avatar
Lukas Burgey committed
128
	return
Lukas Burgey's avatar
Lukas Burgey committed
129
130
}

131
132
133
134
// fetches tasks (which rabbitmq missed) manually
func (c *config) taskFetcher() {
	// start ticker
	ticker := time.NewTicker(c.FetchInterval)
Lukas Burgey's avatar
Lukas Burgey committed
135

Lukas Burgey's avatar
Lukas Burgey committed
136
137
138
139
140
141
	for {
		go func() {
			if err := c.fetchTasks(); err != nil {
				log.Printf("[Fetch] error fetching: %s", err)
			}
		}()
Lukas Burgey's avatar
Lukas Burgey committed
142

Lukas Burgey's avatar
Lukas Burgey committed
143
144
145
		// wait for a tick
		<-ticker.C
	}
Lukas Burgey's avatar
Lukas Burgey committed
146
147
}

148
// handles tasks in c.NewTasks
Lukas Burgey's avatar
Lukas Burgey committed
149
150
func (c *config) taskHandler() {
	for newTask := range c.NewTasks {
Lukas Burgey's avatar
Lukas Burgey committed
151
		handleWrapper := func(t task) {
Lukas Burgey's avatar
Lukas Burgey committed
152
			if err := c.handleTask(t); err != nil {
Lukas Burgey's avatar
Lukas Burgey committed
153
154
				t.Log("Error handling task: %s", err)

155
				c.DoneTasks <- taskReply{
Lukas Burgey's avatar
Lukas Burgey committed
156
157
					ID:      t.ID,
					Service: t.Service,
158
159
					Output: s.Output{
						State: s.Failed,
Lukas Burgey's avatar
Lukas Burgey committed
160
161
162
163
						// TODO maybe another message for the user
						Msg: fmt.Sprint(err),
					},
				}
Lukas Burgey's avatar
Lukas Burgey committed
164
			}
Lukas Burgey's avatar
Lukas Burgey committed
165
166
167
168
169
170
171
172
		}

		if *sequentialExecution {
			handleWrapper(newTask)
		} else {
			// handle tasks asynchronously
			go handleWrapper(newTask)
		}
Lukas Burgey's avatar
Lukas Burgey committed
173
174
175
	}
}

Lukas Burgey's avatar
Lukas Burgey committed
176
// responds to the backend concering tasks in c.DoneTasks
177
func (c *config) taskResponder() {
Lukas Burgey's avatar
Lukas Burgey committed
178
179
	for doneTask := range c.DoneTasks {
		// ack tasks asynchronously
180
		go func(te taskReply) {
181
			err := c.respondToTask(te)
Lukas Burgey's avatar
Lukas Burgey committed
182
			if err != nil {
Lukas Burgey's avatar
Lukas Burgey committed
183
				te.Log("ACK-ERROR: %s", err)
184
185

				// reschedule failed responses
186
				go func(te taskReply) {
187
188
189
					time.Sleep(time.Minute)
					c.DoneTasks <- te
				}(te)
Lukas Burgey's avatar
Lukas Burgey committed
190
191
192
193
194
195
				return
			}
		}(doneTask)
	}
}

196
// IMPLEMENTATIONS
Lukas Burgey's avatar
Lukas Burgey committed
197
func (c *config) fetchTasks() (err error) {
198
	if len(c.EntitlementToServices) == 0 && len(c.GroupToServices) == 0 {
199
		log.Printf("[HTTP] Not fetching because the are no services to fetch")
Lukas Burgey's avatar
Lukas Burgey committed
200
201
202
203
204
205
206
207
208
209
210
211
		return
	}

	// construct a request
	uri := "https://" + c.Host + "/backend/clientapi/deployments"
	req, err := http.NewRequest("GET", uri, nil)
	if err != nil {
		return
	}

	// add query params for the services
	q := req.URL.Query()
Lukas Burgey's avatar
Lukas Burgey committed
212
213
214
	for groupName := range c.GroupToServices {
		q.Add("g", groupName)
	}
215
216
217
	for entitlementName := range c.EntitlementToServices {
		q.Add("e", entitlementName)
	}
Lukas Burgey's avatar
Lukas Burgey committed
218
219
220
221
222
223
224
225
226
227
228
	req.URL.RawQuery = q.Encode()

	req.SetBasicAuth(c.Username, c.Password)

	// execute the request
	resp, err := client.Do(req)
	if err != nil {
		return
	}
	defer resp.Body.Close()

Lukas Burgey's avatar
Lukas Burgey committed
229
	if resp.StatusCode != 200 {
230
		err = fmt.Errorf("[HTTP] Unable to fetch tasks (response: %v)", resp.Status)
Lukas Burgey's avatar
Lukas Burgey committed
231
232
233
		return
	}

Lukas Burgey's avatar
Lukas Burgey committed
234
	var newTasks []task
Lukas Burgey's avatar
Lukas Burgey committed
235
236
237
238
	body, err := ioutil.ReadAll(resp.Body)

	err = json.Unmarshal(body, &newTasks)
	if err != nil {
239
		err = fmt.Errorf("[HTTP] Error unmarshaling: %s\n%s", err, body)
Lukas Burgey's avatar
Lukas Burgey committed
240
241
		return
	}
Lukas Burgey's avatar
Lukas Burgey committed
242

Lukas Burgey's avatar
Lukas Burgey committed
243
244
245
	if *backendDebugging {
		indented := new(bytes.Buffer)
		if err := json.Indent(indented, body, "", "  "); err != nil {
246
			log.Printf("[HTTP] Fetched (unable to indent: %v):\n%s", err, body)
Lukas Burgey's avatar
Lukas Burgey committed
247
		} else {
248
			log.Printf("[HTTP] Fetched:\n%s", indented)
Lukas Burgey's avatar
Lukas Burgey committed
249
250
251
		}
	}

Lukas Burgey's avatar
Lukas Burgey committed
252
	for _, task := range newTasks {
Lukas Burgey's avatar
Lukas Burgey committed
253
254
		err = c.scheduleTask(task)
		if err != nil {
255
			log.Printf("[HTTP] Scheduling: %v", err)
Lukas Burgey's avatar
Lukas Burgey committed
256
257
258

			// we execute all tasks even when one fails
			err = nil
Lukas Burgey's avatar
Lukas Burgey committed
259
		}
Lukas Burgey's avatar
Lukas Burgey committed
260
261
262
263
264
	}
	return
}

// scheduleTask puts tasks in the newTasks channel
Lukas Burgey's avatar
Lukas Burgey committed
265
func (c *config) scheduleTask(t task) (err error) {
Lukas Burgey's avatar
Lukas Burgey committed
266
267
	// determine the services of the task
	var services []service
Lukas Burgey's avatar
Lukas Burgey committed
268
	services, err = c.taskServices(t)
Lukas Burgey's avatar
Lukas Burgey committed
269
270
271
272
273
274
	if err != nil {
		return
	}

	// schedule one task per service of the task
	for _, svc := range services {
Lukas Burgey's avatar
Lukas Burgey committed
275
276
277
		var serviceTask task
		serviceTask = t
		serviceTask.Service = svc
Lukas Burgey's avatar
Lukas Burgey committed
278

Lukas Burgey's avatar
Lukas Burgey committed
279
		serviceTask.Log("Scheduling execution for %s", svc.Name)
Lukas Burgey's avatar
Lukas Burgey committed
280

Lukas Burgey's avatar
Lukas Burgey committed
281
		// put the in the task queue
Lukas Burgey's avatar
Lukas Burgey committed
282
		c.NewTasks <- serviceTask
Lukas Burgey's avatar
Lukas Burgey committed
283
284
285
	}
	return
}
Lukas Burgey's avatar
Lukas Burgey committed
286

Lukas Burgey's avatar
Lukas Burgey committed
287
func (c *config) handleTask(t task) (err error) {
Lukas Burgey's avatar
Lukas Burgey committed
288

289
	var output s.Output
290
291

	// encode input as json
292
	input := s.Input{
Lukas Burgey's avatar
Lukas Burgey committed
293
294
295
		StateTarget:   t.StateTarget,
		User:          t.User,
		Questionnaire: t.Questionnaire,
Lukas Burgey's avatar
Lukas Burgey committed
296
		Credentials:   t.Credentials,
297
	}
298

299
300
301
302
303
	iBytes, err := input.Marshal()
	if err != nil {
		return
	}

Lukas Burgey's avatar
Lukas Burgey committed
304
305
306
307
308
	// TODO execute in parallel
	var (
		stdin          io.WriteCloser
		stdout, stderr io.ReadCloser
	)
Lukas Burgey's avatar
Lukas Burgey committed
309

Lukas Burgey's avatar
Lukas Burgey committed
310
311
	// execute the script
	if *scriptDebugging {
Lukas Burgey's avatar
Lukas Burgey committed
312
313
		t.Log("Executing: '%s'", t.Service.Command)
		t.Log("Input: %s", input)
Lukas Burgey's avatar
Lukas Burgey committed
314
	}
315

Lukas Burgey's avatar
Lukas Burgey committed
316
317
	commandParts := strings.Split(t.Service.Command, " ")
	cmd := exec.Command(commandParts[0], commandParts[1:]...)
Lukas Burgey's avatar
Lukas Burgey committed
318
319
320
321
322
323
324
325
326
327
328
329
	stdin, err = cmd.StdinPipe()
	if err != nil {
		return
	}
	stdout, err = cmd.StdoutPipe()
	if err != nil {
		return
	}
	stderr, err = cmd.StderrPipe()
	if err != nil {
		return
	}
330

Lukas Burgey's avatar
Lukas Burgey committed
331
332
333
334
335
336
	err = cmd.Start()
	if err != nil {
		return
	}
	stdin.Write(iBytes)
	stdin.Close()
337

Lukas Burgey's avatar
Lukas Burgey committed
338
339
340
341
342
343
	// decode json output
	var outputBytes, logOutputBytes []byte
	outputBytes, err = ioutil.ReadAll(stdout)
	if err != nil {
		return
	}
344

Lukas Burgey's avatar
Lukas Burgey committed
345
346
347
348
349
	logOutputBytes, err = ioutil.ReadAll(stderr)
	if err != nil {
		return
	}
	if *scriptDebugging {
Lukas Burgey's avatar
Lukas Burgey committed
350
351
352
353
		if len(logOutputBytes) > 0 {
			t.Log("Logs:\n%s", logOutputBytes)
			t.Log("End of Logs")
		}
Lukas Burgey's avatar
Lukas Burgey committed
354
	}
Lukas Burgey's avatar
Lukas Burgey committed
355

Lukas Burgey's avatar
Lukas Burgey committed
356
357
358
359
	err = cmd.Wait()
	if err != nil {
		return
	}
Lukas Burgey's avatar
Lukas Burgey committed
360
361
362
363
364
365
366
367
368
	if *scriptDebugging {
		var indented = new(bytes.Buffer)
		err = json.Indent(indented, outputBytes, "  ", "  ")
		if err != nil {
			t.Log("Output (unable to indent: %s):\n%s", err, outputBytes)
		} else {
			t.Log("Output:\n%s", indented.String())
		}
	}
Lukas Burgey's avatar
Lukas Burgey committed
369

Lukas Burgey's avatar
Lukas Burgey committed
370
371
	err = json.Unmarshal(outputBytes, &output)
	if err != nil {
Lukas Burgey's avatar
Lukas Burgey committed
372
373
374
		if len(outputBytes) > 0 {
			t.Log("Raw output: %s", outputBytes)
		}
Lukas Burgey's avatar
Lukas Burgey committed
375
376
		return
	}
377

378
379
380
381
382
383
384
385
386
387
388
389
	// if the script does not provide any credential specific states we expect all
	// credentials to have the same state as the whole execution (output.State)
	if output.UserCredentialStates == nil {
		var userCredentialStates = make(s.UserCredentialStates)
		for credType, credList := range t.Credentials {
			for _, credential := range credList {
				if stateMap, ok := userCredentialStates[credType]; !ok {
					userCredentialStates[credType] = map[string]s.State{
						credential.Name: output.State,
					}
				} else {
					stateMap[credential.Name] = output.State
390
391
392
				}
			}
		}
393
		output.UserCredentialStates = userCredentialStates
394
395
396
	}

	c.DoneTasks <- taskReply{
397
398
399
		ID:      t.ID,
		Service: t.Service,
		Output:  output,
Lukas Burgey's avatar
Lukas Burgey committed
400
	}
401
	return
Lukas Burgey's avatar
Lukas Burgey committed
402
}
Lukas Burgey's avatar
Lukas Burgey committed
403

404
func (c *config) respondToTask(te taskReply) (err error) {
Lukas Burgey's avatar
Lukas Burgey committed
405
	taskResponse, err := json.MarshalIndent(te, "", "    ")
Lukas Burgey's avatar
Lukas Burgey committed
406
407
408
409
	if err != nil {
		return
	}

410
	if *backendDebugging {
Lukas Burgey's avatar
Lukas Burgey committed
411
		log.Printf("Task Response:\n%s", taskResponse)
412
413
	}

Lukas Burgey's avatar
Lukas Burgey committed
414
	url := fmt.Sprintf("https://%s/backend/clientapi/response", c.Host)
Lukas Burgey's avatar
Lukas Burgey committed
415
	req, err := http.NewRequest("POST", url, bytes.NewReader(taskResponse))
Lukas Burgey's avatar
Lukas Burgey committed
416
417
418
419
420
421
422
423
424
425
426
427
428
429
	if err != nil {
		return
	}
	req.SetBasicAuth(c.Username, c.Password)
	req.Header.Set("Content-Type", "application/json")

	// execute request
	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		return
	}
	defer resp.Body.Close()

	if resp.StatusCode == 200 {
Lukas Burgey's avatar
Lukas Burgey committed
430
		te.Log("Successful response")
Lukas Burgey's avatar
Lukas Burgey committed
431
	} else {
Lukas Burgey's avatar
Lukas Burgey committed
432
		te.Log("Response Status: %v", resp.StatusCode)
Lukas Burgey's avatar
Lukas Burgey committed
433
434
435
436
437
438
439
440
441
442

		var (
			ackResponseBytes []byte
			ackResponse      ackResponse
		)
		ackResponseBytes, err = ioutil.ReadAll(resp.Body)
		if err != nil {
			return
		}
		err = json.Unmarshal(ackResponseBytes, &ackResponse)
Lukas Burgey's avatar
Lukas Burgey committed
443
444
445
		if err != nil {
			return
		}
Lukas Burgey's avatar
Lukas Burgey committed
446
		te.Log("Response to our response: '%s'", ackResponse.Error)
Lukas Burgey's avatar
Lukas Burgey committed
447
	}
Lukas Burgey's avatar
Lukas Burgey committed
448
449
	return
}