8465: test cleanup
[arvados.git] / services / crunch-run / crunchrun.go
index f22680fca568c92fe9e765185120789d4a37913d..6406c1742001b7bdde263ba31342ce2574c18029 100644 (file)
@@ -50,7 +50,6 @@ var ErrCancelled = errors.New("Cancelled")
 type IKeepClient interface {
        PutHB(hash string, buf []byte) (string, int, error)
        ManifestFileReader(m manifest.Manifest, filename string) (keepclient.Reader, error)
-       CollectionFileReader(collection map[string]interface{}, filename string) (keepclient.Reader, error)
 }
 
 // NewLogWriter is a factory function to create a new log writer.
@@ -381,7 +380,7 @@ func (runner *ContainerRunner) SetupMounts() (err error) {
                }
 
                switch {
-               case mnt.Kind == "collection":
+               case mnt.Kind == "collection" && bind != "stdin":
                        var src string
                        if mnt.UUID != "" && mnt.PortableDataHash != "" {
                                return fmt.Errorf("Cannot specify both 'uuid' and 'portable_data_hash' for a collection mount")
@@ -687,14 +686,12 @@ func (runner *ContainerRunner) AttachStreams() (err error) {
                                return fmt.Errorf("While getting stding collection: %v", err)
                        }
 
-                       stdinRdr, err = runner.Kc.CollectionFileReader(map[string]interface{}{"manifest_text": stdinColl.ManifestText}, stdinMnt.Path)
+                       stdinRdr, err = runner.Kc.ManifestFileReader(manifest.Manifest{Text: stdinColl.ManifestText}, stdinMnt.Path)
                        if os.IsNotExist(err) {
                                return fmt.Errorf("stdin collection path not found: %v", stdinMnt.Path)
                        } else if err != nil {
                                return fmt.Errorf("While getting stdin collection path %v: %v", stdinMnt.Path, err)
                        }
-
-                       defer stdinRdr.Close()
                } else if stdinMnt.Kind == "json" {
                        stdinJson, err = json.Marshal(stdinMnt.Content)
                        if err != nil {
@@ -740,30 +737,24 @@ func (runner *ContainerRunner) AttachStreams() (err error) {
        runner.Stderr = NewThrottledLogger(runner.NewLogWriter("stderr"))
 
        if stdinRdr != nil {
-               copyErrC := make(chan error)
                go func() {
                        _, err := io.Copy(response.Conn, stdinRdr)
-                       copyErrC <- err
-                       close(copyErrC)
+                       if err != nil {
+                               runner.CrunchLog.Print("While writing stdin collection to docker container %q", err)
+                               runner.stop()
+                       }
+                       stdinRdr.Close()
+                       response.CloseWrite()
                }()
-
-               copyErr := <-copyErrC
-               if copyErr != nil {
-                       return fmt.Errorf("While writing stdin collection to docker container %q", copyErr)
-               }
        } else if len(stdinJson) != 0 {
-               copyErrC := make(chan error)
                go func() {
-                       jsonRdr := bytes.NewReader(stdinJson)
-                       _, err := io.Copy(response.Conn, jsonRdr)
-                       copyErrC <- err
-                       close(copyErrC)
+                       _, err := io.Copy(response.Conn, bytes.NewReader(stdinJson))
+                       if err != nil {
+                               runner.CrunchLog.Print("While writing stdin json to docker container %q", err)
+                               runner.stop()
+                       }
+                       response.CloseWrite()
                }()
-
-               copyErr := <-copyErrC
-               if copyErr != nil {
-                       return fmt.Errorf("While writing stdin json to docker container %q", copyErr)
-               }
        }
 
        go runner.ProcessDockerAttach(response.Reader)
@@ -811,6 +802,13 @@ func (runner *ContainerRunner) CreateContainer() error {
                }
        }
 
+       _, stdinUsed := runner.Container.Mounts["stdin"]
+       runner.ContainerConfig.OpenStdin = stdinUsed
+       runner.ContainerConfig.StdinOnce = stdinUsed
+       runner.ContainerConfig.AttachStdin = stdinUsed
+       runner.ContainerConfig.AttachStdout = true
+       runner.ContainerConfig.AttachStderr = true
+
        createdBody, err := runner.Docker.ContainerCreate(context.TODO(), &runner.ContainerConfig, &runner.HostConfig, nil, runner.Container.UUID)
        if err != nil {
                return fmt.Errorf("While creating container: %v", err)