X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/8fd18fc4a32797fc3c6255099fc253d7aede12b5..f9a1468065776ee71eb43c45e6813bfae69fe0e0:/services/crunch-run/crunchrun.go diff --git a/services/crunch-run/crunchrun.go b/services/crunch-run/crunchrun.go index f22680fca5..6406c17420 100644 --- a/services/crunch-run/crunchrun.go +++ b/services/crunch-run/crunchrun.go @@ -50,7 +50,6 @@ var ErrCancelled = errors.New("Cancelled") type IKeepClient interface { PutHB(hash string, buf []byte) (string, int, error) ManifestFileReader(m manifest.Manifest, filename string) (keepclient.Reader, error) - CollectionFileReader(collection map[string]interface{}, filename string) (keepclient.Reader, error) } // NewLogWriter is a factory function to create a new log writer. @@ -381,7 +380,7 @@ func (runner *ContainerRunner) SetupMounts() (err error) { } switch { - case mnt.Kind == "collection": + case mnt.Kind == "collection" && bind != "stdin": var src string if mnt.UUID != "" && mnt.PortableDataHash != "" { return fmt.Errorf("Cannot specify both 'uuid' and 'portable_data_hash' for a collection mount") @@ -687,14 +686,12 @@ func (runner *ContainerRunner) AttachStreams() (err error) { return fmt.Errorf("While getting stding collection: %v", err) } - stdinRdr, err = runner.Kc.CollectionFileReader(map[string]interface{}{"manifest_text": stdinColl.ManifestText}, stdinMnt.Path) + stdinRdr, err = runner.Kc.ManifestFileReader(manifest.Manifest{Text: stdinColl.ManifestText}, stdinMnt.Path) if os.IsNotExist(err) { return fmt.Errorf("stdin collection path not found: %v", stdinMnt.Path) } else if err != nil { return fmt.Errorf("While getting stdin collection path %v: %v", stdinMnt.Path, err) } - - defer stdinRdr.Close() } else if stdinMnt.Kind == "json" { stdinJson, err = json.Marshal(stdinMnt.Content) if err != nil { @@ -740,30 +737,24 @@ func (runner *ContainerRunner) AttachStreams() (err error) { runner.Stderr = NewThrottledLogger(runner.NewLogWriter("stderr")) if stdinRdr != nil { - 
copyErrC := make(chan error) go func() { _, err := io.Copy(response.Conn, stdinRdr) - copyErrC <- err - close(copyErrC) + if err != nil { + runner.CrunchLog.Printf("While writing stdin collection to docker container %q", err) + runner.stop() + } + stdinRdr.Close() + response.CloseWrite() }() - - copyErr := <-copyErrC - if copyErr != nil { - return fmt.Errorf("While writing stdin collection to docker container %q", copyErr) - } } else if len(stdinJson) != 0 { - copyErrC := make(chan error) go func() { - jsonRdr := bytes.NewReader(stdinJson) - _, err := io.Copy(response.Conn, jsonRdr) - copyErrC <- err - close(copyErrC) + _, err := io.Copy(response.Conn, bytes.NewReader(stdinJson)) + if err != nil { + runner.CrunchLog.Printf("While writing stdin json to docker container %q", err) + runner.stop() + } + response.CloseWrite() }() - - copyErr := <-copyErrC - if copyErr != nil { - return fmt.Errorf("While writing stdin json to docker container %q", copyErr) - } } go runner.ProcessDockerAttach(response.Reader) @@ -811,6 +802,13 @@ func (runner *ContainerRunner) CreateContainer() error { } } + _, stdinUsed := runner.Container.Mounts["stdin"] + runner.ContainerConfig.OpenStdin = stdinUsed + runner.ContainerConfig.StdinOnce = stdinUsed + runner.ContainerConfig.AttachStdin = stdinUsed + runner.ContainerConfig.AttachStdout = true + runner.ContainerConfig.AttachStderr = true + createdBody, err := runner.Docker.ContainerCreate(context.TODO(), &runner.ContainerConfig, &runner.HostConfig, nil, runner.Container.UUID) if err != nil { return fmt.Errorf("While creating container: %v", err)