6 "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
7 "git.curoverse.com/arvados.git/sdk/go/keepclient"
// Fields of the task definition struct (its opening and closing lines are not
// part of this excerpt). The struct tags give the JSON key for each field.
//
// Command is the subprocess argv: runner passes element 0 to exec.Command as
// the program and the remaining elements as its arguments.
19 Command []string `json:"command"`
// Env holds extra environment variables. setupCommand appends each entry,
// after placeholder substitution of the value, to a copy of os.Environ().
20 Env map[string]string `json:"task.env"`
// Stdin, when non-empty, is a path (after placeholder substitution) that
// setupCommand opens and attaches as the subprocess's standard input.
21 Stdin string `json:"task.stdin"`
// Stdout, when non-empty, is a filename that setupCommand creates under the
// output directory to receive the subprocess's standard output.
22 Stdout string `json:"task.stdout"`
// Stderr, when non-empty, is a filename that setupCommand creates under the
// output directory to receive the subprocess's standard error.
23 Stderr string `json:"task.stderr"`
// Vwd maps a path relative to the output directory to a symlink target;
// setupCommand substitutes placeholders in the target and creates the link.
24 Vwd map[string]string `json:"task.vwd"`
// SuccessCodes, PermanentFailCodes and TemporaryFailCodes are exit-code lists
// that runner checks with inCodes to classify the subprocess's exit status.
25 SuccessCodes []int `json:"task.successCodes"`
26 PermanentFailCodes []int `json:"task.permanentFailCodes"`
27 TemporaryFailCodes []int `json:"task.temporaryFailCodes"`
// Tasks is the list of task definitions stored under the "tasks" JSON key;
// runner reads it as jobStruct.Script_parameters.Tasks.
31 Tasks []TaskDef `json:"tasks"`
// Script_parameters holds the job's "script_parameters" JSON object, which
// carries the task list that runner inspects for task 0.
35 Script_parameters Tasks `json:"script_parameters"`
39 Job_uuid string `json:"job_uuid"`
40 Created_by_job_task_uuid string `json:"created_by_job_task_uuid"`
41 Parameters TaskDef `json:"parameters"`
42 Sequence int `json:"sequence"`
43 Output string `json:"output"`
44 Success bool `json:"success"`
45 Progress float32 `json:"sequence"`
// IArvadosClient declares the API-client methods that runner calls; runner
// takes its client as this interface rather than as a concrete client type.
48 type IArvadosClient interface {
// Create is called by runner with resourceType "job_tasks" to add a record.
49 Create(resourceType string, parameters arvadosclient.Dict, output interface{}) error
// Update is called by runner with resourceType "job_tasks" and a task UUID to
// modify an existing record.
50 Update(resourceType string, uuid string, parameters arvadosclient.Dict, output interface{}) (err error)
// setupDirectories creates the two working directories for a task,
// crunchtmpdir+"/tmpdir" and crunchtmpdir+"/outdir", each with mode 0700, and
// returns their paths.
//
// os.Mkdir does not create parents and fails if the directory already exists,
// so crunchtmpdir must exist and must not yet contain either directory.
// NOTE(review): the error checks after each Mkdir and the use (if any) of
// taskUuid are on lines not visible in this excerpt.
53 func setupDirectories(crunchtmpdir, taskUuid string) (tmpdir, outdir string, err error) {
54 tmpdir = crunchtmpdir + "/tmpdir"
55 err = os.Mkdir(tmpdir, 0700)
60 outdir = crunchtmpdir + "/outdir"
61 err = os.Mkdir(outdir, 0700)
66 return tmpdir, outdir, nil
// checkOutputFilename validates fn, a path relative to outdir that is about
// to be created (a redirect target or a vwd symlink), and creates whatever
// parent directories it needs beneath outdir.
//
// fn is rejected when it is empty or ".", when it starts or ends with "/", or
// when it contains "../" or ends in a ".." component, so the resulting path
// cannot escape outdir. An error is also returned when the parent directories
// cannot be created.
func checkOutputFilename(outdir, fn string) error {
	// Neither of these names a file inside the output directory.
	if fn == "" || fn == "." {
		return fmt.Errorf("Path must not be empty or '.'")
	}
	if strings.HasPrefix(fn, "/") || strings.HasSuffix(fn, "/") {
		return fmt.Errorf("Path must not start or end with '/'")
	}
	// Catch ".." as a leading or inner component ("../x", "a/../b") and as
	// the final component ("..", "a/..").
	if strings.Contains(fn, "../") || fn == ".." || strings.HasSuffix(fn, "/..") {
		return fmt.Errorf("Path must not contain '../'")
	}

	// Everything before the last "/" is a directory that has to exist before
	// the file itself can be created.
	if sl := strings.LastIndex(fn, "/"); sl != -1 {
		if err := os.MkdirAll(outdir+"/"+fn[:sl], 0777); err != nil {
			return err
		}
	}
	return nil
}
84 func setupCommand(cmd *exec.Cmd, taskp TaskDef, outdir string, replacements map[string]string) (stdin, stdout, stderr string, err error) {
86 for k, v := range taskp.Vwd {
87 v = substitute(v, replacements)
88 err = checkOutputFilename(outdir, k)
90 return "", "", "", err
92 os.Symlink(v, outdir+"/"+k)
96 if taskp.Stdin != "" {
97 // Set up stdin redirection
98 stdin = substitute(taskp.Stdin, replacements)
99 cmd.Stdin, err = os.Open(stdin)
101 return "", "", "", err
105 if taskp.Stdout != "" {
106 err = checkOutputFilename(outdir, taskp.Stdout)
108 return "", "", "", err
110 // Set up stdout redirection
111 stdout = outdir + "/" + taskp.Stdout
112 cmd.Stdout, err = os.Create(stdout)
114 return "", "", "", err
117 cmd.Stdout = os.Stdout
120 if taskp.Stderr != "" {
121 err = checkOutputFilename(outdir, taskp.Stderr)
123 return "", "", "", err
125 // Set up stderr redirection
126 stderr = outdir + "/" + taskp.Stderr
127 cmd.Stderr, err = os.Create(stderr)
129 return "", "", "", err
132 cmd.Stderr = os.Stderr
135 if taskp.Env != nil {
136 // Set up subprocess environment
137 cmd.Env = os.Environ()
138 for k, v := range taskp.Env {
139 v = substitute(v, replacements)
140 cmd.Env = append(cmd.Env, k+"="+v)
143 return stdin, stdout, stderr, nil
146 // Set up signal handlers. Go sends signal notifications to a "signal
// channel": this function creates one with a buffer of a single signal and
// registers it for SIGTERM, SIGINT and SIGQUIT. runner uses the channel it
// gets back to forward received signals to the subprocess.
// NOTE(review): cmd is not used on any visible line, and the return
// statement is on a line not shown in this excerpt.
148 func setupSignals(cmd *exec.Cmd) chan os.Signal {
149 sigChan := make(chan os.Signal, 1)
150 signal.Notify(sigChan, syscall.SIGTERM)
151 signal.Notify(sigChan, syscall.SIGINT)
152 signal.Notify(sigChan, syscall.SIGQUIT)
// inCodes iterates over codes looking at each entry in relation to code and
// yields a bool; runner calls it to test a subprocess exit status against the
// task's permanent-fail, temporary-fail and success code lists.
// NOTE(review): presumably returns true when code equals one of the entries —
// the comparison and return statements are on lines not visible here.
156 func inCodes(code int, codes []int) bool {
158 for _, c := range codes {
// TASK_TEMPFAIL is the process exit status main uses when runner returns a
// TempFail error.
167 const TASK_TEMPFAIL = 111
// TempFail wraps an error (by embedding) to mark it as a temporary failure;
// main recognises it with a type assertion.
169 type TempFail struct{ error }
// PermFail is a field-less error type marking a permanent failure; main
// recognises it with a type assertion.
170 type PermFail struct{}
// Error gives PermFail the method required by the error interface. The
// returned text is on a line not visible in this excerpt.
172 func (s PermFail) Error() string {
// substitute replaces every occurrence of each key of subst found in inp with
// the corresponding value. runner uses it to expand placeholders such as
// "$(task.tmpdir)" in commands, paths and environment values.
//
// NOTE(review): map iteration order is unspecified, so if a replacement value
// itself contains another key the result can vary between calls. The return
// statement is on a line not visible in this excerpt.
176 func substitute(inp string, subst map[string]string) string {
177 for k, v := range subst {
// -1 means replace all occurrences, not just the first.
178 inp = strings.Replace(inp, k, v, -1)
// runner executes one task: it optionally dispatches subtasks, sets up the
// working directories, runs the task's command as a subprocess with signal
// forwarding, classifies the exit status, uploads the output directory and
// updates the task record through api.
//
// Many lines of this function (including part of the parameter list, error
// checks and several branch bodies) are not visible in this excerpt; the
// comments below describe only what the visible lines show.
183 func runner(api IArvadosClient,
185 jobUuid, taskUuid, crunchtmpdir, keepmount string,
186 jobStruct Job, taskStruct Task) error {
// Default to the definition stored on the task record itself.
189 taskp := taskStruct.Parameters
191 // If this is task 0 and there are multiple tasks, dispatch subtasks
193 if taskStruct.Sequence == 0 {
// With exactly one definition in the job, run it directly in this task.
194 if len(jobStruct.Script_parameters.Tasks) == 1 {
195 taskp = jobStruct.Script_parameters.Tasks[0]
// Otherwise create a job_tasks record per definition, recording this task as
// the creator of each.
197 for _, task := range jobStruct.Script_parameters.Tasks {
198 err := api.Create("job_tasks",
199 map[string]interface{}{
200 "job_task": Task{Job_uuid: jobUuid,
201 Created_by_job_task_uuid: taskUuid,
// Update this task's own record (the fields sent are not visible here).
209 err = api.Update("job_tasks", taskUuid,
210 map[string]interface{}{
220 var tmpdir, outdir string
221 tmpdir, outdir, err = setupDirectories(crunchtmpdir, taskUuid)
// Placeholders that may appear in the task definition and the concrete paths
// they expand to.
226 replacements := map[string]string{
227 "$(task.tmpdir)": tmpdir,
228 "$(task.outdir)": outdir,
229 "$(task.keep)": keepmount}
231 log.Printf("crunchrunner: $(task.tmpdir)=%v", tmpdir)
232 log.Printf("crunchrunner: $(task.outdir)=%v", outdir)
233 log.Printf("crunchrunner: $(task.keep)=%v", keepmount)
// Expand placeholders in every argv element, in place.
236 for k, v := range taskp.Command {
237 taskp.Command[k] = substitute(v, replacements)
// NOTE(review): indexing Command[0] panics when Command is empty — confirm
// that a length check exists on a line not shown here.
240 cmd := exec.Command(taskp.Command[0], taskp.Command[1:]...)
244 var stdin, stdout, stderr string
245 stdin, stdout, stderr, err = setupCommand(cmd, taskp, outdir, replacements)
250 // Run subprocess and wait for it to complete
// Decorate the redirect paths with shell-style markers for the log line
// below (the conditions guarding these assignments are not visible).
252 stdin = " < " + stdin
255 stdout = " > " + stdout
258 stderr = " 2> " + stderr
260 log.Printf("Running %v%v%v%v", cmd.Args, stdin, stdout, stderr)
262 var caughtSignal os.Signal
263 sigChan := setupSignals(cmd)
// finishedSignalNotify is closed by the goroutine below once its loop over
// the signal channel ends; runner blocks on it further down.
271 finishedSignalNotify := make(chan struct{})
272 go func(sig <-chan os.Signal) {
273 for sig := range sig {
// Forward the signal to the subprocess.
// NOTE(review): caughtSignal is shared between this goroutine and the code
// after the wait below; its assignment is on a line not visible here.
275 cmd.Process.Signal(caughtSignal)
277 close(finishedSignalNotify)
// Wait for the signal-forwarding goroutine to finish.
284 <-finishedSignalNotify
286 if caughtSignal != nil {
287 log.Printf("Caught signal %v", caughtSignal)
292 // Run() returns ExitError on non-zero exit code, but we handle
293 // that down below. So only return if it's not ExitError.
294 if _, ok := err.(*exec.ExitError); !ok {
// Extract the numeric exit status; the type assertion expects Sys() to hold
// a syscall.WaitStatus.
301 exitCode := cmd.ProcessState.Sys().(syscall.WaitStatus).ExitStatus()
303 log.Printf("Completed with exit code %v", exitCode)
// Classify the exit status: permanent-fail codes are checked first, then
// temporary-fail codes, then success codes or a successful process state.
305 if inCodes(exitCode, taskp.PermanentFailCodes) {
307 } else if inCodes(exitCode, taskp.TemporaryFailCodes) {
308 return TempFail{fmt.Errorf("Process tempfail with exit code %v", exitCode)}
309 } else if inCodes(exitCode, taskp.SuccessCodes) || cmd.ProcessState.Success() {
315 // Upload output directory
// kc is presumably declared on the parameter-list line not shown above.
316 manifest, err := WriteTree(kc, outdir)
// Update the task record (the fields sent are not visible here).
322 err = api.Update("job_tasks", taskUuid,
323 map[string]interface{}{
// Program entry logic (the enclosing function header and its final lines are
// not visible in this excerpt): build an API client, load CA certificates,
// read the job and task identified by environment variables, run the task
// and translate runner's error into a process exit status.
341 api, err := arvadosclient.MakeArvadosClient()
346 // Container may not have certificates installed, so need to look for
347 // /etc/arvados/ca-certificates.crt in addition to normal system certs.
348 var certFiles = []string{
349 "/etc/ssl/certs/ca-certificates.crt", // Debian
350 "/etc/pki/tls/certs/ca-bundle.crt", // Red Hat
351 "/etc/arvados/ca-certificates.crt",
// Collect certificates from each candidate file into one pool.
354 certs := x509.NewCertPool()
355 for _, file := range certFiles {
// This err is local to the loop body and shadows the outer err.
// NOTE(review): the condition guarding the next two lines is not visible;
// presumably they run only when the read succeeded.
356 data, err := ioutil.ReadFile(file)
358 log.Printf("Using TLS certificates at %v", file)
359 certs.AppendCertsFromPEM(data)
// NOTE(review): this panics if Transport is not an *http.Transport or if
// TLSClientConfig is nil — confirm MakeArvadosClient always sets both.
362 api.Client.Transport.(*http.Transport).TLSClientConfig.RootCAs = certs
// Job/task identity and working locations come from the environment.
364 jobUuid := os.Getenv("JOB_UUID")
365 taskUuid := os.Getenv("TASK_UUID")
366 tmpdir := os.Getenv("TASK_WORK")
367 keepmount := os.Getenv("TASK_KEEPMOUNT")
// Fetch the job and task records from the API server.
372 err = api.Get("jobs", jobUuid, nil, &jobStruct)
376 err = api.Get("job_tasks", taskUuid, nil, &taskStruct)
382 kc, err = keepclient.MakeKeepClient(&api)
388 err = runner(api, kc, jobUuid, taskUuid, tmpdir, keepmount, jobStruct, taskStruct)
// A TempFail error from runner maps to exit status TASK_TEMPFAIL; handling of
// the other outcomes is on lines not visible here.
392 } else if _, ok := err.(TempFail); ok {
394 os.Exit(TASK_TEMPFAIL)
395 } else if _, ok := err.(PermFail); ok {