5 "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
6 "git.curoverse.com/arvados.git/sdk/go/keepclient"
16 Command []string `json:"command"`
17 Env map[string]string `json:"task.env"`
18 Stdin string `json:"task.stdin"`
19 Stdout string `json:"task.stdout"`
20 Vwd map[string]string `json:"task.vwd"`
21 SuccessCodes []int `json:"task.successCodes"`
22 PermanentFailCodes []int `json:"task.permanentFailCodes"`
23 TemporaryFailCodes []int `json:"task.temporaryFailCodes"`
27 Tasks []TaskDef `json:"tasks"`
31 Script_parameters Tasks `json:"script_parameters"`
35 Job_uuid string `json:"job_uuid"`
36 Created_by_job_task_uuid string `json:"created_by_job_task_uuid"`
37 Parameters TaskDef `json:"parameters"`
38 Sequence int `json:"sequence"`
39 Output string `json:"output"`
40 Success bool `json:"success"`
41 Progress float32 `json:"sequence"`
44 type IArvadosClient interface {
45 Create(resourceType string, parameters arvadosclient.Dict, output interface{}) error
46 Update(resourceType string, uuid string, parameters arvadosclient.Dict, output interface{}) (err error)
49 func setupDirectories(crunchtmpdir, taskUuid string) (tmpdir, outdir string, err error) {
50 tmpdir = crunchtmpdir + "/tmpdir"
51 err = os.Mkdir(tmpdir, 0700)
56 outdir = crunchtmpdir + "/outdir"
57 err = os.Mkdir(outdir, 0700)
62 return tmpdir, outdir, nil
65 func checkOutputFilename(outdir, fn string) error {
66 if strings.HasPrefix(fn, "/") || strings.HasSuffix(fn, "/") {
67 return fmt.Errorf("Path must not start or end with '/'")
69 if strings.Index("../", fn) != -1 {
70 return fmt.Errorf("Path must not contain '../'")
73 sl := strings.LastIndex(fn, "/")
75 os.MkdirAll(outdir+"/"+fn[0:sl], 0777)
80 func setupCommand(cmd *exec.Cmd, taskp TaskDef, outdir string, replacements map[string]string) (stdin, stdout string, err error) {
82 for k, v := range taskp.Vwd {
83 v = substitute(v, replacements)
84 err = checkOutputFilename(outdir, k)
88 os.Symlink(v, outdir+"/"+k)
92 if taskp.Stdin != "" {
93 // Set up stdin redirection
94 stdin = substitute(taskp.Stdin, replacements)
95 cmd.Stdin, err = os.Open(stdin)
101 if taskp.Stdout != "" {
102 err = checkOutputFilename(outdir, taskp.Stdout)
106 // Set up stdout redirection
107 stdout = outdir + "/" + taskp.Stdout
108 cmd.Stdout, err = os.Create(stdout)
113 cmd.Stdout = os.Stdout
116 if taskp.Env != nil {
117 // Set up subprocess environment
118 cmd.Env = os.Environ()
119 for k, v := range taskp.Env {
120 v = substitute(v, replacements)
121 cmd.Env = append(cmd.Env, k+"="+v)
124 return stdin, stdout, nil
127 // Set up signal handlers. Go sends signal notifications to a "signal
129 func setupSignals(cmd *exec.Cmd) chan os.Signal {
130 sigChan := make(chan os.Signal, 1)
131 signal.Notify(sigChan, syscall.SIGTERM)
132 signal.Notify(sigChan, syscall.SIGINT)
133 signal.Notify(sigChan, syscall.SIGQUIT)
137 func inCodes(code int, codes []int) bool {
139 for _, c := range codes {
148 const TASK_TEMPFAIL = 111
150 type TempFail struct{ error }
151 type PermFail struct{}
153 func (s PermFail) Error() string {
157 func substitute(inp string, subst map[string]string) string {
158 for k, v := range subst {
159 inp = strings.Replace(inp, k, v, -1)
164 func runner(api IArvadosClient,
166 jobUuid, taskUuid, crunchtmpdir, keepmount string,
167 jobStruct Job, taskStruct Task) error {
170 taskp := taskStruct.Parameters
172 // If this is task 0 and there are multiple tasks, dispatch subtasks
174 if taskStruct.Sequence == 0 {
175 if len(jobStruct.Script_parameters.Tasks) == 1 {
176 taskp = jobStruct.Script_parameters.Tasks[0]
178 for _, task := range jobStruct.Script_parameters.Tasks {
179 err := api.Create("job_tasks",
180 map[string]interface{}{
181 "job_task": Task{Job_uuid: jobUuid,
182 Created_by_job_task_uuid: taskUuid,
190 err = api.Update("job_tasks", taskUuid,
191 map[string]interface{}{
201 var tmpdir, outdir string
202 tmpdir, outdir, err = setupDirectories(crunchtmpdir, taskUuid)
207 replacements := map[string]string{
208 "$(task.tmpdir)": tmpdir,
209 "$(task.outdir)": outdir,
210 "$(task.keep)": keepmount}
213 for k, v := range taskp.Command {
214 taskp.Command[k] = substitute(v, replacements)
217 cmd := exec.Command(taskp.Command[0], taskp.Command[1:]...)
221 var stdin, stdout string
222 stdin, stdout, err = setupCommand(cmd, taskp, outdir, replacements)
227 // Run subprocess and wait for it to complete
229 stdin = " < " + stdin
232 stdout = " > " + stdout
234 log.Printf("Running %v%v%v", cmd.Args, stdin, stdout)
236 var caughtSignal os.Signal
237 sigChan := setupSignals(cmd)
245 finishedSignalNotify := make(chan struct{})
246 go func(sig <-chan os.Signal) {
247 for sig := range sig {
249 cmd.Process.Signal(caughtSignal)
251 close(finishedSignalNotify)
258 <-finishedSignalNotify
260 if caughtSignal != nil {
261 log.Printf("Caught signal %v", caughtSignal)
266 // Run() returns ExitError on non-zero exit code, but we handle
267 // that down below. So only return if it's not ExitError.
268 if _, ok := err.(*exec.ExitError); !ok {
275 exitCode := cmd.ProcessState.Sys().(syscall.WaitStatus).ExitStatus()
277 log.Printf("Completed with exit code %v", exitCode)
279 if inCodes(exitCode, taskp.PermanentFailCodes) {
281 } else if inCodes(exitCode, taskp.TemporaryFailCodes) {
282 return TempFail{fmt.Errorf("Process tempfail with exit code %v", exitCode)}
283 } else if inCodes(exitCode, taskp.SuccessCodes) || cmd.ProcessState.Success() {
289 // Upload output directory
290 manifest, err := WriteTree(kc, outdir)
296 err = api.Update("job_tasks", taskUuid,
297 map[string]interface{}{
315 api, err := arvadosclient.MakeArvadosClient()
320 jobUuid := os.Getenv("JOB_UUID")
321 taskUuid := os.Getenv("TASK_UUID")
322 tmpdir := os.Getenv("TASK_WORK")
323 keepmount := os.Getenv("TASK_KEEPMOUNT")
328 err = api.Get("jobs", jobUuid, nil, &jobStruct)
332 err = api.Get("job_tasks", taskUuid, nil, &taskStruct)
338 kc, err = keepclient.MakeKeepClient(&api)
344 err = runner(api, kc, jobUuid, taskUuid, tmpdir, keepmount, jobStruct, taskStruct)
348 } else if _, ok := err.(TempFail); ok {
350 os.Exit(TASK_TEMPFAIL)
351 } else if _, ok := err.(PermFail); ok {