8815: Now expect /usr/local/bin/crunchrunner. Bind mount host certificates to
[arvados.git] / sdk / go / crunchrunner / crunchrunner.go
index d3f6fcbb43c8cd06e5de1b657a23e60c5413a4ce..de63a20745e22c058eddb3c482d34c88a43252be 100644 (file)
@@ -1,44 +1,48 @@
 package main
 
 import (
+       "crypto/x509"
        "fmt"
        "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
        "git.curoverse.com/arvados.git/sdk/go/keepclient"
+       "io/ioutil"
        "log"
+       "net/http"
        "os"
        "os/exec"
        "os/signal"
+       "path"
        "strings"
        "syscall"
 )
 
 type TaskDef struct {
-       command            []string          `json:"command"`
-       env                map[string]string `json:"task.env"`
-       stdin              string            `json:"task.stdin"`
-       stdout             string            `json:"task.stdout"`
-       vwd                map[string]string `json:"task.vwd"`
-       successCodes       []int             `json:"task.successCodes"`
-       permanentFailCodes []int             `json:"task.permanentFailCodes"`
-       temporaryFailCodes []int             `json:"task.temporaryFailCodes"`
+       Command            []string          `json:"command"`                 // program and args; $(task.*) placeholders substituted before exec
+       Env                map[string]string `json:"task.env"`                // KEY=value pairs appended to os.Environ(); values substituted
+       Stdin              string            `json:"task.stdin"`              // path opened as the child's stdin; placeholders substituted
+       Stdout             string            `json:"task.stdout"`             // file name, created under outdir, receiving the child's stdout
+       Vwd                map[string]string `json:"task.vwd"`                // keys checked by checkOutputFilename(outdir, key); values substituted
+       SuccessCodes       []int             `json:"task.successCodes"`       // extra exit codes counted as success
+       PermanentFailCodes []int             `json:"task.permanentFailCodes"` // exit codes counted as failure; checked before the other lists
+       TemporaryFailCodes []int             `json:"task.temporaryFailCodes"` // exit codes that make runner return TempFail
 }
 
 type Tasks struct {
-       tasks []TaskDef `json:"script_parameters"`
+       Tasks []TaskDef `json:"tasks"` // with more than one entry, task 0 creates one subtask per entry
 }
 
 type Job struct {
-       script_parameters Tasks `json:"script_parameters"`
+       Script_parameters Tasks `json:"script_parameters"` // the job's script_parameters; its "tasks" key lists the TaskDefs
 }
 
 type Task struct {
-       job_uuid                 string  `json:"job_uuid"`
-       created_by_job_task_uuid string  `json:"created_by_job_task_uuid"`
-       parameters               TaskDef `json:"parameters"`
-       sequence                 int     `json:"sequence"`
-       output                   string  `json:"output"`
-       success                  bool    `json:"success"`
-       progress                 float32 `json:"sequence"`
+       Job_uuid                 string  `json:"job_uuid"`
+       Created_by_job_task_uuid string  `json:"created_by_job_task_uuid"`
+       Parameters               TaskDef `json:"parameters"`
+       Sequence                 int     `json:"sequence"`
+       Output                   string  `json:"output"`
+       Success                  bool    `json:"success"`
+       Progress                 float32 `json:"sequence"`
 }
 
 type IArvadosClient interface {
@@ -78,8 +82,8 @@ func checkOutputFilename(outdir, fn string) error {
 }
 
 func setupCommand(cmd *exec.Cmd, taskp TaskDef, outdir string, replacements map[string]string) (stdin, stdout string, err error) {
-       if taskp.vwd != nil {
-               for k, v := range taskp.vwd {
+       if taskp.Vwd != nil {
+               for k, v := range taskp.Vwd {
                        v = substitute(v, replacements)
                        err = checkOutputFilename(outdir, k)
                        if err != nil {
@@ -89,22 +93,22 @@ func setupCommand(cmd *exec.Cmd, taskp TaskDef, outdir string, replacements map[
                }
        }
 
-       if taskp.stdin != "" {
+       if taskp.Stdin != "" {
                // Set up stdin redirection
-               stdin = substitute(taskp.stdin, replacements)
+               stdin = substitute(taskp.Stdin, replacements)
                cmd.Stdin, err = os.Open(stdin)
                if err != nil {
                        return "", "", err
                }
        }
 
-       if taskp.stdout != "" {
-               err = checkOutputFilename(outdir, taskp.stdout)
+       if taskp.Stdout != "" {
+               err = checkOutputFilename(outdir, taskp.Stdout)
                if err != nil {
                        return "", "", err
                }
                // Set up stdout redirection
-               stdout = outdir + "/" + taskp.stdout
+               stdout = outdir + "/" + taskp.Stdout
                cmd.Stdout, err = os.Create(stdout)
                if err != nil {
                        return "", "", err
@@ -113,10 +117,12 @@ func setupCommand(cmd *exec.Cmd, taskp TaskDef, outdir string, replacements map[
                cmd.Stdout = os.Stdout
        }
 
-       if taskp.env != nil {
+       cmd.Stderr = os.Stderr
+
+       if taskp.Env != nil {
                // Set up subprocess environment
                cmd.Env = os.Environ()
-               for k, v := range taskp.env {
+               for k, v := range taskp.Env {
                        v = substitute(v, replacements)
                        cmd.Env = append(cmd.Env, k+"="+v)
                }
@@ -124,14 +130,10 @@ func setupCommand(cmd *exec.Cmd, taskp TaskDef, outdir string, replacements map[
        return stdin, stdout, nil
 }
 
+// setupSignals returns a channel registered with signal.Notify for SIGTERM,
+// SIGINT and SIGQUIT, so the caller can forward those signals to the child.
 func setupSignals(cmd *exec.Cmd) chan os.Signal {
-       // Set up signal handlers
-       // Forward SIGINT, SIGTERM and SIGQUIT to inner process
        sigChan := make(chan os.Signal, 1)
-       go func(sig <-chan os.Signal) {
-               catch := <-sig
-               cmd.Process.Signal(catch)
-       }(sigChan)
        signal.Notify(sigChan, syscall.SIGTERM)
        signal.Notify(sigChan, syscall.SIGINT)
        signal.Notify(sigChan, syscall.SIGQUIT)
@@ -171,21 +173,21 @@ func runner(api IArvadosClient,
        jobStruct Job, taskStruct Task) error {
 
        var err error
-       taskp := taskStruct.parameters
+       taskp := taskStruct.Parameters
 
        // If this is task 0 and there are multiple tasks, dispatch subtasks
        // and exit.
-       if taskStruct.sequence == 0 {
-               if len(jobStruct.script_parameters.tasks) == 1 {
-                       taskp = jobStruct.script_parameters.tasks[0]
+       if taskStruct.Sequence == 0 {
+               if len(jobStruct.Script_parameters.Tasks) == 1 {
+                       taskp = jobStruct.Script_parameters.Tasks[0]
                } else {
-                       for _, task := range jobStruct.script_parameters.tasks {
+                       for _, task := range jobStruct.Script_parameters.Tasks {
                                err := api.Create("job_tasks",
                                        map[string]interface{}{
-                                               "job_task": Task{job_uuid: jobUuid,
-                                                       created_by_job_task_uuid: taskUuid,
-                                                       sequence:                 1,
-                                                       parameters:               task}},
+                                               "job_task": Task{Job_uuid: jobUuid,
+                                                       Created_by_job_task_uuid: taskUuid,
+                                                       Sequence:                 1,
+                                                       Parameters:               task}},
                                        nil)
                                if err != nil {
                                        return TempFail{err}
@@ -194,9 +196,9 @@ func runner(api IArvadosClient,
                        err = api.Update("job_tasks", taskUuid,
                                map[string]interface{}{
                                        "job_task": Task{
-                                               output:   "",
-                                               success:  true,
-                                               progress: 1.0}},
+                                               Output:   "",
+                                               Success:  true,
+                                               Progress: 1.0}},
                                nil)
                        return nil
                }
@@ -213,12 +215,16 @@ func runner(api IArvadosClient,
                "$(task.outdir)": outdir,
                "$(task.keep)":   keepmount}
 
+       log.Printf("crunchrunner: $(task.tmpdir)=%v", tmpdir)
+       log.Printf("crunchrunner: $(task.outdir)=%v", outdir)
+       log.Printf("crunchrunner: $(task.keep)=%v", keepmount)
+
        // Set up subprocess
-       for k, v := range taskp.command {
-               taskp.command[k] = substitute(v, replacements)
+       for k, v := range taskp.Command {
+               taskp.Command[k] = substitute(v, replacements)
        }
 
-       cmd := exec.Command(taskp.command[0], taskp.command[1:]...)
+       cmd := exec.Command(taskp.Command[0], taskp.Command[1:]...)
 
        cmd.Dir = outdir
 
@@ -237,11 +243,34 @@ func runner(api IArvadosClient,
        }
        log.Printf("Running %v%v%v", cmd.Args, stdin, stdout)
 
+       var caughtSignal os.Signal
+       sigChan := setupSignals(cmd)
+
        err = cmd.Start()
+       if err != nil {
+               signal.Stop(sigChan)
+               return TempFail{err}
+       }
+
+       finishedSignalNotify := make(chan struct{})
+       go func(sig <-chan os.Signal) {
+               for sig := range sig {
+                       caughtSignal = sig
+                       cmd.Process.Signal(caughtSignal)
+               }
+               close(finishedSignalNotify)
+       }(sigChan)
 
-       signals := setupSignals(cmd)
        err = cmd.Wait()
-       signal.Stop(signals)
+       signal.Stop(sigChan)
+
+       close(sigChan)
+       <-finishedSignalNotify
+
+       if caughtSignal != nil {
+               log.Printf("Caught signal %v", caughtSignal)
+               return PermFail{}
+       }
 
        if err != nil {
                // Run() returns ExitError on non-zero exit code, but we handle
@@ -257,11 +286,11 @@ func runner(api IArvadosClient,
 
        log.Printf("Completed with exit code %v", exitCode)
 
-       if inCodes(exitCode, taskp.permanentFailCodes) {
+       if inCodes(exitCode, taskp.PermanentFailCodes) {
                success = false
-       } else if inCodes(exitCode, taskp.temporaryFailCodes) {
+       } else if inCodes(exitCode, taskp.TemporaryFailCodes) {
                return TempFail{fmt.Errorf("Process tempfail with exit code %v", exitCode)}
-       } else if inCodes(exitCode, taskp.successCodes) || cmd.ProcessState.Success() {
+       } else if inCodes(exitCode, taskp.SuccessCodes) || cmd.ProcessState.Success() {
                success = true
        } else {
                success = false
@@ -277,9 +306,9 @@ func runner(api IArvadosClient,
        err = api.Update("job_tasks", taskUuid,
                map[string]interface{}{
                        "job_task": Task{
-                               output:   manifest,
-                               success:  success,
-                               progress: 1}},
+                               Output:   manifest,
+                               Success:  success,
+                               Progress: 1}},
                nil)
        if err != nil {
                return TempFail{err}
@@ -298,6 +327,24 @@ func main() {
                log.Fatal(err)
        }
 
+       // Container may not have certificates installed, so need to look for
+       // /etc/arvados/ca-certificates.crt in addition to normal system certs.
+       var certFiles = []string{
+               "/etc/ssl/certs/ca-certificates.crt", // Debian
+               "/etc/pki/tls/certs/ca-bundle.crt",   // Red Hat
+               "/etc/arvados/ca-certificates.crt",
+       }
+
+       certs := x509.NewCertPool()
+       for _, file := range certFiles {
+               data, err := ioutil.ReadFile(file)
+               if err == nil {
+                       log.Printf("Using TLS certificates at %v", file)
+                       certs.AppendCertsFromPEM(data)
+               }
+       }
+       api.Client.Transport.(*http.Transport).TLSClientConfig.RootCAs = certs
+
        jobUuid := os.Getenv("JOB_UUID")
        taskUuid := os.Getenv("TASK_UUID")
        tmpdir := os.Getenv("TASK_WORK")
@@ -317,6 +364,11 @@ func main() {
 
        var kc IKeepClient
        kc, err = keepclient.MakeKeepClient(&api)
+       if err != nil {
+               log.Fatal(err)
+       }
+
+       syscall.Umask(0022)
        err = runner(api, kc, jobUuid, taskUuid, tmpdir, keepmount, jobStruct, taskStruct)
 
        if err == nil {