7 "git.curoverse.com/arvados.git/sdk/go/arvados"
8 "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
9 "git.curoverse.com/arvados.git/sdk/go/keepclient"
// TaskDef describes one command to run and how its environment, standard
// streams and output directory are set up.
type TaskDef struct {
	// Command is the argv of the subprocess; element 0 is the executable.
	Command []string `json:"command"`
	// Env holds extra environment variables added on top of the runner's own
	// environment. Values may contain "$(task.*)" placeholders.
	Env map[string]string `json:"task.env"`
	// Stdin is the path of a file to attach to the subprocess's standard
	// input. It may contain "$(task.*)" placeholders.
	Stdin string `json:"task.stdin"`
	// Stdout is a filename, relative to the output directory, that receives
	// the subprocess's standard output.
	Stdout string `json:"task.stdout"`
	// Stderr is a filename, relative to the output directory, that receives
	// the subprocess's standard error.
	Stderr string `json:"task.stderr"`
	// Vwd maps a filename relative to the output directory to the source path
	// that should appear there (as a symlink, or as a copy when KeepTmpOutput
	// is set).
	Vwd map[string]string `json:"task.vwd"`
	// SuccessCodes lists exit codes that count as success.
	SuccessCodes []int `json:"task.successCodes"`
	// PermanentFailCodes lists exit codes that count as a permanent failure.
	PermanentFailCodes []int `json:"task.permanentFailCodes"`
	// TemporaryFailCodes lists exit codes that count as a temporary failure.
	TemporaryFailCodes []int `json:"task.temporaryFailCodes"`
	// KeepTmpOutput selects the Keep-backed output directory: Vwd entries are
	// copied instead of symlinked and the manifest is read back from the
	// directory instead of being uploaded.
	KeepTmpOutput bool `json:"task.keepTmpOutput"`
}

// Tasks is the list of task definitions found in a job's script_parameters.
type Tasks struct {
	// Tasks holds one definition per task to run.
	Tasks []TaskDef `json:"tasks"`
}

// Job is the subset of an Arvados job record that the runner reads.
type Job struct {
	// Script_parameters carries the task definitions for the job.
	Script_parameters Tasks `json:"script_parameters"`
}

// Task is the subset of an Arvados job_task record that the runner reads and
// writes.
type Task struct {
	// Job_uuid identifies the job this task belongs to.
	Job_uuid string `json:"job_uuid"`
	// Created_by_job_task_uuid identifies the task that created this one.
	Created_by_job_task_uuid string `json:"created_by_job_task_uuid"`
	// Parameters is the definition of the command this task runs.
	Parameters TaskDef `json:"parameters"`
	// Sequence is the task's sequence number; 0 marks the initial task.
	Sequence int `json:"sequence"`
	// Output is the task's recorded output.
	Output string `json:"output"`
	// Success records whether the task succeeded.
	Success bool `json:"success"`
	// Progress is the task's completion fraction. It must have its own JSON
	// key: when two fields at the same level share a tag, encoding/json
	// silently ignores both of them, which would also drop Sequence.
	Progress float32 `json:"progress"`
}
// IArvadosClient is the part of the Arvados API client that this file calls
// through an interface: runner uses it to create and update "job_tasks"
// records. Presumably it exists so a fake client can be substituted in
// tests — TODO confirm.
52 type IArvadosClient interface {
// Create presumably makes a new record of resourceType from parameters and
// decodes the API response into output — verify against arvadosclient.
53 Create(resourceType string, parameters arvadosclient.Dict, output interface{}) error
// Update presumably changes the record of resourceType identified by uuid
// using parameters and decodes the API response into output — verify against
// arvadosclient.
54 Update(resourceType string, uuid string, parameters arvadosclient.Dict, output interface{}) (err error)
// setupDirectories prepares the scratch and output directories for a task and
// returns their paths.
//
// tmpdir is always crunchtmpdir + "/tmpdir" and is created with mode 0700.
// outdir is either the value of the TASK_KEEPMOUNT_TMP environment variable
// or crunchtmpdir + "/outdir" created with mode 0700. The condition choosing
// between the two is not visible in this listing; presumably keepTmp selects
// the environment variable — TODO confirm.
//
// taskUuid is not referenced by any visible line.
57 func setupDirectories(crunchtmpdir, taskUuid string, keepTmp bool) (tmpdir, outdir string, err error) {
58 tmpdir = crunchtmpdir + "/tmpdir"
// os.Mkdir, not MkdirAll: crunchtmpdir must already exist and tmpdir must not.
59 err = os.Mkdir(tmpdir, 0700)
// Output directory supplied by the environment; nothing is created here.
65 outdir = os.Getenv("TASK_KEEPMOUNT_TMP")
// Output directory local to crunchtmpdir, created fresh.
67 outdir = crunchtmpdir + "/outdir"
68 err = os.Mkdir(outdir, 0700)
74 return tmpdir, outdir, nil
// checkOutputFilename validates fn, a slash-separated path relative to
// outdir, and creates the parent directories it needs underneath outdir.
//
// It returns an error if fn starts or ends with "/", if fn contains "../"
// (which could resolve to a location outside outdir), or if the parent
// directories cannot be created. The file named by fn itself is not created.
func checkOutputFilename(outdir, fn string) error {
	if strings.HasPrefix(fn, "/") || strings.HasSuffix(fn, "/") {
		return fmt.Errorf("Path must not start or end with '/'")
	}
	// Search for "../" inside fn; the haystack comes first.
	if strings.Contains(fn, "../") {
		return fmt.Errorf("Path must not contain '../'")
	}

	// Everything before the last "/" is the directory part. A bare filename
	// has none, so there is nothing to create.
	if sl := strings.LastIndex(fn, "/"); sl != -1 {
		if err := os.MkdirAll(outdir+"/"+fn[:sl], 0777); err != nil {
			return err
		}
	}
	return nil
}
// copyFile copies the contents of the file at src into the file at dst.
// Note the argument order: destination first, source second.
92 func copyFile(dst, src string) error {
93 in, err := os.Open(src)
// os.Create truncates dst if it already exists.
99 out, err := os.Create(dst)
// Stream the data instead of loading the whole file into memory.
105 _, err = io.Copy(out, in)
109 func setupCommand(cmd *exec.Cmd, taskp TaskDef, outdir string, replacements map[string]string) (stdin, stdout, stderr string, err error) {
110 if taskp.Vwd != nil {
111 for k, v := range taskp.Vwd {
112 v = substitute(v, replacements)
113 err = checkOutputFilename(outdir, k)
115 return "", "", "", err
117 if taskp.KeepTmpOutput {
118 err = copyFile(v, outdir+"/"+k)
120 err = os.Symlink(v, outdir+"/"+k)
123 return "", "", "", err
128 if taskp.Stdin != "" {
129 // Set up stdin redirection
130 stdin = substitute(taskp.Stdin, replacements)
131 cmd.Stdin, err = os.Open(stdin)
133 return "", "", "", err
137 if taskp.Stdout != "" {
138 err = checkOutputFilename(outdir, taskp.Stdout)
140 return "", "", "", err
142 // Set up stdout redirection
143 stdout = outdir + "/" + taskp.Stdout
144 cmd.Stdout, err = os.Create(stdout)
146 return "", "", "", err
149 cmd.Stdout = os.Stdout
152 if taskp.Stderr != "" {
153 err = checkOutputFilename(outdir, taskp.Stderr)
155 return "", "", "", err
157 // Set up stderr redirection
158 stderr = outdir + "/" + taskp.Stderr
159 cmd.Stderr, err = os.Create(stderr)
161 return "", "", "", err
164 cmd.Stderr = os.Stderr
167 if taskp.Env != nil {
168 // Set up subprocess environment
169 cmd.Env = os.Environ()
170 for k, v := range taskp.Env {
171 v = substitute(v, replacements)
172 cmd.Env = append(cmd.Env, k+"="+v)
175 return stdin, stdout, stderr, nil
178 // Set up signal handlers. Go sends signal notifications to a "signal
180 func setupSignals(cmd *exec.Cmd) chan os.Signal {
// Capacity 1: the signal package never blocks when delivering to the
// channel, so an unbuffered channel could drop a signal that arrives before
// the receiver is ready.
181 sigChan := make(chan os.Signal, 1)
// Subscribe the same channel to SIGTERM, SIGINT and SIGQUIT.
// cmd is not referenced by any visible line of this function.
182 signal.Notify(sigChan, syscall.SIGTERM)
183 signal.Notify(sigChan, syscall.SIGINT)
184 signal.Notify(sigChan, syscall.SIGQUIT)
// inCodes presumably reports whether code is one of codes. Only the loop
// header is visible in this listing, so the comparison and return values are
// not shown — TODO confirm. runner calls it to classify a subprocess exit
// code against the task's permanent-fail, temporary-fail and success lists.
188 func inCodes(code int, codes []int) bool {
190 for _, c := range codes {
// TASK_TEMPFAIL is the exit status this program uses (via os.Exit) when the
// task ended with a TempFail error.
199 const TASK_TEMPFAIL = 111
// TempFail wraps an underlying error by embedding it. runner returns one when
// the subprocess exit code is in the task's TemporaryFailCodes.
201 type TempFail struct{ error }
// PermFail is a data-less error type marking a permanent task failure.
202 type PermFail struct{}
// Error makes PermFail satisfy the error interface. The text it returns is
// not visible in this listing.
204 func (s PermFail) Error() string {
// substitute replaces, in inp, every occurrence of each key of subst with
// that key's value. The return statement is outside the visible lines;
// presumably the rewritten inp is returned.
//
// Keys are applied one after another in map iteration order, which Go leaves
// unspecified. If one replacement value contains another key, whether it gets
// expanded again depends on that order.
208 func substitute(inp string, subst map[string]string) string {
209 for k, v := range subst {
// -1 means replace all occurrences, not just the first.
210 inp = strings.Replace(inp, k, v, -1)
// getKeepTmp reads the file ".arvados#collection" inside outdir, decodes its
// contents as JSON into an arvados.Collection, and returns that collection's
// ManifestText.
215 func getKeepTmp(outdir string) (manifest string, err error) {
// Presumably a virtual file exposed by the Keep mount that describes the
// collection backing outdir — TODO confirm.
216 fn, err := os.Open(outdir + "/" + ".arvados#collection")
222 buf, err := ioutil.ReadAll(fn)
226 collection := arvados.Collection{}
227 err = json.Unmarshal(buf, &collection)
// The decode error, if any, is returned together with whatever ManifestText
// was decoded.
228 return collection.ManifestText, err
// runner executes one job task. From the visible statements it:
//
//  1. for sequence 0, either adopts the job's single task definition or
//     creates one job_tasks record per definition;
//  2. prepares the tmp/output directories and the placeholder replacements;
//  3. builds the subprocess, wires its redirections and environment, and
//     relays signals to it while it runs;
//  4. classifies the exit code as permanent failure, temporary failure or
//     success;
//  5. obtains the output manifest and updates this task's job_tasks record.
//
// Only part of the function body is visible in this listing; the comments
// below describe the visible statements only.
231 func runner(api IArvadosClient,
233 jobUuid, taskUuid, crunchtmpdir, keepmount string,
234 jobStruct Job, taskStruct Task) error {
237 taskp := taskStruct.Parameters
239 // If this is task 0 and there are multiple tasks, dispatch subtasks
241 if taskStruct.Sequence == 0 {
// A single definition is run directly by this process.
242 if len(jobStruct.Script_parameters.Tasks) == 1 {
243 taskp = jobStruct.Script_parameters.Tasks[0]
// Otherwise a job_tasks record is created for each definition, tagged with
// this job and with this task as its creator.
245 for _, task := range jobStruct.Script_parameters.Tasks {
246 err := api.Create("job_tasks",
247 map[string]interface{}{
248 "job_task": Task{Job_uuid: jobUuid,
249 Created_by_job_task_uuid: taskUuid,
// Update this task's own record; the fields written are not visible here.
257 err = api.Update("job_tasks", taskUuid,
258 map[string]interface{}{
268 var tmpdir, outdir string
269 tmpdir, outdir, err = setupDirectories(crunchtmpdir, taskUuid, taskp.KeepTmpOutput)
// Placeholders that substitute() expands in command arguments and in the
// values handled by setupCommand.
274 replacements := map[string]string{
275 "$(task.tmpdir)": tmpdir,
276 "$(task.outdir)": outdir,
277 "$(task.keep)": keepmount}
279 log.Printf("crunchrunner: $(task.tmpdir)=%v", tmpdir)
280 log.Printf("crunchrunner: $(task.outdir)=%v", outdir)
281 log.Printf("crunchrunner: $(task.keep)=%v", keepmount)
// Expand placeholders in every command argument, in place.
284 for k, v := range taskp.Command {
285 taskp.Command[k] = substitute(v, replacements)
// NOTE(review): indexing Command[0] panics on an empty Command unless a
// check on a line not shown here rules that out — confirm.
288 cmd := exec.Command(taskp.Command[0], taskp.Command[1:]...)
292 var stdin, stdout, stderr string
293 stdin, stdout, stderr, err = setupCommand(cmd, taskp, outdir, replacements)
298 // Run subprocess and wait for it to complete
// The redirect paths are decorated shell-style for the log line below.
300 stdin = " < " + stdin
303 stdout = " > " + stdout
306 stderr = " 2> " + stderr
308 log.Printf("Running %v%v%v%v", cmd.Args, stdin, stdout, stderr)
310 var caughtSignal os.Signal
311 sigChan := setupSignals(cmd)
// A goroutine relays each signal received on the channel to the subprocess.
// It closes finishedSignalNotify when its loop ends, and the main flow waits
// on that channel before reading caughtSignal.
319 finishedSignalNotify := make(chan struct{})
320 go func(sig <-chan os.Signal) {
321 for sig := range sig {
323 cmd.Process.Signal(caughtSignal)
325 close(finishedSignalNotify)
332 <-finishedSignalNotify
334 if caughtSignal != nil {
335 log.Printf("Caught signal %v", caughtSignal)
340 // Run() returns ExitError on non-zero exit code, but we handle
341 // that down below. So only return if it's not ExitError.
342 if _, ok := err.(*exec.ExitError); !ok {
// Pull the numeric exit status out of the platform wait status.
349 exitCode := cmd.ProcessState.Sys().(syscall.WaitStatus).ExitStatus()
351 log.Printf("Completed with exit code %v", exitCode)
// Classification order: permanent-fail codes first, then temporary-fail
// codes, then success (an explicit SuccessCodes entry or
// ProcessState.Success()).
353 if inCodes(exitCode, taskp.PermanentFailCodes) {
355 } else if inCodes(exitCode, taskp.TemporaryFailCodes) {
356 return TempFail{fmt.Errorf("Process tempfail with exit code %v", exitCode)}
357 } else if inCodes(exitCode, taskp.SuccessCodes) || cmd.ProcessState.Success() {
363 // Upload output directory
// With KeepTmpOutput the manifest is read back from the output directory;
// otherwise it comes from WriteTree over that directory.
365 if taskp.KeepTmpOutput {
366 manifest, err = getKeepTmp(outdir)
368 manifest, err = WriteTree(kc, outdir)
// Record the result on this task's job_tasks record; the fields written are
// not visible here.
375 err = api.Update("job_tasks", taskUuid,
376 map[string]interface{}{
// The enclosing function header is not visible in this listing; presumably
// this is the body of main — TODO confirm. The visible statements build an
// API client, install a root CA pool, read the job and task named by the
// environment, run the task through runner, and map the resulting error type
// to a process exit status.
394 api, err := arvadosclient.MakeArvadosClient()
399 // Container may not have certificates installed, so need to look for
400 // /etc/arvados/ca-certificates.crt in addition to normal system certs.
401 var certFiles = []string{
402 "/etc/ssl/certs/ca-certificates.crt", // Debian
403 "/etc/pki/tls/certs/ca-bundle.crt", // Red Hat
404 "/etc/arvados/ca-certificates.crt",
// Build one pool from the candidate files; presumably only files that were
// read successfully are appended (the check is on a line not shown).
407 certs := x509.NewCertPool()
408 for _, file := range certFiles {
409 data, err := ioutil.ReadFile(file)
411 log.Printf("Using TLS certificates at %v", file)
412 certs.AppendCertsFromPEM(data)
// NOTE(review): this panics if Transport is not an *http.Transport or if
// TLSClientConfig is nil — presumably MakeArvadosClient guarantees both;
// confirm.
415 api.Client.Transport.(*http.Transport).TLSClientConfig.RootCAs = certs
// Job, task and directory locations are passed in through the environment.
417 jobUuid := os.Getenv("JOB_UUID")
418 taskUuid := os.Getenv("TASK_UUID")
419 tmpdir := os.Getenv("TASK_WORK")
420 keepmount := os.Getenv("TASK_KEEPMOUNT")
// Fetch the job and task records this process is responsible for.
425 err = api.Get("jobs", jobUuid, nil, &jobStruct)
429 err = api.Get("job_tasks", taskUuid, nil, &taskStruct)
435 kc, err = keepclient.MakeKeepClient(api)
441 err = runner(api, kc, jobUuid, taskUuid, tmpdir, keepmount, jobStruct, taskStruct)
// The error's dynamic type picks the exit path: a TempFail exits with
// TASK_TEMPFAIL; the handling of PermFail is on lines not shown.
445 } else if _, ok := err.(TempFail); ok {
447 os.Exit(TASK_TEMPFAIL)
448 } else if _, ok := err.(PermFail); ok {