X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/899d369bef489b89d9ce1b1cd5e07ce8304a9a85..3c5de241f6a6ac56e8bf986c89ffe153b9d941fe:/services/crunch-run/crunchrun.go diff --git a/services/crunch-run/crunchrun.go b/services/crunch-run/crunchrun.go index a424088b25..55edb99823 100644 --- a/services/crunch-run/crunchrun.go +++ b/services/crunch-run/crunchrun.go @@ -182,6 +182,7 @@ type ContainerRunner struct { enableNetwork string // one of "default" or "always" networkMode string // passed through to HostConfig.NetworkMode + arvMountLog *ThrottledLogger arvMountKill func() } @@ -194,7 +195,10 @@ func (runner *ContainerRunner) setupSignals() { signal.Notify(runner.SigChan, syscall.SIGQUIT) go func(sig chan os.Signal) { - <-sig + s := <-sig + if s != nil { + runner.CrunchLog.Printf("Caught signal %v", s) + } runner.stop() }(runner.SigChan) } @@ -211,8 +215,10 @@ func (runner *ContainerRunner) stop() { timeout := time.Duration(10) err := runner.Docker.ContainerStop(context.TODO(), runner.ContainerID, &(timeout)) if err != nil { - log.Printf("StopContainer failed: %s", err) + runner.CrunchLog.Printf("StopContainer failed: %s", err) } + // Suppress multiple calls to stop() + runner.cStarted = false } } @@ -292,9 +298,11 @@ func (runner *ContainerRunner) ArvMountCmd(arvMountCmd []string, token string) ( } c.Env = append(c.Env, "ARVADOS_API_TOKEN="+token) - nt := NewThrottledLogger(runner.NewLogWriter("arv-mount")) - c.Stdout = nt - c.Stderr = nt + runner.arvMountLog = NewThrottledLogger(runner.NewLogWriter("arv-mount")) + c.Stdout = runner.arvMountLog + c.Stderr = runner.arvMountLog + + runner.CrunchLog.Printf("Running %v", c.Args) err = c.Start() if err != nil { @@ -322,7 +330,11 @@ func (runner *ContainerRunner) ArvMountCmd(arvMountCmd []string, token string) ( }() go func() { - runner.ArvMountExit <- c.Wait() + mnterr := c.Wait() + if mnterr != nil { + runner.CrunchLog.Printf("Arv-mount exit error: %v", mnterr) + } + runner.ArvMountExit <- mnterr close(runner.ArvMountExit) }() @@ -355,7 +367,11 @@ func (runner *ContainerRunner) SetupMounts() (err error) { pdhOnly := true tmpcount := 0 - arvMountCmd := []string{"--foreground", "--allow-other", "--read-write"} + arvMountCmd := []string{ + "--foreground", + "--allow-other", + "--read-write", + fmt.Sprintf("--crunchstat-interval=%v", runner.statInterval.Seconds())} if runner.Container.RuntimeConstraints.KeepCacheRAM > 0 { arvMountCmd = append(arvMountCmd, "--file-cache", fmt.Sprintf("%d", runner.Container.RuntimeConstraints.KeepCacheRAM)) @@ -896,12 +912,23 @@ func (runner *ContainerRunner) WaitFinish() (err error) { waitOk, waitErr := runner.Docker.ContainerWait(context.TODO(), runner.ContainerID, "not-running") + go func() { + <-runner.ArvMountExit + if runner.cStarted { + runner.CrunchLog.Printf("arv-mount exited while container is still running. Stopping container.") + runner.stop() + } + }() + var waitBody dockercontainer.ContainerWaitOKBody select { case waitBody = <-waitOk: case err = <-waitErr: } + // Container isn't running any more + runner.cStarted = false + if err != nil { return fmt.Errorf("container wait: %v", err) } @@ -910,15 +937,6 @@ func (runner *ContainerRunner) WaitFinish() (err error) { code := int(waitBody.StatusCode) runner.ExitCode = &code - waitMount := runner.ArvMountExit - select { - case err = <-waitMount: - runner.CrunchLog.Printf("arv-mount exited before container finished: %v", err) - waitMount = nil - runner.stop() - default: - } - // wait for stdout/stderr to complete <-runner.loggingDone @@ -1242,25 +1260,29 @@ func (runner *ContainerRunner) getCollectionManifestForPath(mnt arvados.Mount, b func (runner *ContainerRunner) CleanupDirs() { if runner.ArvMount != nil { - //umount := exec.Command("fusermount", "-u", runner.ArvMountPoint) - umount := exec.Command("sleep", "1") - umnterr := umount.Run() - if umnterr != nil { - log.Printf("While running fusermount: %v", umnterr) - } - timeout := time.NewTimer(10 * time.Second) - select { - case mnterr := <-runner.ArvMountExit: - if mnterr != nil { - log.Printf("Arv-mount exit error: %v", mnterr) - } - case <-timeout.C: - log.Printf("Timeout waiting for arv-mount to end. Killing arv-mount.") - runner.arvMountKill() - umount = exec.Command("arv-mount", "--unmount-timeout=10", "--unmount", runner.ArvMountPoint) - umnterr = umount.Run() + var umount *exec.Cmd + umount = exec.Command("fusermount", "-u", "-z", runner.ArvMountPoint) + done := false + try := 1 + for !done { + umnterr := umount.Run() if umnterr != nil { - log.Printf("While running arv-mount --unmount: %v", umnterr) + runner.CrunchLog.Printf("Error: %v", umnterr) + } + timeout := time.NewTimer(10 * time.Second) + select { + case <-runner.ArvMountExit: + done = true + case <-timeout.C: + if try == 1 { + runner.CrunchLog.Printf("Timeout waiting for arv-mount to end. Will force unmount.") + umount = exec.Command("arv-mount", "--unmount-timeout=10", "--unmount", runner.ArvMountPoint) + try = 2 + } else { + runner.CrunchLog.Printf("Killing arv-mount") + runner.arvMountKill() + umount = exec.Command("fusermount", "-u", "-z", runner.ArvMountPoint) + } } } } @@ -1276,14 +1298,17 @@ func (runner *ContainerRunner) CleanupDirs() { // CommitLogs posts the collection containing the final container logs. func (runner *ContainerRunner) CommitLogs() error { runner.CrunchLog.Print(runner.finalState) + + runner.arvMountLog.Close() runner.CrunchLog.Close() - // Closing CrunchLog above allows it to be committed to Keep at this + // Closing CrunchLog above allows them to be committed to Keep at this // point, but re-open crunch log with ArvClient in case there are any - // other further (such as failing to write the log to Keep!) while - // shutting down + // other further errors (such as failing to write the log to Keep!) + // while shutting down runner.CrunchLog = NewThrottledLogger(&ArvLogWriter{ArvClient: runner.ArvClient, UUID: runner.Container.UUID, loggingStream: "crunch-run", writeCloser: nil}) + runner.CrunchLog.Immediate = log.New(os.Stderr, runner.Container.UUID+" ", 0) if runner.LogsPDH != nil { // If we have already assigned something to LogsPDH, @@ -1370,8 +1395,11 @@ func (runner *ContainerRunner) IsCancelled() bool { // NewArvLogWriter creates an ArvLogWriter func (runner *ContainerRunner) NewArvLogWriter(name string) io.WriteCloser { - return &ArvLogWriter{ArvClient: runner.ArvClient, UUID: runner.Container.UUID, loggingStream: name, - writeCloser: runner.LogCollection.Open(name + ".txt")} + return &ArvLogWriter{ + ArvClient: runner.ArvClient, + UUID: runner.Container.UUID, + loggingStream: name, + writeCloser: runner.LogCollection.Open(name + ".txt")} } // Run the full container lifecycle. @@ -1387,6 +1415,9 @@ func (runner *ContainerRunner) Run() (err error) { // Clean up temporary directories _after_ finalizing // everything (if we've made any by then) + defer func() { + runner.CrunchLog.Printf("crunch-run finished") + }() defer runner.CleanupDirs() runner.finalState = "Queued"