X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/b205525d0b7c7b9042513fe77d2e8061534208ae..7baff32e82b0bfc961dc9a285da8ce187d4fe0b6:/services/crunch-run/background.go diff --git a/services/crunch-run/background.go b/services/crunch-run/background.go index be5b2e685b..852ccb6ece 100644 --- a/services/crunch-run/background.go +++ b/services/crunch-run/background.go @@ -8,7 +8,6 @@ import ( "encoding/json" "fmt" "io" - "io/ioutil" "os" "os/exec" "path/filepath" @@ -21,50 +20,55 @@ var ( lockdir = "/var/lock" lockprefix = "crunch-run-" locksuffix = ".lock" + brokenfile = "crunch-run-broken" ) // procinfo is saved in each process's lockfile. type procinfo struct { - UUID string - PID int - Stdout string - Stderr string + UUID string + PID int } // Detach acquires a lock for the given uuid, and starts the current // program as a child process (with -no-detach prepended to the given // arguments so the child knows not to detach again). The lock is // passed along to the child process. +// +// Stdout and stderr in the child process are sent to the systemd +// journal using the systemd-cat program. func Detach(uuid string, args []string, stdout, stderr io.Writer) int { return exitcode(stderr, detach(uuid, args, stdout, stderr)) } func detach(uuid string, args []string, stdout, stderr io.Writer) error { - lockfile, err := os.OpenFile(filepath.Join(lockdir, lockprefix+uuid+locksuffix), os.O_CREATE|os.O_RDWR, 0700) + lockfile, err := func() (*os.File, error) { + // We must hold the dir-level lock between + // opening/creating the lockfile and acquiring LOCK_EX + // on it, to avoid racing with the ListProcess's + // alive-checking and garbage collection. + dirlock, err := lockall() + if err != nil { + return nil, err + } + defer dirlock.Close() + lockfilename := filepath.Join(lockdir, lockprefix+uuid+locksuffix) + lockfile, err := os.OpenFile(lockfilename, os.O_CREATE|os.O_RDWR, 0700) + if err != nil { + return nil, fmt.Errorf("open %s: %s", lockfilename, err) + } + err = syscall.Flock(int(lockfile.Fd()), syscall.LOCK_EX|syscall.LOCK_NB) + if err != nil { + lockfile.Close() + return nil, fmt.Errorf("lock %s: %s", lockfilename, err) + } + return lockfile, nil + }() if err != nil { return err } defer lockfile.Close() - err = syscall.Flock(int(lockfile.Fd()), syscall.LOCK_EX|syscall.LOCK_NB) - if err != nil { - return err - } lockfile.Truncate(0) - outfile, err := ioutil.TempFile("", "crunch-run-"+uuid+"-stdout-") - if err != nil { - return err - } - defer outfile.Close() - errfile, err := ioutil.TempFile("", "crunch-run-"+uuid+"-stderr-") - if err != nil { - os.Remove(outfile.Name()) - return err - } - defer errfile.Close() - - cmd := exec.Command(args[0], append([]string{"-no-detach"}, args[1:]...)...) - cmd.Stdout = outfile - cmd.Stderr = errfile + cmd := exec.Command("systemd-cat", append([]string{"--identifier=crunch-run", args[0], "-no-detach"}, args[1:]...)...) // Child inherits lockfile. cmd.ExtraFiles = []*os.File{lockfile} // Ensure child isn't interrupted even if we receive signals @@ -73,24 +77,14 @@ func detach(uuid string, args []string, stdout, stderr io.Writer) error { cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true} err = cmd.Start() if err != nil { - os.Remove(outfile.Name()) - os.Remove(errfile.Name()) - return err + return fmt.Errorf("exec %s: %s", cmd.Path, err) } w := io.MultiWriter(stdout, lockfile) - err = json.NewEncoder(w).Encode(procinfo{ - UUID: uuid, - PID: cmd.Process.Pid, - Stdout: outfile.Name(), - Stderr: errfile.Name(), + return json.NewEncoder(w).Encode(procinfo{ + UUID: uuid, + PID: cmd.Process.Pid, }) - if err != nil { - os.Remove(outfile.Name()) - os.Remove(errfile.Name()) - return err - } - return nil } // KillProcess finds the crunch-run process corresponding to the given @@ -107,14 +101,14 @@ func kill(uuid string, signal syscall.Signal, stdout, stderr io.Writer) error { if os.IsNotExist(err) { return nil } else if err != nil { - return err + return fmt.Errorf("open %s: %s", path, err) } defer f.Close() var pi procinfo err = json.NewDecoder(f).Decode(&pi) if err != nil { - return fmt.Errorf("%s: %s\n", path, err) + return fmt.Errorf("decode %s: %s\n", path, err) } if pi.UUID != uuid || pi.PID == 0 { @@ -123,27 +117,40 @@ func kill(uuid string, signal syscall.Signal, stdout, stderr io.Writer) error { proc, err := os.FindProcess(pi.PID) if err != nil { - return err + // FindProcess should have succeeded, even if the + // process does not exist. + return fmt.Errorf("%s: find process %d: %s", uuid, pi.PID, err) } + // Send the requested signal once, then send signal 0 a few + // times. When proc.Signal() returns an error (process no + // longer exists), return success. If that doesn't happen + // within 1 second, return an error. err = proc.Signal(signal) for deadline := time.Now().Add(time.Second); err == nil && time.Now().Before(deadline); time.Sleep(time.Second / 100) { err = proc.Signal(syscall.Signal(0)) } if err == nil { - return fmt.Errorf("pid %d: sent signal %d (%s) but process is still alive", pi.PID, signal, signal) + // Reached deadline without a proc.Signal() error. + return fmt.Errorf("%s: pid %d: sent signal %d (%s) but process is still alive", uuid, pi.PID, signal, signal) } - fmt.Fprintf(stderr, "pid %d: %s\n", pi.PID, err) + fmt.Fprintf(stderr, "%s: pid %d: %s\n", uuid, pi.PID, err) return nil } // List UUIDs of active crunch-run processes. func ListProcesses(stdout, stderr io.Writer) int { - return exitcode(stderr, filepath.Walk(lockdir, func(path string, info os.FileInfo, err error) error { - if info.IsDir() { + // filepath.Walk does not follow symlinks, so we must walk + // lockdir+"/." in case lockdir itself is a symlink. + walkdir := lockdir + "/." + return exitcode(stderr, filepath.Walk(walkdir, func(path string, info os.FileInfo, err error) error { + if info.IsDir() && path != walkdir { return filepath.SkipDir } - if name := info.Name(); !strings.HasPrefix(name, lockprefix) || !strings.HasSuffix(name, locksuffix) { + if name := info.Name(); name == brokenfile { + fmt.Fprintln(stdout, "broken") + return nil + } else if !strings.HasPrefix(name, lockprefix) || !strings.HasSuffix(name, locksuffix) { return nil } if info.Size() == 0 { @@ -157,19 +164,24 @@ func ListProcesses(stdout, stderr io.Writer) int { } defer f.Close() - // TODO: Do this check without risk of disrupting lock - // acquisition during races, e.g., by connecting to a - // unix socket or checking /proc/$pid/fd/$n -> - // lockfile. + // Ensure other processes don't acquire this lockfile + // after we have decided it is abandoned but before we + // have deleted it. + dirlock, err := lockall() + if err != nil { + return err + } err = syscall.Flock(int(f.Fd()), syscall.LOCK_SH|syscall.LOCK_NB) if err == nil { // lockfile is stale err := os.Remove(path) + dirlock.Close() if err != nil { - fmt.Fprintln(stderr, err) + fmt.Fprintf(stderr, "unlink %s: %s\n", f.Name(), err) } return nil } + dirlock.Close() var pi procinfo err = json.NewDecoder(f).Decode(&pi) @@ -196,3 +208,22 @@ func exitcode(stderr io.Writer, err error) int { } return 0 } + +// Acquire a dir-level lock. Must be held while creating or deleting +// container-specific lockfiles, to avoid races during the intervals +// when those container-specific lockfiles are open but not locked. +// +// Caller releases the lock by closing the returned file. +func lockall() (*os.File, error) { + lockfile := filepath.Join(lockdir, lockprefix+"all"+locksuffix) + f, err := os.OpenFile(lockfile, os.O_CREATE|os.O_RDWR, 0700) + if err != nil { + return nil, fmt.Errorf("open %s: %s", lockfile, err) + } + err = syscall.Flock(int(f.Fd()), syscall.LOCK_EX) + if err != nil { + f.Close() + return nil, fmt.Errorf("lock %s: %s", lockfile, err) + } + return f, nil +}