6 "git.curoverse.com/arvados.git/sdk/go/arvados"
7 "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
8 "git.curoverse.com/arvados.git/sdk/go/keepclient"
// TaskDef describes one command to run as a task, together with its
// redirections, environment and exit-code policy.
type TaskDef struct {
	// Command is the argv of the subprocess; element 0 is the executable.
	Command []string `json:"command"`
	// Env holds extra environment variables appended to the inherited
	// environment.
	Env map[string]string `json:"task.env"`
	// Stdin, if non-empty, is the file opened as the subprocess's stdin.
	Stdin string `json:"task.stdin"`
	// Stdout, if non-empty, is a path relative to the output directory that
	// receives the subprocess's stdout.
	Stdout string `json:"task.stdout"`
	// Stderr, if non-empty, is a path relative to the output directory that
	// receives the subprocess's stderr.
	Stderr string `json:"task.stderr"`
	// Vwd maps paths relative to the output directory to the existing files
	// that should appear there before the command runs.
	Vwd map[string]string `json:"task.vwd"`
	// SuccessCodes lists exit codes treated as success.
	SuccessCodes []int `json:"task.successCodes"`
	// PermanentFailCodes lists exit codes treated as permanent failure.
	PermanentFailCodes []int `json:"task.permanentFailCodes"`
	// TemporaryFailCodes lists exit codes treated as temporary failure.
	TemporaryFailCodes []int `json:"task.temporaryFailCodes"`
	// KeepTmpOutput selects the alternative output handling: the output
	// manifest is read back from the output directory instead of being
	// uploaded, and vwd entries are copied instead of symlinked.
	KeepTmpOutput bool `json:"task.keepTmpOutput"`
}

// Tasks is the list of task definitions carried in a job's script
// parameters.
type Tasks struct {
	Tasks []TaskDef `json:"tasks"`
}

// Job is the part of a job record this program reads.
type Job struct {
	Script_parameters Tasks `json:"script_parameters"`
}

// Task is the part of a job_task record this program reads and writes.
//
// Every JSON key must be unique within the struct: encoding/json silently
// ignores all fields that share a key at the same depth, so a duplicated key
// makes both fields disappear from encoding and decoding without any error.
type Task struct {
	// Job_uuid is the UUID of the job this task belongs to.
	Job_uuid string `json:"job_uuid"`
	// Created_by_job_task_uuid is the UUID of the task that created this one.
	Created_by_job_task_uuid string `json:"created_by_job_task_uuid"`
	// Parameters is the definition of the command this task runs.
	Parameters TaskDef `json:"parameters"`
	// Sequence is the task's sequence number; sequence 0 is the task that
	// dispatches the others.
	Sequence int `json:"sequence"`
	// Output is the task's output field.
	Output string `json:"output"`
	// Success is the task's success flag.
	Success bool `json:"success"`
	// Progress is the task's progress value. It uses its own "progress" key
	// so that it does not collide with Sequence.
	Progress float32 `json:"progress"`
}
// IArvadosClient lists the API operations that runner needs: creating and
// updating records. runner receives a value of this interface type rather
// than a concrete client.
50 type IArvadosClient interface {
// Create creates a record of the given resourceType from parameters.
// NOTE(review): presumably the server's response is decoded into output -
// confirm against the implementation.
51 Create(resourceType string, parameters arvadosclient.Dict, output interface{}) error
// Update changes the record of the given resourceType identified by uuid,
// using parameters.
// NOTE(review): presumably the server's response is decoded into output -
// confirm against the implementation.
52 Update(resourceType string, uuid string, parameters arvadosclient.Dict, output interface{}) (err error)
// setupDirectories creates the task's scratch directory and determines the
// task's output directory.
//
// tmpdir is crunchtmpdir + "/tmpdir", created with mode 0700. outdir is either
// the value of the TASK_KEEPMOUNT_TMP environment variable, or
// crunchtmpdir + "/outdir" created with mode 0700. On success both paths are
// returned with a nil error.
//
// NOTE(review): the condition choosing between the two outdir forms and the
// error checks after each os.Mkdir are not visible in this excerpt;
// presumably keepTmp selects the TASK_KEEPMOUNT_TMP form - confirm.
55 func setupDirectories(crunchtmpdir, taskUuid string, keepTmp bool) (tmpdir, outdir string, err error) {
56 tmpdir = crunchtmpdir + "/tmpdir"
57 err = os.Mkdir(tmpdir, 0700)
// Output location taken from the environment instead of being created here.
63 outdir = os.Getenv("TASK_KEEPMOUNT_TMP")
// Output location created under the task's scratch area.
65 outdir = crunchtmpdir + "/outdir"
66 err = os.Mkdir(outdir, 0700)
72 return tmpdir, outdir, nil
// checkOutputFilename validates fn as a path relative to outdir and makes
// sure the directory that will contain it exists.
//
// fn is rejected if it starts or ends with "/", if it contains "../", or if
// its last component is "..", so that it cannot name something outside
// outdir. When fn has a directory part, that directory is created under
// outdir (mode 0777 before umask) together with any missing parents.
//
// It returns nil when fn is acceptable and its parent directory exists, and
// an error describing the problem otherwise.
func checkOutputFilename(outdir, fn string) error {
	if strings.HasPrefix(fn, "/") || strings.HasSuffix(fn, "/") {
		return fmt.Errorf("Path must not start or end with '/'")
	}
	// fn is the string being searched and "../" is the needle. A bare or
	// trailing ".." has no following slash and is checked separately.
	if fn == ".." || strings.HasSuffix(fn, "/..") || strings.Contains(fn, "../") {
		return fmt.Errorf("Path must not contain '../'")
	}

	// Only paths with a directory part need a parent created.
	if sl := strings.LastIndex(fn, "/"); sl != -1 {
		if err := os.MkdirAll(outdir+"/"+fn[0:sl], 0777); err != nil {
			return err
		}
	}
	return nil
}
// copyFile copies the contents of the file at src into the file at dst.
// Note the argument order: destination first, source second.
//
// src is opened for reading; dst is created (os.Create truncates an existing
// file); the bytes are streamed with io.Copy.
// NOTE(review): the error checks, the closing of both files and the final
// return are not visible in this excerpt.
90 func copyFile(dst, src string) error {
91 in, err := os.Open(src)
97 out, err := os.Create(dst)
103 _, err = io.Copy(out, in)
107 func setupCommand(cmd *exec.Cmd, taskp TaskDef, outdir string, replacements map[string]string) (stdin, stdout, stderr string, err error) {
108 if taskp.Vwd != nil {
109 for k, v := range taskp.Vwd {
110 v = substitute(v, replacements)
111 err = checkOutputFilename(outdir, k)
113 return "", "", "", err
115 if taskp.KeepTmpOutput {
116 err = copyFile(v, outdir+"/"+k)
118 err = os.Symlink(v, outdir+"/"+k)
121 return "", "", "", err
126 if taskp.Stdin != "" {
127 // Set up stdin redirection
128 stdin = substitute(taskp.Stdin, replacements)
129 cmd.Stdin, err = os.Open(stdin)
131 return "", "", "", err
135 if taskp.Stdout != "" {
136 err = checkOutputFilename(outdir, taskp.Stdout)
138 return "", "", "", err
140 // Set up stdout redirection
141 stdout = outdir + "/" + taskp.Stdout
142 cmd.Stdout, err = os.Create(stdout)
144 return "", "", "", err
147 cmd.Stdout = os.Stdout
150 if taskp.Stderr != "" {
151 err = checkOutputFilename(outdir, taskp.Stderr)
153 return "", "", "", err
155 // Set up stderr redirection
156 stderr = outdir + "/" + taskp.Stderr
157 cmd.Stderr, err = os.Create(stderr)
159 return "", "", "", err
162 cmd.Stderr = os.Stderr
165 if taskp.Env != nil {
166 // Set up subprocess environment
167 cmd.Env = os.Environ()
168 for k, v := range taskp.Env {
169 v = substitute(v, replacements)
170 cmd.Env = append(cmd.Env, k+"="+v)
173 return stdin, stdout, stderr, nil
// setupSignals creates a channel with a buffer of one and registers it to
// receive SIGTERM, SIGINT and SIGQUIT.
// NOTE(review): the return statement is not visible in this excerpt;
// presumably the channel is returned, since the caller assigns the result to
// sigChan. cmd is not used by any visible line.
176 // Set up signal handlers. Go sends signal notifications to a "signal
178 func setupSignals(cmd *exec.Cmd) chan os.Signal {
179 sigChan := make(chan os.Signal, 1)
180 signal.Notify(sigChan, syscall.SIGTERM)
181 signal.Notify(sigChan, syscall.SIGINT)
182 signal.Notify(sigChan, syscall.SIGQUIT)
// inCodes iterates over codes looking at each element in turn.
// NOTE(review): the loop body and return statements are not visible in this
// excerpt; presumably it reports whether code is an element of codes - the
// caller uses the result to classify a process exit code.
186 func inCodes(code int, codes []int) bool {
188 for _, c := range codes {
// TASK_TEMPFAIL is the process exit status used when the task ended with a
// temporary failure (a TempFail error).
197 const TASK_TEMPFAIL = 111
// TempFail wraps an error to mark it as a temporary failure. It embeds the
// wrapped error, so it satisfies the error interface through it.
199 type TempFail struct{ error }
// PermFail is the error value marking a permanent failure; it carries no
// data.
200 type PermFail struct{}
// Error makes PermFail satisfy the error interface.
// NOTE(review): the returned message is not visible in this excerpt.
202 func (s PermFail) Error() string {
// substitute replaces, in inp, every occurrence of each key of subst with the
// corresponding value (strings.Replace with n = -1 replaces all occurrences).
// Keys are applied one after another in map iteration order, which Go does
// not define; the result is order-independent only as long as no replacement
// value introduces text matching another key.
// NOTE(review): the return statement is not visible in this excerpt;
// presumably the rewritten inp is returned.
206 func substitute(inp string, subst map[string]string) string {
207 for k, v := range subst {
208 inp = strings.Replace(inp, k, v, -1)
// getKeepTmp reads the file ".arvados#collection" inside outdir, decodes its
// whole content as a JSON-encoded arvados.Collection and returns that
// collection's ManifestText together with any JSON decoding error.
// NOTE(review): the error checks after os.Open and ioutil.ReadAll, and the
// closing of the file, are not visible in this excerpt.
213 func getKeepTmp(outdir string) (manifest string, err error) {
214 fn, err := os.Open(outdir + "/" + ".arvados#collection")
220 buf, err := ioutil.ReadAll(fn)
224 collection := arvados.Collection{}
225 err = json.Unmarshal(buf, &collection)
226 return collection.ManifestText, err
// runner executes one task of a job and records its result through api.
//
// Visible steps: pick the task definition, optionally create one job_task
// record per task defined on the job, set up the working directories and
// substitutions, run the command with its redirections while forwarding
// caught signals to it, classify the exit code, obtain the output manifest
// and update the job_task record.
//
// NOTE(review): many lines of this function (error checks, branch bodies, the
// parameter maps passed to api.Create / api.Update, the start/wait calls) are
// not visible in this excerpt; comments below describe only what is shown.
// A parameter named kc is used further down and passed by the caller, so it
// is presumably declared on a signature line that is not visible.
229 func runner(api IArvadosClient,
231 jobUuid, taskUuid, crunchtmpdir, keepmount string,
232 jobStruct Job, taskStruct Task) error {
// Default: run the definition stored on the task record itself.
235 taskp := taskStruct.Parameters
237 // If this is task 0 and there are multiple tasks, dispatch subtasks
239 if taskStruct.Sequence == 0 {
// Exactly one task defined on the job: run it directly in this task.
240 if len(jobStruct.Script_parameters.Tasks) == 1 {
241 taskp = jobStruct.Script_parameters.Tasks[0]
// One job_tasks record is created per task defined on the job, each
// attributed to this job and to this task as its creator.
243 for _, task := range jobStruct.Script_parameters.Tasks {
244 err := api.Create("job_tasks",
245 map[string]interface{}{
246 "job_task": Task{Job_uuid: jobUuid,
247 Created_by_job_task_uuid: taskUuid,
// This task's own record is then updated (fields not visible here).
255 err = api.Update("job_tasks", taskUuid,
256 map[string]interface{}{
266 var tmpdir, outdir string
267 tmpdir, outdir, err = setupDirectories(crunchtmpdir, taskUuid, taskp.KeepTmpOutput)
// Placeholders that may appear in the command line and, via setupCommand,
// in vwd values, the stdin path and environment values.
272 replacements := map[string]string{
273 "$(task.tmpdir)": tmpdir,
274 "$(task.outdir)": outdir,
275 "$(task.keep)": keepmount}
277 log.Printf("crunchrunner: $(task.tmpdir)=%v", tmpdir)
278 log.Printf("crunchrunner: $(task.outdir)=%v", outdir)
279 log.Printf("crunchrunner: $(task.keep)=%v", keepmount)
// Substitution is applied to every argument in place, including the
// executable name.
282 for k, v := range taskp.Command {
283 taskp.Command[k] = substitute(v, replacements)
// NOTE(review): indexing Command[0] panics on an empty command unless an
// earlier check that is not visible here guards against it - confirm.
286 cmd := exec.Command(taskp.Command[0], taskp.Command[1:]...)
290 var stdin, stdout, stderr string
291 stdin, stdout, stderr, err = setupCommand(cmd, taskp, outdir, replacements)
296 // Run subprocess and wait for it to complete
// The redirection paths are decorated shell-style purely for the log line
// below.
298 stdin = " < " + stdin
301 stdout = " > " + stdout
304 stderr = " 2> " + stderr
306 log.Printf("Running %v%v%v%v", cmd.Args, stdin, stdout, stderr)
308 var caughtSignal os.Signal
309 sigChan := setupSignals(cmd)
// A goroutine drains the signal channel and relays to the subprocess;
// finishedSignalNotify is closed when that loop ends, and the main flow
// waits on it before reading caughtSignal.
317 finishedSignalNotify := make(chan struct{})
318 go func(sig <-chan os.Signal) {
// The loop variable shadows the channel parameter of the same name.
319 for sig := range sig {
321 cmd.Process.Signal(caughtSignal)
323 close(finishedSignalNotify)
330 <-finishedSignalNotify
332 if caughtSignal != nil {
333 log.Printf("Caught signal %v", caughtSignal)
338 // Run() returns ExitError on non-zero exit code, but we handle
339 // that down below. So only return if it's not ExitError.
340 if _, ok := err.(*exec.ExitError); !ok {
// The type assertion to syscall.WaitStatus is unchecked and relies on the
// platform's ProcessState.Sys() returning that type.
347 exitCode := cmd.ProcessState.Sys().(syscall.WaitStatus).ExitStatus()
349 log.Printf("Completed with exit code %v", exitCode)
// Classification order: permanent-failure codes first, then
// temporary-failure codes, then explicit success codes or a zero exit.
351 if inCodes(exitCode, taskp.PermanentFailCodes) {
353 } else if inCodes(exitCode, taskp.TemporaryFailCodes) {
354 return TempFail{fmt.Errorf("Process tempfail with exit code %v", exitCode)}
355 } else if inCodes(exitCode, taskp.SuccessCodes) || cmd.ProcessState.Success() {
361 // Upload output directory
// With KeepTmpOutput the manifest is read back from the output directory;
// otherwise it is produced by WriteTree from the directory's contents.
363 if taskp.KeepTmpOutput {
364 manifest, err = getKeepTmp(outdir)
366 manifest, err = WriteTree(kc, outdir)
// Final update of this task's record (fields not visible here).
373 err = api.Update("job_tasks", taskUuid,
374 map[string]interface{}{
// NOTE(review): the enclosing function header is not visible in this excerpt;
// these lines are presumably the body of main - confirm.
//
// Visible flow: build an API client, read the job/task identity and working
// locations from the environment, fetch the job and task records, build a
// Keep client, call runner, and translate runner's error into the process
// exit status.
392 api, err := arvadosclient.MakeArvadosClient()
// Identity and locations come from environment variables.
397 jobUuid := os.Getenv("JOB_UUID")
398 taskUuid := os.Getenv("TASK_UUID")
399 tmpdir := os.Getenv("TASK_WORK")
400 keepmount := os.Getenv("TASK_KEEPMOUNT")
// Fetch the job record and this task's record.
405 err = api.Get("jobs", jobUuid, nil, &jobStruct)
409 err = api.Get("job_tasks", taskUuid, nil, &taskStruct)
415 kc, err = keepclient.MakeKeepClient(api)
// TASK_WORK is passed to runner as its crunchtmpdir argument.
421 err = runner(api, kc, jobUuid, taskUuid, tmpdir, keepmount, jobStruct, taskStruct)
// A TempFail error exits with TASK_TEMPFAIL; a PermFail error is handled
// separately (its handling is not visible here).
425 } else if _, ok := err.(TempFail); ok {
427 os.Exit(TASK_TEMPFAIL)
428 } else if _, ok := err.(PermFail); ok {