Merge branch '11629-groups-contents-memory'
[arvados.git] / services / crunch-run / crunchrun.go
index 64e79a68ebccb58b28c1cdb5688357b6ebc58341..07937291724c8cde9f6a938414924e205e63c0ef 100644 (file)
@@ -1,6 +1,7 @@
 package main
 
 import (
+       "bytes"
        "context"
        "encoding/json"
        "errors"
@@ -134,7 +135,7 @@ type ContainerRunner struct {
        loggingDone   chan bool
        CrunchLog     *ThrottledLogger
        Stdout        io.WriteCloser
-       Stderr        *ThrottledLogger
+       Stderr        io.WriteCloser
        LogCollection *CollectionWriter
        LogsPDH       *string
        RunArvMount
@@ -144,6 +145,7 @@ type ContainerRunner struct {
        HostOutputDir  string
        CleanupTempDir []string
        Binds          []string
+       Volumes        map[string]struct{}
        OutputPDH      *string
        SigChan        chan os.Signal
        ArvMountExit   chan error
@@ -335,6 +337,7 @@ func (runner *ContainerRunner) SetupMounts() (err error) {
 
        collectionPaths := []string{}
        runner.Binds = nil
+       runner.Volumes = make(map[string]struct{})
        needCertMount := true
 
        var binds []string
@@ -345,10 +348,10 @@ func (runner *ContainerRunner) SetupMounts() (err error) {
 
        for _, bind := range binds {
                mnt := runner.Container.Mounts[bind]
-               if bind == "stdout" {
+               if bind == "stdout" || bind == "stderr" {
                        // Is it a "file" mount kind?
                        if mnt.Kind != "file" {
-                               return fmt.Errorf("Unsupported mount kind '%s' for stdout. Only 'file' is supported.", mnt.Kind)
+                               return fmt.Errorf("Unsupported mount kind '%s' for %s. Only 'file' is supported.", mnt.Kind, bind)
                        }
 
                        // Does path start with OutputPath?
@@ -357,7 +360,14 @@ func (runner *ContainerRunner) SetupMounts() (err error) {
                                prefix += "/"
                        }
                        if !strings.HasPrefix(mnt.Path, prefix) {
-                               return fmt.Errorf("Stdout path does not start with OutputPath: %s, %s", mnt.Path, prefix)
+                               return fmt.Errorf("%s path does not start with OutputPath: %s, %s", strings.Title(bind), mnt.Path, prefix)
+                       }
+               }
+
+               if bind == "stdin" {
+                       // Is it a "collection" mount kind?
+                       if mnt.Kind != "collection" && mnt.Kind != "json" {
+                               return fmt.Errorf("Unsupported mount kind '%s' for stdin. Only 'collection' or 'json' are supported.", mnt.Kind)
                        }
                }
 
@@ -372,7 +382,7 @@ func (runner *ContainerRunner) SetupMounts() (err error) {
                }
 
                switch {
-               case mnt.Kind == "collection":
+               case mnt.Kind == "collection" && bind != "stdin":
                        var src string
                        if mnt.UUID != "" && mnt.PortableDataHash != "" {
                                return fmt.Errorf("Cannot specify both 'uuid' and 'portable_data_hash' for a collection mount")
@@ -420,24 +430,25 @@ func (runner *ContainerRunner) SetupMounts() (err error) {
                        }
                        collectionPaths = append(collectionPaths, src)
 
-               case mnt.Kind == "tmp" && bind == runner.Container.OutputPath:
-                       runner.HostOutputDir, err = runner.MkTempDir("", "")
+               case mnt.Kind == "tmp":
+                       var tmpdir string
+                       tmpdir, err = runner.MkTempDir("", "")
                        if err != nil {
                                return fmt.Errorf("While creating mount temp dir: %v", err)
                        }
-                       st, staterr := os.Stat(runner.HostOutputDir)
+                       st, staterr := os.Stat(tmpdir)
                        if staterr != nil {
                                return fmt.Errorf("While Stat on temp dir: %v", staterr)
                        }
-                       err = os.Chmod(runner.HostOutputDir, st.Mode()|os.ModeSetgid|0777)
+                       err = os.Chmod(tmpdir, st.Mode()|os.ModeSetgid|0777)
                        if staterr != nil {
                                return fmt.Errorf("While Chmod temp dir: %v", err)
                        }
-                       runner.CleanupTempDir = append(runner.CleanupTempDir, runner.HostOutputDir)
-                       runner.Binds = append(runner.Binds, fmt.Sprintf("%s:%s", runner.HostOutputDir, bind))
-
-               case mnt.Kind == "tmp":
-                       runner.Binds = append(runner.Binds, bind)
+                       runner.CleanupTempDir = append(runner.CleanupTempDir, tmpdir)
+                       runner.Binds = append(runner.Binds, fmt.Sprintf("%s:%s", tmpdir, bind))
+                       if bind == runner.Container.OutputPath {
+                               runner.HostOutputDir = tmpdir
+                       }
 
                case mnt.Kind == "json":
                        jsondata, err := json.Marshal(mnt.Content)
@@ -623,11 +634,12 @@ func (runner *ContainerRunner) LogNodeInfo() (err error) {
 // Get and save the raw JSON container record from the API server
 func (runner *ContainerRunner) LogContainerRecord() (err error) {
        w := &ArvLogWriter{
-               runner.ArvClient,
-               runner.Container.UUID,
-               "container",
-               runner.LogCollection.Open("container.json"),
+               ArvClient:     runner.ArvClient,
+               UUID:          runner.Container.UUID,
+               loggingStream: "container",
+               writeCloser:   runner.LogCollection.Open("container.json"),
        }
+
        // Get Container record JSON from the API Server
        reader, err := runner.ArvClient.CallRaw("GET", "containers", runner.Container.UUID, "", nil)
        if err != nil {
@@ -657,14 +669,44 @@ func (runner *ContainerRunner) LogContainerRecord() (err error) {
        return nil
 }
 
-// AttachLogs connects the docker container stdout and stderr logs to the
-// Arvados logger which logs to Keep and the API server logs table.
+// AttachStreams connects the docker container stdin, stdout and stderr logs
+// to the Arvados logger which logs to Keep and the API server logs table.
 func (runner *ContainerRunner) AttachStreams() (err error) {
 
        runner.CrunchLog.Print("Attaching container streams")
 
+       // If stdin mount is provided, attach it to the docker container
+       var stdinRdr keepclient.Reader
+       var stdinJson []byte
+       if stdinMnt, ok := runner.Container.Mounts["stdin"]; ok {
+               if stdinMnt.Kind == "collection" {
+                       var stdinColl arvados.Collection
+                       collId := stdinMnt.UUID
+                       if collId == "" {
+                               collId = stdinMnt.PortableDataHash
+                       }
+                       err = runner.ArvClient.Get("collections", collId, nil, &stdinColl)
+                       if err != nil {
+                               return fmt.Errorf("While getting stding collection: %v", err)
+                       }
+
+                       stdinRdr, err = runner.Kc.ManifestFileReader(manifest.Manifest{Text: stdinColl.ManifestText}, stdinMnt.Path)
+                       if os.IsNotExist(err) {
+                               return fmt.Errorf("stdin collection path not found: %v", stdinMnt.Path)
+                       } else if err != nil {
+                               return fmt.Errorf("While getting stdin collection path %v: %v", stdinMnt.Path, err)
+                       }
+               } else if stdinMnt.Kind == "json" {
+                       stdinJson, err = json.Marshal(stdinMnt.Content)
+                       if err != nil {
+                               return fmt.Errorf("While encoding stdin json data: %v", err)
+                       }
+               }
+       }
+
+       stdinUsed := stdinRdr != nil || len(stdinJson) != 0
        response, err := runner.Docker.ContainerAttach(context.TODO(), runner.ContainerID,
-               dockertypes.ContainerAttachOptions{Stream: true, Stdout: true, Stderr: true})
+               dockertypes.ContainerAttachOptions{Stream: true, Stdin: stdinUsed, Stdout: true, Stderr: true})
        if err != nil {
                return fmt.Errorf("While attaching container stdout/stderr streams: %v", err)
        }
@@ -672,37 +714,76 @@ func (runner *ContainerRunner) AttachStreams() (err error) {
        runner.loggingDone = make(chan bool)
 
        if stdoutMnt, ok := runner.Container.Mounts["stdout"]; ok {
-               stdoutPath := stdoutMnt.Path[len(runner.Container.OutputPath):]
-               index := strings.LastIndex(stdoutPath, "/")
-               if index > 0 {
-                       subdirs := stdoutPath[:index]
-                       if subdirs != "" {
-                               st, err := os.Stat(runner.HostOutputDir)
-                               if err != nil {
-                                       return fmt.Errorf("While Stat on temp dir: %v", err)
-                               }
-                               stdoutPath := path.Join(runner.HostOutputDir, subdirs)
-                               err = os.MkdirAll(stdoutPath, st.Mode()|os.ModeSetgid|0777)
-                               if err != nil {
-                                       return fmt.Errorf("While MkdirAll %q: %v", stdoutPath, err)
-                               }
-                       }
-               }
-               stdoutFile, err := os.Create(path.Join(runner.HostOutputDir, stdoutPath))
+               stdoutFile, err := runner.getStdoutFile(stdoutMnt.Path)
                if err != nil {
-                       return fmt.Errorf("While creating stdout file: %v", err)
+                       return err
                }
                runner.Stdout = stdoutFile
        } else {
                runner.Stdout = NewThrottledLogger(runner.NewLogWriter("stdout"))
        }
-       runner.Stderr = NewThrottledLogger(runner.NewLogWriter("stderr"))
+
+       if stderrMnt, ok := runner.Container.Mounts["stderr"]; ok {
+               stderrFile, err := runner.getStdoutFile(stderrMnt.Path)
+               if err != nil {
+                       return err
+               }
+               runner.Stderr = stderrFile
+       } else {
+               runner.Stderr = NewThrottledLogger(runner.NewLogWriter("stderr"))
+       }
+
+       if stdinRdr != nil {
+               go func() {
+                       _, err := io.Copy(response.Conn, stdinRdr)
+                       if err != nil {
+                               runner.CrunchLog.Print("While writing stdin collection to docker container %q", err)
+                               runner.stop()
+                       }
+                       stdinRdr.Close()
+                       response.CloseWrite()
+               }()
+       } else if len(stdinJson) != 0 {
+               go func() {
+                       _, err := io.Copy(response.Conn, bytes.NewReader(stdinJson))
+                       if err != nil {
+                               runner.CrunchLog.Print("While writing stdin json to docker container %q", err)
+                               runner.stop()
+                       }
+                       response.CloseWrite()
+               }()
+       }
 
        go runner.ProcessDockerAttach(response.Reader)
 
        return nil
 }
 
+func (runner *ContainerRunner) getStdoutFile(mntPath string) (*os.File, error) {
+       stdoutPath := mntPath[len(runner.Container.OutputPath):]
+       index := strings.LastIndex(stdoutPath, "/")
+       if index > 0 {
+               subdirs := stdoutPath[:index]
+               if subdirs != "" {
+                       st, err := os.Stat(runner.HostOutputDir)
+                       if err != nil {
+                               return nil, fmt.Errorf("While Stat on temp dir: %v", err)
+                       }
+                       stdoutPath := path.Join(runner.HostOutputDir, subdirs)
+                       err = os.MkdirAll(stdoutPath, st.Mode()|os.ModeSetgid|0777)
+                       if err != nil {
+                               return nil, fmt.Errorf("While MkdirAll %q: %v", stdoutPath, err)
+                       }
+               }
+       }
+       stdoutFile, err := os.Create(path.Join(runner.HostOutputDir, stdoutPath))
+       if err != nil {
+               return nil, fmt.Errorf("While creating file %q: %v", stdoutPath, err)
+       }
+
+       return stdoutFile, nil
+}
+
 // CreateContainer creates the docker container.
 func (runner *ContainerRunner) CreateContainer() error {
        runner.CrunchLog.Print("Creating Docker container")
@@ -716,6 +797,8 @@ func (runner *ContainerRunner) CreateContainer() error {
                runner.ContainerConfig.Env = append(runner.ContainerConfig.Env, k+"="+v)
        }
 
+       runner.ContainerConfig.Volumes = runner.Volumes
+
        runner.HostConfig = dockercontainer.HostConfig{
                Binds:  runner.Binds,
                Cgroup: dockercontainer.CgroupSpec(runner.setCgroupParent),
@@ -743,7 +826,14 @@ func (runner *ContainerRunner) CreateContainer() error {
                }
        }
 
-       createdBody, err := runner.Docker.ContainerCreate(context.TODO(), &runner.ContainerConfig, nil, nil, "")
+       _, stdinUsed := runner.Container.Mounts["stdin"]
+       runner.ContainerConfig.OpenStdin = stdinUsed
+       runner.ContainerConfig.StdinOnce = stdinUsed
+       runner.ContainerConfig.AttachStdin = stdinUsed
+       runner.ContainerConfig.AttachStdout = true
+       runner.ContainerConfig.AttachStderr = true
+
+       createdBody, err := runner.Docker.ContainerCreate(context.TODO(), &runner.ContainerConfig, &runner.HostConfig, nil, runner.Container.UUID)
        if err != nil {
                return fmt.Errorf("While creating container: %v", err)
        }
@@ -780,7 +870,7 @@ func (runner *ContainerRunner) WaitFinish() error {
                return fmt.Errorf("container wait: %v", err)
        }
 
-       runner.CrunchLog.Printf("container wait API status code: %v", waitDocker)
+       runner.CrunchLog.Printf("Container exited with code: %v", waitDocker)
        code := int(waitDocker)
        runner.ExitCode = &code
 
@@ -977,8 +1067,8 @@ func (runner *ContainerRunner) CommitLogs() error {
        // point, but re-open crunch log with ArvClient in case there are any
        // other further (such as failing to write the log to Keep!) while
        // shutting down
-       runner.CrunchLog = NewThrottledLogger(&ArvLogWriter{runner.ArvClient, runner.Container.UUID,
-               "crunch-run", nil})
+       runner.CrunchLog = NewThrottledLogger(&ArvLogWriter{ArvClient: runner.ArvClient,
+               UUID: runner.Container.UUID, loggingStream: "crunch-run", writeCloser: nil})
 
        if runner.LogsPDH != nil {
                // If we have already assigned something to LogsPDH,
@@ -1065,7 +1155,8 @@ func (runner *ContainerRunner) IsCancelled() bool {
 
 // NewArvLogWriter creates an ArvLogWriter
 func (runner *ContainerRunner) NewArvLogWriter(name string) io.WriteCloser {
-       return &ArvLogWriter{runner.ArvClient, runner.Container.UUID, name, runner.LogCollection.Open(name + ".txt")}
+       return &ArvLogWriter{ArvClient: runner.ArvClient, UUID: runner.Container.UUID, loggingStream: name,
+               writeCloser: runner.LogCollection.Open(name + ".txt")}
 }
 
 // Run the full container lifecycle.
@@ -1205,6 +1296,9 @@ func NewContainerRunner(api IArvadosClient,
        cr.Container.UUID = containerUUID
        cr.CrunchLog = NewThrottledLogger(cr.NewLogWriter("crunch-run"))
        cr.CrunchLog.Immediate = log.New(os.Stderr, containerUUID+" ", 0)
+
+       loadLogThrottleParams(api)
+
        return cr
 }