4 "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
5 //"git.curoverse.com/arvados.git/sdk/go/keepclient"
14 commands []string `json:"commands"`
15 env map[string]string `json:"task.env"`
16 stdin string `json:"task.stdin"`
17 stdout string `json:"task.stdout"`
18 vwd map[string]string `json:"task.vwd"`
19 successCodes []int `json:"task.successCodes"`
20 permanentFailCodes []int `json:"task.permanentFailCodes"`
21 temporaryFailCodes []int `json:"task.temporaryFailCodes"`
25 tasks []TaskDef `json:"script_parameters"`
29 script_parameters Tasks `json:"script_parameters"`
33 job_uuid string `json:"job_uuid"`
34 created_by_job_task_uuid string `json:"created_by_job_task_uuid"`
35 parameters TaskDef `json:"parameters"`
36 sequence int `json:"sequence"`
37 output string `json:"output"`
38 success bool `json:"success"`
39 progress float32 `json:"sequence"`
42 func setupDirectories(tmpdir string) (outdir string, err error) {
43 err = os.Chdir(tmpdir)
48 err = os.Mkdir("tmpdir", 0700)
53 err = os.Mkdir("outdir", 0700)
63 outdir, err = os.Getwd()
71 func setupCommand(cmd *exec.Cmd, taskp TaskDef, keepmount, outdir string) error {
74 //if taskp.vwd != nil {
75 // Set up VWD symlinks in outdir
79 if taskp.stdin != "" {
80 // Set up stdin redirection
81 cmd.Stdin, err = os.Open(keepmount + "/" + taskp.stdin)
87 if taskp.stdout != "" {
88 // Set up stdout redirection
89 cmd.Stdout, err = os.Create(outdir + "/" + taskp.stdout)
94 cmd.Stdout = os.Stdout
98 // Set up subprocess environment
99 cmd.Env = os.Environ()
100 for k, v := range taskp.env {
101 cmd.Env = append(cmd.Env, k+"="+v)
107 func setupSignals(cmd *exec.Cmd) {
108 // Set up signal handlers
109 // Forward SIGINT, SIGTERM and SIGQUIT to inner process
110 sigChan := make(chan os.Signal, 1)
111 go func(sig <-chan os.Signal) {
113 if cmd.Process != nil {
114 cmd.Process.Signal(catch)
117 signal.Notify(sigChan, syscall.SIGTERM)
118 signal.Notify(sigChan, syscall.SIGINT)
119 signal.Notify(sigChan, syscall.SIGQUIT)
122 func inCodes(code int, codes []int) bool {
124 for _, c := range codes {
133 const TASK_TEMPFAIL = 111
135 type TempFail struct{ InnerError error }
136 type PermFail struct{}
138 func (s TempFail) Error() string {
139 return s.InnerError.Error()
142 func (s PermFail) Error() string {
146 func runner(api arvadosclient.IArvadosClient,
147 jobUuid, taskUuid, tmpdir, keepmount string,
148 jobStruct Job, taskStruct Task) error {
151 taskp := taskStruct.parameters
153 // If this is task 0 and there are multiple tasks, dispatch subtasks
155 if taskStruct.sequence == 0 {
156 if len(jobStruct.script_parameters.tasks) == 1 {
157 taskp = jobStruct.script_parameters.tasks[0]
159 for _, task := range jobStruct.script_parameters.tasks {
160 err := api.Create("job_tasks",
161 map[string]interface{}{
162 "job_task": Task{job_uuid: jobUuid,
163 created_by_job_task_uuid: taskUuid,
171 err = api.Update("job_tasks", taskUuid,
172 map[string]interface{}{
183 cmd := exec.Command(taskp.commands[0], taskp.commands[1:]...)
186 outdir, err = setupDirectories(tmpdir)
193 err = setupCommand(cmd, taskp, keepmount, outdir)
200 // Run subprocess and wait for it to complete
201 log.Printf("Running %v", cmd.Args)
214 exitCode := cmd.ProcessState.Sys().(syscall.WaitStatus).ExitStatus()
216 if inCodes(exitCode, taskp.successCodes) {
218 } else if inCodes(exitCode, taskp.permanentFailCodes) {
220 } else if inCodes(exitCode, taskp.temporaryFailCodes) {
221 os.Exit(TASK_TEMPFAIL)
222 } else if cmd.ProcessState.Success() {
228 // Upload output directory
232 err = api.Update("job_tasks", taskUuid,
233 map[string]interface{}{
234 "job_task": map[string]interface{}{
236 "success": status == success,
243 if status == success {
253 api, err := arvadosclient.MakeArvadosClient()
258 jobUuid := os.Getenv("JOB_UUID")
259 taskUuid := os.Getenv("TASK_UUID")
260 tmpdir := os.Getenv("TASK_WORK")
261 keepmount := os.Getenv("TASK_KEEPMOUNT")
266 err = api.Get("jobs", jobUuid, nil, &jobStruct)
270 err = api.Get("job_tasks", taskUuid, nil, &taskStruct)
275 err = runner(api, jobUuid, taskUuid, tmpdir, keepmount, jobStruct, taskStruct)
279 } else if _, ok := err.(TempFail); ok {
281 os.Exit(TASK_TEMPFAIL)
282 } else if _, ok := err.(PermFail); ok {