12876: Merge branch 'master' into 12876-arvados-client
diff --git a/services/crunch-run/crunchrun.go b/services/crunch-run/crunchrun.go
index ead918473c22af7542b23b233ab0dd064d3ecad2..e5e0ea001669f21219c8f6d01717f7771b885f59 100644
--- a/services/crunch-run/crunchrun.go
+++ b/services/crunch-run/crunchrun.go
@@ -19,6 +19,7 @@ import (
        "os/signal"
        "path"
        "path/filepath"
+       "regexp"
        "runtime"
        "runtime/pprof"
        "sort"
@@ -39,6 +40,8 @@ import (
        dockerclient "github.com/docker/docker/client"
 )
 
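+// version is reported in the container log and by the -version flag;
+// "dev" is the default, presumably replaced with a release string at
+// build time.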
+var version = "dev"
+
 // IArvadosClient is the minimal Arvados API methods used by crunch-run.
 type IArvadosClient interface {
        Create(resourceType string, parameters arvadosclient.Dict, output interface{}) error
@@ -228,6 +231,35 @@ func (runner *ContainerRunner) stopSignals() {
        }
 }
 
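+// errorBlacklist matches error messages that indicate the node itself is
+// unable to run containers (for example, the Docker daemon is not
+// running), rather than a problem with this particular container.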
+var errorBlacklist = []string{
+       "(?ms).*[Cc]annot connect to the Docker daemon.*",
+       "(?ms).*oci runtime error.*starting container process.*container init.*mounting.*to rootfs.*no such file or directory.*",
+}
+var brokenNodeHook = flag.String("broken-node-hook", "", "Script to run if node is detected to be broken (for example, Docker daemon is not running)")
+
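+// checkBrokenNode returns true if goterr matches errorBlacklist. In that
+// case it also runs the broken node hook, if one was given on the command
+// line, so the node can be marked as broken.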
+func (runner *ContainerRunner) checkBrokenNode(goterr error) bool {
+       for _, d := range errorBlacklist {
+               if m, e := regexp.MatchString(d, goterr.Error()); m && e == nil {
+                       runner.CrunchLog.Printf("Error suggests node is unable to run containers: %v", goterr)
+                       if *brokenNodeHook == "" {
+                               runner.CrunchLog.Printf("No broken node hook provided, cannot mark node as broken.")
+                       } else {
+                               runner.CrunchLog.Printf("Running broken node hook %q", *brokenNodeHook)
+                               // Run the broken node hook script.
+                               c := exec.Command(*brokenNodeHook)
+                               c.Stdout = runner.CrunchLog
+                               c.Stderr = runner.CrunchLog
+                               err := c.Run()
+                               if err != nil {
+                                       runner.CrunchLog.Printf("Error running broken node hook: %v", err)
+                               }
+                       }
+                       return true
+               }
+       }
+       return false
+}
+
 // LoadImage determines the docker image id from the container record and
 // checks if it is available in the local Docker image store.  If not, it loads
 // the image from Keep.
@@ -358,6 +390,11 @@ func (runner *ContainerRunner) SetupMounts() (err error) {
                return fmt.Errorf("While creating keep mount temp dir: %v", err)
        }
 
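+       // Fetch the container token up front: the git_tree mount handling
+       // below needs it, in addition to arv-mount later on.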
+       token, err := runner.ContainerToken()
+       if err != nil {
+               return fmt.Errorf("could not get container token: %s", err)
+       }
+
        pdhOnly := true
        tmpcount := 0
        arvMountCmd := []string{
@@ -506,6 +543,18 @@ func (runner *ContainerRunner) SetupMounts() (err error) {
                                return fmt.Errorf("writing temp file: %v", err)
                        }
                        runner.Binds = append(runner.Binds, fmt.Sprintf("%s:%s:ro", tmpfn, bind))
+
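+               // Extract the requested git tree into a temporary
+               // directory and bind-mount it read-only into the
+               // container.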
+               case mnt.Kind == "git_tree":
+                       tmpdir, err := runner.MkTempDir("", "")
+                       if err != nil {
+                               return fmt.Errorf("creating temp dir: %v", err)
+                       }
+                       runner.CleanupTempDir = append(runner.CleanupTempDir, tmpdir)
+                       err = gitMount(mnt).extractTree(runner.ArvClient, tmpdir, token)
+                       if err != nil {
+                               return err
+                       }
+                       runner.Binds = append(runner.Binds, tmpdir+":"+bind+":ro")
                }
        }
 
@@ -530,11 +579,6 @@ func (runner *ContainerRunner) SetupMounts() (err error) {
        }
        arvMountCmd = append(arvMountCmd, runner.ArvMountPoint)
 
-       token, err := runner.ContainerToken()
-       if err != nil {
-               return fmt.Errorf("could not get container token: %s", err)
-       }
-
        runner.ArvMount, err = runner.RunArvMount(arvMountCmd, token)
        if err != nil {
                return fmt.Errorf("While trying to start arv-mount: %v", err)
@@ -616,11 +660,10 @@ type infoCommand struct {
        cmd   []string
 }
 
-// Gather node information and store it on the log for debugging
+// LogNodeInfo gathers node information and stores it in the log for debugging
 // purposes.
 func (runner *ContainerRunner) LogNodeInfo() (err error) {
        w := runner.NewLogWriter("node-info")
-       logger := log.New(w, "node-info", 0)
 
        commands := []infoCommand{
                {
@@ -646,17 +689,17 @@ func (runner *ContainerRunner) LogNodeInfo() (err error) {
        }
 
        // Run commands with informational output to be logged.
-       var out []byte
        for _, command := range commands {
-               out, err = exec.Command(command.cmd[0], command.cmd[1:]...).CombinedOutput()
-               if err != nil {
-                       return fmt.Errorf("While running command %q: %v",
-                               command.cmd, err)
-               }
-               logger.Println(command.label)
-               for _, line := range strings.Split(string(out), "\n") {
-                       logger.Println(" ", line)
+               fmt.Fprintln(w, command.label)
+               cmd := exec.Command(command.cmd[0], command.cmd[1:]...)
+               cmd.Stdout = w
+               cmd.Stderr = w
+               if err := cmd.Run(); err != nil {
+                       err = fmt.Errorf("While running command %q: %v", command.cmd, err)
+                       fmt.Fprintln(w, err)
+                       return err
                }
+               fmt.Fprintln(w, "")
        }
 
        err = w.Close()
@@ -666,7 +709,7 @@ func (runner *ContainerRunner) LogNodeInfo() (err error) {
        return nil
 }
 
-// Get and save the raw JSON container record from the API server
+// LogContainerRecord gets and saves the raw JSON container record from the API server
 func (runner *ContainerRunner) LogContainerRecord() (err error) {
        w := &ArvLogWriter{
                ArvClient:     runner.ArvClient,
@@ -889,7 +932,7 @@ func (runner *ContainerRunner) StartContainer() error {
                dockertypes.ContainerStartOptions{})
        if err != nil {
                var advice string
-               if strings.Contains(err.Error(), "no such file or directory") {
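+               // Docker reports a missing command or interpreter with
+               // several different messages; match the common variants so
+               // the advice below is shown for each of them.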
+               if m, e := regexp.MatchString("(?ms).*(exec|System error).*(no such file or directory|file not found).*", err.Error()); m && e == nil {
                        advice = fmt.Sprintf("\nPossible causes: command %q is missing, the interpreter given in #! is missing, or script has Windows line endings.", runner.Container.Command[0])
                }
                return fmt.Errorf("could not start container: %v%s", err, advice)
@@ -1255,26 +1298,41 @@ func (runner *ContainerRunner) CleanupDirs() {
        if runner.ArvMount != nil {
                var delay int64 = 8
                umount := exec.Command("arv-mount", fmt.Sprintf("--unmount-timeout=%d", delay), "--unmount", runner.ArvMountPoint)
+               umount.Stdout = runner.CrunchLog
+               umount.Stderr = runner.CrunchLog
+               runner.CrunchLog.Printf("Running %v", umount.Args)
                umnterr := umount.Start()
+
                if umnterr != nil {
-                       runner.CrunchLog.Printf("Error running %v: %v", umount.Args, umnterr)
+                       runner.CrunchLog.Printf("Error unmounting: %v", umnterr)
                } else {
                        // If arv-mount --unmount gets stuck for any reason, we
                        // don't want to wait for it forever.  Do Wait() in a goroutine
                        // so it doesn't block crunch-run.
+                       umountExit := make(chan error)
                        go func() {
                                mnterr := umount.Wait()
                                if mnterr != nil {
-                                       runner.CrunchLog.Printf("Error running %v: %v", umount.Args, mnterr)
+                                       runner.CrunchLog.Printf("Error unmounting: %v", mnterr)
                                }
+                               umountExit <- mnterr
                        }()
 
-                       select {
-                       case <-runner.ArvMountExit:
-                               break
-                       case <-time.After(time.Duration((delay + 1) * int64(time.Second))):
-                               runner.CrunchLog.Printf("Timed out waiting for %v", umount.Args)
-                               umount.Process.Kill()
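+                       // Wait for the unmount command to finish, then for
+                       // arv-mount itself to exit. If the timeout elapses
+                       // first, kill whatever is still running.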
+                       for again := true; again; {
+                               again = false
+                               select {
+                               case <-umountExit:
+                                       umount = nil
+                                       again = true
+                               case <-runner.ArvMountExit:
+                                       break
+                               case <-time.After(time.Duration((delay + 1) * int64(time.Second))):
+                                       runner.CrunchLog.Printf("Timed out waiting for unmount")
+                                       if umount != nil {
+                                               umount.Process.Kill()
+                                       }
+                                       runner.ArvMount.Process.Kill()
+                               }
                        }
                }
        }
@@ -1403,6 +1461,7 @@ func (runner *ContainerRunner) NewArvLogWriter(name string) io.WriteCloser {
 
 // Run the full container lifecycle.
 func (runner *ContainerRunner) Run() (err error) {
+       runner.CrunchLog.Printf("crunch-run %s started", version)
        runner.CrunchLog.Printf("Executing container '%s'", runner.Container.UUID)
 
        hostname, hosterr := os.Hostname()
@@ -1414,6 +1473,14 @@ func (runner *ContainerRunner) Run() (err error) {
 
        runner.finalState = "Queued"
 
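+       // Deferred functions run in last-in, first-out order, so this
+       // cleanup runs after the state-updating defer below: temp dirs are
+       // removed and the crunch log is closed only after the logs have
+       // been committed.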
+       defer func() {
+               runner.stopSignals()
+               runner.CleanupDirs()
+
+               runner.CrunchLog.Printf("crunch-run finished")
+               runner.CrunchLog.Close()
+       }()
+
        defer func() {
                // checkErr prints e (unless it's nil) and sets err to
                // e (unless err is already non-nil). Thus, if err
@@ -1437,21 +1504,20 @@ func (runner *ContainerRunner) Run() (err error) {
                // Log the error encountered in Run(), if any
                checkErr(err)
 
-               if runner.finalState != "Queued" {
-                       if runner.IsCancelled() {
-                               runner.finalState = "Cancelled"
-                       }
-                       checkErr(runner.CaptureOutput())
+               if runner.finalState == "Queued" {
+                       runner.UpdateContainerFinal()
+                       return
                }
 
+               if runner.IsCancelled() {
+                       runner.finalState = "Cancelled"
+                       // but don't return yet -- we still want to
+                       // capture partial output and write logs
+               }
+
+               checkErr(runner.CaptureOutput())
                checkErr(runner.CommitLogs())
                checkErr(runner.UpdateContainerFinal())
-
-               runner.stopSignals()
-               runner.CleanupDirs()
-
-               runner.CrunchLog.Printf("crunch-run finished")
-               runner.CrunchLog.Close()
        }()
 
        err = runner.fetchContainerRecord()
@@ -1465,7 +1531,11 @@ func (runner *ContainerRunner) Run() (err error) {
        // check for and/or load image
        err = runner.LoadImage()
        if err != nil {
-               runner.finalState = "Cancelled"
+               if !runner.checkBrokenNode(err) {
+                       // Failed to load image but not due to a "broken node"
+                       // condition, probably user error.
+                       runner.finalState = "Cancelled"
+               }
                err = fmt.Errorf("While loading container image: %v", err)
                return
        }
@@ -1494,8 +1564,6 @@ func (runner *ContainerRunner) Run() (err error) {
                return
        }
 
-       runner.StartCrunchstat()
-
        if runner.IsCancelled() {
                return
        }
@@ -1506,8 +1574,11 @@ func (runner *ContainerRunner) Run() (err error) {
        }
        runner.finalState = "Cancelled"
 
+       runner.StartCrunchstat()
+
        err = runner.StartContainer()
        if err != nil {
+               runner.checkBrokenNode(err)
                return
        }
 
@@ -1571,8 +1642,17 @@ func main() {
                `Set networking mode for container.  Corresponds to Docker network mode (--net).
        `)
        memprofile := flag.String("memprofile", "", "write memory profile to `file` after running container")
+       getVersion := flag.Bool("version", false, "Print version information and exit.")
        flag.Parse()
 
+       // Print version information if requested
+       if *getVersion {
+               fmt.Printf("crunch-run %s\n", version)
+               return
+       }
+
+       log.Printf("crunch-run %s started", version)
+
        containerId := flag.Arg(0)
 
        if *caCertsPath != "" {
@@ -1585,25 +1665,27 @@ func main() {
        }
        api.Retries = 8
 
-       var kc *keepclient.KeepClient
-       kc, err = keepclient.MakeKeepClient(api)
-       if err != nil {
-               log.Fatalf("%s: %v", containerId, err)
+       kc, kcerr := keepclient.MakeKeepClient(api)
+       if kcerr != nil {
+               log.Fatalf("%s: %v", containerId, kcerr)
        }
        kc.BlockCache = &keepclient.BlockCache{MaxBlocks: 2}
        kc.Retries = 4
 
-       var docker *dockerclient.Client
        // API version 1.21 corresponds to Docker 1.9, which is currently the
        // minimum version we want to support.
-       docker, err = dockerclient.NewClient(dockerclient.DefaultDockerHost, "1.21", nil, nil)
-       if err != nil {
-               log.Fatalf("%s: %v", containerId, err)
-       }
-
+       docker, dockererr := dockerclient.NewClient(dockerclient.DefaultDockerHost, "1.21", nil, nil)
        dockerClientProxy := ThinDockerClientProxy{Docker: docker}
 
        cr := NewContainerRunner(api, kc, dockerClientProxy, containerId)
+
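+       // Report Docker client setup errors through the ContainerRunner's
+       // log and the broken node hook, rather than exiting immediately
+       // via log.Fatalf.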
+       if dockererr != nil {
+               cr.CrunchLog.Printf("%s: %v", containerId, dockererr)
+               cr.checkBrokenNode(dockererr)
+               cr.CrunchLog.Close()
+               os.Exit(1)
+       }
+
        cr.statInterval = *statInterval
        cr.cgroupRoot = *cgroupRoot
        cr.expectCgroupParent = *cgroupParent