Merge branch '11070-ws-listener-problem'
[arvados.git] / sdk / go / crunchrunner / crunchrunner.go
index 02f2be4c6f0d9ab84d238e180bd5e34d4cb1a3bc..5d7e10be4beb34fef1892b2d2d7c150fd9906176 100644 (file)
@@ -1,9 +1,13 @@
 package main
 
 import (
+       "encoding/json"
        "fmt"
+       "git.curoverse.com/arvados.git/sdk/go/arvados"
        "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
        "git.curoverse.com/arvados.git/sdk/go/keepclient"
+       "io"
+       "io/ioutil"
        "log"
        "os"
        "os/exec"
@@ -17,10 +21,12 @@ type TaskDef struct {
        Env                map[string]string `json:"task.env"`
        Stdin              string            `json:"task.stdin"`
        Stdout             string            `json:"task.stdout"`
+       Stderr             string            `json:"task.stderr"`
        Vwd                map[string]string `json:"task.vwd"`
        SuccessCodes       []int             `json:"task.successCodes"`
        PermanentFailCodes []int             `json:"task.permanentFailCodes"`
        TemporaryFailCodes []int             `json:"task.temporaryFailCodes"`
+       KeepTmpOutput      bool              `json:"task.keepTmpOutput"`
 }
 
 type Tasks struct {
@@ -46,17 +52,21 @@ type IArvadosClient interface {
        Update(resourceType string, uuid string, parameters arvadosclient.Dict, output interface{}) (err error)
 }
 
-func setupDirectories(crunchtmpdir, taskUuid string) (tmpdir, outdir string, err error) {
+func setupDirectories(crunchtmpdir, taskUuid string, keepTmp bool) (tmpdir, outdir string, err error) {
        tmpdir = crunchtmpdir + "/tmpdir"
        err = os.Mkdir(tmpdir, 0700)
        if err != nil {
                return "", "", err
        }
 
-       outdir = crunchtmpdir + "/outdir"
-       err = os.Mkdir(outdir, 0700)
-       if err != nil {
-               return "", "", err
+       if keepTmp {
+               outdir = os.Getenv("TASK_KEEPMOUNT_TMP")
+       } else {
+               outdir = crunchtmpdir + "/outdir"
+               err = os.Mkdir(outdir, 0700)
+               if err != nil {
+                       return "", "", err
+               }
        }
 
        return tmpdir, outdir, nil
@@ -77,15 +87,39 @@ func checkOutputFilename(outdir, fn string) error {
        return nil
 }
 
-func setupCommand(cmd *exec.Cmd, taskp TaskDef, outdir string, replacements map[string]string) (stdin, stdout string, err error) {
+// copyFile copies the contents of the file at src into the file at
+// dst.  Note the argument order: destination first, then source (the
+// same order as io.Copy).  dst is created if it does not exist and
+// truncated if it does.
+//
+// It returns the first error encountered while opening, creating,
+// copying, or closing the files.
+func copyFile(dst, src string) error {
+       in, err := os.Open(src)
+       if err != nil {
+               return err
+       }
+       defer in.Close()
+
+       out, err := os.Create(dst)
+       if err != nil {
+               return err
+       }
+
+       _, err = io.Copy(out, in)
+       // Close explicitly rather than via defer: some filesystems
+       // report write errors only at close time, and a deferred
+       // Close would silently discard them.
+       if cerr := out.Close(); err == nil {
+               err = cerr
+       }
+       return err
+}
+
+func setupCommand(cmd *exec.Cmd, taskp TaskDef, outdir string, replacements map[string]string) (stdin, stdout, stderr string, err error) {
        if taskp.Vwd != nil {
                for k, v := range taskp.Vwd {
                        v = substitute(v, replacements)
                        err = checkOutputFilename(outdir, k)
                        if err != nil {
-                               return "", "", err
+                               return "", "", "", err
+                       }
+                       if taskp.KeepTmpOutput {
+                               err = copyFile(v, outdir+"/"+k)
+                       } else {
+                               err = os.Symlink(v, outdir+"/"+k)
+                       }
+                       if err != nil {
+                               return "", "", "", err
                        }
-                       os.Symlink(v, outdir+"/"+k)
                }
        }
 
@@ -94,25 +128,40 @@ func setupCommand(cmd *exec.Cmd, taskp TaskDef, outdir string, replacements map[
                stdin = substitute(taskp.Stdin, replacements)
                cmd.Stdin, err = os.Open(stdin)
                if err != nil {
-                       return "", "", err
+                       return "", "", "", err
                }
        }
 
        if taskp.Stdout != "" {
                err = checkOutputFilename(outdir, taskp.Stdout)
                if err != nil {
-                       return "", "", err
+                       return "", "", "", err
                }
                // Set up stdout redirection
                stdout = outdir + "/" + taskp.Stdout
                cmd.Stdout, err = os.Create(stdout)
                if err != nil {
-                       return "", "", err
+                       return "", "", "", err
                }
        } else {
                cmd.Stdout = os.Stdout
        }
 
+       if taskp.Stderr != "" {
+               err = checkOutputFilename(outdir, taskp.Stderr)
+               if err != nil {
+                       return "", "", "", err
+               }
+               // Set up stderr redirection
+               stderr = outdir + "/" + taskp.Stderr
+               cmd.Stderr, err = os.Create(stderr)
+               if err != nil {
+                       return "", "", "", err
+               }
+       } else {
+               cmd.Stderr = os.Stderr
+       }
+
        if taskp.Env != nil {
                // Set up subprocess environment
                cmd.Env = os.Environ()
@@ -121,12 +170,12 @@ func setupCommand(cmd *exec.Cmd, taskp TaskDef, outdir string, replacements map[
                        cmd.Env = append(cmd.Env, k+"="+v)
                }
        }
-       return stdin, stdout, nil
+       return stdin, stdout, stderr, nil
 }
 
+// Set up signal handlers.  Go sends signal notifications to a "signal
+// channel".
 func setupSignals(cmd *exec.Cmd) chan os.Signal {
-       // Set up signal handlers
-       // Forward SIGINT, SIGTERM and SIGQUIT to inner process
        sigChan := make(chan os.Signal, 1)
        signal.Notify(sigChan, syscall.SIGTERM)
        signal.Notify(sigChan, syscall.SIGINT)
@@ -161,6 +210,22 @@ func substitute(inp string, subst map[string]string) string {
        return inp
 }
 
+// getKeepTmp reads the ".arvados#collection" file in outdir, decodes
+// it as a JSON-encoded arvados.Collection, and returns that
+// collection's manifest text.
+//
+// On any error (the file cannot be read, or its contents cannot be
+// decoded as JSON into a collection) it returns an empty manifest and
+// the error.
+func getKeepTmp(outdir string) (manifest string, err error) {
+       buf, err := ioutil.ReadFile(outdir + "/" + ".arvados#collection")
+       if err != nil {
+               return "", err
+       }
+
+       var collection arvados.Collection
+       err = json.Unmarshal(buf, &collection)
+       if err != nil {
+               // Unmarshal may have filled in some fields before
+               // failing; don't hand a partial manifest to the caller.
+               return "", err
+       }
+       return collection.ManifestText, nil
+}
+
 func runner(api IArvadosClient,
        kc IKeepClient,
        jobUuid, taskUuid, crunchtmpdir, keepmount string,
@@ -199,7 +264,7 @@ func runner(api IArvadosClient,
        }
 
        var tmpdir, outdir string
-       tmpdir, outdir, err = setupDirectories(crunchtmpdir, taskUuid)
+       tmpdir, outdir, err = setupDirectories(crunchtmpdir, taskUuid, taskp.KeepTmpOutput)
        if err != nil {
                return TempFail{err}
        }
@@ -209,6 +274,10 @@ func runner(api IArvadosClient,
                "$(task.outdir)": outdir,
                "$(task.keep)":   keepmount}
 
+       log.Printf("crunchrunner: $(task.tmpdir)=%v", tmpdir)
+       log.Printf("crunchrunner: $(task.outdir)=%v", outdir)
+       log.Printf("crunchrunner: $(task.keep)=%v", keepmount)
+
        // Set up subprocess
        for k, v := range taskp.Command {
                taskp.Command[k] = substitute(v, replacements)
@@ -218,8 +287,8 @@ func runner(api IArvadosClient,
 
        cmd.Dir = outdir
 
-       var stdin, stdout string
-       stdin, stdout, err = setupCommand(cmd, taskp, outdir, replacements)
+       var stdin, stdout, stderr string
+       stdin, stdout, stderr, err = setupCommand(cmd, taskp, outdir, replacements)
        if err != nil {
                return err
        }
@@ -231,27 +300,34 @@ func runner(api IArvadosClient,
        if stdout != "" {
                stdout = " > " + stdout
        }
-       log.Printf("Running %v%v%v", cmd.Args, stdin, stdout)
+       if stderr != "" {
+               stderr = " 2> " + stderr
+       }
+       log.Printf("Running %v%v%v%v", cmd.Args, stdin, stdout, stderr)
 
        var caughtSignal os.Signal
-       {
-               sigChan := setupSignals(cmd)
-               defer signal.Stop(sigChan)
+       sigChan := setupSignals(cmd)
 
-               err = cmd.Start()
-               if err != nil {
-                       return TempFail{err}
+       err = cmd.Start()
+       if err != nil {
+               signal.Stop(sigChan)
+               return TempFail{err}
+       }
+
+       finishedSignalNotify := make(chan struct{})
+       go func(sig <-chan os.Signal) {
+               for sig := range sig {
+                       caughtSignal = sig
+                       cmd.Process.Signal(caughtSignal)
                }
+               close(finishedSignalNotify)
+       }(sigChan)
 
-               go func(sig <-chan os.Signal) {
-                       for sig := range sig {
-                               caughtSignal = sig
-                               cmd.Process.Signal(caughtSignal)
-                       }
-               }(sigChan)
+       err = cmd.Wait()
+       signal.Stop(sigChan)
 
-               err = cmd.Wait()
-       }
+       close(sigChan)
+       <-finishedSignalNotify
 
        if caughtSignal != nil {
                log.Printf("Caught signal %v", caughtSignal)
@@ -283,7 +359,12 @@ func runner(api IArvadosClient,
        }
 
        // Upload output directory
-       manifest, err := WriteTree(kc, outdir)
+       var manifest string
+       if taskp.KeepTmpOutput {
+               manifest, err = getKeepTmp(outdir)
+       } else {
+               manifest, err = WriteTree(kc, outdir)
+       }
        if err != nil {
                return TempFail{err}
        }
@@ -331,7 +412,7 @@ func main() {
        }
 
        var kc IKeepClient
-       kc, err = keepclient.MakeKeepClient(&api)
+       kc, err = keepclient.MakeKeepClient(api)
        if err != nil {
                log.Fatal(err)
        }