Merge branch '21366-subprocess-output-loading-bug' into main. Closes #21366
[arvados.git] / lib / crunchrun / background.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package crunchrun
6
7 import (
8         "encoding/json"
9         "fmt"
10         "io"
11         "os"
12         "os/exec"
13         "path/filepath"
14         "strings"
15         "syscall"
16         "time"
17 )
18
19 var (
20         lockdir    = "/var/lock"
21         lockprefix = "crunch-run-"
22         locksuffix = ".lock"
23         brokenfile = "crunch-run-broken"
24         pricesfile = "crunch-run-prices.json"
25 )
26
27 // procinfo is saved in each process's lockfile.
28 type procinfo struct {
29         UUID string
30         PID  int
31 }
32
33 // Detach acquires a lock for the given uuid, and starts the current
34 // program as a child process (with -no-detach prepended to the given
35 // arguments so the child knows not to detach again). The lock is
36 // passed along to the child process.
37 //
38 // Stdout and stderr in the child process are sent to the systemd
39 // journal using the systemd-cat program.
40 func Detach(uuid string, prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) int {
41         return exitcode(stderr, detach(uuid, prog, args, stdin, stdout))
42 }
43 func detach(uuid string, prog string, args []string, stdin io.Reader, stdout io.Writer) error {
44         lockfile, err := func() (*os.File, error) {
45                 // We must hold the dir-level lock between
46                 // opening/creating the lockfile and acquiring LOCK_EX
47                 // on it, to avoid racing with the ListProcess's
48                 // alive-checking and garbage collection.
49                 dirlock, err := lockall()
50                 if err != nil {
51                         return nil, err
52                 }
53                 defer dirlock.Close()
54                 lockfilename := filepath.Join(lockdir, lockprefix+uuid+locksuffix)
55                 lockfile, err := os.OpenFile(lockfilename, os.O_CREATE|os.O_RDWR, 0700)
56                 if err != nil {
57                         return nil, fmt.Errorf("open %s: %s", lockfilename, err)
58                 }
59                 err = syscall.Flock(int(lockfile.Fd()), syscall.LOCK_EX|syscall.LOCK_NB)
60                 if err != nil {
61                         lockfile.Close()
62                         return nil, fmt.Errorf("lock %s: %s", lockfilename, err)
63                 }
64                 return lockfile, nil
65         }()
66         if err != nil {
67                 return err
68         }
69         defer lockfile.Close()
70         lockfile.Truncate(0)
71
72         execargs := append([]string{"-no-detach"}, args...)
73         if strings.HasSuffix(prog, " crunch-run") {
74                 // invoked as "/path/to/arvados-server crunch-run"
75                 // (see arvados/lib/cmd.Multi)
76                 execargs = append([]string{strings.TrimSuffix(prog, " crunch-run"), "crunch-run"}, execargs...)
77         } else {
78                 // invoked as "/path/to/crunch-run"
79                 execargs = append([]string{prog}, execargs...)
80         }
81         if _, err := exec.LookPath("systemd-cat"); err == nil {
82                 execargs = append([]string{
83                         // Here, if the inner systemd-cat can't exec
84                         // crunch-run, it writes an error message to
85                         // stderr, and the outer systemd-cat writes it
86                         // to the journal where the operator has a
87                         // chance to discover it. (If we only used one
88                         // systemd-cat command, it would be up to us
89                         // to report the error -- but we are going to
90                         // detach and exit, not wait for something to
91                         // appear on stderr.)  Note these systemd-cat
92                         // calls don't result in additional processes
93                         // -- they just connect stderr/stdout to
94                         // sockets and call exec().
95                         "systemd-cat", "--identifier=crunch-run",
96                         "systemd-cat", "--identifier=crunch-run",
97                 }, execargs...)
98         }
99
100         cmd := exec.Command(execargs[0], execargs[1:]...)
101         // Child inherits lockfile.
102         cmd.ExtraFiles = []*os.File{lockfile}
103         // Ensure child isn't interrupted even if we receive signals
104         // from parent (sshd) while sending lockfile content to
105         // caller.
106         cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true}
107         // We need to manage our own OS pipe here to ensure the child
108         // process reads all of our stdin pipe before we return.
109         piper, pipew, err := os.Pipe()
110         if err != nil {
111                 return err
112         }
113         defer pipew.Close()
114         cmd.Stdin = piper
115         err = cmd.Start()
116         if err != nil {
117                 return fmt.Errorf("exec %s: %s", cmd.Path, err)
118         }
119         _, err = io.Copy(pipew, stdin)
120         if err != nil {
121                 return err
122         }
123         err = pipew.Close()
124         if err != nil {
125                 return err
126         }
127
128         w := io.MultiWriter(stdout, lockfile)
129         return json.NewEncoder(w).Encode(procinfo{
130                 UUID: uuid,
131                 PID:  cmd.Process.Pid,
132         })
133 }
134
135 // KillProcess finds the crunch-run process corresponding to the given
136 // uuid, and sends the given signal to it. It then waits up to 1
137 // second for the process to die. It returns 0 if the process is
138 // successfully killed or didn't exist in the first place.
139 func KillProcess(uuid string, signal syscall.Signal, stdout, stderr io.Writer) int {
140         return exitcode(stderr, kill(uuid, signal, stdout, stderr))
141 }
142
143 func kill(uuid string, signal syscall.Signal, stdout, stderr io.Writer) error {
144         path := filepath.Join(lockdir, lockprefix+uuid+locksuffix)
145         f, err := os.Open(path)
146         if os.IsNotExist(err) {
147                 return nil
148         } else if err != nil {
149                 return fmt.Errorf("open %s: %s", path, err)
150         }
151         defer f.Close()
152
153         var pi procinfo
154         err = json.NewDecoder(f).Decode(&pi)
155         if err != nil {
156                 return fmt.Errorf("decode %s: %s", path, err)
157         }
158
159         if pi.UUID != uuid || pi.PID == 0 {
160                 return fmt.Errorf("%s: bogus procinfo: %+v", path, pi)
161         }
162
163         proc, err := os.FindProcess(pi.PID)
164         if err != nil {
165                 // FindProcess should have succeeded, even if the
166                 // process does not exist.
167                 return fmt.Errorf("%s: find process %d: %s", uuid, pi.PID, err)
168         }
169
170         // Send the requested signal once, then send signal 0 a few
171         // times.  When proc.Signal() returns an error (process no
172         // longer exists), return success.  If that doesn't happen
173         // within 1 second, return an error.
174         err = proc.Signal(signal)
175         for deadline := time.Now().Add(time.Second); err == nil && time.Now().Before(deadline); time.Sleep(time.Second / 100) {
176                 err = proc.Signal(syscall.Signal(0))
177         }
178         if err == nil {
179                 // Reached deadline without a proc.Signal() error.
180                 return fmt.Errorf("%s: pid %d: sent signal %d (%s) but process is still alive", uuid, pi.PID, signal, signal)
181         }
182         fmt.Fprintf(stderr, "%s: pid %d: %s\n", uuid, pi.PID, err)
183         return nil
184 }
185
186 // ListProcesses lists UUIDs of active crunch-run processes.
187 func ListProcesses(stdin io.Reader, stdout, stderr io.Writer) int {
188         if buf, err := io.ReadAll(stdin); err == nil && len(buf) > 0 {
189                 // write latest pricing data to disk where
190                 // current/future crunch-run processes can load it
191                 fnm := filepath.Join(lockdir, pricesfile)
192                 fnmtmp := fmt.Sprintf("%s~%d", fnm, os.Getpid())
193                 err := os.WriteFile(fnmtmp, buf, 0777)
194                 if err != nil {
195                         fmt.Fprintf(stderr, "error writing price data to %s: %s", fnmtmp, err)
196                 } else if err = os.Rename(fnmtmp, fnm); err != nil {
197                         fmt.Fprintf(stderr, "error renaming %s to %s: %s", fnmtmp, fnm, err)
198                         os.Remove(fnmtmp)
199                 }
200         }
201         // filepath.Walk does not follow symlinks, so we must walk
202         // lockdir+"/." in case lockdir itself is a symlink.
203         walkdir := lockdir + "/."
204         return exitcode(stderr, filepath.Walk(walkdir, func(path string, info os.FileInfo, err error) error {
205                 if info.IsDir() && path != walkdir {
206                         return filepath.SkipDir
207                 }
208                 if name := info.Name(); name == brokenfile {
209                         fmt.Fprintln(stdout, "broken")
210                         return nil
211                 } else if !strings.HasPrefix(name, lockprefix) || !strings.HasSuffix(name, locksuffix) {
212                         return nil
213                 }
214                 if info.Size() == 0 {
215                         // race: process has opened/locked but hasn't yet written pid/uuid
216                         return nil
217                 }
218
219                 f, err := os.Open(path)
220                 if err != nil {
221                         return nil
222                 }
223                 defer f.Close()
224
225                 // Ensure other processes don't acquire this lockfile
226                 // after we have decided it is abandoned but before we
227                 // have deleted it.
228                 dirlock, err := lockall()
229                 if err != nil {
230                         return err
231                 }
232                 err = syscall.Flock(int(f.Fd()), syscall.LOCK_SH|syscall.LOCK_NB)
233                 if err == nil {
234                         // lockfile is stale
235                         err := os.Remove(path)
236                         dirlock.Close()
237                         if err != nil {
238                                 fmt.Fprintf(stderr, "unlink %s: %s\n", f.Name(), err)
239                         }
240                         return nil
241                 }
242                 dirlock.Close()
243
244                 var pi procinfo
245                 err = json.NewDecoder(f).Decode(&pi)
246                 if err != nil {
247                         fmt.Fprintf(stderr, "%s: %s\n", path, err)
248                         return nil
249                 }
250                 if pi.UUID == "" || pi.PID == 0 {
251                         fmt.Fprintf(stderr, "%s: bogus procinfo: %+v", path, pi)
252                         return nil
253                 }
254
255                 proc, err := os.FindProcess(pi.PID)
256                 if err != nil {
257                         // FindProcess should have succeeded, even if the
258                         // process does not exist.
259                         fmt.Fprintf(stderr, "%s: find process %d: %s", path, pi.PID, err)
260                         return nil
261                 }
262                 err = proc.Signal(syscall.SIGUSR2)
263                 if err != nil {
264                         // Process is dead, even though lockfile was
265                         // still locked. Most likely a stuck arv-mount
266                         // process that inherited the lock from
267                         // crunch-run. Report container UUID as
268                         // "stale".
269                         fmt.Fprintln(stdout, pi.UUID, "stale")
270                         return nil
271                 }
272
273                 fmt.Fprintln(stdout, pi.UUID)
274                 return nil
275         }))
276 }
277
278 // If err is nil, return 0 ("success"); otherwise, print err to stderr
279 // and return 1.
280 func exitcode(stderr io.Writer, err error) int {
281         if err != nil {
282                 fmt.Fprintln(stderr, err)
283                 return 1
284         }
285         return 0
286 }
287
288 // Acquire a dir-level lock. Must be held while creating or deleting
289 // container-specific lockfiles, to avoid races during the intervals
290 // when those container-specific lockfiles are open but not locked.
291 //
292 // Caller releases the lock by closing the returned file.
293 func lockall() (*os.File, error) {
294         lockfile := filepath.Join(lockdir, lockprefix+"all"+locksuffix)
295         f, err := os.OpenFile(lockfile, os.O_CREATE|os.O_RDWR, 0700)
296         if err != nil {
297                 return nil, fmt.Errorf("open %s: %s", lockfile, err)
298         }
299         err = syscall.Flock(int(f.Fd()), syscall.LOCK_EX)
300         if err != nil {
301                 f.Close()
302                 return nil, fmt.Errorf("lock %s: %s", lockfile, err)
303         }
304         return f, nil
305 }