X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/4c2855c80c11b60be9e5c6af384a0a90fb66ae16..224f384d411bb1b4cccc7165c55bb64fd5c695ad:/sdk/go/crunchrunner/crunchrunner.go?ds=sidebyside diff --git a/sdk/go/crunchrunner/crunchrunner.go b/sdk/go/crunchrunner/crunchrunner.go index 9f2ae2501c..5c3d65c561 100644 --- a/sdk/go/crunchrunner/crunchrunner.go +++ b/sdk/go/crunchrunner/crunchrunner.go @@ -1,134 +1,162 @@ package main import ( + "crypto/x509" + "fmt" "git.curoverse.com/arvados.git/sdk/go/arvadosclient" - //"git.curoverse.com/arvados.git/sdk/go/keepclient" - "errors" + "git.curoverse.com/arvados.git/sdk/go/keepclient" + "io/ioutil" "log" + "net/http" "os" "os/exec" "os/signal" + "strings" "syscall" ) -func getRecord(api arvadosclient.ArvadosClient, rsc, uuid string) (r arvadosclient.Dict) { - r = make(arvadosclient.Dict) - err := api.Get(rsc, uuid, nil, &r) - if err != nil { - log.Fatal(err) - } - return r +type TaskDef struct { + Command []string `json:"command"` + Env map[string]string `json:"task.env"` + Stdin string `json:"task.stdin"` + Stdout string `json:"task.stdout"` + Stderr string `json:"task.stderr"` + Vwd map[string]string `json:"task.vwd"` + SuccessCodes []int `json:"task.successCodes"` + PermanentFailCodes []int `json:"task.permanentFailCodes"` + TemporaryFailCodes []int `json:"task.temporaryFailCodes"` } -func setupDirectories(tmpdir string) (outdir string, err error) { - err = os.Chdir(tmpdir) - if err != nil { - return "", err - } +type Tasks struct { + Tasks []TaskDef `json:"tasks"` +} - err = os.Mkdir("tmpdir", 0700) - if err != nil { - return "", err - } +type Job struct { + Script_parameters Tasks `json:"script_parameters"` +} - err = os.Mkdir("outdir", 0700) - if err != nil { - return "", err - } +type Task struct { + Job_uuid string `json:"job_uuid"` + Created_by_job_task_uuid string `json:"created_by_job_task_uuid"` + Parameters TaskDef `json:"parameters"` + Sequence int `json:"sequence"` + Output string `json:"output"` + Success bool `json:"success"` + Progress float32 `json:"sequence"` +} - os.Chdir("outdir") +type IArvadosClient interface { + Create(resourceType string, parameters arvadosclient.Dict, output interface{}) error + Update(resourceType string, uuid string, parameters arvadosclient.Dict, output interface{}) (err error) +} + +func setupDirectories(crunchtmpdir, taskUuid string) (tmpdir, outdir string, err error) { + tmpdir = crunchtmpdir + "/tmpdir" + err = os.Mkdir(tmpdir, 0700) if err != nil { - return "", err + return "", "", err } - outdir, err = os.Getwd() + outdir = crunchtmpdir + "/outdir" + err = os.Mkdir(outdir, 0700) if err != nil { - return "", err + return "", "", err } - return outdir, nil + return tmpdir, outdir, nil } -func setupCommand(cmd *exec.Cmd, taskp map[string]interface{}, keepmount, outdir string) error { - var err error +func checkOutputFilename(outdir, fn string) error { + if strings.HasPrefix(fn, "/") || strings.HasSuffix(fn, "/") { + return fmt.Errorf("Path must not start or end with '/'") + } + if strings.Index("../", fn) != -1 { + return fmt.Errorf("Path must not contain '../'") + } - if taskp["task.vwd"] != nil { - // Set up VWD symlinks in outdir - // TODO + sl := strings.LastIndex(fn, "/") + if sl != -1 { + os.MkdirAll(outdir+"/"+fn[0:sl], 0777) } + return nil +} - if taskp["task.stdin"] != nil { - stdin, ok := taskp["task.stdin"].(string) - if !ok { - return errors.New("Could not cast task.stdin to string") +func setupCommand(cmd *exec.Cmd, taskp TaskDef, outdir string, replacements map[string]string) (stdin, stdout, stderr string, err error) { + if taskp.Vwd != nil { + for k, v := range taskp.Vwd { + v = substitute(v, replacements) + err = checkOutputFilename(outdir, k) + if err != nil { + return "", "", "", err + } + os.Symlink(v, outdir+"/"+k) } + } + + if taskp.Stdin != "" { // Set up stdin redirection - cmd.Stdin, err = os.Open(keepmount + "/" + stdin) + stdin = substitute(taskp.Stdin, replacements) + cmd.Stdin, err = os.Open(stdin) if err != nil { - log.Fatal(err) + return "", "", "", err } } - if taskp["task.stdout"] != nil { - stdout, ok := taskp["task.stdout"].(string) - if !ok { - return errors.New("Could not cast task.stdout to string") + if taskp.Stdout != "" { + err = checkOutputFilename(outdir, taskp.Stdout) + if err != nil { + return "", "", "", err } - // Set up stdout redirection - cmd.Stdout, err = os.Open(outdir + "/" + stdout) + stdout = outdir + "/" + taskp.Stdout + cmd.Stdout, err = os.Create(stdout) if err != nil { - log.Fatal(err) + return "", "", "", err } } else { cmd.Stdout = os.Stdout } - if taskp["task.env"] != nil { - taskenv, ok := taskp["task.env"].(map[string]interface{}) - if !ok { - return errors.New("Could not cast task.env to map") + if taskp.Stderr != "" { + err = checkOutputFilename(outdir, taskp.Stderr) + if err != nil { + return "", "", "", err + } + // Set up stderr redirection + stderr = outdir + "/" + taskp.Stderr + cmd.Stderr, err = os.Create(stderr) + if err != nil { + return "", "", "", err } + } else { + cmd.Stderr = os.Stderr + } + if taskp.Env != nil { // Set up subprocess environment cmd.Env = os.Environ() - for k, v := range taskenv { - var vstr string - vstr, ok = v.(string) - if !ok { - return errors.New("Could not cast environment value to string") - } - cmd.Env = append(cmd.Env, k+"="+vstr) + for k, v := range taskp.Env { + v = substitute(v, replacements) + cmd.Env = append(cmd.Env, k+"="+v) } } - return nil + return stdin, stdout, stderr, nil } -func setupSignals(cmd *exec.Cmd) { - // Set up signal handlers - // Forward SIGINT, SIGTERM and SIGQUIT to inner process +// Set up signal handlers. Go sends signal notifications to a "signal +// channel". +func setupSignals(cmd *exec.Cmd) chan os.Signal { sigChan := make(chan os.Signal, 1) - go func(sig <-chan os.Signal) { - catch := <-sig - if cmd.Process != nil { - cmd.Process.Signal(catch) - } - }(sigChan) signal.Notify(sigChan, syscall.SIGTERM) signal.Notify(sigChan, syscall.SIGINT) signal.Notify(sigChan, syscall.SIGQUIT) + return sigChan } -func inCodes(code int, codes interface{}) bool { +func inCodes(code int, codes []int) bool { if codes != nil { - codesArray, ok := codes.([]interface{}) - if !ok { - return false - } - for _, c := range codesArray { - var num float64 - num, ok = c.(float64) - if ok && code == int(num) { + for _, c := range codes { + if code == c { return true } } @@ -138,147 +166,171 @@ func inCodes(code int, codes interface{}) bool { const TASK_TEMPFAIL = 111 -type TempFail struct{ InnerError error } +type TempFail struct{ error } type PermFail struct{} -func (s TempFail) Error() string { - return s.InnerError.Error() -} - func (s PermFail) Error() string { return "PermFail" } -func runner(api arvadosclient.ArvadosClient, - jobUuid, taskUuid, tmpdir, keepmount string, jobStruct, - taskStruct arvadosclient.Dict) error { - - var err error - var ok bool - var jobp, taskp map[string]interface{} - jobp, ok = jobStruct["script_parameters"].(map[string]interface{}) - if !ok { - return errors.New("Could not cast job script_parameters to map") +func substitute(inp string, subst map[string]string) string { + for k, v := range subst { + inp = strings.Replace(inp, k, v, -1) } + return inp +} - taskp, ok = taskStruct["parameters"].(map[string]interface{}) - if !ok { - return errors.New("Could not cast task parameters to map") - } +func runner(api IArvadosClient, + kc IKeepClient, + jobUuid, taskUuid, crunchtmpdir, keepmount string, + jobStruct Job, taskStruct Task) error { + + var err error + taskp := taskStruct.Parameters // If this is task 0 and there are multiple tasks, dispatch subtasks // and exit. - if taskStruct["sequence"] == 0.0 { - var tasks []interface{} - tasks, ok = jobp["tasks"].([]interface{}) - if !ok { - return errors.New("Could not cast tasks to array") - } - - if len(tasks) == 1 { - taskp = tasks[0].(map[string]interface{}) + if taskStruct.Sequence == 0 { + if len(jobStruct.Script_parameters.Tasks) == 1 { + taskp = jobStruct.Script_parameters.Tasks[0] } else { - for task := range tasks { - err := api.Call("POST", "job_tasks", "", "", - arvadosclient.Dict{ - "job_uuid": jobUuid, - "created_by_job_task_uuid": "", - "sequence": 1, - "parameters": task}, + for _, task := range jobStruct.Script_parameters.Tasks { + err := api.Create("job_tasks", + map[string]interface{}{ + "job_task": Task{Job_uuid: jobUuid, + Created_by_job_task_uuid: taskUuid, + Sequence: 1, + Parameters: task}}, nil) if err != nil { return TempFail{err} } } - err = api.Call("PUT", "job_tasks", taskUuid, "", - arvadosclient.Dict{ - "job_task": arvadosclient.Dict{ - "output": "", - "success": true, - "progress": 1.0}}, + err = api.Update("job_tasks", taskUuid, + map[string]interface{}{ + "job_task": Task{ + Output: "", + Success: true, + Progress: 1.0}}, nil) return nil } } - // Set up subprocess - var commandline []string - var commandsarray []interface{} - - commandsarray, ok = taskp["command"].([]interface{}) - if !ok { - return errors.New("Could not cast commands to array") + var tmpdir, outdir string + tmpdir, outdir, err = setupDirectories(crunchtmpdir, taskUuid) + if err != nil { + return TempFail{err} } - for _, c := range commandsarray { - var cstr string - cstr, ok = c.(string) - if !ok { - return errors.New("Could not cast command argument to string") - } - commandline = append(commandline, cstr) - } - cmd := exec.Command(commandline[0], commandline[1:]...) + replacements := map[string]string{ + "$(task.tmpdir)": tmpdir, + "$(task.outdir)": outdir, + "$(task.keep)": keepmount} - var outdir string - outdir, err = setupDirectories(tmpdir) - if err != nil { - return TempFail{err} + log.Printf("crunchrunner: $(task.tmpdir)=%v", tmpdir) + log.Printf("crunchrunner: $(task.outdir)=%v", outdir) + log.Printf("crunchrunner: $(task.keep)=%v", keepmount) + + // Set up subprocess + for k, v := range taskp.Command { + taskp.Command[k] = substitute(v, replacements) } + cmd := exec.Command(taskp.Command[0], taskp.Command[1:]...) + cmd.Dir = outdir - err = setupCommand(cmd, taskp, keepmount, outdir) + var stdin, stdout, stderr string + stdin, stdout, stderr, err = setupCommand(cmd, taskp, outdir, replacements) if err != nil { return err } - setupSignals(cmd) - // Run subprocess and wait for it to complete - log.Printf("Running %v", cmd.Args) + if stdin != "" { + stdin = " < " + stdin + } + if stdout != "" { + stdout = " > " + stdout + } + if stderr != "" { + stderr = " 2> " + stderr + } + log.Printf("Running %v%v%v%v", cmd.Args, stdin, stdout, stderr) - err = cmd.Run() + var caughtSignal os.Signal + sigChan := setupSignals(cmd) + err = cmd.Start() if err != nil { + signal.Stop(sigChan) return TempFail{err} } - const success = 1 - const permfail = 2 - const tempfail = 2 - var status int + finishedSignalNotify := make(chan struct{}) + go func(sig <-chan os.Signal) { + for sig := range sig { + caughtSignal = sig + cmd.Process.Signal(caughtSignal) + } + close(finishedSignalNotify) + }(sigChan) + + err = cmd.Wait() + signal.Stop(sigChan) + + close(sigChan) + <-finishedSignalNotify + + if caughtSignal != nil { + log.Printf("Caught signal %v", caughtSignal) + return PermFail{} + } + + if err != nil { + // Run() returns ExitError on non-zero exit code, but we handle + // that down below. So only return if it's not ExitError. + if _, ok := err.(*exec.ExitError); !ok { + return TempFail{err} + } + } + + var success bool exitCode := cmd.ProcessState.Sys().(syscall.WaitStatus).ExitStatus() - if inCodes(exitCode, taskp["task.successCodes"]) { - status = success - } else if inCodes(exitCode, taskp["task.permanentFailCodes"]) { - status = permfail - } else if inCodes(exitCode, taskp["task.temporaryFailCodes"]) { - os.Exit(TASK_TEMPFAIL) - } else if cmd.ProcessState.Success() { - status = success + log.Printf("Completed with exit code %v", exitCode) + + if inCodes(exitCode, taskp.PermanentFailCodes) { + success = false + } else if inCodes(exitCode, taskp.TemporaryFailCodes) { + return TempFail{fmt.Errorf("Process tempfail with exit code %v", exitCode)} + } else if inCodes(exitCode, taskp.SuccessCodes) || cmd.ProcessState.Success() { + success = true } else { - status = permfail + success = false } // Upload output directory - // TODO + manifest, err := WriteTree(kc, outdir) + if err != nil { + return TempFail{err} + } // Set status - err = api.Call("PUT", "job_tasks", taskUuid, "", - arvadosclient.Dict{ - "job_task": arvadosclient.Dict{ - "output": "", - "success": status == success, - "progress": 1.0}}, + err = api.Update("job_tasks", taskUuid, + map[string]interface{}{ + "job_task": Task{ + Output: manifest, + Success: success, + Progress: 1}}, nil) if err != nil { return TempFail{err} } - if status == success { + if success { return nil } else { return PermFail{} @@ -286,22 +338,54 @@ func runner(api arvadosclient.ArvadosClient, } func main() { - syscall.Umask(0077) - api, err := arvadosclient.MakeArvadosClient() if err != nil { log.Fatal(err) } + // Container may not have certificates installed, so need to look for + // /etc/arvados/ca-certificates.crt in addition to normal system certs. + var certFiles = []string{ + "/etc/ssl/certs/ca-certificates.crt", // Debian + "/etc/pki/tls/certs/ca-bundle.crt", // Red Hat + "/etc/arvados/ca-certificates.crt", + } + + certs := x509.NewCertPool() + for _, file := range certFiles { + data, err := ioutil.ReadFile(file) + if err == nil { + log.Printf("Using TLS certificates at %v", file) + certs.AppendCertsFromPEM(data) + } + } + api.Client.Transport.(*http.Transport).TLSClientConfig.RootCAs = certs + jobUuid := os.Getenv("JOB_UUID") taskUuid := os.Getenv("TASK_UUID") tmpdir := os.Getenv("TASK_WORK") keepmount := os.Getenv("TASK_KEEPMOUNT") - jobStruct := getRecord(api, "jobs", jobUuid) - taskStruct := getRecord(api, "job_tasks", taskUuid) + var jobStruct Job + var taskStruct Task + + err = api.Get("jobs", jobUuid, nil, &jobStruct) + if err != nil { + log.Fatal(err) + } + err = api.Get("job_tasks", taskUuid, nil, &taskStruct) + if err != nil { + log.Fatal(err) + } + + var kc IKeepClient + kc, err = keepclient.MakeKeepClient(api) + if err != nil { + log.Fatal(err) + } - err = runner(api, jobUuid, taskUuid, tmpdir, keepmount, jobStruct, taskStruct) + syscall.Umask(0022) + err = runner(api, kc, jobUuid, taskUuid, tmpdir, keepmount, jobStruct, taskStruct) if err == nil { os.Exit(0)