12891: Simplify "arv-mount exits before container ends" detection.
[arvados.git] / services / crunch-run / crunchrun.go
index 1bd49569d34d60f3da1aaba3951f7774c497c2cf..72df182574dfeca622b7e7de936d7ccf634b12a2 100644 (file)
@@ -75,7 +75,7 @@ type ThinDockerClient interface {
        ContainerCreate(ctx context.Context, config *dockercontainer.Config, hostConfig *dockercontainer.HostConfig,
                networkingConfig *dockernetwork.NetworkingConfig, containerName string) (dockercontainer.ContainerCreateCreatedBody, error)
        ContainerStart(ctx context.Context, container string, options dockertypes.ContainerStartOptions) error
-       ContainerStop(ctx context.Context, container string, timeout *time.Duration) error
+       ContainerRemove(ctx context.Context, container string, options dockertypes.ContainerRemoveOptions) error
        ContainerWait(ctx context.Context, container string, condition dockercontainer.WaitCondition) (<-chan dockercontainer.ContainerWaitOKBody, <-chan error)
        ImageInspectWithRaw(ctx context.Context, image string) (dockertypes.ImageInspect, []byte, error)
        ImageLoad(ctx context.Context, input io.Reader, quiet bool) (dockertypes.ImageLoadResponse, error)
@@ -133,7 +133,6 @@ type ContainerRunner struct {
        setCgroupParent string
 
        cStateLock sync.Mutex
-       cStarted   bool // StartContainer() succeeded
        cCancelled bool // StopContainer() invoked
 
        enableNetwork string // one of "default" or "always"
@@ -150,11 +149,10 @@ func (runner *ContainerRunner) setupSignals() {
        signal.Notify(runner.SigChan, syscall.SIGQUIT)
 
        go func(sig chan os.Signal) {
-               s := <-sig
-               if s != nil {
-                       runner.CrunchLog.Printf("Caught signal %v", s)
+               for s := range sig {
+                       runner.CrunchLog.Printf("caught signal: %v", s)
+                       runner.stop()
                }
-               runner.stop()
        }(runner.SigChan)
 }
 
@@ -162,25 +160,20 @@ func (runner *ContainerRunner) setupSignals() {
 func (runner *ContainerRunner) stop() {
        runner.cStateLock.Lock()
        defer runner.cStateLock.Unlock()
-       if runner.cCancelled {
+       if runner.ContainerID == "" {
                return
        }
        runner.cCancelled = true
-       if runner.cStarted {
-               timeout := time.Duration(10)
-               err := runner.Docker.ContainerStop(context.TODO(), runner.ContainerID, &(timeout))
-               if err != nil {
-                       runner.CrunchLog.Printf("StopContainer failed: %s", err)
-               }
-               // Suppress multiple calls to stop()
-               runner.cStarted = false
+       runner.CrunchLog.Printf("removing container")
+       err := runner.Docker.ContainerRemove(context.TODO(), runner.ContainerID, dockertypes.ContainerRemoveOptions{Force: true})
+       if err != nil {
+               runner.CrunchLog.Printf("error removing container: %s", err)
        }
 }
 
 func (runner *ContainerRunner) stopSignals() {
        if runner.SigChan != nil {
                signal.Stop(runner.SigChan)
-               close(runner.SigChan)
        }
 }
 
@@ -926,46 +919,38 @@ func (runner *ContainerRunner) StartContainer() error {
                }
                return fmt.Errorf("could not start container: %v%s", err, advice)
        }
-       runner.cStarted = true
        return nil
 }
 
 // WaitFinish waits for the container to terminate, capture the exit code, and
 // close the stdout/stderr logging.
-func (runner *ContainerRunner) WaitFinish() (err error) {
+func (runner *ContainerRunner) WaitFinish() error {
        runner.CrunchLog.Print("Waiting for container to finish")
 
-       waitOk, waitErr := runner.Docker.ContainerWait(context.TODO(), runner.ContainerID, "not-running")
+       waitOk, waitErr := runner.Docker.ContainerWait(context.TODO(), runner.ContainerID, dockercontainer.WaitConditionNotRunning)
+       arvMountExit := runner.ArvMountExit
+       for {
+               select {
+               case waitBody := <-waitOk:
+                       runner.CrunchLog.Printf("Container exited with code: %v", waitBody.StatusCode)
+                       code := int(waitBody.StatusCode)
+                       runner.ExitCode = &code
+
+                       // wait for stdout/stderr to complete
+                       <-runner.loggingDone
+                       return nil
+
+               case err := <-waitErr:
+                       return fmt.Errorf("container wait: %v", err)
 
-       go func() {
-               <-runner.ArvMountExit
-               if runner.cStarted {
+               case <-arvMountExit:
                        runner.CrunchLog.Printf("arv-mount exited while container is still running.  Stopping container.")
                        runner.stop()
+                       // arvMountExit will always be ready now that
+                       // it's closed, but that doesn't interest us.
+                       arvMountExit = nil
                }
-       }()
-
-       var waitBody dockercontainer.ContainerWaitOKBody
-       select {
-       case waitBody = <-waitOk:
-       case err = <-waitErr:
        }
-
-       // Container isn't running any more
-       runner.cStarted = false
-
-       if err != nil {
-               return fmt.Errorf("container wait: %v", err)
-       }
-
-       runner.CrunchLog.Printf("Container exited with code: %v", waitBody.StatusCode)
-       code := int(waitBody.StatusCode)
-       runner.ExitCode = &code
-
-       // wait for stdout/stderr to complete
-       <-runner.loggingDone
-
-       return nil
 }
 
 var ErrNotInOutputDir = fmt.Errorf("Must point to path within the output directory")
@@ -1144,10 +1129,6 @@ func (runner *ContainerRunner) UploadOutputFile(
 
 // HandleOutput sets the output, unmounts the FUSE mount, and deletes temporary directories
 func (runner *ContainerRunner) CaptureOutput() error {
-       if runner.finalState != "Complete" {
-               return nil
-       }
-
        if wantAPI := runner.Container.RuntimeConstraints.API; wantAPI != nil && *wantAPI {
                // Output may have been set directly by the container, so
                // refresh the container record to check.
@@ -1600,7 +1581,7 @@ func (runner *ContainerRunner) Run() (err error) {
        }
 
        err = runner.WaitFinish()
-       if err == nil {
+       if err == nil && !runner.IsCancelled() {
                runner.finalState = "Complete"
        }
        return