16347: Run a dedicated keepstore process for each container.
[arvados.git] / lib / crunchrun / background.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package crunchrun
6
7 import (
8         "encoding/json"
9         "fmt"
10         "io"
11         "os"
12         "os/exec"
13         "path/filepath"
14         "strings"
15         "syscall"
16         "time"
17 )
18
19 var (
20         lockdir    = "/var/lock"
21         lockprefix = "crunch-run-"
22         locksuffix = ".lock"
23         brokenfile = "crunch-run-broken"
24 )
25
26 // procinfo is saved in each process's lockfile.
27 type procinfo struct {
28         UUID string
29         PID  int
30 }
31
32 // Detach acquires a lock for the given uuid, and starts the current
33 // program as a child process (with -no-detach prepended to the given
34 // arguments so the child knows not to detach again). The lock is
35 // passed along to the child process.
36 //
37 // Stdout and stderr in the child process are sent to the systemd
38 // journal using the systemd-cat program.
39 func Detach(uuid string, prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) int {
40         return exitcode(stderr, detach(uuid, prog, args, stdin, stdout))
41 }
42 func detach(uuid string, prog string, args []string, stdin io.Reader, stdout io.Writer) error {
43         lockfile, err := func() (*os.File, error) {
44                 // We must hold the dir-level lock between
45                 // opening/creating the lockfile and acquiring LOCK_EX
46                 // on it, to avoid racing with the ListProcess's
47                 // alive-checking and garbage collection.
48                 dirlock, err := lockall()
49                 if err != nil {
50                         return nil, err
51                 }
52                 defer dirlock.Close()
53                 lockfilename := filepath.Join(lockdir, lockprefix+uuid+locksuffix)
54                 lockfile, err := os.OpenFile(lockfilename, os.O_CREATE|os.O_RDWR, 0700)
55                 if err != nil {
56                         return nil, fmt.Errorf("open %s: %s", lockfilename, err)
57                 }
58                 err = syscall.Flock(int(lockfile.Fd()), syscall.LOCK_EX|syscall.LOCK_NB)
59                 if err != nil {
60                         lockfile.Close()
61                         return nil, fmt.Errorf("lock %s: %s", lockfilename, err)
62                 }
63                 return lockfile, nil
64         }()
65         if err != nil {
66                 return err
67         }
68         defer lockfile.Close()
69         lockfile.Truncate(0)
70
71         execargs := append([]string{"-no-detach"}, args...)
72         if strings.HasSuffix(prog, " crunch-run") {
73                 // invoked as "/path/to/arvados-server crunch-run"
74                 // (see arvados/lib/cmd.Multi)
75                 execargs = append([]string{strings.TrimSuffix(prog, " crunch-run"), "crunch-run"}, execargs...)
76         } else {
77                 // invoked as "/path/to/crunch-run"
78                 execargs = append([]string{prog}, execargs...)
79         }
80         execargs = append([]string{
81                 // Here, if the inner systemd-cat can't exec
82                 // crunch-run, it writes an error message to stderr,
83                 // and the outer systemd-cat writes it to the journal
84                 // where the operator has a chance to discover it. (If
85                 // we only used one systemd-cat command, it would be
86                 // up to us to report the error -- but we are going to
87                 // detach and exit, not wait for something to appear
88                 // on stderr.)  Note these systemd-cat calls don't
89                 // result in additional processes -- they just connect
90                 // stderr/stdout to sockets and call exec().
91                 "systemd-cat", "--identifier=crunch-run",
92                 "systemd-cat", "--identifier=crunch-run",
93         }, execargs...)
94
95         cmd := exec.Command(execargs[0], execargs[1:]...)
96         // Child inherits lockfile.
97         cmd.ExtraFiles = []*os.File{lockfile}
98         // Ensure child isn't interrupted even if we receive signals
99         // from parent (sshd) while sending lockfile content to
100         // caller.
101         cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true}
102         // We need to manage our own OS pipe here to ensure the child
103         // process reads all of our stdin pipe before we return.
104         piper, pipew, err := os.Pipe()
105         if err != nil {
106                 return err
107         }
108         defer pipew.Close()
109         cmd.Stdin = piper
110         err = cmd.Start()
111         if err != nil {
112                 return fmt.Errorf("exec %s: %s", cmd.Path, err)
113         }
114         _, err = io.Copy(pipew, stdin)
115         if err != nil {
116                 return err
117         }
118         err = pipew.Close()
119         if err != nil {
120                 return err
121         }
122
123         w := io.MultiWriter(stdout, lockfile)
124         return json.NewEncoder(w).Encode(procinfo{
125                 UUID: uuid,
126                 PID:  cmd.Process.Pid,
127         })
128 }
129
130 // KillProcess finds the crunch-run process corresponding to the given
131 // uuid, and sends the given signal to it. It then waits up to 1
132 // second for the process to die. It returns 0 if the process is
133 // successfully killed or didn't exist in the first place.
134 func KillProcess(uuid string, signal syscall.Signal, stdout, stderr io.Writer) int {
135         return exitcode(stderr, kill(uuid, signal, stdout, stderr))
136 }
137
138 func kill(uuid string, signal syscall.Signal, stdout, stderr io.Writer) error {
139         path := filepath.Join(lockdir, lockprefix+uuid+locksuffix)
140         f, err := os.Open(path)
141         if os.IsNotExist(err) {
142                 return nil
143         } else if err != nil {
144                 return fmt.Errorf("open %s: %s", path, err)
145         }
146         defer f.Close()
147
148         var pi procinfo
149         err = json.NewDecoder(f).Decode(&pi)
150         if err != nil {
151                 return fmt.Errorf("decode %s: %s", path, err)
152         }
153
154         if pi.UUID != uuid || pi.PID == 0 {
155                 return fmt.Errorf("%s: bogus procinfo: %+v", path, pi)
156         }
157
158         proc, err := os.FindProcess(pi.PID)
159         if err != nil {
160                 // FindProcess should have succeeded, even if the
161                 // process does not exist.
162                 return fmt.Errorf("%s: find process %d: %s", uuid, pi.PID, err)
163         }
164
165         // Send the requested signal once, then send signal 0 a few
166         // times.  When proc.Signal() returns an error (process no
167         // longer exists), return success.  If that doesn't happen
168         // within 1 second, return an error.
169         err = proc.Signal(signal)
170         for deadline := time.Now().Add(time.Second); err == nil && time.Now().Before(deadline); time.Sleep(time.Second / 100) {
171                 err = proc.Signal(syscall.Signal(0))
172         }
173         if err == nil {
174                 // Reached deadline without a proc.Signal() error.
175                 return fmt.Errorf("%s: pid %d: sent signal %d (%s) but process is still alive", uuid, pi.PID, signal, signal)
176         }
177         fmt.Fprintf(stderr, "%s: pid %d: %s\n", uuid, pi.PID, err)
178         return nil
179 }
180
181 // ListProcesses lists UUIDs of active crunch-run processes.
182 func ListProcesses(stdout, stderr io.Writer) int {
183         // filepath.Walk does not follow symlinks, so we must walk
184         // lockdir+"/." in case lockdir itself is a symlink.
185         walkdir := lockdir + "/."
186         return exitcode(stderr, filepath.Walk(walkdir, func(path string, info os.FileInfo, err error) error {
187                 if info.IsDir() && path != walkdir {
188                         return filepath.SkipDir
189                 }
190                 if name := info.Name(); name == brokenfile {
191                         fmt.Fprintln(stdout, "broken")
192                         return nil
193                 } else if !strings.HasPrefix(name, lockprefix) || !strings.HasSuffix(name, locksuffix) {
194                         return nil
195                 }
196                 if info.Size() == 0 {
197                         // race: process has opened/locked but hasn't yet written pid/uuid
198                         return nil
199                 }
200
201                 f, err := os.Open(path)
202                 if err != nil {
203                         return nil
204                 }
205                 defer f.Close()
206
207                 // Ensure other processes don't acquire this lockfile
208                 // after we have decided it is abandoned but before we
209                 // have deleted it.
210                 dirlock, err := lockall()
211                 if err != nil {
212                         return err
213                 }
214                 err = syscall.Flock(int(f.Fd()), syscall.LOCK_SH|syscall.LOCK_NB)
215                 if err == nil {
216                         // lockfile is stale
217                         err := os.Remove(path)
218                         dirlock.Close()
219                         if err != nil {
220                                 fmt.Fprintf(stderr, "unlink %s: %s\n", f.Name(), err)
221                         }
222                         return nil
223                 }
224                 dirlock.Close()
225
226                 var pi procinfo
227                 err = json.NewDecoder(f).Decode(&pi)
228                 if err != nil {
229                         fmt.Fprintf(stderr, "%s: %s\n", path, err)
230                         return nil
231                 }
232                 if pi.UUID == "" || pi.PID == 0 {
233                         fmt.Fprintf(stderr, "%s: bogus procinfo: %+v", path, pi)
234                         return nil
235                 }
236
237                 proc, err := os.FindProcess(pi.PID)
238                 if err != nil {
239                         // FindProcess should have succeeded, even if the
240                         // process does not exist.
241                         fmt.Fprintf(stderr, "%s: find process %d: %s", path, pi.PID, err)
242                         return nil
243                 }
244                 err = proc.Signal(syscall.Signal(0))
245                 if err != nil {
246                         // Process is dead, even though lockfile was
247                         // still locked. Most likely a stuck arv-mount
248                         // process that inherited the lock from
249                         // crunch-run. Report container UUID as
250                         // "stale".
251                         fmt.Fprintln(stdout, pi.UUID, "stale")
252                         return nil
253                 }
254
255                 fmt.Fprintln(stdout, pi.UUID)
256                 return nil
257         }))
258 }
259
260 // If err is nil, return 0 ("success"); otherwise, print err to stderr
261 // and return 1.
262 func exitcode(stderr io.Writer, err error) int {
263         if err != nil {
264                 fmt.Fprintln(stderr, err)
265                 return 1
266         }
267         return 0
268 }
269
270 // Acquire a dir-level lock. Must be held while creating or deleting
271 // container-specific lockfiles, to avoid races during the intervals
272 // when those container-specific lockfiles are open but not locked.
273 //
274 // Caller releases the lock by closing the returned file.
275 func lockall() (*os.File, error) {
276         lockfile := filepath.Join(lockdir, lockprefix+"all"+locksuffix)
277         f, err := os.OpenFile(lockfile, os.O_CREATE|os.O_RDWR, 0700)
278         if err != nil {
279                 return nil, fmt.Errorf("open %s: %s", lockfile, err)
280         }
281         err = syscall.Flock(int(f.Fd()), syscall.LOCK_EX)
282         if err != nil {
283                 f.Close()
284                 return nil, fmt.Errorf("lock %s: %s", lockfile, err)
285         }
286         return f, nil
287 }