X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/53b1b5bbe6d6ae007e8ce546ba2539e0c061e25a..3dd2a1957ae4106bfc2bd5405662c47c087eb79c:/sdk/go/crunchrunner/crunchrunner.go diff --git a/sdk/go/crunchrunner/crunchrunner.go b/sdk/go/crunchrunner/crunchrunner.go index d3f6fcbb43..14c75afff2 100644 --- a/sdk/go/crunchrunner/crunchrunner.go +++ b/sdk/go/crunchrunner/crunchrunner.go @@ -1,10 +1,13 @@ package main import ( + "crypto/x509" "fmt" "git.curoverse.com/arvados.git/sdk/go/arvadosclient" "git.curoverse.com/arvados.git/sdk/go/keepclient" + "io/ioutil" "log" + "net/http" "os" "os/exec" "os/signal" @@ -13,32 +16,32 @@ import ( ) type TaskDef struct { - command []string `json:"command"` - env map[string]string `json:"task.env"` - stdin string `json:"task.stdin"` - stdout string `json:"task.stdout"` - vwd map[string]string `json:"task.vwd"` - successCodes []int `json:"task.successCodes"` - permanentFailCodes []int `json:"task.permanentFailCodes"` - temporaryFailCodes []int `json:"task.temporaryFailCodes"` + Command []string `json:"command"` + Env map[string]string `json:"task.env"` + Stdin string `json:"task.stdin"` + Stdout string `json:"task.stdout"` + Vwd map[string]string `json:"task.vwd"` + SuccessCodes []int `json:"task.successCodes"` + PermanentFailCodes []int `json:"task.permanentFailCodes"` + TemporaryFailCodes []int `json:"task.temporaryFailCodes"` } type Tasks struct { - tasks []TaskDef `json:"script_parameters"` + Tasks []TaskDef `json:"tasks"` } type Job struct { - script_parameters Tasks `json:"script_parameters"` + Script_parameters Tasks `json:"script_parameters"` } type Task struct { - job_uuid string `json:"job_uuid"` - created_by_job_task_uuid string `json:"created_by_job_task_uuid"` - parameters TaskDef `json:"parameters"` - sequence int `json:"sequence"` - output string `json:"output"` - success bool `json:"success"` - progress float32 `json:"sequence"` + Job_uuid string `json:"job_uuid"` + Created_by_job_task_uuid string 
`json:"created_by_job_task_uuid"` + Parameters TaskDef `json:"parameters"` + Sequence int `json:"sequence"` + Output string `json:"output"` + Success bool `json:"success"` + Progress float32 `json:"progress"` } type IArvadosClient interface { @@ -78,8 +81,8 @@ func checkOutputFilename(outdir, fn string) error { } func setupCommand(cmd *exec.Cmd, taskp TaskDef, outdir string, replacements map[string]string) (stdin, stdout string, err error) { - if taskp.vwd != nil { - for k, v := range taskp.vwd { + if taskp.Vwd != nil { + for k, v := range taskp.Vwd { v = substitute(v, replacements) err = checkOutputFilename(outdir, k) if err != nil { @@ -89,22 +92,22 @@ func setupCommand(cmd *exec.Cmd, taskp TaskDef, outdir string, replacements map[ } } - if taskp.stdin != "" { + if taskp.Stdin != "" { // Set up stdin redirection - stdin = substitute(taskp.stdin, replacements) + stdin = substitute(taskp.Stdin, replacements) cmd.Stdin, err = os.Open(stdin) if err != nil { return "", "", err } } - if taskp.stdout != "" { - err = checkOutputFilename(outdir, taskp.stdout) + if taskp.Stdout != "" { + err = checkOutputFilename(outdir, taskp.Stdout) if err != nil { return "", "", err } // Set up stdout redirection - stdout = outdir + "/" + taskp.stdout + stdout = outdir + "/" + taskp.Stdout cmd.Stdout, err = os.Create(stdout) if err != nil { return "", "", err @@ -113,10 +116,12 @@ func setupCommand(cmd *exec.Cmd, taskp TaskDef, outdir string, replacements map[ cmd.Stdout = os.Stdout } - if taskp.env != nil { + cmd.Stderr = os.Stderr + + if taskp.Env != nil { // Set up subprocess environment cmd.Env = os.Environ() - for k, v := range taskp.env { + for k, v := range taskp.Env { v = substitute(v, replacements) cmd.Env = append(cmd.Env, k+"="+v) } @@ -124,14 +129,10 @@ func setupCommand(cmd *exec.Cmd, taskp TaskDef, outdir string, replacements map[ return stdin, stdout, nil } +// Set up signal handlers. Go sends signal notifications to a "signal +// channel". 
func setupSignals(cmd *exec.Cmd) chan os.Signal { - // Set up signal handlers - // Forward SIGINT, SIGTERM and SIGQUIT to inner process sigChan := make(chan os.Signal, 1) - go func(sig <-chan os.Signal) { - catch := <-sig - cmd.Process.Signal(catch) - }(sigChan) signal.Notify(sigChan, syscall.SIGTERM) signal.Notify(sigChan, syscall.SIGINT) signal.Notify(sigChan, syscall.SIGQUIT) @@ -171,21 +172,21 @@ func runner(api IArvadosClient, jobStruct Job, taskStruct Task) error { var err error - taskp := taskStruct.parameters + taskp := taskStruct.Parameters // If this is task 0 and there are multiple tasks, dispatch subtasks // and exit. - if taskStruct.sequence == 0 { - if len(jobStruct.script_parameters.tasks) == 1 { - taskp = jobStruct.script_parameters.tasks[0] + if taskStruct.Sequence == 0 { + if len(jobStruct.Script_parameters.Tasks) == 1 { + taskp = jobStruct.Script_parameters.Tasks[0] } else { - for _, task := range jobStruct.script_parameters.tasks { + for _, task := range jobStruct.Script_parameters.Tasks { err := api.Create("job_tasks", map[string]interface{}{ - "job_task": Task{job_uuid: jobUuid, - created_by_job_task_uuid: taskUuid, - sequence: 1, - parameters: task}}, + "job_task": Task{Job_uuid: jobUuid, + Created_by_job_task_uuid: taskUuid, + Sequence: 1, + Parameters: task}}, nil) if err != nil { return TempFail{err} @@ -194,9 +195,9 @@ func runner(api IArvadosClient, err = api.Update("job_tasks", taskUuid, map[string]interface{}{ "job_task": Task{ - output: "", - success: true, - progress: 1.0}}, + Output: "", + Success: true, + Progress: 1.0}}, nil) return nil } @@ -213,12 +214,16 @@ func runner(api IArvadosClient, "$(task.outdir)": outdir, "$(task.keep)": keepmount} + log.Printf("crunchrunner: $(task.tmpdir)=%v", tmpdir) + log.Printf("crunchrunner: $(task.outdir)=%v", outdir) + log.Printf("crunchrunner: $(task.keep)=%v", keepmount) + // Set up subprocess - for k, v := range taskp.command { - taskp.command[k] = substitute(v, replacements) + for k, v := 
range taskp.Command { + taskp.Command[k] = substitute(v, replacements) } - cmd := exec.Command(taskp.command[0], taskp.command[1:]...) + cmd := exec.Command(taskp.Command[0], taskp.Command[1:]...) cmd.Dir = outdir @@ -237,11 +242,34 @@ func runner(api IArvadosClient, } log.Printf("Running %v%v%v", cmd.Args, stdin, stdout) + var caughtSignal os.Signal + sigChan := setupSignals(cmd) + err = cmd.Start() + if err != nil { + signal.Stop(sigChan) + return TempFail{err} + } + + finishedSignalNotify := make(chan struct{}) + go func(sig <-chan os.Signal) { + for sig := range sig { + caughtSignal = sig + cmd.Process.Signal(caughtSignal) + } + close(finishedSignalNotify) + }(sigChan) - signals := setupSignals(cmd) err = cmd.Wait() - signal.Stop(signals) + signal.Stop(sigChan) + + close(sigChan) + <-finishedSignalNotify + + if caughtSignal != nil { + log.Printf("Caught signal %v", caughtSignal) + return PermFail{} + } if err != nil { // Run() returns ExitError on non-zero exit code, but we handle @@ -257,11 +285,11 @@ func runner(api IArvadosClient, log.Printf("Completed with exit code %v", exitCode) - if inCodes(exitCode, taskp.permanentFailCodes) { + if inCodes(exitCode, taskp.PermanentFailCodes) { success = false - } else if inCodes(exitCode, taskp.temporaryFailCodes) { + } else if inCodes(exitCode, taskp.TemporaryFailCodes) { return TempFail{fmt.Errorf("Process tempfail with exit code %v", exitCode)} - } else if inCodes(exitCode, taskp.successCodes) || cmd.ProcessState.Success() { + } else if inCodes(exitCode, taskp.SuccessCodes) || cmd.ProcessState.Success() { success = true } else { success = false @@ -277,9 +305,9 @@ func runner(api IArvadosClient, err = api.Update("job_tasks", taskUuid, map[string]interface{}{ "job_task": Task{ - output: manifest, - success: success, - progress: 1}}, + Output: manifest, + Success: success, + Progress: 1}}, nil) if err != nil { return TempFail{err} @@ -298,6 +326,24 @@ func main() { log.Fatal(err) } + // Container may not have 
certificates installed, so need to look for + // /etc/arvados/ca-certificates.crt in addition to normal system certs. + var certFiles = []string{ + "/etc/ssl/certs/ca-certificates.crt", // Debian + "/etc/pki/tls/certs/ca-bundle.crt", // Red Hat + "/etc/arvados/ca-certificates.crt", + } + + certs := x509.NewCertPool() + for _, file := range certFiles { + data, err := ioutil.ReadFile(file) + if err == nil { + log.Printf("Using TLS certificates at %v", file) + certs.AppendCertsFromPEM(data) + } + } + api.Client.Transport.(*http.Transport).TLSClientConfig.RootCAs = certs + jobUuid := os.Getenv("JOB_UUID") taskUuid := os.Getenv("TASK_UUID") tmpdir := os.Getenv("TASK_WORK") @@ -317,6 +363,11 @@ func main() { var kc IKeepClient kc, err = keepclient.MakeKeepClient(&api) + if err != nil { + log.Fatal(err) + } + + syscall.Umask(0022) err = runner(api, kc, jobUuid, taskUuid, tmpdir, keepmount, jobStruct, taskStruct) if err == nil {