X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/84bc109580e503b4b8bbb5ddcd5f1d909745141f..6fe66955fb53dde27d9677d31fdb137913b2b850:/services/crunch-run/crunchrun.go?ds=sidebyside diff --git a/services/crunch-run/crunchrun.go b/services/crunch-run/crunchrun.go index 72a2f1af35..59fdd007e2 100644 --- a/services/crunch-run/crunchrun.go +++ b/services/crunch-run/crunchrun.go @@ -19,6 +19,7 @@ import ( "os/signal" "path" "path/filepath" + "regexp" "runtime" "runtime/pprof" "sort" @@ -39,6 +40,8 @@ import ( dockerclient "github.com/docker/docker/client" ) +var version = "dev" + // IArvadosClient is the minimal Arvados API methods used by crunch-run. type IArvadosClient interface { Create(resourceType string, parameters arvadosclient.Dict, output interface{}) error @@ -228,15 +231,18 @@ func (runner *ContainerRunner) stopSignals() { } } -var dockerErrorBlacklist = []string{"Cannot connect to the Docker daemon"} -var brokenNodeHook *string +var errorBlacklist = []string{ + "(?ms).*[Cc]annot connect to the Docker daemon.*", + "(?ms).*oci runtime error.*starting container process.*container init.*mounting.*to rootfs.*no such file or directory.*", +} +var brokenNodeHook *string = flag.String("broken-node-hook", "", "Script to run if node is detected to be broken (for example, Docker daemon is not running)") func (runner *ContainerRunner) checkBrokenNode(goterr error) bool { - for _, d := range dockerErrorBlacklist { - if strings.Index(goterr.Error(), d) != -1 { - runner.CrunchLog.Printf("Error suggests node is unable to run containers.") + for _, d := range errorBlacklist { + if m, e := regexp.MatchString(d, goterr.Error()); m && e == nil { + runner.CrunchLog.Printf("Error suggests node is unable to run containers: %v", goterr) if *brokenNodeHook == "" { - runner.CrunchLog.Printf("No broken node hook provided") + runner.CrunchLog.Printf("No broken node hook provided, cannot mark node as broken.") } else { runner.CrunchLog.Printf("Running broken node hook %q", *brokenNodeHook) // run killme script @@ -384,6 +390,11 @@ func (runner *ContainerRunner) SetupMounts() (err error) { return fmt.Errorf("While creating keep mount temp dir: %v", err) } + token, err := runner.ContainerToken() + if err != nil { + return fmt.Errorf("could not get container token: %s", err) + } + pdhOnly := true tmpcount := 0 arvMountCmd := []string{ @@ -532,6 +543,18 @@ func (runner *ContainerRunner) SetupMounts() (err error) { return fmt.Errorf("writing temp file: %v", err) } runner.Binds = append(runner.Binds, fmt.Sprintf("%s:%s:ro", tmpfn, bind)) + + case mnt.Kind == "git_tree": + tmpdir, err := runner.MkTempDir("", "") + if err != nil { + return fmt.Errorf("creating temp dir: %v", err) + } + runner.CleanupTempDir = append(runner.CleanupTempDir, tmpdir) + err = gitMount(mnt).extractTree(runner.ArvClient, tmpdir, token) + if err != nil { + return err + } + runner.Binds = append(runner.Binds, tmpdir+":"+bind+":ro") } } @@ -556,11 +579,6 @@ func (runner *ContainerRunner) SetupMounts() (err error) { } arvMountCmd = append(arvMountCmd, runner.ArvMountPoint) - token, err := runner.ContainerToken() - if err != nil { - return fmt.Errorf("could not get container token: %s", err) - } - runner.ArvMount, err = runner.RunArvMount(arvMountCmd, token) if err != nil { return fmt.Errorf("While trying to start arv-mount: %v", err) @@ -642,11 +660,10 @@ type infoCommand struct { cmd []string } -// Gather node information and store it on the log for debugging +// LogNodeInfo gathers node information and store it on the log for debugging // purposes. func (runner *ContainerRunner) LogNodeInfo() (err error) { w := runner.NewLogWriter("node-info") - logger := log.New(w, "node-info", 0) commands := []infoCommand{ { @@ -672,17 +689,17 @@ func (runner *ContainerRunner) LogNodeInfo() (err error) { } // Run commands with informational output to be logged. - var out []byte for _, command := range commands { - out, err = exec.Command(command.cmd[0], command.cmd[1:]...).CombinedOutput() - if err != nil { - return fmt.Errorf("While running command %q: %v", - command.cmd, err) - } - logger.Println(command.label) - for _, line := range strings.Split(string(out), "\n") { - logger.Println(" ", line) + fmt.Fprintln(w, command.label) + cmd := exec.Command(command.cmd[0], command.cmd[1:]...) + cmd.Stdout = w + cmd.Stderr = w + if err := cmd.Run(); err != nil { + err = fmt.Errorf("While running command %q: %v", command.cmd, err) + fmt.Fprintln(w, err) + return err } + fmt.Fprintln(w, "") } err = w.Close() @@ -692,7 +709,7 @@ func (runner *ContainerRunner) LogNodeInfo() (err error) { return nil } -// Get and save the raw JSON container record from the API server +// LogContainerRecord gets and saves the raw JSON container record from the API server func (runner *ContainerRunner) LogContainerRecord() (err error) { w := &ArvLogWriter{ ArvClient: runner.ArvClient, @@ -915,7 +932,7 @@ func (runner *ContainerRunner) StartContainer() error { dockertypes.ContainerStartOptions{}) if err != nil { var advice string - if strings.Contains(err.Error(), "no such file or directory") { + if m, e := regexp.MatchString("(?ms).*(exec|System error).*(no such file or directory|file not found).*", err.Error()); m && e == nil { advice = fmt.Sprintf("\nPossible causes: command %q is missing, the interpreter given in #! is missing, or script has Windows line endings.", runner.Container.Command[0]) } return fmt.Errorf("could not start container: %v%s", err, advice) @@ -1034,6 +1051,21 @@ func (runner *ContainerRunner) UploadOutputFile( followed int) (manifestText string, err error) { if info.Mode().IsDir() { + // if empty, need to create a .keep file + dir, direrr := os.Open(path) + if (direrr != nil) { + return "", direrr + } + defer dir.Close() + names, eof := dir.Readdirnames(1) + if len(names) == 0 && eof == io.EOF { + keep, keeperr := os.Create(path+"/.keep") + if keeperr != nil { + return "", keeperr + } + keep.Close() + } + return } @@ -1444,6 +1476,7 @@ func (runner *ContainerRunner) NewArvLogWriter(name string) io.WriteCloser { // Run the full container lifecycle. func (runner *ContainerRunner) Run() (err error) { + runner.CrunchLog.Printf("crunch-run %s started", version) runner.CrunchLog.Printf("Executing container '%s'", runner.Container.UUID) hostname, hosterr := os.Hostname() @@ -1474,11 +1507,6 @@ func (runner *ContainerRunner) Run() (err error) { return } runner.CrunchLog.Print(e) - if runner.checkBrokenNode(e) { - // A container failing due to "broken node" - // error should back into queue to run again. - runner.finalState = "Queued" - } if err == nil { err = e } @@ -1518,7 +1546,11 @@ func (runner *ContainerRunner) Run() (err error) { // check for and/or load image err = runner.LoadImage() if err != nil { - runner.finalState = "Cancelled" + if !runner.checkBrokenNode(err) { + // Failed to load image but not due to a "broken node" + // condition, probably user error. + runner.finalState = "Cancelled" + } err = fmt.Errorf("While loading container image: %v", err) return } @@ -1547,8 +1579,6 @@ func (runner *ContainerRunner) Run() (err error) { return } - runner.StartCrunchstat() - if runner.IsCancelled() { return } @@ -1559,8 +1589,11 @@ func (runner *ContainerRunner) Run() (err error) { } runner.finalState = "Cancelled" + runner.StartCrunchstat() + err = runner.StartContainer() if err != nil { + runner.checkBrokenNode(err) return } @@ -1624,9 +1657,17 @@ func main() { `Set networking mode for container. Corresponds to Docker network mode (--net). `) memprofile := flag.String("memprofile", "", "write memory profile to `file` after running container") - brokenNodeHook = flag.String("broken-node-hook", "", "Script to run if node is detected to be broken (for example, Docker daemon is not running)") + getVersion := flag.Bool("version", false, "Print version information and exit.") flag.Parse() + // Print version information if requested + if *getVersion { + fmt.Printf("crunch-run %s\n", version) + return + } + + log.Printf("crunch-run %s started", version) + containerId := flag.Arg(0) if *caCertsPath != "" { @@ -1640,6 +1681,11 @@ func main() { api.Retries = 8 kc, kcerr := keepclient.MakeKeepClient(api) + if kcerr != nil { + log.Fatalf("%s: %v", containerId, kcerr) + } + kc.BlockCache = &keepclient.BlockCache{MaxBlocks: 2} + kc.Retries = 4 // API version 1.21 corresponds to Docker 1.9, which is currently the // minimum version we want to support. @@ -1648,11 +1694,6 @@ func main() { cr := NewContainerRunner(api, kc, dockerClientProxy, containerId) - if kcerr != nil { - cr.CrunchLog.Printf("%s: %v", containerId, kcerr) - cr.CrunchLog.Close() - os.Exit(1) - } if dockererr != nil { cr.CrunchLog.Printf("%s: %v", containerId, dockererr) cr.checkBrokenNode(dockererr) @@ -1660,8 +1701,6 @@ func main() { os.Exit(1) } - kc.BlockCache = &keepclient.BlockCache{MaxBlocks: 2} - kc.Retries = 4 cr.statInterval = *statInterval cr.cgroupRoot = *cgroupRoot cr.expectCgroupParent = *cgroupParent