Merge branch '11469-volume-tmp' refs #11469
[arvados.git] / services / crunch-run / crunchrun.go
index f22680fca568c92fe9e765185120789d4a37913d..c6847bc19db3116d7286b98e1914f144c8d66a8b 100644 (file)
@@ -50,7 +50,6 @@ var ErrCancelled = errors.New("Cancelled")
 type IKeepClient interface {
        PutHB(hash string, buf []byte) (string, int, error)
        ManifestFileReader(m manifest.Manifest, filename string) (keepclient.Reader, error)
-       CollectionFileReader(collection map[string]interface{}, filename string) (keepclient.Reader, error)
 }
 
 // NewLogWriter is a factory function to create a new log writer.
@@ -136,7 +135,7 @@ type ContainerRunner struct {
        loggingDone   chan bool
        CrunchLog     *ThrottledLogger
        Stdout        io.WriteCloser
-       Stderr        *ThrottledLogger
+       Stderr        io.WriteCloser
        LogCollection *CollectionWriter
        LogsPDH       *string
        RunArvMount
@@ -146,6 +145,7 @@ type ContainerRunner struct {
        HostOutputDir  string
        CleanupTempDir []string
        Binds          []string
+       Volumes        map[string]struct{}
        OutputPDH      *string
        SigChan        chan os.Signal
        ArvMountExit   chan error
@@ -337,6 +337,7 @@ func (runner *ContainerRunner) SetupMounts() (err error) {
 
        collectionPaths := []string{}
        runner.Binds = nil
+       runner.Volumes = make(map[string]struct{})
        needCertMount := true
 
        var binds []string
@@ -347,10 +348,10 @@ func (runner *ContainerRunner) SetupMounts() (err error) {
 
        for _, bind := range binds {
                mnt := runner.Container.Mounts[bind]
-               if bind == "stdout" {
+               if bind == "stdout" || bind == "stderr" {
                        // Is it a "file" mount kind?
                        if mnt.Kind != "file" {
-                               return fmt.Errorf("Unsupported mount kind '%s' for stdout. Only 'file' is supported.", mnt.Kind)
+                               return fmt.Errorf("Unsupported mount kind '%s' for %s. Only 'file' is supported.", mnt.Kind, bind)
                        }
 
                        // Does path start with OutputPath?
@@ -359,7 +360,7 @@ func (runner *ContainerRunner) SetupMounts() (err error) {
                                prefix += "/"
                        }
                        if !strings.HasPrefix(mnt.Path, prefix) {
-                               return fmt.Errorf("Stdout path does not start with OutputPath: %s, %s", mnt.Path, prefix)
+                               return fmt.Errorf("%s path does not start with OutputPath: %s, %s", strings.Title(bind), mnt.Path, prefix)
                        }
                }
 
@@ -381,7 +382,7 @@ func (runner *ContainerRunner) SetupMounts() (err error) {
                }
 
                switch {
-               case mnt.Kind == "collection":
+               case mnt.Kind == "collection" && bind != "stdin":
                        var src string
                        if mnt.UUID != "" && mnt.PortableDataHash != "" {
                                return fmt.Errorf("Cannot specify both 'uuid' and 'portable_data_hash' for a collection mount")
@@ -429,24 +430,25 @@ func (runner *ContainerRunner) SetupMounts() (err error) {
                        }
                        collectionPaths = append(collectionPaths, src)
 
-               case mnt.Kind == "tmp" && bind == runner.Container.OutputPath:
-                       runner.HostOutputDir, err = runner.MkTempDir("", "")
+               case mnt.Kind == "tmp":
+                       var tmpdir string
+                       tmpdir, err = runner.MkTempDir("", "")
                        if err != nil {
                                return fmt.Errorf("While creating mount temp dir: %v", err)
                        }
-                       st, staterr := os.Stat(runner.HostOutputDir)
+                       st, staterr := os.Stat(tmpdir)
                        if staterr != nil {
                                return fmt.Errorf("While Stat on temp dir: %v", staterr)
                        }
-                       err = os.Chmod(runner.HostOutputDir, st.Mode()|os.ModeSetgid|0777)
+                       err = os.Chmod(tmpdir, st.Mode()|os.ModeSetgid|0777)
                        if staterr != nil {
                                return fmt.Errorf("While Chmod temp dir: %v", err)
                        }
-                       runner.CleanupTempDir = append(runner.CleanupTempDir, runner.HostOutputDir)
-                       runner.Binds = append(runner.Binds, fmt.Sprintf("%s:%s", runner.HostOutputDir, bind))
-
-               case mnt.Kind == "tmp":
-                       runner.Binds = append(runner.Binds, bind)
+                       runner.CleanupTempDir = append(runner.CleanupTempDir, tmpdir)
+                       runner.Binds = append(runner.Binds, fmt.Sprintf("%s:%s", tmpdir, bind))
+                       if bind == runner.Container.OutputPath {
+                               runner.HostOutputDir = tmpdir
+                       }
 
                case mnt.Kind == "json":
                        jsondata, err := json.Marshal(mnt.Content)
@@ -666,8 +668,8 @@ func (runner *ContainerRunner) LogContainerRecord() (err error) {
        return nil
 }
 
-// AttachLogs connects the docker container stdout and stderr logs to the
-// Arvados logger which logs to Keep and the API server logs table.
+// AttachStreams connects the docker container stdin, stdout and stderr logs
+// to the Arvados logger which logs to Keep and the API server logs table.
 func (runner *ContainerRunner) AttachStreams() (err error) {
 
        runner.CrunchLog.Print("Attaching container streams")
@@ -687,14 +689,12 @@ func (runner *ContainerRunner) AttachStreams() (err error) {
                                return fmt.Errorf("While getting stding collection: %v", err)
                        }
 
-                       stdinRdr, err = runner.Kc.CollectionFileReader(map[string]interface{}{"manifest_text": stdinColl.ManifestText}, stdinMnt.Path)
+                       stdinRdr, err = runner.Kc.ManifestFileReader(manifest.Manifest{Text: stdinColl.ManifestText}, stdinMnt.Path)
                        if os.IsNotExist(err) {
                                return fmt.Errorf("stdin collection path not found: %v", stdinMnt.Path)
                        } else if err != nil {
                                return fmt.Errorf("While getting stdin collection path %v: %v", stdinMnt.Path, err)
                        }
-
-                       defer stdinRdr.Close()
                } else if stdinMnt.Kind == "json" {
                        stdinJson, err = json.Marshal(stdinMnt.Content)
                        if err != nil {
@@ -713,57 +713,44 @@ func (runner *ContainerRunner) AttachStreams() (err error) {
        runner.loggingDone = make(chan bool)
 
        if stdoutMnt, ok := runner.Container.Mounts["stdout"]; ok {
-               stdoutPath := stdoutMnt.Path[len(runner.Container.OutputPath):]
-               index := strings.LastIndex(stdoutPath, "/")
-               if index > 0 {
-                       subdirs := stdoutPath[:index]
-                       if subdirs != "" {
-                               st, err := os.Stat(runner.HostOutputDir)
-                               if err != nil {
-                                       return fmt.Errorf("While Stat on temp dir: %v", err)
-                               }
-                               stdoutPath := path.Join(runner.HostOutputDir, subdirs)
-                               err = os.MkdirAll(stdoutPath, st.Mode()|os.ModeSetgid|0777)
-                               if err != nil {
-                                       return fmt.Errorf("While MkdirAll %q: %v", stdoutPath, err)
-                               }
-                       }
-               }
-               stdoutFile, err := os.Create(path.Join(runner.HostOutputDir, stdoutPath))
+               stdoutFile, err := runner.getStdoutFile(stdoutMnt.Path)
                if err != nil {
-                       return fmt.Errorf("While creating stdout file: %v", err)
+                       return err
                }
                runner.Stdout = stdoutFile
        } else {
                runner.Stdout = NewThrottledLogger(runner.NewLogWriter("stdout"))
        }
-       runner.Stderr = NewThrottledLogger(runner.NewLogWriter("stderr"))
+
+       if stderrMnt, ok := runner.Container.Mounts["stderr"]; ok {
+               stderrFile, err := runner.getStdoutFile(stderrMnt.Path)
+               if err != nil {
+                       return err
+               }
+               runner.Stderr = stderrFile
+       } else {
+               runner.Stderr = NewThrottledLogger(runner.NewLogWriter("stderr"))
+       }
 
        if stdinRdr != nil {
-               copyErrC := make(chan error)
                go func() {
                        _, err := io.Copy(response.Conn, stdinRdr)
-                       copyErrC <- err
-                       close(copyErrC)
+                       if err != nil {
+                               runner.CrunchLog.Print("While writing stdin collection to docker container %q", err)
+                               runner.stop()
+                       }
+                       stdinRdr.Close()
+                       response.CloseWrite()
                }()
-
-               copyErr := <-copyErrC
-               if copyErr != nil {
-                       return fmt.Errorf("While writing stdin collection to docker container %q", copyErr)
-               }
        } else if len(stdinJson) != 0 {
-               copyErrC := make(chan error)
                go func() {
-                       jsonRdr := bytes.NewReader(stdinJson)
-                       _, err := io.Copy(response.Conn, jsonRdr)
-                       copyErrC <- err
-                       close(copyErrC)
+                       _, err := io.Copy(response.Conn, bytes.NewReader(stdinJson))
+                       if err != nil {
+                               runner.CrunchLog.Print("While writing stdin json to docker container %q", err)
+                               runner.stop()
+                       }
+                       response.CloseWrite()
                }()
-
-               copyErr := <-copyErrC
-               if copyErr != nil {
-                       return fmt.Errorf("While writing stdin json to docker container %q", copyErr)
-               }
        }
 
        go runner.ProcessDockerAttach(response.Reader)
@@ -771,6 +758,31 @@ func (runner *ContainerRunner) AttachStreams() (err error) {
        return nil
 }
 
+func (runner *ContainerRunner) getStdoutFile(mntPath string) (*os.File, error) {
+       stdoutPath := mntPath[len(runner.Container.OutputPath):]
+       index := strings.LastIndex(stdoutPath, "/")
+       if index > 0 {
+               subdirs := stdoutPath[:index]
+               if subdirs != "" {
+                       st, err := os.Stat(runner.HostOutputDir)
+                       if err != nil {
+                               return nil, fmt.Errorf("While Stat on temp dir: %v", err)
+                       }
+                       stdoutPath := path.Join(runner.HostOutputDir, subdirs)
+                       err = os.MkdirAll(stdoutPath, st.Mode()|os.ModeSetgid|0777)
+                       if err != nil {
+                               return nil, fmt.Errorf("While MkdirAll %q: %v", stdoutPath, err)
+                       }
+               }
+       }
+       stdoutFile, err := os.Create(path.Join(runner.HostOutputDir, stdoutPath))
+       if err != nil {
+               return nil, fmt.Errorf("While creating file %q: %v", stdoutPath, err)
+       }
+
+       return stdoutFile, nil
+}
+
 // CreateContainer creates the docker container.
 func (runner *ContainerRunner) CreateContainer() error {
        runner.CrunchLog.Print("Creating Docker container")
@@ -784,6 +796,8 @@ func (runner *ContainerRunner) CreateContainer() error {
                runner.ContainerConfig.Env = append(runner.ContainerConfig.Env, k+"="+v)
        }
 
+       runner.ContainerConfig.Volumes = runner.Volumes
+
        runner.HostConfig = dockercontainer.HostConfig{
                Binds:  runner.Binds,
                Cgroup: dockercontainer.CgroupSpec(runner.setCgroupParent),
@@ -811,6 +825,13 @@ func (runner *ContainerRunner) CreateContainer() error {
                }
        }
 
+       _, stdinUsed := runner.Container.Mounts["stdin"]
+       runner.ContainerConfig.OpenStdin = stdinUsed
+       runner.ContainerConfig.StdinOnce = stdinUsed
+       runner.ContainerConfig.AttachStdin = stdinUsed
+       runner.ContainerConfig.AttachStdout = true
+       runner.ContainerConfig.AttachStderr = true
+
        createdBody, err := runner.Docker.ContainerCreate(context.TODO(), &runner.ContainerConfig, &runner.HostConfig, nil, runner.Container.UUID)
        if err != nil {
                return fmt.Errorf("While creating container: %v", err)