11305: added kernel and module dependency for docker migration in doc
[arvados.git] / sdk / go / crunchrunner / crunchrunner.go
1 package main
2
3 import (
4         "encoding/json"
5         "fmt"
6         "git.curoverse.com/arvados.git/sdk/go/arvados"
7         "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
8         "git.curoverse.com/arvados.git/sdk/go/keepclient"
9         "io"
10         "io/ioutil"
11         "log"
12         "os"
13         "os/exec"
14         "os/signal"
15         "strings"
16         "syscall"
17 )
18
// TaskDef describes one task to run: the command line plus optional
// "task.*" settings. It appears both as an element of a job's
// script_parameters "tasks" list and as a job_task's "parameters".
//
//   - Command: argv of the process; Command[0] is the program.
//   - Env: extra environment variables appended to this process's
//     environment (values undergo $(task.*) substitution).
//   - Stdin: path to redirect stdin from (after substitution).
//   - Stdout, Stderr: file names, relative to the output directory, that
//     receive the process's stdout/stderr.
//   - Vwd: output name (relative to the output directory) -> source path;
//     each entry is symlinked, or copied when KeepTmpOutput is set.
//   - SuccessCodes, PermanentFailCodes, TemporaryFailCodes: exit codes
//     used to classify the process result.
//   - KeepTmpOutput: use TASK_KEEPMOUNT_TMP as the output directory and
//     read the output manifest from its ".arvados#collection" file.
type TaskDef struct {
	Command            []string          `json:"command"`
	Env                map[string]string `json:"task.env"`
	Stdin              string            `json:"task.stdin"`
	Stdout             string            `json:"task.stdout"`
	Stderr             string            `json:"task.stderr"`
	Vwd                map[string]string `json:"task.vwd"`
	SuccessCodes       []int             `json:"task.successCodes"`
	PermanentFailCodes []int             `json:"task.permanentFailCodes"`
	TemporaryFailCodes []int             `json:"task.temporaryFailCodes"`
	KeepTmpOutput      bool              `json:"task.keepTmpOutput"`
}

// Tasks is the shape of a job's script_parameters: a list of task
// definitions.
type Tasks struct {
	Tasks []TaskDef `json:"tasks"`
}

// Job holds the fields of a job record that crunchrunner reads.
type Job struct {
	Script_parameters Tasks `json:"script_parameters"`
}

// Task holds the fields of a job_task record that crunchrunner reads or
// writes.
//
// Progress was previously tagged "sequence", the same tag as Sequence.
// encoding/json ignores both fields when two fields at the same depth share
// a tag. As a result "sequence" was never decoded (every task appeared to
// be task 0) and "progress" was never sent.
type Task struct {
	Job_uuid                 string  `json:"job_uuid"`
	Created_by_job_task_uuid string  `json:"created_by_job_task_uuid"`
	Parameters               TaskDef `json:"parameters"`
	Sequence                 int     `json:"sequence"`
	Output                   string  `json:"output"`
	Success                  bool    `json:"success"`
	Progress                 float32 `json:"progress"`
}
49
50 type IArvadosClient interface {
51         Create(resourceType string, parameters arvadosclient.Dict, output interface{}) error
52         Update(resourceType string, uuid string, parameters arvadosclient.Dict, output interface{}) (err error)
53 }
54
// setupDirectories creates the task's scratch directory and decides where
// its output directory lives.
//
// tmpdir is always crunchtmpdir+"/tmpdir", created with mode 0700.
// If keepTmp is true, outdir is read from the TASK_KEEPMOUNT_TMP environment
// variable and is not created here. Otherwise outdir is
// crunchtmpdir+"/outdir", created with mode 0700. taskUuid is currently
// unused.
//
// An error is returned if a directory cannot be created (including when it
// already exists). An error is also returned if keepTmp is set but
// TASK_KEEPMOUNT_TMP is unset or empty: an empty outdir would make later
// outdir+"/"+name paths resolve relative to the filesystem root.
func setupDirectories(crunchtmpdir, taskUuid string, keepTmp bool) (tmpdir, outdir string, err error) {
	tmpdir = crunchtmpdir + "/tmpdir"
	if err = os.Mkdir(tmpdir, 0700); err != nil {
		return "", "", err
	}

	if keepTmp {
		outdir = os.Getenv("TASK_KEEPMOUNT_TMP")
		if outdir == "" {
			return "", "", fmt.Errorf("task.keepTmpOutput is set but TASK_KEEPMOUNT_TMP is empty")
		}
		return tmpdir, outdir, nil
	}

	outdir = crunchtmpdir + "/outdir"
	if err = os.Mkdir(outdir, 0700); err != nil {
		return "", "", err
	}
	return tmpdir, outdir, nil
}
74
// checkOutputFilename validates fn, a path relative to outdir where the
// task will create a file (a vwd entry or a stdout/stderr target). It also
// creates any parent directories that fn needs inside outdir (mode 0777
// before umask).
//
// fn is rejected if it is empty, starts or ends with '/', or has a ".."
// path component. Any of these could place the file outside outdir or
// produce an unusable name. An error is also returned if the parent
// directories cannot be created.
func checkOutputFilename(outdir, fn string) error {
	if fn == "" {
		return fmt.Errorf("Path must not be empty")
	}
	if strings.HasPrefix(fn, "/") || strings.HasSuffix(fn, "/") {
		return fmt.Errorf("Path must not start or end with '/'")
	}
	// The previous check, strings.Index("../", fn), had its arguments
	// swapped and never matched real paths. Checking each component also
	// catches a trailing ".." such as "a/..".
	for _, part := range strings.Split(fn, "/") {
		if part == ".." {
			return fmt.Errorf("Path must not contain '../'")
		}
	}

	if sl := strings.LastIndex(fn, "/"); sl != -1 {
		if err := os.MkdirAll(outdir+"/"+fn[:sl], 0777); err != nil {
			return err
		}
	}
	return nil
}
89
// copyFile copies the contents of the file at src into dst, creating dst or
// truncating it if it exists. The destination comes first, as with io.Copy.
//
// If src cannot be opened, dst is not touched. The error from closing dst
// is reported when nothing else failed, because some filesystems only
// surface write failures at Close.
func copyFile(dst, src string) (err error) {
	in, err := os.Open(src)
	if err != nil {
		return err
	}
	defer in.Close()

	out, err := os.Create(dst)
	if err != nil {
		return err
	}
	defer func() {
		if cerr := out.Close(); err == nil {
			err = cerr
		}
	}()

	_, err = io.Copy(out, in)
	return err
}
106
107 func setupCommand(cmd *exec.Cmd, taskp TaskDef, outdir string, replacements map[string]string) (stdin, stdout, stderr string, err error) {
108         if taskp.Vwd != nil {
109                 for k, v := range taskp.Vwd {
110                         v = substitute(v, replacements)
111                         err = checkOutputFilename(outdir, k)
112                         if err != nil {
113                                 return "", "", "", err
114                         }
115                         if taskp.KeepTmpOutput {
116                                 err = copyFile(v, outdir+"/"+k)
117                         } else {
118                                 err = os.Symlink(v, outdir+"/"+k)
119                         }
120                         if err != nil {
121                                 return "", "", "", err
122                         }
123                 }
124         }
125
126         if taskp.Stdin != "" {
127                 // Set up stdin redirection
128                 stdin = substitute(taskp.Stdin, replacements)
129                 cmd.Stdin, err = os.Open(stdin)
130                 if err != nil {
131                         return "", "", "", err
132                 }
133         }
134
135         if taskp.Stdout != "" {
136                 err = checkOutputFilename(outdir, taskp.Stdout)
137                 if err != nil {
138                         return "", "", "", err
139                 }
140                 // Set up stdout redirection
141                 stdout = outdir + "/" + taskp.Stdout
142                 cmd.Stdout, err = os.Create(stdout)
143                 if err != nil {
144                         return "", "", "", err
145                 }
146         } else {
147                 cmd.Stdout = os.Stdout
148         }
149
150         if taskp.Stderr != "" {
151                 err = checkOutputFilename(outdir, taskp.Stderr)
152                 if err != nil {
153                         return "", "", "", err
154                 }
155                 // Set up stderr redirection
156                 stderr = outdir + "/" + taskp.Stderr
157                 cmd.Stderr, err = os.Create(stderr)
158                 if err != nil {
159                         return "", "", "", err
160                 }
161         } else {
162                 cmd.Stderr = os.Stderr
163         }
164
165         if taskp.Env != nil {
166                 // Set up subprocess environment
167                 cmd.Env = os.Environ()
168                 for k, v := range taskp.Env {
169                         v = substitute(v, replacements)
170                         cmd.Env = append(cmd.Env, k+"="+v)
171                 }
172         }
173         return stdin, stdout, stderr, nil
174 }
175
// setupSignals routes SIGTERM, SIGINT and SIGQUIT to a new channel with a
// buffer of one. Once registered, these signals no longer trigger Go's
// default handling. The cmd argument is currently unused. The caller must
// forward signals to the child itself and eventually call signal.Stop on
// the returned channel.
func setupSignals(cmd *exec.Cmd) chan os.Signal {
	notifications := make(chan os.Signal, 1)
	signal.Notify(notifications, syscall.SIGTERM, syscall.SIGINT, syscall.SIGQUIT)
	return notifications
}
185
// inCodes reports whether code is one of codes. A nil or empty codes list
// matches nothing.
func inCodes(code int, codes []int) bool {
	for i := range codes {
		if codes[i] == code {
			return true
		}
	}
	return false
}
196
// TASK_TEMPFAIL is the exit status main uses when runner reports a TempFail.
// NOTE(review): presumably the crunch dispatcher treats 111 as a retryable
// failure; confirm against the dispatcher.
const TASK_TEMPFAIL = 111

// TempFail wraps an error that should be reported as a temporary failure.
// Its Error method is that of the embedded error.
type TempFail struct{ error }

// PermFail marks a permanent task failure. It carries no details.
type PermFail struct{}

// Error implements the error interface for PermFail.
func (PermFail) Error() string {
	return "PermFail"
}
205
// substitute returns inp with every occurrence of each key of subst
// replaced by the corresponding value. Keys are applied one at a time in
// map iteration order, each to the output of the previous replacement.
func substitute(inp string, subst map[string]string) string {
	result := inp
	for placeholder, value := range subst {
		result = strings.Replace(result, placeholder, value, -1)
	}
	return result
}
212
213 func getKeepTmp(outdir string) (manifest string, err error) {
214         fn, err := os.Open(outdir + "/" + ".arvados#collection")
215         if err != nil {
216                 return "", err
217         }
218         defer fn.Close()
219
220         buf, err := ioutil.ReadAll(fn)
221         if err != nil {
222                 return "", err
223         }
224         collection := arvados.Collection{}
225         err = json.Unmarshal(buf, &collection)
226         return collection.ManifestText, err
227 }
228
229 func runner(api IArvadosClient,
230         kc IKeepClient,
231         jobUuid, taskUuid, crunchtmpdir, keepmount string,
232         jobStruct Job, taskStruct Task) error {
233
234         var err error
235         taskp := taskStruct.Parameters
236
237         // If this is task 0 and there are multiple tasks, dispatch subtasks
238         // and exit.
239         if taskStruct.Sequence == 0 {
240                 if len(jobStruct.Script_parameters.Tasks) == 1 {
241                         taskp = jobStruct.Script_parameters.Tasks[0]
242                 } else {
243                         for _, task := range jobStruct.Script_parameters.Tasks {
244                                 err := api.Create("job_tasks",
245                                         map[string]interface{}{
246                                                 "job_task": Task{Job_uuid: jobUuid,
247                                                         Created_by_job_task_uuid: taskUuid,
248                                                         Sequence:                 1,
249                                                         Parameters:               task}},
250                                         nil)
251                                 if err != nil {
252                                         return TempFail{err}
253                                 }
254                         }
255                         err = api.Update("job_tasks", taskUuid,
256                                 map[string]interface{}{
257                                         "job_task": Task{
258                                                 Output:   "",
259                                                 Success:  true,
260                                                 Progress: 1.0}},
261                                 nil)
262                         return nil
263                 }
264         }
265
266         var tmpdir, outdir string
267         tmpdir, outdir, err = setupDirectories(crunchtmpdir, taskUuid, taskp.KeepTmpOutput)
268         if err != nil {
269                 return TempFail{err}
270         }
271
272         replacements := map[string]string{
273                 "$(task.tmpdir)": tmpdir,
274                 "$(task.outdir)": outdir,
275                 "$(task.keep)":   keepmount}
276
277         log.Printf("crunchrunner: $(task.tmpdir)=%v", tmpdir)
278         log.Printf("crunchrunner: $(task.outdir)=%v", outdir)
279         log.Printf("crunchrunner: $(task.keep)=%v", keepmount)
280
281         // Set up subprocess
282         for k, v := range taskp.Command {
283                 taskp.Command[k] = substitute(v, replacements)
284         }
285
286         cmd := exec.Command(taskp.Command[0], taskp.Command[1:]...)
287
288         cmd.Dir = outdir
289
290         var stdin, stdout, stderr string
291         stdin, stdout, stderr, err = setupCommand(cmd, taskp, outdir, replacements)
292         if err != nil {
293                 return err
294         }
295
296         // Run subprocess and wait for it to complete
297         if stdin != "" {
298                 stdin = " < " + stdin
299         }
300         if stdout != "" {
301                 stdout = " > " + stdout
302         }
303         if stderr != "" {
304                 stderr = " 2> " + stderr
305         }
306         log.Printf("Running %v%v%v%v", cmd.Args, stdin, stdout, stderr)
307
308         var caughtSignal os.Signal
309         sigChan := setupSignals(cmd)
310
311         err = cmd.Start()
312         if err != nil {
313                 signal.Stop(sigChan)
314                 return TempFail{err}
315         }
316
317         finishedSignalNotify := make(chan struct{})
318         go func(sig <-chan os.Signal) {
319                 for sig := range sig {
320                         caughtSignal = sig
321                         cmd.Process.Signal(caughtSignal)
322                 }
323                 close(finishedSignalNotify)
324         }(sigChan)
325
326         err = cmd.Wait()
327         signal.Stop(sigChan)
328
329         close(sigChan)
330         <-finishedSignalNotify
331
332         if caughtSignal != nil {
333                 log.Printf("Caught signal %v", caughtSignal)
334                 return PermFail{}
335         }
336
337         if err != nil {
338                 // Run() returns ExitError on non-zero exit code, but we handle
339                 // that down below.  So only return if it's not ExitError.
340                 if _, ok := err.(*exec.ExitError); !ok {
341                         return TempFail{err}
342                 }
343         }
344
345         var success bool
346
347         exitCode := cmd.ProcessState.Sys().(syscall.WaitStatus).ExitStatus()
348
349         log.Printf("Completed with exit code %v", exitCode)
350
351         if inCodes(exitCode, taskp.PermanentFailCodes) {
352                 success = false
353         } else if inCodes(exitCode, taskp.TemporaryFailCodes) {
354                 return TempFail{fmt.Errorf("Process tempfail with exit code %v", exitCode)}
355         } else if inCodes(exitCode, taskp.SuccessCodes) || cmd.ProcessState.Success() {
356                 success = true
357         } else {
358                 success = false
359         }
360
361         // Upload output directory
362         var manifest string
363         if taskp.KeepTmpOutput {
364                 manifest, err = getKeepTmp(outdir)
365         } else {
366                 manifest, err = WriteTree(kc, outdir)
367         }
368         if err != nil {
369                 return TempFail{err}
370         }
371
372         // Set status
373         err = api.Update("job_tasks", taskUuid,
374                 map[string]interface{}{
375                         "job_task": Task{
376                                 Output:   manifest,
377                                 Success:  success,
378                                 Progress: 1}},
379                 nil)
380         if err != nil {
381                 return TempFail{err}
382         }
383
384         if success {
385                 return nil
386         } else {
387                 return PermFail{}
388         }
389 }
390
391 func main() {
392         api, err := arvadosclient.MakeArvadosClient()
393         if err != nil {
394                 log.Fatal(err)
395         }
396
397         jobUuid := os.Getenv("JOB_UUID")
398         taskUuid := os.Getenv("TASK_UUID")
399         tmpdir := os.Getenv("TASK_WORK")
400         keepmount := os.Getenv("TASK_KEEPMOUNT")
401
402         var jobStruct Job
403         var taskStruct Task
404
405         err = api.Get("jobs", jobUuid, nil, &jobStruct)
406         if err != nil {
407                 log.Fatal(err)
408         }
409         err = api.Get("job_tasks", taskUuid, nil, &taskStruct)
410         if err != nil {
411                 log.Fatal(err)
412         }
413
414         var kc IKeepClient
415         kc, err = keepclient.MakeKeepClient(api)
416         if err != nil {
417                 log.Fatal(err)
418         }
419
420         syscall.Umask(0022)
421         err = runner(api, kc, jobUuid, taskUuid, tmpdir, keepmount, jobStruct, taskStruct)
422
423         if err == nil {
424                 os.Exit(0)
425         } else if _, ok := err.(TempFail); ok {
426                 log.Print(err)
427                 os.Exit(TASK_TEMPFAIL)
428         } else if _, ok := err.(PermFail); ok {
429                 os.Exit(1)
430         } else {
431                 log.Fatal(err)
432         }
433 }