16347: Add LocalKeepLogsToContainerLog config (none/errors/all).
[arvados.git] / lib / crunchrun / background.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package crunchrun
6
7 import (
8         "encoding/json"
9         "fmt"
10         "io"
11         "os"
12         "os/exec"
13         "path/filepath"
14         "strings"
15         "syscall"
16         "time"
17 )
18
19 var (
20         lockdir    = "/var/lock"
21         lockprefix = "crunch-run-"
22         locksuffix = ".lock"
23         brokenfile = "crunch-run-broken"
24 )
25
26 // procinfo is saved in each process's lockfile.
27 type procinfo struct {
28         UUID string
29         PID  int
30 }
31
32 // Detach acquires a lock for the given uuid, and starts the current
33 // program as a child process (with -no-detach prepended to the given
34 // arguments so the child knows not to detach again). The lock is
35 // passed along to the child process.
36 //
37 // Stdout and stderr in the child process are sent to the systemd
38 // journal using the systemd-cat program.
39 func Detach(uuid string, prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) int {
40         return exitcode(stderr, detach(uuid, prog, args, stdin, stdout))
41 }
42 func detach(uuid string, prog string, args []string, stdin io.Reader, stdout io.Writer) error {
43         lockfile, err := func() (*os.File, error) {
44                 // We must hold the dir-level lock between
45                 // opening/creating the lockfile and acquiring LOCK_EX
46                 // on it, to avoid racing with the ListProcess's
47                 // alive-checking and garbage collection.
48                 dirlock, err := lockall()
49                 if err != nil {
50                         return nil, err
51                 }
52                 defer dirlock.Close()
53                 lockfilename := filepath.Join(lockdir, lockprefix+uuid+locksuffix)
54                 lockfile, err := os.OpenFile(lockfilename, os.O_CREATE|os.O_RDWR, 0700)
55                 if err != nil {
56                         return nil, fmt.Errorf("open %s: %s", lockfilename, err)
57                 }
58                 err = syscall.Flock(int(lockfile.Fd()), syscall.LOCK_EX|syscall.LOCK_NB)
59                 if err != nil {
60                         lockfile.Close()
61                         return nil, fmt.Errorf("lock %s: %s", lockfilename, err)
62                 }
63                 return lockfile, nil
64         }()
65         if err != nil {
66                 return err
67         }
68         defer lockfile.Close()
69         lockfile.Truncate(0)
70
71         execargs := append([]string{"-no-detach"}, args...)
72         if strings.HasSuffix(prog, " crunch-run") {
73                 // invoked as "/path/to/arvados-server crunch-run"
74                 // (see arvados/lib/cmd.Multi)
75                 execargs = append([]string{strings.TrimSuffix(prog, " crunch-run"), "crunch-run"}, execargs...)
76         } else {
77                 // invoked as "/path/to/crunch-run"
78                 execargs = append([]string{prog}, execargs...)
79         }
80         if _, err := exec.LookPath("systemd-cat"); err == nil {
81                 execargs = append([]string{
82                         // Here, if the inner systemd-cat can't exec
83                         // crunch-run, it writes an error message to
84                         // stderr, and the outer systemd-cat writes it
85                         // to the journal where the operator has a
86                         // chance to discover it. (If we only used one
87                         // systemd-cat command, it would be up to us
88                         // to report the error -- but we are going to
89                         // detach and exit, not wait for something to
90                         // appear on stderr.)  Note these systemd-cat
91                         // calls don't result in additional processes
92                         // -- they just connect stderr/stdout to
93                         // sockets and call exec().
94                         "systemd-cat", "--identifier=crunch-run",
95                         "systemd-cat", "--identifier=crunch-run",
96                 }, execargs...)
97         }
98
99         cmd := exec.Command(execargs[0], execargs[1:]...)
100         // Child inherits lockfile.
101         cmd.ExtraFiles = []*os.File{lockfile}
102         // Ensure child isn't interrupted even if we receive signals
103         // from parent (sshd) while sending lockfile content to
104         // caller.
105         cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true}
106         // We need to manage our own OS pipe here to ensure the child
107         // process reads all of our stdin pipe before we return.
108         piper, pipew, err := os.Pipe()
109         if err != nil {
110                 return err
111         }
112         defer pipew.Close()
113         cmd.Stdin = piper
114         err = cmd.Start()
115         if err != nil {
116                 return fmt.Errorf("exec %s: %s", cmd.Path, err)
117         }
118         _, err = io.Copy(pipew, stdin)
119         if err != nil {
120                 return err
121         }
122         err = pipew.Close()
123         if err != nil {
124                 return err
125         }
126
127         w := io.MultiWriter(stdout, lockfile)
128         return json.NewEncoder(w).Encode(procinfo{
129                 UUID: uuid,
130                 PID:  cmd.Process.Pid,
131         })
132 }
133
134 // KillProcess finds the crunch-run process corresponding to the given
135 // uuid, and sends the given signal to it. It then waits up to 1
136 // second for the process to die. It returns 0 if the process is
137 // successfully killed or didn't exist in the first place.
138 func KillProcess(uuid string, signal syscall.Signal, stdout, stderr io.Writer) int {
139         return exitcode(stderr, kill(uuid, signal, stdout, stderr))
140 }
141
142 func kill(uuid string, signal syscall.Signal, stdout, stderr io.Writer) error {
143         path := filepath.Join(lockdir, lockprefix+uuid+locksuffix)
144         f, err := os.Open(path)
145         if os.IsNotExist(err) {
146                 return nil
147         } else if err != nil {
148                 return fmt.Errorf("open %s: %s", path, err)
149         }
150         defer f.Close()
151
152         var pi procinfo
153         err = json.NewDecoder(f).Decode(&pi)
154         if err != nil {
155                 return fmt.Errorf("decode %s: %s", path, err)
156         }
157
158         if pi.UUID != uuid || pi.PID == 0 {
159                 return fmt.Errorf("%s: bogus procinfo: %+v", path, pi)
160         }
161
162         proc, err := os.FindProcess(pi.PID)
163         if err != nil {
164                 // FindProcess should have succeeded, even if the
165                 // process does not exist.
166                 return fmt.Errorf("%s: find process %d: %s", uuid, pi.PID, err)
167         }
168
169         // Send the requested signal once, then send signal 0 a few
170         // times.  When proc.Signal() returns an error (process no
171         // longer exists), return success.  If that doesn't happen
172         // within 1 second, return an error.
173         err = proc.Signal(signal)
174         for deadline := time.Now().Add(time.Second); err == nil && time.Now().Before(deadline); time.Sleep(time.Second / 100) {
175                 err = proc.Signal(syscall.Signal(0))
176         }
177         if err == nil {
178                 // Reached deadline without a proc.Signal() error.
179                 return fmt.Errorf("%s: pid %d: sent signal %d (%s) but process is still alive", uuid, pi.PID, signal, signal)
180         }
181         fmt.Fprintf(stderr, "%s: pid %d: %s\n", uuid, pi.PID, err)
182         return nil
183 }
184
185 // ListProcesses lists UUIDs of active crunch-run processes.
186 func ListProcesses(stdout, stderr io.Writer) int {
187         // filepath.Walk does not follow symlinks, so we must walk
188         // lockdir+"/." in case lockdir itself is a symlink.
189         walkdir := lockdir + "/."
190         return exitcode(stderr, filepath.Walk(walkdir, func(path string, info os.FileInfo, err error) error {
191                 if info.IsDir() && path != walkdir {
192                         return filepath.SkipDir
193                 }
194                 if name := info.Name(); name == brokenfile {
195                         fmt.Fprintln(stdout, "broken")
196                         return nil
197                 } else if !strings.HasPrefix(name, lockprefix) || !strings.HasSuffix(name, locksuffix) {
198                         return nil
199                 }
200                 if info.Size() == 0 {
201                         // race: process has opened/locked but hasn't yet written pid/uuid
202                         return nil
203                 }
204
205                 f, err := os.Open(path)
206                 if err != nil {
207                         return nil
208                 }
209                 defer f.Close()
210
211                 // Ensure other processes don't acquire this lockfile
212                 // after we have decided it is abandoned but before we
213                 // have deleted it.
214                 dirlock, err := lockall()
215                 if err != nil {
216                         return err
217                 }
218                 err = syscall.Flock(int(f.Fd()), syscall.LOCK_SH|syscall.LOCK_NB)
219                 if err == nil {
220                         // lockfile is stale
221                         err := os.Remove(path)
222                         dirlock.Close()
223                         if err != nil {
224                                 fmt.Fprintf(stderr, "unlink %s: %s\n", f.Name(), err)
225                         }
226                         return nil
227                 }
228                 dirlock.Close()
229
230                 var pi procinfo
231                 err = json.NewDecoder(f).Decode(&pi)
232                 if err != nil {
233                         fmt.Fprintf(stderr, "%s: %s\n", path, err)
234                         return nil
235                 }
236                 if pi.UUID == "" || pi.PID == 0 {
237                         fmt.Fprintf(stderr, "%s: bogus procinfo: %+v", path, pi)
238                         return nil
239                 }
240
241                 proc, err := os.FindProcess(pi.PID)
242                 if err != nil {
243                         // FindProcess should have succeeded, even if the
244                         // process does not exist.
245                         fmt.Fprintf(stderr, "%s: find process %d: %s", path, pi.PID, err)
246                         return nil
247                 }
248                 err = proc.Signal(syscall.Signal(0))
249                 if err != nil {
250                         // Process is dead, even though lockfile was
251                         // still locked. Most likely a stuck arv-mount
252                         // process that inherited the lock from
253                         // crunch-run. Report container UUID as
254                         // "stale".
255                         fmt.Fprintln(stdout, pi.UUID, "stale")
256                         return nil
257                 }
258
259                 fmt.Fprintln(stdout, pi.UUID)
260                 return nil
261         }))
262 }
263
264 // If err is nil, return 0 ("success"); otherwise, print err to stderr
265 // and return 1.
266 func exitcode(stderr io.Writer, err error) int {
267         if err != nil {
268                 fmt.Fprintln(stderr, err)
269                 return 1
270         }
271         return 0
272 }
273
274 // Acquire a dir-level lock. Must be held while creating or deleting
275 // container-specific lockfiles, to avoid races during the intervals
276 // when those container-specific lockfiles are open but not locked.
277 //
278 // Caller releases the lock by closing the returned file.
279 func lockall() (*os.File, error) {
280         lockfile := filepath.Join(lockdir, lockprefix+"all"+locksuffix)
281         f, err := os.OpenFile(lockfile, os.O_CREATE|os.O_RDWR, 0700)
282         if err != nil {
283                 return nil, fmt.Errorf("open %s: %s", lockfile, err)
284         }
285         err = syscall.Flock(int(f.Fd()), syscall.LOCK_EX)
286         if err != nil {
287                 f.Close()
288                 return nil, fmt.Errorf("lock %s: %s", lockfile, err)
289         }
290         return f, nil
291 }