From c3ae1e2f54d4199a9521bf3d4d515bcbb0711989 Mon Sep 17 00:00:00 2001 From: radhika Date: Wed, 5 Apr 2017 19:53:28 -0400 Subject: [PATCH] 8465: added stdin redirection for json mount. --- services/crunch-run/crunchrun.go | 67 ++++++++++++++++++--------- services/crunch-run/crunchrun_test.go | 58 +++++++++++++++++++++-- 2 files changed, 98 insertions(+), 27 deletions(-) diff --git a/services/crunch-run/crunchrun.go b/services/crunch-run/crunchrun.go index 52829be29a..f22680fca5 100644 --- a/services/crunch-run/crunchrun.go +++ b/services/crunch-run/crunchrun.go @@ -1,6 +1,7 @@ package main import ( + "bytes" "context" "encoding/json" "errors" @@ -364,8 +365,8 @@ func (runner *ContainerRunner) SetupMounts() (err error) { if bind == "stdin" { // Is it a "collection" mount kind? - if mnt.Kind != "collection" { - return fmt.Errorf("Unsupported mount kind '%s' for stdin. Only 'collection' is supported.", mnt.Kind) + if mnt.Kind != "collection" && mnt.Kind != "json" { + return fmt.Errorf("Unsupported mount kind '%s' for stdin. Only 'collection' or 'json' are supported.", mnt.Kind) } } @@ -672,29 +673,37 @@ func (runner *ContainerRunner) AttachStreams() (err error) { runner.CrunchLog.Print("Attaching container streams") // If stdin mount is provided, attach it to the docker container - var stdinUsed bool var stdinRdr keepclient.Reader + var stdinJson []byte if stdinMnt, ok := runner.Container.Mounts["stdin"]; ok { - var stdinColl arvados.Collection - collId := stdinMnt.UUID - if collId == "" { - collId = stdinMnt.PortableDataHash - } - err = runner.ArvClient.Get("collections", collId, nil, &stdinColl) - if err != nil { - return fmt.Errorf("While getting stding collection: %v", err) - } + if stdinMnt.Kind == "collection" { + var stdinColl arvados.Collection + collId := stdinMnt.UUID + if collId == "" { + collId = stdinMnt.PortableDataHash + } + err = runner.ArvClient.Get("collections", collId, nil, &stdinColl) + if err != nil { + return fmt.Errorf("While getting stding collection: %v", err) + } - stdinRdr, err = runner.Kc.CollectionFileReader(map[string]interface{}{"manifest_text": stdinColl.ManifestText}, stdinMnt.Path) - if os.IsNotExist(err) { - return fmt.Errorf("stdin collection path not found: %v", stdinMnt.Path) - } else if err != nil { - return fmt.Errorf("While getting stdin collection path %v: %v", stdinMnt.Path, err) + stdinRdr, err = runner.Kc.CollectionFileReader(map[string]interface{}{"manifest_text": stdinColl.ManifestText}, stdinMnt.Path) + if os.IsNotExist(err) { + return fmt.Errorf("stdin collection path not found: %v", stdinMnt.Path) + } else if err != nil { + return fmt.Errorf("While getting stdin collection path %v: %v", stdinMnt.Path, err) + } + + defer stdinRdr.Close() + } else if stdinMnt.Kind == "json" { + stdinJson, err = json.Marshal(stdinMnt.Content) + if err != nil { + return fmt.Errorf("While encoding stdin json data: %v", err) + } } - stdinUsed = true - defer stdinRdr.Close() } + stdinUsed := stdinRdr != nil || len(stdinJson) != 0 response, err := runner.Docker.ContainerAttach(context.TODO(), runner.ContainerID, dockertypes.ContainerAttachOptions{Stream: true, Stdin: stdinUsed, Stdout: true, Stderr: true}) if err != nil { @@ -730,18 +739,30 @@ func (runner *ContainerRunner) AttachStreams() (err error) { } runner.Stderr = NewThrottledLogger(runner.NewLogWriter("stderr")) - if stdinUsed { + if stdinRdr != nil { + copyErrC := make(chan error) + go func() { + _, err := io.Copy(response.Conn, stdinRdr) + copyErrC <- err + close(copyErrC) + }() + + copyErr := <-copyErrC + if copyErr != nil { + return fmt.Errorf("While writing stdin collection to docker container %q", copyErr) + } + } else if len(stdinJson) != 0 { copyErrC := make(chan error) go func() { - n, err := io.Copy(response.Conn, stdinRdr) - runner.CrunchLog.Printf("BYTES READ = %v", n) + jsonRdr := bytes.NewReader(stdinJson) + _, err := io.Copy(response.Conn, jsonRdr) copyErrC <- err close(copyErrC) }() copyErr := <-copyErrC if copyErr != nil { - return fmt.Errorf("While writing stdin to docker container %q", copyErr) + return fmt.Errorf("While writing stdin json to docker container %q", copyErr) } } diff --git a/services/crunch-run/crunchrun_test.go b/services/crunch-run/crunchrun_test.go index a65d366054..8852bffe26 100644 --- a/services/crunch-run/crunchrun_test.go +++ b/services/crunch-run/crunchrun_test.go @@ -10,6 +10,7 @@ import ( "fmt" "io" "io/ioutil" + "net" "os" "os/exec" "path/filepath" @@ -94,8 +95,21 @@ func NewTestDockerClient(exitCode int) *TestDockerClient { return t } +type MockConn struct { + net.Conn +} + +func (m *MockConn) Write(b []byte) (int, error) { + return len(b), nil +} + +func NewMockConn() *MockConn { + c := &MockConn{} + return c +} + func (t *TestDockerClient) ContainerAttach(ctx context.Context, container string, options dockertypes.ContainerAttachOptions) (dockertypes.HijackedResponse, error) { - return dockertypes.HijackedResponse{Reader: bufio.NewReader(t.logReader)}, nil + return dockertypes.HijackedResponse{Conn: NewMockConn(), Reader: bufio.NewReader(t.logReader)}, nil } func (t *TestDockerClient) ContainerCreate(ctx context.Context, config *dockercontainer.Config, hostConfig *dockercontainer.HostConfig, networkingConfig *dockernetwork.NetworkingConfig, containerName string) (dockercontainer.ContainerCreateCreatedBody, error) { @@ -292,9 +306,9 @@ func (client *KeepTestClient) ManifestFileReader(m manifest.Manifest, filename s func (client *KeepTestClient) CollectionFileReader(collection map[string]interface{}, filename string) (keepclient.Reader, error) { if filename == "/file1_in_main.txt" { - rdr := ioutil.NopCloser(&bytes.Buffer{}) + rdr := ioutil.NopCloser(strings.NewReader("foo")) // ioutil.NopCloser(&bytes.Buffer{}) client.Called = true - return FileWrapper{rdr, 1321984}, nil + return FileWrapper{rdr, 3}, nil } return nil, nil } @@ -1142,7 +1156,7 @@ func (s *TestSuite) TestSetupMounts(c *C) { err := cr.SetupMounts() c.Check(err, NotNil) - c.Check(err, ErrorMatches, `Unsupported mount kind 'tmp' for stdin. Only 'collection' is supported.*`) + c.Check(err, ErrorMatches, `Unsupported mount kind 'tmp' for stdin.*`) cr.CleanupDirs() checkEmpty() } @@ -1432,3 +1446,39 @@ func (s *TestSuite) TestStdinCollectionMountPoint(c *C) { } } } + +func (s *TestSuite) TestStdinJsonMountPoint(c *C) { + helperRecord := `{ + "command": ["/bin/sh", "-c", "echo $FROBIZ"], + "container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122", + "cwd": "/bin", + "environment": {"FROBIZ": "bilbo"}, + "mounts": { + "/tmp": {"kind": "tmp"}, + "stdin": {"kind": "json", "content": "foo"}, + "stdout": {"kind": "file", "path": "/tmp/a/b/c.out"} + }, + "output_path": "/tmp", + "priority": 1, + "runtime_constraints": {} + }` + + api, _, _ := FullRunHelper(c, helperRecord, nil, 0, func(t *TestDockerClient) { + t.logWriter.Write(dockerLog(1, t.env[0][7:]+"\n")) + t.logWriter.Close() + }) + + c.Check(api.CalledWith("container.exit_code", 0), NotNil) + c.Check(api.CalledWith("container.state", "Complete"), NotNil) + for _, v := range api.Content { + if v["collection"] != nil { + collection := v["collection"].(arvadosclient.Dict) + if strings.Index(collection["name"].(string), "output") == 0 { + manifest := collection["manifest_text"].(string) + + c.Check(manifest, Equals, `./a/b 307372fa8fd5c146b22ae7a45b49bc31+6 0:6:c.out +`) + } + } + } +} -- 2.30.2