8465: added stdin redirection for json mount.
[arvados.git] / services / crunch-run / crunchrun.go
index 64e79a68ebccb58b28c1cdb5688357b6ebc58341..f22680fca568c92fe9e765185120789d4a37913d 100644 (file)
@@ -1,6 +1,7 @@
 package main
 
 import (
+       "bytes"
        "context"
        "encoding/json"
        "errors"
@@ -49,6 +50,7 @@ var ErrCancelled = errors.New("Cancelled")
 type IKeepClient interface {
        PutHB(hash string, buf []byte) (string, int, error)
        ManifestFileReader(m manifest.Manifest, filename string) (keepclient.Reader, error)
+       CollectionFileReader(collection map[string]interface{}, filename string) (keepclient.Reader, error)
 }
 
 // NewLogWriter is a factory function to create a new log writer.
@@ -361,6 +363,13 @@ func (runner *ContainerRunner) SetupMounts() (err error) {
                        }
                }
 
+               if bind == "stdin" {
+                       // Is it a "collection" mount kind?
+                       if mnt.Kind != "collection" && mnt.Kind != "json" {
+                               return fmt.Errorf("Unsupported mount kind '%s' for stdin. Only 'collection' or 'json' are supported.", mnt.Kind)
+                       }
+               }
+
                if bind == "/etc/arvados/ca-certificates.crt" {
                        needCertMount = false
                }
@@ -663,8 +672,40 @@ func (runner *ContainerRunner) AttachStreams() (err error) {
 
        runner.CrunchLog.Print("Attaching container streams")
 
+       // If stdin mount is provided, attach it to the docker container
+       var stdinRdr keepclient.Reader
+       var stdinJson []byte
+       if stdinMnt, ok := runner.Container.Mounts["stdin"]; ok {
+               if stdinMnt.Kind == "collection" {
+                       var stdinColl arvados.Collection
+                       collId := stdinMnt.UUID
+                       if collId == "" {
+                               collId = stdinMnt.PortableDataHash
+                       }
+                       err = runner.ArvClient.Get("collections", collId, nil, &stdinColl)
+                       if err != nil {
+                               return fmt.Errorf("While getting stding collection: %v", err)
+                       }
+
+                       stdinRdr, err = runner.Kc.CollectionFileReader(map[string]interface{}{"manifest_text": stdinColl.ManifestText}, stdinMnt.Path)
+                       if os.IsNotExist(err) {
+                               return fmt.Errorf("stdin collection path not found: %v", stdinMnt.Path)
+                       } else if err != nil {
+                               return fmt.Errorf("While getting stdin collection path %v: %v", stdinMnt.Path, err)
+                       }
+
+                       defer stdinRdr.Close()
+               } else if stdinMnt.Kind == "json" {
+                       stdinJson, err = json.Marshal(stdinMnt.Content)
+                       if err != nil {
+                               return fmt.Errorf("While encoding stdin json data: %v", err)
+                       }
+               }
+       }
+
+       stdinUsed := stdinRdr != nil || len(stdinJson) != 0
        response, err := runner.Docker.ContainerAttach(context.TODO(), runner.ContainerID,
-               dockertypes.ContainerAttachOptions{Stream: true, Stdout: true, Stderr: true})
+               dockertypes.ContainerAttachOptions{Stream: true, Stdin: stdinUsed, Stdout: true, Stderr: true})
        if err != nil {
                return fmt.Errorf("While attaching container stdout/stderr streams: %v", err)
        }
@@ -698,6 +739,33 @@ func (runner *ContainerRunner) AttachStreams() (err error) {
        }
        runner.Stderr = NewThrottledLogger(runner.NewLogWriter("stderr"))
 
+       if stdinRdr != nil {
+               copyErrC := make(chan error)
+               go func() {
+                       _, err := io.Copy(response.Conn, stdinRdr)
+                       copyErrC <- err
+                       close(copyErrC)
+               }()
+
+               copyErr := <-copyErrC
+               if copyErr != nil {
+                       return fmt.Errorf("While writing stdin collection to docker container %q", copyErr)
+               }
+       } else if len(stdinJson) != 0 {
+               copyErrC := make(chan error)
+               go func() {
+                       jsonRdr := bytes.NewReader(stdinJson)
+                       _, err := io.Copy(response.Conn, jsonRdr)
+                       copyErrC <- err
+                       close(copyErrC)
+               }()
+
+               copyErr := <-copyErrC
+               if copyErr != nil {
+                       return fmt.Errorf("While writing stdin json to docker container %q", copyErr)
+               }
+       }
+
        go runner.ProcessDockerAttach(response.Reader)
 
        return nil
@@ -743,7 +811,7 @@ func (runner *ContainerRunner) CreateContainer() error {
                }
        }
 
-       createdBody, err := runner.Docker.ContainerCreate(context.TODO(), &runner.ContainerConfig, nil, nil, "")
+       createdBody, err := runner.Docker.ContainerCreate(context.TODO(), &runner.ContainerConfig, &runner.HostConfig, nil, runner.Container.UUID)
        if err != nil {
                return fmt.Errorf("While creating container: %v", err)
        }
@@ -780,7 +848,7 @@ func (runner *ContainerRunner) WaitFinish() error {
                return fmt.Errorf("container wait: %v", err)
        }
 
-       runner.CrunchLog.Printf("container wait API status code: %v", waitDocker)
+       runner.CrunchLog.Printf("Container exited with code: %v", waitDocker)
        code := int(waitDocker)
        runner.ExitCode = &code