Merge branch 'master' into 11454-wb-federated-search
[arvados.git] / sdk / go / crunchrunner / crunchrunner.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: Apache-2.0
4
5 package main
6
7 import (
8         "encoding/json"
9         "fmt"
10         "io"
11         "io/ioutil"
12         "log"
13         "os"
14         "os/exec"
15         "os/signal"
16         "strings"
17         "syscall"
18
19         "git.curoverse.com/arvados.git/sdk/go/arvados"
20         "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
21         "git.curoverse.com/arvados.git/sdk/go/keepclient"
22 )
23
// TaskDef describes one command for crunchrunner to run. It comes either
// from a job's script_parameters "tasks" list or from a job task's
// "parameters" field.
//
// Command is the argv to execute (Command[0] is the program). Stdin is a
// path opened as the process's standard input. Stdout and Stderr are file
// names relative to the output directory that receive the process's output
// streams. Vwd maps output-directory file names to source paths that are
// symlinked (or copied, when KeepTmpOutput is set) into place before the
// command runs. Env adds variables to the inherited environment.
// SuccessCodes, PermanentFailCodes and TemporaryFailCodes classify the
// command's exit code. KeepTmpOutput makes the output directory the
// TASK_KEEPMOUNT_TMP mount and takes the output manifest from its
// ".arvados#collection" file.
//
// Command, Stdin, Env values and Vwd sources may contain the placeholders
// $(task.tmpdir), $(task.outdir) and $(task.keep), which are substituted
// before use.
type TaskDef struct {
	Command            []string          `json:"command"`
	Env                map[string]string `json:"task.env"`
	Stdin              string            `json:"task.stdin"`
	Stdout             string            `json:"task.stdout"`
	Stderr             string            `json:"task.stderr"`
	Vwd                map[string]string `json:"task.vwd"`
	SuccessCodes       []int             `json:"task.successCodes"`
	PermanentFailCodes []int             `json:"task.permanentFailCodes"`
	TemporaryFailCodes []int             `json:"task.temporaryFailCodes"`
	KeepTmpOutput      bool              `json:"task.keepTmpOutput"`
}

// Tasks is the shape of a job's script_parameters: a list of task
// definitions to run.
type Tasks struct {
	Tasks []TaskDef `json:"tasks"`
}

// Job is the subset of an Arvados job record that crunchrunner reads.
type Job struct {
	ScriptParameters Tasks `json:"script_parameters"`
}

// Task is the subset of an Arvados job task record that crunchrunner reads
// (via the API) and writes (when creating subtasks and reporting status).
//
// Every field must have a distinct JSON name: when two fields share a tag,
// encoding/json silently ignores both of them.
type Task struct {
	JobUUID              string  `json:"job_uuid"`
	CreatedByJobTaskUUID string  `json:"created_by_job_task_uuid"`
	Parameters           TaskDef `json:"parameters"`
	// omitempty: status updates are built from a Task that leaves Sequence
	// zero, and must not overwrite the record's sequence number.
	Sequence int     `json:"sequence,omitempty"`
	Output   string  `json:"output"`
	Success  bool    `json:"success"`
	Progress float32 `json:"progress"`
}
54
55 type IArvadosClient interface {
56         Create(resourceType string, parameters arvadosclient.Dict, output interface{}) error
57         Update(resourceType string, uuid string, parameters arvadosclient.Dict, output interface{}) (err error)
58 }
59
// setupDirectories creates the task's scratch directory and chooses its
// output directory.
//
// tmpdir is always crunchtmpdir+"/tmpdir", created with mode 0700; it must
// not already exist. When keepTmp is false, outdir is a newly created
// crunchtmpdir+"/outdir". When keepTmp is true, outdir is the path in the
// TASK_KEEPMOUNT_TMP environment variable. An unset or empty value is an
// error, because callers build paths as outdir+"/"+name and "" would make
// them point at the filesystem root.
//
// taskUUID is currently unused.
func setupDirectories(crunchtmpdir, taskUUID string, keepTmp bool) (tmpdir, outdir string, err error) {
	tmpdir = crunchtmpdir + "/tmpdir"
	if err = os.Mkdir(tmpdir, 0700); err != nil {
		return "", "", err
	}

	if keepTmp {
		outdir = os.Getenv("TASK_KEEPMOUNT_TMP")
		if outdir == "" {
			return "", "", fmt.Errorf("task.keepTmpOutput is set but TASK_KEEPMOUNT_TMP is empty")
		}
		return tmpdir, outdir, nil
	}

	outdir = crunchtmpdir + "/outdir"
	if err = os.Mkdir(outdir, 0700); err != nil {
		return "", "", err
	}
	return tmpdir, outdir, nil
}
79
// checkOutputFilename validates fn, a file name relative to outdir that the
// task will write (a task.vwd key, task.stdout or task.stderr). It also
// creates any parent directories fn needs inside outdir.
//
// fn must be non-empty, must not start or end with "/", and must not
// contain a ".." path component, so it cannot climb out of outdir. Errors
// from creating parent directories are returned.
func checkOutputFilename(outdir, fn string) error {
	if fn == "" {
		return fmt.Errorf("Path must not be empty")
	}
	if strings.HasPrefix(fn, "/") || strings.HasSuffix(fn, "/") {
		return fmt.Errorf("Path must not start or end with '/'")
	}
	// Check each component rather than searching for "../": this also
	// catches a bare ".." and a trailing "/..".
	for _, part := range strings.Split(fn, "/") {
		if part == ".." {
			return fmt.Errorf("Path must not contain '../'")
		}
	}

	if sl := strings.LastIndex(fn, "/"); sl != -1 {
		if err := os.MkdirAll(outdir+"/"+fn[:sl], 0777); err != nil {
			return err
		}
	}
	return nil
}
94
// copyFile copies the contents of the file at src into dst, creating or
// truncating dst. Note the argument order: destination first.
//
// The error from closing dst is reported rather than discarded. On some
// filesystems a failed write only surfaces at close time.
func copyFile(dst, src string) (err error) {
	in, err := os.Open(src)
	if err != nil {
		return err
	}
	defer in.Close()

	out, err := os.Create(dst)
	if err != nil {
		return err
	}
	defer func() {
		// Keep the first error: a copy failure takes precedence over a
		// close failure.
		if cerr := out.Close(); err == nil {
			err = cerr
		}
	}()

	_, err = io.Copy(out, in)
	return err
}
111
112 func setupCommand(cmd *exec.Cmd, taskp TaskDef, outdir string, replacements map[string]string) (stdin, stdout, stderr string, err error) {
113         if taskp.Vwd != nil {
114                 for k, v := range taskp.Vwd {
115                         v = substitute(v, replacements)
116                         err = checkOutputFilename(outdir, k)
117                         if err != nil {
118                                 return "", "", "", err
119                         }
120                         if taskp.KeepTmpOutput {
121                                 err = copyFile(v, outdir+"/"+k)
122                         } else {
123                                 err = os.Symlink(v, outdir+"/"+k)
124                         }
125                         if err != nil {
126                                 return "", "", "", err
127                         }
128                 }
129         }
130
131         if taskp.Stdin != "" {
132                 // Set up stdin redirection
133                 stdin = substitute(taskp.Stdin, replacements)
134                 cmd.Stdin, err = os.Open(stdin)
135                 if err != nil {
136                         return "", "", "", err
137                 }
138         }
139
140         if taskp.Stdout != "" {
141                 err = checkOutputFilename(outdir, taskp.Stdout)
142                 if err != nil {
143                         return "", "", "", err
144                 }
145                 // Set up stdout redirection
146                 stdout = outdir + "/" + taskp.Stdout
147                 cmd.Stdout, err = os.Create(stdout)
148                 if err != nil {
149                         return "", "", "", err
150                 }
151         } else {
152                 cmd.Stdout = os.Stdout
153         }
154
155         if taskp.Stderr != "" {
156                 err = checkOutputFilename(outdir, taskp.Stderr)
157                 if err != nil {
158                         return "", "", "", err
159                 }
160                 // Set up stderr redirection
161                 stderr = outdir + "/" + taskp.Stderr
162                 cmd.Stderr, err = os.Create(stderr)
163                 if err != nil {
164                         return "", "", "", err
165                 }
166         } else {
167                 cmd.Stderr = os.Stderr
168         }
169
170         if taskp.Env != nil {
171                 // Set up subprocess environment
172                 cmd.Env = os.Environ()
173                 for k, v := range taskp.Env {
174                         v = substitute(v, replacements)
175                         cmd.Env = append(cmd.Env, k+"="+v)
176                 }
177         }
178         return stdin, stdout, stderr, nil
179 }
180
// setupSignals registers for SIGTERM, SIGINT and SIGQUIT and returns the
// channel (buffered, capacity 1) on which they are delivered. Once
// registered, these signals are delivered to the channel rather than
// triggering their default action. runner uses this to forward them to the
// child process, and calls signal.Stop on the channel when finished.
// The cmd argument is not used.
func setupSignals(cmd *exec.Cmd) chan os.Signal {
	caught := make(chan os.Signal, 1)
	signal.Notify(caught, syscall.SIGTERM, syscall.SIGINT, syscall.SIGQUIT)
	return caught
}
190
// inCodes reports whether code is one of codes. A nil or empty codes list
// contains nothing.
func inCodes(code int, codes []int) bool {
	for i := range codes {
		if codes[i] == code {
			return true
		}
	}
	return false
}
201
// TASK_TEMPFAIL is the process exit status main uses when runner reports
// a TempFail.
const TASK_TEMPFAIL = 111

type (
	// TempFail wraps an error that should be reported as a temporary
	// failure: main logs it and exits with TASK_TEMPFAIL.
	TempFail struct{ error }

	// PermFail signals a permanent task failure: main exits with status 1.
	PermFail struct{}
)

// Error implements the error interface for PermFail.
func (PermFail) Error() string { return "PermFail" }
210
// substitute returns inp with every occurrence of each key of subst
// replaced by the corresponding value. Keys are applied one at a time in
// map iteration order. If a value itself contains another key, the result
// can therefore depend on that order.
func substitute(inp string, subst map[string]string) string {
	result := inp
	for placeholder, value := range subst {
		result = strings.Replace(result, placeholder, value, -1)
	}
	return result
}
217
218 func getKeepTmp(outdir string) (manifest string, err error) {
219         fn, err := os.Open(outdir + "/" + ".arvados#collection")
220         if err != nil {
221                 return "", err
222         }
223         defer fn.Close()
224
225         buf, err := ioutil.ReadAll(fn)
226         if err != nil {
227                 return "", err
228         }
229         collection := arvados.Collection{}
230         err = json.Unmarshal(buf, &collection)
231         return collection.ManifestText, err
232 }
233
234 func runner(api IArvadosClient,
235         kc IKeepClient,
236         jobUUID, taskUUID, crunchtmpdir, keepmount string,
237         jobStruct Job, taskStruct Task) error {
238
239         var err error
240         taskp := taskStruct.Parameters
241
242         // If this is task 0 and there are multiple tasks, dispatch subtasks
243         // and exit.
244         if taskStruct.Sequence == 0 {
245                 if len(jobStruct.ScriptParameters.Tasks) == 1 {
246                         taskp = jobStruct.ScriptParameters.Tasks[0]
247                 } else {
248                         for _, task := range jobStruct.ScriptParameters.Tasks {
249                                 err := api.Create("job_tasks",
250                                         map[string]interface{}{
251                                                 "job_task": Task{
252                                                         JobUUID:              jobUUID,
253                                                         CreatedByJobTaskUUID: taskUUID,
254                                                         Sequence:             1,
255                                                         Parameters:           task}},
256                                         nil)
257                                 if err != nil {
258                                         return TempFail{err}
259                                 }
260                         }
261                         err = api.Update("job_tasks", taskUUID,
262                                 map[string]interface{}{
263                                         "job_task": map[string]interface{}{
264                                                 "output":   "",
265                                                 "success":  true,
266                                                 "progress": 1.0}},
267                                 nil)
268                         return nil
269                 }
270         }
271
272         var tmpdir, outdir string
273         tmpdir, outdir, err = setupDirectories(crunchtmpdir, taskUUID, taskp.KeepTmpOutput)
274         if err != nil {
275                 return TempFail{err}
276         }
277
278         replacements := map[string]string{
279                 "$(task.tmpdir)": tmpdir,
280                 "$(task.outdir)": outdir,
281                 "$(task.keep)":   keepmount}
282
283         log.Printf("crunchrunner: $(task.tmpdir)=%v", tmpdir)
284         log.Printf("crunchrunner: $(task.outdir)=%v", outdir)
285         log.Printf("crunchrunner: $(task.keep)=%v", keepmount)
286
287         // Set up subprocess
288         for k, v := range taskp.Command {
289                 taskp.Command[k] = substitute(v, replacements)
290         }
291
292         cmd := exec.Command(taskp.Command[0], taskp.Command[1:]...)
293
294         cmd.Dir = outdir
295
296         var stdin, stdout, stderr string
297         stdin, stdout, stderr, err = setupCommand(cmd, taskp, outdir, replacements)
298         if err != nil {
299                 return err
300         }
301
302         // Run subprocess and wait for it to complete
303         if stdin != "" {
304                 stdin = " < " + stdin
305         }
306         if stdout != "" {
307                 stdout = " > " + stdout
308         }
309         if stderr != "" {
310                 stderr = " 2> " + stderr
311         }
312         log.Printf("Running %v%v%v%v", cmd.Args, stdin, stdout, stderr)
313
314         var caughtSignal os.Signal
315         sigChan := setupSignals(cmd)
316
317         err = cmd.Start()
318         if err != nil {
319                 signal.Stop(sigChan)
320                 return TempFail{err}
321         }
322
323         finishedSignalNotify := make(chan struct{})
324         go func(sig <-chan os.Signal) {
325                 for sig := range sig {
326                         caughtSignal = sig
327                         cmd.Process.Signal(caughtSignal)
328                 }
329                 close(finishedSignalNotify)
330         }(sigChan)
331
332         err = cmd.Wait()
333         signal.Stop(sigChan)
334
335         close(sigChan)
336         <-finishedSignalNotify
337
338         if caughtSignal != nil {
339                 log.Printf("Caught signal %v", caughtSignal)
340                 return PermFail{}
341         }
342
343         if err != nil {
344                 // Run() returns ExitError on non-zero exit code, but we handle
345                 // that down below.  So only return if it's not ExitError.
346                 if _, ok := err.(*exec.ExitError); !ok {
347                         return TempFail{err}
348                 }
349         }
350
351         var success bool
352
353         exitCode := cmd.ProcessState.Sys().(syscall.WaitStatus).ExitStatus()
354
355         log.Printf("Completed with exit code %v", exitCode)
356
357         if inCodes(exitCode, taskp.PermanentFailCodes) {
358                 success = false
359         } else if inCodes(exitCode, taskp.TemporaryFailCodes) {
360                 return TempFail{fmt.Errorf("Process tempfail with exit code %v", exitCode)}
361         } else if inCodes(exitCode, taskp.SuccessCodes) || cmd.ProcessState.Success() {
362                 success = true
363         } else {
364                 success = false
365         }
366
367         // Upload output directory
368         var manifest string
369         if taskp.KeepTmpOutput {
370                 manifest, err = getKeepTmp(outdir)
371         } else {
372                 manifest, err = WriteTree(kc, outdir)
373         }
374         if err != nil {
375                 return TempFail{err}
376         }
377
378         // Set status
379         err = api.Update("job_tasks", taskUUID,
380                 map[string]interface{}{
381                         "job_task": Task{
382                                 Output:   manifest,
383                                 Success:  success,
384                                 Progress: 1}},
385                 nil)
386         if err != nil {
387                 return TempFail{err}
388         }
389
390         if success {
391                 return nil
392         } else {
393                 return PermFail{}
394         }
395 }
396
397 func main() {
398         api, err := arvadosclient.MakeArvadosClient()
399         if err != nil {
400                 log.Fatal(err)
401         }
402
403         jobUUID := os.Getenv("JOB_UUID")
404         taskUUID := os.Getenv("TASK_UUID")
405         tmpdir := os.Getenv("TASK_WORK")
406         keepmount := os.Getenv("TASK_KEEPMOUNT")
407
408         var jobStruct Job
409         var taskStruct Task
410
411         err = api.Get("jobs", jobUUID, nil, &jobStruct)
412         if err != nil {
413                 log.Fatal(err)
414         }
415         err = api.Get("job_tasks", taskUUID, nil, &taskStruct)
416         if err != nil {
417                 log.Fatal(err)
418         }
419
420         var kc IKeepClient
421         kc, err = keepclient.MakeKeepClient(api)
422         if err != nil {
423                 log.Fatal(err)
424         }
425
426         syscall.Umask(0022)
427         err = runner(api, kc, jobUUID, taskUUID, tmpdir, keepmount, jobStruct, taskStruct)
428
429         if err == nil {
430                 os.Exit(0)
431         } else if _, ok := err.(TempFail); ok {
432                 log.Print(err)
433                 os.Exit(TASK_TEMPFAIL)
434         } else if _, ok := err.(PermFail); ok {
435                 os.Exit(1)
436         } else {
437                 log.Fatal(err)
438         }
439 }