11308: Merge branch 'master' into 11308-python3
[arvados.git] / sdk / go / crunchrunner / crunchrunner.go
1 package main
2
3 import (
4         "encoding/json"
5         "fmt"
6         "io"
7         "io/ioutil"
8         "log"
9         "os"
10         "os/exec"
11         "os/signal"
12         "strings"
13         "syscall"
14
15         "git.curoverse.com/arvados.git/sdk/go/arvados"
16         "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
17         "git.curoverse.com/arvados.git/sdk/go/keepclient"
18 )
19
// TaskDef is the definition of one task: the command to run and how to run
// it. It is read from a job's script_parameters (see Tasks) and from a job
// task's parameters (see Task).
//
// Command elements, Env values, Stdin and Vwd source paths may contain the
// placeholders $(task.tmpdir), $(task.outdir) and $(task.keep), which are
// replaced before use.
type TaskDef struct {
	// Command is the program to run followed by its arguments.
	Command []string `json:"command"`

	// Env lists extra environment variables for the command; they are
	// appended to this process's environment.
	Env map[string]string `json:"task.env"`

	// Stdin, if set, is the path of a file to use as standard input.
	Stdin string `json:"task.stdin"`

	// Stdout and Stderr, if set, are paths relative to the output directory
	// that receive the command's standard output and standard error.
	// Otherwise the command writes to this process's stdout/stderr.
	Stdout string `json:"task.stdout"`
	Stderr string `json:"task.stderr"`

	// Vwd maps paths relative to the output directory to source paths; each
	// entry becomes a symlink to its source, or a copy of it when
	// KeepTmpOutput is set.
	Vwd map[string]string `json:"task.vwd"`

	// These lists classify the command's exit status. They are checked in
	// the order PermanentFailCodes, TemporaryFailCodes, SuccessCodes. A
	// status found in none of them counts as success only when it is 0.
	SuccessCodes       []int `json:"task.successCodes"`
	PermanentFailCodes []int `json:"task.permanentFailCodes"`
	TemporaryFailCodes []int `json:"task.temporaryFailCodes"`

	// KeepTmpOutput makes the output directory the one named by the
	// TASK_KEEPMOUNT_TMP environment variable. The output manifest is then
	// read from that directory instead of being uploaded from a local one.
	KeepTmpOutput bool `json:"task.keepTmpOutput"`
}

// Tasks is the "tasks" list inside a job's script_parameters.
type Tasks struct {
	Tasks []TaskDef `json:"tasks"`
}

// Job holds the job record fields this program uses.
type Job struct {
	ScriptParameters Tasks `json:"script_parameters"`
}

// Task holds the job_task record fields this program reads and writes.
//
// Every field must have a distinct JSON key. When two fields share a key,
// encoding/json silently ignores both of them.
//
// Sequence is omitted when zero. A Task built only to report output,
// success and progress therefore does not send "sequence":0.
type Task struct {
	JobUUID              string  `json:"job_uuid"`
	CreatedByJobTaskUUID string  `json:"created_by_job_task_uuid"`
	Parameters           TaskDef `json:"parameters"`
	Sequence             int     `json:"sequence,omitempty"`
	Output               string  `json:"output"`
	Success              bool    `json:"success"`
	Progress             float32 `json:"progress"`
}
50
51 type IArvadosClient interface {
52         Create(resourceType string, parameters arvadosclient.Dict, output interface{}) error
53         Update(resourceType string, uuid string, parameters arvadosclient.Dict, output interface{}) (err error)
54 }
55
// setupDirectories prepares the task's working directories and returns their
// paths.
//
// tmpdir is crunchtmpdir/tmpdir. It is created here with mode 0700, so it
// must not already exist.
//
// When keepTmp is false, outdir is crunchtmpdir/outdir, also newly created
// with mode 0700. When keepTmp is true, outdir is the value of the
// TASK_KEEPMOUNT_TMP environment variable and is not created. An empty value
// is an error: outdir is later used as a path prefix ("outdir/name"), and ""
// would place the task's output at the filesystem root.
//
// taskUUID is currently unused.
func setupDirectories(crunchtmpdir, taskUUID string, keepTmp bool) (tmpdir, outdir string, err error) {
	// Validate the environment before creating anything on disk.
	if keepTmp {
		outdir = os.Getenv("TASK_KEEPMOUNT_TMP")
		if outdir == "" {
			return "", "", fmt.Errorf("task.keepTmpOutput is set but TASK_KEEPMOUNT_TMP is empty")
		}
	}

	tmpdir = crunchtmpdir + "/tmpdir"
	if err = os.Mkdir(tmpdir, 0700); err != nil {
		return "", "", err
	}

	if !keepTmp {
		outdir = crunchtmpdir + "/outdir"
		if err = os.Mkdir(outdir, 0700); err != nil {
			return "", "", err
		}
	}

	return tmpdir, outdir, nil
}
75
// checkOutputFilename validates fn, a path relative to outdir where task
// output will be written, and creates any parent directories it names.
//
// fn must not start or end with "/" and must not contain a ".." path
// component, so it cannot lexically climb out of outdir. Errors from
// creating the parent directories are returned.
func checkOutputFilename(outdir, fn string) error {
	if strings.HasPrefix(fn, "/") || strings.HasSuffix(fn, "/") {
		return fmt.Errorf("Path must not start or end with '/'")
	}
	// Reject every ".." component ("../x", "a/../b", "a/.."), not merely
	// the literal substring "../".
	for _, part := range strings.Split(fn, "/") {
		if part == ".." {
			return fmt.Errorf("Path must not contain '../'")
		}
	}

	if sl := strings.LastIndex(fn, "/"); sl != -1 {
		if err := os.MkdirAll(outdir+"/"+fn[:sl], 0777); err != nil {
			return err
		}
	}
	return nil
}
90
// copyFile copies the contents of the file at src to dst, creating or
// truncating dst.
//
// The source is opened first, so dst is not created when src cannot be read.
// The error from closing dst is reported when nothing else failed, because a
// failed close can mean the written data was not fully stored.
func copyFile(dst, src string) (err error) {
	in, err := os.Open(src)
	if err != nil {
		return err
	}
	defer in.Close()

	out, err := os.Create(dst)
	if err != nil {
		return err
	}
	defer func() {
		if cerr := out.Close(); err == nil {
			err = cerr
		}
	}()

	_, err = io.Copy(out, in)
	return err
}
107
108 func setupCommand(cmd *exec.Cmd, taskp TaskDef, outdir string, replacements map[string]string) (stdin, stdout, stderr string, err error) {
109         if taskp.Vwd != nil {
110                 for k, v := range taskp.Vwd {
111                         v = substitute(v, replacements)
112                         err = checkOutputFilename(outdir, k)
113                         if err != nil {
114                                 return "", "", "", err
115                         }
116                         if taskp.KeepTmpOutput {
117                                 err = copyFile(v, outdir+"/"+k)
118                         } else {
119                                 err = os.Symlink(v, outdir+"/"+k)
120                         }
121                         if err != nil {
122                                 return "", "", "", err
123                         }
124                 }
125         }
126
127         if taskp.Stdin != "" {
128                 // Set up stdin redirection
129                 stdin = substitute(taskp.Stdin, replacements)
130                 cmd.Stdin, err = os.Open(stdin)
131                 if err != nil {
132                         return "", "", "", err
133                 }
134         }
135
136         if taskp.Stdout != "" {
137                 err = checkOutputFilename(outdir, taskp.Stdout)
138                 if err != nil {
139                         return "", "", "", err
140                 }
141                 // Set up stdout redirection
142                 stdout = outdir + "/" + taskp.Stdout
143                 cmd.Stdout, err = os.Create(stdout)
144                 if err != nil {
145                         return "", "", "", err
146                 }
147         } else {
148                 cmd.Stdout = os.Stdout
149         }
150
151         if taskp.Stderr != "" {
152                 err = checkOutputFilename(outdir, taskp.Stderr)
153                 if err != nil {
154                         return "", "", "", err
155                 }
156                 // Set up stderr redirection
157                 stderr = outdir + "/" + taskp.Stderr
158                 cmd.Stderr, err = os.Create(stderr)
159                 if err != nil {
160                         return "", "", "", err
161                 }
162         } else {
163                 cmd.Stderr = os.Stderr
164         }
165
166         if taskp.Env != nil {
167                 // Set up subprocess environment
168                 cmd.Env = os.Environ()
169                 for k, v := range taskp.Env {
170                         v = substitute(v, replacements)
171                         cmd.Env = append(cmd.Env, k+"="+v)
172                 }
173         }
174         return stdin, stdout, stderr, nil
175 }
176
// setupSignals returns a channel on which SIGTERM, SIGINT and SIGQUIT are
// delivered instead of triggering their default action. The channel buffers
// one pending signal. runner calls signal.Stop on it once the command has
// exited. The cmd argument is not used.
func setupSignals(cmd *exec.Cmd) chan os.Signal {
	notifications := make(chan os.Signal, 1)
	signal.Notify(notifications, syscall.SIGTERM, syscall.SIGINT, syscall.SIGQUIT)
	return notifications
}
186
// inCodes reports whether code appears in codes. A nil or empty list
// contains nothing.
func inCodes(code int, codes []int) bool {
	for i := range codes {
		if codes[i] == code {
			return true
		}
	}
	return false
}
197
// TASK_TEMPFAIL is the exit status main uses when runner reports a TempFail.
const TASK_TEMPFAIL = 111

type (
	// TempFail wraps an error from a step that may succeed if retried.
	// Its message is that of the wrapped error.
	TempFail struct{ error }

	// PermFail reports that the task failed and should not be retried.
	PermFail struct{}
)

// Error implements the error interface with a fixed message.
func (PermFail) Error() string { return "PermFail" }
206
// substitute replaces every occurrence of each key of subst in inp with the
// corresponding value. Keys are applied one after another in map iteration
// order, so the result assumes no value contains another key.
func substitute(inp string, subst map[string]string) string {
	result := inp
	for placeholder, value := range subst {
		result = strings.Replace(result, placeholder, value, -1)
	}
	return result
}
213
214 func getKeepTmp(outdir string) (manifest string, err error) {
215         fn, err := os.Open(outdir + "/" + ".arvados#collection")
216         if err != nil {
217                 return "", err
218         }
219         defer fn.Close()
220
221         buf, err := ioutil.ReadAll(fn)
222         if err != nil {
223                 return "", err
224         }
225         collection := arvados.Collection{}
226         err = json.Unmarshal(buf, &collection)
227         return collection.ManifestText, err
228 }
229
230 func runner(api IArvadosClient,
231         kc IKeepClient,
232         jobUUID, taskUUID, crunchtmpdir, keepmount string,
233         jobStruct Job, taskStruct Task) error {
234
235         var err error
236         taskp := taskStruct.Parameters
237
238         // If this is task 0 and there are multiple tasks, dispatch subtasks
239         // and exit.
240         if taskStruct.Sequence == 0 {
241                 if len(jobStruct.ScriptParameters.Tasks) == 1 {
242                         taskp = jobStruct.ScriptParameters.Tasks[0]
243                 } else {
244                         for _, task := range jobStruct.ScriptParameters.Tasks {
245                                 err := api.Create("job_tasks",
246                                         map[string]interface{}{
247                                                 "job_task": Task{
248                                                         JobUUID:              jobUUID,
249                                                         CreatedByJobTaskUUID: taskUUID,
250                                                         Sequence:             1,
251                                                         Parameters:           task}},
252                                         nil)
253                                 if err != nil {
254                                         return TempFail{err}
255                                 }
256                         }
257                         err = api.Update("job_tasks", taskUUID,
258                                 map[string]interface{}{
259                                         "job_task": map[string]interface{}{
260                                                 "output":   "",
261                                                 "success":  true,
262                                                 "progress": 1.0}},
263                                 nil)
264                         return nil
265                 }
266         }
267
268         var tmpdir, outdir string
269         tmpdir, outdir, err = setupDirectories(crunchtmpdir, taskUUID, taskp.KeepTmpOutput)
270         if err != nil {
271                 return TempFail{err}
272         }
273
274         replacements := map[string]string{
275                 "$(task.tmpdir)": tmpdir,
276                 "$(task.outdir)": outdir,
277                 "$(task.keep)":   keepmount}
278
279         log.Printf("crunchrunner: $(task.tmpdir)=%v", tmpdir)
280         log.Printf("crunchrunner: $(task.outdir)=%v", outdir)
281         log.Printf("crunchrunner: $(task.keep)=%v", keepmount)
282
283         // Set up subprocess
284         for k, v := range taskp.Command {
285                 taskp.Command[k] = substitute(v, replacements)
286         }
287
288         cmd := exec.Command(taskp.Command[0], taskp.Command[1:]...)
289
290         cmd.Dir = outdir
291
292         var stdin, stdout, stderr string
293         stdin, stdout, stderr, err = setupCommand(cmd, taskp, outdir, replacements)
294         if err != nil {
295                 return err
296         }
297
298         // Run subprocess and wait for it to complete
299         if stdin != "" {
300                 stdin = " < " + stdin
301         }
302         if stdout != "" {
303                 stdout = " > " + stdout
304         }
305         if stderr != "" {
306                 stderr = " 2> " + stderr
307         }
308         log.Printf("Running %v%v%v%v", cmd.Args, stdin, stdout, stderr)
309
310         var caughtSignal os.Signal
311         sigChan := setupSignals(cmd)
312
313         err = cmd.Start()
314         if err != nil {
315                 signal.Stop(sigChan)
316                 return TempFail{err}
317         }
318
319         finishedSignalNotify := make(chan struct{})
320         go func(sig <-chan os.Signal) {
321                 for sig := range sig {
322                         caughtSignal = sig
323                         cmd.Process.Signal(caughtSignal)
324                 }
325                 close(finishedSignalNotify)
326         }(sigChan)
327
328         err = cmd.Wait()
329         signal.Stop(sigChan)
330
331         close(sigChan)
332         <-finishedSignalNotify
333
334         if caughtSignal != nil {
335                 log.Printf("Caught signal %v", caughtSignal)
336                 return PermFail{}
337         }
338
339         if err != nil {
340                 // Run() returns ExitError on non-zero exit code, but we handle
341                 // that down below.  So only return if it's not ExitError.
342                 if _, ok := err.(*exec.ExitError); !ok {
343                         return TempFail{err}
344                 }
345         }
346
347         var success bool
348
349         exitCode := cmd.ProcessState.Sys().(syscall.WaitStatus).ExitStatus()
350
351         log.Printf("Completed with exit code %v", exitCode)
352
353         if inCodes(exitCode, taskp.PermanentFailCodes) {
354                 success = false
355         } else if inCodes(exitCode, taskp.TemporaryFailCodes) {
356                 return TempFail{fmt.Errorf("Process tempfail with exit code %v", exitCode)}
357         } else if inCodes(exitCode, taskp.SuccessCodes) || cmd.ProcessState.Success() {
358                 success = true
359         } else {
360                 success = false
361         }
362
363         // Upload output directory
364         var manifest string
365         if taskp.KeepTmpOutput {
366                 manifest, err = getKeepTmp(outdir)
367         } else {
368                 manifest, err = WriteTree(kc, outdir)
369         }
370         if err != nil {
371                 return TempFail{err}
372         }
373
374         // Set status
375         err = api.Update("job_tasks", taskUUID,
376                 map[string]interface{}{
377                         "job_task": Task{
378                                 Output:   manifest,
379                                 Success:  success,
380                                 Progress: 1}},
381                 nil)
382         if err != nil {
383                 return TempFail{err}
384         }
385
386         if success {
387                 return nil
388         } else {
389                 return PermFail{}
390         }
391 }
392
393 func main() {
394         api, err := arvadosclient.MakeArvadosClient()
395         if err != nil {
396                 log.Fatal(err)
397         }
398
399         jobUUID := os.Getenv("JOB_UUID")
400         taskUUID := os.Getenv("TASK_UUID")
401         tmpdir := os.Getenv("TASK_WORK")
402         keepmount := os.Getenv("TASK_KEEPMOUNT")
403
404         var jobStruct Job
405         var taskStruct Task
406
407         err = api.Get("jobs", jobUUID, nil, &jobStruct)
408         if err != nil {
409                 log.Fatal(err)
410         }
411         err = api.Get("job_tasks", taskUUID, nil, &taskStruct)
412         if err != nil {
413                 log.Fatal(err)
414         }
415
416         var kc IKeepClient
417         kc, err = keepclient.MakeKeepClient(api)
418         if err != nil {
419                 log.Fatal(err)
420         }
421
422         syscall.Umask(0022)
423         err = runner(api, kc, jobUUID, taskUUID, tmpdir, keepmount, jobStruct, taskStruct)
424
425         if err == nil {
426                 os.Exit(0)
427         } else if _, ok := err.(TempFail); ok {
428                 log.Print(err)
429                 os.Exit(TASK_TEMPFAIL)
430         } else if _, ok := err.(PermFail); ok {
431                 os.Exit(1)
432         } else {
433                 log.Fatal(err)
434         }
435 }