1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: Apache-2.0
19 "git.curoverse.com/arvados.git/sdk/go/arvados"
20 "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
21 "git.curoverse.com/arvados.git/sdk/go/keepclient"
// TaskDef describes one command to execute, together with its
// redirections, environment and exit-code policy.
type TaskDef struct {
	// Command is the argv of the subprocess; element 0 is the executable.
	Command []string `json:"command"`
	// Env holds extra environment variables appended to the inherited
	// environment (values undergo $(task.*) substitution).
	Env map[string]string `json:"task.env"`
	// Stdin is a path (after substitution) opened as the subprocess stdin.
	Stdin string `json:"task.stdin"`
	// Stdout and Stderr are paths relative to the output directory that
	// receive the subprocess output; empty means inherit from the runner.
	Stdout string `json:"task.stdout"`
	Stderr string `json:"task.stderr"`
	// Vwd maps a name inside the output directory to a source path that
	// is linked or copied into place before the command runs.
	Vwd map[string]string `json:"task.vwd"`
	// SuccessCodes, PermanentFailCodes and TemporaryFailCodes classify
	// the subprocess exit status.
	SuccessCodes       []int `json:"task.successCodes"`
	PermanentFailCodes []int `json:"task.permanentFailCodes"`
	TemporaryFailCodes []int `json:"task.temporaryFailCodes"`
	// KeepTmpOutput selects the TASK_KEEPMOUNT_TMP directory as the
	// output directory instead of a local one.
	KeepTmpOutput bool `json:"task.keepTmpOutput"`
}

// Tasks is the "script_parameters" payload of a job: a list of task
// definitions.
type Tasks struct {
	Tasks []TaskDef `json:"tasks"`
}

// Job is the subset of a job record used by the runner.
type Job struct {
	ScriptParameters Tasks `json:"script_parameters"`
}

// Task is the subset of a job_task record used by the runner.
type Task struct {
	JobUUID              string  `json:"job_uuid"`
	CreatedByJobTaskUUID string  `json:"created_by_job_task_uuid"`
	Parameters           TaskDef `json:"parameters"`
	Sequence             int     `json:"sequence"`
	Output               string  `json:"output"`
	Success              bool    `json:"success"`
	// Progress must have its own JSON name: when two fields at the same
	// depth share a name, encoding/json silently ignores both, which
	// previously left Sequence permanently zero.
	Progress float32 `json:"progress"`
}
// IArvadosClient is the API-client surface this file calls through an
// interface rather than a concrete client type. Both methods take a
// resource type name, a parameter dictionary and a destination for the
// decoded response, and report failure through the returned error.
55 type IArvadosClient interface {
// Create makes a new record of the given resource type.
56 Create(resourceType string, parameters arvadosclient.Dict, output interface{}) error
// Update modifies the record of the given resource type identified by uuid.
57 Update(resourceType string, uuid string, parameters arvadosclient.Dict, output interface{}) (err error)
// setupDirectories prepares the scratch and output directories for a task
// and returns their paths.
//
// tmpdir is always crunchtmpdir + "/tmpdir" and is created with mode 0700.
// outdir is either taken from the TASK_KEEPMOUNT_TMP environment variable
// or set to crunchtmpdir + "/outdir" (created with mode 0700).
// NOTE(review): the condition choosing between the two is not visible
// here; presumably it is keepTmp — confirm.
60 func setupDirectories(crunchtmpdir, taskUUID string, keepTmp bool) (tmpdir, outdir string, err error) {
61 tmpdir = crunchtmpdir + "/tmpdir"
// os.Mkdir (not MkdirAll): crunchtmpdir itself must already exist.
62 err = os.Mkdir(tmpdir, 0700)
68 outdir = os.Getenv("TASK_KEEPMOUNT_TMP")
70 outdir = crunchtmpdir + "/outdir"
71 err = os.Mkdir(outdir, 0700)
77 return tmpdir, outdir, nil
// checkOutputFilename validates fn as a path relative to outdir and makes
// sure its parent directory exists.
//
// fn must not start or end with '/', and must not contain a parent
// directory reference ("../", or a trailing ".." component), so that the
// resulting file cannot escape outdir. If fn has a directory part, that
// directory is created beneath outdir.
//
// It returns a non-nil error when fn is rejected or when the parent
// directory cannot be created.
func checkOutputFilename(outdir, fn string) error {
	if strings.HasPrefix(fn, "/") || strings.HasSuffix(fn, "/") {
		return fmt.Errorf("Path must not start or end with '/'")
	}
	// The haystack is fn and the needle is "../"; the trailing-".."
	// forms are rejected too because they also name a parent directory.
	if strings.Contains(fn, "../") || fn == ".." || strings.HasSuffix(fn, "/..") {
		return fmt.Errorf("Path must not contain '../'")
	}

	if sl := strings.LastIndex(fn, "/"); sl != -1 {
		// Surface directory-creation failures here instead of letting
		// the later file creation fail with a less specific error.
		if err := os.MkdirAll(outdir+"/"+fn[0:sl], 0777); err != nil {
			return err
		}
	}
	return nil
}
// copyFile copies the contents of the file at src into the file at dst.
// Note the argument order: destination first, source second.
95 func copyFile(dst, src string) error {
// Open the source read-only.
96 in, err := os.Open(src)
// os.Create truncates dst if it already exists.
102 out, err := os.Create(dst)
// Stream the data rather than loading the whole file into memory.
108 _, err = io.Copy(out, in)
112 func setupCommand(cmd *exec.Cmd, taskp TaskDef, outdir string, replacements map[string]string) (stdin, stdout, stderr string, err error) {
113 if taskp.Vwd != nil {
114 for k, v := range taskp.Vwd {
115 v = substitute(v, replacements)
116 err = checkOutputFilename(outdir, k)
118 return "", "", "", err
120 if taskp.KeepTmpOutput {
121 err = copyFile(v, outdir+"/"+k)
123 err = os.Symlink(v, outdir+"/"+k)
126 return "", "", "", err
131 if taskp.Stdin != "" {
132 // Set up stdin redirection
133 stdin = substitute(taskp.Stdin, replacements)
134 cmd.Stdin, err = os.Open(stdin)
136 return "", "", "", err
140 if taskp.Stdout != "" {
141 err = checkOutputFilename(outdir, taskp.Stdout)
143 return "", "", "", err
145 // Set up stdout redirection
146 stdout = outdir + "/" + taskp.Stdout
147 cmd.Stdout, err = os.Create(stdout)
149 return "", "", "", err
152 cmd.Stdout = os.Stdout
155 if taskp.Stderr != "" {
156 err = checkOutputFilename(outdir, taskp.Stderr)
158 return "", "", "", err
160 // Set up stderr redirection
161 stderr = outdir + "/" + taskp.Stderr
162 cmd.Stderr, err = os.Create(stderr)
164 return "", "", "", err
167 cmd.Stderr = os.Stderr
170 if taskp.Env != nil {
171 // Set up subprocess environment
172 cmd.Env = os.Environ()
173 for k, v := range taskp.Env {
174 v = substitute(v, replacements)
175 cmd.Env = append(cmd.Env, k+"="+v)
178 return stdin, stdout, stderr, nil
181 // Set up signal handlers. Go sends signal notifications to a "signal
// setupSignals creates a signal channel and registers it to receive
// SIGTERM, SIGINT and SIGQUIT. cmd is not referenced in the lines
// visible here.
183 func setupSignals(cmd *exec.Cmd) chan os.Signal {
// Capacity 1: signal.Notify does not block when sending, so an
// unbuffered channel could miss a signal that arrives before a receiver
// is ready.
184 sigChan := make(chan os.Signal, 1)
185 signal.Notify(sigChan, syscall.SIGTERM)
186 signal.Notify(sigChan, syscall.SIGINT)
187 signal.Notify(sigChan, syscall.SIGQUIT)
// inCodes iterates over codes and returns a bool for code.
// NOTE(review): the loop body is not visible here; presumably it reports
// whether code equals any element of codes — confirm.
191 func inCodes(code int, codes []int) bool {
193 for _, c := range codes {
// TASK_TEMPFAIL is the process exit status passed to os.Exit when the
// runner's error is a TempFail.
202 const TASK_TEMPFAIL = 111
// TempFail wraps an underlying error by embedding it, so it satisfies the
// error interface through the embedded value.
204 type TempFail struct{ error }
// PermFail carries no data; it is distinguished from other errors purely
// by its type.
205 type PermFail struct{}
// Error makes PermFail satisfy the error interface.
207 func (s PermFail) Error() string {
// substitute replaces every occurrence of each key of subst found in inp
// with the corresponding value.
//
// Keys are applied one after another to the progressively rewritten
// string, in map iteration order (which Go leaves unspecified). Text
// inserted by one replacement can therefore be rewritten by a later key
// if it happens to contain that key.
211 func substitute(inp string, subst map[string]string) string {
212 for k, v := range subst {
// -1 means replace all occurrences, not just the first.
213 inp = strings.Replace(inp, k, v, -1)
// getKeepTmp reads the file ".arvados#collection" inside outdir, decodes
// its contents as a JSON collection record, and returns that record's
// manifest text together with any JSON decoding error.
218 func getKeepTmp(outdir string) (manifest string, err error) {
219 fn, err := os.Open(outdir + "/" + ".arvados#collection")
// The whole file is read into memory before decoding.
225 buf, err := ioutil.ReadAll(fn)
229 collection := arvados.Collection{}
230 err = json.Unmarshal(buf, &collection)
// On a decode error ManifestText is whatever was populated so far
// (typically empty); callers should check err first.
231 return collection.ManifestText, err
// runner executes one job task from start to finish and reports the
// result through the API.
//
// Visible steps, in order:
//  1. Take the task parameters from taskStruct. When taskStruct.Sequence
//     is 0, consult the job's script parameters: a single task definition
//     is run directly; otherwise the definitions are iterated, "job_tasks"
//     records are created with CreatedByJobTaskUUID set to taskUUID, and
//     the "job_tasks" record taskUUID is updated.
//  2. Create the tmp/output directories and build the $(task.tmpdir),
//     $(task.outdir) and $(task.keep) replacement table.
//  3. Substitute the replacements into the command line, build the
//     exec.Cmd and apply redirections/environment via setupCommand.
//  4. Run the subprocess while forwarding caught signals to it.
//  5. Classify the exit code against the task's permanent-fail,
//     temporary-fail and success code lists.
//  6. Obtain the output manifest (getKeepTmp when KeepTmpOutput is set,
//     otherwise WriteTree) and update the "job_tasks" record.
//
// A temporary failure is returned as a TempFail wrapping a descriptive
// error.
234 func runner(api IArvadosClient,
236 jobUUID, taskUUID, crunchtmpdir, keepmount string,
237 jobStruct Job, taskStruct Task) error {
240 taskp := taskStruct.Parameters
242 // If this is task 0 and there are multiple tasks, dispatch subtasks
244 if taskStruct.Sequence == 0 {
245 if len(jobStruct.ScriptParameters.Tasks) == 1 {
246 taskp = jobStruct.ScriptParameters.Tasks[0]
248 for _, task := range jobStruct.ScriptParameters.Tasks {
249 err := api.Create("job_tasks",
250 map[string]interface{}{
253 CreatedByJobTaskUUID: taskUUID,
261 err = api.Update("job_tasks", taskUUID,
262 map[string]interface{}{
263 "job_task": map[string]interface{}{
272 var tmpdir, outdir string
273 tmpdir, outdir, err = setupDirectories(crunchtmpdir, taskUUID, taskp.KeepTmpOutput)
// Placeholders that may appear in the command line, stdin path, vwd
// sources and environment values.
278 replacements := map[string]string{
279 "$(task.tmpdir)": tmpdir,
280 "$(task.outdir)": outdir,
281 "$(task.keep)": keepmount}
283 log.Printf("crunchrunner: $(task.tmpdir)=%v", tmpdir)
284 log.Printf("crunchrunner: $(task.outdir)=%v", outdir)
285 log.Printf("crunchrunner: $(task.keep)=%v", keepmount)
// taskp is a struct copy, but Command is a slice: writing through the
// index updates the backing array shared with the struct it was copied
// from.
288 for k, v := range taskp.Command {
289 taskp.Command[k] = substitute(v, replacements)
// NOTE(review): Command[0] is indexed unconditionally on this line; an
// empty Command would panic unless guarded on a line not shown here.
292 cmd := exec.Command(taskp.Command[0], taskp.Command[1:]...)
296 var stdin, stdout, stderr string
297 stdin, stdout, stderr, err = setupCommand(cmd, taskp, outdir, replacements)
302 // Run subprocess and wait for it to complete
// The path strings are decorated with shell-style redirection markers
// purely for the log line below; nothing is passed to a shell.
304 stdin = " < " + stdin
307 stdout = " > " + stdout
310 stderr = " 2> " + stderr
312 log.Printf("Running %v%v%v%v", cmd.Args, stdin, stdout, stderr)
314 var caughtSignal os.Signal
315 sigChan := setupSignals(cmd)
// finishedSignalNotify is closed once signal forwarding has ended;
// the receive further down waits for that before caughtSignal is read.
323 finishedSignalNotify := make(chan struct{})
324 go func(sig <-chan os.Signal) {
// The loop ends when the signal channel is closed.
325 for sig := range sig {
327 cmd.Process.Signal(caughtSignal)
329 close(finishedSignalNotify)
336 <-finishedSignalNotify
338 if caughtSignal != nil {
339 log.Printf("Caught signal %v", caughtSignal)
344 // Run() returns ExitError on non-zero exit code, but we handle
345 // that down below. So only return if it's not ExitError.
346 if _, ok := err.(*exec.ExitError); !ok {
// Unchecked type assertion: relies on ProcessState.Sys() returning a
// syscall.WaitStatus on the platform this is built for.
353 exitCode := cmd.ProcessState.Sys().(syscall.WaitStatus).ExitStatus()
355 log.Printf("Completed with exit code %v", exitCode)
// Precedence: permanent-fail codes are checked first, then
// temporary-fail codes, then explicit success codes or a zero exit.
357 if inCodes(exitCode, taskp.PermanentFailCodes) {
359 } else if inCodes(exitCode, taskp.TemporaryFailCodes) {
360 return TempFail{fmt.Errorf("Process tempfail with exit code %v", exitCode)}
361 } else if inCodes(exitCode, taskp.SuccessCodes) || cmd.ProcessState.Success() {
367 // Upload output directory
369 if taskp.KeepTmpOutput {
370 manifest, err = getKeepTmp(outdir)
372 manifest, err = WriteTree(kc, outdir)
379 err = api.Update("job_tasks", taskUUID,
380 map[string]interface{}{
// Program entry logic (the enclosing function header is not visible here).
// It builds an API client, reads the job/task identifiers and directories
// from the environment, fetches the job and task records, builds a Keep
// client, and hands everything to runner. The process exit status is then
// derived from the type of error runner returned.
398 api, err := arvadosclient.MakeArvadosClient()
// Inputs supplied through the environment by whatever launches this
// process.
403 jobUUID := os.Getenv("JOB_UUID")
404 taskUUID := os.Getenv("TASK_UUID")
405 tmpdir := os.Getenv("TASK_WORK")
406 keepmount := os.Getenv("TASK_KEEPMOUNT")
411 err = api.Get("jobs", jobUUID, nil, &jobStruct)
415 err = api.Get("job_tasks", taskUUID, nil, &taskStruct)
421 kc, err = keepclient.MakeKeepClient(api)
// TASK_WORK is passed as runner's crunchtmpdir argument.
427 err = runner(api, kc, jobUUID, taskUUID, tmpdir, keepmount, jobStruct, taskStruct)
431 } else if _, ok := err.(TempFail); ok {
// Exit status TASK_TEMPFAIL signals a temporary failure to the parent.
433 os.Exit(TASK_TEMPFAIL)
434 } else if _, ok := err.(PermFail); ok {