9272: Fix up state transitions:
[arvados.git] / services / crunch-run / crunchrun.go
index 6b5baf555c4fd8edbeec57f4496dc949dc2736bd..c50433fe707834157025525a701adc84fde4b1b2 100644 (file)
@@ -103,6 +103,7 @@ type ContainerRunner struct {
        Kc        IKeepClient
        ContainerRecord
        dockerclient.ContainerConfig
+       dockerclient.HostConfig
        token       string
        ContainerID string
        ExitCode    *int
@@ -455,17 +456,10 @@ func (runner *ContainerRunner) AttachStreams() (err error) {
        return nil
 }
 
-// StartContainer creates the container and runs it.
-func (runner *ContainerRunner) StartContainer() (err error) {
+// CreateContainer creates the docker container.
+func (runner *ContainerRunner) CreateContainer() error {
        runner.CrunchLog.Print("Creating Docker container")
 
-       runner.CancelLock.Lock()
-       defer runner.CancelLock.Unlock()
-
-       if runner.Cancelled {
-               return ErrCancelled
-       }
-
        runner.ContainerConfig.Cmd = runner.ContainerRecord.Command
        if runner.ContainerRecord.Cwd != "." {
                runner.ContainerConfig.WorkingDir = runner.ContainerRecord.Cwd
@@ -487,24 +481,26 @@ func (runner *ContainerRunner) StartContainer() (err error) {
        }
 
        runner.ContainerConfig.NetworkDisabled = true
+
+       var err error
        runner.ContainerID, err = runner.Docker.CreateContainer(&runner.ContainerConfig, "", nil)
        if err != nil {
                return fmt.Errorf("While creating container: %v", err)
        }
-       hostConfig := &dockerclient.HostConfig{Binds: runner.Binds,
+
+       runner.HostConfig = dockerclient.HostConfig{Binds: runner.Binds,
                LogConfig: dockerclient.LogConfig{Type: "none"}}
 
-       err = runner.AttachStreams()
-       if err != nil {
-               return err
-       }
+       return runner.AttachStreams()
+}
 
+// StartContainer starts the docker container created by CreateContainer.
+// It hands Docker the HostConfig (bind mounts, "none" log driver) that
+// CreateContainer stored in runner.HostConfig, along with runner.ContainerID,
+// so it expects CreateContainer to have run successfully first. It does not
+// check for cancellation itself; in Run that check happens just before this
+// call (IsCancelled, then UpdateContainerRecordRunning).
+func (runner *ContainerRunner) StartContainer() error {
        runner.CrunchLog.Printf("Starting Docker container id '%s'", runner.ContainerID)
-       err = runner.Docker.StartContainer(runner.ContainerID, hostConfig)
+       err := runner.Docker.StartContainer(runner.ContainerID, &runner.HostConfig)
        if err != nil {
-               return fmt.Errorf("While starting container: %v", err)
+               return fmt.Errorf("could not start container: %v", err)
        }
-
        return nil
 }
 
@@ -618,6 +614,15 @@ func (runner *ContainerRunner) CommitLogs() error {
        runner.CrunchLog = NewThrottledLogger(&ArvLogWriter{runner.ArvClient, runner.ContainerRecord.UUID,
                "crunch-run", nil})
 
+       if runner.LogsPDH != nil {
+               // If we have already assigned something to LogsPDH,
+               // we must be closing the re-opened log, which won't
+               // end up getting attached to the container record and
+               // therefore doesn't need to be saved as a collection
+               // -- it exists only to send logs to other channels.
+               return nil
+       }
+
        mt, err := runner.LogCollection.ManifestText()
        if err != nil {
                return fmt.Errorf("While creating log manifest: %v", err)
@@ -634,14 +639,18 @@ func (runner *ContainerRunner) CommitLogs() error {
                return fmt.Errorf("While creating log collection: %v", err)
        }
 
-       runner.LogsPDH = new(string)
-       *runner.LogsPDH = response.PortableDataHash
+       runner.LogsPDH = &response.PortableDataHash
 
        return nil
 }
 
 // UpdateContainerRecordRunning updates the container state to "Running"
+// unless runner.Cancelled is already set, in which case it returns
+// ErrCancelled without contacting the API server.
 func (runner *ContainerRunner) UpdateContainerRecordRunning() error {
+       // CancelLock stays held (via defer) until the API call returns, so the
+       // Cancelled check and the state update form one critical section with
+       // respect to other CancelLock users such as IsCancelled.
+       // NOTE(review): presumably the cancellation signal handler also takes
+       // CancelLock; if so, a cancel arriving now waits for this update.
+       runner.CancelLock.Lock()
+       defer runner.CancelLock.Unlock()
+       if runner.Cancelled {
+               return ErrCancelled
+       }
        return runner.ArvClient.Update("containers", runner.ContainerRecord.UUID,
                arvadosclient.Dict{"container": arvadosclient.Dict{"state": "Running"}}, nil)
 }
@@ -664,23 +673,30 @@ func (runner *ContainerRunner) ContainerToken() (string, error) {
 
 // UpdateContainerRecordComplete updates the container record state on API
 // server to "Complete" or "Cancelled"
-func (runner *ContainerRunner) UpdateContainerRecordComplete() error {
+func (runner *ContainerRunner) UpdateContainerRecordFinal() error {
        update := arvadosclient.Dict{}
-       if runner.LogsPDH != nil {
-               update["log"] = *runner.LogsPDH
-       }
-       if runner.ExitCode != nil {
-               update["exit_code"] = *runner.ExitCode
-       }
-       if runner.OutputPDH != nil {
-               update["output"] = runner.OutputPDH
-       }
-
        update["state"] = runner.finalState
-
+       if runner.finalState == "Complete" {
+               if runner.LogsPDH != nil {
+                       update["log"] = *runner.LogsPDH
+               }
+               if runner.ExitCode != nil {
+                       update["exit_code"] = *runner.ExitCode
+               }
+               if runner.OutputPDH != nil {
+                       update["output"] = *runner.OutputPDH
+               }
+       }
        return runner.ArvClient.Update("containers", runner.ContainerRecord.UUID, arvadosclient.Dict{"container": update}, nil)
 }
 
+// IsCancelled returns the value of Cancelled, with goroutine safety.
+func (runner *ContainerRunner) IsCancelled() bool {
+       runner.CancelLock.Lock()
+       defer runner.CancelLock.Unlock()
+       return runner.Cancelled
+}
+
 // NewArvLogWriter creates an ArvLogWriter
 func (runner *ContainerRunner) NewArvLogWriter(name string) io.WriteCloser {
        return &ArvLogWriter{runner.ArvClient, runner.ContainerRecord.UUID, name, runner.LogCollection.Open(name + ".txt")}
@@ -697,93 +713,96 @@ func (runner *ContainerRunner) Run() (err error) {
                runner.CrunchLog.Printf("Executing on host '%s'", hostname)
        }
 
-       var runerr, waiterr error
+       // Clean up temporary directories _after_ finalizing
+       // everything (if we've made any by then)
+       defer runner.CleanupDirs()
+
+       runner.finalState = "Queued"
 
        defer func() {
-               if err != nil {
-                       runner.CrunchLog.Print(err)
+               // checkErr prints e (unless it's nil) and sets err to
+               // e (unless err is already non-nil). Thus, if err
+               // hasn't already been assigned when Run() returns,
+               // this cleanup func will cause Run() to return the
+               // first non-nil error that is passed to checkErr().
+               checkErr := func(e error) {
+                       if e == nil {
+                               return
+                       }
+                       runner.CrunchLog.Print(e)
+                       if err == nil {
+                               err = e
+                       }
                }
 
-               if runner.Cancelled {
-                       runner.finalState = "Cancelled"
-               } else {
-                       runner.finalState = "Complete"
-               }
+               // Log the error encountered in Run(), if any
+               checkErr(err)
 
-               // (6) capture output
-               outputerr := runner.CaptureOutput()
-               if outputerr != nil {
-                       runner.CrunchLog.Print(outputerr)
+               if runner.finalState == "Queued" {
+                       runner.UpdateContainerRecordFinal()
+                       return
                }
 
-               // (7) clean up temporary directories
-               runner.CleanupDirs()
-
-               // (8) write logs
-               logerr := runner.CommitLogs()
-               if logerr != nil {
-                       runner.CrunchLog.Print(logerr)
+               if runner.IsCancelled() {
+                       runner.finalState = "Cancelled"
+                       // but don't return yet -- we still want to
+                       // capture partial output and write logs
                }
 
-               // (9) update container record with results
-               updateerr := runner.UpdateContainerRecordComplete()
-               if updateerr != nil {
-                       runner.CrunchLog.Print(updateerr)
-               }
+               checkErr(runner.CaptureOutput())
+               checkErr(runner.CommitLogs())
+               checkErr(runner.UpdateContainerRecordFinal())
 
+               // The real log is already closed, but then we opened
+               // a new one in case we needed to log anything while
+               // finalizing.
                runner.CrunchLog.Close()
-
-               if err == nil {
-                       if runerr != nil {
-                               err = runerr
-                       } else if waiterr != nil {
-                               err = waiterr
-                       } else if logerr != nil {
-                               err = logerr
-                       } else if updateerr != nil {
-                               err = updateerr
-                       }
-               }
        }()
 
        err = runner.ArvClient.Get("containers", runner.ContainerRecord.UUID, nil, &runner.ContainerRecord)
        if err != nil {
-               return fmt.Errorf("While getting container record: %v", err)
+               err = fmt.Errorf("While getting container record: %v", err)
+               return
        }
 
-       // (1) setup signal handling
+       // setup signal handling
        runner.SetupSignals()
 
-       // (2) check for and/or load image
+       // check for and/or load image
        err = runner.LoadImage()
        if err != nil {
-               return fmt.Errorf("While loading container image: %v", err)
+               err = fmt.Errorf("While loading container image: %v", err)
+               return
        }
 
-       // (3) set up FUSE mount and binds
+       // set up FUSE mount and binds
        err = runner.SetupMounts()
        if err != nil {
-               return fmt.Errorf("While setting up mounts: %v", err)
+               err = fmt.Errorf("While setting up mounts: %v", err)
+               return
        }
 
-       // (3) create and start container
-       err = runner.StartContainer()
+       err = runner.CreateContainer()
        if err != nil {
-               if err == ErrCancelled {
-                       err = nil
-               }
                return
        }
 
-       // (4) update container record state
+       if runner.IsCancelled() {
+               return
+       }
+
        err = runner.UpdateContainerRecordRunning()
        if err != nil {
-               runner.CrunchLog.Print(err)
+               return
        }
+       runner.finalState = "Complete"
 
-       // (5) wait for container to finish
-       waiterr = runner.WaitFinish()
+       err = runner.StartContainer()
+       if err != nil {
+               return
+       }
 
+       err = runner.WaitFinish()
        return
 }