Merge branch 'master' of git.curoverse.com:arvados into 11876-r-sdk
diff --git a/services/crunch-run/crunchrun.go b/services/crunch-run/crunchrun.go
index 55edb99823c6ab6adac98740748baf5784a63a5c..fc0dda718ceda7fddd2e1f41c704fa434fdb0034 100644
--- a/services/crunch-run/crunchrun.go
+++ b/services/crunch-run/crunchrun.go
@@ -183,7 +183,6 @@ type ContainerRunner struct {
        enableNetwork string // one of "default" or "always"
        networkMode   string // passed through to HostConfig.NetworkMode
        arvMountLog   *ThrottledLogger
-       arvMountKill  func()
 }
 
 // setupSignals sets up signal handling to gracefully terminate the underlying
@@ -222,13 +221,39 @@ func (runner *ContainerRunner) stop() {
        }
 }
 
-func (runner *ContainerRunner) teardown() {
+func (runner *ContainerRunner) stopSignals() {
        if runner.SigChan != nil {
                signal.Stop(runner.SigChan)
                close(runner.SigChan)
        }
 }
 
+var errorBlacklist = []string{"Cannot connect to the Docker daemon"}
+var brokenNodeHook *string = flag.String("broken-node-hook", "", "Script to run if node is detected to be broken (for example, Docker daemon is not running)")
+
+func (runner *ContainerRunner) checkBrokenNode(goterr error) bool {
+       for _, d := range errorBlacklist {
+               if strings.Contains(goterr.Error(), d) {
+                       runner.CrunchLog.Printf("Error suggests node is unable to run containers: %v", goterr)
+                       if *brokenNodeHook == "" {
+                               runner.CrunchLog.Printf("No broken node hook provided, cannot mark node as broken.")
+                       } else {
+                               runner.CrunchLog.Printf("Running broken node hook %q", *brokenNodeHook)
+                               // Run the operator-supplied broken node hook script.
+                               c := exec.Command(*brokenNodeHook)
+                               c.Stdout = runner.CrunchLog
+                               c.Stderr = runner.CrunchLog
+                               err := c.Run()
+                               if err != nil {
+                                       runner.CrunchLog.Printf("Error running broken node hook: %v", err)
+                               }
+                       }
+                       return true
+               }
+       }
+       return false
+}
+
 // LoadImage determines the docker image id from the container record and
 // checks if it is available in the local Docker image store.  If not, it loads
 // the image from Keep.
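
The new checkBrokenNode helper above matches an error message against a small substring blacklist of node-level failures and, on a match, runs the script named by the new -broken-node-hook flag, with the hook's output routed to the container log. A minimal standalone sketch of the same pattern follows; the names reportBrokenNode and nodeErrorBlacklist, the use of the standard log package in place of CrunchLog, and the hook path are all illustrative, not part of crunch-run:

    package main

    import (
        "errors"
        "io"
        "log"
        "os"
        "os/exec"
        "strings"
    )

    // Substring blacklist of errors that indicate the node, rather than
    // the container, is at fault (illustrative entry).
    var nodeErrorBlacklist = []string{"Cannot connect to the Docker daemon"}

    // reportBrokenNode checks err against the blacklist and, on a match,
    // runs the operator-supplied hook script with its output sent to logw.
    // It returns true if the error was recognized as a node-level failure.
    func reportBrokenNode(err error, hook string, logw io.Writer) bool {
        for _, bad := range nodeErrorBlacklist {
            if !strings.Contains(err.Error(), bad) {
                continue
            }
            log.Printf("Error suggests node is unable to run containers: %v", err)
            if hook == "" {
                log.Printf("No broken node hook provided, cannot mark node as broken.")
                return true
            }
            log.Printf("Running broken node hook %q", hook)
            cmd := exec.Command(hook)
            cmd.Stdout = logw
            cmd.Stderr = logw
            if runErr := cmd.Run(); runErr != nil {
                log.Printf("Error running broken node hook: %v", runErr)
            }
            return true
        }
        return false
    }

    func main() {
        // Both the error text and the hook path are illustrative.
        broken := reportBrokenNode(errors.New("Cannot connect to the Docker daemon"),
            "/usr/local/bin/mark-node-broken", os.Stderr)
        log.Printf("node broken: %v", broken)
    }

In crunch-run itself the hook path comes from the -broken-node-hook command-line flag and the output goes to the ContainerRunner's CrunchLog, so the failure is visible in the container's own log collection.
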
@@ -309,10 +334,6 @@ func (runner *ContainerRunner) ArvMountCmd(arvMountCmd []string, token string) (
                return nil, err
        }
 
-       runner.arvMountKill = func() {
-               c.Process.Kill()
-       }
-
        statReadme := make(chan bool)
        runner.ArvMountExit = make(chan error)
 
@@ -363,8 +384,6 @@ func (runner *ContainerRunner) SetupMounts() (err error) {
                return fmt.Errorf("While creating keep mount temp dir: %v", err)
        }
 
-       runner.CleanupTempDir = append(runner.CleanupTempDir, runner.ArvMountPoint)
-
        pdhOnly := true
        tmpcount := 0
        arvMountCmd := []string{
@@ -1260,36 +1279,55 @@ func (runner *ContainerRunner) getCollectionManifestForPath(mnt arvados.Mount, b
 
 func (runner *ContainerRunner) CleanupDirs() {
        if runner.ArvMount != nil {
-               var umount *exec.Cmd
-               umount = exec.Command("fusermount", "-u", "-z", runner.ArvMountPoint)
-               done := false
-               try := 1
-               for !done {
-                       umnterr := umount.Run()
-                       if umnterr != nil {
-                               runner.CrunchLog.Printf("Error: %v", umnterr)
-                       }
-                       timeout := time.NewTimer(10 * time.Second)
-                       select {
-                       case <-runner.ArvMountExit:
-                               done = true
-                       case <-timeout.C:
-                               if try == 1 {
-                                       runner.CrunchLog.Printf("Timeout waiting for arv-mount to end.  Will force unmount.")
-                                       umount = exec.Command("arv-mount", "--unmount-timeout=10", "--unmount", runner.ArvMountPoint)
-                                       try = 2
-                               } else {
-                                       runner.CrunchLog.Printf("Killing arv-mount")
-                                       runner.arvMountKill()
-                                       umount = exec.Command("fusermount", "-u", "-z", runner.ArvMountPoint)
+               var delay int64 = 8
+               umount := exec.Command("arv-mount", fmt.Sprintf("--unmount-timeout=%d", delay), "--unmount", runner.ArvMountPoint)
+               umount.Stdout = runner.CrunchLog
+               umount.Stderr = runner.CrunchLog
+               runner.CrunchLog.Printf("Running %v", umount.Args)
+               umnterr := umount.Start()
+
+               if umnterr != nil {
+                       runner.CrunchLog.Printf("Error unmounting: %v", umnterr)
+               } else {
+                       // If arv-mount --unmount gets stuck for any reason, we
+                       // don't want to wait for it forever.  Do Wait() in a goroutine
+                       // so it doesn't block crunch-run.
+                       umountExit := make(chan error)
+                       go func() {
+                               mnterr := umount.Wait()
+                               if mnterr != nil {
+                                       runner.CrunchLog.Printf("Error unmounting: %v", mnterr)
+                               }
+                               umountExit <- mnterr
+                       }()
+
+                       for again := true; again; {
+                               again = false
+                               select {
+                               case <-umountExit:
+                                       umount = nil
+                                       again = true
+                               case <-runner.ArvMountExit:
+                                       break
+                               case <-time.After(time.Duration((delay + 1) * int64(time.Second))):
+                                       runner.CrunchLog.Printf("Timed out waiting for unmount")
+                                       if umount != nil {
+                                               umount.Process.Kill()
+                                       }
+                                       runner.ArvMount.Process.Kill()
                                }
                        }
                }
        }
 
+       if runner.ArvMountPoint != "" {
+               if rmerr := os.Remove(runner.ArvMountPoint); rmerr != nil {
+                       runner.CrunchLog.Printf("While cleaning up arv-mount directory %s: %v", runner.ArvMountPoint, rmerr)
+               }
+       }
+
        for _, tmpdir := range runner.CleanupTempDir {
-               rmerr := os.RemoveAll(tmpdir)
-               if rmerr != nil {
+               if rmerr := os.RemoveAll(tmpdir); rmerr != nil {
                        runner.CrunchLog.Printf("While cleaning up temporary directory %s: %v", tmpdir, rmerr)
                }
        }
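
The rewritten CleanupDirs replaces the old fusermount retry loop with a single "arv-mount --unmount --unmount-timeout=N" call, and waits for it in a goroutine so a stuck unmount can never block crunch-run; if neither the unmount process nor arv-mount itself exits within the deadline, both are killed. A stripped-down sketch of that wait-with-timeout idiom is below; the function name, the command, and the timeout are illustrative, and the sketch omits crunch-run's extra case that also watches runner.ArvMountExit:

    package main

    import (
        "log"
        "os/exec"
        "time"
    )

    // waitWithTimeout starts cmd and waits for it to exit, but kills the
    // process if it is still running after timeout, so a stuck child can
    // never block the caller forever.
    func waitWithTimeout(cmd *exec.Cmd, timeout time.Duration) error {
        if err := cmd.Start(); err != nil {
            return err
        }
        // Run Wait() in a goroutine so the select below can race it
        // against the timeout.
        done := make(chan error, 1)
        go func() { done <- cmd.Wait() }()
        select {
        case err := <-done:
            return err
        case <-time.After(timeout):
            log.Printf("Timed out waiting for %v; killing it", cmd.Args)
            cmd.Process.Kill()
            return <-done // reap the killed process
        }
    }

    func main() {
        // Illustrative command: sleeps longer than the timeout allows.
        err := waitWithTimeout(exec.Command("sleep", "30"), 2*time.Second)
        log.Printf("result: %v", err)
    }
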
@@ -1299,7 +1337,9 @@ func (runner *ContainerRunner) CleanupDirs() {
 func (runner *ContainerRunner) CommitLogs() error {
        runner.CrunchLog.Print(runner.finalState)
 
-       runner.arvMountLog.Close()
+       if runner.arvMountLog != nil {
+               runner.arvMountLog.Close()
+       }
        runner.CrunchLog.Close()
 
        // Closing CrunchLog above allows them to be committed to Keep at this
@@ -1413,14 +1453,15 @@ func (runner *ContainerRunner) Run() (err error) {
                runner.CrunchLog.Printf("Executing on host '%s'", hostname)
        }
 
-       // Clean up temporary directories _after_ finalizing
-       // everything (if we've made any by then)
+       runner.finalState = "Queued"
+
        defer func() {
+               runner.stopSignals()
+               runner.CleanupDirs()
+
                runner.CrunchLog.Printf("crunch-run finished")
+               runner.CrunchLog.Close()
        }()
-       defer runner.CleanupDirs()
-
-       runner.finalState = "Queued"
 
        defer func() {
                // checkErr prints e (unless it's nil) and sets err to
@@ -1446,7 +1487,6 @@ func (runner *ContainerRunner) Run() (err error) {
                checkErr(err)
 
                if runner.finalState == "Queued" {
-                       runner.CrunchLog.Close()
                        runner.UpdateContainerFinal()
                        return
                }
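
With the hunks above, cleanup in Run() is consolidated into the first deferred function (stopSignals, CleanupDirs, the final "crunch-run finished" line, and CrunchLog.Close), registered before the finalization defer that commits logs and updates the container record. Because deferred calls run in last-in, first-out order, registering the cleanup defer first guarantees it runs after finalization, which is what the removed comment ("Clean up temporary directories _after_ finalizing everything") was getting at. A tiny sketch of that ordering, with made-up names:

    package main

    import "fmt"

    func run() {
        // Registered first, so it runs last: analogous to the cleanup defer.
        defer fmt.Println("cleanup (registered first, runs last)")
        // Registered second, so it runs first: analogous to the finalization defer.
        defer fmt.Println("finalize (registered second, runs first)")
        fmt.Println("doing the work")
    }

    func main() {
        run()
        // Prints:
        //   doing the work
        //   finalize (registered second, runs first)
        //   cleanup (registered first, runs last)
    }
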
@@ -1460,13 +1500,6 @@ func (runner *ContainerRunner) Run() (err error) {
                checkErr(runner.CaptureOutput())
                checkErr(runner.CommitLogs())
                checkErr(runner.UpdateContainerFinal())
-
-               // The real log is already closed, but then we opened
-               // a new one in case we needed to log anything while
-               // finalizing.
-               runner.CrunchLog.Close()
-
-               runner.teardown()
        }()
 
        err = runner.fetchContainerRecord()
@@ -1480,7 +1513,11 @@ func (runner *ContainerRunner) Run() (err error) {
        // check for and/or load image
        err = runner.LoadImage()
        if err != nil {
-               runner.finalState = "Cancelled"
+               if !runner.checkBrokenNode(err) {
+                       // Failed to load image but not due to a "broken node"
+                       // condition, probably user error.
+                       runner.finalState = "Cancelled"
+               }
                err = fmt.Errorf("While loading container image: %v", err)
                return
        }
@@ -1509,8 +1546,6 @@ func (runner *ContainerRunner) Run() (err error) {
                return
        }
 
-       runner.StartCrunchstat()
-
        if runner.IsCancelled() {
                return
        }
@@ -1521,8 +1556,11 @@ func (runner *ContainerRunner) Run() (err error) {
        }
        runner.finalState = "Cancelled"
 
+       runner.StartCrunchstat()
+
        err = runner.StartContainer()
        if err != nil {
+               runner.checkBrokenNode(err)
                return
        }
 
@@ -1600,25 +1638,27 @@ func main() {
        }
        api.Retries = 8
 
-       var kc *keepclient.KeepClient
-       kc, err = keepclient.MakeKeepClient(api)
-       if err != nil {
-               log.Fatalf("%s: %v", containerId, err)
+       kc, kcerr := keepclient.MakeKeepClient(api)
+       if kcerr != nil {
+               log.Fatalf("%s: %v", containerId, kcerr)
        }
        kc.BlockCache = &keepclient.BlockCache{MaxBlocks: 2}
        kc.Retries = 4
 
-       var docker *dockerclient.Client
        // API version 1.21 corresponds to Docker 1.9, which is currently the
        // minimum version we want to support.
-       docker, err = dockerclient.NewClient(dockerclient.DefaultDockerHost, "1.21", nil, nil)
-       if err != nil {
-               log.Fatalf("%s: %v", containerId, err)
-       }
-
+       docker, dockererr := dockerclient.NewClient(dockerclient.DefaultDockerHost, "1.21", nil, nil)
        dockerClientProxy := ThinDockerClientProxy{Docker: docker}
 
        cr := NewContainerRunner(api, kc, dockerClientProxy, containerId)
+
+       if dockererr != nil {
+               cr.CrunchLog.Printf("%s: %v", containerId, dockererr)
+               cr.checkBrokenNode(dockererr)
+               cr.CrunchLog.Close()
+               os.Exit(1)
+       }
+
        cr.statInterval = *statInterval
        cr.cgroupRoot = *cgroupRoot
        cr.expectCgroupParent = *cgroupParent