X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/f15a9a1ad40f589d3c40f856a95e1d5556ee7ca1..a12864a31d5569c74ed32157d5fe928a1c2563b7:/sdk/go/crunchrunner/crunchrunner.go diff --git a/sdk/go/crunchrunner/crunchrunner.go b/sdk/go/crunchrunner/crunchrunner.go index b8b6234f91..5d7e10be4b 100644 --- a/sdk/go/crunchrunner/crunchrunner.go +++ b/sdk/go/crunchrunner/crunchrunner.go @@ -1,122 +1,186 @@ package main import ( + "encoding/json" + "fmt" + "git.curoverse.com/arvados.git/sdk/go/arvados" "git.curoverse.com/arvados.git/sdk/go/arvadosclient" - //"git.curoverse.com/arvados.git/sdk/go/keepclient" + "git.curoverse.com/arvados.git/sdk/go/keepclient" + "io" + "io/ioutil" "log" "os" "os/exec" "os/signal" + "strings" "syscall" ) type TaskDef struct { - commands []string `json:"commands"` - env map[string]string `json:"task.env"` - stdin string `json:"task.stdin"` - stdout string `json:"task.stdout"` - vwd map[string]string `json:"task.vwd"` - successCodes []int `json:"task.successCodes"` - permanentFailCodes []int `json:"task.permanentFailCodes"` - temporaryFailCodes []int `json:"task.temporaryFailCodes"` + Command []string `json:"command"` + Env map[string]string `json:"task.env"` + Stdin string `json:"task.stdin"` + Stdout string `json:"task.stdout"` + Stderr string `json:"task.stderr"` + Vwd map[string]string `json:"task.vwd"` + SuccessCodes []int `json:"task.successCodes"` + PermanentFailCodes []int `json:"task.permanentFailCodes"` + TemporaryFailCodes []int `json:"task.temporaryFailCodes"` + KeepTmpOutput bool `json:"task.keepTmpOutput"` } type Tasks struct { - tasks []TaskDef `json:"script_parameters"` + Tasks []TaskDef `json:"tasks"` } type Job struct { - script_parameters Tasks `json:"script_parameters"` + Script_parameters Tasks `json:"script_parameters"` } type Task struct { - job_uuid string `json:"job_uuid"` - created_by_job_task_uuid string `json:"created_by_job_task_uuid"` - parameters TaskDef `json:"parameters"` - sequence int `json:"sequence"` - 
output string `json:"output"` - success bool `json:"success"` - progress float32 `json:"sequence"` + Job_uuid string `json:"job_uuid"` + Created_by_job_task_uuid string `json:"created_by_job_task_uuid"` + Parameters TaskDef `json:"parameters"` + Sequence int `json:"sequence,omitempty"` // omitted when zero so status updates do not reset it + Output string `json:"output"` + Success bool `json:"success"` + Progress float32 `json:"progress"` // must not reuse "sequence": duplicate tags make encoding/json drop both fields } -func setupDirectories(tmpdir string) (outdir string, err error) { - err = os.Chdir(tmpdir) +type IArvadosClient interface { + Create(resourceType string, parameters arvadosclient.Dict, output interface{}) error + Update(resourceType string, uuid string, parameters arvadosclient.Dict, output interface{}) (err error) +} + +func setupDirectories(crunchtmpdir, taskUuid string, keepTmp bool) (tmpdir, outdir string, err error) { + tmpdir = crunchtmpdir + "/tmpdir" + err = os.Mkdir(tmpdir, 0700) if err != nil { - return "", err + return "", "", err } - err = os.Mkdir("tmpdir", 0700) - if err != nil { - return "", err + if keepTmp { + outdir = os.Getenv("TASK_KEEPMOUNT_TMP") + } else { + outdir = crunchtmpdir + "/outdir" + err = os.Mkdir(outdir, 0700) + if err != nil { + return "", "", err + } } - err = os.Mkdir("outdir", 0700) - if err != nil { - return "", err + return tmpdir, outdir, nil +} + +func checkOutputFilename(outdir, fn string) error { + if strings.HasPrefix(fn, "/") || strings.HasSuffix(fn, "/") { + return fmt.Errorf("Path must not start or end with '/'") } + if strings.Contains(fn, "../") { + return fmt.Errorf("Path must not contain '../'") + } + + sl := strings.LastIndex(fn, "/") + if sl != -1 { + os.MkdirAll(outdir+"/"+fn[0:sl], 0777) + } + return nil +} - os.Chdir("outdir") +func copyFile(dst, src string) error { + in, err := os.Open(src) if err != nil { - return "", err + return err } + defer in.Close() - outdir, err = os.Getwd() + out, err := os.Create(dst) if err != nil { - 
return "", err + return err } + defer out.Close() - return outdir, nil + _, err = io.Copy(out, in) + 
return err } -func setupCommand(cmd *exec.Cmd, taskp TaskDef, keepmount, outdir string) error { - var err error - - //if taskp.vwd != nil { - // Set up VWD symlinks in outdir - // TODO - //} +func setupCommand(cmd *exec.Cmd, taskp TaskDef, outdir string, replacements map[string]string) (stdin, stdout, stderr string, err error) { + if taskp.Vwd != nil { + for k, v := range taskp.Vwd { + v = substitute(v, replacements) + err = checkOutputFilename(outdir, k) + if err != nil { + return "", "", "", err + } + if taskp.KeepTmpOutput { + err = copyFile(outdir+"/"+k, v) // copyFile(dst, src): copy the VWD source v into outdir + } else { + err = os.Symlink(v, outdir+"/"+k) + } + if err != nil { + return "", "", "", err + } + } + } - if taskp.stdin != "" { + if taskp.Stdin != "" { // Set up stdin redirection - cmd.Stdin, err = os.Open(keepmount + "/" + taskp.stdin) + stdin = substitute(taskp.Stdin, replacements) + cmd.Stdin, err = os.Open(stdin) if err != nil { - log.Fatal(err) + return "", "", "", err } } - if taskp.stdout != "" { + if taskp.Stdout != "" { + err = checkOutputFilename(outdir, taskp.Stdout) + if err != nil { + return "", "", "", err + } // Set up stdout redirection - cmd.Stdout, err = os.Create(outdir + "/" + taskp.stdout) + stdout = outdir + "/" + taskp.Stdout + cmd.Stdout, err = os.Create(stdout) if err != nil { - log.Fatal(err) + return "", "", "", err } } else { cmd.Stdout = os.Stdout } - if taskp.env != nil { + if taskp.Stderr != "" { + err = checkOutputFilename(outdir, taskp.Stderr) + if err != nil { + return "", "", "", err + } + // Set up stderr redirection + stderr = outdir + "/" + taskp.Stderr + cmd.Stderr, err = os.Create(stderr) + if err != nil { + return "", "", "", err + } + } else { + cmd.Stderr = os.Stderr + } + + if taskp.Env != nil { // Set up subprocess environment cmd.Env = os.Environ() - for k, v := range taskp.env { + for k, v := range taskp.Env { + v = substitute(v, replacements) cmd.Env = append(cmd.Env, k+"="+v) } } - return nil + return stdin, stdout, stderr, 
nil } -func setupSignals(cmd 
*exec.Cmd) { - // Set up signal handlers - // Forward SIGINT, SIGTERM and SIGQUIT to inner process +// Set up signal handlers. Go sends signal notifications to a "signal +// channel". +func setupSignals(cmd *exec.Cmd) chan os.Signal { sigChan := make(chan os.Signal, 1) - go func(sig <-chan os.Signal) { - catch := <-sig - if cmd.Process != nil { - cmd.Process.Signal(catch) - } - }(sigChan) signal.Notify(sigChan, syscall.SIGTERM) signal.Notify(sigChan, syscall.SIGINT) signal.Notify(sigChan, syscall.SIGQUIT) + return sigChan } func inCodes(code int, codes []int) bool { @@ -132,37 +196,57 @@ func inCodes(code int, codes []int) bool { const TASK_TEMPFAIL = 111 -type TempFail struct{ InnerError error } +type TempFail struct{ error } type PermFail struct{} -func (s TempFail) Error() string { - return s.InnerError.Error() -} - func (s PermFail) Error() string { return "PermFail" } -func runner(api arvadosclient.IArvadosClient, - jobUuid, taskUuid, tmpdir, keepmount string, +func substitute(inp string, subst map[string]string) string { + for k, v := range subst { + inp = strings.Replace(inp, k, v, -1) + } + return inp +} + +func getKeepTmp(outdir string) (manifest string, err error) { + fn, err := os.Open(outdir + "/" + ".arvados#collection") + if err != nil { + return "", err + } + defer fn.Close() + + buf, err := ioutil.ReadAll(fn) + if err != nil { + return "", err + } + collection := arvados.Collection{} + err = json.Unmarshal(buf, &collection) + return collection.ManifestText, err +} + +func runner(api IArvadosClient, + kc IKeepClient, + jobUuid, taskUuid, crunchtmpdir, keepmount string, jobStruct Job, taskStruct Task) error { var err error - taskp := taskStruct.parameters + taskp := taskStruct.Parameters // If this is task 0 and there are multiple tasks, dispatch subtasks // and exit. 
- if taskStruct.sequence == 0 { - if len(jobStruct.script_parameters.tasks) == 1 { - taskp = jobStruct.script_parameters.tasks[0] + if taskStruct.Sequence == 0 { + if len(jobStruct.Script_parameters.Tasks) == 1 { + taskp = jobStruct.Script_parameters.Tasks[0] } else { - for _, task := range jobStruct.script_parameters.tasks { + for _, task := range jobStruct.Script_parameters.Tasks { err := api.Create("job_tasks", map[string]interface{}{ - "job_task": Task{job_uuid: jobUuid, - created_by_job_task_uuid: taskUuid, - sequence: 1, - parameters: task}}, + "job_task": Task{Job_uuid: jobUuid, + Created_by_job_task_uuid: taskUuid, + Sequence: 1, + Parameters: task}}, nil) if err != nil { return TempFail{err} @@ -171,76 +255,133 @@ func runner(api arvadosclient.IArvadosClient, err = api.Update("job_tasks", taskUuid, map[string]interface{}{ "job_task": Task{ - output: "", - success: true, - progress: 1.0}}, + Output: "", + Success: true, + Progress: 1.0}}, nil) return nil } } - // Set up subprocess - cmd := exec.Command(taskp.commands[0], taskp.commands[1:]...) - - var outdir string - outdir, err = setupDirectories(tmpdir) + var tmpdir, outdir string + tmpdir, outdir, err = setupDirectories(crunchtmpdir, taskUuid, taskp.KeepTmpOutput) if err != nil { return TempFail{err} } + replacements := map[string]string{ + "$(task.tmpdir)": tmpdir, + "$(task.outdir)": outdir, + "$(task.keep)": keepmount} + + log.Printf("crunchrunner: $(task.tmpdir)=%v", tmpdir) + log.Printf("crunchrunner: $(task.outdir)=%v", outdir) + log.Printf("crunchrunner: $(task.keep)=%v", keepmount) + + // Set up subprocess + for k, v := range taskp.Command { + taskp.Command[k] = substitute(v, replacements) + } + + cmd := exec.Command(taskp.Command[0], taskp.Command[1:]...) 
+ cmd.Dir = outdir - err = setupCommand(cmd, taskp, keepmount, outdir) + var stdin, stdout, stderr string + stdin, stdout, stderr, err = setupCommand(cmd, taskp, outdir, replacements) if err != nil { return err } - setupSignals(cmd) - // Run subprocess and wait for it to complete - log.Printf("Running %v", cmd.Args) + if stdin != "" { + stdin = " < " + stdin + } + if stdout != "" { + stdout = " > " + stdout + } + if stderr != "" { + stderr = " 2> " + stderr + } + log.Printf("Running %v%v%v%v", cmd.Args, stdin, stdout, stderr) - err = cmd.Run() + var caughtSignal os.Signal + sigChan := setupSignals(cmd) + err = cmd.Start() if err != nil { + signal.Stop(sigChan) return TempFail{err} } - const success = 1 - const permfail = 2 - const tempfail = 2 - var status int + finishedSignalNotify := make(chan struct{}) + go func(sig <-chan os.Signal) { + for sig := range sig { + caughtSignal = sig + cmd.Process.Signal(caughtSignal) + } + close(finishedSignalNotify) + }(sigChan) + + err = cmd.Wait() + signal.Stop(sigChan) + + close(sigChan) + <-finishedSignalNotify + + if caughtSignal != nil { + log.Printf("Caught signal %v", caughtSignal) + return PermFail{} + } + + if err != nil { + // Run() returns ExitError on non-zero exit code, but we handle + // that down below. So only return if it's not ExitError. 
+ if _, ok := err.(*exec.ExitError); !ok { + return TempFail{err} + } + } + + var success bool exitCode := cmd.ProcessState.Sys().(syscall.WaitStatus).ExitStatus() - if inCodes(exitCode, taskp.successCodes) { - status = success - } else if inCodes(exitCode, taskp.permanentFailCodes) { - status = permfail - } else if inCodes(exitCode, taskp.temporaryFailCodes) { - os.Exit(TASK_TEMPFAIL) - } else if cmd.ProcessState.Success() { - status = success + log.Printf("Completed with exit code %v", exitCode) + + if inCodes(exitCode, taskp.PermanentFailCodes) { + success = false + } else if inCodes(exitCode, taskp.TemporaryFailCodes) { + return TempFail{fmt.Errorf("Process tempfail with exit code %v", exitCode)} + } else if inCodes(exitCode, taskp.SuccessCodes) || cmd.ProcessState.Success() { + success = true } else { - status = permfail + success = false } // Upload output directory - // TODO + var manifest string + if taskp.KeepTmpOutput { + manifest, err = getKeepTmp(outdir) + } else { + manifest, err = WriteTree(kc, outdir) + } + if err != nil { + return TempFail{err} + } // Set status err = api.Update("job_tasks", taskUuid, map[string]interface{}{ - "job_task": map[string]interface{}{ - "output": "", - "success": status == success, - "progress": 1.0}}, + "job_task": Task{ + Output: manifest, + Success: success, + Progress: 1}}, nil) if err != nil { return TempFail{err} } - if status == success { + if success { return nil } else { return PermFail{} @@ -248,8 +389,6 @@ func runner(api arvadosclient.IArvadosClient, } func main() { - syscall.Umask(0077) - api, err := arvadosclient.MakeArvadosClient() if err != nil { log.Fatal(err) @@ -272,7 +411,14 @@ func main() { log.Fatal(err) } - err = runner(api, jobUuid, taskUuid, tmpdir, keepmount, jobStruct, taskStruct) + var kc IKeepClient + kc, err = keepclient.MakeKeepClient(api) + if err != nil { + log.Fatal(err) + } + + syscall.Umask(0022) + err = runner(api, kc, jobUuid, taskUuid, tmpdir, keepmount, jobStruct, taskStruct) if err 
== nil { os.Exit(0)