X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/c3ae1e2f54d4199a9521bf3d4d515bcbb0711989..3ceb42c882b32fb9e4ef79679576254f98cdfb3d:/services/crunch-run/crunchrun.go diff --git a/services/crunch-run/crunchrun.go b/services/crunch-run/crunchrun.go index f22680fca5..c8f77f4917 100644 --- a/services/crunch-run/crunchrun.go +++ b/services/crunch-run/crunchrun.go @@ -50,7 +50,6 @@ var ErrCancelled = errors.New("Cancelled") type IKeepClient interface { PutHB(hash string, buf []byte) (string, int, error) ManifestFileReader(m manifest.Manifest, filename string) (keepclient.Reader, error) - CollectionFileReader(collection map[string]interface{}, filename string) (keepclient.Reader, error) } // NewLogWriter is a factory function to create a new log writer. @@ -687,7 +686,7 @@ func (runner *ContainerRunner) AttachStreams() (err error) { return fmt.Errorf("While getting stding collection: %v", err) } - stdinRdr, err = runner.Kc.CollectionFileReader(map[string]interface{}{"manifest_text": stdinColl.ManifestText}, stdinMnt.Path) + stdinRdr, err = runner.Kc.ManifestFileReader(manifest.Manifest{Text: stdinColl.ManifestText}, stdinMnt.Path) if os.IsNotExist(err) { return fmt.Errorf("stdin collection path not found: %v", stdinMnt.Path) } else if err != nil { @@ -740,30 +739,23 @@ func (runner *ContainerRunner) AttachStreams() (err error) { runner.Stderr = NewThrottledLogger(runner.NewLogWriter("stderr")) if stdinRdr != nil { - copyErrC := make(chan error) go func() { _, err := io.Copy(response.Conn, stdinRdr) - copyErrC <- err - close(copyErrC) + if err != nil { + runner.CrunchLog.Print("While writing stdin collection to docker container %q", err) + runner.stop() + } + response.Conn.Close() }() - - copyErr := <-copyErrC - if copyErr != nil { - return fmt.Errorf("While writing stdin collection to docker container %q", copyErr) - } } else if len(stdinJson) != 0 { - copyErrC := make(chan error) go func() { - jsonRdr := bytes.NewReader(stdinJson) - _, err := io.Copy(response.Conn, jsonRdr) - copyErrC <- err - close(copyErrC) + _, err := io.Copy(response.Conn, bytes.NewReader(stdinJson)) + if err != nil { + runner.CrunchLog.Print("While writing stdin json to docker container %q", err) + runner.stop() + } + response.Conn.Close() }() - - copyErr := <-copyErrC - if copyErr != nil { - return fmt.Errorf("While writing stdin json to docker container %q", copyErr) - } } go runner.ProcessDockerAttach(response.Reader)