From dbf2c829a79ef890c839a2f1498b455340e3ab51 Mon Sep 17 00:00:00 2001 From: Tom Clegg Date: Thu, 26 May 2016 21:17:31 -0400 Subject: [PATCH] 9272: Fix up state transitions: * Change state to Running only at the last possible moment before starting the container. * When erroring out before Running, change state back to Queued. * Do not save log/output/exit code when changing state to Cancelled. Incidental fixes: * Clean up error handling in Run() * Don't create a collection for (or try to attach to the container) the second "cleanup activities" log that gets opened after closing the real container log. --- services/crunch-run/crunchrun.go | 183 ++++++++++++++------------ services/crunch-run/crunchrun_test.go | 14 +- 2 files changed, 108 insertions(+), 89 deletions(-) diff --git a/services/crunch-run/crunchrun.go b/services/crunch-run/crunchrun.go index 6b5baf555c..c50433fe70 100644 --- a/services/crunch-run/crunchrun.go +++ b/services/crunch-run/crunchrun.go @@ -103,6 +103,7 @@ type ContainerRunner struct { Kc IKeepClient ContainerRecord dockerclient.ContainerConfig + dockerclient.HostConfig token string ContainerID string ExitCode *int @@ -455,17 +456,10 @@ func (runner *ContainerRunner) AttachStreams() (err error) { return nil } -// StartContainer creates the container and runs it. -func (runner *ContainerRunner) StartContainer() (err error) { +// CreateContainer creates the docker container. +func (runner *ContainerRunner) CreateContainer() error { runner.CrunchLog.Print("Creating Docker container") - runner.CancelLock.Lock() - defer runner.CancelLock.Unlock() - - if runner.Cancelled { - return ErrCancelled - } - runner.ContainerConfig.Cmd = runner.ContainerRecord.Command if runner.ContainerRecord.Cwd != "." { runner.ContainerConfig.WorkingDir = runner.ContainerRecord.Cwd @@ -487,24 +481,26 @@ func (runner *ContainerRunner) StartContainer() (err error) { } runner.ContainerConfig.NetworkDisabled = true + + var err error runner.ContainerID, err = runner.Docker.CreateContainer(&runner.ContainerConfig, "", nil) if err != nil { return fmt.Errorf("While creating container: %v", err) } - hostConfig := &dockerclient.HostConfig{Binds: runner.Binds, + + runner.HostConfig = dockerclient.HostConfig{Binds: runner.Binds, LogConfig: dockerclient.LogConfig{Type: "none"}} - err = runner.AttachStreams() - if err != nil { - return err - } + return runner.AttachStreams() +} +// StartContainer starts the docker container created by CreateContainer. +func (runner *ContainerRunner) StartContainer() error { runner.CrunchLog.Printf("Starting Docker container id '%s'", runner.ContainerID) - err = runner.Docker.StartContainer(runner.ContainerID, hostConfig) + err := runner.Docker.StartContainer(runner.ContainerID, &runner.HostConfig) if err != nil { - return fmt.Errorf("While starting container: %v", err) + return fmt.Errorf("could not start container: %v", err) } - return nil } @@ -618,6 +614,15 @@ func (runner *ContainerRunner) CommitLogs() error { runner.CrunchLog = NewThrottledLogger(&ArvLogWriter{runner.ArvClient, runner.ContainerRecord.UUID, "crunch-run", nil}) + if runner.LogsPDH != nil { + // If we have already assigned something to LogsPDH, + // we must be closing the re-opened log, which won't + // end up getting attached to the container record and + // therefore doesn't need to be saved as a collection + // -- it exists only to send logs to other channels. + return nil + } + mt, err := runner.LogCollection.ManifestText() if err != nil { return fmt.Errorf("While creating log manifest: %v", err) @@ -634,14 +639,18 @@ func (runner *ContainerRunner) CommitLogs() error { return fmt.Errorf("While creating log collection: %v", err) } - runner.LogsPDH = new(string) - *runner.LogsPDH = response.PortableDataHash + runner.LogsPDH = &response.PortableDataHash return nil } // UpdateContainerRecordRunning updates the container state to "Running" func (runner *ContainerRunner) UpdateContainerRecordRunning() error { + runner.CancelLock.Lock() + defer runner.CancelLock.Unlock() + if runner.Cancelled { + return ErrCancelled + } return runner.ArvClient.Update("containers", runner.ContainerRecord.UUID, arvadosclient.Dict{"container": arvadosclient.Dict{"state": "Running"}}, nil) } @@ -664,23 +673,30 @@ func (runner *ContainerRunner) ContainerToken() (string, error) { // UpdateContainerRecordComplete updates the container record state on API // server to "Complete" or "Cancelled" -func (runner *ContainerRunner) UpdateContainerRecordComplete() error { +func (runner *ContainerRunner) UpdateContainerRecordFinal() error { update := arvadosclient.Dict{} - if runner.LogsPDH != nil { - update["log"] = *runner.LogsPDH - } - if runner.ExitCode != nil { - update["exit_code"] = *runner.ExitCode - } - if runner.OutputPDH != nil { - update["output"] = runner.OutputPDH - } - update["state"] = runner.finalState - + if runner.finalState == "Complete" { + if runner.LogsPDH != nil { + update["log"] = *runner.LogsPDH + } + if runner.ExitCode != nil { + update["exit_code"] = *runner.ExitCode + } + if runner.OutputPDH != nil { + update["output"] = *runner.OutputPDH + } + } return runner.ArvClient.Update("containers", runner.ContainerRecord.UUID, arvadosclient.Dict{"container": update}, nil) } +// IsCancelled returns the value of Cancelled, with goroutine safety. +func (runner *ContainerRunner) IsCancelled() bool { + runner.CancelLock.Lock() + defer runner.CancelLock.Unlock() + return runner.Cancelled +} + // NewArvLogWriter creates an ArvLogWriter func (runner *ContainerRunner) NewArvLogWriter(name string) io.WriteCloser { return &ArvLogWriter{runner.ArvClient, runner.ContainerRecord.UUID, name, runner.LogCollection.Open(name + ".txt")} @@ -697,93 +713,96 @@ func (runner *ContainerRunner) Run() (err error) { runner.CrunchLog.Printf("Executing on host '%s'", hostname) } - var runerr, waiterr error + // Clean up temporary directories _after_ finalizing + // everything (if we've made any by then) + defer runner.CleanupDirs() + + runner.finalState = "Queued" defer func() { - if err != nil { - runner.CrunchLog.Print(err) + // checkErr prints e (unless it's nil) and sets err to + // e (unless err is already non-nil). Thus, if err + // hasn't already been assigned when Run() returns, + // this cleanup func will cause Run() to return the + // first non-nil error that is passed to checkErr(). + checkErr := func(e error) { + if e == nil { + return + } + runner.CrunchLog.Print(e) + if err == nil { + err = e + } } - if runner.Cancelled { - runner.finalState = "Cancelled" - } else { - runner.finalState = "Complete" - } + // Log the error encountered in Run(), if any + checkErr(err) - // (6) capture output - outputerr := runner.CaptureOutput() - if outputerr != nil { - runner.CrunchLog.Print(outputerr) + if runner.finalState == "Queued" { + runner.UpdateContainerRecordFinal() + return } - // (7) clean up temporary directories - runner.CleanupDirs() - - // (8) write logs - logerr := runner.CommitLogs() - if logerr != nil { - runner.CrunchLog.Print(logerr) + if runner.IsCancelled() { + runner.finalState = "Cancelled" + // but don't return yet -- we still want to + // capture partial output and write logs } - // (9) update container record with results - updateerr := runner.UpdateContainerRecordComplete() - if updateerr != nil { - runner.CrunchLog.Print(updateerr) - } + checkErr(runner.CaptureOutput()) + checkErr(runner.CommitLogs()) + checkErr(runner.UpdateContainerRecordFinal()) + // The real log is already closed, but then we opened + // a new one in case we needed to log anything while + // finalizing. runner.CrunchLog.Close() - - if err == nil { - if runerr != nil { - err = runerr - } else if waiterr != nil { - err = waiterr - } else if logerr != nil { - err = logerr - } else if updateerr != nil { - err = updateerr - } - } }() err = runner.ArvClient.Get("containers", runner.ContainerRecord.UUID, nil, &runner.ContainerRecord) if err != nil { - return fmt.Errorf("While getting container record: %v", err) + err = fmt.Errorf("While getting container record: %v", err) + return } - // (1) setup signal handling + // setup signal handling runner.SetupSignals() - // (2) check for and/or load image + // check for and/or load image err = runner.LoadImage() if err != nil { - return fmt.Errorf("While loading container image: %v", err) + err = fmt.Errorf("While loading container image: %v", err) + return } - // (3) set up FUSE mount and binds + // set up FUSE mount and binds err = runner.SetupMounts() if err != nil { - return fmt.Errorf("While setting up mounts: %v", err) + err = fmt.Errorf("While setting up mounts: %v", err) + return } - // (3) create and start container - err = runner.StartContainer() + err = runner.CreateContainer() if err != nil { - if err == ErrCancelled { - err = nil - } return } - // (4) update container record state + if runner.IsCancelled() { + return + } + err = runner.UpdateContainerRecordRunning() if err != nil { - runner.CrunchLog.Print(err) + return } + runner.finalState = "Complete" - // (5) wait for container to finish - waiterr = runner.WaitFinish() + err = runner.StartContainer() + if err != nil { + return + } + err = runner.WaitFinish() return } diff --git a/services/crunch-run/crunchrun_test.go b/services/crunch-run/crunchrun_test.go index 9634a0a478..ef48d6b0f4 100644 --- a/services/crunch-run/crunchrun_test.go +++ b/services/crunch-run/crunchrun_test.go @@ -396,6 +396,9 @@ func (s *TestSuite) TestRunContainer(c *C) { err := cr.LoadImage() c.Check(err, IsNil) + err = cr.CreateContainer() + c.Check(err, IsNil) + err = cr.StartContainer() c.Check(err, IsNil) @@ -448,7 +451,7 @@ func (s *TestSuite) TestUpdateContainerRecordComplete(c *C) { *cr.ExitCode = 42 cr.finalState = "Complete" - err := cr.UpdateContainerRecordComplete() + err := cr.UpdateContainerRecordFinal() c.Check(err, IsNil) c.Check(api.Content[0]["container"].(arvadosclient.Dict)["log"], Equals, *cr.LogsPDH) @@ -463,7 +466,7 @@ func (s *TestSuite) TestUpdateContainerRecordCancelled(c *C) { cr.Cancelled = true cr.finalState = "Cancelled" - err := cr.UpdateContainerRecordComplete() + err := cr.UpdateContainerRecordFinal() c.Check(err, IsNil) c.Check(api.Content[0]["container"].(arvadosclient.Dict)["log"], IsNil) @@ -641,10 +644,6 @@ func (s *TestSuite) TestCancel(c *C) { err = cr.Run() c.Check(err, IsNil) - - c.Check(api.Calls, Equals, 6) - c.Check(api.Content[5]["container"].(arvadosclient.Dict)["log"], NotNil) - if err != nil { for k, v := range api.Logs { c.Log(k) @@ -652,8 +651,9 @@ func (s *TestSuite) TestCancel(c *C) { } } + c.Assert(api.Calls, Equals, 6) + c.Check(api.Content[5]["container"].(arvadosclient.Dict)["log"], IsNil) c.Check(api.Content[5]["container"].(arvadosclient.Dict)["state"], Equals, "Cancelled") - c.Check(strings.HasSuffix(api.Logs["stdout"].String(), "foo\n"), Equals, true) } -- 2.30.2