X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/b6d7efab2c4bffa3fabd55b166e44cca8ac1391f..44c8f9ed561513b607d3eca752ad3e1efd376f56:/lib/crunchrun/background.go diff --git a/lib/crunchrun/background.go b/lib/crunchrun/background.go index bf039afa0a..adb65324b8 100644 --- a/lib/crunchrun/background.go +++ b/lib/crunchrun/background.go @@ -21,6 +21,7 @@ var ( lockprefix = "crunch-run-" locksuffix = ".lock" brokenfile = "crunch-run-broken" + pricesfile = "crunch-run-prices.json" ) // procinfo is saved in each process's lockfile. @@ -36,10 +37,10 @@ type procinfo struct { // // Stdout and stderr in the child process are sent to the systemd // journal using the systemd-cat program. -func Detach(uuid string, prog string, args []string, stdout, stderr io.Writer) int { - return exitcode(stderr, detach(uuid, prog, args, stdout, stderr)) +func Detach(uuid string, prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) int { + return exitcode(stderr, detach(uuid, prog, args, stdin, stdout)) } -func detach(uuid string, prog string, args []string, stdout, stderr io.Writer) error { +func detach(uuid string, prog string, args []string, stdin io.Reader, stdout io.Writer) error { lockfile, err := func() (*os.File, error) { // We must hold the dir-level lock between // opening/creating the lockfile and acquiring LOCK_EX @@ -77,20 +78,24 @@ func detach(uuid string, prog string, args []string, stdout, stderr io.Writer) e // invoked as "/path/to/crunch-run" execargs = append([]string{prog}, execargs...) } - execargs = append([]string{ - // Here, if the inner systemd-cat can't exec - // crunch-run, it writes an error message to stderr, - // and the outer systemd-cat writes it to the journal - // where the operator has a chance to discover it. (If - // we only used one systemd-cat command, it would be - // up to us to report the error -- but we are going to - // detach and exit, not wait for something to appear - // on stderr.) Note these systemd-cat calls don't - // result in additional processes -- they just connect - // stderr/stdout to sockets and call exec(). - "systemd-cat", "--identifier=crunch-run", - "systemd-cat", "--identifier=crunch-run", - }, execargs...) + if _, err := exec.LookPath("systemd-cat"); err == nil { + execargs = append([]string{ + // Here, if the inner systemd-cat can't exec + // crunch-run, it writes an error message to + // stderr, and the outer systemd-cat writes it + // to the journal where the operator has a + // chance to discover it. (If we only used one + // systemd-cat command, it would be up to us + // to report the error -- but we are going to + // detach and exit, not wait for something to + // appear on stderr.) Note these systemd-cat + // calls don't result in additional processes + // -- they just connect stderr/stdout to + // sockets and call exec(). + "systemd-cat", "--identifier=crunch-run", + "systemd-cat", "--identifier=crunch-run", + }, execargs...) + } cmd := exec.Command(execargs[0], execargs[1:]...) // Child inherits lockfile. @@ -99,10 +104,26 @@ func detach(uuid string, prog string, args []string, stdout, stderr io.Writer) e // from parent (sshd) while sending lockfile content to // caller. cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true} + // We need to manage our own OS pipe here to ensure the child + // process reads all of our stdin pipe before we return. + piper, pipew, err := os.Pipe() + if err != nil { + return err + } + defer pipew.Close() + cmd.Stdin = piper err = cmd.Start() if err != nil { return fmt.Errorf("exec %s: %s", cmd.Path, err) } + _, err = io.Copy(pipew, stdin) + if err != nil { + return err + } + err = pipew.Close() + if err != nil { + return err + } w := io.MultiWriter(stdout, lockfile) return json.NewEncoder(w).Encode(procinfo{ @@ -132,7 +153,7 @@ func kill(uuid string, signal syscall.Signal, stdout, stderr io.Writer) error { var pi procinfo err = json.NewDecoder(f).Decode(&pi) if err != nil { - return fmt.Errorf("decode %s: %s\n", path, err) + return fmt.Errorf("decode %s: %s", path, err) } if pi.UUID != uuid || pi.PID == 0 { @@ -162,8 +183,21 @@ func kill(uuid string, signal syscall.Signal, stdout, stderr io.Writer) error { return nil } -// List UUIDs of active crunch-run processes. -func ListProcesses(stdout, stderr io.Writer) int { +// ListProcesses lists UUIDs of active crunch-run processes. +func ListProcesses(stdin io.Reader, stdout, stderr io.Writer) int { + if buf, err := io.ReadAll(stdin); err == nil && len(buf) > 0 { + // write latest pricing data to disk where + // current/future crunch-run processes can load it + fnm := filepath.Join(lockdir, pricesfile) + fnmtmp := fmt.Sprintf("%s~%d", fnm, os.Getpid()) + err := os.WriteFile(fnmtmp, buf, 0777) + if err != nil { + fmt.Fprintf(stderr, "error writing price data to %s: %s", fnmtmp, err) + } else if err = os.Rename(fnmtmp, fnm); err != nil { + fmt.Fprintf(stderr, "error renaming %s to %s: %s", fnmtmp, fnm, err) + os.Remove(fnmtmp) + } + } // filepath.Walk does not follow symlinks, so we must walk // lockdir+"/." in case lockdir itself is a symlink. walkdir := lockdir + "/." @@ -218,6 +252,24 @@ func ListProcesses(stdout, stderr io.Writer) int { return nil } + proc, err := os.FindProcess(pi.PID) + if err != nil { + // FindProcess should have succeeded, even if the + // process does not exist. + fmt.Fprintf(stderr, "%s: find process %d: %s", path, pi.PID, err) + return nil + } + err = proc.Signal(syscall.SIGUSR2) + if err != nil { + // Process is dead, even though lockfile was + // still locked. Most likely a stuck arv-mount + // process that inherited the lock from + // crunch-run. Report container UUID as + // "stale". + fmt.Fprintln(stdout, pi.UUID, "stale") + return nil + } + fmt.Fprintln(stdout, pi.UUID) return nil }))