X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/e9eb0975e100e76d1da2a6997656ff2524a2ba31..15eda5715c312d12ec24c24db80448bee90e38ea:/services/crunch-run/crunchrun.go diff --git a/services/crunch-run/crunchrun.go b/services/crunch-run/crunchrun.go index 61179975b0..fc0dda718c 100644 --- a/services/crunch-run/crunchrun.go +++ b/services/crunch-run/crunchrun.go @@ -182,6 +182,7 @@ type ContainerRunner struct { enableNetwork string // one of "default" or "always" networkMode string // passed through to HostConfig.NetworkMode + arvMountLog *ThrottledLogger } // setupSignals sets up signal handling to gracefully terminate the underlying @@ -193,7 +194,10 @@ func (runner *ContainerRunner) setupSignals() { signal.Notify(runner.SigChan, syscall.SIGQUIT) go func(sig chan os.Signal) { - <-sig + s := <-sig + if s != nil { + runner.CrunchLog.Printf("Caught signal %v", s) + } runner.stop() }(runner.SigChan) } @@ -210,18 +214,46 @@ func (runner *ContainerRunner) stop() { timeout := time.Duration(10) err := runner.Docker.ContainerStop(context.TODO(), runner.ContainerID, &(timeout)) if err != nil { - log.Printf("StopContainer failed: %s", err) + runner.CrunchLog.Printf("StopContainer failed: %s", err) } + // Suppress multiple calls to stop() + runner.cStarted = false } } -func (runner *ContainerRunner) teardown() { +func (runner *ContainerRunner) stopSignals() { if runner.SigChan != nil { signal.Stop(runner.SigChan) close(runner.SigChan) } } +var errorBlacklist = []string{"Cannot connect to the Docker daemon"} +var brokenNodeHook *string = flag.String("broken-node-hook", "", "Script to run if node is detected to be broken (for example, Docker daemon is not running)") + +func (runner *ContainerRunner) checkBrokenNode(goterr error) bool { + for _, d := range errorBlacklist { + if strings.Index(goterr.Error(), d) != -1 { + runner.CrunchLog.Printf("Error suggests node is unable to run containers: %v", goterr) + if *brokenNodeHook == "" { + runner.CrunchLog.Printf("No broken node hook provided, cannot mark node as broken.") + } else { + runner.CrunchLog.Printf("Running broken node hook %q", *brokenNodeHook) + // run killme script + c := exec.Command(*brokenNodeHook) + c.Stdout = runner.CrunchLog + c.Stderr = runner.CrunchLog + err := c.Run() + if err != nil { + runner.CrunchLog.Printf("Error running broken node hook: %v", err) + } + } + return true + } + } + return false +} + // LoadImage determines the docker image id from the container record and // checks if it is available in the local Docker image store. If not, it loads // the image from Keep. @@ -291,9 +323,11 @@ func (runner *ContainerRunner) ArvMountCmd(arvMountCmd []string, token string) ( } c.Env = append(c.Env, "ARVADOS_API_TOKEN="+token) - nt := NewThrottledLogger(runner.NewLogWriter("arv-mount")) - c.Stdout = nt - c.Stderr = nt + runner.arvMountLog = NewThrottledLogger(runner.NewLogWriter("arv-mount")) + c.Stdout = runner.arvMountLog + c.Stderr = runner.arvMountLog + + runner.CrunchLog.Printf("Running %v", c.Args) err = c.Start() if err != nil { @@ -317,7 +351,11 @@ func (runner *ContainerRunner) ArvMountCmd(arvMountCmd []string, token string) ( }() go func() { - runner.ArvMountExit <- c.Wait() + mnterr := c.Wait() + if mnterr != nil { + runner.CrunchLog.Printf("Arv-mount exit error: %v", mnterr) + } + runner.ArvMountExit <- mnterr close(runner.ArvMountExit) }() @@ -346,11 +384,13 @@ func (runner *ContainerRunner) SetupMounts() (err error) { return fmt.Errorf("While creating keep mount temp dir: %v", err) } - runner.CleanupTempDir = append(runner.CleanupTempDir, runner.ArvMountPoint) - pdhOnly := true tmpcount := 0 - arvMountCmd := []string{"--foreground", "--allow-other", "--read-write"} + arvMountCmd := []string{ + "--foreground", + "--allow-other", + "--read-write", + fmt.Sprintf("--crunchstat-interval=%v", runner.statInterval.Seconds())} if runner.Container.RuntimeConstraints.KeepCacheRAM > 0 { arvMountCmd = append(arvMountCmd, "--file-cache", fmt.Sprintf("%d", runner.Container.RuntimeConstraints.KeepCacheRAM)) @@ -891,12 +931,23 @@ func (runner *ContainerRunner) WaitFinish() (err error) { waitOk, waitErr := runner.Docker.ContainerWait(context.TODO(), runner.ContainerID, "not-running") + go func() { + <-runner.ArvMountExit + if runner.cStarted { + runner.CrunchLog.Printf("arv-mount exited while container is still running. Stopping container.") + runner.stop() + } + }() + var waitBody dockercontainer.ContainerWaitOKBody select { case waitBody = <-waitOk: case err = <-waitErr: } + // Container isn't running any more + runner.cStarted = false + if err != nil { return fmt.Errorf("container wait: %v", err) } @@ -905,56 +956,33 @@ func (runner *ContainerRunner) WaitFinish() (err error) { code := int(waitBody.StatusCode) runner.ExitCode = &code - waitMount := runner.ArvMountExit - select { - case err = <-waitMount: - runner.CrunchLog.Printf("arv-mount exited before container finished: %v", err) - waitMount = nil - runner.stop() - default: - } - // wait for stdout/stderr to complete <-runner.loggingDone return nil } -// EvalSymlinks follows symlinks within the output directory. If the symlink -// leads to a keep mount, copy the manifest text from the keep mount into the -// output manifestText. Ensure that whether symlinks are relative or absolute, -// they must remain within the output directory. -// -// Assumes initial value of "path" is absolute, and located within runner.HostOutputDir. -func (runner *ContainerRunner) EvalSymlinks(path string, binds []string, symlinksToRemove *[]string) (manifestText string, err error) { - var links []string - - defer func() { - if err != nil { - *symlinksToRemove = append(*symlinksToRemove, links...) - } - }() - - for n := 0; n < 32; n++ { - var info os.FileInfo - info, err = os.Lstat(path) - if err != nil { +var ErrNotInOutputDir = fmt.Errorf("Must point to path within the output directory") + +func (runner *ContainerRunner) derefOutputSymlink(path string, startinfo os.FileInfo) (tgt string, readlinktgt string, info os.FileInfo, err error) { + // Follow symlinks if necessary + info = startinfo + tgt = path + readlinktgt = "" + nextlink := path + for followed := 0; info.Mode()&os.ModeSymlink != 0; followed++ { + if followed >= limitFollowSymlinks { + // Got stuck in a loop or just a pathological number of links, give up. + err = fmt.Errorf("Followed more than %v symlinks from path %q", limitFollowSymlinks, path) return } - var tgt string - if info.Mode()&os.ModeSymlink == 0 { - // Not a symlink, nothing to do. - return - } - - // Remember symlink for cleanup later - links = append(links, path) - - tgt, err = os.Readlink(path) + readlinktgt, err = os.Readlink(nextlink) if err != nil { return } + + tgt = readlinktgt if !strings.HasPrefix(tgt, "/") { // Relative symlink, resolve it to host path tgt = filepath.Join(filepath.Dir(path), tgt) @@ -963,56 +991,121 @@ func (runner *ContainerRunner) EvalSymlinks(path string, binds []string, symlink // Absolute symlink to container output path, adjust it to host output path. tgt = filepath.Join(runner.HostOutputDir, tgt[len(runner.Container.OutputPath):]) } + if !strings.HasPrefix(tgt, runner.HostOutputDir+"/") { + // After dereferencing, symlink target must either be + // within output directory, or must point to a + // collection mount. + err = ErrNotInOutputDir + return + } - runner.CrunchLog.Printf("Resolve %q to %q", path, tgt) - - // go through mounts and try reverse map to collection reference - for _, bind := range binds { - mnt := runner.Container.Mounts[bind] - if tgt == bind || strings.HasPrefix(tgt, bind+"/") { - // get path relative to bind - targetSuffix := tgt[len(bind):] - - // Copy mount and adjust the path to add path relative to the bind - adjustedMount := mnt - adjustedMount.Path = filepath.Join(adjustedMount.Path, targetSuffix) - - for _, l := range links { - // The chain of one or more symlinks - // terminates in this keep mount, so - // add them all to the manifest text at - // appropriate locations. - var m string - outputSuffix := l[len(runner.HostOutputDir):] - m, err = runner.getCollectionManifestForPath(adjustedMount, outputSuffix) - if err != nil { - return - } - manifestText = manifestText + m - *symlinksToRemove = append(*symlinksToRemove, l) - } - return - } + info, err = os.Lstat(tgt) + if err != nil { + // tgt + err = fmt.Errorf("Symlink in output %q points to invalid location %q: %v", + path[len(runner.HostOutputDir):], readlinktgt, err) + return } - // If target is not a mount, it must be within the output - // directory, otherwise it is an error. - if !strings.HasPrefix(tgt, runner.HostOutputDir+"/") { - err = fmt.Errorf("Output directory symlink %q points to invalid location %q, must point to mount or output directory.", - path[len(runner.HostOutputDir):], tgt) + nextlink = tgt + } + + return +} + +var limitFollowSymlinks = 10 + +// UploadFile uploads files within the output directory, with special handling +// for symlinks. If the symlink leads to a keep mount, copy the manifest text +// from the keep mount into the output manifestText. Ensure that whether +// symlinks are relative or absolute, every symlink target (even targets that +// are symlinks themselves) must point to a path in either the output directory +// or a collection mount. +// +// Assumes initial value of "path" is absolute, and located within runner.HostOutputDir. +func (runner *ContainerRunner) UploadOutputFile( + path string, + info os.FileInfo, + infoerr error, + binds []string, + walkUpload *WalkUpload, + relocateFrom string, + relocateTo string, + followed int) (manifestText string, err error) { + + if info.Mode().IsDir() { + return + } + + if infoerr != nil { + return "", infoerr + } + + if followed >= limitFollowSymlinks { + // Got stuck in a loop or just a pathological number of + // directory links, give up. + err = fmt.Errorf("Followed more than %v symlinks from path %q", limitFollowSymlinks, path) + return + } + + // When following symlinks, the source path may need to be logically + // relocated to some other path within the output collection. Remove + // the relocateFrom prefix and replace it with relocateTo. + relocated := relocateTo + path[len(relocateFrom):] + + tgt, readlinktgt, info, derefErr := runner.derefOutputSymlink(path, info) + if derefErr != nil && derefErr != ErrNotInOutputDir { + return "", derefErr + } + + // go through mounts and try reverse map to collection reference + for _, bind := range binds { + mnt := runner.Container.Mounts[bind] + if tgt == bind || strings.HasPrefix(tgt, bind+"/") { + // get path relative to bind + targetSuffix := tgt[len(bind):] + + // Copy mount and adjust the path to add path relative to the bind + adjustedMount := mnt + adjustedMount.Path = filepath.Join(adjustedMount.Path, targetSuffix) + + // Terminates in this keep mount, so add the + // manifest text at appropriate location. + outputSuffix := path[len(runner.HostOutputDir):] + manifestText, err = runner.getCollectionManifestForPath(adjustedMount, outputSuffix) return } + } + + // If target is not a collection mount, it must be located within the + // output directory, otherwise it is an error. + if derefErr == ErrNotInOutputDir { + err = fmt.Errorf("Symlink in output %q points to invalid location %q, must point to path within the output directory.", + path[len(runner.HostOutputDir):], readlinktgt) + return + } - // Update symlink to host FS - os.Remove(path) - os.Symlink(tgt, path) + if info.Mode().IsRegular() { + return "", walkUpload.UploadFile(relocated, tgt) + } - // Target is within the output directory, so loop and check if - // it is also a symlink. - path = tgt + if info.Mode().IsDir() { + // Symlink leads to directory. Walk() doesn't follow + // directory symlinks, so we walk the target directory + // instead. Within the walk, file paths are relocated + // so they appear under the original symlink path. + err = filepath.Walk(tgt, func(walkpath string, walkinfo os.FileInfo, walkerr error) error { + var m string + m, walkerr = runner.UploadOutputFile(walkpath, walkinfo, walkerr, + binds, walkUpload, tgt, relocated, followed+1) + if walkerr == nil { + manifestText = manifestText + m + } + return walkerr + }) + return } - // Got stuck in a loop or just a pathological number of links, give up. - err = fmt.Errorf("Too many symlinks.") + return } @@ -1062,29 +1155,25 @@ func (runner *ContainerRunner) CaptureOutput() error { if err != nil { // Regular directory - var symlinksToRemove []string + cw := CollectionWriter{0, runner.Kc, nil, nil, sync.Mutex{}} + walkUpload := cw.BeginUpload(runner.HostOutputDir, runner.CrunchLog.Logger) + var m string - // Find symlinks to arv-mounted files & dirs. err = filepath.Walk(runner.HostOutputDir, func(path string, info os.FileInfo, err error) error { - if err != nil { - return err - } - m, err = runner.EvalSymlinks(path, binds, &symlinksToRemove) + m, err = runner.UploadOutputFile(path, info, err, binds, walkUpload, "", "", 0) if err == nil { manifestText = manifestText + m } return err }) - runner.CrunchLog.Printf("sm %q", symlinksToRemove) - for _, l := range symlinksToRemove { - os.Remove(l) - } + + cw.EndUpload(walkUpload) + if err != nil { - return fmt.Errorf("While checking output symlinks: %v", err) + return fmt.Errorf("While uploading output files: %v", err) } - cw := CollectionWriter{0, runner.Kc, nil, nil, sync.Mutex{}} - m, err = cw.WriteTree(runner.HostOutputDir, runner.CrunchLog.Logger) + m, err = cw.ManifestText() manifestText = manifestText + m if err != nil { return fmt.Errorf("While uploading output files: %v", err) @@ -1190,21 +1279,55 @@ func (runner *ContainerRunner) getCollectionManifestForPath(mnt arvados.Mount, b func (runner *ContainerRunner) CleanupDirs() { if runner.ArvMount != nil { - umount := exec.Command("fusermount", "-z", "-u", runner.ArvMountPoint) - umnterr := umount.Run() + var delay int64 = 8 + umount := exec.Command("arv-mount", fmt.Sprintf("--unmount-timeout=%d", delay), "--unmount", runner.ArvMountPoint) + umount.Stdout = runner.CrunchLog + umount.Stderr = runner.CrunchLog + runner.CrunchLog.Printf("Running %v", umount.Args) + umnterr := umount.Start() + if umnterr != nil { - runner.CrunchLog.Printf("While running fusermount: %v", umnterr) + runner.CrunchLog.Printf("Error unmounting: %v", umnterr) + } else { + // If arv-mount --unmount gets stuck for any reason, we + // don't want to wait for it forever. Do Wait() in a goroutine + // so it doesn't block crunch-run. + umountExit := make(chan error) + go func() { + mnterr := umount.Wait() + if mnterr != nil { + runner.CrunchLog.Printf("Error unmounting: %v", mnterr) + } + umountExit <- mnterr + }() + + for again := true; again; { + again = false + select { + case <-umountExit: + umount = nil + again = true + case <-runner.ArvMountExit: + break + case <-time.After(time.Duration((delay + 1) * int64(time.Second))): + runner.CrunchLog.Printf("Timed out waiting for unmount") + if umount != nil { + umount.Process.Kill() + } + runner.ArvMount.Process.Kill() + } + } } + } - mnterr := <-runner.ArvMountExit - if mnterr != nil { - runner.CrunchLog.Printf("Arv-mount exit error: %v", mnterr) + if runner.ArvMountPoint != "" { + if rmerr := os.Remove(runner.ArvMountPoint); rmerr != nil { + runner.CrunchLog.Printf("While cleaning up arv-mount directory %s: %v", runner.ArvMountPoint, rmerr) } } for _, tmpdir := range runner.CleanupTempDir { - rmerr := os.RemoveAll(tmpdir) - if rmerr != nil { + if rmerr := os.RemoveAll(tmpdir); rmerr != nil { runner.CrunchLog.Printf("While cleaning up temporary directory %s: %v", tmpdir, rmerr) } } @@ -1213,14 +1336,19 @@ func (runner *ContainerRunner) CleanupDirs() { // CommitLogs posts the collection containing the final container logs. func (runner *ContainerRunner) CommitLogs() error { runner.CrunchLog.Print(runner.finalState) + + if runner.arvMountLog != nil { + runner.arvMountLog.Close() + } runner.CrunchLog.Close() - // Closing CrunchLog above allows it to be committed to Keep at this + // Closing CrunchLog above allows them to be committed to Keep at this // point, but re-open crunch log with ArvClient in case there are any - // other further (such as failing to write the log to Keep!) while - // shutting down + // other further errors (such as failing to write the log to Keep!) + // while shutting down runner.CrunchLog = NewThrottledLogger(&ArvLogWriter{ArvClient: runner.ArvClient, UUID: runner.Container.UUID, loggingStream: "crunch-run", writeCloser: nil}) + runner.CrunchLog.Immediate = log.New(os.Stderr, runner.Container.UUID+" ", 0) if runner.LogsPDH != nil { // If we have already assigned something to LogsPDH, @@ -1307,8 +1435,11 @@ func (runner *ContainerRunner) IsCancelled() bool { // NewArvLogWriter creates an ArvLogWriter func (runner *ContainerRunner) NewArvLogWriter(name string) io.WriteCloser { - return &ArvLogWriter{ArvClient: runner.ArvClient, UUID: runner.Container.UUID, loggingStream: name, - writeCloser: runner.LogCollection.Open(name + ".txt")} + return &ArvLogWriter{ + ArvClient: runner.ArvClient, + UUID: runner.Container.UUID, + loggingStream: name, + writeCloser: runner.LogCollection.Open(name + ".txt")} } // Run the full container lifecycle. @@ -1322,12 +1453,16 @@ func (runner *ContainerRunner) Run() (err error) { runner.CrunchLog.Printf("Executing on host '%s'", hostname) } - // Clean up temporary directories _after_ finalizing - // everything (if we've made any by then) - defer runner.CleanupDirs() - runner.finalState = "Queued" + defer func() { + runner.stopSignals() + runner.CleanupDirs() + + runner.CrunchLog.Printf("crunch-run finished") + runner.CrunchLog.Close() + }() + defer func() { // checkErr prints e (unless it's nil) and sets err to // e (unless err is already non-nil). Thus, if err @@ -1352,7 +1487,6 @@ func (runner *ContainerRunner) Run() (err error) { checkErr(err) if runner.finalState == "Queued" { - runner.CrunchLog.Close() runner.UpdateContainerFinal() return } @@ -1366,13 +1500,6 @@ func (runner *ContainerRunner) Run() (err error) { checkErr(runner.CaptureOutput()) checkErr(runner.CommitLogs()) checkErr(runner.UpdateContainerFinal()) - - // The real log is already closed, but then we opened - // a new one in case we needed to log anything while - // finalizing. - runner.CrunchLog.Close() - - runner.teardown() }() err = runner.fetchContainerRecord() @@ -1386,7 +1513,11 @@ func (runner *ContainerRunner) Run() (err error) { // check for and/or load image err = runner.LoadImage() if err != nil { - runner.finalState = "Cancelled" + if !runner.checkBrokenNode(err) { + // Failed to load image but not due to a "broken node" + // condition, probably user error. + runner.finalState = "Cancelled" + } err = fmt.Errorf("While loading container image: %v", err) return } @@ -1415,8 +1546,6 @@ func (runner *ContainerRunner) Run() (err error) { return } - runner.StartCrunchstat() - if runner.IsCancelled() { return } @@ -1427,8 +1556,11 @@ func (runner *ContainerRunner) Run() (err error) { } runner.finalState = "Cancelled" + runner.StartCrunchstat() + err = runner.StartContainer() if err != nil { + runner.checkBrokenNode(err) return } @@ -1506,25 +1638,27 @@ func main() { } api.Retries = 8 - var kc *keepclient.KeepClient - kc, err = keepclient.MakeKeepClient(api) - if err != nil { - log.Fatalf("%s: %v", containerId, err) + kc, kcerr := keepclient.MakeKeepClient(api) + if kcerr != nil { + log.Fatalf("%s: %v", containerId, kcerr) } kc.BlockCache = &keepclient.BlockCache{MaxBlocks: 2} kc.Retries = 4 - var docker *dockerclient.Client // API version 1.21 corresponds to Docker 1.9, which is currently the // minimum version we want to support. - docker, err = dockerclient.NewClient(dockerclient.DefaultDockerHost, "1.21", nil, nil) - if err != nil { - log.Fatalf("%s: %v", containerId, err) - } - + docker, dockererr := dockerclient.NewClient(dockerclient.DefaultDockerHost, "1.21", nil, nil) dockerClientProxy := ThinDockerClientProxy{Docker: docker} cr := NewContainerRunner(api, kc, dockerClientProxy, containerId) + + if dockererr != nil { + cr.CrunchLog.Printf("%s: %v", containerId, dockererr) + cr.checkBrokenNode(dockererr) + cr.CrunchLog.Close() + os.Exit(1) + } + cr.statInterval = *statInterval cr.cgroupRoot = *cgroupRoot cr.expectCgroupParent = *cgroupParent