X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/3c5de241f6a6ac56e8bf986c89ffe153b9d941fe..15eda5715c312d12ec24c24db80448bee90e38ea:/services/crunch-run/crunchrun.go

diff --git a/services/crunch-run/crunchrun.go b/services/crunch-run/crunchrun.go
index 55edb99823..fc0dda718c 100644
--- a/services/crunch-run/crunchrun.go
+++ b/services/crunch-run/crunchrun.go
@@ -183,7 +183,6 @@ type ContainerRunner struct {
 	enableNetwork string // one of "default" or "always"
 	networkMode   string // passed through to HostConfig.NetworkMode
 	arvMountLog   *ThrottledLogger
-	arvMountKill  func()
 }
 
 // setupSignals sets up signal handling to gracefully terminate the underlying
@@ -222,13 +221,39 @@ func (runner *ContainerRunner) stop() {
 	}
 }
 
-func (runner *ContainerRunner) teardown() {
+func (runner *ContainerRunner) stopSignals() {
 	if runner.SigChan != nil {
 		signal.Stop(runner.SigChan)
 		close(runner.SigChan)
 	}
 }
 
+var errorBlacklist = []string{"Cannot connect to the Docker daemon"}
+var brokenNodeHook *string = flag.String("broken-node-hook", "", "Script to run if node is detected to be broken (for example, Docker daemon is not running)")
+
+func (runner *ContainerRunner) checkBrokenNode(goterr error) bool {
+	for _, d := range errorBlacklist {
+		if strings.Index(goterr.Error(), d) != -1 {
+			runner.CrunchLog.Printf("Error suggests node is unable to run containers: %v", goterr)
+			if *brokenNodeHook == "" {
+				runner.CrunchLog.Printf("No broken node hook provided, cannot mark node as broken.")
+			} else {
+				runner.CrunchLog.Printf("Running broken node hook %q", *brokenNodeHook)
+				// run killme script
+				c := exec.Command(*brokenNodeHook)
+				c.Stdout = runner.CrunchLog
+				c.Stderr = runner.CrunchLog
+				err := c.Run()
+				if err != nil {
+					runner.CrunchLog.Printf("Error running broken node hook: %v", err)
+				}
+			}
+			return true
+		}
+	}
+	return false
+}
+
 // LoadImage determines the docker image id from the container record and
 // checks if it is available in the local Docker image store. If not, it loads
 // the image from Keep.
@@ -309,10 +334,6 @@ func (runner *ContainerRunner) ArvMountCmd(arvMountCmd []string, token string) (
 		return nil, err
 	}
 
-	runner.arvMountKill = func() {
-		c.Process.Kill()
-	}
-
 	statReadme := make(chan bool)
 	runner.ArvMountExit = make(chan error)
 
@@ -363,8 +384,6 @@ func (runner *ContainerRunner) SetupMounts() (err error) {
 		return fmt.Errorf("While creating keep mount temp dir: %v", err)
 	}
 
-	runner.CleanupTempDir = append(runner.CleanupTempDir, runner.ArvMountPoint)
-
 	pdhOnly := true
 	tmpcount := 0
 	arvMountCmd := []string{
@@ -1260,36 +1279,55 @@ func (runner *ContainerRunner) getCollectionManifestForPath(mnt arvados.Mount, b
 
 func (runner *ContainerRunner) CleanupDirs() {
 	if runner.ArvMount != nil {
-		var umount *exec.Cmd
-		umount = exec.Command("fusermount", "-u", "-z", runner.ArvMountPoint)
-		done := false
-		try := 1
-		for !done {
-			umnterr := umount.Run()
-			if umnterr != nil {
-				runner.CrunchLog.Printf("Error: %v", umnterr)
-			}
-			timeout := time.NewTimer(10 * time.Second)
-			select {
-			case <-runner.ArvMountExit:
-				done = true
-			case <-timeout.C:
-				if try == 1 {
-					runner.CrunchLog.Printf("Timeout waiting for arv-mount to end. Will force unmount.")
Will force unmount.") - umount = exec.Command("arv-mount", "--unmount-timeout=10", "--unmount", runner.ArvMountPoint) - try = 2 - } else { - runner.CrunchLog.Printf("Killing arv-mount") - runner.arvMountKill() - umount = exec.Command("fusermount", "-u", "-z", runner.ArvMountPoint) + var delay int64 = 8 + umount := exec.Command("arv-mount", fmt.Sprintf("--unmount-timeout=%d", delay), "--unmount", runner.ArvMountPoint) + umount.Stdout = runner.CrunchLog + umount.Stderr = runner.CrunchLog + runner.CrunchLog.Printf("Running %v", umount.Args) + umnterr := umount.Start() + + if umnterr != nil { + runner.CrunchLog.Printf("Error unmounting: %v", umnterr) + } else { + // If arv-mount --unmount gets stuck for any reason, we + // don't want to wait for it forever. Do Wait() in a goroutine + // so it doesn't block crunch-run. + umountExit := make(chan error) + go func() { + mnterr := umount.Wait() + if mnterr != nil { + runner.CrunchLog.Printf("Error unmounting: %v", mnterr) + } + umountExit <- mnterr + }() + + for again := true; again; { + again = false + select { + case <-umountExit: + umount = nil + again = true + case <-runner.ArvMountExit: + break + case <-time.After(time.Duration((delay + 1) * int64(time.Second))): + runner.CrunchLog.Printf("Timed out waiting for unmount") + if umount != nil { + umount.Process.Kill() + } + runner.ArvMount.Process.Kill() } } } } + if runner.ArvMountPoint != "" { + if rmerr := os.Remove(runner.ArvMountPoint); rmerr != nil { + runner.CrunchLog.Printf("While cleaning up arv-mount directory %s: %v", runner.ArvMountPoint, rmerr) + } + } + for _, tmpdir := range runner.CleanupTempDir { - rmerr := os.RemoveAll(tmpdir) - if rmerr != nil { + if rmerr := os.RemoveAll(tmpdir); rmerr != nil { runner.CrunchLog.Printf("While cleaning up temporary directory %s: %v", tmpdir, rmerr) } } @@ -1299,7 +1337,9 @@ func (runner *ContainerRunner) CleanupDirs() { func (runner *ContainerRunner) CommitLogs() error { runner.CrunchLog.Print(runner.finalState) - runner.arvMountLog.Close() + if runner.arvMountLog != nil { + runner.arvMountLog.Close() + } runner.CrunchLog.Close() // Closing CrunchLog above allows them to be committed to Keep at this @@ -1413,14 +1453,15 @@ func (runner *ContainerRunner) Run() (err error) { runner.CrunchLog.Printf("Executing on host '%s'", hostname) } - // Clean up temporary directories _after_ finalizing - // everything (if we've made any by then) + runner.finalState = "Queued" + defer func() { + runner.stopSignals() + runner.CleanupDirs() + runner.CrunchLog.Printf("crunch-run finished") + runner.CrunchLog.Close() }() - defer runner.CleanupDirs() - - runner.finalState = "Queued" defer func() { // checkErr prints e (unless it's nil) and sets err to @@ -1446,7 +1487,6 @@ func (runner *ContainerRunner) Run() (err error) { checkErr(err) if runner.finalState == "Queued" { - runner.CrunchLog.Close() runner.UpdateContainerFinal() return } @@ -1460,13 +1500,6 @@ func (runner *ContainerRunner) Run() (err error) { checkErr(runner.CaptureOutput()) checkErr(runner.CommitLogs()) checkErr(runner.UpdateContainerFinal()) - - // The real log is already closed, but then we opened - // a new one in case we needed to log anything while - // finalizing. 
-		runner.CrunchLog.Close()
-
-		runner.teardown()
 	}()
 
 	err = runner.fetchContainerRecord()
@@ -1480,7 +1513,11 @@ func (runner *ContainerRunner) Run() (err error) {
 	// check for and/or load image
 	err = runner.LoadImage()
 	if err != nil {
-		runner.finalState = "Cancelled"
+		if !runner.checkBrokenNode(err) {
+			// Failed to load image but not due to a "broken node"
+			// condition, probably user error.
+			runner.finalState = "Cancelled"
+		}
 		err = fmt.Errorf("While loading container image: %v", err)
 		return
 	}
@@ -1509,8 +1546,6 @@ func (runner *ContainerRunner) Run() (err error) {
 		return
 	}
 
-	runner.StartCrunchstat()
-
 	if runner.IsCancelled() {
 		return
 	}
@@ -1521,8 +1556,11 @@ func (runner *ContainerRunner) Run() (err error) {
 	}
 	runner.finalState = "Cancelled"
 
+	runner.StartCrunchstat()
+
 	err = runner.StartContainer()
 	if err != nil {
+		runner.checkBrokenNode(err)
 		return
 	}
 
@@ -1600,25 +1638,27 @@ func main() {
 	}
 	api.Retries = 8
 
-	var kc *keepclient.KeepClient
-	kc, err = keepclient.MakeKeepClient(api)
-	if err != nil {
-		log.Fatalf("%s: %v", containerId, err)
+	kc, kcerr := keepclient.MakeKeepClient(api)
+	if kcerr != nil {
+		log.Fatalf("%s: %v", containerId, kcerr)
 	}
 	kc.BlockCache = &keepclient.BlockCache{MaxBlocks: 2}
 	kc.Retries = 4
 
-	var docker *dockerclient.Client
 
 	// API version 1.21 corresponds to Docker 1.9, which is currently the
 	// minimum version we want to support.
-	docker, err = dockerclient.NewClient(dockerclient.DefaultDockerHost, "1.21", nil, nil)
-	if err != nil {
-		log.Fatalf("%s: %v", containerId, err)
-	}
-
+	docker, dockererr := dockerclient.NewClient(dockerclient.DefaultDockerHost, "1.21", nil, nil)
 	dockerClientProxy := ThinDockerClientProxy{Docker: docker}
 	cr := NewContainerRunner(api, kc, dockerClientProxy, containerId)
+
+	if dockererr != nil {
+		cr.CrunchLog.Printf("%s: %v", containerId, dockererr)
+		cr.checkBrokenNode(dockererr)
+		cr.CrunchLog.Close()
+		os.Exit(1)
+	}
+
 	cr.statInterval = *statInterval
 	cr.cgroupRoot = *cgroupRoot
 	cr.expectCgroupParent = *cgroupParent