X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/53b1b5bbe6d6ae007e8ce546ba2539e0c061e25a..419db47f0e97123cb3ff491d189b5607468101da:/sdk/go/crunchrunner/crunchrunner.go diff --git a/sdk/go/crunchrunner/crunchrunner.go b/sdk/go/crunchrunner/crunchrunner.go index d3f6fcbb43..ca16fc6561 100644 --- a/sdk/go/crunchrunner/crunchrunner.go +++ b/sdk/go/crunchrunner/crunchrunner.go @@ -1,44 +1,55 @@ +// Copyright (C) The Arvados Authors. All rights reserved. +// +// SPDX-License-Identifier: Apache-2.0 + package main import ( + "encoding/json" "fmt" - "git.curoverse.com/arvados.git/sdk/go/arvadosclient" - "git.curoverse.com/arvados.git/sdk/go/keepclient" + "io" + "io/ioutil" "log" "os" "os/exec" "os/signal" "strings" "syscall" + + "git.curoverse.com/arvados.git/sdk/go/arvados" + "git.curoverse.com/arvados.git/sdk/go/arvadosclient" + "git.curoverse.com/arvados.git/sdk/go/keepclient" ) type TaskDef struct { - command []string `json:"command"` - env map[string]string `json:"task.env"` - stdin string `json:"task.stdin"` - stdout string `json:"task.stdout"` - vwd map[string]string `json:"task.vwd"` - successCodes []int `json:"task.successCodes"` - permanentFailCodes []int `json:"task.permanentFailCodes"` - temporaryFailCodes []int `json:"task.temporaryFailCodes"` + Command []string `json:"command"` + Env map[string]string `json:"task.env"` + Stdin string `json:"task.stdin"` + Stdout string `json:"task.stdout"` + Stderr string `json:"task.stderr"` + Vwd map[string]string `json:"task.vwd"` + SuccessCodes []int `json:"task.successCodes"` + PermanentFailCodes []int `json:"task.permanentFailCodes"` + TemporaryFailCodes []int `json:"task.temporaryFailCodes"` + KeepTmpOutput bool `json:"task.keepTmpOutput"` } type Tasks struct { - tasks []TaskDef `json:"script_parameters"` + Tasks []TaskDef `json:"tasks"` } type Job struct { - script_parameters Tasks `json:"script_parameters"` + ScriptParameters Tasks `json:"script_parameters"` } type Task struct { - job_uuid string `json:"job_uuid"` - created_by_job_task_uuid string `json:"created_by_job_task_uuid"` - parameters TaskDef `json:"parameters"` - sequence int `json:"sequence"` - output string `json:"output"` - success bool `json:"success"` - progress float32 `json:"sequence"` + JobUUID string `json:"job_uuid"` + CreatedByJobTaskUUID string `json:"created_by_job_task_uuid"` + Parameters TaskDef `json:"parameters"` + Sequence int `json:"sequence"` + Output string `json:"output"` + Success bool `json:"success"` + Progress float32 `json:"sequence"` } type IArvadosClient interface { @@ -46,17 +57,21 @@ type IArvadosClient interface { Update(resourceType string, uuid string, parameters arvadosclient.Dict, output interface{}) (err error) } -func setupDirectories(crunchtmpdir, taskUuid string) (tmpdir, outdir string, err error) { +func setupDirectories(crunchtmpdir, taskUUID string, keepTmp bool) (tmpdir, outdir string, err error) { tmpdir = crunchtmpdir + "/tmpdir" err = os.Mkdir(tmpdir, 0700) if err != nil { return "", "", err } - outdir = crunchtmpdir + "/outdir" - err = os.Mkdir(outdir, 0700) - if err != nil { - return "", "", err + if keepTmp { + outdir = os.Getenv("TASK_KEEPMOUNT_TMP") + } else { + outdir = crunchtmpdir + "/outdir" + err = os.Mkdir(outdir, 0700) + if err != nil { + return "", "", err + } } return tmpdir, outdir, nil @@ -77,61 +92,96 @@ func checkOutputFilename(outdir, fn string) error { return nil } -func setupCommand(cmd *exec.Cmd, taskp TaskDef, outdir string, replacements map[string]string) (stdin, stdout string, err error) { - if taskp.vwd != nil { - for k, v := range taskp.vwd { +func copyFile(dst, src string) error { + in, err := os.Open(src) + if err != nil { + return err + } + defer in.Close() + + out, err := os.Create(dst) + if err != nil { + return err + } + defer out.Close() + + _, err = io.Copy(out, in) + return err +} + +func setupCommand(cmd *exec.Cmd, taskp TaskDef, outdir string, replacements map[string]string) (stdin, stdout, stderr string, err error) { + if taskp.Vwd != nil { + for k, v := range taskp.Vwd { v = substitute(v, replacements) err = checkOutputFilename(outdir, k) if err != nil { - return "", "", err + return "", "", "", err + } + if taskp.KeepTmpOutput { + err = copyFile(v, outdir+"/"+k) + } else { + err = os.Symlink(v, outdir+"/"+k) + } + if err != nil { + return "", "", "", err } - os.Symlink(v, outdir+"/"+k) } } - if taskp.stdin != "" { + if taskp.Stdin != "" { // Set up stdin redirection - stdin = substitute(taskp.stdin, replacements) + stdin = substitute(taskp.Stdin, replacements) cmd.Stdin, err = os.Open(stdin) if err != nil { - return "", "", err + return "", "", "", err } } - if taskp.stdout != "" { - err = checkOutputFilename(outdir, taskp.stdout) + if taskp.Stdout != "" { + err = checkOutputFilename(outdir, taskp.Stdout) if err != nil { - return "", "", err + return "", "", "", err } // Set up stdout redirection - stdout = outdir + "/" + taskp.stdout + stdout = outdir + "/" + taskp.Stdout cmd.Stdout, err = os.Create(stdout) if err != nil { - return "", "", err + return "", "", "", err } } else { cmd.Stdout = os.Stdout } - if taskp.env != nil { + if taskp.Stderr != "" { + err = checkOutputFilename(outdir, taskp.Stderr) + if err != nil { + return "", "", "", err + } + // Set up stderr redirection + stderr = outdir + "/" + taskp.Stderr + cmd.Stderr, err = os.Create(stderr) + if err != nil { + return "", "", "", err + } + } else { + cmd.Stderr = os.Stderr + } + + if taskp.Env != nil { // Set up subprocess environment cmd.Env = os.Environ() - for k, v := range taskp.env { + for k, v := range taskp.Env { v = substitute(v, replacements) cmd.Env = append(cmd.Env, k+"="+v) } } - return stdin, stdout, nil + return stdin, stdout, stderr, nil } +// Set up signal handlers. Go sends signal notifications to a "signal +// channel". func setupSignals(cmd *exec.Cmd) chan os.Signal { - // Set up signal handlers - // Forward SIGINT, SIGTERM and SIGQUIT to inner process sigChan := make(chan os.Signal, 1) - go func(sig <-chan os.Signal) { - catch := <-sig - cmd.Process.Signal(catch) - }(sigChan) signal.Notify(sigChan, syscall.SIGTERM) signal.Notify(sigChan, syscall.SIGINT) signal.Notify(sigChan, syscall.SIGQUIT) @@ -165,45 +215,62 @@ func substitute(inp string, subst map[string]string) string { return inp } +func getKeepTmp(outdir string) (manifest string, err error) { + fn, err := os.Open(outdir + "/" + ".arvados#collection") + if err != nil { + return "", err + } + defer fn.Close() + + buf, err := ioutil.ReadAll(fn) + if err != nil { + return "", err + } + collection := arvados.Collection{} + err = json.Unmarshal(buf, &collection) + return collection.ManifestText, err +} + func runner(api IArvadosClient, kc IKeepClient, - jobUuid, taskUuid, crunchtmpdir, keepmount string, + jobUUID, taskUUID, crunchtmpdir, keepmount string, jobStruct Job, taskStruct Task) error { var err error - taskp := taskStruct.parameters + taskp := taskStruct.Parameters // If this is task 0 and there are multiple tasks, dispatch subtasks // and exit. - if taskStruct.sequence == 0 { - if len(jobStruct.script_parameters.tasks) == 1 { - taskp = jobStruct.script_parameters.tasks[0] + if taskStruct.Sequence == 0 { + if len(jobStruct.ScriptParameters.Tasks) == 1 { + taskp = jobStruct.ScriptParameters.Tasks[0] } else { - for _, task := range jobStruct.script_parameters.tasks { + for _, task := range jobStruct.ScriptParameters.Tasks { err := api.Create("job_tasks", map[string]interface{}{ - "job_task": Task{job_uuid: jobUuid, - created_by_job_task_uuid: taskUuid, - sequence: 1, - parameters: task}}, + "job_task": Task{ + JobUUID: jobUUID, + CreatedByJobTaskUUID: taskUUID, + Sequence: 1, + Parameters: task}}, nil) if err != nil { return TempFail{err} } } - err = api.Update("job_tasks", taskUuid, + err = api.Update("job_tasks", taskUUID, map[string]interface{}{ - "job_task": Task{ - output: "", - success: true, - progress: 1.0}}, + "job_task": map[string]interface{}{ + "output": "", + "success": true, + "progress": 1.0}}, nil) return nil } } var tmpdir, outdir string - tmpdir, outdir, err = setupDirectories(crunchtmpdir, taskUuid) + tmpdir, outdir, err = setupDirectories(crunchtmpdir, taskUUID, taskp.KeepTmpOutput) if err != nil { return TempFail{err} } @@ -213,17 +280,21 @@ func runner(api IArvadosClient, "$(task.outdir)": outdir, "$(task.keep)": keepmount} + log.Printf("crunchrunner: $(task.tmpdir)=%v", tmpdir) + log.Printf("crunchrunner: $(task.outdir)=%v", outdir) + log.Printf("crunchrunner: $(task.keep)=%v", keepmount) + // Set up subprocess - for k, v := range taskp.command { - taskp.command[k] = substitute(v, replacements) + for k, v := range taskp.Command { + taskp.Command[k] = substitute(v, replacements) } - cmd := exec.Command(taskp.command[0], taskp.command[1:]...) + cmd := exec.Command(taskp.Command[0], taskp.Command[1:]...) cmd.Dir = outdir - var stdin, stdout string - stdin, stdout, err = setupCommand(cmd, taskp, outdir, replacements) + var stdin, stdout, stderr string + stdin, stdout, stderr, err = setupCommand(cmd, taskp, outdir, replacements) if err != nil { return err } @@ -235,13 +306,39 @@ func runner(api IArvadosClient, if stdout != "" { stdout = " > " + stdout } - log.Printf("Running %v%v%v", cmd.Args, stdin, stdout) + if stderr != "" { + stderr = " 2> " + stderr + } + log.Printf("Running %v%v%v%v", cmd.Args, stdin, stdout, stderr) + + var caughtSignal os.Signal + sigChan := setupSignals(cmd) err = cmd.Start() + if err != nil { + signal.Stop(sigChan) + return TempFail{err} + } + + finishedSignalNotify := make(chan struct{}) + go func(sig <-chan os.Signal) { + for sig := range sig { + caughtSignal = sig + cmd.Process.Signal(caughtSignal) + } + close(finishedSignalNotify) + }(sigChan) - signals := setupSignals(cmd) err = cmd.Wait() - signal.Stop(signals) + signal.Stop(sigChan) + + close(sigChan) + <-finishedSignalNotify + + if caughtSignal != nil { + log.Printf("Caught signal %v", caughtSignal) + return PermFail{} + } if err != nil { // Run() returns ExitError on non-zero exit code, but we handle @@ -257,29 +354,34 @@ func runner(api IArvadosClient, log.Printf("Completed with exit code %v", exitCode) - if inCodes(exitCode, taskp.permanentFailCodes) { + if inCodes(exitCode, taskp.PermanentFailCodes) { success = false - } else if inCodes(exitCode, taskp.temporaryFailCodes) { + } else if inCodes(exitCode, taskp.TemporaryFailCodes) { return TempFail{fmt.Errorf("Process tempfail with exit code %v", exitCode)} - } else if inCodes(exitCode, taskp.successCodes) || cmd.ProcessState.Success() { + } else if inCodes(exitCode, taskp.SuccessCodes) || cmd.ProcessState.Success() { success = true } else { success = false } // Upload output directory - manifest, err := WriteTree(kc, outdir) + var manifest string + if taskp.KeepTmpOutput { + manifest, err = getKeepTmp(outdir) + } else { + manifest, err = WriteTree(kc, outdir) + } if err != nil { return TempFail{err} } // Set status - err = api.Update("job_tasks", taskUuid, + err = api.Update("job_tasks", taskUUID, map[string]interface{}{ "job_task": Task{ - output: manifest, - success: success, - progress: 1}}, + Output: manifest, + Success: success, + Progress: 1}}, nil) if err != nil { return TempFail{err} @@ -298,26 +400,31 @@ func main() { log.Fatal(err) } - jobUuid := os.Getenv("JOB_UUID") - taskUuid := os.Getenv("TASK_UUID") + jobUUID := os.Getenv("JOB_UUID") + taskUUID := os.Getenv("TASK_UUID") tmpdir := os.Getenv("TASK_WORK") keepmount := os.Getenv("TASK_KEEPMOUNT") var jobStruct Job var taskStruct Task - err = api.Get("jobs", jobUuid, nil, &jobStruct) + err = api.Get("jobs", jobUUID, nil, &jobStruct) if err != nil { log.Fatal(err) } - err = api.Get("job_tasks", taskUuid, nil, &taskStruct) + err = api.Get("job_tasks", taskUUID, nil, &taskStruct) if err != nil { log.Fatal(err) } var kc IKeepClient - kc, err = keepclient.MakeKeepClient(&api) - err = runner(api, kc, jobUuid, taskUuid, tmpdir, keepmount, jobStruct, taskStruct) + kc, err = keepclient.MakeKeepClient(api) + if err != nil { + log.Fatal(err) + } + + syscall.Umask(0022) + err = runner(api, kc, jobUUID, taskUUID, tmpdir, keepmount, jobStruct, taskStruct) if err == nil { os.Exit(0)