X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/89f541d2b534b9cad4ee668f702f1270bf056171..879bde382ebf26aa593869cfff22cc7e85be6bb0:/lib/crunchrun/crunchrun.go diff --git a/lib/crunchrun/crunchrun.go b/lib/crunchrun/crunchrun.go index 3c9c381619..08e4aa3899 100644 --- a/lib/crunchrun/crunchrun.go +++ b/lib/crunchrun/crunchrun.go @@ -77,7 +77,10 @@ type PsProcess interface { // ContainerRunner is the main stateful struct used for a single execution of a // container. type ContainerRunner struct { - executor containerExecutor + executor containerExecutor + executorStdin io.Closer + executorStdout io.Closer + executorStderr io.Closer // Dispatcher client is initialized with the Dispatcher token. // This is a privileged token used to manage container status @@ -106,8 +109,6 @@ type ContainerRunner struct { ExitCode *int NewLogWriter NewLogWriter CrunchLog *ThrottledLogger - Stdout io.WriteCloser - Stderr io.WriteCloser logUUID string logMtx sync.Mutex LogCollection arvados.CollectionFileSystem @@ -877,7 +878,7 @@ func (runner *ContainerRunner) getStdoutFile(mntPath string) (*os.File, error) { // CreateContainer creates the docker container. func (runner *ContainerRunner) CreateContainer(imageID string, bindmounts map[string]bindmount) error { - var stdin io.ReadCloser + var stdin io.ReadCloser = ioutil.NopCloser(bytes.NewReader(nil)) if mnt, ok := runner.Container.Mounts["stdin"]; ok { switch mnt.Kind { case "collection": @@ -954,6 +955,9 @@ func (runner *ContainerRunner) CreateContainer(imageID string, bindmounts map[st if !runner.enableMemoryLimit { ram = 0 } + runner.executorStdin = stdin + runner.executorStdout = stdout + runner.executorStderr = stderr return runner.executor.Create(containerSpec{ Image: imageID, VCPUs: runner.Container.RuntimeConstraints.VCPUs, @@ -1018,6 +1022,27 @@ func (runner *ContainerRunner) WaitFinish() error { } runner.ExitCode = &exitcode + var returnErr error + if err = runner.executorStdin.Close(); err != nil { + err = fmt.Errorf("error closing container stdin: %s", err) + runner.CrunchLog.Printf("%s", err) + returnErr = err + } + if err = runner.executorStdout.Close(); err != nil { + err = fmt.Errorf("error closing container stdout: %s", err) + runner.CrunchLog.Printf("%s", err) + if returnErr == nil { + returnErr = err + } + } + if err = runner.executorStderr.Close(); err != nil { + err = fmt.Errorf("error closing container stderr: %s", err) + runner.CrunchLog.Printf("%s", err) + if returnErr == nil { + returnErr = err + } + } + if runner.statReporter != nil { runner.statReporter.Stop() err = runner.statLogger.Close() @@ -1025,7 +1050,7 @@ func (runner *ContainerRunner) WaitFinish() error { runner.CrunchLog.Printf("error closing crunchstat logs: %v", err) } } - return nil + return returnErr } func (runner *ContainerRunner) updateLogs() {