12891: Don't wait for WaitContainer to receive loggingDone.
[arvados.git] / services / crunch-run / crunchrun.go
index a78d47d480ae23deb9953ca96713d827ed57e2cd..7eefb1aaaded35be0b4ab99a03f4d82e30076c06 100644 (file)
@@ -75,7 +75,7 @@ type ThinDockerClient interface {
        ContainerCreate(ctx context.Context, config *dockercontainer.Config, hostConfig *dockercontainer.HostConfig,
                networkingConfig *dockernetwork.NetworkingConfig, containerName string) (dockercontainer.ContainerCreateCreatedBody, error)
        ContainerStart(ctx context.Context, container string, options dockertypes.ContainerStartOptions) error
-       ContainerStop(ctx context.Context, container string, timeout *time.Duration) error
+       ContainerRemove(ctx context.Context, container string, options dockertypes.ContainerRemoveOptions) error
        ContainerWait(ctx context.Context, container string, condition dockercontainer.WaitCondition) (<-chan dockercontainer.ContainerWaitOKBody, <-chan error)
        ImageInspectWithRaw(ctx context.Context, image string) (dockertypes.ImageInspect, []byte, error)
        ImageLoad(ctx context.Context, input io.Reader, quiet bool) (dockertypes.ImageLoadResponse, error)
@@ -133,7 +133,6 @@ type ContainerRunner struct {
        setCgroupParent string
 
        cStateLock sync.Mutex
-       cStarted   bool // StartContainer() succeeded
        cCancelled bool // StopContainer() invoked
 
        enableNetwork string // one of "default" or "always"
@@ -161,15 +160,14 @@ func (runner *ContainerRunner) setupSignals() {
 func (runner *ContainerRunner) stop() {
        runner.cStateLock.Lock()
        defer runner.cStateLock.Unlock()
-       if !runner.cStarted {
+       if runner.ContainerID == "" {
                return
        }
        runner.cCancelled = true
-       runner.CrunchLog.Printf("stopping container")
-       timeout := 10 * time.Second
-       err := runner.Docker.ContainerStop(context.TODO(), runner.ContainerID, &timeout)
+       runner.CrunchLog.Printf("removing container")
+       err := runner.Docker.ContainerRemove(context.TODO(), runner.ContainerID, dockertypes.ContainerRemoveOptions{Force: true})
        if err != nil {
-               runner.CrunchLog.Printf("error stopping container: %s", err)
+               runner.CrunchLog.Printf("error removing container: %s", err)
        }
 }
 
@@ -584,7 +582,6 @@ func (runner *ContainerRunner) ProcessDockerAttach(containerReader io.Reader) {
                                }
                        }
 
-                       runner.loggingDone <- true
                        close(runner.loggingDone)
                        return
                }
@@ -921,46 +918,38 @@ func (runner *ContainerRunner) StartContainer() error {
                }
                return fmt.Errorf("could not start container: %v%s", err, advice)
        }
-       runner.cStarted = true
        return nil
 }
 
 // WaitFinish waits for the container to terminate, capture the exit code, and
 // close the stdout/stderr logging.
-func (runner *ContainerRunner) WaitFinish() (err error) {
+func (runner *ContainerRunner) WaitFinish() error {
        runner.CrunchLog.Print("Waiting for container to finish")
 
        waitOk, waitErr := runner.Docker.ContainerWait(context.TODO(), runner.ContainerID, dockercontainer.WaitConditionNotRunning)
+       arvMountExit := runner.ArvMountExit
+       for {
+               select {
+               case waitBody := <-waitOk:
+                       runner.CrunchLog.Printf("Container exited with code: %v", waitBody.StatusCode)
+                       code := int(waitBody.StatusCode)
+                       runner.ExitCode = &code
+
+                       // wait for stdout/stderr to complete
+                       <-runner.loggingDone
+                       return nil
 
-       go func() {
-               <-runner.ArvMountExit
-               if runner.cStarted {
+               case err := <-waitErr:
+                       return fmt.Errorf("container wait: %v", err)
+
+               case <-arvMountExit:
                        runner.CrunchLog.Printf("arv-mount exited while container is still running.  Stopping container.")
                        runner.stop()
+                       // arvMountExit will always be ready now that
+                       // it's closed, but that doesn't interest us.
+                       arvMountExit = nil
                }
-       }()
-
-       var waitBody dockercontainer.ContainerWaitOKBody
-       select {
-       case waitBody = <-waitOk:
-       case err = <-waitErr:
-       }
-
-       // Container isn't running any more
-       runner.cStarted = false
-
-       if err != nil {
-               return fmt.Errorf("container wait: %v", err)
        }
-
-       runner.CrunchLog.Printf("Container exited with code: %v", waitBody.StatusCode)
-       code := int(waitBody.StatusCode)
-       runner.ExitCode = &code
-
-       // wait for stdout/stderr to complete
-       <-runner.loggingDone
-
-       return nil
 }
 
 var ErrNotInOutputDir = fmt.Errorf("Must point to path within the output directory")
@@ -1139,10 +1128,6 @@ func (runner *ContainerRunner) UploadOutputFile(
 
 // HandleOutput sets the output, unmounts the FUSE mount, and deletes temporary directories
 func (runner *ContainerRunner) CaptureOutput() error {
-       if runner.finalState != "Complete" {
-               return nil
-       }
-
        if wantAPI := runner.Container.RuntimeConstraints.API; wantAPI != nil && *wantAPI {
                // Output may have been set directly by the container, so
                // refresh the container record to check.
@@ -1595,7 +1580,7 @@ func (runner *ContainerRunner) Run() (err error) {
        }
 
        err = runner.WaitFinish()
-       if err == nil {
+       if err == nil && !runner.IsCancelled() {
                runner.finalState = "Complete"
        }
        return