X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/0fa7cd5bfdab924f316046f923aec282b699d8a1..7314917d65573b0e9d55f7b6522463c470356fba:/sdk/go/crunchrunner/crunchrunner.go?ds=inline diff --git a/sdk/go/crunchrunner/crunchrunner.go b/sdk/go/crunchrunner/crunchrunner.go index 02f2be4c6f..5c3d65c561 100644 --- a/sdk/go/crunchrunner/crunchrunner.go +++ b/sdk/go/crunchrunner/crunchrunner.go @@ -1,10 +1,13 @@ package main import ( + "crypto/x509" "fmt" "git.curoverse.com/arvados.git/sdk/go/arvadosclient" "git.curoverse.com/arvados.git/sdk/go/keepclient" + "io/ioutil" "log" + "net/http" "os" "os/exec" "os/signal" @@ -17,6 +20,7 @@ type TaskDef struct { Env map[string]string `json:"task.env"` Stdin string `json:"task.stdin"` Stdout string `json:"task.stdout"` + Stderr string `json:"task.stderr"` Vwd map[string]string `json:"task.vwd"` SuccessCodes []int `json:"task.successCodes"` PermanentFailCodes []int `json:"task.permanentFailCodes"` @@ -77,13 +81,13 @@ func checkOutputFilename(outdir, fn string) error { return nil } -func setupCommand(cmd *exec.Cmd, taskp TaskDef, outdir string, replacements map[string]string) (stdin, stdout string, err error) { +func setupCommand(cmd *exec.Cmd, taskp TaskDef, outdir string, replacements map[string]string) (stdin, stdout, stderr string, err error) { if taskp.Vwd != nil { for k, v := range taskp.Vwd { v = substitute(v, replacements) err = checkOutputFilename(outdir, k) if err != nil { - return "", "", err + return "", "", "", err } os.Symlink(v, outdir+"/"+k) } @@ -94,25 +98,40 @@ func setupCommand(cmd *exec.Cmd, taskp TaskDef, outdir string, replacements map[ stdin = substitute(taskp.Stdin, replacements) cmd.Stdin, err = os.Open(stdin) if err != nil { - return "", "", err + return "", "", "", err } } if taskp.Stdout != "" { err = checkOutputFilename(outdir, taskp.Stdout) if err != nil { - return "", "", err + return "", "", "", err } // Set up stdout redirection stdout = outdir + "/" + taskp.Stdout cmd.Stdout, err = os.Create(stdout) if err != nil { - return "", "", err + return "", "", "", err } } else { cmd.Stdout = os.Stdout } + if taskp.Stderr != "" { + err = checkOutputFilename(outdir, taskp.Stderr) + if err != nil { + return "", "", "", err + } + // Set up stderr redirection + stderr = outdir + "/" + taskp.Stderr + cmd.Stderr, err = os.Create(stderr) + if err != nil { + return "", "", "", err + } + } else { + cmd.Stderr = os.Stderr + } + if taskp.Env != nil { // Set up subprocess environment cmd.Env = os.Environ() @@ -121,12 +140,12 @@ func setupCommand(cmd *exec.Cmd, taskp TaskDef, outdir string, replacements map[ cmd.Env = append(cmd.Env, k+"="+v) } } - return stdin, stdout, nil + return stdin, stdout, stderr, nil } +// Set up signal handlers. Go sends signal notifications to a "signal +// channel". func setupSignals(cmd *exec.Cmd) chan os.Signal { - // Set up signal handlers - // Forward SIGINT, SIGTERM and SIGQUIT to inner process sigChan := make(chan os.Signal, 1) signal.Notify(sigChan, syscall.SIGTERM) signal.Notify(sigChan, syscall.SIGINT) @@ -209,6 +228,10 @@ func runner(api IArvadosClient, "$(task.outdir)": outdir, "$(task.keep)": keepmount} + log.Printf("crunchrunner: $(task.tmpdir)=%v", tmpdir) + log.Printf("crunchrunner: $(task.outdir)=%v", outdir) + log.Printf("crunchrunner: $(task.keep)=%v", keepmount) + // Set up subprocess for k, v := range taskp.Command { taskp.Command[k] = substitute(v, replacements) @@ -218,8 +241,8 @@ func runner(api IArvadosClient, cmd.Dir = outdir - var stdin, stdout string - stdin, stdout, err = setupCommand(cmd, taskp, outdir, replacements) + var stdin, stdout, stderr string + stdin, stdout, stderr, err = setupCommand(cmd, taskp, outdir, replacements) if err != nil { return err } @@ -231,27 +254,34 @@ func runner(api IArvadosClient, if stdout != "" { stdout = " > " + stdout } - log.Printf("Running %v%v%v", cmd.Args, stdin, stdout) + if stderr != "" { + stderr = " 2> " + stderr + } + log.Printf("Running %v%v%v%v", cmd.Args, stdin, stdout, stderr) var caughtSignal os.Signal - { - sigChan := setupSignals(cmd) - defer signal.Stop(sigChan) + sigChan := setupSignals(cmd) - err = cmd.Start() - if err != nil { - return TempFail{err} + err = cmd.Start() + if err != nil { + signal.Stop(sigChan) + return TempFail{err} + } + + finishedSignalNotify := make(chan struct{}) + go func(sig <-chan os.Signal) { + for sig := range sig { + caughtSignal = sig + cmd.Process.Signal(caughtSignal) } + close(finishedSignalNotify) + }(sigChan) - go func(sig <-chan os.Signal) { - for sig := range sig { - caughtSignal = sig - cmd.Process.Signal(caughtSignal) - } - }(sigChan) + err = cmd.Wait() + signal.Stop(sigChan) - err = cmd.Wait() - } + close(sigChan) + <-finishedSignalNotify if caughtSignal != nil { log.Printf("Caught signal %v", caughtSignal) @@ -313,6 +343,24 @@ func main() { log.Fatal(err) } + // Container may not have certificates installed, so need to look for + // /etc/arvados/ca-certificates.crt in addition to normal system certs. + var certFiles = []string{ + "/etc/ssl/certs/ca-certificates.crt", // Debian + "/etc/pki/tls/certs/ca-bundle.crt", // Red Hat + "/etc/arvados/ca-certificates.crt", + } + + certs := x509.NewCertPool() + for _, file := range certFiles { + data, err := ioutil.ReadFile(file) + if err == nil { + log.Printf("Using TLS certificates at %v", file) + certs.AppendCertsFromPEM(data) + } + } + api.Client.Transport.(*http.Transport).TLSClientConfig.RootCAs = certs + jobUuid := os.Getenv("JOB_UUID") taskUuid := os.Getenv("TASK_UUID") tmpdir := os.Getenv("TASK_WORK") @@ -331,7 +379,7 @@ func main() { } var kc IKeepClient - kc, err = keepclient.MakeKeepClient(&api) + kc, err = keepclient.MakeKeepClient(api) if err != nil { log.Fatal(err) }