17816: Close stdin/stdout/stderr from main instead of executor.
authorTom Clegg <tom@curii.com>
Fri, 16 Jul 2021 14:24:42 +0000 (10:24 -0400)
committerTom Clegg <tom@curii.com>
Fri, 16 Jul 2021 14:24:42 +0000 (10:24 -0400)
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom@curii.com>

lib/crunchrun/crunchrun.go
lib/crunchrun/crunchrun_test.go
lib/crunchrun/docker.go
lib/crunchrun/executor.go

index 3c9c381619cfb757c25185d812b1c4bdf78f0f56..08e4aa3899ec46ca68d86d5ef1ea930057830c00 100644 (file)
@@ -77,7 +77,10 @@ type PsProcess interface {
 // ContainerRunner is the main stateful struct used for a single execution of a
 // container.
 type ContainerRunner struct {
-       executor containerExecutor
+       executor       containerExecutor
+       executorStdin  io.Closer
+       executorStdout io.Closer
+       executorStderr io.Closer
 
        // Dispatcher client is initialized with the Dispatcher token.
        // This is a privileged token used to manage container status
@@ -106,8 +109,6 @@ type ContainerRunner struct {
        ExitCode      *int
        NewLogWriter  NewLogWriter
        CrunchLog     *ThrottledLogger
-       Stdout        io.WriteCloser
-       Stderr        io.WriteCloser
        logUUID       string
        logMtx        sync.Mutex
        LogCollection arvados.CollectionFileSystem
@@ -877,7 +878,7 @@ func (runner *ContainerRunner) getStdoutFile(mntPath string) (*os.File, error) {
 
 // CreateContainer creates the docker container.
 func (runner *ContainerRunner) CreateContainer(imageID string, bindmounts map[string]bindmount) error {
-       var stdin io.ReadCloser
+       var stdin io.ReadCloser = ioutil.NopCloser(bytes.NewReader(nil))
        if mnt, ok := runner.Container.Mounts["stdin"]; ok {
                switch mnt.Kind {
                case "collection":
@@ -954,6 +955,9 @@ func (runner *ContainerRunner) CreateContainer(imageID string, bindmounts map[st
        if !runner.enableMemoryLimit {
                ram = 0
        }
+       runner.executorStdin = stdin
+       runner.executorStdout = stdout
+       runner.executorStderr = stderr
        return runner.executor.Create(containerSpec{
                Image:         imageID,
                VCPUs:         runner.Container.RuntimeConstraints.VCPUs,
@@ -1018,6 +1022,27 @@ func (runner *ContainerRunner) WaitFinish() error {
        }
        runner.ExitCode = &exitcode
 
+       var returnErr error
+       if err = runner.executorStdin.Close(); err != nil {
+               err = fmt.Errorf("error closing container stdin: %s", err)
+               runner.CrunchLog.Printf("%s", err)
+               returnErr = err
+       }
+       if err = runner.executorStdout.Close(); err != nil {
+               err = fmt.Errorf("error closing container stdout: %s", err)
+               runner.CrunchLog.Printf("%s", err)
+               if returnErr == nil {
+                       returnErr = err
+               }
+       }
+       if err = runner.executorStderr.Close(); err != nil {
+               err = fmt.Errorf("error closing container stderr: %s", err)
+               runner.CrunchLog.Printf("%s", err)
+               if returnErr == nil {
+                       returnErr = err
+               }
+       }
+
        if runner.statReporter != nil {
                runner.statReporter.Stop()
                err = runner.statLogger.Close()
@@ -1025,7 +1050,7 @@ func (runner *ContainerRunner) WaitFinish() error {
                        runner.CrunchLog.Printf("error closing crunchstat logs: %v", err)
                }
        }
-       return nil
+       return returnErr
 }
 
 func (runner *ContainerRunner) updateLogs() {
index 4b1bf8425533e0aaaecdcf3229835edbfbaaef47..22d65334ed3bc2a30228c6ab5c55dbd51e674f6c 100644 (file)
@@ -120,8 +120,6 @@ func (e *stubExecutor) CgroupID() string                { return "cgroupid" }
 func (e *stubExecutor) Stop() error                     { e.stopped = true; go func() { e.exit <- -1 }(); return e.stopErr }
 func (e *stubExecutor) Close()                          { e.closed = true }
 func (e *stubExecutor) Wait(context.Context) (int, error) {
-       defer e.created.Stdout.Close()
-       defer e.created.Stderr.Close()
        return <-e.exit, e.waitErr
 }
 
@@ -522,8 +520,6 @@ func dockerLog(fd byte, msg string) []byte {
 func (s *TestSuite) TestRunContainer(c *C) {
        s.executor.runFunc = func() {
                fmt.Fprintf(s.executor.created.Stdout, "Hello world\n")
-               s.executor.created.Stdout.Close()
-               s.executor.created.Stderr.Close()
                s.executor.exit <- 0
        }
 
index a39b754b3d396e4c02c75a518d47940dab9212f7..861f8c8c1913f07bab8d7ea722dfa3c643678059 100644 (file)
@@ -186,7 +186,7 @@ func (e *dockerExecutor) Wait(ctx context.Context) (int, error) {
        }
 }
 
-func (e *dockerExecutor) startIO(stdin io.ReadCloser, stdout, stderr io.WriteCloser) error {
+func (e *dockerExecutor) startIO(stdin io.Reader, stdout, stderr io.Writer) error {
        resp, err := e.dockerclient.ContainerAttach(context.TODO(), e.containerID, dockertypes.ContainerAttachOptions{
                Stream: true,
                Stdin:  stdin != nil,
@@ -213,8 +213,7 @@ func (e *dockerExecutor) startIO(stdin io.ReadCloser, stdout, stderr io.WriteClo
        return nil
 }
 
-func (e *dockerExecutor) handleStdin(stdin io.ReadCloser, conn io.Writer, closeConn func() error) error {
-       defer stdin.Close()
+func (e *dockerExecutor) handleStdin(stdin io.Reader, conn io.Writer, closeConn func() error) error {
        defer closeConn()
        _, err := io.Copy(conn, stdin)
        if err != nil {
@@ -225,7 +224,7 @@ func (e *dockerExecutor) handleStdin(stdin io.ReadCloser, conn io.Writer, closeC
 
 // Handle docker log protocol; see
 // https://docs.docker.com/engine/reference/api/docker_remote_api_v1.15/#attach-to-a-container
-func (e *dockerExecutor) handleStdoutStderr(stdout, stderr io.WriteCloser, reader io.Reader) error {
+func (e *dockerExecutor) handleStdoutStderr(stdout, stderr io.Writer, reader io.Reader) error {
        header := make([]byte, 8)
        var err error
        for err == nil {
@@ -247,14 +246,6 @@ func (e *dockerExecutor) handleStdoutStderr(stdout, stderr io.WriteCloser, reade
        if err != nil {
                return fmt.Errorf("error copying stdout/stderr from docker: %v", err)
        }
-       err = stdout.Close()
-       if err != nil {
-               return fmt.Errorf("error writing stdout: close: %v", err)
-       }
-       err = stderr.Close()
-       if err != nil {
-               return fmt.Errorf("error writing stderr: close: %v", err)
-       }
        return nil
 }
 
index c773febe94dda6b1866cc7e5fb26ce2726750328..f4feaa06c21447cc66b2e57a962e2d2c306e6de7 100644 (file)
@@ -25,9 +25,9 @@ type containerSpec struct {
        EnableNetwork bool
        NetworkMode   string // docker network mode, normally "default"
        CgroupParent  string
-       Stdin         io.ReadCloser
-       Stdout        io.WriteCloser
-       Stderr        io.WriteCloser
+       Stdin         io.Reader
+       Stdout        io.Writer
+       Stderr        io.Writer
 }
 
 // containerExecutor is an interface to a container runtime