script.go 8.15 KB
Newer Older
Lukas Burgey's avatar
Lukas Burgey committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
package script

import (
	"bytes"
	"encoding/json"
	"fmt"
	"io"
	"io/ioutil"
	"log"
	"net/http"
	"os/exec"
	"strings"
	"time"

	"git.scc.kit.edu/feudal/feudalClient/config"
16
	deps "git.scc.kit.edu/feudal/feudalClient/deployments"
lukas.burgey's avatar
lukas.burgey committed
17
	"git.scc.kit.edu/feudal/feudalClient/indent"
lukas.burgey's avatar
lukas.burgey committed
18
	scripts "git.scc.kit.edu/feudal/feudalScripts/v3"
Lukas Burgey's avatar
Lukas Burgey committed
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
)

type (
	// Sink implements sink.Sink
	Sink struct {
		Config    *config.Config
		Scheduled chan deps.Dep

		// Replies for the backend
		Replies chan deps.Reply
	}

	// ackResponse response of the backend for our response
	ackResponse struct {
		Error string `json:"error"`
	}
)

37
38
39
40
var (
	httpClient = &http.Client{}
)

41
42
// completeReply fills in the UserCredentialStates if they are not provided in the script output
func completeReply(dep deps.Dep, reply deps.Reply) deps.Reply {
lukas.burgey's avatar
lukas.burgey committed
43
	if reply.UserCredentialStates == nil && dep.User.Credentials != nil {
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
		var userCredentialStates = make(scripts.UserCredentialStates)
		for credType, credList := range dep.User.Credentials {
			for _, credential := range credList {
				if stateMap, ok := userCredentialStates[credType]; !ok {
					userCredentialStates[credType] = map[string]scripts.State{
						credential.Name: reply.State,
					}
				} else {
					stateMap[credential.Name] = reply.State
				}
			}
		}
		reply.UserCredentialStates = userCredentialStates
	}
	return reply
}

Lukas Burgey's avatar
Lukas Burgey committed
61
func (sink *Sink) depServices(dep deps.Dep) (services []config.Service, err error) {
62
63
	// find the matching service
	if dep.Service != (config.Service{}) {
Lukas Burgey's avatar
Lukas Burgey committed
64
65
66
67
68
69
70
71
		for _, s := range sink.Config.Services {
			if s.Name == dep.Service.Name {
				services = []config.Service{s}
				return
			}
		}
	}

72
	err = fmt.Errorf("Service for Dep does not exist: '%s'", dep.Service.Name)
Lukas Burgey's avatar
Lukas Burgey committed
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
	return
}

func (sink *Sink) scheduleDep(dep deps.Dep) (err error) {
	// determine the services of the deployment
	var services []config.Service
	services, err = sink.depServices(dep)
	if err != nil {
		return
	}

	// execute the deployment for all the services
	for _, svc := range services {
		dep.Service = svc

Lukas Burgey's avatar
Lukas Burgey committed
88
89
90
		if sink.Config.Debug.Scripts {
			dep.Log("Scheduling execution for %s", svc.Name)
		}
Lukas Burgey's avatar
Lukas Burgey committed
91
92
93
94
95
96
97
98
99
100
101

		// schedule the deployment
		sink.Scheduled <- dep
	}
	return
}

func (sink *Sink) handleDep(dep deps.Dep) (output scripts.Output, err error) {

	// encode input as json
	input := scripts.Input{
Lukas Burgey's avatar
Lukas Burgey committed
102
103
104
		StateTarget: dep.StateTarget,
		User:        dep.User,
		Answers:     dep.Answers,
Lukas Burgey's avatar
Lukas Burgey committed
105
106
	}

lukas.burgey's avatar
lukas.burgey committed
107
108
109
110
111
	// the userinfo is not included in the User struct anymore
	// we therefore copy it over from its new field so we don't have to change the scripts.Input
	input.User.UserInfo = dep.UserInfo

	if dep.UserInfo == nil {
112
		err = fmt.Errorf("Deployment is unfit for execution: user.userinfo is null! Script input would've been:\n%s", input)
lukas.burgey's avatar
lukas.burgey committed
113
		return
114
115
	}

Lukas Burgey's avatar
Lukas Burgey committed
116
117
118
119
120
121
122
123
124
125
126
127
128
	iBytes, err := input.Marshal()
	if err != nil {
		return
	}

	var (
		stdin          io.WriteCloser
		stdout, stderr io.ReadCloser
	)

	// execute the script
	if sink.Config.Debug.Scripts {
		dep.Log("Executing: '%s'", dep.Service.Command)
129
		dep.Log("Input: %s", indent.Indent(iBytes))
Lukas Burgey's avatar
Lukas Burgey committed
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
	}

	commandParts := strings.Split(dep.Service.Command, " ")
	cmd := exec.Command(commandParts[0], commandParts[1:]...)
	stdin, err = cmd.StdinPipe()
	if err != nil {
		return
	}
	stdout, err = cmd.StdoutPipe()
	if err != nil {
		return
	}
	stderr, err = cmd.StderrPipe()
	if err != nil {
		return
	}

	err = cmd.Start()
	if err != nil {
		return
	}
	stdin.Write(iBytes)
	stdin.Close()

lukas.burgey's avatar
lukas.burgey committed
154
155
	// read stdout / stderr
	var stdoutBytes, stderrBytes []byte
Lukas Burgey's avatar
Lukas Burgey committed
156

lukas.burgey's avatar
lukas.burgey committed
157
	stdoutBytes, err = ioutil.ReadAll(stdout)
Lukas Burgey's avatar
Lukas Burgey committed
158
159
160
	if err != nil {
		return
	}
lukas.burgey's avatar
lukas.burgey committed
161
162
163
164
165

	stderrBytes, _ = ioutil.ReadAll(stderr)
	if sink.Config.Debug.Scripts && len(stderrBytes) > 0 {
		dep.Log("Logs:\n%s", stderrBytes)
		dep.Log("End of Logs")
Lukas Burgey's avatar
Lukas Burgey committed
166
167
	}

168
169
170
171
172
173
174
175
176
177
178
	// execute the script with timeout
	done := make(chan error)
	go func() { done <- cmd.Wait() }()
	// TODO put in config
	timeoutDuration := 2 * time.Second
	timeout := time.After(timeoutDuration)

	select {
	case <-timeout:
		cmd.Process.Kill()
		err = fmt.Errorf("Execution of '%s' timed out after %s", dep.Service.Command, timeoutDuration)
Lukas Burgey's avatar
Lukas Burgey committed
179
		return
180
181
182
183
	case err = <-done:
		if err != nil {
			return
		}
Lukas Burgey's avatar
Lukas Burgey committed
184
	}
185

Lukas Burgey's avatar
Lukas Burgey committed
186
	if sink.Config.Debug.Scripts {
lukas.burgey's avatar
lukas.burgey committed
187
		dep.Log("Output: %s", indent.Indent(stdoutBytes))
Lukas Burgey's avatar
Lukas Burgey committed
188
189
	}

lukas.burgey's avatar
lukas.burgey committed
190
191
192
193
194
195
	err = scripts.UnmarshalOutput(stdoutBytes, &output)
	if err != nil {
		return
	}

	err = scripts.SanityCheck(&input, &output)
Lukas Burgey's avatar
Lukas Burgey committed
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
	if err != nil {
		return
	}

	return
}

// reads deployments from sink.Scheduled
func (sink *Sink) depHandler() {
	handleWrapper := func(dep deps.Dep) {
		log.Printf("[Sink] Handling: %v", dep)

		output, err := sink.handleDep(dep)
		if err != nil {
			dep.Log("Error handling dep: %s", err)

			output = scripts.Output{
				State: scripts.Failed,
				// TODO maybe another message for the user
				Msg: fmt.Sprint(err),
			}
		}

		go func() {
220
221
222
223
224
225
226
227
			reply := deps.Reply{
				ID:                   dep.ID,
				State:                output.State,
				Msg:                  output.Msg,
				UserCredentialStates: output.UserCredentialStates,
			}
			if output.Questionnaire != nil && output.State == scripts.Questionnaire {
				reply.Questionnaire = output.Questionnaire
228
				reply.QuestionnaireAnswers = output.QuestionnaireAnswers
229
230
231
			}
			if output.Credentials != nil && output.State == scripts.Deployed {
				reply.Credentials = output.Credentials
Lukas Burgey's avatar
Lukas Burgey committed
232
			}
233
			reply = completeReply(dep, reply)
234
235

			sink.Replies <- reply
Lukas Burgey's avatar
Lukas Burgey committed
236
237
238
239
240
241
242
243
244
245
246
247
248
		}()
	}

	for newDep := range sink.Scheduled {
		if sink.Config.Debug.Sequential {
			handleWrapper(newDep)
		} else {
			// handle deps asynchronously
			go handleWrapper(newDep)
		}
	}
}

249
func (sink *Sink) sendReply(reply deps.Reply) (err error) {
Lukas Burgey's avatar
Lukas Burgey committed
250
251
	var taskResponse []byte
	taskResponse, err = json.MarshalIndent(reply, "", "    ")
Lukas Burgey's avatar
Lukas Burgey committed
252
253
254
255
	if err != nil {
		return
	}

256
	if sink.Config.Debug.Backend {
257
		log.Printf("Dep Response: %s", indent.Indent(taskResponse))
Lukas Burgey's avatar
Lukas Burgey committed
258
259
	}

260
	url := fmt.Sprintf("https://%s/client/dep-state", sink.Config.Hostname)
Lukas Burgey's avatar
Lukas Burgey committed
261
262
	var req *http.Request
	req, err = http.NewRequest("PATCH", url, bytes.NewReader(taskResponse))
Lukas Burgey's avatar
Lukas Burgey committed
263
264
265
266
267
268
269
	if err != nil {
		return
	}
	req.SetBasicAuth(sink.Config.Username, sink.Config.Password)
	req.Header.Set("Content-Type", "application/json")

	// execute request
Lukas Burgey's avatar
Lukas Burgey committed
270
271
	var resp *http.Response
	resp, err = httpClient.Do(req)
Lukas Burgey's avatar
Lukas Burgey committed
272
273
274
275
276
	if err != nil {
		return
	}
	defer resp.Body.Close()

Lukas Burgey's avatar
Lukas Burgey committed
277
	var body []byte
278
279
	body, readErr := ioutil.ReadAll(resp.Body)
	if readErr != nil {
Lukas Burgey's avatar
Lukas Burgey committed
280
281
		log.Printf("sendReply: unable to read response body")
		body = []byte{}
282
283
	}

284
285
286
287
	if resp.StatusCode == 404 {
		reply.Log("sendReply failed: (status: %v, body: %s), will not retry!", resp.StatusCode, body)
		return
	} else if resp.StatusCode != 200 {
288
289
290
291
292
293
		err = fmt.Errorf("sendReply failed: (status: %v, body: %s)", resp.StatusCode, body)
		return
	}

	if sink.Config.Debug.Backend {
		reply.Log("Successful response")
Lukas Burgey's avatar
Lukas Burgey committed
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
	}
	return
}

func (sink *Sink) retryReply(reply deps.Reply) {
	time.Sleep(time.Minute)
	sink.Replies <- reply
}

// reads replies from sink.Replies
func (sink *Sink) depResponder() {
	for doneDep := range sink.Replies {
		// ack tasks asynchronously
		go func(reply deps.Reply) {
			err := sink.sendReply(reply)
			if err != nil {
				reply.Log("ACK-ERROR: %s", err)

				// retry failed replies
				go sink.retryReply(reply)
				return
			}
		}(doneDep)
	}
}

// Init initializes the sink
func (sink *Sink) Init(conf *config.Config) {
	sink.Config = conf
	sink.Scheduled = make(chan deps.Dep)
	sink.Replies = make(chan deps.Reply)
}

// Connect connects the sink and starts handling incoming deployments
func (sink *Sink) Connect(src <-chan deps.Dep) (err error) {
	// the responder finishes when sink.Replies closes
	go sink.depResponder()

332
	// the handler finishes when sink.Scheduled closes
Lukas Burgey's avatar
Lukas Burgey committed
333
334
335
336
	go sink.depHandler()

	go func() {
		// schedule until the src runs out
Lukas Burgey's avatar
Lukas Burgey committed
337
		var err error
338
339
		for dep := range src {
			err = sink.scheduleDep(dep)
Lukas Burgey's avatar
Lukas Burgey committed
340
			if err != nil {
341
342
343
344
345
346
347
348
349
350
				// we reply to the portal that we couldn't schedule this deployment
				go func() {
					reply := deps.Reply{
						ID:    dep.ID,
						State: scripts.Rejected,
						Msg:   fmt.Sprintf("Error scheduling execution of this deployment: %s", err),
					}
					reply = completeReply(dep, reply) // fill in UserCredentialStates
					sink.Replies <- reply
				}()
Lukas Burgey's avatar
Lukas Burgey committed
351
352
				log.Printf("[Sink] Error scheduling: %v", err)
			}
Lukas Burgey's avatar
Lukas Burgey committed
353
354
		}

Lukas Burgey's avatar
Lukas Burgey committed
355
356
357
		// clean up before exiting
		close(sink.Scheduled)
		close(sink.Replies)
Lukas Burgey's avatar
Lukas Burgey committed
358
359
360
	}()
	return
}