12934: crunch-run creates .keep file in empty directories
[arvados.git] / services / crunch-run / crunchrun.go
index 72a2f1af35034808a107638c0e8d34c22848b970..59fdd007e2b636f3b0be975a4add3d610980d657 100644 (file)
@@ -19,6 +19,7 @@ import (
        "os/signal"
        "path"
        "path/filepath"
+       "regexp"
        "runtime"
        "runtime/pprof"
        "sort"
@@ -39,6 +40,8 @@ import (
        dockerclient "github.com/docker/docker/client"
 )
 
+var version = "dev"
+
 // IArvadosClient is the minimal Arvados API methods used by crunch-run.
 type IArvadosClient interface {
        Create(resourceType string, parameters arvadosclient.Dict, output interface{}) error
@@ -228,15 +231,18 @@ func (runner *ContainerRunner) stopSignals() {
        }
 }
 
-var dockerErrorBlacklist = []string{"Cannot connect to the Docker daemon"}
-var brokenNodeHook *string
+var errorBlacklist = []string{
+       "(?ms).*[Cc]annot connect to the Docker daemon.*",
+       "(?ms).*oci runtime error.*starting container process.*container init.*mounting.*to rootfs.*no such file or directory.*",
+}
+var brokenNodeHook *string = flag.String("broken-node-hook", "", "Script to run if node is detected to be broken (for example, Docker daemon is not running)")
 
 func (runner *ContainerRunner) checkBrokenNode(goterr error) bool {
-       for _, d := range dockerErrorBlacklist {
-               if strings.Index(goterr.Error(), d) != -1 {
-                       runner.CrunchLog.Printf("Error suggests node is unable to run containers.")
+       for _, d := range errorBlacklist {
+               if m, e := regexp.MatchString(d, goterr.Error()); m && e == nil {
+                       runner.CrunchLog.Printf("Error suggests node is unable to run containers: %v", goterr)
                        if *brokenNodeHook == "" {
-                               runner.CrunchLog.Printf("No broken node hook provided")
+                               runner.CrunchLog.Printf("No broken node hook provided, cannot mark node as broken.")
                        } else {
                                runner.CrunchLog.Printf("Running broken node hook %q", *brokenNodeHook)
                                // run killme script
@@ -384,6 +390,11 @@ func (runner *ContainerRunner) SetupMounts() (err error) {
                return fmt.Errorf("While creating keep mount temp dir: %v", err)
        }
 
+       token, err := runner.ContainerToken()
+       if err != nil {
+               return fmt.Errorf("could not get container token: %s", err)
+       }
+
        pdhOnly := true
        tmpcount := 0
        arvMountCmd := []string{
@@ -532,6 +543,18 @@ func (runner *ContainerRunner) SetupMounts() (err error) {
                                return fmt.Errorf("writing temp file: %v", err)
                        }
                        runner.Binds = append(runner.Binds, fmt.Sprintf("%s:%s:ro", tmpfn, bind))
+
+               case mnt.Kind == "git_tree":
+                       tmpdir, err := runner.MkTempDir("", "")
+                       if err != nil {
+                               return fmt.Errorf("creating temp dir: %v", err)
+                       }
+                       runner.CleanupTempDir = append(runner.CleanupTempDir, tmpdir)
+                       err = gitMount(mnt).extractTree(runner.ArvClient, tmpdir, token)
+                       if err != nil {
+                               return err
+                       }
+                       runner.Binds = append(runner.Binds, tmpdir+":"+bind+":ro")
                }
        }
 
@@ -556,11 +579,6 @@ func (runner *ContainerRunner) SetupMounts() (err error) {
        }
        arvMountCmd = append(arvMountCmd, runner.ArvMountPoint)
 
-       token, err := runner.ContainerToken()
-       if err != nil {
-               return fmt.Errorf("could not get container token: %s", err)
-       }
-
        runner.ArvMount, err = runner.RunArvMount(arvMountCmd, token)
        if err != nil {
                return fmt.Errorf("While trying to start arv-mount: %v", err)
@@ -642,11 +660,10 @@ type infoCommand struct {
        cmd   []string
 }
 
-// Gather node information and store it on the log for debugging
+// LogNodeInfo gathers node information and store it on the log for debugging
 // purposes.
 func (runner *ContainerRunner) LogNodeInfo() (err error) {
        w := runner.NewLogWriter("node-info")
-       logger := log.New(w, "node-info", 0)
 
        commands := []infoCommand{
                {
@@ -672,17 +689,17 @@ func (runner *ContainerRunner) LogNodeInfo() (err error) {
        }
 
        // Run commands with informational output to be logged.
-       var out []byte
        for _, command := range commands {
-               out, err = exec.Command(command.cmd[0], command.cmd[1:]...).CombinedOutput()
-               if err != nil {
-                       return fmt.Errorf("While running command %q: %v",
-                               command.cmd, err)
-               }
-               logger.Println(command.label)
-               for _, line := range strings.Split(string(out), "\n") {
-                       logger.Println(" ", line)
+               fmt.Fprintln(w, command.label)
+               cmd := exec.Command(command.cmd[0], command.cmd[1:]...)
+               cmd.Stdout = w
+               cmd.Stderr = w
+               if err := cmd.Run(); err != nil {
+                       err = fmt.Errorf("While running command %q: %v", command.cmd, err)
+                       fmt.Fprintln(w, err)
+                       return err
                }
+               fmt.Fprintln(w, "")
        }
 
        err = w.Close()
@@ -692,7 +709,7 @@ func (runner *ContainerRunner) LogNodeInfo() (err error) {
        return nil
 }
 
-// Get and save the raw JSON container record from the API server
+// LogContainerRecord gets and saves the raw JSON container record from the API server
 func (runner *ContainerRunner) LogContainerRecord() (err error) {
        w := &ArvLogWriter{
                ArvClient:     runner.ArvClient,
@@ -915,7 +932,7 @@ func (runner *ContainerRunner) StartContainer() error {
                dockertypes.ContainerStartOptions{})
        if err != nil {
                var advice string
-               if strings.Contains(err.Error(), "no such file or directory") {
+               if m, e := regexp.MatchString("(?ms).*(exec|System error).*(no such file or directory|file not found).*", err.Error()); m && e == nil {
                        advice = fmt.Sprintf("\nPossible causes: command %q is missing, the interpreter given in #! is missing, or script has Windows line endings.", runner.Container.Command[0])
                }
                return fmt.Errorf("could not start container: %v%s", err, advice)
@@ -1034,6 +1051,21 @@ func (runner *ContainerRunner) UploadOutputFile(
        followed int) (manifestText string, err error) {
 
        if info.Mode().IsDir() {
+               // if empty, need to create a .keep file
+               dir, direrr := os.Open(path)
+               if (direrr != nil) {
+                       return "", direrr
+               }
+               defer dir.Close()
+               names, eof := dir.Readdirnames(1)
+               if len(names) == 0 && eof == io.EOF {
+                       keep, keeperr := os.Create(path+"/.keep")
+                       if keeperr != nil {
+                               return "", keeperr
+                       }
+                       keep.Close()
+               }
+
                return
        }
 
@@ -1444,6 +1476,7 @@ func (runner *ContainerRunner) NewArvLogWriter(name string) io.WriteCloser {
 
 // Run the full container lifecycle.
 func (runner *ContainerRunner) Run() (err error) {
+       runner.CrunchLog.Printf("crunch-run %s started", version)
        runner.CrunchLog.Printf("Executing container '%s'", runner.Container.UUID)
 
        hostname, hosterr := os.Hostname()
@@ -1474,11 +1507,6 @@ func (runner *ContainerRunner) Run() (err error) {
                                return
                        }
                        runner.CrunchLog.Print(e)
-                       if runner.checkBrokenNode(e) {
-                               // A container failing due to "broken node"
-                               // error should back into queue to run again.
-                               runner.finalState = "Queued"
-                       }
                        if err == nil {
                                err = e
                        }
@@ -1518,7 +1546,11 @@ func (runner *ContainerRunner) Run() (err error) {
        // check for and/or load image
        err = runner.LoadImage()
        if err != nil {
-               runner.finalState = "Cancelled"
+               if !runner.checkBrokenNode(err) {
+                       // Failed to load image but not due to a "broken node"
+                       // condition, probably user error.
+                       runner.finalState = "Cancelled"
+               }
                err = fmt.Errorf("While loading container image: %v", err)
                return
        }
@@ -1547,8 +1579,6 @@ func (runner *ContainerRunner) Run() (err error) {
                return
        }
 
-       runner.StartCrunchstat()
-
        if runner.IsCancelled() {
                return
        }
@@ -1559,8 +1589,11 @@ func (runner *ContainerRunner) Run() (err error) {
        }
        runner.finalState = "Cancelled"
 
+       runner.StartCrunchstat()
+
        err = runner.StartContainer()
        if err != nil {
+               runner.checkBrokenNode(err)
                return
        }
 
@@ -1624,9 +1657,17 @@ func main() {
                `Set networking mode for container.  Corresponds to Docker network mode (--net).
        `)
        memprofile := flag.String("memprofile", "", "write memory profile to `file` after running container")
-       brokenNodeHook = flag.String("broken-node-hook", "", "Script to run if node is detected to be broken (for example, Docker daemon is not running)")
+       getVersion := flag.Bool("version", false, "Print version information and exit.")
        flag.Parse()
 
+       // Print version information if requested
+       if *getVersion {
+               fmt.Printf("crunch-run %s\n", version)
+               return
+       }
+
+       log.Printf("crunch-run %s started", version)
+
        containerId := flag.Arg(0)
 
        if *caCertsPath != "" {
@@ -1640,6 +1681,11 @@ func main() {
        api.Retries = 8
 
        kc, kcerr := keepclient.MakeKeepClient(api)
+       if kcerr != nil {
+               log.Fatalf("%s: %v", containerId, kcerr)
+       }
+       kc.BlockCache = &keepclient.BlockCache{MaxBlocks: 2}
+       kc.Retries = 4
 
        // API version 1.21 corresponds to Docker 1.9, which is currently the
        // minimum version we want to support.
@@ -1648,11 +1694,6 @@ func main() {
 
        cr := NewContainerRunner(api, kc, dockerClientProxy, containerId)
 
-       if kcerr != nil {
-               cr.CrunchLog.Printf("%s: %v", containerId, kcerr)
-               cr.CrunchLog.Close()
-               os.Exit(1)
-       }
        if dockererr != nil {
                cr.CrunchLog.Printf("%s: %v", containerId, dockererr)
                cr.checkBrokenNode(dockererr)
@@ -1660,8 +1701,6 @@ func main() {
                os.Exit(1)
        }
 
-       kc.BlockCache = &keepclient.BlockCache{MaxBlocks: 2}
-       kc.Retries = 4
        cr.statInterval = *statInterval
        cr.cgroupRoot = *cgroupRoot
        cr.expectCgroupParent = *cgroupParent