Kc IKeepClient
ContainerRecord
dockerclient.ContainerConfig
+ dockerclient.HostConfig
token string
ContainerID string
ExitCode *int
return nil
}
-// StartContainer creates the container and runs it.
-func (runner *ContainerRunner) StartContainer() (err error) {
+// CreateContainer creates the docker container (recording its ID in runner.ContainerID), sets runner.HostConfig, and calls AttachStreams.
+func (runner *ContainerRunner) CreateContainer() error {
runner.CrunchLog.Print("Creating Docker container")
- runner.CancelLock.Lock()
- defer runner.CancelLock.Unlock()
-
- if runner.Cancelled {
- return ErrCancelled
- }
-
runner.ContainerConfig.Cmd = runner.ContainerRecord.Command
if runner.ContainerRecord.Cwd != "." {
runner.ContainerConfig.WorkingDir = runner.ContainerRecord.Cwd
}
runner.ContainerConfig.NetworkDisabled = true
+
+ var err error
runner.ContainerID, err = runner.Docker.CreateContainer(&runner.ContainerConfig, "", nil)
if err != nil {
return fmt.Errorf("While creating container: %v", err)
}
- hostConfig := &dockerclient.HostConfig{Binds: runner.Binds,
+
+ runner.HostConfig = dockerclient.HostConfig{Binds: runner.Binds,
LogConfig: dockerclient.LogConfig{Type: "none"}}
- err = runner.AttachStreams()
- if err != nil {
- return err
- }
+ return runner.AttachStreams()
+}
+// StartContainer starts the docker container identified by runner.ContainerID (set by CreateContainer), passing it runner.HostConfig.
+func (runner *ContainerRunner) StartContainer() error {
runner.CrunchLog.Printf("Starting Docker container id '%s'", runner.ContainerID)
- err = runner.Docker.StartContainer(runner.ContainerID, hostConfig)
+ err := runner.Docker.StartContainer(runner.ContainerID, &runner.HostConfig)
if err != nil {
- return fmt.Errorf("While starting container: %v", err)
+ return fmt.Errorf("could not start container: %v", err)
}
-
return nil
}
runner.CrunchLog = NewThrottledLogger(&ArvLogWriter{runner.ArvClient, runner.ContainerRecord.UUID,
"crunch-run", nil})
+ if runner.LogsPDH != nil {
+ // If we have already assigned something to LogsPDH,
+ // we must be closing the re-opened log, which won't
+ // end up getting attached to the container record and
+ // therefore doesn't need to be saved as a collection
+ // -- it exists only to send logs to other channels.
+ return nil
+ }
+
mt, err := runner.LogCollection.ManifestText()
if err != nil {
return fmt.Errorf("While creating log manifest: %v", err)
return fmt.Errorf("While creating log collection: %v", err)
}
- runner.LogsPDH = new(string)
- *runner.LogsPDH = response.PortableDataHash
+ runner.LogsPDH = &response.PortableDataHash
return nil
}
// UpdateContainerRecordRunning updates the container state to "Running" while holding CancelLock, or returns ErrCancelled without updating if runner.Cancelled is set.
func (runner *ContainerRunner) UpdateContainerRecordRunning() error {
+ runner.CancelLock.Lock()
+ defer runner.CancelLock.Unlock()
+ if runner.Cancelled {
+ return ErrCancelled
+ }
return runner.ArvClient.Update("containers", runner.ContainerRecord.UUID,
arvadosclient.Dict{"container": arvadosclient.Dict{"state": "Running"}}, nil)
}
// UpdateContainerRecordFinal updates the container record state on API server to runner.finalState;
// log, exit_code and output are sent only when that state is "Complete" (and the corresponding value is set).
-func (runner *ContainerRunner) UpdateContainerRecordComplete() error {
+func (runner *ContainerRunner) UpdateContainerRecordFinal() error {
update := arvadosclient.Dict{}
- if runner.LogsPDH != nil {
- update["log"] = *runner.LogsPDH
- }
- if runner.ExitCode != nil {
- update["exit_code"] = *runner.ExitCode
- }
- if runner.OutputPDH != nil {
- update["output"] = runner.OutputPDH
- }
-
update["state"] = runner.finalState
-
+ if runner.finalState == "Complete" {
+ if runner.LogsPDH != nil {
+ update["log"] = *runner.LogsPDH
+ }
+ if runner.ExitCode != nil {
+ update["exit_code"] = *runner.ExitCode
+ }
+ if runner.OutputPDH != nil {
+ update["output"] = *runner.OutputPDH
+ }
+ }
return runner.ArvClient.Update("containers", runner.ContainerRecord.UUID, arvadosclient.Dict{"container": update}, nil)
}
+// IsCancelled returns the value of Cancelled, read while holding CancelLock.
+func (runner *ContainerRunner) IsCancelled() bool {
+ runner.CancelLock.Lock()
+ defer runner.CancelLock.Unlock()
+ return runner.Cancelled
+}
+
// NewArvLogWriter creates an ArvLogWriter
func (runner *ContainerRunner) NewArvLogWriter(name string) io.WriteCloser {
return &ArvLogWriter{runner.ArvClient, runner.ContainerRecord.UUID, name, runner.LogCollection.Open(name + ".txt")}
runner.CrunchLog.Printf("Executing on host '%s'", hostname)
}
- var runerr, waiterr error
+ // Clean up temporary directories _after_ finalizing
+ // everything (if we've made any by then)
+ defer runner.CleanupDirs()
+
+ runner.finalState = "Queued"
defer func() {
- if err != nil {
- runner.CrunchLog.Print(err)
+ // checkErr prints e (unless it's nil) and sets err to
+ // e (unless err is already non-nil). Thus, if err
+ // hasn't already been assigned when Run() returns,
+ // this cleanup func will cause Run() to return the
+ // first non-nil error that is passed to checkErr().
+ checkErr := func(e error) {
+ if e == nil {
+ return
+ }
+ runner.CrunchLog.Print(e)
+ if err == nil {
+ err = e
+ }
}
- if runner.Cancelled {
- runner.finalState = "Cancelled"
- } else {
- runner.finalState = "Complete"
- }
+ // Log the error encountered in Run(), if any
+ checkErr(err)
- // (6) capture output
- outputerr := runner.CaptureOutput()
- if outputerr != nil {
- runner.CrunchLog.Print(outputerr)
+ if runner.finalState == "Queued" {
+ runner.UpdateContainerRecordFinal()
+ return
}
- // (7) clean up temporary directories
- runner.CleanupDirs()
-
- // (8) write logs
- logerr := runner.CommitLogs()
- if logerr != nil {
- runner.CrunchLog.Print(logerr)
+ if runner.IsCancelled() {
+ runner.finalState = "Cancelled"
+ // but don't return yet -- we still want to
+ // capture partial output and write logs
}
- // (9) update container record with results
- updateerr := runner.UpdateContainerRecordComplete()
- if updateerr != nil {
- runner.CrunchLog.Print(updateerr)
- }
+ checkErr(runner.CaptureOutput())
+ checkErr(runner.CommitLogs())
+ checkErr(runner.UpdateContainerRecordFinal())
+ // The real log is already closed, but then we opened
+ // a new one in case we needed to log anything while
+ // finalizing.
runner.CrunchLog.Close()
-
- if err == nil {
- if runerr != nil {
- err = runerr
- } else if waiterr != nil {
- err = waiterr
- } else if logerr != nil {
- err = logerr
- } else if updateerr != nil {
- err = updateerr
- }
- }
}()
err = runner.ArvClient.Get("containers", runner.ContainerRecord.UUID, nil, &runner.ContainerRecord)
if err != nil {
- return fmt.Errorf("While getting container record: %v", err)
+ err = fmt.Errorf("While getting container record: %v", err)
+ return
}
- // (1) setup signal handling
+ // setup signal handling
runner.SetupSignals()
- // (2) check for and/or load image
+ // check for and/or load image
err = runner.LoadImage()
if err != nil {
- return fmt.Errorf("While loading container image: %v", err)
+ err = fmt.Errorf("While loading container image: %v", err)
+ return
}
- // (3) set up FUSE mount and binds
+ // set up FUSE mount and binds
err = runner.SetupMounts()
if err != nil {
- return fmt.Errorf("While setting up mounts: %v", err)
+ err = fmt.Errorf("While setting up mounts: %v", err)
+ return
}
- // (3) create and start container
- err = runner.StartContainer()
+ err = runner.CreateContainer()
if err != nil {
- if err == ErrCancelled {
- err = nil
- }
return
}
- // (4) update container record state
+ if runner.IsCancelled() {
+ return
+ }
+
err = runner.UpdateContainerRecordRunning()
if err != nil {
- runner.CrunchLog.Print(err)
+ return
}
+ runner.finalState = "Complete"
- // (5) wait for container to finish
- waiterr = runner.WaitFinish()
+ err = runner.StartContainer()
+ if err != nil {
+ return
+ }
+ err = runner.WaitFinish()
return
}