X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/5a4cb9d3957905a02716761edb1be662edad0312..fd3e91e6cd737554b4ae491a558e52f41bad3d07:/sdk/go/crunchrunner/crunchrunner.go diff --git a/sdk/go/crunchrunner/crunchrunner.go b/sdk/go/crunchrunner/crunchrunner.go index 0ca7ce9e9b..5c3d65c561 100644 --- a/sdk/go/crunchrunner/crunchrunner.go +++ b/sdk/go/crunchrunner/crunchrunner.go @@ -1,123 +1,156 @@ package main import ( + "crypto/x509" + "fmt" "git.curoverse.com/arvados.git/sdk/go/arvadosclient" - //"git.curoverse.com/arvados.git/sdk/go/keepclient" + "git.curoverse.com/arvados.git/sdk/go/keepclient" + "io/ioutil" "log" + "net/http" "os" "os/exec" "os/signal" + "strings" "syscall" ) type TaskDef struct { - commands []string `json:"commands"` - env map[string]string `json:"env"` - stdin string `json:"stdin"` - stdout string `json:"stdout"` - vwd map[string]string `json:"vwd"` - successCodes []int `json:"successCodes"` - permanentFailCodes []int `json:"permanentFailCodes"` - temporaryFailCodes []int `json:"temporaryFailCodes"` + Command []string `json:"command"` + Env map[string]string `json:"task.env"` + Stdin string `json:"task.stdin"` + Stdout string `json:"task.stdout"` + Stderr string `json:"task.stderr"` + Vwd map[string]string `json:"task.vwd"` + SuccessCodes []int `json:"task.successCodes"` + PermanentFailCodes []int `json:"task.permanentFailCodes"` + TemporaryFailCodes []int `json:"task.temporaryFailCodes"` } type Tasks struct { - tasks []TaskDef `json:"script_parameters"` + Tasks []TaskDef `json:"tasks"` } type Job struct { - script_parameters Tasks `json:"script_parameters"` + Script_parameters Tasks `json:"script_parameters"` } type Task struct { - job_uuid string `json:"job_uuid"` - created_by_job_task_uuid string `json:"created_by_job_task_uuid"` - parameters TaskDef `json:"parameters"` - sequence int `json:"sequence"` - output string `json:"output"` - success bool `json:"success"` - progress float32 `json:"sequence"` + Job_uuid string `json:"job_uuid"` + Created_by_job_task_uuid string `json:"created_by_job_task_uuid"` + Parameters TaskDef `json:"parameters"` + Sequence int `json:"sequence"` + Output string `json:"output"` + Success bool `json:"success"` + Progress float32 `json:"sequence"` } -func setupDirectories(tmpdir, taskUuid string) (outdir string, err error) { - err = os.Chdir(tmpdir) - if err != nil { - return "", err - } +type IArvadosClient interface { + Create(resourceType string, parameters arvadosclient.Dict, output interface{}) error + Update(resourceType string, uuid string, parameters arvadosclient.Dict, output interface{}) (err error) +} - err = os.Mkdir("tmpdir", 0700) +func setupDirectories(crunchtmpdir, taskUuid string) (tmpdir, outdir string, err error) { + tmpdir = crunchtmpdir + "/tmpdir" + err = os.Mkdir(tmpdir, 0700) if err != nil { - return "", err + return "", "", err } - err = os.Mkdir(taskUuid, 0700) + outdir = crunchtmpdir + "/outdir" + err = os.Mkdir(outdir, 0700) if err != nil { - return "", err + return "", "", err } - os.Chdir(taskUuid) - if err != nil { - return "", err - } + return tmpdir, outdir, nil +} - outdir, err = os.Getwd() - if err != nil { - return "", err +func checkOutputFilename(outdir, fn string) error { + if strings.HasPrefix(fn, "/") || strings.HasSuffix(fn, "/") { + return fmt.Errorf("Path must not start or end with '/'") + } + if strings.Index("../", fn) != -1 { + return fmt.Errorf("Path must not contain '../'") } - return outdir, nil + sl := strings.LastIndex(fn, "/") + if sl != -1 { + os.MkdirAll(outdir+"/"+fn[0:sl], 0777) + } + return nil } -func setupCommand(cmd *exec.Cmd, taskp TaskDef, keepmount, outdir string) error { - var err error - - if taskp.vwd != nil { - for k, v := range taskp.vwd { - os.Symlink(keepmount+"/"+v, outdir+"/"+k) +func setupCommand(cmd *exec.Cmd, taskp TaskDef, outdir string, replacements map[string]string) (stdin, stdout, stderr string, err error) { + if taskp.Vwd != nil { + for k, v := range taskp.Vwd { + v = substitute(v, replacements) + err = checkOutputFilename(outdir, k) + if err != nil { + return "", "", "", err + } + os.Symlink(v, outdir+"/"+k) } } - if taskp.stdin != "" { + if taskp.Stdin != "" { // Set up stdin redirection - cmd.Stdin, err = os.Open(keepmount + "/" + taskp.stdin) + stdin = substitute(taskp.Stdin, replacements) + cmd.Stdin, err = os.Open(stdin) if err != nil { - return err + return "", "", "", err } } - if taskp.stdout != "" { + if taskp.Stdout != "" { + err = checkOutputFilename(outdir, taskp.Stdout) + if err != nil { + return "", "", "", err + } // Set up stdout redirection - cmd.Stdout, err = os.Create(outdir + "/" + taskp.stdout) + stdout = outdir + "/" + taskp.Stdout + cmd.Stdout, err = os.Create(stdout) if err != nil { - return err + return "", "", "", err } } else { cmd.Stdout = os.Stdout } - if taskp.env != nil { + if taskp.Stderr != "" { + err = checkOutputFilename(outdir, taskp.Stderr) + if err != nil { + return "", "", "", err + } + // Set up stderr redirection + stderr = outdir + "/" + taskp.Stderr + cmd.Stderr, err = os.Create(stderr) + if err != nil { + return "", "", "", err + } + } else { + cmd.Stderr = os.Stderr + } + + if taskp.Env != nil { // Set up subprocess environment cmd.Env = os.Environ() - for k, v := range taskp.env { + for k, v := range taskp.Env { + v = substitute(v, replacements) cmd.Env = append(cmd.Env, k+"="+v) } } - return nil + return stdin, stdout, stderr, nil } -func setupSignals(cmd *exec.Cmd) { - // Set up signal handlers - // Forward SIGINT, SIGTERM and SIGQUIT to inner process +// Set up signal handlers. Go sends signal notifications to a "signal +// channel". +func setupSignals(cmd *exec.Cmd) chan os.Signal { sigChan := make(chan os.Signal, 1) - go func(sig <-chan os.Signal) { - catch := <-sig - if cmd.Process != nil { - cmd.Process.Signal(catch) - } - }(sigChan) signal.Notify(sigChan, syscall.SIGTERM) signal.Notify(sigChan, syscall.SIGINT) signal.Notify(sigChan, syscall.SIGQUIT) + return sigChan } func inCodes(code int, codes []int) bool { @@ -133,37 +166,41 @@ func inCodes(code int, codes []int) bool { const TASK_TEMPFAIL = 111 -type TempFail struct{ InnerError error } +type TempFail struct{ error } type PermFail struct{} -func (s TempFail) Error() string { - return s.InnerError.Error() -} - func (s PermFail) Error() string { return "PermFail" } -func runner(api arvadosclient.IArvadosClient, - jobUuid, taskUuid, tmpdir, keepmount string, +func substitute(inp string, subst map[string]string) string { + for k, v := range subst { + inp = strings.Replace(inp, k, v, -1) + } + return inp +} + +func runner(api IArvadosClient, + kc IKeepClient, + jobUuid, taskUuid, crunchtmpdir, keepmount string, jobStruct Job, taskStruct Task) error { var err error - taskp := taskStruct.parameters + taskp := taskStruct.Parameters // If this is task 0 and there are multiple tasks, dispatch subtasks // and exit. - if taskStruct.sequence == 0 { - if len(jobStruct.script_parameters.tasks) == 1 { - taskp = jobStruct.script_parameters.tasks[0] + if taskStruct.Sequence == 0 { + if len(jobStruct.Script_parameters.Tasks) == 1 { + taskp = jobStruct.Script_parameters.Tasks[0] } else { - for _, task := range jobStruct.script_parameters.tasks { + for _, task := range jobStruct.Script_parameters.Tasks { err := api.Create("job_tasks", map[string]interface{}{ - "job_task": Task{job_uuid: jobUuid, - created_by_job_task_uuid: taskUuid, - sequence: 1, - parameters: task}}, + "job_task": Task{Job_uuid: jobUuid, + Created_by_job_task_uuid: taskUuid, + Sequence: 1, + Parameters: task}}, nil) if err != nil { return TempFail{err} @@ -172,36 +209,84 @@ func runner(api arvadosclient.IArvadosClient, err = api.Update("job_tasks", taskUuid, map[string]interface{}{ "job_task": Task{ - output: "", - success: true, - progress: 1.0}}, + Output: "", + Success: true, + Progress: 1.0}}, nil) return nil } } - // Set up subprocess - cmd := exec.Command(taskp.commands[0], taskp.commands[1:]...) - - var outdir string - outdir, err = setupDirectories(tmpdir, taskUuid) + var tmpdir, outdir string + tmpdir, outdir, err = setupDirectories(crunchtmpdir, taskUuid) if err != nil { return TempFail{err} } + replacements := map[string]string{ + "$(task.tmpdir)": tmpdir, + "$(task.outdir)": outdir, + "$(task.keep)": keepmount} + + log.Printf("crunchrunner: $(task.tmpdir)=%v", tmpdir) + log.Printf("crunchrunner: $(task.outdir)=%v", outdir) + log.Printf("crunchrunner: $(task.keep)=%v", keepmount) + + // Set up subprocess + for k, v := range taskp.Command { + taskp.Command[k] = substitute(v, replacements) + } + + cmd := exec.Command(taskp.Command[0], taskp.Command[1:]...) + cmd.Dir = outdir - err = setupCommand(cmd, taskp, keepmount, outdir) + var stdin, stdout, stderr string + stdin, stdout, stderr, err = setupCommand(cmd, taskp, outdir, replacements) if err != nil { return err } - setupSignals(cmd) - // Run subprocess and wait for it to complete - log.Printf("Running %v", cmd.Args) + if stdin != "" { + stdin = " < " + stdin + } + if stdout != "" { + stdout = " > " + stdout + } + if stderr != "" { + stderr = " 2> " + stderr + } + log.Printf("Running %v%v%v%v", cmd.Args, stdin, stdout, stderr) - err = cmd.Run() + var caughtSignal os.Signal + sigChan := setupSignals(cmd) + + err = cmd.Start() + if err != nil { + signal.Stop(sigChan) + return TempFail{err} + } + + finishedSignalNotify := make(chan struct{}) + go func(sig <-chan os.Signal) { + for sig := range sig { + caughtSignal = sig + cmd.Process.Signal(caughtSignal) + } + close(finishedSignalNotify) + }(sigChan) + + err = cmd.Wait() + signal.Stop(sigChan) + + close(sigChan) + <-finishedSignalNotify + + if caughtSignal != nil { + log.Printf("Caught signal %v", caughtSignal) + return PermFail{} + } if err != nil { // Run() returns ExitError on non-zero exit code, but we handle @@ -211,41 +296,41 @@ func runner(api arvadosclient.IArvadosClient, } } - const success = 1 - const permfail = 2 - const tempfail = 2 - var status int + var success bool exitCode := cmd.ProcessState.Sys().(syscall.WaitStatus).ExitStatus() - if inCodes(exitCode, taskp.successCodes) { - status = success - } else if inCodes(exitCode, taskp.permanentFailCodes) { - status = permfail - } else if inCodes(exitCode, taskp.temporaryFailCodes) { - return TempFail{nil} - } else if cmd.ProcessState.Success() { - status = success + log.Printf("Completed with exit code %v", exitCode) + + if inCodes(exitCode, taskp.PermanentFailCodes) { + success = false + } else if inCodes(exitCode, taskp.TemporaryFailCodes) { + return TempFail{fmt.Errorf("Process tempfail with exit code %v", exitCode)} + } else if inCodes(exitCode, taskp.SuccessCodes) || cmd.ProcessState.Success() { + success = true } else { - status = permfail + success = false } // Upload output directory - // TODO + manifest, err := WriteTree(kc, outdir) + if err != nil { + return TempFail{err} + } // Set status err = api.Update("job_tasks", taskUuid, map[string]interface{}{ - "job_task": map[string]interface{}{ - "output": "", - "success": status == success, - "progress": 1.0}}, + "job_task": Task{ + Output: manifest, + Success: success, + Progress: 1}}, nil) if err != nil { return TempFail{err} } - if status == success { + if success { return nil } else { return PermFail{} @@ -253,13 +338,29 @@ func runner(api arvadosclient.IArvadosClient, } func main() { - syscall.Umask(0077) - api, err := arvadosclient.MakeArvadosClient() if err != nil { log.Fatal(err) } + // Container may not have certificates installed, so need to look for + // /etc/arvados/ca-certificates.crt in addition to normal system certs. + var certFiles = []string{ + "/etc/ssl/certs/ca-certificates.crt", // Debian + "/etc/pki/tls/certs/ca-bundle.crt", // Red Hat + "/etc/arvados/ca-certificates.crt", + } + + certs := x509.NewCertPool() + for _, file := range certFiles { + data, err := ioutil.ReadFile(file) + if err == nil { + log.Printf("Using TLS certificates at %v", file) + certs.AppendCertsFromPEM(data) + } + } + api.Client.Transport.(*http.Transport).TLSClientConfig.RootCAs = certs + jobUuid := os.Getenv("JOB_UUID") taskUuid := os.Getenv("TASK_UUID") tmpdir := os.Getenv("TASK_WORK") @@ -277,7 +378,14 @@ func main() { log.Fatal(err) } - err = runner(api, jobUuid, taskUuid, tmpdir, keepmount, jobStruct, taskStruct) + var kc IKeepClient + kc, err = keepclient.MakeKeepClient(api) + if err != nil { + log.Fatal(err) + } + + syscall.Umask(0022) + err = runner(api, kc, jobUuid, taskUuid, tmpdir, keepmount, jobStruct, taskStruct) if err == nil { os.Exit(0)