Merge branch '8876-work-unit' into 8650-container-work-unit
[arvados.git] / services / crunch-run / crunchrun.go
index 01edb0a516fadd33c175df030c7c4c330b2985ba..cebebb104af09d702691f89e45ff2bb2d2619309 100644 (file)
@@ -15,6 +15,7 @@ import (
        "os"
        "os/exec"
        "os/signal"
+       "path"
        "strings"
        "sync"
        "syscall"
@@ -44,6 +45,7 @@ type Mount struct {
        PortableDataHash string `json:"portable_data_hash"`
        UUID             string `json:"uuid"`
        DeviceType       string `json:"device_type"`
+       Path             string `json:"path"`
 }
 
 // Collection record returned by the API server.
@@ -99,7 +101,7 @@ type ContainerRunner struct {
        NewLogWriter
        loggingDone   chan bool
        CrunchLog     *ThrottledLogger
-       Stdout        *ThrottledLogger
+       Stdout        io.WriteCloser
        Stderr        *ThrottledLogger
        LogCollection *CollectionWriter
        LogsPDH       *string
@@ -246,6 +248,22 @@ func (runner *ContainerRunner) SetupMounts() (err error) {
        runner.Binds = nil
 
        for bind, mnt := range runner.ContainerRecord.Mounts {
+               if bind == "stdout" {
+                       // Is it a "file" mount kind?
+                       if mnt.Kind != "file" {
+                               return fmt.Errorf("Unsupported mount kind '%s' for stdout. Only 'file' is supported.", mnt.Kind)
+                       }
+
+                       // Does path start with OutputPath?
+                       prefix := runner.ContainerRecord.OutputPath
+                       if !strings.HasSuffix(prefix, "/") {
+                               prefix += "/"
+                       }
+                       if !strings.HasPrefix(mnt.Path, prefix) {
+                               return fmt.Errorf("Stdout path does not start with OutputPath: %s, %s", mnt.Path, prefix)
+                       }
+               }
+
                if mnt.Kind == "collection" {
                        var src string
                        if mnt.UUID != "" && mnt.PortableDataHash != "" {
@@ -296,8 +314,6 @@ func (runner *ContainerRunner) SetupMounts() (err error) {
                        } else {
                                runner.Binds = append(runner.Binds, bind)
                        }
-               } else {
-                       return fmt.Errorf("Unknown mount kind '%s'", mnt.Kind)
                }
        }
 
@@ -383,7 +399,31 @@ func (runner *ContainerRunner) AttachStreams() (err error) {
 
        runner.loggingDone = make(chan bool)
 
-       runner.Stdout = NewThrottledLogger(runner.NewLogWriter("stdout"))
+       if stdoutMnt, ok := runner.ContainerRecord.Mounts["stdout"]; ok {
+               stdoutPath := stdoutMnt.Path[len(runner.ContainerRecord.OutputPath):]
+               index := strings.LastIndex(stdoutPath, "/")
+               if index > 0 {
+                       subdirs := stdoutPath[:index]
+                       if subdirs != "" {
+                               st, err := os.Stat(runner.HostOutputDir)
+                               if err != nil {
+                                       return fmt.Errorf("While Stat on temp dir: %v", err)
+                               }
+                               stdoutPath := path.Join(runner.HostOutputDir, subdirs)
+                               err = os.MkdirAll(stdoutPath, st.Mode()|os.ModeSetgid|0777)
+                               if err != nil {
+                                       return fmt.Errorf("While MkdirAll %q: %v", stdoutPath, err)
+                               }
+                       }
+               }
+               stdoutFile, err := os.Create(path.Join(runner.HostOutputDir, stdoutPath))
+               if err != nil {
+                       return fmt.Errorf("While creating stdout file: %v", err)
+               }
+               runner.Stdout = stdoutFile
+       } else {
+               runner.Stdout = NewThrottledLogger(runner.NewLogWriter("stdout"))
+       }
        runner.Stderr = NewThrottledLogger(runner.NewLogWriter("stderr"))
 
        go runner.ProcessDockerAttach(containerReader)