7582: Crunchrunner work in progress.
[arvados.git] / sdk / go / crunchrunner / crunchrunner.go
1 package main
2
3 import (
4         "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
5         //"git.curoverse.com/arvados.git/sdk/go/keepclient"
6         "errors"
7         "log"
8         "os"
9         "os/exec"
10         "os/signal"
11         "syscall"
12 )
13
14 func getRecord(api arvadosclient.ArvadosClient, rsc, uuid string) (r arvadosclient.Dict) {
15         r = make(arvadosclient.Dict)
16         err := api.Get(rsc, uuid, nil, &r)
17         if err != nil {
18                 log.Fatal(err)
19         }
20         return r
21 }
22
// setupDirectories prepares the task's working area under tmpdir.
//
// It changes the process's working directory to tmpdir and creates the
// subdirectories "tmpdir" and "outdir" there (mode 0700; neither may already
// exist). It then changes into "outdir" and returns its path as reported by
// os.Getwd. On success the process is left with "outdir" as its working
// directory. On any failure it returns "" and the error.
func setupDirectories(tmpdir string) (outdir string, err error) {
	if err = os.Chdir(tmpdir); err != nil {
		return "", err
	}

	for _, dir := range []string{"tmpdir", "outdir"} {
		if err = os.Mkdir(dir, 0700); err != nil {
			return "", err
		}
	}

	// This error must be checked: if the Chdir failed, Getwd below would
	// silently report tmpdir instead of outdir.
	if err = os.Chdir("outdir"); err != nil {
		return "", err
	}

	outdir, err = os.Getwd()
	if err != nil {
		return "", err
	}
	return outdir, nil
}
51
// setupCommand configures cmd's I/O redirection and environment from the
// task parameters in taskp:
//
//   - "task.stdin": a string path under keepmount, opened for reading as the
//     command's standard input.
//   - "task.stdout": a string file name created (or truncated) in outdir to
//     receive the command's standard output. Without it, the command writes
//     to this process's stdout.
//   - "task.env": a map of string values appended, as KEY=VALUE entries, to
//     this process's environment for the command.
//
// The command's standard error is always this process's stderr.
// It returns an error if a parameter has the wrong type or a redirection file
// cannot be opened. In that case the failed redirection is left unset.
func setupCommand(cmd *exec.Cmd, taskp map[string]interface{}, keepmount, outdir string) error {
	// TODO: when taskp["task.vwd"] is set, create VWD symlinks in outdir.

	if taskp["task.stdin"] != nil {
		stdin, ok := taskp["task.stdin"].(string)
		if !ok {
			return errors.New("Could not cast task.stdin to string")
		}
		// Set up stdin redirection
		f, err := os.Open(keepmount + "/" + stdin)
		if err != nil {
			return err
		}
		cmd.Stdin = f
	}

	if taskp["task.stdout"] != nil {
		stdout, ok := taskp["task.stdout"].(string)
		if !ok {
			return errors.New("Could not cast task.stdout to string")
		}
		// Set up stdout redirection. The file must be opened for writing:
		// os.Open is read-only, so the command's writes would fail.
		f, err := os.Create(outdir + "/" + stdout)
		if err != nil {
			return err
		}
		cmd.Stdout = f
	} else {
		cmd.Stdout = os.Stdout
	}

	// exec connects a nil Stderr to the null device; keep the command's
	// diagnostics visible alongside its (default) stdout.
	cmd.Stderr = os.Stderr

	if taskp["task.env"] != nil {
		taskenv, ok := taskp["task.env"].(map[string]interface{})
		if !ok {
			return errors.New("Could not cast task.env to map")
		}

		// Set up subprocess environment: inherit ours, then add task vars.
		cmd.Env = os.Environ()
		for k, v := range taskenv {
			vstr, ok := v.(string)
			if !ok {
				return errors.New("Could not cast environment value to string")
			}
			cmd.Env = append(cmd.Env, k+"="+vstr)
		}
	}
	return nil
}
106
// setupSignals arranges for every SIGINT, SIGTERM and SIGQUIT received by
// this process to be forwarded to cmd's process. Because signal.Notify is
// used, these signals no longer trigger their default action here.
//
// All signals are forwarded, not just the first. Otherwise later signals
// would be swallowed, reaching neither the child nor the default handler.
// A signal that arrives while cmd.Process is still nil (cmd not started)
// is dropped.
// NOTE(review): the goroutine reads cmd.Process without synchronizing with
// cmd.Start, and it lives for the rest of the process because the channel
// is never stopped or closed.
func setupSignals(cmd *exec.Cmd) {
	sigChan := make(chan os.Signal, 1)
	signal.Notify(sigChan, syscall.SIGTERM, syscall.SIGINT, syscall.SIGQUIT)
	go func() {
		for sig := range sigChan {
			if cmd.Process != nil {
				// Best effort: the child may already have exited.
				_ = cmd.Process.Signal(sig)
			}
		}
	}()
}
121
// inCodes reports whether code appears in codes. codes is expected to be an
// array of numbers as produced by decoding JSON ([]interface{} of float64).
// A nil or non-array codes value never matches, and non-float64 entries are
// skipped. Entries are truncated to int before comparing.
func inCodes(code int, codes interface{}) bool {
	list, isList := codes.([]interface{})
	if !isList {
		return false
	}
	for i := range list {
		if f, isNum := list[i].(float64); isNum && int(f) == code {
			return true
		}
	}
	return false
}
138
// TASK_TEMPFAIL is the process exit status main uses to report a temporary
// task failure.
const TASK_TEMPFAIL = 111

// TempFail wraps an error to be treated as a temporary failure. main logs
// it and exits with TASK_TEMPFAIL.
type TempFail struct{ InnerError error }

// PermFail marks a permanent task failure; main exits with status 1.
type PermFail struct{}

// Error returns the wrapped error's message. If no inner error was
// supplied, it returns "TempFail" instead of panicking on a nil error.
func (s TempFail) Error() string {
	if s.InnerError == nil {
		return "TempFail"
	}
	return s.InnerError.Error()
}

// Unwrap exposes the wrapped error to errors.Is and errors.As.
func (s TempFail) Unwrap() error {
	return s.InnerError
}

// Error implements the error interface.
func (s PermFail) Error() string {
	return "PermFail"
}
151
152 func runner(api arvadosclient.ArvadosClient,
153         jobUuid, taskUuid, tmpdir, keepmount string, jobStruct,
154         taskStruct arvadosclient.Dict) error {
155
156         var err error
157         var ok bool
158         var jobp, taskp map[string]interface{}
159         jobp, ok = jobStruct["script_parameters"].(map[string]interface{})
160         if !ok {
161                 return errors.New("Could not cast job script_parameters to map")
162         }
163
164         taskp, ok = taskStruct["parameters"].(map[string]interface{})
165         if !ok {
166                 return errors.New("Could not cast task parameters to map")
167         }
168
169         // If this is task 0 and there are multiple tasks, dispatch subtasks
170         // and exit.
171         if taskStruct["sequence"] == 0.0 {
172                 var tasks []interface{}
173                 tasks, ok = jobp["tasks"].([]interface{})
174                 if !ok {
175                         return errors.New("Could not cast tasks to array")
176                 }
177
178                 if len(tasks) == 1 {
179                         taskp = tasks[0].(map[string]interface{})
180                 } else {
181                         for task := range tasks {
182                                 err := api.Call("POST", "job_tasks", "", "",
183                                         arvadosclient.Dict{
184                                                 "job_uuid":                 jobUuid,
185                                                 "created_by_job_task_uuid": "",
186                                                 "sequence":                 1,
187                                                 "parameters":               task},
188                                         nil)
189                                 if err != nil {
190                                         return TempFail{err}
191                                 }
192                         }
193                         err = api.Call("PUT", "job_tasks", taskUuid, "",
194                                 arvadosclient.Dict{
195                                         "job_task": arvadosclient.Dict{
196                                                 "output":   "",
197                                                 "success":  true,
198                                                 "progress": 1.0}},
199                                 nil)
200                         return nil
201                 }
202         }
203
204         // Set up subprocess
205         var commandline []string
206         var commandsarray []interface{}
207
208         commandsarray, ok = taskp["command"].([]interface{})
209         if !ok {
210                 return errors.New("Could not cast commands to array")
211         }
212
213         for _, c := range commandsarray {
214                 var cstr string
215                 cstr, ok = c.(string)
216                 if !ok {
217                         return errors.New("Could not cast command argument to string")
218                 }
219                 commandline = append(commandline, cstr)
220         }
221         cmd := exec.Command(commandline[0], commandline[1:]...)
222
223         var outdir string
224         outdir, err = setupDirectories(tmpdir)
225         if err != nil {
226                 return TempFail{err}
227         }
228
229         cmd.Dir = outdir
230
231         err = setupCommand(cmd, taskp, keepmount, outdir)
232         if err != nil {
233                 return err
234         }
235
236         setupSignals(cmd)
237
238         // Run subprocess and wait for it to complete
239         log.Printf("Running %v", cmd.Args)
240
241         err = cmd.Run()
242
243         if err != nil {
244                 return TempFail{err}
245         }
246
247         const success = 1
248         const permfail = 2
249         const tempfail = 2
250         var status int
251
252         exitCode := cmd.ProcessState.Sys().(syscall.WaitStatus).ExitStatus()
253
254         if inCodes(exitCode, taskp["task.successCodes"]) {
255                 status = success
256         } else if inCodes(exitCode, taskp["task.permanentFailCodes"]) {
257                 status = permfail
258         } else if inCodes(exitCode, taskp["task.temporaryFailCodes"]) {
259                 os.Exit(TASK_TEMPFAIL)
260         } else if cmd.ProcessState.Success() {
261                 status = success
262         } else {
263                 status = permfail
264         }
265
266         // Upload output directory
267         // TODO
268
269         // Set status
270         err = api.Call("PUT", "job_tasks", taskUuid, "",
271                 arvadosclient.Dict{
272                         "job_task": arvadosclient.Dict{
273                                 "output":   "",
274                                 "success":  status == success,
275                                 "progress": 1.0}},
276                 nil)
277         if err != nil {
278                 return TempFail{err}
279         }
280
281         if status == success {
282                 return nil
283         } else {
284                 return PermFail{}
285         }
286 }
287
288 func main() {
289         syscall.Umask(0077)
290
291         api, err := arvadosclient.MakeArvadosClient()
292         if err != nil {
293                 log.Fatal(err)
294         }
295
296         jobUuid := os.Getenv("JOB_UUID")
297         taskUuid := os.Getenv("TASK_UUID")
298         tmpdir := os.Getenv("TASK_WORK")
299         keepmount := os.Getenv("TASK_KEEPMOUNT")
300
301         jobStruct := getRecord(api, "jobs", jobUuid)
302         taskStruct := getRecord(api, "job_tasks", taskUuid)
303
304         err = runner(api, jobUuid, taskUuid, tmpdir, keepmount, jobStruct, taskStruct)
305
306         if err == nil {
307                 os.Exit(0)
308         } else if _, ok := err.(TempFail); ok {
309                 log.Print(err)
310                 os.Exit(TASK_TEMPFAIL)
311         } else if _, ok := err.(PermFail); ok {
312                 os.Exit(1)
313         } else {
314                 log.Fatal(err)
315         }
316 }