1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
// lockprefix is the filename prefix shared by every lockfile in
// lockdir, including the dir-level "all" lock taken by lockall.
21 lockprefix = "crunch-run-"
// brokenfile is the name of a file in lockdir whose presence makes
// ListProcesses print "broken".
23 brokenfile = "crunch-run-broken"
// pricesfile is the name of the file in lockdir where ListProcesses
// saves the pricing data it receives on stdin.
24 pricesfile = "crunch-run-prices.json"
27 // procinfo is saved in each process's lockfile.
//
// detach writes it as JSON; kill and ListProcesses decode the lockfile
// content and check its UUID and PID fields.
28 type procinfo struct {
33 // Detach acquires a lock for the given uuid, and starts the current
34 // program as a child process (with -no-detach prepended to the given
35 // arguments so the child knows not to detach again). The lock is
36 // passed along to the child process.
//
38 // Stdout and stderr in the child process are sent to the systemd
39 // journal using the systemd-cat program, when it is found in PATH.
//
// Detach itself is a thin wrapper: the work is done by detach, and
// the resulting error is turned into an exit code by exitcode, which
// also prints a non-nil error to stderr.
40 func Detach(uuid string, prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) int {
41 return exitcode(stderr, detach(uuid, prog, args, stdin, stdout))
// detach does the work of Detach and reports failure as an error
// rather than an exit code.
//
// Visible steps: lock the per-uuid lockfile, build the child command
// line, hand the lockfile to the child as an extra file, copy stdin
// into an OS pipe, then JSON-encode a procinfo (including the child's
// PID) to both stdout and the lockfile.
43 func detach(uuid string, prog string, args []string, stdin io.Reader, stdout io.Writer) error {
44 lockfile, err := func() (*os.File, error) {
45 // We must hold the dir-level lock between
46 // opening/creating the lockfile and acquiring LOCK_EX
47 // on it, to avoid racing with ListProcesses's
48 // alive-checking and garbage collection.
49 dirlock, err := lockall()
54 lockfilename := filepath.Join(lockdir, lockprefix+uuid+locksuffix)
55 lockfile, err := os.OpenFile(lockfilename, os.O_CREATE|os.O_RDWR, 0700)
57 return nil, fmt.Errorf("open %s: %s", lockfilename, err)
// LOCK_NB: fail right away instead of waiting if the lockfile is
// already locked.
59 err = syscall.Flock(int(lockfile.Fd()), syscall.LOCK_EX|syscall.LOCK_NB)
62 return nil, fmt.Errorf("lock %s: %s", lockfilename, err)
69 defer lockfile.Close()
// The child gets -no-detach ahead of the caller's arguments.
72 execargs := append([]string{"-no-detach"}, args...)
73 if strings.HasSuffix(prog, " crunch-run") {
74 // invoked as "/path/to/arvados-server crunch-run"
75 // (see arvados/lib/cmd.Multi)
76 execargs = append([]string{strings.TrimSuffix(prog, " crunch-run"), "crunch-run"}, execargs...)
78 // invoked as "/path/to/crunch-run"
79 execargs = append([]string{prog}, execargs...)
// Only wrap the command in systemd-cat when it is found in PATH.
81 if _, err := exec.LookPath("systemd-cat"); err == nil {
82 execargs = append([]string{
83 // Here, if the inner systemd-cat can't exec
84 // crunch-run, it writes an error message to
85 // stderr, and the outer systemd-cat writes it
86 // to the journal where the operator has a
87 // chance to discover it. (If we only used one
88 // systemd-cat command, it would be up to us
89 // to report the error -- but we are going to
90 // detach and exit, not wait for something to
91 // appear on stderr.) Note these systemd-cat
92 // calls don't result in additional processes
93 // -- they just connect stderr/stdout to
94 // sockets and call exec().
95 "systemd-cat", "--identifier=crunch-run",
96 "systemd-cat", "--identifier=crunch-run",
100 cmd := exec.Command(execargs[0], execargs[1:]...)
101 // Child inherits lockfile.
102 cmd.ExtraFiles = []*os.File{lockfile}
103 // Ensure child isn't interrupted even if we receive signals
104 // from parent (sshd) while sending lockfile content to
// stdout: Setpgid puts the child in its own process group.
106 cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true}
107 // We need to manage our own OS pipe here to ensure the child
108 // process reads all of our stdin pipe before we return.
109 piper, pipew, err := os.Pipe()
117 return fmt.Errorf("exec %s: %s", cmd.Path, err)
// Forward everything from our stdin into the write end of the pipe.
// NOTE(review): presumably piper is the child's stdin -- the
// assignment is not visible in this excerpt; confirm.
119 _, err = io.Copy(pipew, stdin)
// The same JSON procinfo goes to the caller (stdout) and into the
// lockfile.
128 w := io.MultiWriter(stdout, lockfile)
129 return json.NewEncoder(w).Encode(procinfo{
131 PID: cmd.Process.Pid,
135 // KillProcess finds the crunch-run process corresponding to the given
136 // uuid, and sends the given signal to it. It then waits up to 1
137 // second for the process to die. It returns 0 if the process is
138 // successfully killed or didn't exist in the first place.
//
// The work is done by kill; its error, if any, is printed to stderr
// and converted to an exit code by exitcode.
139 func KillProcess(uuid string, signal syscall.Signal, stdout, stderr io.Writer) int {
140 return exitcode(stderr, kill(uuid, signal, stdout, stderr))
// kill does the work of KillProcess and reports failure as an error
// rather than an exit code.
//
// It reads the procinfo stored in the lockfile for uuid, signals the
// recorded PID, and polls until signalling fails or one second has
// passed.
143 func kill(uuid string, signal syscall.Signal, stdout, stderr io.Writer) error {
144 path := filepath.Join(lockdir, lockprefix+uuid+locksuffix)
145 f, err := os.Open(path)
// A missing lockfile is distinguished from other open errors; per
// KillProcess's contract, a process that never existed counts as
// success.
146 if os.IsNotExist(err) {
148 } else if err != nil {
149 return fmt.Errorf("open %s: %s", path, err)
154 err = json.NewDecoder(f).Decode(&pi)
156 return fmt.Errorf("decode %s: %s", path, err)
// Refuse to signal anything if the lockfile content does not match
// the requested uuid or has no usable PID.
159 if pi.UUID != uuid || pi.PID == 0 {
160 return fmt.Errorf("%s: bogus procinfo: %+v", path, pi)
163 proc, err := os.FindProcess(pi.PID)
165 // FindProcess should have succeeded, even if the
166 // process does not exist.
167 return fmt.Errorf("%s: find process %d: %s", uuid, pi.PID, err)
170 // Send the requested signal once, then send signal 0 a few
171 // times. When proc.Signal() returns an error (process no
172 // longer exists), return success. If that doesn't happen
173 // within 1 second, return an error.
174 err = proc.Signal(signal)
// The loop polls every 10ms (time.Second / 100) and stops at the
// first Signal error or at the deadline.
175 for deadline := time.Now().Add(time.Second); err == nil && time.Now().Before(deadline); time.Sleep(time.Second / 100) {
176 err = proc.Signal(syscall.Signal(0))
179 // Reached deadline without a proc.Signal() error.
180 return fmt.Errorf("%s: pid %d: sent signal %d (%s) but process is still alive", uuid, pi.PID, signal, signal)
// The Signal error is reported on stderr for information.
182 fmt.Fprintf(stderr, "%s: pid %d: %s\n", uuid, pi.PID, err)
186 // ListProcesses lists UUIDs of active crunch-run processes.
//
// If stdin yields any data, it is first saved as the pricing file in
// lockdir. Then every lockfile directly inside lockdir is examined.
// Output on stdout is one line per finding: "broken" when brokenfile
// is present, "<uuid> stale" when signalling the recorded PID fails,
// and "<uuid>" otherwise. Problems with individual lockfiles are
// reported on stderr. The return value is the exit code produced by
// exitcode from the result of the directory walk.
187 func ListProcesses(stdin io.Reader, stdout, stderr io.Writer) int {
// NOTE(review): a read error on stdin is silently ignored here; the
// pricing update is simply skipped.
188 if buf, err := io.ReadAll(stdin); err == nil && len(buf) > 0 {
189 // write latest pricing data to disk where
190 // current/future crunch-run processes can load it
191 fnm := filepath.Join(lockdir, pricesfile)
// Write to a per-PID temporary name, then rename it over the final
// name.
192 fnmtmp := fmt.Sprintf("%s~%d", fnm, os.Getpid())
// NOTE(review): mode 0777 requests execute and world-write bits on a
// JSON data file -- confirm this is intended.
193 err := os.WriteFile(fnmtmp, buf, 0777)
// NOTE(review): this message and the rename message below have no
// trailing newline, unlike the unlink message further down.
195 fmt.Fprintf(stderr, "error writing price data to %s: %s", fnmtmp, err)
196 } else if err = os.Rename(fnmtmp, fnm); err != nil {
197 fmt.Fprintf(stderr, "error renaming %s to %s: %s", fnmtmp, fnm, err)
201 // filepath.Walk does not follow symlinks, so we must walk
202 // lockdir+"/." in case lockdir itself is a symlink.
203 walkdir := lockdir + "/."
204 return exitcode(stderr, filepath.Walk(walkdir, func(path string, info os.FileInfo, err error) error {
// NOTE(review): err is not checked before info is used. filepath.Walk
// passes a nil info when it cannot stat path (e.g. lockdir does not
// exist), which would make info.IsDir() panic -- consider handling
// err first.
//
// Do not descend into subdirectories of lockdir.
205 if info.IsDir() && path != walkdir {
206 return filepath.SkipDir
208 if name := info.Name(); name == brokenfile {
209 fmt.Fprintln(stdout, "broken")
// Anything that is not named like a lockfile is not examined further.
211 } else if !strings.HasPrefix(name, lockprefix) || !strings.HasSuffix(name, locksuffix) {
214 if info.Size() == 0 {
215 // race: process has opened/locked but hasn't yet written pid/uuid
219 f, err := os.Open(path)
225 // Ensure other processes don't acquire this lockfile
226 // after we have decided it is abandoned but before we
// have removed it.
228 dirlock, err := lockall()
// Non-blocking shared-lock attempt on the lockfile; the unlink below
// belongs to the abandoned-lockfile case described above.
232 err = syscall.Flock(int(f.Fd()), syscall.LOCK_SH|syscall.LOCK_NB)
235 err := os.Remove(path)
238 fmt.Fprintf(stderr, "unlink %s: %s\n", f.Name(), err)
245 err = json.NewDecoder(f).Decode(&pi)
247 fmt.Fprintf(stderr, "%s: %s\n", path, err)
250 if pi.UUID == "" || pi.PID == 0 {
// NOTE(review): no trailing newline in this message.
251 fmt.Fprintf(stderr, "%s: bogus procinfo: %+v", path, pi)
255 proc, err := os.FindProcess(pi.PID)
257 // FindProcess should have succeeded, even if the
258 // process does not exist.
// NOTE(review): no trailing newline in this message either.
259 fmt.Fprintf(stderr, "%s: find process %d: %s", path, pi.PID, err)
// Signalling doubles as the liveness check: an error from Signal is
// treated as "process is dead".
// NOTE(review): presumably SIGUSR2 also tells crunch-run to reload
// the pricing file written above -- confirm against its signal
// handler.
262 err = proc.Signal(syscall.SIGUSR2)
264 // Process is dead, even though lockfile was
265 // still locked. Most likely a stuck arv-mount
266 // process that inherited the lock from
267 // crunch-run. Report container UUID as
// "stale".
269 fmt.Fprintln(stdout, pi.UUID, "stale")
273 fmt.Fprintln(stdout, pi.UUID)
278 // If err is nil, return 0 ("success"); otherwise, print err to stderr
// and presumably return a non-zero exit code (NOTE(review): the
// return statements are not visible in this excerpt -- confirm).
//
// Detach, KillProcess and ListProcesses all use this to turn an
// error into their int result.
280 func exitcode(stderr io.Writer, err error) int {
282 fmt.Fprintln(stderr, err)
288 // Acquire a dir-level lock. Must be held while creating or deleting
289 // container-specific lockfiles, to avoid races during the intervals
290 // when those container-specific lockfiles are open but not locked.
//
292 // Caller releases the lock by closing the returned file.
//
// The lock is an exclusive flock on the "all" lockfile in lockdir,
// which is created if it does not exist yet. LOCK_NB is not set, so
// the call waits until the lock can be acquired.
293 func lockall() (*os.File, error) {
294 lockfile := filepath.Join(lockdir, lockprefix+"all"+locksuffix)
295 f, err := os.OpenFile(lockfile, os.O_CREATE|os.O_RDWR, 0700)
297 return nil, fmt.Errorf("open %s: %s", lockfile, err)
299 err = syscall.Flock(int(f.Fd()), syscall.LOCK_EX)
302 return nil, fmt.Errorf("lock %s: %s", lockfile, err)