8465: added stdin redirection for json mount.
author radhika <radhika@curoverse.com>
Wed, 5 Apr 2017 23:53:28 +0000 (19:53 -0400)
committer radhika <radhika@curoverse.com>
Wed, 5 Apr 2017 23:53:28 +0000 (19:53 -0400)
services/crunch-run/crunchrun.go
services/crunch-run/crunchrun_test.go

index 52829be29ae995bc14272884c2bc2cf88f4da428..f22680fca568c92fe9e765185120789d4a37913d 100644 (file)
@@ -1,6 +1,7 @@
 package main
 
 import (
+       "bytes"
        "context"
        "encoding/json"
        "errors"
@@ -364,8 +365,8 @@ func (runner *ContainerRunner) SetupMounts() (err error) {
 
                if bind == "stdin" {
                        // Is it a "collection" mount kind?
-                       if mnt.Kind != "collection" {
-                               return fmt.Errorf("Unsupported mount kind '%s' for stdin. Only 'collection' is supported.", mnt.Kind)
+                       if mnt.Kind != "collection" && mnt.Kind != "json" {
+                               return fmt.Errorf("Unsupported mount kind '%s' for stdin. Only 'collection' or 'json' are supported.", mnt.Kind)
                        }
                }
 
@@ -672,29 +673,37 @@ func (runner *ContainerRunner) AttachStreams() (err error) {
        runner.CrunchLog.Print("Attaching container streams")
 
        // If stdin mount is provided, attach it to the docker container
-       var stdinUsed bool
        var stdinRdr keepclient.Reader
+       var stdinJson []byte
        if stdinMnt, ok := runner.Container.Mounts["stdin"]; ok {
-               var stdinColl arvados.Collection
-               collId := stdinMnt.UUID
-               if collId == "" {
-                       collId = stdinMnt.PortableDataHash
-               }
-               err = runner.ArvClient.Get("collections", collId, nil, &stdinColl)
-               if err != nil {
-                       return fmt.Errorf("While getting stding collection: %v", err)
-               }
+               if stdinMnt.Kind == "collection" {
+                       var stdinColl arvados.Collection
+                       collId := stdinMnt.UUID
+                       if collId == "" {
+                               collId = stdinMnt.PortableDataHash
+                       }
+                       err = runner.ArvClient.Get("collections", collId, nil, &stdinColl)
+                       if err != nil {
+                               return fmt.Errorf("While getting stding collection: %v", err)
+                       }
 
-               stdinRdr, err = runner.Kc.CollectionFileReader(map[string]interface{}{"manifest_text": stdinColl.ManifestText}, stdinMnt.Path)
-               if os.IsNotExist(err) {
-                       return fmt.Errorf("stdin collection path not found: %v", stdinMnt.Path)
-               } else if err != nil {
-                       return fmt.Errorf("While getting stdin collection path %v: %v", stdinMnt.Path, err)
+                       stdinRdr, err = runner.Kc.CollectionFileReader(map[string]interface{}{"manifest_text": stdinColl.ManifestText}, stdinMnt.Path)
+                       if os.IsNotExist(err) {
+                               return fmt.Errorf("stdin collection path not found: %v", stdinMnt.Path)
+                       } else if err != nil {
+                               return fmt.Errorf("While getting stdin collection path %v: %v", stdinMnt.Path, err)
+                       }
+
+                       defer stdinRdr.Close()
+               } else if stdinMnt.Kind == "json" {
+                       stdinJson, err = json.Marshal(stdinMnt.Content)
+                       if err != nil {
+                               return fmt.Errorf("While encoding stdin json data: %v", err)
+                       }
                }
-               stdinUsed = true
-               defer stdinRdr.Close()
        }
 
+       stdinUsed := stdinRdr != nil || len(stdinJson) != 0
        response, err := runner.Docker.ContainerAttach(context.TODO(), runner.ContainerID,
                dockertypes.ContainerAttachOptions{Stream: true, Stdin: stdinUsed, Stdout: true, Stderr: true})
        if err != nil {
@@ -730,18 +739,30 @@ func (runner *ContainerRunner) AttachStreams() (err error) {
        }
        runner.Stderr = NewThrottledLogger(runner.NewLogWriter("stderr"))
 
-       if stdinUsed {
+       if stdinRdr != nil {
+               copyErrC := make(chan error)
+               go func() {
+                       _, err := io.Copy(response.Conn, stdinRdr)
+                       copyErrC <- err
+                       close(copyErrC)
+               }()
+
+               copyErr := <-copyErrC
+               if copyErr != nil {
+                       return fmt.Errorf("While writing stdin collection to docker container %q", copyErr)
+               }
+       } else if len(stdinJson) != 0 {
                copyErrC := make(chan error)
                go func() {
-                       n, err := io.Copy(response.Conn, stdinRdr)
-                       runner.CrunchLog.Printf("BYTES READ = %v", n)
+                       jsonRdr := bytes.NewReader(stdinJson)
+                       _, err := io.Copy(response.Conn, jsonRdr)
                        copyErrC <- err
                        close(copyErrC)
                }()
 
                copyErr := <-copyErrC
                if copyErr != nil {
-                       return fmt.Errorf("While writing stdin to docker container %q", copyErr)
+                       return fmt.Errorf("While writing stdin json to docker container %q", copyErr)
                }
        }
 
index a65d3660541902835bdae308faf99c1f52b23714..8852bffe2628ca8eb475ed16787da2ab7cf7ea43 100644 (file)
@@ -10,6 +10,7 @@ import (
        "fmt"
        "io"
        "io/ioutil"
+       "net"
        "os"
        "os/exec"
        "path/filepath"
@@ -94,8 +95,21 @@ func NewTestDockerClient(exitCode int) *TestDockerClient {
        return t
 }
 
+type MockConn struct { // MockConn is the connection stub handed out by TestDockerClient.ContainerAttach.
+       net.Conn // embedded so MockConn has the full net.Conn method set; NewMockConn leaves it nil
+}
+
+func (m *MockConn) Write(b []byte) (int, error) { // Write discards b without storing it anywhere.
+       return len(b), nil // reports every byte as written and never returns an error
+}
+
+func NewMockConn() *MockConn { // NewMockConn returns a zero-valued MockConn.
+       c := &MockConn{} // embedded net.Conn stays nil: only the overridden Write is safe to call
+       return c
+}
+
 func (t *TestDockerClient) ContainerAttach(ctx context.Context, container string, options dockertypes.ContainerAttachOptions) (dockertypes.HijackedResponse, error) {
-       return dockertypes.HijackedResponse{Reader: bufio.NewReader(t.logReader)}, nil
+       return dockertypes.HijackedResponse{Conn: NewMockConn(), Reader: bufio.NewReader(t.logReader)}, nil // Conn is set because AttachStreams copies stdin data into response.Conn
 }
 
 func (t *TestDockerClient) ContainerCreate(ctx context.Context, config *dockercontainer.Config, hostConfig *dockercontainer.HostConfig, networkingConfig *dockernetwork.NetworkingConfig, containerName string) (dockercontainer.ContainerCreateCreatedBody, error) {
@@ -292,9 +306,9 @@ func (client *KeepTestClient) ManifestFileReader(m manifest.Manifest, filename s
 
 func (client *KeepTestClient) CollectionFileReader(collection map[string]interface{}, filename string) (keepclient.Reader, error) {
        if filename == "/file1_in_main.txt" {
-               rdr := ioutil.NopCloser(&bytes.Buffer{})
+               rdr := ioutil.NopCloser(strings.NewReader("foo")) // serve the fixed 3-byte payload "foo" instead of an empty buffer
                client.Called = true
-               return FileWrapper{rdr, 1321984}, nil
+               return FileWrapper{rdr, 3}, nil // 3 matches len("foo"); presumably FileWrapper's length field — confirm against its definition
        }
        return nil, nil
 }
@@ -1142,7 +1156,7 @@ func (s *TestSuite) TestSetupMounts(c *C) {
 
                err := cr.SetupMounts()
                c.Check(err, NotNil)
-               c.Check(err, ErrorMatches, `Unsupported mount kind 'tmp' for stdin. Only 'collection' is supported.*`)
+               c.Check(err, ErrorMatches, `Unsupported mount kind 'tmp' for stdin.*`)
                cr.CleanupDirs()
                checkEmpty()
        }
@@ -1432,3 +1446,39 @@ func (s *TestSuite) TestStdinCollectionMountPoint(c *C) {
                }
        }
 }
+
+func (s *TestSuite) TestStdinJsonMountPoint(c *C) { // TestStdinJsonMountPoint runs a container record whose "stdin" mount has kind "json".
+       helperRecord := `{
+               "command": ["/bin/sh", "-c", "echo $FROBIZ"],
+               "container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122",
+               "cwd": "/bin",
+               "environment": {"FROBIZ": "bilbo"},
+               "mounts": {
+        "/tmp": {"kind": "tmp"},
+        "stdin": {"kind": "json", "content": "foo"},
+        "stdout": {"kind": "file", "path": "/tmp/a/b/c.out"}
+    },
+               "output_path": "/tmp",
+               "priority": 1,
+               "runtime_constraints": {}
+       }`
+
+       api, _, _ := FullRunHelper(c, helperRecord, nil, 0, func(t *TestDockerClient) { // callback plays the role of the container process
+               t.logWriter.Write(dockerLog(1, t.env[0][7:]+"\n")) // env[0] minus its first 7 bytes — presumably strips "FROBIZ=" leaving "bilbo"; verify env format
+               t.logWriter.Close()
+       })
+
+       c.Check(api.CalledWith("container.exit_code", 0), NotNil) // NOTE(review): assertions cover exit code, state and stdout only; none checks that "foo" reached stdin
+       c.Check(api.CalledWith("container.state", "Complete"), NotNil)
+       for _, v := range api.Content { // scan recorded API payloads for the output collection
+               if v["collection"] != nil {
+                       collection := v["collection"].(arvadosclient.Dict)
+                       if strings.Index(collection["name"].(string), "output") == 0 { // name begins with "output"
+                               manifest := collection["manifest_text"].(string)
+
+                               c.Check(manifest, Equals, `./a/b 307372fa8fd5c146b22ae7a45b49bc31+6 0:6:c.out
+`) // one 6-byte file c.out under ./a/b; 6 is consistent with len("bilbo\n")
+                       }
+               }
+       }
+}