dockerclient "github.com/docker/docker/client"
)
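+// version is the crunch-run release string; "dev" is a placeholder
+// expected to be overridden at build time (e.g. go build -ldflags
+// "-X main.version=...").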
+var version = "dev"
+
// IArvadosClient is the minimal set of Arvados API methods used by crunch-run.
type IArvadosClient interface {
Create(resourceType string, parameters arvadosclient.Dict, output interface{}) error
}
}
-func (runner *ContainerRunner) teardown() {
+func (runner *ContainerRunner) stopSignals() {
if runner.SigChan != nil {
signal.Stop(runner.SigChan)
close(runner.SigChan)
}
}
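+// errorBlacklist lists substrings of error messages indicating that the
+// node itself, rather than the container, is unable to run containers.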
+var errorBlacklist = []string{"Cannot connect to the Docker daemon"}
+var brokenNodeHook = flag.String("broken-node-hook", "", "Script to run if node is detected to be broken (for example, Docker daemon is not running)")
+
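+// checkBrokenNode returns true if goterr matches errorBlacklist. On a
+// match it logs the error and runs the broken node hook, if configured.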
+func (runner *ContainerRunner) checkBrokenNode(goterr error) bool {
+ for _, d := range errorBlacklist {
+ if strings.Contains(goterr.Error(), d) {
+ runner.CrunchLog.Printf("Error suggests node is unable to run containers: %v", goterr)
+ if *brokenNodeHook == "" {
+ runner.CrunchLog.Printf("No broken node hook provided, cannot mark node as broken.")
+ } else {
+ runner.CrunchLog.Printf("Running broken node hook %q", *brokenNodeHook)
+ // Run the configured broken node hook.
+ c := exec.Command(*brokenNodeHook)
+ c.Stdout = runner.CrunchLog
+ c.Stderr = runner.CrunchLog
+ err := c.Run()
+ if err != nil {
+ runner.CrunchLog.Printf("Error running broken node hook: %v", err)
+ }
+ }
+ return true
+ }
+ }
+ return false
+}
+
// LoadImage determines the docker image id from the container record and
// checks if it is available in the local Docker image store. If not, it loads
// the image from Keep.
return fmt.Errorf("While creating keep mount temp dir: %v", err)
}
- runner.CleanupTempDir = append(runner.CleanupTempDir, runner.ArvMountPoint)
-
pdhOnly := true
tmpcount := 0
arvMountCmd := []string{
cmd []string
}
-// Gather node information and store it on the log for debugging
+// LogNodeInfo gathers node information and stores it in the log for debugging
// purposes.
func (runner *ContainerRunner) LogNodeInfo() (err error) {
w := runner.NewLogWriter("node-info")
return nil
}
-// Get and save the raw JSON container record from the API server
+// LogContainerRecord gets and saves the raw JSON container record from the API server.
func (runner *ContainerRunner) LogContainerRecord() (err error) {
w := &ArvLogWriter{
ArvClient: runner.ArvClient,
return extracted.Text, nil
}
-func (runner *ContainerRunner) tryUnmount(umount *exec.Cmd) error {
- umnterr := umount.Start()
- if umnterr != nil {
- runner.CrunchLog.Printf("Error: %v", umnterr)
- }
- go func() {
- mnterr := umount.Wait()
- if mnterr != nil {
- runner.CrunchLog.Printf("Error running %v: %v", umount.Args, mnterr)
- }
- }()
-
- timeout := time.NewTimer(9 * time.Second)
- select {
- case <-runner.ArvMountExit:
- return nil
- case <-timeout.C:
- return fmt.Errorf("Timed out")
- }
-}
-
func (runner *ContainerRunner) CleanupDirs() {
if runner.ArvMount != nil {
- if err := runner.tryUnmount(exec.Command("fusermount", "-u", runner.ArvMountPoint)); err != nil {
- runner.CrunchLog.Printf("arv-mount not ended, will try force unmount: %v", err)
- err = runner.tryUnmount(exec.Command("arv-mount", "--unmount-timeout=8", "--unmount", runner.ArvMountPoint))
- if err != nil {
- runner.CrunchLog.Printf("Error running arv-mount --unmount: %v", err)
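+ // Ask arv-mount to unmount the Keep mount, allowing it up to
+ // `delay` seconds (--unmount-timeout) to finish.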
+ var delay int64 = 8
+ umount := exec.Command("arv-mount", fmt.Sprintf("--unmount-timeout=%d", delay), "--unmount", runner.ArvMountPoint)
+ umount.Stdout = runner.CrunchLog
+ umount.Stderr = runner.CrunchLog
+ runner.CrunchLog.Printf("Running %v", umount.Args)
+ umnterr := umount.Start()
+
+ if umnterr != nil {
+ runner.CrunchLog.Printf("Error unmounting: %v", umnterr)
+ } else {
+ // If arv-mount --unmount gets stuck for any reason, we
+ // don't want to wait for it forever. Do Wait() in a goroutine
+ // so it doesn't block crunch-run.
+ umountExit := make(chan error, 1) // buffered: the goroutine must not leak if no one receives
+ go func() {
+ mnterr := umount.Wait()
+ if mnterr != nil {
+ runner.CrunchLog.Printf("Error unmounting: %v", mnterr)
+ }
+ umountExit <- mnterr
+ }()
+
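+ // Wait for the unmount command to finish, for arv-mount itself
+ // to exit, or for the deadline to pass, whichever happens first.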
+ for again := true; again; {
+ again = false
+ select {
+ case <-umountExit:
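+ // The unmount command finished; clear umount so the timeout
+ // case won't try to kill it, then keep waiting for arv-mount.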
+ umount = nil
+ again = true
+ case <-runner.ArvMountExit:
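+ // arv-mount exited on its own; `again` is still false, so
+ // the enclosing loop ends after this select.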
+ break
+ case <-time.After(time.Duration((delay + 1) * int64(time.Second))):
+ runner.CrunchLog.Printf("Timed out waiting for unmount")
+ if umount != nil {
+ umount.Process.Kill()
+ }
+ runner.ArvMount.Process.Kill()
+ }
}
}
+ }
+
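+ // Remove the mount point directory even if arv-mount never ran.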
+ if runner.ArvMountPoint != "" {
if rmerr := os.Remove(runner.ArvMountPoint); rmerr != nil {
runner.CrunchLog.Printf("While cleaning up arv-mount directory %s: %v", runner.ArvMountPoint, rmerr)
}
}
for _, tmpdir := range runner.CleanupTempDir {
- if tmpdir == runner.ArvMountPoint {
- continue
- }
if rmerr := os.RemoveAll(tmpdir); rmerr != nil {
runner.CrunchLog.Printf("While cleaning up temporary directory %s: %v", tmpdir, rmerr)
}
// Run the full container lifecycle.
func (runner *ContainerRunner) Run() (err error) {
+ runner.CrunchLog.Printf("crunch-run %s started", version)
runner.CrunchLog.Printf("Executing container '%s'", runner.Container.UUID)
hostname, hosterr := os.Hostname()
runner.CrunchLog.Printf("Executing on host '%s'", hostname)
}
- // Clean up temporary directories _after_ finalizing
- // everything (if we've made any by then)
+ runner.finalState = "Queued"
+
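+ // Deferred functions run LIFO, so this one (registered first) runs
+ // last, after the finalization func below has committed the logs.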
defer func() {
+ runner.stopSignals()
+ runner.CleanupDirs()
+
runner.CrunchLog.Printf("crunch-run finished")
+ runner.CrunchLog.Close()
}()
- defer runner.CleanupDirs()
-
- runner.finalState = "Queued"
defer func() {
// checkErr prints e (unless it's nil) and sets err to
// e (unless err is already non-nil).
checkErr(err)
if runner.finalState == "Queued" {
- runner.CrunchLog.Close()
runner.UpdateContainerFinal()
return
}
checkErr(runner.CaptureOutput())
checkErr(runner.CommitLogs())
checkErr(runner.UpdateContainerFinal())
-
- // The real log is already closed, but then we opened
- // a new one in case we needed to log anything while
- // finalizing.
- runner.CrunchLog.Close()
-
- runner.teardown()
}()
err = runner.fetchContainerRecord()
// check for and/or load image
err = runner.LoadImage()
if err != nil {
- runner.finalState = "Cancelled"
+ if !runner.checkBrokenNode(err) {
+ // Failed to load image but not due to a "broken node"
+ // condition, probably user error.
+ runner.finalState = "Cancelled"
+ }
err = fmt.Errorf("While loading container image: %v", err)
return
}
return
}
- runner.StartCrunchstat()
-
if runner.IsCancelled() {
return
}
}
runner.finalState = "Cancelled"
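+ // Begin logging resource usage (crunchstat) before starting the
+ // container.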
+ runner.StartCrunchstat()
+
err = runner.StartContainer()
if err != nil {
+ runner.checkBrokenNode(err)
return
}
`Set networking mode for container. Corresponds to Docker network mode (--net).
`)
memprofile := flag.String("memprofile", "", "write memory profile to `file` after running container")
+ getVersion := flag.Bool("version", false, "Print version information and exit.")
flag.Parse()
+ // Print version information if requested
+ if *getVersion {
+ fmt.Printf("crunch-run %s\n", version)
+ return
+ }
+
+ log.Printf("crunch-run %s started", version)
+
containerId := flag.Arg(0)
if *caCertsPath != "" {
}
api.Retries = 8
- var kc *keepclient.KeepClient
- kc, err = keepclient.MakeKeepClient(api)
- if err != nil {
- log.Fatalf("%s: %v", containerId, err)
+ kc, kcerr := keepclient.MakeKeepClient(api)
+ if kcerr != nil {
+ log.Fatalf("%s: %v", containerId, kcerr)
}
kc.BlockCache = &keepclient.BlockCache{MaxBlocks: 2}
kc.Retries = 4
- var docker *dockerclient.Client
// API version 1.21 corresponds to Docker 1.9, which is currently the
// minimum version we want to support.
- docker, err = dockerclient.NewClient(dockerclient.DefaultDockerHost, "1.21", nil, nil)
- if err != nil {
- log.Fatalf("%s: %v", containerId, err)
- }
-
+ docker, dockererr := dockerclient.NewClient(dockerclient.DefaultDockerHost, "1.21", nil, nil)
dockerClientProxy := ThinDockerClientProxy{Docker: docker}
cr := NewContainerRunner(api, kc, dockerClientProxy, containerId)
+
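+ // If the Docker client failed to initialize, report the error through
+ // the container log and the broken node hook before exiting.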
+ if dockererr != nil {
+ cr.CrunchLog.Printf("%s: %v", containerId, dockererr)
+ cr.checkBrokenNode(dockererr)
+ cr.CrunchLog.Close()
+ os.Exit(1)
+ }
+
cr.statInterval = *statInterval
cr.cgroupRoot = *cgroupRoot
cr.expectCgroupParent = *cgroupParent