X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/2ebd44960d1e7a0431d6ad612298012627c5019c..9dabca0eedbc9f842d542fea3463a441140d590c:/services/crunch-run/crunchrun.go diff --git a/services/crunch-run/crunchrun.go b/services/crunch-run/crunchrun.go index 7ed5be998b..b95ae1b4d3 100644 --- a/services/crunch-run/crunchrun.go +++ b/services/crunch-run/crunchrun.go @@ -380,7 +380,7 @@ func (runner *ContainerRunner) SetupMounts() (err error) { } switch { - case mnt.Kind == "collection": + case mnt.Kind == "collection" && bind != "stdin": var src string if mnt.UUID != "" && mnt.PortableDataHash != "" { return fmt.Errorf("Cannot specify both 'uuid' and 'portable_data_hash' for a collection mount") @@ -631,10 +631,10 @@ func (runner *ContainerRunner) LogNodeInfo() (err error) { // Get and save the raw JSON container record from the API server func (runner *ContainerRunner) LogContainerRecord() (err error) { w := &ArvLogWriter{ - runner.ArvClient, - runner.Container.UUID, - "container", - runner.LogCollection.Open("container.json"), + ArvClient: runner.ArvClient, + UUID: runner.Container.UUID, + loggingStream: "container", + writeCloser: runner.LogCollection.Open("container.json"), } // Get Container record JSON from the API Server reader, err := runner.ArvClient.CallRaw("GET", "containers", runner.Container.UUID, "", nil) @@ -692,8 +692,6 @@ func (runner *ContainerRunner) AttachStreams() (err error) { } else if err != nil { return fmt.Errorf("While getting stdin collection path %v: %v", stdinMnt.Path, err) } - - defer stdinRdr.Close() } else if stdinMnt.Kind == "json" { stdinJson, err = json.Marshal(stdinMnt.Content) if err != nil { @@ -738,7 +736,8 @@ func (runner *ContainerRunner) AttachStreams() (err error) { runner.CrunchLog.Print("While writing stdin collection to docker container %q", err) runner.stop() } - response.Conn.Close() + stdinRdr.Close() + response.CloseWrite() }() } else if len(stdinJson) != 0 { go func() { @@ -747,7 +746,7 @@ func (runner *ContainerRunner) AttachStreams() (err error) { runner.CrunchLog.Print("While writing stdin json to docker container %q", err) runner.stop() } - response.Conn.Close() + response.CloseWrite() }() } @@ -821,6 +820,13 @@ func (runner *ContainerRunner) CreateContainer() error { } } + _, stdinUsed := runner.Container.Mounts["stdin"] + runner.ContainerConfig.OpenStdin = stdinUsed + runner.ContainerConfig.StdinOnce = stdinUsed + runner.ContainerConfig.AttachStdin = stdinUsed + runner.ContainerConfig.AttachStdout = true + runner.ContainerConfig.AttachStderr = true + createdBody, err := runner.Docker.ContainerCreate(context.TODO(), &runner.ContainerConfig, &runner.HostConfig, nil, runner.Container.UUID) if err != nil { return fmt.Errorf("While creating container: %v", err) @@ -1055,8 +1061,8 @@ func (runner *ContainerRunner) CommitLogs() error { // point, but re-open crunch log with ArvClient in case there are any // other further (such as failing to write the log to Keep!) while // shutting down - runner.CrunchLog = NewThrottledLogger(&ArvLogWriter{runner.ArvClient, runner.Container.UUID, - "crunch-run", nil}) + runner.CrunchLog = NewThrottledLogger(&ArvLogWriter{ArvClient: runner.ArvClient, + UUID: runner.Container.UUID, loggingStream: "crunch-run", writeCloser: nil}) if runner.LogsPDH != nil { // If we have already assigned something to LogsPDH, @@ -1143,7 +1149,7 @@ func (runner *ContainerRunner) IsCancelled() bool { // NewArvLogWriter creates an ArvLogWriter func (runner *ContainerRunner) NewArvLogWriter(name string) io.WriteCloser { - return &ArvLogWriter{runner.ArvClient, runner.Container.UUID, name, runner.LogCollection.Open(name + ".txt")} + return &ArvLogWriter{ArvClient: runner.ArvClient, UUID: runner.Container.UUID, loggingStream: name, writeCloser: runner.LogCollection.Open(name + ".txt")} } // Run the full container lifecycle.