15 "git.curoverse.com/arvados.git/sdk/go/arvados"
16 "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
17 "git.curoverse.com/arvados.git/sdk/go/keepclient"
// TaskDef describes one command to run together with its redirections,
// environment and exit-code policy.
type TaskDef struct {
	// Command is the argv of the subprocess; placeholders such as
	// $(task.outdir) are expanded before it is started.
	Command []string `json:"command"`
	// Env holds extra environment variables (values may use placeholders).
	Env map[string]string `json:"task.env"`
	// Stdin is a path opened as the subprocess's standard input.
	Stdin string `json:"task.stdin"`
	// Stdout and Stderr are paths, relative to the output directory, that
	// receive the subprocess's output streams.
	Stdout string `json:"task.stdout"`
	Stderr string `json:"task.stderr"`
	// Vwd maps a path inside the output directory to the source file that
	// should appear there (symlinked, or copied when KeepTmpOutput is set).
	Vwd map[string]string `json:"task.vwd"`
	// Exit codes classified as success, permanent failure and temporary
	// failure respectively.
	SuccessCodes       []int `json:"task.successCodes"`
	PermanentFailCodes []int `json:"task.permanentFailCodes"`
	TemporaryFailCodes []int `json:"task.temporaryFailCodes"`
	// KeepTmpOutput selects the directory named by TASK_KEEPMOUNT_TMP as
	// the output directory instead of a freshly created local one.
	KeepTmpOutput bool `json:"task.keepTmpOutput"`
}

// Tasks is the list of task definitions carried in a job's script
// parameters.
type Tasks struct {
	Tasks []TaskDef `json:"tasks"`
}

// Job is the part of a job record that the runner reads.
type Job struct {
	ScriptParameters Tasks `json:"script_parameters"`
}

// Task is the part of a job_task record that the runner reads and writes.
type Task struct {
	JobUUID              string  `json:"job_uuid"`
	CreatedByJobTaskUUID string  `json:"created_by_job_task_uuid"`
	Parameters           TaskDef `json:"parameters"`
	Sequence             int     `json:"sequence"`
	Output               string  `json:"output"`
	Success              bool    `json:"success"`
	// Progress must not share a JSON name with Sequence: encoding/json
	// silently ignores every field involved in a name collision.
	Progress float32 `json:"progress"`
}
51 type IArvadosClient interface {
52 Create(resourceType string, parameters arvadosclient.Dict, output interface{}) error
53 Update(resourceType string, uuid string, parameters arvadosclient.Dict, output interface{}) (err error)
// setupDirectories creates the scratch directory "<crunchtmpdir>/tmpdir" and
// decides where task output goes.
//
// When keepTmp is false a fresh "<crunchtmpdir>/outdir" is created. When
// keepTmp is true the output directory is taken from the TASK_KEEPMOUNT_TMP
// environment variable and is not created here; an unset or empty variable
// is reported as an error, because an empty outdir would make every later
// outdir+"/"+name path resolve against the filesystem root.
//
// taskUUID is currently unused. On error both returned paths are empty.
func setupDirectories(crunchtmpdir, taskUUID string, keepTmp bool) (tmpdir, outdir string, err error) {
	tmpdir = crunchtmpdir + "/tmpdir"
	if err = os.Mkdir(tmpdir, 0700); err != nil {
		return "", "", err
	}

	if keepTmp {
		outdir = os.Getenv("TASK_KEEPMOUNT_TMP")
		if outdir == "" {
			return "", "", fmt.Errorf("TASK_KEEPMOUNT_TMP is not set")
		}
		return tmpdir, outdir, nil
	}

	outdir = crunchtmpdir + "/outdir"
	if err = os.Mkdir(outdir, 0700); err != nil {
		return "", "", err
	}
	return tmpdir, outdir, nil
}
// checkOutputFilename validates fn, a slash-separated path relative to
// outdir, and creates any parent directories it needs below outdir.
//
// fn is rejected when it is empty, starts or ends with '/', or has a ".."
// path component (which could otherwise point outside outdir). Checking
// components rather than the substring "../" also catches a trailing "..".
// An error from creating the parent directories is returned to the caller.
func checkOutputFilename(outdir, fn string) error {
	if fn == "" {
		return fmt.Errorf("Path must not be empty")
	}
	if strings.HasPrefix(fn, "/") || strings.HasSuffix(fn, "/") {
		return fmt.Errorf("Path must not start or end with '/'")
	}
	for _, part := range strings.Split(fn, "/") {
		if part == ".." {
			return fmt.Errorf("Path must not contain '../'")
		}
	}

	// Everything before the last '/' is the directory part of fn.
	if sl := strings.LastIndex(fn, "/"); sl != -1 {
		if err := os.MkdirAll(outdir+"/"+fn[:sl], 0777); err != nil {
			return err
		}
	}
	return nil
}
// copyFile copies the contents of the file at src into dst, creating dst or
// truncating it if it already exists.
//
// The destination is closed explicitly and the result of Close is returned,
// because a failed write may only be reported when the file is closed.
func copyFile(dst, src string) error {
	in, err := os.Open(src)
	if err != nil {
		return err
	}
	defer in.Close()

	out, err := os.Create(dst)
	if err != nil {
		return err
	}
	if _, err = io.Copy(out, in); err != nil {
		// The copy error is the one worth reporting; Close is best effort.
		out.Close()
		return err
	}
	return out.Close()
}
108 func setupCommand(cmd *exec.Cmd, taskp TaskDef, outdir string, replacements map[string]string) (stdin, stdout, stderr string, err error) {
109 if taskp.Vwd != nil {
110 for k, v := range taskp.Vwd {
111 v = substitute(v, replacements)
112 err = checkOutputFilename(outdir, k)
114 return "", "", "", err
116 if taskp.KeepTmpOutput {
117 err = copyFile(v, outdir+"/"+k)
119 err = os.Symlink(v, outdir+"/"+k)
122 return "", "", "", err
127 if taskp.Stdin != "" {
128 // Set up stdin redirection
129 stdin = substitute(taskp.Stdin, replacements)
130 cmd.Stdin, err = os.Open(stdin)
132 return "", "", "", err
136 if taskp.Stdout != "" {
137 err = checkOutputFilename(outdir, taskp.Stdout)
139 return "", "", "", err
141 // Set up stdout redirection
142 stdout = outdir + "/" + taskp.Stdout
143 cmd.Stdout, err = os.Create(stdout)
145 return "", "", "", err
148 cmd.Stdout = os.Stdout
151 if taskp.Stderr != "" {
152 err = checkOutputFilename(outdir, taskp.Stderr)
154 return "", "", "", err
156 // Set up stderr redirection
157 stderr = outdir + "/" + taskp.Stderr
158 cmd.Stderr, err = os.Create(stderr)
160 return "", "", "", err
163 cmd.Stderr = os.Stderr
166 if taskp.Env != nil {
167 // Set up subprocess environment
168 cmd.Env = os.Environ()
169 for k, v := range taskp.Env {
170 v = substitute(v, replacements)
171 cmd.Env = append(cmd.Env, k+"="+v)
174 return stdin, stdout, stderr, nil
// setupSignals subscribes to SIGTERM, SIGINT and SIGQUIT and returns the
// channel on which Go delivers those signal notifications. The channel has a
// buffer of one. The cmd argument is not used.
func setupSignals(cmd *exec.Cmd) chan os.Signal {
	notifications := make(chan os.Signal, 1)
	signal.Notify(notifications, syscall.SIGTERM, syscall.SIGINT, syscall.SIGQUIT)
	return notifications
}
// inCodes reports whether code occurs in codes. A nil or empty codes slice
// never matches.
func inCodes(code int, codes []int) bool {
	for i := range codes {
		if codes[i] == code {
			return true
		}
	}
	return false
}
// TASK_TEMPFAIL is the status the process exits with when running the task
// ended in a TempFail error.
const TASK_TEMPFAIL = 111

// TempFail wraps an error to mark the failure as temporary. It embeds the
// underlying error, so its message is the wrapped error's message.
type TempFail struct {
	error
}

// PermFail marks a failure as permanent.
type PermFail struct{}
203 func (s PermFail) Error() string {
// substitute returns inp with every occurrence of each key of subst replaced
// by the corresponding value. Keys are applied one after another in map
// iteration order, which Go leaves unspecified; the result is only
// order-independent when no value contains another key.
func substitute(inp string, subst map[string]string) string {
	result := inp
	for placeholder, value := range subst {
		result = strings.Replace(result, placeholder, value, -1)
	}
	return result
}
214 func getKeepTmp(outdir string) (manifest string, err error) {
215 fn, err := os.Open(outdir + "/" + ".arvados#collection")
221 buf, err := ioutil.ReadAll(fn)
225 collection := arvados.Collection{}
226 err = json.Unmarshal(buf, &collection)
227 return collection.ManifestText, err
// runner carries out one job task.
//
// For the task with sequence 0 it either adopts the job's single task
// definition or, when the job lists several, creates a "job_tasks" record per
// definition through api.Create and updates its own record through
// api.Update. The remaining steps prepare the working directories, expand the
// $(task.tmpdir), $(task.outdir) and $(task.keep) placeholders in the
// command, run the command while relaying caught signals to it, classify the
// exit code against the task's PermanentFailCodes, TemporaryFailCodes and
// SuccessCodes, obtain a manifest for the output directory (getKeepTmp or
// WriteTree) and record the result with api.Update.
//
// A temporary failure is reported as a TempFail error.
230 func runner(api IArvadosClient,
232 jobUUID, taskUUID, crunchtmpdir, keepmount string,
233 jobStruct Job, taskStruct Task) error {
// Start from the parameters stored on the task record itself.
236 taskp := taskStruct.Parameters
238 // If this is task 0 and there are multiple tasks, dispatch subtasks
240 if taskStruct.Sequence == 0 {
// A job with exactly one definition runs it directly in this task.
241 if len(jobStruct.ScriptParameters.Tasks) == 1 {
242 taskp = jobStruct.ScriptParameters.Tasks[0]
// Several definitions: create one job_tasks record per definition,
// each marked as created by this task.
244 for _, task := range jobStruct.ScriptParameters.Tasks {
245 err := api.Create("job_tasks",
246 map[string]interface{}{
249 CreatedByJobTaskUUID: taskUUID,
// Update this task's own job_tasks record.
257 err = api.Update("job_tasks", taskUUID,
258 map[string]interface{}{
259 "job_task": map[string]interface{}{
268 var tmpdir, outdir string
269 tmpdir, outdir, err = setupDirectories(crunchtmpdir, taskUUID, taskp.KeepTmpOutput)
// Placeholders expanded by substitute() in the command here, and in the
// vwd sources, stdin path and environment values inside setupCommand.
274 replacements := map[string]string{
275 "$(task.tmpdir)": tmpdir,
276 "$(task.outdir)": outdir,
277 "$(task.keep)": keepmount}
279 log.Printf("crunchrunner: $(task.tmpdir)=%v", tmpdir)
280 log.Printf("crunchrunner: $(task.outdir)=%v", outdir)
281 log.Printf("crunchrunner: $(task.keep)=%v", keepmount)
// Expand placeholders in every argv element; the elements of
// taskp.Command are overwritten in place.
284 for k, v := range taskp.Command {
285 taskp.Command[k] = substitute(v, replacements)
// NOTE(review): taskp.Command[0] panics on an empty Command unless that is
// validated elsewhere - confirm.
288 cmd := exec.Command(taskp.Command[0], taskp.Command[1:]...)
292 var stdin, stdout, stderr string
293 stdin, stdout, stderr, err = setupCommand(cmd, taskp, outdir, replacements)
298 // Run subprocess and wait for it to complete
// Decorate the redirection targets shell-style for the log line below.
300 stdin = " < " + stdin
303 stdout = " > " + stdout
306 stderr = " 2> " + stderr
308 log.Printf("Running %v%v%v%v", cmd.Args, stdin, stdout, stderr)
// Receive SIGTERM/SIGINT/SIGQUIT notifications on sigChan (see setupSignals).
310 var caughtSignal os.Signal
311 sigChan := setupSignals(cmd)
// finishedSignalNotify is closed once signal relaying is over; runner blocks
// on it further down before reading caughtSignal.
319 finishedSignalNotify := make(chan struct{})
320 go func(sig <-chan os.Signal) {
// NOTE(review): the loop variable shadows the sig parameter.
321 for sig := range sig {
// Relay the caught signal to the subprocess.
323 cmd.Process.Signal(caughtSignal)
325 close(finishedSignalNotify)
332 <-finishedSignalNotify
334 if caughtSignal != nil {
335 log.Printf("Caught signal %v", caughtSignal)
340 // Run() returns ExitError on non-zero exit code, but we handle
341 // that down below. So only return if it's not ExitError.
342 if _, ok := err.(*exec.ExitError); !ok {
// Extract the numeric exit status from the process's wait status.
349 exitCode := cmd.ProcessState.Sys().(syscall.WaitStatus).ExitStatus()
351 log.Printf("Completed with exit code %v", exitCode)
// Classify the exit code: the permanent-fail list is checked first, then the
// temporary-fail list, then the success list or a plain zero exit.
353 if inCodes(exitCode, taskp.PermanentFailCodes) {
355 } else if inCodes(exitCode, taskp.TemporaryFailCodes) {
356 return TempFail{fmt.Errorf("Process tempfail with exit code %v", exitCode)}
357 } else if inCodes(exitCode, taskp.SuccessCodes) || cmd.ProcessState.Success() {
363 // Upload output directory
// With KeepTmpOutput the manifest is read back through getKeepTmp; otherwise
// it comes from WriteTree on the output directory.
365 if taskp.KeepTmpOutput {
366 manifest, err = getKeepTmp(outdir)
368 manifest, err = WriteTree(kc, outdir)
// Record the outcome on this task's job_tasks record.
375 err = api.Update("job_tasks", taskUUID,
376 map[string]interface{}{
394 api, err := arvadosclient.MakeArvadosClient()
399 jobUUID := os.Getenv("JOB_UUID")
400 taskUUID := os.Getenv("TASK_UUID")
401 tmpdir := os.Getenv("TASK_WORK")
402 keepmount := os.Getenv("TASK_KEEPMOUNT")
407 err = api.Get("jobs", jobUUID, nil, &jobStruct)
411 err = api.Get("job_tasks", taskUUID, nil, &taskStruct)
417 kc, err = keepclient.MakeKeepClient(api)
423 err = runner(api, kc, jobUUID, taskUUID, tmpdir, keepmount, jobStruct, taskStruct)
427 } else if _, ok := err.(TempFail); ok {
429 os.Exit(TASK_TEMPFAIL)
430 } else if _, ok := err.(PermFail); ok {