7582: Working on tests.
[arvados.git] / sdk / go / crunchrunner / crunchrunner.go
1 package main
2
import (
	"encoding/json"
	"log"
	"os"
	"os/exec"
	"os/signal"
	"syscall"

	"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
	//"git.curoverse.com/arvados.git/sdk/go/keepclient"
)
12
13 type TaskDef struct {
14         commands           []string          `json:"commands"`
15         env                map[string]string `json:"task.env"`
16         stdin              string            `json:"task.stdin"`
17         stdout             string            `json:"task.stdout"`
18         vwd                map[string]string `json:"task.vwd"`
19         successCodes       []int             `json:"task.successCodes"`
20         permanentFailCodes []int             `json:"task.permanentFailCodes"`
21         temporaryFailCodes []int             `json:"task.temporaryFailCodes"`
22 }
23
24 type Tasks struct {
25         tasks []TaskDef `json:"script_parameters"`
26 }
27
28 type Job struct {
29         script_parameters Tasks `json:"script_parameters"`
30 }
31
32 type Task struct {
33         job_uuid                 string  `json:"job_uuid"`
34         created_by_job_task_uuid string  `json:"created_by_job_task_uuid"`
35         parameters               TaskDef `json:"parameters"`
36         sequence                 int     `json:"sequence"`
37         output                   string  `json:"output"`
38         success                  bool    `json:"success"`
39         progress                 float32 `json:"sequence"`
40 }
41
42 func setupDirectories(tmpdir string) (outdir string, err error) {
43         err = os.Chdir(tmpdir)
44         if err != nil {
45                 return "", err
46         }
47
48         err = os.Mkdir("tmpdir", 0700)
49         if err != nil {
50                 return "", err
51         }
52
53         err = os.Mkdir("outdir", 0700)
54         if err != nil {
55                 return "", err
56         }
57
58         os.Chdir("outdir")
59         if err != nil {
60                 return "", err
61         }
62
63         outdir, err = os.Getwd()
64         if err != nil {
65                 return "", err
66         }
67
68         return outdir, nil
69 }
70
71 func setupCommand(cmd *exec.Cmd, taskp TaskDef, keepmount, outdir string) error {
72         var err error
73
74         //if taskp.vwd != nil {
75         // Set up VWD symlinks in outdir
76         // TODO
77         //}
78
79         if taskp.stdin != "" {
80                 // Set up stdin redirection
81                 cmd.Stdin, err = os.Open(keepmount + "/" + taskp.stdin)
82                 if err != nil {
83                         log.Fatal(err)
84                 }
85         }
86
87         if taskp.stdout != "" {
88                 // Set up stdout redirection
89                 cmd.Stdout, err = os.Create(outdir + "/" + taskp.stdout)
90                 if err != nil {
91                         log.Fatal(err)
92                 }
93         } else {
94                 cmd.Stdout = os.Stdout
95         }
96
97         if taskp.env != nil {
98                 // Set up subprocess environment
99                 cmd.Env = os.Environ()
100                 for k, v := range taskp.env {
101                         cmd.Env = append(cmd.Env, k+"="+v)
102                 }
103         }
104         return nil
105 }
106
107 func setupSignals(cmd *exec.Cmd) {
108         // Set up signal handlers
109         // Forward SIGINT, SIGTERM and SIGQUIT to inner process
110         sigChan := make(chan os.Signal, 1)
111         go func(sig <-chan os.Signal) {
112                 catch := <-sig
113                 if cmd.Process != nil {
114                         cmd.Process.Signal(catch)
115                 }
116         }(sigChan)
117         signal.Notify(sigChan, syscall.SIGTERM)
118         signal.Notify(sigChan, syscall.SIGINT)
119         signal.Notify(sigChan, syscall.SIGQUIT)
120 }
121
122 func inCodes(code int, codes []int) bool {
123         if codes != nil {
124                 for _, c := range codes {
125                         if code == c {
126                                 return true
127                         }
128                 }
129         }
130         return false
131 }
132
133 const TASK_TEMPFAIL = 111
134
135 type TempFail struct{ InnerError error }
136 type PermFail struct{}
137
138 func (s TempFail) Error() string {
139         return s.InnerError.Error()
140 }
141
142 func (s PermFail) Error() string {
143         return "PermFail"
144 }
145
146 func runner(api arvadosclient.IArvadosClient,
147         jobUuid, taskUuid, tmpdir, keepmount string,
148         jobStruct Job, taskStruct Task) error {
149
150         var err error
151         taskp := taskStruct.parameters
152
153         // If this is task 0 and there are multiple tasks, dispatch subtasks
154         // and exit.
155         if taskStruct.sequence == 0 {
156                 if len(jobStruct.script_parameters.tasks) == 1 {
157                         taskp = jobStruct.script_parameters.tasks[0]
158                 } else {
159                         for _, task := range jobStruct.script_parameters.tasks {
160                                 err := api.Create("job_tasks",
161                                         map[string]interface{}{
162                                                 "job_task": Task{job_uuid: jobUuid,
163                                                         created_by_job_task_uuid: taskUuid,
164                                                         sequence:                 1,
165                                                         parameters:               task}},
166                                         nil)
167                                 if err != nil {
168                                         return TempFail{err}
169                                 }
170                         }
171                         err = api.Update("job_tasks", taskUuid,
172                                 map[string]interface{}{
173                                         "job_task": Task{
174                                                 output:   "",
175                                                 success:  true,
176                                                 progress: 1.0}},
177                                 nil)
178                         return nil
179                 }
180         }
181
182         // Set up subprocess
183         cmd := exec.Command(taskp.commands[0], taskp.commands[1:]...)
184
185         var outdir string
186         outdir, err = setupDirectories(tmpdir)
187         if err != nil {
188                 return TempFail{err}
189         }
190
191         cmd.Dir = outdir
192
193         err = setupCommand(cmd, taskp, keepmount, outdir)
194         if err != nil {
195                 return err
196         }
197
198         setupSignals(cmd)
199
200         // Run subprocess and wait for it to complete
201         log.Printf("Running %v", cmd.Args)
202
203         err = cmd.Run()
204
205         if err != nil {
206                 return TempFail{err}
207         }
208
209         const success = 1
210         const permfail = 2
211         const tempfail = 2
212         var status int
213
214         exitCode := cmd.ProcessState.Sys().(syscall.WaitStatus).ExitStatus()
215
216         if inCodes(exitCode, taskp.successCodes) {
217                 status = success
218         } else if inCodes(exitCode, taskp.permanentFailCodes) {
219                 status = permfail
220         } else if inCodes(exitCode, taskp.temporaryFailCodes) {
221                 os.Exit(TASK_TEMPFAIL)
222         } else if cmd.ProcessState.Success() {
223                 status = success
224         } else {
225                 status = permfail
226         }
227
228         // Upload output directory
229         // TODO
230
231         // Set status
232         err = api.Update("job_tasks", taskUuid,
233                 map[string]interface{}{
234                         "job_task": map[string]interface{}{
235                                 "output":   "",
236                                 "success":  status == success,
237                                 "progress": 1.0}},
238                 nil)
239         if err != nil {
240                 return TempFail{err}
241         }
242
243         if status == success {
244                 return nil
245         } else {
246                 return PermFail{}
247         }
248 }
249
250 func main() {
251         syscall.Umask(0077)
252
253         api, err := arvadosclient.MakeArvadosClient()
254         if err != nil {
255                 log.Fatal(err)
256         }
257
258         jobUuid := os.Getenv("JOB_UUID")
259         taskUuid := os.Getenv("TASK_UUID")
260         tmpdir := os.Getenv("TASK_WORK")
261         keepmount := os.Getenv("TASK_KEEPMOUNT")
262
263         var jobStruct Job
264         var taskStruct Task
265
266         err = api.Get("jobs", jobUuid, nil, &jobStruct)
267         if err != nil {
268                 log.Fatal(err)
269         }
270         err = api.Get("job_tasks", taskUuid, nil, &taskStruct)
271         if err != nil {
272                 log.Fatal(err)
273         }
274
275         err = runner(api, jobUuid, taskUuid, tmpdir, keepmount, jobStruct, taskStruct)
276
277         if err == nil {
278                 os.Exit(0)
279         } else if _, ok := err.(TempFail); ok {
280                 log.Print(err)
281                 os.Exit(TASK_TEMPFAIL)
282         } else if _, ok := err.(PermFail); ok {
283                 os.Exit(1)
284         } else {
285                 log.Fatal(err)
286         }
287 }