17244: Use crunch-run's cgroup, not init's, for "host" stats.
[arvados.git] / lib / crunchrun / crunchrun.go
index 23fbc430b42611eda108563e91f5387d766208ce..9add35335f39380ca61f78390f0eae1ce7ebda03 100644 (file)
@@ -6,16 +6,21 @@ package crunchrun
 
 import (
        "bytes"
+       "context"
        "encoding/json"
        "errors"
        "flag"
        "fmt"
        "io"
+       "io/fs"
        "io/ioutil"
        "log"
+       "net"
+       "net/http"
        "os"
        "os/exec"
        "os/signal"
+       "os/user"
        "path"
        "path/filepath"
        "regexp"
@@ -27,19 +32,31 @@ import (
        "syscall"
        "time"
 
+       "git.arvados.org/arvados.git/lib/cloud"
        "git.arvados.org/arvados.git/lib/cmd"
+       "git.arvados.org/arvados.git/lib/config"
        "git.arvados.org/arvados.git/lib/crunchstat"
        "git.arvados.org/arvados.git/sdk/go/arvados"
        "git.arvados.org/arvados.git/sdk/go/arvadosclient"
+       "git.arvados.org/arvados.git/sdk/go/ctxlog"
        "git.arvados.org/arvados.git/sdk/go/keepclient"
        "git.arvados.org/arvados.git/sdk/go/manifest"
-       "golang.org/x/net/context"
+       "golang.org/x/sys/unix"
 )
 
 type command struct{}
 
 var Command = command{}
 
+// ConfigData contains environment variables and (when needed) cluster
+// configuration, passed from dispatchcloud to crunch-run on stdin.
+type ConfigData struct {
+       // Env holds the environment variables passed along by the
+       // dispatcher, keyed by variable name.
+       Env          map[string]string
+       // NOTE(review): presumably the number of block buffers for a
+       // local keepstore process -- confirm against the consumer.
+       KeepBuffers  int
+       // NOTE(review): presumably enables polling for EC2 spot
+       // interruption notices -- confirm against the consumer.
+       EC2SpotCheck bool
+       // Cluster is the cluster configuration. It is only supplied
+       // "when needed", so consumers should expect it may be nil.
+       Cluster      *arvados.Cluster
+}
+
 // IArvadosClient is the minimal Arvados API methods used by crunch-run.
 type IArvadosClient interface {
        Create(resourceType string, parameters arvadosclient.Dict, output interface{}) error
@@ -66,7 +83,7 @@ type IKeepClient interface {
 // NewLogWriter is a factory function to create a new log writer.
 type NewLogWriter func(name string) (io.WriteCloser, error)
 
-type RunArvMount func(args []string, tok string) (*exec.Cmd, error)
+type RunArvMount func(cmdline []string, tok string) (*exec.Cmd, error)
 
 type MkTempDir func(string, string) (string, error)
 
@@ -77,7 +94,10 @@ type PsProcess interface {
 // ContainerRunner is the main stateful struct used for a single execution of a
 // container.
 type ContainerRunner struct {
-       executor containerExecutor
+       executor       containerExecutor
+       executorStdin  io.Closer
+       executorStdout io.Closer
+       executorStderr io.Closer
 
        // Dispatcher client is initialized with the Dispatcher token.
        // This is a privileged token used to manage container status
@@ -106,8 +126,6 @@ type ContainerRunner struct {
        ExitCode      *int
        NewLogWriter  NewLogWriter
        CrunchLog     *ThrottledLogger
-       Stdout        io.WriteCloser
-       Stderr        io.WriteCloser
        logUUID       string
        logMtx        sync.Mutex
        LogCollection arvados.CollectionFileSystem
@@ -125,26 +143,22 @@ type ContainerRunner struct {
        MkArvClient   func(token string) (IArvadosClient, IKeepClient, *arvados.Client, error)
        finalState    string
        parentTemp    string
+       costStartTime time.Time
 
+       keepstore        *exec.Cmd
+       keepstoreLogger  io.WriteCloser
+       keepstoreLogbuf  *bufThenWrite
        statLogger       io.WriteCloser
        statReporter     *crunchstat.Reporter
        hoststatLogger   io.WriteCloser
        hoststatReporter *crunchstat.Reporter
        statInterval     time.Duration
-       cgroupRoot       string
-       // What we expect the container's cgroup parent to be.
-       expectCgroupParent string
        // What we tell docker to use as the container's cgroup
-       // parent. Note: Ideally we would use the same field for both
-       // expectCgroupParent and setCgroupParent, and just make it
-       // default to "docker". However, when using docker < 1.10 with
-       // systemd, specifying a non-empty cgroup parent (even the
-       // default value "docker") hits a docker bug
-       // (https://github.com/docker/docker/issues/17126). Using two
-       // separate fields makes it possible to use the "expect cgroup
-       // parent to be X" feature even on sites where the "specify
-       // cgroup parent" feature breaks.
+       // parent.
        setCgroupParent string
+       // Fake root dir where crunchstat.Reporter should read OS
+       // files, for testing.
+       crunchstatFakeFS fs.FS
 
        cStateLock sync.Mutex
        cCancelled bool // StopContainer() invoked
@@ -152,11 +166,15 @@ type ContainerRunner struct {
        enableMemoryLimit bool
        enableNetwork     string // one of "default" or "always"
        networkMode       string // "none", "host", or "" -- passed through to executor
+       brokenNodeHook    string // script to run if node appears to be broken
        arvMountLog       *ThrottledLogger
 
        containerWatchdogInterval time.Duration
 
        gateway Gateway
+
+       prices     []cloud.InstancePrice
+       pricesLock sync.Mutex
 }
 
 // setupSignals sets up signal handling to gracefully terminate the
@@ -195,10 +213,9 @@ var errorBlacklist = []string{
        "(?ms).*oci runtime error.*starting container process.*container init.*mounting.*to rootfs.*no such file or directory.*",
        "(?ms).*grpc: the connection is unavailable.*",
 }
-var brokenNodeHook *string = flag.String("broken-node-hook", "", "Script to run if node is detected to be broken (for example, Docker daemon is not running)")
 
 func (runner *ContainerRunner) runBrokenNodeHook() {
-       if *brokenNodeHook == "" {
+       if runner.brokenNodeHook == "" {
                path := filepath.Join(lockdir, brokenfile)
                runner.CrunchLog.Printf("Writing %s to mark node as broken", path)
                f, err := os.OpenFile(path, os.O_CREATE|os.O_RDWR, 0700)
@@ -208,9 +225,9 @@ func (runner *ContainerRunner) runBrokenNodeHook() {
                }
                f.Close()
        } else {
-               runner.CrunchLog.Printf("Running broken node hook %q", *brokenNodeHook)
+               runner.CrunchLog.Printf("Running broken node hook %q", runner.brokenNodeHook)
                // run killme script
-               c := exec.Command(*brokenNodeHook)
+               c := exec.Command(runner.brokenNodeHook)
                c.Stdout = runner.CrunchLog
                c.Stderr = runner.CrunchLog
                err := c.Run()
@@ -259,23 +276,21 @@ func (runner *ContainerRunner) LoadImage() (string, error) {
                return "", fmt.Errorf("cannot choose from multiple tar files in image collection: %v", tarfiles)
        }
        imageID := tarfiles[0][:len(tarfiles[0])-4]
-       imageFile := runner.ArvMountPoint + "/by_id/" + runner.Container.ContainerImage + "/" + tarfiles[0]
+       imageTarballPath := runner.ArvMountPoint + "/by_id/" + runner.Container.ContainerImage + "/" + imageID + ".tar"
        runner.CrunchLog.Printf("Using Docker image id %q", imageID)
 
-       if !runner.executor.ImageLoaded(imageID) {
-               runner.CrunchLog.Print("Loading Docker image from keep")
-               err = runner.executor.LoadImage(imageFile)
-               if err != nil {
-                       return "", err
-               }
-       } else {
-               runner.CrunchLog.Print("Docker image is available")
+       runner.CrunchLog.Print("Loading Docker image from keep")
+       err = runner.executor.LoadImage(imageID, imageTarballPath, runner.Container, runner.ArvMountPoint,
+               runner.containerClient)
+       if err != nil {
+               return "", err
        }
+
        return imageID, nil
 }
 
-func (runner *ContainerRunner) ArvMountCmd(arvMountCmd []string, token string) (c *exec.Cmd, err error) {
-       c = exec.Command("arv-mount", arvMountCmd...)
+func (runner *ContainerRunner) ArvMountCmd(cmdline []string, token string) (c *exec.Cmd, err error) {
+       c = exec.Command(cmdline[0], cmdline[1:]...)
 
        // Copy our environment, but override ARVADOS_API_TOKEN with
        // the container auth token.
@@ -292,8 +307,21 @@ func (runner *ContainerRunner) ArvMountCmd(arvMountCmd []string, token string) (
                return nil, err
        }
        runner.arvMountLog = NewThrottledLogger(w)
+       scanner := logScanner{
+               Patterns: []string{
+                       "Keep write error",
+                       "Block not found error",
+                       "Unhandled exception during FUSE operation",
+               },
+               ReportFunc: func(pattern, text string) {
+                       runner.updateRuntimeStatus(arvadosclient.Dict{
+                               "warning":       "arv-mount: " + pattern,
+                               "warningDetail": text,
+                       })
+               },
+       }
        c.Stdout = runner.arvMountLog
-       c.Stderr = io.MultiWriter(runner.arvMountLog, os.Stderr)
+       c.Stderr = io.MultiWriter(runner.arvMountLog, os.Stderr, &scanner)
 
        runner.CrunchLog.Printf("Running %v", c.Args)
 
@@ -393,14 +421,24 @@ func (runner *ContainerRunner) SetupMounts() (map[string]bindmount, error) {
        pdhOnly := true
        tmpcount := 0
        arvMountCmd := []string{
+               "arv-mount",
                "--foreground",
-               "--allow-other",
                "--read-write",
                "--storage-classes", strings.Join(runner.Container.OutputStorageClasses, ","),
                fmt.Sprintf("--crunchstat-interval=%v", runner.statInterval.Seconds())}
 
-       if runner.Container.RuntimeConstraints.KeepCacheRAM > 0 {
-               arvMountCmd = append(arvMountCmd, "--file-cache", fmt.Sprintf("%d", runner.Container.RuntimeConstraints.KeepCacheRAM))
+       if _, isdocker := runner.executor.(*dockerExecutor); isdocker {
+               arvMountCmd = append(arvMountCmd, "--allow-other")
+       }
+
+       if runner.Container.RuntimeConstraints.KeepCacheDisk > 0 {
+               keepcachedir, err := runner.MkTempDir(runner.parentTemp, "keepcache")
+               if err != nil {
+                       return nil, fmt.Errorf("while creating keep cache temp dir: %v", err)
+               }
+               arvMountCmd = append(arvMountCmd, "--disk-cache", "--disk-cache-dir", keepcachedir, "--file-cache", fmt.Sprintf("%d", runner.Container.RuntimeConstraints.KeepCacheDisk))
+       } else if runner.Container.RuntimeConstraints.KeepCacheRAM > 0 {
+               arvMountCmd = append(arvMountCmd, "--ram-cache", "--file-cache", fmt.Sprintf("%d", runner.Container.RuntimeConstraints.KeepCacheRAM))
        }
 
        collectionPaths := []string{}
@@ -429,8 +467,8 @@ func (runner *ContainerRunner) SetupMounts() (map[string]bindmount, error) {
        sort.Strings(binds)
 
        for _, bind := range binds {
-               mnt, ok := runner.Container.Mounts[bind]
-               if !ok {
+               mnt, notSecret := runner.Container.Mounts[bind]
+               if !notSecret {
                        mnt = runner.SecretMounts[bind]
                }
                if bind == "stdout" || bind == "stderr" {
@@ -499,8 +537,7 @@ func (runner *ContainerRunner) SetupMounts() (map[string]bindmount, error) {
                                }
                        } else {
                                src = fmt.Sprintf("%s/tmp%d", runner.ArvMountPoint, tmpcount)
-                               arvMountCmd = append(arvMountCmd, "--mount-tmp")
-                               arvMountCmd = append(arvMountCmd, fmt.Sprintf("tmp%d", tmpcount))
+                               arvMountCmd = append(arvMountCmd, "--mount-tmp", fmt.Sprintf("tmp%d", tmpcount))
                                tmpcount++
                        }
                        if mnt.Writable {
@@ -560,9 +597,32 @@ func (runner *ContainerRunner) SetupMounts() (map[string]bindmount, error) {
                        if err != nil {
                                return nil, fmt.Errorf("writing temp file: %v", err)
                        }
-                       if strings.HasPrefix(bind, runner.Container.OutputPath+"/") {
+                       if strings.HasPrefix(bind, runner.Container.OutputPath+"/") && (notSecret || runner.Container.Mounts[runner.Container.OutputPath].Kind != "collection") {
+                               // In most cases, if the container
+                               // specifies a literal file inside the
+                               // output path, we copy it into the
+                               // output directory (either a mounted
+                               // collection or a staging area on the
+                               // host fs). If it's a secret, it will
+                               // be skipped when copying output from
+                               // staging to Keep later.
                                copyFiles = append(copyFiles, copyFile{tmpfn, runner.HostOutputDir + bind[len(runner.Container.OutputPath):]})
                        } else {
+                               // If a secret is outside OutputPath,
+                               // we bind mount the secret file
+                               // directly just like other mounts. We
+                               // also use this strategy when a
+                               // secret is inside OutputPath but
+                               // OutputPath is a live collection, to
+                               // avoid writing the secret to
+                               // Keep. Attempting to remove a
+                               // bind-mounted secret file from
+                               // inside the container will return a
+                               // "Device or resource busy" error
+                               // that might not be handled well by
+                               // the container, which is why we
+                               // don't use this strategy when
+                               // OutputPath is a staging directory.
                                bindmounts[bind] = bindmount{HostPath: tmpfn, ReadOnly: true}
                        }
 
@@ -571,7 +631,7 @@ func (runner *ContainerRunner) SetupMounts() (map[string]bindmount, error) {
                        if err != nil {
                                return nil, fmt.Errorf("creating temp dir: %v", err)
                        }
-                       err = gitMount(mnt).extractTree(runner.ContainerArvClient, tmpdir, token)
+                       err = gitMount(mnt).extractTree(runner.containerClient, tmpdir, token)
                        if err != nil {
                                return nil, err
                        }
@@ -594,16 +654,25 @@ func (runner *ContainerRunner) SetupMounts() (map[string]bindmount, error) {
        }
 
        if pdhOnly {
-               arvMountCmd = append(arvMountCmd, "--mount-by-pdh", "by_id")
+               // If we are only mounting collections by pdh, make
+               // sure we don't subscribe to websocket events to
+               // avoid putting undesired load on the API server
+               arvMountCmd = append(arvMountCmd, "--mount-by-pdh", "by_id", "--disable-event-listening")
        } else {
                arvMountCmd = append(arvMountCmd, "--mount-by-id", "by_id")
        }
+       // the by_uuid mount point is used by singularity when writing
+       // out docker images converted to SIF
+       arvMountCmd = append(arvMountCmd, "--mount-by-id", "by_uuid")
        arvMountCmd = append(arvMountCmd, runner.ArvMountPoint)
 
        runner.ArvMount, err = runner.RunArvMount(arvMountCmd, token)
        if err != nil {
                return nil, fmt.Errorf("while trying to start arv-mount: %v", err)
        }
+       if runner.hoststatReporter != nil && runner.ArvMount != nil {
+               runner.hoststatReporter.ReportPID("arv-mount", runner.ArvMount.Process.Pid)
+       }
 
        for _, p := range collectionPaths {
                _, err = os.Stat(p)
@@ -658,6 +727,7 @@ func (runner *ContainerRunner) stopHoststat() error {
                return nil
        }
        runner.hoststatReporter.Stop()
+       runner.hoststatReporter.LogProcessMemMax(runner.CrunchLog)
        err := runner.hoststatLogger.Close()
        if err != nil {
                return fmt.Errorf("error closing hoststat logs: %v", err)
@@ -672,11 +742,20 @@ func (runner *ContainerRunner) startHoststat() error {
        }
        runner.hoststatLogger = NewThrottledLogger(w)
        runner.hoststatReporter = &crunchstat.Reporter{
-               Logger:     log.New(runner.hoststatLogger, "", 0),
-               CgroupRoot: runner.cgroupRoot,
+               Logger: log.New(runner.hoststatLogger, "", 0),
+               // Our own cgroup is the "host" cgroup, in the sense
+               // that it accounts for resource usage outside the
+               // container. It doesn't count _all_ resource usage on
+               // the system.
+               //
+               // TODO?: Use the furthest ancestor of our own cgroup
+               // that has stats available. (Currently crunchstat
+               // does not have that capability.)
+               Pid:        os.Getpid,
                PollPeriod: runner.statInterval,
        }
        runner.hoststatReporter.Start()
+       runner.hoststatReporter.ReportPID("crunch-run", os.Getpid())
        return nil
 }
 
@@ -687,12 +766,15 @@ func (runner *ContainerRunner) startCrunchstat() error {
        }
        runner.statLogger = NewThrottledLogger(w)
        runner.statReporter = &crunchstat.Reporter{
-               CID:          runner.executor.CgroupID(),
-               Logger:       log.New(runner.statLogger, "", 0),
-               CgroupParent: runner.expectCgroupParent,
-               CgroupRoot:   runner.cgroupRoot,
-               PollPeriod:   runner.statInterval,
-               TempDir:      runner.parentTemp,
+               Pid:    runner.executor.Pid,
+               FS:     runner.crunchstatFakeFS,
+               Logger: log.New(runner.statLogger, "", 0),
+               MemThresholds: map[string][]crunchstat.Threshold{
+                       "rss": crunchstat.NewThresholdsFromPercentages(runner.Container.RuntimeConstraints.RAM, []int64{90, 95, 99}),
+               },
+               PollPeriod:      runner.statInterval,
+               TempDir:         runner.parentTemp,
+               ThresholdLogger: runner.CrunchLog,
        }
        runner.statReporter.Start()
        return nil
@@ -877,7 +959,7 @@ func (runner *ContainerRunner) getStdoutFile(mntPath string) (*os.File, error) {
 
 // CreateContainer creates the docker container.
 func (runner *ContainerRunner) CreateContainer(imageID string, bindmounts map[string]bindmount) error {
-       var stdin io.ReadCloser
+       var stdin io.ReadCloser = ioutil.NopCloser(bytes.NewReader(nil))
        if mnt, ok := runner.Container.Mounts["stdin"]; ok {
                switch mnt.Kind {
                case "collection":
@@ -944,6 +1026,7 @@ func (runner *ContainerRunner) CreateContainer(imageID string, bindmounts map[st
                env["ARVADOS_API_TOKEN"] = tok
                env["ARVADOS_API_HOST"] = os.Getenv("ARVADOS_API_HOST")
                env["ARVADOS_API_HOST_INSECURE"] = os.Getenv("ARVADOS_API_HOST_INSECURE")
+               env["ARVADOS_KEEP_SERVICES"] = os.Getenv("ARVADOS_KEEP_SERVICES")
        }
        workdir := runner.Container.Cwd
        if workdir == "." {
@@ -954,20 +1037,29 @@ func (runner *ContainerRunner) CreateContainer(imageID string, bindmounts map[st
        if !runner.enableMemoryLimit {
                ram = 0
        }
+       runner.executorStdin = stdin
+       runner.executorStdout = stdout
+       runner.executorStderr = stderr
+
+       if runner.Container.RuntimeConstraints.CUDA.DeviceCount > 0 {
+               nvidiaModprobe(runner.CrunchLog)
+       }
+
        return runner.executor.Create(containerSpec{
-               Image:         imageID,
-               VCPUs:         runner.Container.RuntimeConstraints.VCPUs,
-               RAM:           ram,
-               WorkingDir:    workdir,
-               Env:           env,
-               BindMounts:    bindmounts,
-               Command:       runner.Container.Command,
-               EnableNetwork: enableNetwork,
-               NetworkMode:   runner.networkMode,
-               CgroupParent:  runner.setCgroupParent,
-               Stdin:         stdin,
-               Stdout:        stdout,
-               Stderr:        stderr,
+               Image:           imageID,
+               VCPUs:           runner.Container.RuntimeConstraints.VCPUs,
+               RAM:             ram,
+               WorkingDir:      workdir,
+               Env:             env,
+               BindMounts:      bindmounts,
+               Command:         runner.Container.Command,
+               EnableNetwork:   enableNetwork,
+               CUDADeviceCount: runner.Container.RuntimeConstraints.CUDA.DeviceCount,
+               NetworkMode:     runner.networkMode,
+               CgroupParent:    runner.setCgroupParent,
+               Stdin:           stdin,
+               Stdout:          stdout,
+               Stderr:          stderr,
        })
 }
 
@@ -1018,14 +1110,59 @@ func (runner *ContainerRunner) WaitFinish() error {
        }
        runner.ExitCode = &exitcode
 
+       extra := ""
+       if exitcode&0x80 != 0 {
+               // Convert raw exit status (0x80 + signal number) to a
+               // string to log after the code, like " (signal 101)"
+               // or " (signal 9, killed)"
+               sig := syscall.WaitStatus(exitcode).Signal()
+               if name := unix.SignalName(sig); name != "" {
+                       extra = fmt.Sprintf(" (signal %d, %s)", sig, name)
+               } else {
+                       extra = fmt.Sprintf(" (signal %d)", sig)
+               }
+       }
+       runner.CrunchLog.Printf("Container exited with status code %d%s", exitcode, extra)
+       err = runner.DispatcherArvClient.Update("containers", runner.Container.UUID, arvadosclient.Dict{
+               "select":    []string{"uuid"},
+               "container": arvadosclient.Dict{"exit_code": exitcode},
+       }, nil)
+       if err != nil {
+               runner.CrunchLog.Printf("ignoring error updating exit_code: %s", err)
+       }
+
+       var returnErr error
+       if err = runner.executorStdin.Close(); err != nil {
+               err = fmt.Errorf("error closing container stdin: %s", err)
+               runner.CrunchLog.Printf("%s", err)
+               returnErr = err
+       }
+       if err = runner.executorStdout.Close(); err != nil {
+               err = fmt.Errorf("error closing container stdout: %s", err)
+               runner.CrunchLog.Printf("%s", err)
+               if returnErr == nil {
+                       returnErr = err
+               }
+       }
+       if err = runner.executorStderr.Close(); err != nil {
+               err = fmt.Errorf("error closing container stderr: %s", err)
+               runner.CrunchLog.Printf("%s", err)
+               if returnErr == nil {
+                       returnErr = err
+               }
+       }
+
        if runner.statReporter != nil {
                runner.statReporter.Stop()
+               runner.statReporter.LogMaxima(runner.CrunchLog, map[string]int64{
+                       "rss": runner.Container.RuntimeConstraints.RAM,
+               })
                err = runner.statLogger.Close()
                if err != nil {
                        runner.CrunchLog.Printf("error closing crunchstat logs: %v", err)
                }
        }
-       return nil
+       return returnErr
 }
 
 func (runner *ContainerRunner) updateLogs() {
@@ -1063,10 +1200,12 @@ func (runner *ContainerRunner) updateLogs() {
                        continue
                }
 
-               var updated arvados.Container
                err = runner.DispatcherArvClient.Update("containers", runner.Container.UUID, arvadosclient.Dict{
-                       "container": arvadosclient.Dict{"log": saved.PortableDataHash},
-               }, &updated)
+                       "select": []string{"uuid"},
+                       "container": arvadosclient.Dict{
+                               "log": saved.PortableDataHash,
+                       },
+               }, nil)
                if err != nil {
                        runner.CrunchLog.Printf("error updating container log to %s: %s", saved.PortableDataHash, err)
                        continue
@@ -1076,6 +1215,121 @@ func (runner *ContainerRunner) updateLogs() {
        }
 }
 
+// spotInterruptionCheckInterval is the delay between successive
+// polls of the instance metadata service for spot interruption
+// notices.
+var spotInterruptionCheckInterval = 5 * time.Second
+
+// ec2MetadataBaseURL is the base URL of the EC2 instance metadata
+// service; the "/latest/api/token" and
+// "/latest/meta-data/spot/instance-action" paths are appended to it.
+// NOTE(review): presumably a var (not a const) so tests can point it
+// at a stub server -- confirm.
+var ec2MetadataBaseURL = "http://169.254.169.254"
+
+// ec2TokenTTL (21600 seconds, i.e., 6 hours) is the lifetime
+// requested for each metadata session token, and is also used to
+// compute the token's local expiry time.
+const ec2TokenTTL = time.Second * 21600
+
+// checkSpotInterruptionNotices polls the EC2 instance metadata
+// service for a spot instance-action notice every
+// spotInterruptionCheckInterval. Whenever the reported notice
+// changes, it logs the notice, records it in the container's
+// runtime_status, and sends SIGUSR1 to this process to trigger
+// updateLogs.
+//
+// It blocks until more than 5 consecutive checks have failed, so
+// callers presumably run it in its own goroutine.
+func (runner *ContainerRunner) checkSpotInterruptionNotices() {
+       type ec2metadata struct {
+               Action string    `json:"action"`
+               Time   time.Time `json:"time"`
+       }
+       runner.CrunchLog.Printf("Checking for spot interruptions every %v using instance metadata at %s", spotInterruptionCheckInterval, ec2MetadataBaseURL)
+       var metadata ec2metadata
+       var token string
+       var tokenExp time.Time
+       // check loads the current notice into metadata, leaving it
+       // zero-valued if the service reports no notice (404). It
+       // obtains or renews the session token first if needed.
+       check := func() error {
+               // One deadline covers both the token request and the
+               // instance-action request.
+               ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
+               defer cancel()
+               // Renew the token if we have none, or if the current
+               // one expires in less than a minute.
+               if token == "" || time.Until(tokenExp) < time.Minute {
+                       req, err := http.NewRequestWithContext(ctx, http.MethodPut, ec2MetadataBaseURL+"/latest/api/token", nil)
+                       if err != nil {
+                               return err
+                       }
+                       req.Header.Set("X-aws-ec2-metadata-token-ttl-seconds", fmt.Sprintf("%d", int(ec2TokenTTL/time.Second)))
+                       resp, err := http.DefaultClient.Do(req)
+                       if err != nil {
+                               return err
+                       }
+                       defer resp.Body.Close()
+                       if resp.StatusCode != http.StatusOK {
+                               return fmt.Errorf("%s", resp.Status)
+                       }
+                       newtoken, err := ioutil.ReadAll(resp.Body)
+                       if err != nil {
+                               return err
+                       }
+                       token = strings.TrimSpace(string(newtoken))
+                       tokenExp = time.Now().Add(ec2TokenTTL)
+               }
+               req, err := http.NewRequestWithContext(ctx, http.MethodGet, ec2MetadataBaseURL+"/latest/meta-data/spot/instance-action", nil)
+               if err != nil {
+                       return err
+               }
+               req.Header.Set("X-aws-ec2-metadata-token", token)
+               resp, err := http.DefaultClient.Do(req)
+               if err != nil {
+                       return err
+               }
+               defer resp.Body.Close()
+               metadata = ec2metadata{}
+               switch resp.StatusCode {
+               case http.StatusOK:
+                       // Fall out of the switch and decode the
+                       // notice below.
+               case http.StatusNotFound:
+                       // "If Amazon EC2 is not preparing to stop or
+                       // terminate the instance, or if you
+                       // terminated the instance yourself,
+                       // instance-action is not present in the
+                       // instance metadata and you receive an HTTP
+                       // 404 error when you try to retrieve it."
+                       return nil
+               case http.StatusUnauthorized:
+                       // Discard the rejected token so the next
+                       // check requests a new one.
+                       token = ""
+                       return fmt.Errorf("%s", resp.Status)
+               default:
+                       return fmt.Errorf("%s", resp.Status)
+               }
+               return json.NewDecoder(resp.Body).Decode(&metadata)
+       }
+       ticker := time.NewTicker(spotInterruptionCheckInterval)
+       // Release the ticker's resources when we give up below.
+       defer ticker.Stop()
+       failures := 0
+       var lastmetadata ec2metadata
+       for range ticker.C {
+               err := check()
+               if err != nil {
+                       runner.CrunchLog.Printf("Error checking spot interruptions: %s", err)
+                       failures++
+                       if failures > 5 {
+                               runner.CrunchLog.Printf("Giving up on checking spot interruptions after too many consecutive failures")
+                               return
+                       }
+                       continue
+               }
+               failures = 0
+               // Compare timestamps with Equal rather than ==, so
+               // the same instant decoded with a different
+               // *time.Location is not mistaken for a new notice.
+               if metadata.Action == lastmetadata.Action && metadata.Time.Equal(lastmetadata.Time) {
+                       continue
+               }
+               lastmetadata = metadata
+               text := fmt.Sprintf("Cloud provider scheduled instance %s at %s", metadata.Action, metadata.Time.UTC().Format(time.RFC3339))
+               runner.CrunchLog.Printf("%s", text)
+               runner.updateRuntimeStatus(arvadosclient.Dict{
+                       "warning":          "preemption notice",
+                       "warningDetail":    text,
+                       "preemptionNotice": text,
+               })
+               if proc, err := os.FindProcess(os.Getpid()); err == nil {
+                       // trigger updateLogs (best effort: a failure
+                       // to deliver the signal is ignored)
+                       proc.Signal(syscall.SIGUSR1)
+               }
+       }
+}
+
+// updateRuntimeStatus sends the given runtime_status to the API
+// server for this container, using the dispatcher client. Only the
+// "uuid" field is selected in the response, and the response itself
+// is discarded. A failed update is written to the crunch-run log
+// and is not reported to the caller.
+func (runner *ContainerRunner) updateRuntimeStatus(status arvadosclient.Dict) {
+       params := arvadosclient.Dict{
+               "select":    []string{"uuid"},
+               "container": arvadosclient.Dict{"runtime_status": status},
+       }
+       if err := runner.DispatcherArvClient.Update("containers", runner.Container.UUID, params, nil); err != nil {
+               runner.CrunchLog.Printf("error updating container runtime_status: %s", err)
+       }
+}
+
 // CaptureOutput saves data from the container's output directory if
 // needed, and updates the container output accordingly.
 func (runner *ContainerRunner) CaptureOutput(bindmounts map[string]bindmount) error {
@@ -1083,7 +1337,9 @@ func (runner *ContainerRunner) CaptureOutput(bindmounts map[string]bindmount) er
                // Output may have been set directly by the container, so
                // refresh the container record to check.
                err := runner.DispatcherArvClient.Get("containers", runner.Container.UUID,
-                       nil, &runner.Container)
+                       arvadosclient.Dict{
+                               "select": []string{"output"},
+                       }, &runner.Container)
                if err != nil {
                        return err
                }
@@ -1096,7 +1352,6 @@ func (runner *ContainerRunner) CaptureOutput(bindmounts map[string]bindmount) er
 
        txt, err := (&copier{
                client:        runner.containerClient,
-               arvClient:     runner.ContainerArvClient,
                keepClient:    runner.ContainerKeepClient,
                hostOutputDir: runner.HostOutputDir,
                ctrOutputDir:  runner.Container.OutputPath,
@@ -1122,6 +1377,7 @@ func (runner *ContainerRunner) CaptureOutput(bindmounts map[string]bindmount) er
        var resp arvados.Collection
        err = runner.ContainerArvClient.Create("collections", arvadosclient.Dict{
                "ensure_unique_name": true,
+               "select":             []string{"portable_data_hash"},
                "collection": arvadosclient.Dict{
                        "is_trashed":    true,
                        "name":          "output for " + runner.Container.UUID,
@@ -1146,6 +1402,7 @@ func (runner *ContainerRunner) CleanupDirs() {
 
                if umnterr != nil {
                        runner.CrunchLog.Printf("Error unmounting: %v", umnterr)
+                       runner.ArvMount.Process.Kill()
                } else {
                        // If arv-mount --unmount gets stuck for any reason, we
                        // don't want to wait for it forever.  Do Wait() in a goroutine
@@ -1176,12 +1433,14 @@ func (runner *ContainerRunner) CleanupDirs() {
                                }
                        }
                }
+               runner.ArvMount = nil
        }
 
        if runner.ArvMountPoint != "" {
                if rmerr := os.Remove(runner.ArvMountPoint); rmerr != nil {
                        runner.CrunchLog.Printf("While cleaning up arv-mount directory %s: %v", runner.ArvMountPoint, rmerr)
                }
+               runner.ArvMountPoint = ""
        }
 
        if rmerr := os.RemoveAll(runner.parentTemp); rmerr != nil {
@@ -1216,6 +1475,16 @@ func (runner *ContainerRunner) CommitLogs() error {
                runner.CrunchLog.Immediate = log.New(os.Stderr, runner.Container.UUID+" ", 0)
        }()
 
+       if runner.keepstoreLogger != nil {
+               // Flush any buffered logs from our local keepstore
+               // process.  Discard anything logged after this point
+               // -- it won't end up in the log collection, so
+               // there's no point writing it to the collectionfs.
+               runner.keepstoreLogbuf.SetWriter(io.Discard)
+               runner.keepstoreLogger.Close()
+               runner.keepstoreLogger = nil
+       }
+
        if runner.LogsPDH != nil {
                // If we have already assigned something to LogsPDH,
                // we must be closing the re-opened log, which won't
@@ -1224,6 +1493,7 @@ func (runner *ContainerRunner) CommitLogs() error {
                // -- it exists only to send logs to other channels.
                return nil
        }
+
        saved, err := runner.saveLogCollection(true)
        if err != nil {
                return fmt.Errorf("error saving log collection: %s", err)
@@ -1234,6 +1504,8 @@ func (runner *ContainerRunner) CommitLogs() error {
        return nil
 }
 
+// Create/update the log collection. Return value has UUID and
+// PortableDataHash fields populated, but others may be blank.
 func (runner *ContainerRunner) saveLogCollection(final bool) (response arvados.Collection, err error) {
        runner.logMtx.Lock()
        defer runner.logMtx.Unlock()
@@ -1258,11 +1530,20 @@ func (runner *ContainerRunner) saveLogCollection(final bool) (response arvados.C
        if final {
                updates["is_trashed"] = true
        } else {
-               exp := time.Now().Add(crunchLogUpdatePeriod * 24)
+               // We set trash_at so this collection gets
+               // automatically cleaned up eventually.  It used to be
+               // 12 hours but we had a situation where the API
+               // server was down over a weekend but the containers
+               // kept running such that the log collection got
+               // trashed, so now we make it 2 weeks.  refs #20378
+               exp := time.Now().Add(time.Duration(24*14) * time.Hour)
                updates["trash_at"] = exp
                updates["delete_at"] = exp
        }
-       reqBody := arvadosclient.Dict{"collection": updates}
+       reqBody := arvadosclient.Dict{
+               "select":     []string{"uuid", "portable_data_hash"},
+               "collection": updates,
+       }
        var err2 error
        if runner.logUUID == "" {
                reqBody["ensure_unique_name"] = true
@@ -1281,14 +1562,28 @@ func (runner *ContainerRunner) saveLogCollection(final bool) (response arvados.C
 }
 
 // UpdateContainerRunning updates the container state to "Running"
-func (runner *ContainerRunner) UpdateContainerRunning() error {
+func (runner *ContainerRunner) UpdateContainerRunning(logId string) error {
        runner.cStateLock.Lock()
        defer runner.cStateLock.Unlock()
        if runner.cCancelled {
                return ErrCancelled
        }
-       return runner.DispatcherArvClient.Update("containers", runner.Container.UUID,
-               arvadosclient.Dict{"container": arvadosclient.Dict{"state": "Running", "gateway_address": runner.gateway.Address}}, nil)
+       updates := arvadosclient.Dict{
+               "gateway_address": runner.gateway.Address,
+               "state":           "Running",
+       }
+       if logId != "" { // the "log" field is sent only when the caller supplied a non-empty logId
+               updates["log"] = logId
+       }
+       return runner.DispatcherArvClient.Update(
+               "containers",
+               runner.Container.UUID,
+               arvadosclient.Dict{
+                       "select":    []string{"uuid"}, // presumably limits the fields returned by the API -- confirm
+                       "container": updates,
+               },
+               nil,
+       )
 }
 
 // ContainerToken returns the api_token the container (and any
@@ -1315,15 +1610,19 @@ func (runner *ContainerRunner) UpdateContainerFinal() error {
        if runner.LogsPDH != nil {
                update["log"] = *runner.LogsPDH
        }
-       if runner.finalState == "Complete" {
-               if runner.ExitCode != nil {
-                       update["exit_code"] = *runner.ExitCode
-               }
-               if runner.OutputPDH != nil {
-                       update["output"] = *runner.OutputPDH
-               }
+       if runner.ExitCode != nil {
+               update["exit_code"] = *runner.ExitCode
+       } else {
+               update["exit_code"] = nil
+       }
+       if runner.finalState == "Complete" && runner.OutputPDH != nil {
+               update["output"] = *runner.OutputPDH
        }
-       return runner.DispatcherArvClient.Update("containers", runner.Container.UUID, arvadosclient.Dict{"container": update}, nil)
+       update["cost"] = runner.calculateCost(time.Now())
+       return runner.DispatcherArvClient.Update("containers", runner.Container.UUID, arvadosclient.Dict{
+               "select":    []string{"uuid"},
+               "container": update,
+       }, nil)
 }
 
 // IsCancelled returns the value of Cancelled, with goroutine safety.
@@ -1350,7 +1649,12 @@ func (runner *ContainerRunner) NewArvLogWriter(name string) (io.WriteCloser, err
 // Run the full container lifecycle.
 func (runner *ContainerRunner) Run() (err error) {
        runner.CrunchLog.Printf("crunch-run %s started", cmd.Version.String())
-       runner.CrunchLog.Printf("Executing container '%s'", runner.Container.UUID)
+       runner.CrunchLog.Printf("%s", currentUserAndGroups())
+       v, _ := exec.Command("arv-mount", "--version").CombinedOutput()
+       runner.CrunchLog.Printf("Using FUSE mount: %s", v)
+       runner.CrunchLog.Printf("Using container runtime: %s", runner.executor.Runtime())
+       runner.CrunchLog.Printf("Executing container: %s", runner.Container.UUID)
+       runner.costStartTime = time.Now()
 
        hostname, hosterr := os.Hostname()
        if hosterr != nil {
@@ -1359,6 +1663,12 @@ func (runner *ContainerRunner) Run() (err error) {
                runner.CrunchLog.Printf("Executing on host '%s'", hostname)
        }
 
+       sigusr2 := make(chan os.Signal, 1)
+       signal.Notify(sigusr2, syscall.SIGUSR2)
+       defer signal.Stop(sigusr2)
+       runner.loadPrices()
+       go runner.handleSIGUSR2(sigusr2)
+
        runner.finalState = "Queued"
 
        defer func() {
@@ -1416,6 +1726,7 @@ func (runner *ContainerRunner) Run() (err error) {
                }
                checkErr("stopHoststat", runner.stopHoststat())
                checkErr("CommitLogs", runner.CommitLogs())
+               runner.CleanupDirs()
                checkErr("UpdateContainerFinal", runner.UpdateContainerFinal())
        }()
 
@@ -1424,6 +1735,9 @@ func (runner *ContainerRunner) Run() (err error) {
        if err != nil {
                return
        }
+       if runner.keepstore != nil {
+               runner.hoststatReporter.ReportPID("keepstore", runner.keepstore.Process.Pid)
+       }
 
        // set up FUSE mount and binds
        bindmounts, err = runner.SetupMounts()
@@ -1466,7 +1780,14 @@ func (runner *ContainerRunner) Run() (err error) {
                return
        }
 
-       err = runner.UpdateContainerRunning()
+       logCollection, err := runner.saveLogCollection(false)
+       var logId string
+       if err == nil {
+               logId = logCollection.PortableDataHash
+       } else {
+               runner.CrunchLog.Printf("Error committing initial log collection: %v", err)
+       }
+       err = runner.UpdateContainerRunning(logId)
        if err != nil {
                return
        }
@@ -1585,23 +1906,27 @@ func NewContainerRunner(dispatcherClient *arvados.Client,
 }
 
 func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) int {
+       log := log.New(stderr, "", 0)
        flags := flag.NewFlagSet(prog, flag.ContinueOnError)
        statInterval := flags.Duration("crunchstat-interval", 10*time.Second, "sampling period for periodic resource usage reporting")
-       cgroupRoot := flags.String("cgroup-root", "/sys/fs/cgroup", "path to sysfs cgroup tree")
-       cgroupParent := flags.String("cgroup-parent", "docker", "name of container's parent cgroup (ignored if -cgroup-parent-subsystem is used)")
-       cgroupParentSubsystem := flags.String("cgroup-parent-subsystem", "", "use current cgroup for given subsystem as parent cgroup for container")
+       flags.String("cgroup-root", "/sys/fs/cgroup", "path to sysfs cgroup tree (obsolete, ignored)")
+       flags.String("cgroup-parent", "docker", "name of container's parent cgroup (obsolete, ignored)")
+       cgroupParentSubsystem := flags.String("cgroup-parent-subsystem", "", "use current cgroup for given `subsystem` as parent cgroup for container (subsystem argument is only relevant for cgroups v1; in cgroups v2 / unified mode, any non-empty value means use current cgroup)")
        caCertsPath := flags.String("ca-certs", "", "Path to TLS root certificates")
        detach := flags.Bool("detach", false, "Detach from parent process and run in the background")
-       stdinEnv := flags.Bool("stdin-env", false, "Load environment variables from JSON message on stdin")
+       stdinConfig := flags.Bool("stdin-config", false, "Load config and environment variables from JSON message on stdin")
+       configFile := flags.String("config", arvados.DefaultConfigFile, "filename of cluster config file to try loading if -stdin-config=false (default is $ARVADOS_CONFIG)")
        sleep := flags.Duration("sleep", 0, "Delay before starting (testing use only)")
        kill := flags.Int("kill", -1, "Send signal to an existing crunch-run process for given UUID")
-       list := flags.Bool("list", false, "List UUIDs of existing crunch-run processes")
+       list := flags.Bool("list", false, "List UUIDs of existing crunch-run processes (and notify them to use price data passed on stdin)")
        enableMemoryLimit := flags.Bool("enable-memory-limit", true, "tell container runtime to limit container's memory usage")
        enableNetwork := flags.String("container-enable-networking", "default", "enable networking \"always\" (for all containers) or \"default\" (for containers that request it)")
        networkMode := flags.String("container-network-mode", "default", `Docker network mode for container (use any argument valid for docker --net)`)
        memprofile := flags.String("memprofile", "", "write memory profile to `file` after running container")
        runtimeEngine := flags.String("runtime-engine", "docker", "container runtime: docker or singularity")
+       brokenNodeHook := flags.String("broken-node-hook", "", "script to run if node is detected to be broken (for example, Docker daemon is not running)")
        flags.Duration("check-containerd", 0, "Ignored. Exists for compatibility with older versions.")
+       version := flags.Bool("version", false, "Write version information to stdout and exit 0.")
 
        ignoreDetachFlag := false
        if len(args) > 0 && args[0] == "-no-detach" {
@@ -1615,40 +1940,58 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s
                ignoreDetachFlag = true
        }
 
-       if err := flags.Parse(args); err == flag.ErrHelp {
+       if ok, code := cmd.ParseFlags(flags, prog, args, "container-uuid", stderr); !ok {
+               return code
+       } else if *version {
+               fmt.Fprintln(stdout, prog, cmd.Version.String())
                return 0
-       } else if err != nil {
-               log.Print(err)
-               return 1
-       }
-
-       if *stdinEnv && !ignoreDetachFlag {
-               // Load env vars on stdin if asked (but not in a
-               // detached child process, in which case stdin is
-               // /dev/null).
-               err := loadEnv(os.Stdin)
-               if err != nil {
-                       log.Print(err)
-                       return 1
-               }
+       } else if !*list && flags.NArg() != 1 {
+               fmt.Fprintf(stderr, "missing required argument: container-uuid (try -help)\n")
+               return 2
        }
 
        containerUUID := flags.Arg(0)
 
        switch {
        case *detach && !ignoreDetachFlag:
-               return Detach(containerUUID, prog, args, os.Stdout, os.Stderr)
+               return Detach(containerUUID, prog, args, stdin, stdout, stderr)
        case *kill >= 0:
-               return KillProcess(containerUUID, syscall.Signal(*kill), os.Stdout, os.Stderr)
+               return KillProcess(containerUUID, syscall.Signal(*kill), stdout, stderr)
        case *list:
-               return ListProcesses(os.Stdout, os.Stderr)
+               return ListProcesses(stdin, stdout, stderr)
        }
 
-       if containerUUID == "" {
+       if len(containerUUID) != 27 {
                log.Printf("usage: %s [options] UUID", prog)
                return 1
        }
 
+       var keepstoreLogbuf bufThenWrite
+       var conf ConfigData
+       if *stdinConfig {
+               err := json.NewDecoder(stdin).Decode(&conf)
+               if err != nil {
+                       log.Printf("decode stdin: %s", err)
+                       return 1
+               }
+               for k, v := range conf.Env {
+                       err = os.Setenv(k, v)
+                       if err != nil {
+                               log.Printf("setenv(%q): %s", k, err)
+                               return 1
+                       }
+               }
+               if conf.Cluster != nil {
+                       // ClusterID is missing from the JSON
+                       // representation, but we need it to generate
+                       // a valid config file for keepstore, so we
+                       // fill it using the container UUID prefix.
+                       conf.Cluster.ClusterID = containerUUID[:5]
+               }
+       } else {
+               conf = hpcConfData(containerUUID, *configFile, io.MultiWriter(&keepstoreLogbuf, stderr))
+       }
+
        log.Printf("crunch-run %s started", cmd.Version.String())
        time.Sleep(*sleep)
 
@@ -1656,16 +1999,27 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s
                arvadosclient.CertFiles = []string{*caCertsPath}
        }
 
+       keepstore, err := startLocalKeepstore(conf, io.MultiWriter(&keepstoreLogbuf, stderr))
+       if err != nil {
+               log.Print(err)
+               return 1
+       }
+       if keepstore != nil {
+               defer keepstore.Process.Kill()
+       }
+
        api, err := arvadosclient.MakeArvadosClient()
        if err != nil {
                log.Printf("%s: %v", containerUUID, err)
                return 1
        }
-       api.Retries = 8
+       // arvadosclient now interprets Retries=10 to mean
+       // Timeout=10m, retrying with exponential backoff + jitter.
+       api.Retries = 10
 
-       kc, kcerr := keepclient.MakeKeepClient(api)
-       if kcerr != nil {
-               log.Printf("%s: %v", containerUUID, kcerr)
+       kc, err := keepclient.MakeKeepClient(api)
+       if err != nil {
+               log.Printf("%s: %v", containerUUID, err)
                return 1
        }
        kc.BlockCache = &keepclient.BlockCache{MaxBlocks: 2}
@@ -1677,6 +2031,44 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s
                return 1
        }
 
+       cr.keepstore = keepstore
+       if keepstore == nil {
+               // Log explanation (if any) for why we're not running
+               // a local keepstore.
+               var buf bytes.Buffer
+               keepstoreLogbuf.SetWriter(&buf)
+               if buf.Len() > 0 {
+                       cr.CrunchLog.Printf("%s", strings.TrimSpace(buf.String()))
+               }
+       } else if logWhat := conf.Cluster.Containers.LocalKeepLogsToContainerLog; logWhat == "none" {
+               cr.CrunchLog.Printf("using local keepstore process (pid %d) at %s", keepstore.Process.Pid, os.Getenv("ARVADOS_KEEP_SERVICES"))
+               keepstoreLogbuf.SetWriter(io.Discard)
+       } else {
+               cr.CrunchLog.Printf("using local keepstore process (pid %d) at %s, writing logs to keepstore.txt in log collection", keepstore.Process.Pid, os.Getenv("ARVADOS_KEEP_SERVICES"))
+               logwriter, err := cr.NewLogWriter("keepstore")
+               if err != nil {
+                       log.Print(err)
+                       return 1
+               }
+               cr.keepstoreLogger = NewThrottledLogger(logwriter)
+
+               var writer io.WriteCloser = cr.keepstoreLogger
+               if logWhat == "errors" {
+                       writer = &filterKeepstoreErrorsOnly{WriteCloser: writer}
+               } else if logWhat != "all" {
+                       // should have been caught earlier by
+                       // dispatcher's config loader
+                       log.Printf("invalid value for Containers.LocalKeepLogsToContainerLog: %q", logWhat)
+                       return 1
+               }
+               err = keepstoreLogbuf.SetWriter(writer)
+               if err != nil {
+                       log.Print(err)
+                       return 1
+               }
+               cr.keepstoreLogbuf = &keepstoreLogbuf
+       }
+
        switch *runtimeEngine {
        case "docker":
                cr.executor, err = newDockerExecutor(containerUUID, cr.CrunchLog.Printf, cr.containerWatchdogInterval)
@@ -1695,24 +2087,38 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s
        }
        defer cr.executor.Close()
 
+       cr.brokenNodeHook = *brokenNodeHook
+
        gwAuthSecret := os.Getenv("GatewayAuthSecret")
        os.Unsetenv("GatewayAuthSecret")
        if gwAuthSecret == "" {
                // not safe to run a gateway service without an auth
                // secret
                cr.CrunchLog.Printf("Not starting a gateway server (GatewayAuthSecret was not provided by dispatcher)")
-       } else if gwListen := os.Getenv("GatewayAddress"); gwListen == "" {
-               // dispatcher did not tell us which external IP
-               // address to advertise --> no gateway service
-               cr.CrunchLog.Printf("Not starting a gateway server (GatewayAddress was not provided by dispatcher)")
-       } else if de, ok := cr.executor.(*dockerExecutor); ok {
+       } else {
+               gwListen := os.Getenv("GatewayAddress")
                cr.gateway = Gateway{
-                       Address:            gwListen,
-                       AuthSecret:         gwAuthSecret,
-                       ContainerUUID:      containerUUID,
-                       DockerContainerID:  &de.containerID,
-                       Log:                cr.CrunchLog,
-                       ContainerIPAddress: dockerContainerIPAddress(&de.containerID),
+                       Address:       gwListen,
+                       AuthSecret:    gwAuthSecret,
+                       ContainerUUID: containerUUID,
+                       Target:        cr.executor,
+                       Log:           cr.CrunchLog,
+                       LogCollection: cr.LogCollection,
+               }
+               if gwListen == "" {
+                       // Direct connection won't work, so we use the
+                       // gateway_address field to indicate the
+                       // internalURL of the controller process that
+                       // has the current tunnel connection.
+                       cr.gateway.ArvadosClient = cr.dispatcherClient
+                       cr.gateway.UpdateTunnelURL = func(url string) {
+                               cr.gateway.Address = "tunnel " + url
+                               cr.DispatcherArvClient.Update("containers", containerUUID,
+                                       arvadosclient.Dict{
+                                               "select":    []string{"uuid"},
+                                               "container": arvadosclient.Dict{"gateway_address": cr.gateway.Address},
+                                       }, nil)
+                       }
                }
                err = cr.gateway.Start()
                if err != nil {
@@ -1729,15 +2135,20 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s
 
        cr.parentTemp = parentTemp
        cr.statInterval = *statInterval
-       cr.cgroupRoot = *cgroupRoot
-       cr.expectCgroupParent = *cgroupParent
        cr.enableMemoryLimit = *enableMemoryLimit
        cr.enableNetwork = *enableNetwork
        cr.networkMode = *networkMode
        if *cgroupParentSubsystem != "" {
-               p := findCgroup(*cgroupParentSubsystem)
+               p, err := findCgroup(os.DirFS("/"), *cgroupParentSubsystem)
+               if err != nil {
+                       log.Printf("fatal: cgroup parent subsystem: %s", err)
+                       return 1
+               }
                cr.setCgroupParent = p
-               cr.expectCgroupParent = p
+       }
+
+       if conf.EC2SpotCheck {
+               go cr.checkSpotInterruptionNotices()
        }
 
        runerr := cr.Run()
@@ -1764,21 +2175,319 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s
        return 0
 }
 
-func loadEnv(rdr io.Reader) error {
-       buf, err := ioutil.ReadAll(rdr)
+// hpcConfData builds ConfigData for an hpc (slurm/lsf) environment. It
+// loads the cluster config from configFile and (if that works) gets
+// the runtime_constraints container field via arvadosclient to
+// determine # VCPUs so KeepBuffers can be calculated.
+func hpcConfData(uuid string, configFile string, stderr io.Writer) ConfigData {
+       var conf ConfigData
+       conf.Cluster = loadClusterConfigFile(configFile, stderr)
+       if conf.Cluster == nil {
+               // skip loading the container record -- we won't be
+               // able to start local keepstore anyway.
+               return conf
+       }
+       arv, err := arvadosclient.MakeArvadosClient()
+       if err != nil {
+               fmt.Fprintf(stderr, "error setting up arvadosclient: %s\n", err)
+               return conf // best effort: KeepBuffers keeps its zero value
+       }
+       // arvadosclient now interprets Retries=10 to mean
+       // Timeout=10m, retrying with exponential backoff + jitter.
+       arv.Retries = 10
+       var ctr arvados.Container
+       err = arv.Call("GET", "containers", uuid, "", arvadosclient.Dict{"select": []string{"runtime_constraints"}}, &ctr)
+       if err != nil {
+               fmt.Fprintf(stderr, "error getting container record: %s\n", err)
+               return conf // best effort: KeepBuffers keeps its zero value
+       }
+       if ctr.RuntimeConstraints.VCPUs > 0 {
+               conf.KeepBuffers = ctr.RuntimeConstraints.VCPUs * conf.Cluster.Containers.LocalKeepBlobBuffersPerVCPU
+       }
+       return conf
+}
+
+// loadClusterConfigFile loads the cluster config file at the given
+// path. If an error occurs, it logs the error to stderr and returns nil.
+func loadClusterConfigFile(path string, stderr io.Writer) *arvados.Cluster {
+       ldr := config.NewLoader(&bytes.Buffer{}, ctxlog.New(stderr, "plain", "info"))
+       ldr.Path = path
+       cfg, err := ldr.Load()
+       if err != nil {
+               fmt.Fprintf(stderr, "could not load config file %s: %s\n", path, err)
+               return nil
+       }
+       cluster, err := cfg.GetCluster("") // presumably "" selects the default/only cluster -- confirm
+       if err != nil {
+               fmt.Fprintf(stderr, "could not use config file %s: %s\n", path, err)
+               return nil
+       }
+       fmt.Fprintf(stderr, "loaded config file %s\n", path) // success is reported on stderr too
+       return cluster
+}
+
+func startLocalKeepstore(configData ConfigData, logbuf io.Writer) (*exec.Cmd, error) {
+       if configData.KeepBuffers < 1 {
+               fmt.Fprintf(logbuf, "not starting a local keepstore process because KeepBuffers=%v in config\n", configData.KeepBuffers)
+               return nil, nil // (nil, nil) means "no local keepstore", not an error
+       }
+       if configData.Cluster == nil {
+               fmt.Fprint(logbuf, "not starting a local keepstore process because cluster config file was not loaded\n")
+               return nil, nil
+       }
+       for uuid, vol := range configData.Cluster.Volumes {
+               if len(vol.AccessViaHosts) > 0 {
+                       fmt.Fprintf(logbuf, "not starting a local keepstore process because a volume (%s) uses AccessViaHosts\n", uuid)
+                       return nil, nil
+               }
+               if !vol.ReadOnly && vol.Replication < configData.Cluster.Collections.DefaultReplication {
+                       fmt.Fprintf(logbuf, "not starting a local keepstore process because a writable volume (%s) has replication less than Collections.DefaultReplication (%d < %d)\n", uuid, vol.Replication, configData.Cluster.Collections.DefaultReplication)
+                       return nil, nil
+               }
+       }
+
+       // Rather than have an alternate way to tell keepstore how
+       // many buffers to use when starting it this way, we just
+       // modify the cluster configuration that we feed it on stdin.
+       configData.Cluster.API.MaxKeepBlobBuffers = configData.KeepBuffers
+
+       localaddr := localKeepstoreAddr()
+       ln, err := net.Listen("tcp", net.JoinHostPort(localaddr, "0")) // port "0": let the OS pick a free port
+       if err != nil {
+               return nil, err
+       }
+       _, port, err := net.SplitHostPort(ln.Addr().String())
+       if err != nil {
+               ln.Close()
+               return nil, err
+       }
+       ln.Close() // release the port so the keepstore child can listen on it
+       url := "http://" + net.JoinHostPort(localaddr, port)
+
+       fmt.Fprintf(logbuf, "starting keepstore on %s\n", url)
+
+       var confJSON bytes.Buffer
+       err = json.NewEncoder(&confJSON).Encode(arvados.Config{
+               Clusters: map[string]arvados.Cluster{
+                       configData.Cluster.ClusterID: *configData.Cluster,
+               },
+       })
        if err != nil {
-               return fmt.Errorf("read stdin: %s", err)
+               return nil, err
        }
-       var env map[string]string
-       err = json.Unmarshal(buf, &env)
+       cmd := exec.Command("/proc/self/exe", "keepstore", "-config=-")
+       if target, err := os.Readlink(cmd.Path); err == nil && strings.HasSuffix(target, ".test") {
+               // If we're a 'go test' process, running
+               // /proc/self/exe would start the test suite in a
+               // child process, which is not what we want.
+               cmd.Path, _ = exec.LookPath("go")
+               cmd.Args = append([]string{"go", "run", "../../cmd/arvados-server"}, cmd.Args[1:]...)
+               cmd.Env = os.Environ()
+       }
+       cmd.Stdin = &confJSON
+       cmd.Stdout = logbuf
+       cmd.Stderr = logbuf
+       cmd.Env = append(cmd.Env,
+               "GOGC=10",
+               "ARVADOS_SERVICE_INTERNAL_URL="+url)
+       err = cmd.Start()
        if err != nil {
-               return fmt.Errorf("decode stdin: %s", err)
+               return nil, fmt.Errorf("error starting keepstore process: %w", err)
        }
-       for k, v := range env {
-               err = os.Setenv(k, v)
+       exitedCtx, markExited := context.WithCancel(context.Background()) // race-free "child has exited" signal
+       go func() {
+               cmd.Wait()
+               markExited()
+       }()
+       ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(time.Second*10))
+       defer cancel()
+       poll := time.NewTicker(time.Second / 10)
+       defer poll.Stop()
+       client := http.Client{}
+       for range poll.C {
+               testReq, err := http.NewRequestWithContext(ctx, "GET", url+"/_health/ping", nil)
                if err != nil {
-                       return fmt.Errorf("setenv(%q): %s", k, err)
+                       return nil, err
+               }
+               testReq.Header.Set("Authorization", "Bearer "+configData.Cluster.ManagementToken) // only after the nil check above
+               resp, err := client.Do(testReq)
+               if err == nil {
+                       resp.Body.Close()
+                       if resp.StatusCode == http.StatusOK {
+                               break
+                       }
+               }
+               if exitedCtx.Err() != nil {
+                       return nil, fmt.Errorf("keepstore child process exited")
+               }
+               if ctx.Err() != nil {
+                       return nil, fmt.Errorf("timed out waiting for new keepstore process to report healthy")
                }
        }
-       return nil
+       os.Setenv("ARVADOS_KEEP_SERVICES", url) // publish the new service URL through the process environment
+       return cmd, nil
+}
+
+// currentUserAndGroups returns the current uid, gid, and groups in a
+// format suitable for logging, e.g., "crunch-run process has
+// uid=1234(arvados) gid=1234(arvados) groups=1234(arvados),114(fuse)"
+func currentUserAndGroups() string {
+       u, err := user.Current()
+       if err != nil {
+               return fmt.Sprintf("error getting current user ID: %s", err) // the error text is returned in place of the summary
+       }
+       s := fmt.Sprintf("crunch-run process has uid=%s(%s) gid=%s", u.Uid, u.Username, u.Gid)
+       if g, err := user.LookupGroupId(u.Gid); err == nil { // the "(name)" suffix is omitted when lookup fails
+               s += fmt.Sprintf("(%s)", g.Name)
+       }
+       s += " groups="
+       if gids, err := u.GroupIds(); err == nil { // the list is left empty when group IDs can't be read
+               for i, gid := range gids {
+                       if i > 0 {
+                               s += ","
+                       }
+                       s += gid
+                       if g, err := user.LookupGroupId(gid); err == nil {
+                               s += fmt.Sprintf("(%s)", g.Name)
+                       }
+               }
+       }
+       return s
+}
+
+// localKeepstoreAddr returns a suitable local interface address for a
+// local keepstore service. Currently this is the lowest-sorting address
+// reported by processIPs that is not in any of the link-local/vpn/loopback
+// ranges 169.254/16, 100.64/10, or 127/8; "0.0.0.0" if there is none.
+func localKeepstoreAddr() string {
+       var ips []net.IP
+       // Ignore error (proceed with zero IPs)
+       addrs, _ := processIPs(os.Getpid())
+       for addr := range addrs {
+               ip := net.ParseIP(addr)
+               if ip == nil {
+                       // not parseable as an IP address
+                       continue
+               }
+               if ip.Mask(net.CIDRMask(8, 32)).Equal(net.IPv4(127, 0, 0, 0)) ||
+                       ip.Mask(net.CIDRMask(10, 32)).Equal(net.IPv4(100, 64, 0, 0)) ||
+                       ip.Mask(net.CIDRMask(16, 32)).Equal(net.IPv4(169, 254, 0, 0)) {
+                       // in one of the excluded ranges
+                       continue
+               }
+               ips = append(ips, ip) // NOTE(review): no explicit IPv4 check before this point -- confirm processIPs never yields IPv6
+       }
+       if len(ips) == 0 {
+               return "0.0.0.0"
+       }
+       sort.Slice(ips, func(ii, jj int) bool { // order by length first, then byte by byte
+               i, j := ips[ii], ips[jj]
+               if len(i) != len(j) {
+                       return len(i) < len(j)
+               }
+               for x := range i {
+                       if i[x] != j[x] {
+                               return i[x] < j[x]
+                       }
+               }
+               return false
+       })
+       return ips[0].String()
+}
+
+// loadPrices reads the JSON-encoded instance price list from
+// pricesfile in lockdir, merges it into cr.prices, and logs every
+// resulting entry whose start time is later than that of the entry
+// that was first in cr.prices before the merge.
+//
+// A missing file is ignored silently. Any other read error, or a
+// decode error, is logged and leaves cr.prices unchanged.
+func (cr *ContainerRunner) loadPrices() {
+       data, err := os.ReadFile(filepath.Join(lockdir, pricesfile))
+       switch {
+       case err == nil:
+               // proceed to decode
+       case os.IsNotExist(err):
+               return
+       default:
+               cr.CrunchLog.Printf("loadPrices: read: %s", err)
+               return
+       }
+       var loaded []cloud.InstancePrice
+       if err = json.Unmarshal(data, &loaded); err != nil {
+               cr.CrunchLog.Printf("loadPrices: decode: %s", err)
+               return
+       }
+
+       cr.pricesLock.Lock()
+       defer cr.pricesLock.Unlock()
+
+       // Remember the start time of the first existing entry
+       // (presumably the most recent one -- calculateCost treats
+       // the last entry as the earliest) so that only newer
+       // entries are announced below.
+       var cutoff time.Time
+       if len(cr.prices) > 0 {
+               cutoff = cr.prices[0].StartTime
+       }
+       cr.prices = cloud.NormalizePriceHistory(append(loaded, cr.prices...))
+
+       // Walk from the end of the slice toward the front.
+       for n := len(cr.prices); n > 0; n-- {
+               entry := cr.prices[n-1]
+               if !entry.StartTime.After(cutoff) {
+                       continue
+               }
+               cr.CrunchLog.Printf("Instance price changed to %#.3g at %s", entry.Price, entry.StartTime.UTC())
+       }
+}
+
+// calculateCost returns the cost accumulated between
+// cr.costStartTime and now, computed from the price history in
+// cr.prices. Each price is applied as a per-hour rate (elapsed
+// seconds / 3600) over the interval during which it was in effect.
+func (cr *ContainerRunner) calculateCost(now time.Time) float64 {
+       cr.pricesLock.Lock()
+       defer cr.pricesLock.Unlock()
+
+       // Step 1: build a history that reaches back at least to
+       // cr.costStartTime, adding a zero-StartTime placeholder
+       // entry when the real data does not go back that far.
+       history := cr.prices
+       switch n := len(history); {
+       case n == 0:
+               // No price data at all: fall back to the price in
+               // the InstanceType record initially provided by the
+               // cloud dispatcher (zero if it is missing, cannot be
+               // decoded, or is not positive).
+               var fallback float64
+               var it arvados.InstanceType
+               if j := os.Getenv("InstanceType"); j != "" && json.Unmarshal([]byte(j), &it) == nil && it.Price > 0 {
+                       fallback = it.Price
+               }
+               history = []cloud.InstancePrice{{Price: fallback}}
+       case history[n-1].StartTime.After(cr.costStartTime):
+               // Guess that earlier pricing was the same as the
+               // earliest price we know about.
+               earliest := history[n-1]
+               earliest.StartTime = time.Time{}
+               history = append(history, earliest)
+       }
+
+       // Step 2: add up the cost of each interval, working from
+       // now back to cr.costStartTime.
+       var total float64
+       until := now
+       for _, entry := range history {
+               from := entry.StartTime
+               if from.After(now) {
+                       // Pricing information from the future --
+                       // not expected from AWS, but possible in
+                       // principle, and exercised by tests.
+                       continue
+               }
+               // An entry that starts before cr.costStartTime
+               // covers the remainder of the period: clamp it and
+               // stop after counting it.
+               final := from.Before(cr.costStartTime)
+               if final {
+                       from = cr.costStartTime
+               }
+               total += entry.Price * until.Sub(from).Seconds() / 3600
+               if final {
+                       break
+               }
+               until = from
+       }
+
+       return total
+}
+
+func (runner *ContainerRunner) handleSIGUSR2(sigchan chan os.Signal) {
+       for range sigchan {
+               runner.loadPrices()
+               update := arvadosclient.Dict{
+                       "select": []string{"uuid"},
+                       "container": arvadosclient.Dict{
+                               "cost": runner.calculateCost(time.Now()),
+                       },
+               }
+               runner.DispatcherArvClient.Update("containers", runner.Container.UUID, update, nil)
+       }
 }