Merge branch '8784-dir-listings'
[arvados.git] / sdk / go / crunchrunner / crunchrunner.go
index 9f2ae2501c08c4bbfd3dd23bb92ffff610910870..ca16fc656155cb43bbb1c54290053c4f113208fe 100644 (file)
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: Apache-2.0
+
 package main
 
 import (
-       "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
-       //"git.curoverse.com/arvados.git/sdk/go/keepclient"
-       "errors"
+       "encoding/json"
+       "fmt"
+       "io"
+       "io/ioutil"
        "log"
        "os"
        "os/exec"
        "os/signal"
+       "strings"
        "syscall"
+
+       "git.curoverse.com/arvados.git/sdk/go/arvados"
+       "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
+       "git.curoverse.com/arvados.git/sdk/go/keepclient"
 )
 
-func getRecord(api arvadosclient.ArvadosClient, rsc, uuid string) (r arvadosclient.Dict) {
-       r = make(arvadosclient.Dict)
-       err := api.Get(rsc, uuid, nil, &r)
-       if err != nil {
-               log.Fatal(err)
-       }
-       return r
+// TaskDef describes a single task to run: the command line plus the
+// optional "task.*" settings decoded from JSON.  Command, Env values,
+// Stdin and Vwd values may contain $(task.tmpdir), $(task.outdir) and
+// $(task.keep) placeholders, which are expanded before use.  Stdout,
+// Stderr and the keys of Vwd name files relative to the task output
+// directory.
+type TaskDef struct {
+       Command            []string          `json:"command"`
+       Env                map[string]string `json:"task.env"`
+       Stdin              string            `json:"task.stdin"`
+       Stdout             string            `json:"task.stdout"`
+       Stderr             string            `json:"task.stderr"`
+       Vwd                map[string]string `json:"task.vwd"`
+       SuccessCodes       []int             `json:"task.successCodes"`
+       PermanentFailCodes []int             `json:"task.permanentFailCodes"`
+       TemporaryFailCodes []int             `json:"task.temporaryFailCodes"`
+       KeepTmpOutput      bool              `json:"task.keepTmpOutput"`
+}
+
+// Tasks is the "tasks" list carried in a job's script_parameters.
+type Tasks struct {
+       Tasks []TaskDef `json:"tasks"`
+}
+
+// Job holds the only part of a job record that is decoded here: its
+// script_parameters.
+type Job struct {
+       ScriptParameters Tasks `json:"script_parameters"`
+}
+
+// Task is the job_task record as read from and written to the API:
+// which job and parent task it belongs to, its parameters and sequence
+// number, and the output/success/progress fields reported on completion.
+type Task struct {
+       JobUUID              string  `json:"job_uuid"`
+       CreatedByJobTaskUUID string  `json:"created_by_job_task_uuid"`
+       Parameters           TaskDef `json:"parameters"`
+       Sequence             int     `json:"sequence"`
+       Output               string  `json:"output"`
+       Success              bool    `json:"success"`
+       // Progress needs its own JSON key: two fields sharing the name
+       // "sequence" make encoding/json silently ignore both of them.
+       Progress             float32 `json:"progress"`
+}
+
+// IArvadosClient is the subset of the Arvados API client (Create and
+// Update) that runner is written against; runner accepts any value
+// implementing it.
+type IArvadosClient interface {
+       Create(resourceType string, parameters arvadosclient.Dict, output interface{}) error
+       Update(resourceType string, uuid string, parameters arvadosclient.Dict, output interface{}) (err error)
 }
 
-func setupDirectories(tmpdir string) (outdir string, err error) {
-       err = os.Chdir(tmpdir)
+// setupDirectories creates crunchtmpdir/tmpdir (mode 0700) and chooses
+// the task output directory.  When keepTmp is true the output directory
+// is taken from the TASK_KEEPMOUNT_TMP environment variable and is not
+// created here; otherwise crunchtmpdir/outdir is created (mode 0700).
+// taskUUID is currently unused.  On error both returned paths are "".
+func setupDirectories(crunchtmpdir, taskUUID string, keepTmp bool) (tmpdir, outdir string, err error) {
+       tmpdir = crunchtmpdir + "/tmpdir"
+       err = os.Mkdir(tmpdir, 0700)
        if err != nil {
-               return "", err
+               return "", "", err
        }
 
-       err = os.Mkdir("tmpdir", 0700)
-       if err != nil {
-               return "", err
+       if keepTmp {
+               // NOTE(review): an unset TASK_KEEPMOUNT_TMP yields outdir == ""
+               // without any error being reported here.
+               outdir = os.Getenv("TASK_KEEPMOUNT_TMP")
+       } else {
+               outdir = crunchtmpdir + "/outdir"
+               err = os.Mkdir(outdir, 0700)
+               if err != nil {
+                       return "", "", err
+               }
        }
 
-       err = os.Mkdir("outdir", 0700)
-       if err != nil {
-               return "", err
+       return tmpdir, outdir, nil
+}
+
+// checkOutputFilename validates fn as a path relative to outdir and makes
+// sure its parent directory exists.  It returns an error if fn starts or
+// ends with '/', if it contains "../", or if the parent directory of
+// outdir/fn cannot be created.
+func checkOutputFilename(outdir, fn string) error {
+       if strings.HasPrefix(fn, "/") || strings.HasSuffix(fn, "/") {
+               return fmt.Errorf("Path must not start or end with '/'")
+       }
+       // Search for "../" inside fn (not the other way around) so that
+       // paths escaping outdir are actually rejected.
+       if strings.Contains(fn, "../") {
+               return fmt.Errorf("Path must not contain '../'")
        }
 
-       os.Chdir("outdir")
+       sl := strings.LastIndex(fn, "/")
+       if sl != -1 {
+               // Report a failure to create the parent directory here
+               // instead of letting a later create/symlink fail obscurely.
+               err := os.MkdirAll(outdir+"/"+fn[0:sl], 0777)
+               if err != nil {
+                       return err
+               }
+       }
+       return nil
+}
+
+// copyFile copies the contents of src into dst, creating dst (or
+// truncating it if it already exists).  Note the argument order:
+// destination first, source second.  An error from closing dst is not
+// reported to the caller.
+func copyFile(dst, src string) error {
+       in, err := os.Open(src)
        if err != nil {
-               return "", err
+               return err
        }
+       defer in.Close()
 
-       outdir, err = os.Getwd()
+       out, err := os.Create(dst)
        if err != nil {
-               return "", err
+               return err
        }
+       defer out.Close()
 
-       return outdir, nil
+       _, err = io.Copy(out, in)
+       return err
 }
 
-func setupCommand(cmd *exec.Cmd, taskp map[string]interface{}, keepmount, outdir string) error {
-       var err error
-
-       if taskp["task.vwd"] != nil {
-               // Set up VWD symlinks in outdir
-               // TODO
+// setupCommand prepares cmd according to taskp: it populates outdir with
+// the task.vwd entries (copied when KeepTmpOutput is set, symlinked
+// otherwise), wires up stdin/stdout/stderr redirection and extends the
+// environment with task.env.  Placeholders in Vwd values, Stdin and Env
+// values are expanded using replacements.  It returns the paths used for
+// stdin, stdout and stderr ("" for a stream that is not redirected); on
+// error all three are "".
+func setupCommand(cmd *exec.Cmd, taskp TaskDef, outdir string, replacements map[string]string) (stdin, stdout, stderr string, err error) {
+       if taskp.Vwd != nil {
+               for k, v := range taskp.Vwd {
+                       v = substitute(v, replacements)
+                       err = checkOutputFilename(outdir, k)
+                       if err != nil {
+                               return "", "", "", err
+                       }
+                       if taskp.KeepTmpOutput {
+                               // copyFile takes (dst, src): the file at v
+                               // is copied into outdir under the name k,
+                               // mirroring the symlink branch below.
+                               err = copyFile(outdir+"/"+k, v)
+                       } else {
+                               err = os.Symlink(v, outdir+"/"+k)
+                       }
+                       if err != nil {
+                               return "", "", "", err
+                       }
+               }
        }
 
-       if taskp["task.stdin"] != nil {
-               stdin, ok := taskp["task.stdin"].(string)
-               if !ok {
-                       return errors.New("Could not cast task.stdin to string")
-               }
+       if taskp.Stdin != "" {
                // Set up stdin redirection
-               cmd.Stdin, err = os.Open(keepmount + "/" + stdin)
+               stdin = substitute(taskp.Stdin, replacements)
+               cmd.Stdin, err = os.Open(stdin)
                if err != nil {
-                       log.Fatal(err)
+                       return "", "", "", err
                }
        }
 
-       if taskp["task.stdout"] != nil {
-               stdout, ok := taskp["task.stdout"].(string)
-               if !ok {
-                       return errors.New("Could not cast task.stdout to string")
+       if taskp.Stdout != "" {
+               err = checkOutputFilename(outdir, taskp.Stdout)
+               if err != nil {
+                       return "", "", "", err
                }
-
                // Set up stdout redirection
-               cmd.Stdout, err = os.Open(outdir + "/" + stdout)
+               stdout = outdir + "/" + taskp.Stdout
+               cmd.Stdout, err = os.Create(stdout)
                if err != nil {
-                       log.Fatal(err)
+                       return "", "", "", err
                }
        } else {
                cmd.Stdout = os.Stdout
        }
 
-       if taskp["task.env"] != nil {
-               taskenv, ok := taskp["task.env"].(map[string]interface{})
-               if !ok {
-                       return errors.New("Could not cast task.env to map")
+       if taskp.Stderr != "" {
+               err = checkOutputFilename(outdir, taskp.Stderr)
+               if err != nil {
+                       return "", "", "", err
                }
+               // Set up stderr redirection
+               stderr = outdir + "/" + taskp.Stderr
+               cmd.Stderr, err = os.Create(stderr)
+               if err != nil {
+                       return "", "", "", err
+               }
+       } else {
+               cmd.Stderr = os.Stderr
+       }
 
+       if taskp.Env != nil {
                // Set up subprocess environment
                cmd.Env = os.Environ()
-               for k, v := range taskenv {
-                       var vstr string
-                       vstr, ok = v.(string)
-                       if !ok {
-                               return errors.New("Could not cast environment value to string")
-                       }
-                       cmd.Env = append(cmd.Env, k+"="+vstr)
+               for k, v := range taskp.Env {
+                       v = substitute(v, replacements)
+                       cmd.Env = append(cmd.Env, k+"="+v)
                }
        }
-       return nil
+       return stdin, stdout, stderr, nil
 }
 
-func setupSignals(cmd *exec.Cmd) {
-       // Set up signal handlers
-       // Forward SIGINT, SIGTERM and SIGQUIT to inner process
+// setupSignals registers a buffered (capacity 1) channel to receive
+// SIGTERM, SIGINT and SIGQUIT notifications and returns it.  The cmd
+// argument is not used here; acting on a received signal is left to
+// whoever reads from the returned channel.
+func setupSignals(cmd *exec.Cmd) chan os.Signal {
        sigChan := make(chan os.Signal, 1)
-       go func(sig <-chan os.Signal) {
-               catch := <-sig
-               if cmd.Process != nil {
-                       cmd.Process.Signal(catch)
-               }
-       }(sigChan)
        signal.Notify(sigChan, syscall.SIGTERM)
        signal.Notify(sigChan, syscall.SIGINT)
        signal.Notify(sigChan, syscall.SIGQUIT)
+       return sigChan
 }
 
-func inCodes(code int, codes interface{}) bool {
+func inCodes(code int, codes []int) bool {
        if codes != nil {
-               codesArray, ok := codes.([]interface{})
-               if !ok {
-                       return false
-               }
-               for _, c := range codesArray {
-                       var num float64
-                       num, ok = c.(float64)
-                       if ok && code == int(num) {
+               for _, c := range codes {
+                       if code == c {
                                return true
                        }
                }
@@ -138,61 +201,66 @@ func inCodes(code int, codes interface{}) bool {
 
+// TASK_TEMPFAIL is presumably the process exit status that signals a
+// temporary (retryable) task failure -- the removed code called
+// os.Exit(TASK_TEMPFAIL); TODO confirm against main.
 const TASK_TEMPFAIL = 111
 
-type TempFail struct{ InnerError error }
+// TempFail marks a failure as temporary.  It embeds the underlying
+// error, so its Error method is the embedded error's.
+type TempFail struct{ error }
+// PermFail marks a failure as permanent; it carries no further detail.
 type PermFail struct{}
 
-func (s TempFail) Error() string {
-       return s.InnerError.Error()
-}
-
+// Error returns the fixed string "PermFail".
 func (s PermFail) Error() string {
        return "PermFail"
 }
 
-func runner(api arvadosclient.ArvadosClient,
-       jobUuid, taskUuid, tmpdir, keepmount string, jobStruct,
-       taskStruct arvadosclient.Dict) error {
+// substitute returns inp with every occurrence of each key of subst
+// replaced by the corresponding value.  Keys are applied one after
+// another in map iteration order.
+func substitute(inp string, subst map[string]string) string {
+       result := inp
+       for placeholder, value := range subst {
+               result = strings.Replace(result, placeholder, value, -1)
+       }
+       return result
+}
 
-       var err error
-       var ok bool
-       var jobp, taskp map[string]interface{}
-       jobp, ok = jobStruct["script_parameters"].(map[string]interface{})
-       if !ok {
-               return errors.New("Could not cast job script_parameters to map")
+// getKeepTmp reads the ".arvados#collection" file in outdir, decodes it
+// as a JSON collection record and returns that record's manifest text.
+// It returns an error if the file cannot be opened, read or decoded.
+func getKeepTmp(outdir string) (manifest string, err error) {
+       fn, err := os.Open(outdir + "/" + ".arvados#collection")
+       if err != nil {
+               return "", err
        }
+       defer fn.Close()
 
-       taskp, ok = taskStruct["parameters"].(map[string]interface{})
-       if !ok {
-               return errors.New("Could not cast task parameters to map")
+       buf, err := ioutil.ReadAll(fn)
+       if err != nil {
+               return "", err
        }
+       collection := arvados.Collection{}
+       err = json.Unmarshal(buf, &collection)
+       return collection.ManifestText, err
+}
+
+func runner(api IArvadosClient,
+       kc IKeepClient,
+       jobUUID, taskUUID, crunchtmpdir, keepmount string,
+       jobStruct Job, taskStruct Task) error {
+
+       var err error
+       taskp := taskStruct.Parameters
 
        // If this is task 0 and there are multiple tasks, dispatch subtasks
        // and exit.
-       if taskStruct["sequence"] == 0.0 {
-               var tasks []interface{}
-               tasks, ok = jobp["tasks"].([]interface{})
-               if !ok {
-                       return errors.New("Could not cast tasks to array")
-               }
-
-               if len(tasks) == 1 {
-                       taskp = tasks[0].(map[string]interface{})
+       if taskStruct.Sequence == 0 {
+               if len(jobStruct.ScriptParameters.Tasks) == 1 {
+                       taskp = jobStruct.ScriptParameters.Tasks[0]
                } else {
-                       for task := range tasks {
-                               err := api.Call("POST", "job_tasks", "", "",
-                                       arvadosclient.Dict{
-                                               "job_uuid":                 jobUuid,
-                                               "created_by_job_task_uuid": "",
-                                               "sequence":                 1,
-                                               "parameters":               task},
+                       for _, task := range jobStruct.ScriptParameters.Tasks {
+                               err := api.Create("job_tasks",
+                                       map[string]interface{}{
+                                               "job_task": Task{
+                                                       JobUUID:              jobUUID,
+                                                       CreatedByJobTaskUUID: taskUUID,
+                                                       Sequence:             1,
+                                                       Parameters:           task}},
                                        nil)
                                if err != nil {
                                        return TempFail{err}
                                }
                        }
-                       err = api.Call("PUT", "job_tasks", taskUuid, "",
-                               arvadosclient.Dict{
-                                       "job_task": arvadosclient.Dict{
+                       err = api.Update("job_tasks", taskUUID,
+                               map[string]interface{}{
+                                       "job_task": map[string]interface{}{
                                                "output":   "",
                                                "success":  true,
                                                "progress": 1.0}},
@@ -201,84 +269,125 @@ func runner(api arvadosclient.ArvadosClient,
                }
        }
 
-       // Set up subprocess
-       var commandline []string
-       var commandsarray []interface{}
-
-       commandsarray, ok = taskp["command"].([]interface{})
-       if !ok {
-               return errors.New("Could not cast commands to array")
+       var tmpdir, outdir string
+       tmpdir, outdir, err = setupDirectories(crunchtmpdir, taskUUID, taskp.KeepTmpOutput)
+       if err != nil {
+               return TempFail{err}
        }
 
-       for _, c := range commandsarray {
-               var cstr string
-               cstr, ok = c.(string)
-               if !ok {
-                       return errors.New("Could not cast command argument to string")
-               }
-               commandline = append(commandline, cstr)
-       }
-       cmd := exec.Command(commandline[0], commandline[1:]...)
+       replacements := map[string]string{
+               "$(task.tmpdir)": tmpdir,
+               "$(task.outdir)": outdir,
+               "$(task.keep)":   keepmount}
 
-       var outdir string
-       outdir, err = setupDirectories(tmpdir)
-       if err != nil {
-               return TempFail{err}
+       log.Printf("crunchrunner: $(task.tmpdir)=%v", tmpdir)
+       log.Printf("crunchrunner: $(task.outdir)=%v", outdir)
+       log.Printf("crunchrunner: $(task.keep)=%v", keepmount)
+
+       // Set up subprocess
+       for k, v := range taskp.Command {
+               taskp.Command[k] = substitute(v, replacements)
        }
 
+       cmd := exec.Command(taskp.Command[0], taskp.Command[1:]...)
+
        cmd.Dir = outdir
 
-       err = setupCommand(cmd, taskp, keepmount, outdir)
+       var stdin, stdout, stderr string
+       stdin, stdout, stderr, err = setupCommand(cmd, taskp, outdir, replacements)
        if err != nil {
                return err
        }
 
-       setupSignals(cmd)
-
        // Run subprocess and wait for it to complete
-       log.Printf("Running %v", cmd.Args)
+       if stdin != "" {
+               stdin = " < " + stdin
+       }
+       if stdout != "" {
+               stdout = " > " + stdout
+       }
+       if stderr != "" {
+               stderr = " 2> " + stderr
+       }
+       log.Printf("Running %v%v%v%v", cmd.Args, stdin, stdout, stderr)
 
-       err = cmd.Run()
+       var caughtSignal os.Signal
+       sigChan := setupSignals(cmd)
 
+       err = cmd.Start()
        if err != nil {
+               signal.Stop(sigChan)
                return TempFail{err}
        }
 
-       const success = 1
-       const permfail = 2
-       const tempfail = 2
-       var status int
+       finishedSignalNotify := make(chan struct{})
+       go func(sig <-chan os.Signal) {
+               for sig := range sig {
+                       caughtSignal = sig
+                       cmd.Process.Signal(caughtSignal)
+               }
+               close(finishedSignalNotify)
+       }(sigChan)
+
+       err = cmd.Wait()
+       signal.Stop(sigChan)
+
+       close(sigChan)
+       <-finishedSignalNotify
+
+       if caughtSignal != nil {
+               log.Printf("Caught signal %v", caughtSignal)
+               return PermFail{}
+       }
+
+       if err != nil {
+               // Wait() returns ExitError on non-zero exit code, but we handle
+               // that down below.  So only return if it's not ExitError.
+               if _, ok := err.(*exec.ExitError); !ok {
+                       return TempFail{err}
+               }
+       }
+
+       var success bool
 
        exitCode := cmd.ProcessState.Sys().(syscall.WaitStatus).ExitStatus()
 
-       if inCodes(exitCode, taskp["task.successCodes"]) {
-               status = success
-       } else if inCodes(exitCode, taskp["task.permanentFailCodes"]) {
-               status = permfail
-       } else if inCodes(exitCode, taskp["task.temporaryFailCodes"]) {
-               os.Exit(TASK_TEMPFAIL)
-       } else if cmd.ProcessState.Success() {
-               status = success
+       log.Printf("Completed with exit code %v", exitCode)
+
+       if inCodes(exitCode, taskp.PermanentFailCodes) {
+               success = false
+       } else if inCodes(exitCode, taskp.TemporaryFailCodes) {
+               return TempFail{fmt.Errorf("Process tempfail with exit code %v", exitCode)}
+       } else if inCodes(exitCode, taskp.SuccessCodes) || cmd.ProcessState.Success() {
+               success = true
        } else {
-               status = permfail
+               success = false
        }
 
        // Upload output directory
-       // TODO
+       var manifest string
+       if taskp.KeepTmpOutput {
+               manifest, err = getKeepTmp(outdir)
+       } else {
+               manifest, err = WriteTree(kc, outdir)
+       }
+       if err != nil {
+               return TempFail{err}
+       }
 
        // Set status
-       err = api.Call("PUT", "job_tasks", taskUuid, "",
-               arvadosclient.Dict{
-                       "job_task": arvadosclient.Dict{
-                               "output":   "",
-                               "success":  status == success,
-                               "progress": 1.0}},
+       err = api.Update("job_tasks", taskUUID,
+               map[string]interface{}{
+                       "job_task": Task{
+                               Output:   manifest,
+                               Success:  success,
+                               Progress: 1}},
                nil)
        if err != nil {
                return TempFail{err}
        }
 
-       if status == success {
+       if success {
                return nil
        } else {
                return PermFail{}
@@ -286,22 +395,36 @@ func runner(api arvadosclient.ArvadosClient,
 }
 
 func main() {
-       syscall.Umask(0077)
-
        api, err := arvadosclient.MakeArvadosClient()
        if err != nil {
                log.Fatal(err)
        }
 
-       jobUuid := os.Getenv("JOB_UUID")
-       taskUuid := os.Getenv("TASK_UUID")
+       jobUUID := os.Getenv("JOB_UUID")
+       taskUUID := os.Getenv("TASK_UUID")
        tmpdir := os.Getenv("TASK_WORK")
        keepmount := os.Getenv("TASK_KEEPMOUNT")
 
-       jobStruct := getRecord(api, "jobs", jobUuid)
-       taskStruct := getRecord(api, "job_tasks", taskUuid)
+       var jobStruct Job
+       var taskStruct Task
+
+       err = api.Get("jobs", jobUUID, nil, &jobStruct)
+       if err != nil {
+               log.Fatal(err)
+       }
+       err = api.Get("job_tasks", taskUUID, nil, &taskStruct)
+       if err != nil {
+               log.Fatal(err)
+       }
+
+       var kc IKeepClient
+       kc, err = keepclient.MakeKeepClient(api)
+       if err != nil {
+               log.Fatal(err)
+       }
 
-       err = runner(api, jobUuid, taskUuid, tmpdir, keepmount, jobStruct, taskStruct)
+       syscall.Umask(0022)
+       err = runner(api, kc, jobUUID, taskUUID, tmpdir, keepmount, jobStruct, taskStruct)
 
        if err == nil {
                os.Exit(0)