4 "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
5 //"git.curoverse.com/arvados.git/sdk/go/keepclient"
// getRecord allocates an empty arvadosclient.Dict and asks api.Get to fill it
// with the record identified by uuid from the resource named rsc. The filled
// Dict is the named result r.
// NOTE(review): the statements that inspect err are not visible in this
// excerpt, so how a failed Get is reported cannot be confirmed from here.
14 func getRecord(api arvadosclient.ArvadosClient, rsc, uuid string) (r arvadosclient.Dict) {
15 r = make(arvadosclient.Dict)
16 err := api.Get(rsc, uuid, nil, &r)
// setupDirectories changes the process working directory to tmpdir, creates
// directories named "tmpdir" and "outdir" by relative path with mode 0700,
// and takes the returned outdir from os.Getwd.
// NOTE(review): this changes the working directory of the whole process, not
// just of the child command.
// NOTE(review): the lines between the Mkdir calls and Getwd are elided here;
// presumably the function chdirs into "outdir" before calling Getwd - confirm.
23 func setupDirectories(tmpdir string) (outdir string, err error) {
24 err = os.Chdir(tmpdir)
// Relative path: resolved against the current working directory.
29 err = os.Mkdir("tmpdir", 0700)
34 err = os.Mkdir("outdir", 0700)
// outdir is whatever the working directory is at this point.
44 outdir, err = os.Getwd()
// setupCommand configures cmd from the optional task parameters in taskp:
//   - "task.vwd":    handled by the branch below (body not visible here).
//   - "task.stdin":  a string path, opened relative to keepmount, used as cmd.Stdin.
//   - "task.stdout": a string path, opened relative to outdir, used as cmd.Stdout.
//   - "task.env":    a map whose string values are appended to the environment.
// It returns an error when a parameter that is present has an unexpected type.
// NOTE(review): err is assigned with "=" below; its declaration is on a line
// not visible in this excerpt.
52 func setupCommand(cmd *exec.Cmd, taskp map[string]interface{}, keepmount, outdir string) error {
55 if taskp["task.vwd"] != nil {
56 // Set up VWD symlinks in outdir
60 if taskp["task.stdin"] != nil {
61 stdin, ok := taskp["task.stdin"].(string)
63 return errors.New("Could not cast task.stdin to string")
65 // Set up stdin redirection
// The stdin path is interpreted relative to the keep mount point.
// NOTE(review): no Close of the opened file is visible in this excerpt.
66 cmd.Stdin, err = os.Open(keepmount + "/" + stdin)
72 if taskp["task.stdout"] != nil {
73 stdout, ok := taskp["task.stdout"].(string)
75 return errors.New("Could not cast task.stdout to string")
78 // Set up stdout redirection
// NOTE(review): os.Open opens the file read-only and fails if it does not
// already exist. A redirect target for the child's stdout must be writable,
// so os.Create (or os.OpenFile with write flags) is probably intended here.
79 cmd.Stdout, err = os.Open(outdir + "/" + stdout)
// Presumably the fallback when "task.stdout" is absent (the else line is
// elided): the child writes to this process's stdout.
84 cmd.Stdout = os.Stdout
87 if taskp["task.env"] != nil {
88 taskenv, ok := taskp["task.env"].(map[string]interface{})
90 return errors.New("Could not cast task.env to map")
93 // Set up subprocess environment
// Start from this process's environment, then append the task's entries.
94 cmd.Env = os.Environ()
95 for k, v := range taskenv {
99 return errors.New("Could not cast environment value to string")
// Appended after the inherited entries; vstr is v as a string (the
// conversion line is elided from this excerpt).
101 cmd.Env = append(cmd.Env, k+"="+vstr)
// setupSignals starts a goroutine that reads from a signal channel and, when
// cmd.Process is non-nil, passes the value named catch to cmd.Process.Signal.
// It then registers that channel for SIGTERM, SIGINT and SIGQUIT.
// NOTE(review): the line that receives from sig and defines catch is elided
// from this excerpt.
// NOTE(review): no way to stop the goroutine or to call signal.Stop is
// visible here.
107 func setupSignals(cmd *exec.Cmd) {
108 // Set up signal handlers
109 // Forward SIGINT, SIGTERM and SIGQUIT to inner process
// Buffered with capacity 1. Package os/signal does not block when sending,
// so a signal arriving while the buffer is full is dropped.
110 sigChan := make(chan os.Signal, 1)
111 go func(sig <-chan os.Signal) {
// cmd.Process is nil until the command has been started.
// NOTE(review): the error returned by Signal is discarded.
113 if cmd.Process != nil {
114 cmd.Process.Signal(catch)
117 signal.Notify(sigChan, syscall.SIGTERM)
118 signal.Notify(sigChan, syscall.SIGINT)
119 signal.Notify(sigChan, syscall.SIGQUIT)
// inCodes looks for code among the entries of codes. codes must hold a
// []interface{}; each element is asserted to float64 and compared with code
// after truncation to int.
// NOTE(review): float64 is presumably used because the values come from
// decoded JSON - confirm against the caller. The return statements are elided
// from this excerpt.
122 func inCodes(code int, codes interface{}) bool {
124 codesArray, ok := codes.([]interface{})
128 for _, c := range codesArray {
// Elements that are not float64 leave ok false and cannot match.
130 num, ok = c.(float64)
131 if ok && code == int(num) {
// TASK_TEMPFAIL is the status this program passes to os.Exit when a task
// fails temporarily.
139 const TASK_TEMPFAIL = 111
// TempFail is an error value wrapping InnerError; a TempFail result is
// answered with exit status TASK_TEMPFAIL.
141 type TempFail struct{ InnerError error }
// PermFail is an error value with no fields; it is distinguished from
// TempFail by type.
142 type PermFail struct{}
// Error implements the error interface by returning the wrapped error's
// message.
// NOTE(review): this panics if InnerError is nil; no guard is visible.
144 func (s TempFail) Error() string {
145 return s.InnerError.Error()
// Error implements the error interface for PermFail. The returned message is
// on a line not visible in this excerpt.
148 func (s PermFail) Error() string {
// runner executes a single job task. From the visible code it:
//  1. reads the job's "script_parameters" and the task's "parameters" as maps;
//  2. when the task's "sequence" is 0, reads the job's "tasks" array, adopts
//     its first entry as this task's parameters, and issues job_tasks API
//     calls (POST inside the loop, then a PUT for taskUuid);
//  3. builds the command line from the "command" array of strings;
//  4. prepares directories and the command via setupDirectories/setupCommand;
//  5. classifies the exit status against "task.successCodes",
//     "task.permanentFailCodes" and "task.temporaryFailCodes", in that order,
//     falling back to cmd.ProcessState.Success();
//  6. reports the result with a PUT to job_tasks.
// It returns an error when a parameter has an unexpected type.
// NOTE(review): many lines (variable declarations, "if !ok" guards, the
// subprocess start/wait, closing braces) are elided from this excerpt, so the
// notes below describe only what is visible.
152 func runner(api arvadosclient.ArvadosClient,
153 jobUuid, taskUuid, tmpdir, keepmount string, jobStruct,
154 taskStruct arvadosclient.Dict) error {
158 var jobp, taskp map[string]interface{}
159 jobp, ok = jobStruct["script_parameters"].(map[string]interface{})
161 return errors.New("Could not cast job script_parameters to map")
164 taskp, ok = taskStruct["parameters"].(map[string]interface{})
166 return errors.New("Could not cast task parameters to map")
169 // If this is task 0 and there are multiple tasks, dispatch subtasks
// The constant 0.0 is converted to float64 for this comparison, so the test
// is true only when "sequence" holds a float64 equal to zero.
171 if taskStruct["sequence"] == 0.0 {
172 var tasks []interface{}
173 tasks, ok = jobp["tasks"].([]interface{})
175 return errors.New("Could not cast tasks to array")
// NOTE(review): unchecked index and unchecked type assertion; this panics if
// tasks is empty or its first element is not a map, unless an elided guard
// on the preceding lines prevents it.
179 taskp = tasks[0].(map[string]interface{})
// With a single range variable, task is the index into tasks, not the element.
181 for task := range tasks {
182 err := api.Call("POST", "job_tasks", "", "",
185 "created_by_job_task_uuid": "",
193 err = api.Call("PUT", "job_tasks", taskUuid, "",
195 "job_task": arvadosclient.Dict{
205 var commandline []string
206 var commandsarray []interface{}
208 commandsarray, ok = taskp["command"].([]interface{})
210 return errors.New("Could not cast commands to array")
213 for _, c := range commandsarray {
215 cstr, ok = c.(string)
217 return errors.New("Could not cast command argument to string")
219 commandline = append(commandline, cstr)
// NOTE(review): no length check on commandline is visible; an empty "command"
// array would make commandline[0] panic.
221 cmd := exec.Command(commandline[0], commandline[1:]...)
224 outdir, err = setupDirectories(tmpdir)
231 err = setupCommand(cmd, taskp, keepmount, outdir)
238 // Run subprocess and wait for it to complete
239 log.Printf("Running %v", cmd.Args)
// NOTE(review): unchecked assertion to syscall.WaitStatus. cmd.ProcessState is
// only set once the process has been waited on; the start/wait lines are
// elided, so confirm it cannot be nil here.
252 exitCode := cmd.ProcessState.Sys().(syscall.WaitStatus).ExitStatus()
254 if inCodes(exitCode, taskp["task.successCodes"]) {
256 } else if inCodes(exitCode, taskp["task.permanentFailCodes"]) {
258 } else if inCodes(exitCode, taskp["task.temporaryFailCodes"]) {
// NOTE(review): this exits the process directly from runner. os.Exit does not
// run deferred functions, and it bypasses the TempFail error type that the
// caller of runner checks for.
259 os.Exit(TASK_TEMPFAIL)
260 } else if cmd.ProcessState.Success() {
266 // Upload output directory
270 err = api.Call("PUT", "job_tasks", taskUuid, "",
272 "job_task": arvadosclient.Dict{
// status and success are defined on lines not visible in this excerpt.
274 "success": status == success,
281 if status == success {
// Program entry logic (the enclosing func header is outside this excerpt):
// create an Arvados client, read the job/task identifiers and paths from the
// environment, fetch both records, run the task, and map the resulting error
// type to an exit path.
291 api, err := arvadosclient.MakeArvadosClient()
// Inputs come from the environment variables JOB_UUID, TASK_UUID, TASK_WORK
// and TASK_KEEPMOUNT.
// NOTE(review): no check for unset or empty values is visible.
296 jobUuid := os.Getenv("JOB_UUID")
297 taskUuid := os.Getenv("TASK_UUID")
298 tmpdir := os.Getenv("TASK_WORK")
299 keepmount := os.Getenv("TASK_KEEPMOUNT")
301 jobStruct := getRecord(api, "jobs", jobUuid)
302 taskStruct := getRecord(api, "job_tasks", taskUuid)
304 err = runner(api, jobUuid, taskUuid, tmpdir, keepmount, jobStruct, taskStruct)
// A plain type assertion matches only when err itself is a TempFail/PermFail
// value, not when one is wrapped inside another error.
308 } else if _, ok := err.(TempFail); ok {
310 os.Exit(TASK_TEMPFAIL)
311 } else if _, ok := err.(PermFail); ok {