X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/045e3127cb48845c7d988d01488c055f02ae2ec3..HEAD:/lib/crunchrun/background.go diff --git a/lib/crunchrun/background.go b/lib/crunchrun/background.go index 4bb249380f..adb65324b8 100644 --- a/lib/crunchrun/background.go +++ b/lib/crunchrun/background.go @@ -21,6 +21,7 @@ var ( lockprefix = "crunch-run-" locksuffix = ".lock" brokenfile = "crunch-run-broken" + pricesfile = "crunch-run-prices.json" ) // procinfo is saved in each process's lockfile. @@ -36,10 +37,10 @@ type procinfo struct { // // Stdout and stderr in the child process are sent to the systemd // journal using the systemd-cat program. -func Detach(uuid string, prog string, args []string, stdout, stderr io.Writer) int { - return exitcode(stderr, detach(uuid, prog, args, stdout, stderr)) +func Detach(uuid string, prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) int { + return exitcode(stderr, detach(uuid, prog, args, stdin, stdout)) } -func detach(uuid string, prog string, args []string, stdout, stderr io.Writer) error { +func detach(uuid string, prog string, args []string, stdin io.Reader, stdout io.Writer) error { lockfile, err := func() (*os.File, error) { // We must hold the dir-level lock between // opening/creating the lockfile and acquiring LOCK_EX @@ -77,20 +78,24 @@ func detach(uuid string, prog string, args []string, stdout, stderr io.Writer) e // invoked as "/path/to/crunch-run" execargs = append([]string{prog}, execargs...) } - execargs = append([]string{ - // Here, if the inner systemd-cat can't exec - // crunch-run, it writes an error message to stderr, - // and the outer systemd-cat writes it to the journal - // where the operator has a chance to discover it. (If - // we only used one systemd-cat command, it would be - // up to us to report the error -- but we are going to - // detach and exit, not wait for something to appear - // on stderr.) Note these systemd-cat calls don't - // result in additional processes -- they just connect - // stderr/stdout to sockets and call exec(). - "systemd-cat", "--identifier=crunch-run", - "systemd-cat", "--identifier=crunch-run", - }, execargs...) + if _, err := exec.LookPath("systemd-cat"); err == nil { + execargs = append([]string{ + // Here, if the inner systemd-cat can't exec + // crunch-run, it writes an error message to + // stderr, and the outer systemd-cat writes it + // to the journal where the operator has a + // chance to discover it. (If we only used one + // systemd-cat command, it would be up to us + // to report the error -- but we are going to + // detach and exit, not wait for something to + // appear on stderr.) Note these systemd-cat + // calls don't result in additional processes + // -- they just connect stderr/stdout to + // sockets and call exec(). + "systemd-cat", "--identifier=crunch-run", + "systemd-cat", "--identifier=crunch-run", + }, execargs...) + } cmd := exec.Command(execargs[0], execargs[1:]...) // Child inherits lockfile. @@ -99,10 +104,26 @@ func detach(uuid string, prog string, args []string, stdout, stderr io.Writer) e // from parent (sshd) while sending lockfile content to // caller. cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true} + // We need to manage our own OS pipe here to ensure the child + // process reads all of our stdin pipe before we return. + piper, pipew, err := os.Pipe() + if err != nil { + return err + } + defer pipew.Close() + cmd.Stdin = piper err = cmd.Start() if err != nil { return fmt.Errorf("exec %s: %s", cmd.Path, err) } + _, err = io.Copy(pipew, stdin) + if err != nil { + return err + } + err = pipew.Close() + if err != nil { + return err + } w := io.MultiWriter(stdout, lockfile) return json.NewEncoder(w).Encode(procinfo{ @@ -163,7 +184,20 @@ func kill(uuid string, signal syscall.Signal, stdout, stderr io.Writer) error { } // ListProcesses lists UUIDs of active crunch-run processes. -func ListProcesses(stdout, stderr io.Writer) int { +func ListProcesses(stdin io.Reader, stdout, stderr io.Writer) int { + if buf, err := io.ReadAll(stdin); err == nil && len(buf) > 0 { + // write latest pricing data to disk where + // current/future crunch-run processes can load it + fnm := filepath.Join(lockdir, pricesfile) + fnmtmp := fmt.Sprintf("%s~%d", fnm, os.Getpid()) + err := os.WriteFile(fnmtmp, buf, 0777) + if err != nil { + fmt.Fprintf(stderr, "error writing price data to %s: %s", fnmtmp, err) + } else if err = os.Rename(fnmtmp, fnm); err != nil { + fmt.Fprintf(stderr, "error renaming %s to %s: %s", fnmtmp, fnm, err) + os.Remove(fnmtmp) + } + } // filepath.Walk does not follow symlinks, so we must walk // lockdir+"/." in case lockdir itself is a symlink. walkdir := lockdir + "/." @@ -225,7 +259,7 @@ func ListProcesses(stdout, stderr io.Writer) int { fmt.Fprintf(stderr, "%s: find process %d: %s", path, pi.PID, err) return nil } - err = proc.Signal(syscall.Signal(0)) + err = proc.Signal(syscall.SIGUSR2) if err != nil { // Process is dead, even though lockfile was // still locked. Most likely a stuck arv-mount