Merge branch '21535-multi-wf-delete'
diff --git a/lib/crunchrun/crunchrun.go b/lib/crunchrun/crunchrun.go
index 3f254496ba488bb3aea553b65d50937e59652096..8627045411a56dba53a61ba01c3b9e22d721812c 100644
@@ -12,6 +12,7 @@ import (
        "flag"
        "fmt"
        "io"
+       "io/fs"
        "io/ioutil"
        "log"
        "net"
@@ -45,6 +46,8 @@ import (
 
 type command struct{}
 
+var arvadosCertPath = "/etc/arvados/ca-certificates.crt"
+
 var Command = command{}
 
 // ConfigData contains environment variables and (when needed) cluster
@@ -75,13 +78,9 @@ type IKeepClient interface {
        ReadAt(locator string, p []byte, off int) (int, error)
        ManifestFileReader(m manifest.Manifest, filename string) (arvados.File, error)
        LocalLocator(locator string) (string, error)
-       ClearBlockCache()
        SetStorageClasses(sc []string)
 }
 
-// NewLogWriter is a factory function to create a new log writer.
-type NewLogWriter func(name string) (io.WriteCloser, error)
-
 type RunArvMount func(cmdline []string, tok string) (*exec.Cmd, error)
 
 type MkTempDir func(string, string) (string, error)
@@ -123,8 +122,7 @@ type ContainerRunner struct {
        Container     arvados.Container
        token         string
        ExitCode      *int
-       NewLogWriter  NewLogWriter
-       CrunchLog     *ThrottledLogger
+       CrunchLog     *logWriter
        logUUID       string
        logMtx        sync.Mutex
        LogCollection arvados.CollectionFileSystem
@@ -152,20 +150,12 @@ type ContainerRunner struct {
        hoststatLogger   io.WriteCloser
        hoststatReporter *crunchstat.Reporter
        statInterval     time.Duration
-       cgroupRoot       string
-       // What we expect the container's cgroup parent to be.
-       expectCgroupParent string
        // What we tell docker to use as the container's cgroup
-       // parent. Note: Ideally we would use the same field for both
-       // expectCgroupParent and setCgroupParent, and just make it
-       // default to "docker". However, when using docker < 1.10 with
-       // systemd, specifying a non-empty cgroup parent (even the
-       // default value "docker") hits a docker bug
-       // (https://github.com/docker/docker/issues/17126). Using two
-       // separate fields makes it possible to use the "expect cgroup
-       // parent to be X" feature even on sites where the "specify
-       // cgroup parent" feature breaks.
+       // parent.
        setCgroupParent string
+       // Fake root dir where crunchstat.Reporter should read OS
+       // files, for testing.
+       crunchstatFakeFS fs.FS
 
        cStateLock sync.Mutex
        cCancelled bool // StopContainer() invoked
@@ -174,7 +164,7 @@ type ContainerRunner struct {
        enableNetwork     string // one of "default" or "always"
        networkMode       string // "none", "host", or "" -- passed through to executor
        brokenNodeHook    string // script to run if node appears to be broken
-       arvMountLog       *ThrottledLogger
+       arvMountLog       io.WriteCloser
 
        containerWatchdogInterval time.Duration
 
@@ -309,11 +299,10 @@ func (runner *ContainerRunner) ArvMountCmd(cmdline []string, token string) (c *exec.Cmd, err error) {
        }
        c.Env = append(c.Env, "ARVADOS_API_TOKEN="+token)
 
-       w, err := runner.NewLogWriter("arv-mount")
+       runner.arvMountLog, err = runner.openLogFile("arv-mount")
        if err != nil {
                return nil, err
        }
-       runner.arvMountLog = NewThrottledLogger(w)
        scanner := logScanner{
                Patterns: []string{
                        "Keep write error",
@@ -501,7 +490,7 @@ func (runner *ContainerRunner) SetupMounts() (map[string]bindmount, error) {
                        }
                }
 
-               if bind == "/etc/arvados/ca-certificates.crt" {
+               if bind == arvadosCertPath {
                        needCertMount = false
                }
 
@@ -632,17 +621,6 @@ func (runner *ContainerRunner) SetupMounts() (map[string]bindmount, error) {
                                // OutputPath is a staging directory.
                                bindmounts[bind] = bindmount{HostPath: tmpfn, ReadOnly: true}
                        }
-
-               case mnt.Kind == "git_tree":
-                       tmpdir, err := runner.MkTempDir(runner.parentTemp, "git_tree")
-                       if err != nil {
-                               return nil, fmt.Errorf("creating temp dir: %v", err)
-                       }
-                       err = gitMount(mnt).extractTree(runner.ContainerArvClient, tmpdir, token)
-                       if err != nil {
-                               return nil, err
-                       }
-                       bindmounts[bind] = bindmount{HostPath: tmpdir, ReadOnly: true}
                }
        }
 
@@ -651,10 +629,19 @@ func (runner *ContainerRunner) SetupMounts() (map[string]bindmount, error) {
        }
 
        if needCertMount && runner.Container.RuntimeConstraints.API {
-               for _, certfile := range arvadosclient.CertFiles {
-                       _, err := os.Stat(certfile)
-                       if err == nil {
-                               bindmounts["/etc/arvados/ca-certificates.crt"] = bindmount{HostPath: certfile, ReadOnly: true}
+               for _, certfile := range []string{
+                       // Populated by caller, or sdk/go/arvados init(), or test suite:
+                       os.Getenv("SSL_CERT_FILE"),
+                       // Copied from Go 1.21 stdlib (src/crypto/x509/root_linux.go):
+                       "/etc/ssl/certs/ca-certificates.crt",                // Debian/Ubuntu/Gentoo etc.
+                       "/etc/pki/tls/certs/ca-bundle.crt",                  // Fedora/RHEL 6
+                       "/etc/ssl/ca-bundle.pem",                            // OpenSUSE
+                       "/etc/pki/tls/cacert.pem",                           // OpenELEC
+                       "/etc/pki/ca-trust/extracted/pem/tls-ca-bundle.pem", // CentOS/RHEL 7
+                       "/etc/ssl/cert.pem",                                 // Alpine Linux
+               } {
+                       if _, err := os.Stat(certfile); err == nil {
+                               bindmounts[arvadosCertPath] = bindmount{HostPath: certfile, ReadOnly: true}
                                break
                        }
                }
@@ -743,14 +730,22 @@ func (runner *ContainerRunner) stopHoststat() error {
 }
 
 func (runner *ContainerRunner) startHoststat() error {
-       w, err := runner.NewLogWriter("hoststat")
+       var err error
+       runner.hoststatLogger, err = runner.openLogFile("hoststat")
        if err != nil {
                return err
        }
-       runner.hoststatLogger = NewThrottledLogger(w)
        runner.hoststatReporter = &crunchstat.Reporter{
-               Logger:     log.New(runner.hoststatLogger, "", 0),
-               CgroupRoot: runner.cgroupRoot,
+               Logger: newLogWriter(runner.hoststatLogger),
+               // Our own cgroup is the "host" cgroup, in the sense
+               // that it accounts for resource usage outside the
+               // container. It doesn't count _all_ resource usage on
+               // the system.
+               //
+               // TODO?: Use the furthest ancestor of our own cgroup
+               // that has stats available. (Currently crunchstat
+               // does not have that capability.)
+               Pid:        os.Getpid,
                PollPeriod: runner.statInterval,
        }
        runner.hoststatReporter.Start()
@@ -759,16 +754,15 @@ func (runner *ContainerRunner) startHoststat() error {
 }
 
 func (runner *ContainerRunner) startCrunchstat() error {
-       w, err := runner.NewLogWriter("crunchstat")
+       var err error
+       runner.statLogger, err = runner.openLogFile("crunchstat")
        if err != nil {
                return err
        }
-       runner.statLogger = NewThrottledLogger(w)
        runner.statReporter = &crunchstat.Reporter{
-               CgroupParent: runner.expectCgroupParent,
-               CgroupRoot:   runner.cgroupRoot,
-               CID:          runner.executor.CgroupID(),
-               Logger:       log.New(runner.statLogger, "", 0),
+               Pid:    runner.executor.Pid,
+               FS:     runner.crunchstatFakeFS,
+               Logger: newLogWriter(runner.statLogger),
                MemThresholds: map[string][]crunchstat.Threshold{
                        "rss": crunchstat.NewThresholdsFromPercentages(runner.Container.RuntimeConstraints.RAM, []int64{90, 95, 99}),
                },
@@ -791,7 +785,7 @@ type infoCommand struct {
 // might differ from what's described in the node record (see
 // LogNodeRecord).
 func (runner *ContainerRunner) LogHostInfo() (err error) {
-       w, err := runner.NewLogWriter("node-info")
+       w, err := runner.openLogFile("node-info")
        if err != nil {
                return
        }
@@ -842,62 +836,40 @@ func (runner *ContainerRunner) LogHostInfo() (err error) {
 
 // LogContainerRecord gets and saves the raw JSON container record from the API server
 func (runner *ContainerRunner) LogContainerRecord() error {
-       logged, err := runner.logAPIResponse("container", "containers", map[string]interface{}{"filters": [][]string{{"uuid", "=", runner.Container.UUID}}}, nil)
+       logged, err := runner.logAPIResponse("container", "containers", map[string]interface{}{"filters": [][]string{{"uuid", "=", runner.Container.UUID}}})
        if !logged && err == nil {
                err = fmt.Errorf("error: no container record found for %s", runner.Container.UUID)
        }
        return err
 }
 
-// LogNodeRecord logs the current host's InstanceType config entry (or
-// the arvados#node record, if running via crunch-dispatch-slurm).
+// LogNodeRecord logs the current host's InstanceType config entry, if
+// running via arvados-dispatch-cloud.
 func (runner *ContainerRunner) LogNodeRecord() error {
-       if it := os.Getenv("InstanceType"); it != "" {
-               // Dispatched via arvados-dispatch-cloud. Save
-               // InstanceType config fragment received from
-               // dispatcher on stdin.
-               w, err := runner.LogCollection.OpenFile("node.json", os.O_CREATE|os.O_WRONLY, 0666)
-               if err != nil {
-                       return err
-               }
-               defer w.Close()
-               _, err = io.WriteString(w, it)
-               if err != nil {
-                       return err
-               }
-               return w.Close()
+       it := os.Getenv("InstanceType")
+       if it == "" {
+               // Not dispatched by arvados-dispatch-cloud.
+               return nil
        }
-       // Dispatched via crunch-dispatch-slurm. Look up
-       // apiserver's node record corresponding to
-       // $SLURMD_NODENAME.
-       hostname := os.Getenv("SLURMD_NODENAME")
-       if hostname == "" {
-               hostname, _ = os.Hostname()
+       // Save InstanceType config fragment received from dispatcher
+       // on stdin.
+       w, err := runner.LogCollection.OpenFile("node.json", os.O_CREATE|os.O_WRONLY, 0666)
+       if err != nil {
+               return err
        }
-       _, err := runner.logAPIResponse("node", "nodes", map[string]interface{}{"filters": [][]string{{"hostname", "=", hostname}}}, func(resp interface{}) {
-               // The "info" field has admin-only info when
-               // obtained with a privileged token, and
-               // should not be logged.
-               node, ok := resp.(map[string]interface{})
-               if ok {
-                       delete(node, "info")
-               }
-       })
-       return err
+       defer w.Close()
+       _, err = io.WriteString(w, it)
+       if err != nil {
+               return err
+       }
+       return w.Close()
 }
 
-func (runner *ContainerRunner) logAPIResponse(label, path string, params map[string]interface{}, munge func(interface{})) (logged bool, err error) {
+func (runner *ContainerRunner) logAPIResponse(label, path string, params map[string]interface{}) (logged bool, err error) {
        writer, err := runner.LogCollection.OpenFile(label+".json", os.O_CREATE|os.O_WRONLY, 0666)
        if err != nil {
                return false, err
        }
-       w := &ArvLogWriter{
-               ArvClient:     runner.DispatcherArvClient,
-               UUID:          runner.Container.UUID,
-               loggingStream: label,
-               writeCloser:   writer,
-       }
-
        reader, err := runner.DispatcherArvClient.CallRaw("GET", path, "", "", arvadosclient.Dict(params))
        if err != nil {
                return false, fmt.Errorf("error getting %s record: %v", label, err)
@@ -916,16 +888,13 @@ func (runner *ContainerRunner) logAPIResponse(label, path string, params map[string]interface{}, munge func(interface{})) (logged bool, err error) {
        } else if len(items) < 1 {
                return false, nil
        }
-       if munge != nil {
-               munge(items[0])
-       }
        // Re-encode it using indentation to improve readability
-       enc := json.NewEncoder(w)
+       enc := json.NewEncoder(writer)
        enc.SetIndent("", "    ")
        if err = enc.Encode(items[0]); err != nil {
                return false, fmt.Errorf("error logging %s record: %v", label, err)
        }
-       err = w.Close()
+       err = writer.Close()
        if err != nil {
                return false, fmt.Errorf("error closing %s.json in log collection: %v", label, err)
        }
@@ -993,10 +962,10 @@ func (runner *ContainerRunner) CreateContainer(imageID string, bindmounts map[string]bindmount) error {
                        return err
                }
                stdout = f
-       } else if w, err := runner.NewLogWriter("stdout"); err != nil {
+       } else if w, err := runner.openLogFile("stdout"); err != nil {
                return err
        } else {
-               stdout = NewThrottledLogger(w)
+               stdout = w
        }
 
        if mnt, ok := runner.Container.Mounts["stderr"]; ok {
@@ -1005,10 +974,10 @@ func (runner *ContainerRunner) CreateContainer(imageID string, bindmounts map[string]bindmount) error {
                        return err
                }
                stderr = f
-       } else if w, err := runner.NewLogWriter("stderr"); err != nil {
+       } else if w, err := runner.openLogFile("stderr"); err != nil {
                return err
        } else {
-               stderr = NewThrottledLogger(w)
+               stderr = w
        }
 
        env := runner.Container.Environment
@@ -1124,6 +1093,7 @@ func (runner *ContainerRunner) WaitFinish() error {
        }
        runner.CrunchLog.Printf("Container exited with status code %d%s", exitcode, extra)
        err = runner.DispatcherArvClient.Update("containers", runner.Container.UUID, arvadosclient.Dict{
+               "select":    []string{"uuid"},
                "container": arvadosclient.Dict{"exit_code": exitcode},
        }, nil)
        if err != nil {
@@ -1200,7 +1170,10 @@ func (runner *ContainerRunner) updateLogs() {
                }
 
                err = runner.DispatcherArvClient.Update("containers", runner.Container.UUID, arvadosclient.Dict{
-                       "container": arvadosclient.Dict{"log": saved.PortableDataHash},
+                       "select": []string{"uuid"},
+                       "container": arvadosclient.Dict{
+                               "log": saved.PortableDataHash,
+                       },
                }, nil)
                if err != nil {
                        runner.CrunchLog.Printf("error updating container log to %s: %s", saved.PortableDataHash, err)
@@ -1316,6 +1289,7 @@ func (runner *ContainerRunner) checkSpotInterruptionNotices() {
 
 func (runner *ContainerRunner) updateRuntimeStatus(status arvadosclient.Dict) {
        err := runner.DispatcherArvClient.Update("containers", runner.Container.UUID, arvadosclient.Dict{
+               "select": []string{"uuid"},
                "container": arvadosclient.Dict{
                        "runtime_status": status,
                },
@@ -1332,7 +1306,9 @@ func (runner *ContainerRunner) CaptureOutput(bindmounts map[string]bindmount) error {
                // Output may have been set directly by the container, so
                // refresh the container record to check.
                err := runner.DispatcherArvClient.Get("containers", runner.Container.UUID,
-                       nil, &runner.Container)
+                       arvadosclient.Dict{
+                               "select": []string{"output"},
+                       }, &runner.Container)
                if err != nil {
                        return err
                }
@@ -1345,10 +1321,10 @@ func (runner *ContainerRunner) CaptureOutput(bindmounts map[string]bindmount) error {
 
        txt, err := (&copier{
                client:        runner.containerClient,
-               arvClient:     runner.ContainerArvClient,
                keepClient:    runner.ContainerKeepClient,
                hostOutputDir: runner.HostOutputDir,
                ctrOutputDir:  runner.Container.OutputPath,
+               globs:         runner.Container.OutputGlob,
                bindmounts:    bindmounts,
                mounts:        runner.Container.Mounts,
                secretMounts:  runner.SecretMounts,
@@ -1371,6 +1347,7 @@ func (runner *ContainerRunner) CaptureOutput(bindmounts map[string]bindmount) error {
        var resp arvados.Collection
        err = runner.ContainerArvClient.Create("collections", arvadosclient.Dict{
                "ensure_unique_name": true,
+               "select":             []string{"portable_data_hash"},
                "collection": arvadosclient.Dict{
                        "is_trashed":    true,
                        "name":          "output for " + runner.Container.UUID,
@@ -1453,19 +1430,11 @@ func (runner *ContainerRunner) CommitLogs() error {
                if runner.arvMountLog != nil {
                        runner.arvMountLog.Close()
                }
-               runner.CrunchLog.Close()
-
-               // Closing CrunchLog above allows them to be committed to Keep at this
-               // point, but re-open crunch log with ArvClient in case there are any
-               // other further errors (such as failing to write the log to Keep!)
-               // while shutting down
-               runner.CrunchLog = NewThrottledLogger(&ArvLogWriter{
-                       ArvClient:     runner.DispatcherArvClient,
-                       UUID:          runner.Container.UUID,
-                       loggingStream: "crunch-run",
-                       writeCloser:   nil,
-               })
-               runner.CrunchLog.Immediate = log.New(os.Stderr, runner.Container.UUID+" ", 0)
+
+               // From now on just log to stderr, in case there are
+               // any further errors (such as failing to write the
+               // log to Keep!) while shutting down.
+               runner.CrunchLog = newLogWriter(newTimestamper(newStringPrefixer(os.Stderr, runner.Container.UUID+" ")))
        }()
 
        if runner.keepstoreLogger != nil {
@@ -1497,6 +1466,8 @@ func (runner *ContainerRunner) CommitLogs() error {
        return nil
 }
 
+// Create/update the log collection. Return value has UUID and
+// PortableDataHash fields populated, but others may be blank.
 func (runner *ContainerRunner) saveLogCollection(final bool) (response arvados.Collection, err error) {
        runner.logMtx.Lock()
        defer runner.logMtx.Unlock()
@@ -1521,11 +1492,20 @@ func (runner *ContainerRunner) saveLogCollection(final bool) (response arvados.Collection, err error) {
        if final {
                updates["is_trashed"] = true
        } else {
-               exp := time.Now().Add(crunchLogUpdatePeriod * 24)
+               // We set trash_at so this collection gets
+               // automatically cleaned up eventually. It used to be
+               // 12 hours, but in one incident the API server was
+               // down over a weekend while containers kept running,
+               // and the log collection got trashed, so now we use
+               // 2 weeks. refs #20378
+               exp := time.Now().Add(time.Duration(24*14) * time.Hour)
                updates["trash_at"] = exp
                updates["delete_at"] = exp
        }
-       reqBody := arvadosclient.Dict{"collection": updates}
+       reqBody := arvadosclient.Dict{
+               "select":     []string{"uuid", "portable_data_hash"},
+               "collection": updates,
+       }
        var err2 error
        if runner.logUUID == "" {
                reqBody["ensure_unique_name"] = true
@@ -1560,7 +1540,10 @@ func (runner *ContainerRunner) UpdateContainerRunning(logId string) error {
        return runner.DispatcherArvClient.Update(
                "containers",
                runner.Container.UUID,
-               arvadosclient.Dict{"container": updates},
+               arvadosclient.Dict{
+                       "select":    []string{"uuid"},
+                       "container": updates,
+               },
                nil,
        )
 }
@@ -1598,7 +1581,10 @@ func (runner *ContainerRunner) UpdateContainerFinal() error {
                update["output"] = *runner.OutputPDH
        }
        update["cost"] = runner.calculateCost(time.Now())
-       return runner.DispatcherArvClient.Update("containers", runner.Container.UUID, arvadosclient.Dict{"container": update}, nil)
+       return runner.DispatcherArvClient.Update("containers", runner.Container.UUID, arvadosclient.Dict{
+               "select":    []string{"uuid"},
+               "container": update,
+       }, nil)
 }
 
 // IsCancelled returns the value of Cancelled, with goroutine safety.
@@ -1608,18 +1594,8 @@ func (runner *ContainerRunner) IsCancelled() bool {
        return runner.cCancelled
 }
 
-// NewArvLogWriter creates an ArvLogWriter
-func (runner *ContainerRunner) NewArvLogWriter(name string) (io.WriteCloser, error) {
-       writer, err := runner.LogCollection.OpenFile(name+".txt", os.O_CREATE|os.O_WRONLY, 0666)
-       if err != nil {
-               return nil, err
-       }
-       return &ArvLogWriter{
-               ArvClient:     runner.DispatcherArvClient,
-               UUID:          runner.Container.UUID,
-               loggingStream: name,
-               writeCloser:   writer,
-       }, nil
+func (runner *ContainerRunner) openLogFile(name string) (io.WriteCloser, error) {
+       return runner.LogCollection.OpenFile(name+".txt", os.O_CREATE|os.O_WRONLY, 0666)
 }
 
 // Run the full container lifecycle.
@@ -1649,9 +1625,7 @@ func (runner *ContainerRunner) Run() (err error) {
 
        defer func() {
                runner.CleanupDirs()
-
                runner.CrunchLog.Printf("crunch-run finished")
-               runner.CrunchLog.Close()
        }()
 
        err = runner.fetchContainerRecord()
@@ -1845,7 +1819,6 @@ func NewContainerRunner(dispatcherClient *arvados.Client,
                DispatcherArvClient:  dispatcherArvClient,
                DispatcherKeepClient: dispatcherKeepClient,
        }
-       cr.NewLogWriter = cr.NewArvLogWriter
        cr.RunArvMount = cr.ArvMountCmd
        cr.MkTempDir = ioutil.TempDir
        cr.MkArvClient = func(token string) (IArvadosClient, IKeepClient, *arvados.Client, error) {
@@ -1868,14 +1841,12 @@ func NewContainerRunner(dispatcherClient *arvados.Client,
                return nil, err
        }
        cr.Container.UUID = containerUUID
-       w, err := cr.NewLogWriter("crunch-run")
+       f, err := cr.openLogFile("crunch-run")
        if err != nil {
                return nil, err
        }
-       cr.CrunchLog = NewThrottledLogger(w)
-       cr.CrunchLog.Immediate = log.New(os.Stderr, containerUUID+" ", 0)
+       cr.CrunchLog = newLogWriter(newTimestamper(io.MultiWriter(f, newStringPrefixer(os.Stderr, cr.Container.UUID+" "))))
 
-       loadLogThrottleParams(dispatcherArvClient)
        go cr.updateLogs()
 
        return cr, nil
@@ -1885,9 +1856,9 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) int {
        log := log.New(stderr, "", 0)
        flags := flag.NewFlagSet(prog, flag.ContinueOnError)
        statInterval := flags.Duration("crunchstat-interval", 10*time.Second, "sampling period for periodic resource usage reporting")
-       cgroupRoot := flags.String("cgroup-root", "/sys/fs/cgroup", "path to sysfs cgroup tree")
-       cgroupParent := flags.String("cgroup-parent", "docker", "name of container's parent cgroup (ignored if -cgroup-parent-subsystem is used)")
-       cgroupParentSubsystem := flags.String("cgroup-parent-subsystem", "", "use current cgroup for given subsystem as parent cgroup for container")
+       flags.String("cgroup-root", "/sys/fs/cgroup", "path to sysfs cgroup tree (obsolete, ignored)")
+       flags.String("cgroup-parent", "docker", "name of container's parent cgroup (obsolete, ignored)")
+       cgroupParentSubsystem := flags.String("cgroup-parent-subsystem", "", "use current cgroup for given `subsystem` as parent cgroup for container (subsystem argument is only relevant for cgroups v1; in cgroups v2 / unified mode, any non-empty value means use current cgroup); if empty, use the docker daemon's default cgroup parent. See https://doc.arvados.org/install/crunch2-slurm/install-dispatch.html#CrunchRunCommand-cgroups")
        caCertsPath := flags.String("ca-certs", "", "Path to TLS root certificates")
        detach := flags.Bool("detach", false, "Detach from parent process and run in the background")
        stdinConfig := flags.Bool("stdin-config", false, "Load config and environment variables from JSON message on stdin")
@@ -1972,7 +1943,7 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) int {
        time.Sleep(*sleep)
 
        if *caCertsPath != "" {
-               arvadosclient.CertFiles = []string{*caCertsPath}
+               os.Setenv("SSL_CERT_FILE", *caCertsPath)
        }
 
        keepstore, err := startLocalKeepstore(conf, io.MultiWriter(&keepstoreLogbuf, stderr))
@@ -1989,14 +1960,15 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) int {
                log.Printf("%s: %v", containerUUID, err)
                return 1
        }
-       api.Retries = 8
+       // arvadosclient now interprets Retries=10 to mean
+       // Timeout=10m, retrying with exponential backoff + jitter.
+       api.Retries = 10
 
        kc, err := keepclient.MakeKeepClient(api)
        if err != nil {
                log.Printf("%s: %v", containerUUID, err)
                return 1
        }
-       kc.BlockCache = &keepclient.BlockCache{MaxBlocks: 2}
        kc.Retries = 4
 
        cr, err := NewContainerRunner(arvados.NewClientFromEnv(), api, kc, containerUUID)
@@ -2019,12 +1991,11 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) int {
                keepstoreLogbuf.SetWriter(io.Discard)
        } else {
                cr.CrunchLog.Printf("using local keepstore process (pid %d) at %s, writing logs to keepstore.txt in log collection", keepstore.Process.Pid, os.Getenv("ARVADOS_KEEP_SERVICES"))
-               logwriter, err := cr.NewLogWriter("keepstore")
+               cr.keepstoreLogger, err = cr.openLogFile("keepstore")
                if err != nil {
                        log.Print(err)
                        return 1
                }
-               cr.keepstoreLogger = NewThrottledLogger(logwriter)
 
                var writer io.WriteCloser = cr.keepstoreLogger
                if logWhat == "errors" {
@@ -2050,13 +2021,11 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) int {
                cr.executor, err = newSingularityExecutor(cr.CrunchLog.Printf)
        default:
                cr.CrunchLog.Printf("%s: unsupported RuntimeEngine %q", containerUUID, *runtimeEngine)
-               cr.CrunchLog.Close()
                return 1
        }
        if err != nil {
                cr.CrunchLog.Printf("%s: %v", containerUUID, err)
                cr.checkBrokenNode(err)
-               cr.CrunchLog.Close()
                return 1
        }
        defer cr.executor.Close()
@@ -2077,6 +2046,7 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) int {
                        ContainerUUID: containerUUID,
                        Target:        cr.executor,
                        Log:           cr.CrunchLog,
+                       LogCollection: cr.LogCollection,
                }
                if gwListen == "" {
                        // Direct connection won't work, so we use the
@@ -2087,7 +2057,10 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) int {
                        cr.gateway.UpdateTunnelURL = func(url string) {
                                cr.gateway.Address = "tunnel " + url
                                cr.DispatcherArvClient.Update("containers", containerUUID,
-                                       arvadosclient.Dict{"container": arvadosclient.Dict{"gateway_address": cr.gateway.Address}}, nil)
+                                       arvadosclient.Dict{
+                                               "select":    []string{"uuid"},
+                                               "container": arvadosclient.Dict{"gateway_address": cr.gateway.Address},
+                                       }, nil)
                        }
                }
                err = cr.gateway.Start()
@@ -2105,19 +2078,16 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) int {
 
        cr.parentTemp = parentTemp
        cr.statInterval = *statInterval
-       cr.cgroupRoot = *cgroupRoot
-       cr.expectCgroupParent = *cgroupParent
        cr.enableMemoryLimit = *enableMemoryLimit
        cr.enableNetwork = *enableNetwork
        cr.networkMode = *networkMode
        if *cgroupParentSubsystem != "" {
-               p, err := findCgroup(*cgroupParentSubsystem)
+               p, err := findCgroup(os.DirFS("/"), *cgroupParentSubsystem)
                if err != nil {
                        log.Printf("fatal: cgroup parent subsystem: %s", err)
                        return 1
                }
                cr.setCgroupParent = p
-               cr.expectCgroupParent = p
        }
 
        if conf.EC2SpotCheck {
@@ -2165,7 +2135,9 @@ func hpcConfData(uuid string, configFile string, stderr io.Writer) ConfigData {
                fmt.Fprintf(stderr, "error setting up arvadosclient: %s\n", err)
                return conf
        }
-       arv.Retries = 8
+       // arvadosclient now interprets Retries=10 to mean
+       // Timeout=10m, retrying with exponential backoff + jitter.
+       arv.Retries = 10
        var ctr arvados.Container
        err = arv.Call("GET", "containers", uuid, "", arvadosclient.Dict{"select": []string{"runtime_constraints"}}, &ctr)
        if err != nil {
@@ -2218,9 +2190,14 @@ func startLocalKeepstore(configData ConfigData, logbuf io.Writer) (*exec.Cmd, error) {
        }
 
        // Rather than have an alternate way to tell keepstore how
-       // many buffers to use when starting it this way, we just
-       // modify the cluster configuration that we feed it on stdin.
-       configData.Cluster.API.MaxKeepBlobBuffers = configData.KeepBuffers
+       // many buffers to use, etc., when starting it this way, we
+       // just modify the cluster configuration that we feed it on
+       // stdin.
+       ccfg := *configData.Cluster
+       ccfg.API.MaxKeepBlobBuffers = configData.KeepBuffers
+       ccfg.Collections.BlobTrash = false
+       ccfg.Collections.BlobTrashConcurrency = 0
+       ccfg.Collections.BlobDeleteConcurrency = 0
 
        localaddr := localKeepstoreAddr()
        ln, err := net.Listen("tcp", net.JoinHostPort(localaddr, "0"))
@@ -2240,7 +2217,7 @@ func startLocalKeepstore(configData ConfigData, logbuf io.Writer) (*exec.Cmd, error) {
        var confJSON bytes.Buffer
        err = json.NewEncoder(&confJSON).Encode(arvados.Config{
                Clusters: map[string]arvados.Cluster{
-                       configData.Cluster.ClusterID: *configData.Cluster,
+                       ccfg.ClusterID: ccfg,
                },
        })
        if err != nil {
@@ -2454,6 +2431,7 @@ func (runner *ContainerRunner) handleSIGUSR2(sigchan chan os.Signal) {
        for range sigchan {
                runner.loadPrices()
                update := arvadosclient.Dict{
+                       "select": []string{"uuid"},
                        "container": arvadosclient.Dict{
                                "cost": runner.calculateCost(time.Now()),
                        },