1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
21 lockprefix = "crunch-run-"
23 brokenfile = "crunch-run-broken"
// procinfo is saved, JSON-encoded, in each process's lockfile. It
// ties a lockfile to the uuid it was created for and to the detached
// child process that holds the lock.
type procinfo struct {
	// UUID is the uuid that was passed to Detach.
	UUID string
	// PID is the process ID of the detached child process.
	PID int
}
32 // Detach acquires a lock for the given uuid, and starts the current
33 // program as a child process (with -no-detach prepended to the given
34 // arguments so the child knows not to detach again). The lock is
35 // passed along to the child process.
37 // Stdout and stderr in the child process are sent to the systemd
38 // journal using the systemd-cat program.
39 func Detach(uuid string, prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) int {
40 return exitcode(stderr, detach(uuid, prog, args, stdin, stdout))
42 func detach(uuid string, prog string, args []string, stdin io.Reader, stdout io.Writer) error {
43 lockfile, err := func() (*os.File, error) {
44 // We must hold the dir-level lock between
45 // opening/creating the lockfile and acquiring LOCK_EX
46 // on it, to avoid racing with the ListProcess's
47 // alive-checking and garbage collection.
48 dirlock, err := lockall()
53 lockfilename := filepath.Join(lockdir, lockprefix+uuid+locksuffix)
54 lockfile, err := os.OpenFile(lockfilename, os.O_CREATE|os.O_RDWR, 0700)
56 return nil, fmt.Errorf("open %s: %s", lockfilename, err)
58 err = syscall.Flock(int(lockfile.Fd()), syscall.LOCK_EX|syscall.LOCK_NB)
61 return nil, fmt.Errorf("lock %s: %s", lockfilename, err)
68 defer lockfile.Close()
71 execargs := append([]string{"-no-detach"}, args...)
72 if strings.HasSuffix(prog, " crunch-run") {
73 // invoked as "/path/to/arvados-server crunch-run"
74 // (see arvados/lib/cmd.Multi)
75 execargs = append([]string{strings.TrimSuffix(prog, " crunch-run"), "crunch-run"}, execargs...)
77 // invoked as "/path/to/crunch-run"
78 execargs = append([]string{prog}, execargs...)
80 if _, err := exec.LookPath("systemd-cat"); err == nil {
81 execargs = append([]string{
82 // Here, if the inner systemd-cat can't exec
83 // crunch-run, it writes an error message to
84 // stderr, and the outer systemd-cat writes it
85 // to the journal where the operator has a
86 // chance to discover it. (If we only used one
87 // systemd-cat command, it would be up to us
88 // to report the error -- but we are going to
89 // detach and exit, not wait for something to
90 // appear on stderr.) Note these systemd-cat
91 // calls don't result in additional processes
92 // -- they just connect stderr/stdout to
93 // sockets and call exec().
94 "systemd-cat", "--identifier=crunch-run",
95 "systemd-cat", "--identifier=crunch-run",
99 cmd := exec.Command(execargs[0], execargs[1:]...)
100 // Child inherits lockfile.
101 cmd.ExtraFiles = []*os.File{lockfile}
102 // Ensure child isn't interrupted even if we receive signals
103 // from parent (sshd) while sending lockfile content to
105 cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true}
106 // We need to manage our own OS pipe here to ensure the child
107 // process reads all of our stdin pipe before we return.
108 piper, pipew, err := os.Pipe()
116 return fmt.Errorf("exec %s: %s", cmd.Path, err)
118 _, err = io.Copy(pipew, stdin)
127 w := io.MultiWriter(stdout, lockfile)
128 return json.NewEncoder(w).Encode(procinfo{
130 PID: cmd.Process.Pid,
134 // KillProcess finds the crunch-run process corresponding to the given
135 // uuid, and sends the given signal to it. It then waits up to 1
136 // second for the process to die. It returns 0 if the process is
137 // successfully killed or didn't exist in the first place.
138 func KillProcess(uuid string, signal syscall.Signal, stdout, stderr io.Writer) int {
139 return exitcode(stderr, kill(uuid, signal, stdout, stderr))
142 func kill(uuid string, signal syscall.Signal, stdout, stderr io.Writer) error {
143 path := filepath.Join(lockdir, lockprefix+uuid+locksuffix)
144 f, err := os.Open(path)
145 if os.IsNotExist(err) {
147 } else if err != nil {
148 return fmt.Errorf("open %s: %s", path, err)
153 err = json.NewDecoder(f).Decode(&pi)
155 return fmt.Errorf("decode %s: %s", path, err)
158 if pi.UUID != uuid || pi.PID == 0 {
159 return fmt.Errorf("%s: bogus procinfo: %+v", path, pi)
162 proc, err := os.FindProcess(pi.PID)
164 // FindProcess should have succeeded, even if the
165 // process does not exist.
166 return fmt.Errorf("%s: find process %d: %s", uuid, pi.PID, err)
169 // Send the requested signal once, then send signal 0 a few
170 // times. When proc.Signal() returns an error (process no
171 // longer exists), return success. If that doesn't happen
172 // within 1 second, return an error.
173 err = proc.Signal(signal)
174 for deadline := time.Now().Add(time.Second); err == nil && time.Now().Before(deadline); time.Sleep(time.Second / 100) {
175 err = proc.Signal(syscall.Signal(0))
178 // Reached deadline without a proc.Signal() error.
179 return fmt.Errorf("%s: pid %d: sent signal %d (%s) but process is still alive", uuid, pi.PID, signal, signal)
181 fmt.Fprintf(stderr, "%s: pid %d: %s\n", uuid, pi.PID, err)
185 // ListProcesses lists UUIDs of active crunch-run processes.
186 func ListProcesses(stdout, stderr io.Writer) int {
187 // filepath.Walk does not follow symlinks, so we must walk
188 // lockdir+"/." in case lockdir itself is a symlink.
189 walkdir := lockdir + "/."
190 return exitcode(stderr, filepath.Walk(walkdir, func(path string, info os.FileInfo, err error) error {
191 if info.IsDir() && path != walkdir {
192 return filepath.SkipDir
194 if name := info.Name(); name == brokenfile {
195 fmt.Fprintln(stdout, "broken")
197 } else if !strings.HasPrefix(name, lockprefix) || !strings.HasSuffix(name, locksuffix) {
200 if info.Size() == 0 {
201 // race: process has opened/locked but hasn't yet written pid/uuid
205 f, err := os.Open(path)
211 // Ensure other processes don't acquire this lockfile
212 // after we have decided it is abandoned but before we
214 dirlock, err := lockall()
218 err = syscall.Flock(int(f.Fd()), syscall.LOCK_SH|syscall.LOCK_NB)
221 err := os.Remove(path)
224 fmt.Fprintf(stderr, "unlink %s: %s\n", f.Name(), err)
231 err = json.NewDecoder(f).Decode(&pi)
233 fmt.Fprintf(stderr, "%s: %s\n", path, err)
236 if pi.UUID == "" || pi.PID == 0 {
237 fmt.Fprintf(stderr, "%s: bogus procinfo: %+v", path, pi)
241 proc, err := os.FindProcess(pi.PID)
243 // FindProcess should have succeeded, even if the
244 // process does not exist.
245 fmt.Fprintf(stderr, "%s: find process %d: %s", path, pi.PID, err)
248 err = proc.Signal(syscall.Signal(0))
250 // Process is dead, even though lockfile was
251 // still locked. Most likely a stuck arv-mount
252 // process that inherited the lock from
253 // crunch-run. Report container UUID as
255 fmt.Fprintln(stdout, pi.UUID, "stale")
259 fmt.Fprintln(stdout, pi.UUID)
// exitcode converts an error to a process exit code. If err is nil,
// return 0 ("success"); otherwise, print err to stderr (followed by
// a newline) and return 1.
func exitcode(stderr io.Writer, err error) int {
	if err == nil {
		return 0
	}
	fmt.Fprintln(stderr, err)
	return 1
}
274 // Acquire a dir-level lock. Must be held while creating or deleting
275 // container-specific lockfiles, to avoid races during the intervals
276 // when those container-specific lockfiles are open but not locked.
278 // Caller releases the lock by closing the returned file.
279 func lockall() (*os.File, error) {
280 lockfile := filepath.Join(lockdir, lockprefix+"all"+locksuffix)
281 f, err := os.OpenFile(lockfile, os.O_CREATE|os.O_RDWR, 0700)
283 return nil, fmt.Errorf("open %s: %s", lockfile, err)
285 err = syscall.Flock(int(f.Fd()), syscall.LOCK_EX)
288 return nil, fmt.Errorf("lock %s: %s", lockfile, err)