8465: do not wait in the go routine while doing io.Copy of stdin and also invoke...
[arvados.git] / services / crunch-run / crunchrun.go
index f22680fca568c92fe9e765185120789d4a37913d..c8f77f49170649f9ecefaabe0ab8ef19de2857b0 100644 (file)
@@ -50,7 +50,6 @@ var ErrCancelled = errors.New("Cancelled")
 type IKeepClient interface {
        PutHB(hash string, buf []byte) (string, int, error)
        ManifestFileReader(m manifest.Manifest, filename string) (keepclient.Reader, error)
-       CollectionFileReader(collection map[string]interface{}, filename string) (keepclient.Reader, error)
 }
 
 // NewLogWriter is a factory function to create a new log writer.
@@ -687,7 +686,7 @@ func (runner *ContainerRunner) AttachStreams() (err error) {
                                return fmt.Errorf("While getting stding collection: %v", err)
                        }
 
-                       stdinRdr, err = runner.Kc.CollectionFileReader(map[string]interface{}{"manifest_text": stdinColl.ManifestText}, stdinMnt.Path)
+                       stdinRdr, err = runner.Kc.ManifestFileReader(manifest.Manifest{Text: stdinColl.ManifestText}, stdinMnt.Path)
                        if os.IsNotExist(err) {
                                return fmt.Errorf("stdin collection path not found: %v", stdinMnt.Path)
                        } else if err != nil {
@@ -740,30 +739,23 @@ func (runner *ContainerRunner) AttachStreams() (err error) {
        runner.Stderr = NewThrottledLogger(runner.NewLogWriter("stderr"))
 
        if stdinRdr != nil {
-               copyErrC := make(chan error)
                go func() {
                        _, err := io.Copy(response.Conn, stdinRdr)
-                       copyErrC <- err
-                       close(copyErrC)
+                       if err != nil {
+                               runner.CrunchLog.Print("While writing stdin collection to docker container %q", err)
+                               runner.stop()
+                       }
+                       response.Conn.Close()
                }()
-
-               copyErr := <-copyErrC
-               if copyErr != nil {
-                       return fmt.Errorf("While writing stdin collection to docker container %q", copyErr)
-               }
        } else if len(stdinJson) != 0 {
-               copyErrC := make(chan error)
                go func() {
-                       jsonRdr := bytes.NewReader(stdinJson)
-                       _, err := io.Copy(response.Conn, jsonRdr)
-                       copyErrC <- err
-                       close(copyErrC)
+                       _, err := io.Copy(response.Conn, bytes.NewReader(stdinJson))
+                       if err != nil {
+                               runner.CrunchLog.Print("While writing stdin json to docker container %q", err)
+                               runner.stop()
+                       }
+                       response.Conn.Close()
                }()
-
-               copyErr := <-copyErrC
-               if copyErr != nil {
-                       return fmt.Errorf("While writing stdin json to docker container %q", copyErr)
-               }
        }
 
        go runner.ProcessDockerAttach(response.Reader)