Merge branch 'main' into 18842-arv-mount-disk-config
[arvados.git] / lib / crunchrun / crunchrun.go
index f9bf3df9d7535fceb20231b8082c359745dc45e2..51e154c0ecfb3b978844947480f1efe7fe2f6fa9 100644
@@ -19,6 +19,7 @@ import (
        "os"
        "os/exec"
        "os/signal"
+       "os/user"
        "path"
        "path/filepath"
        "regexp"
@@ -31,11 +32,14 @@ import (
        "time"
 
        "git.arvados.org/arvados.git/lib/cmd"
+       "git.arvados.org/arvados.git/lib/config"
        "git.arvados.org/arvados.git/lib/crunchstat"
        "git.arvados.org/arvados.git/sdk/go/arvados"
        "git.arvados.org/arvados.git/sdk/go/arvadosclient"
+       "git.arvados.org/arvados.git/sdk/go/ctxlog"
        "git.arvados.org/arvados.git/sdk/go/keepclient"
        "git.arvados.org/arvados.git/sdk/go/manifest"
+       "golang.org/x/sys/unix"
 )
 
 type command struct{}
@@ -136,7 +140,9 @@ type ContainerRunner struct {
        MkArvClient   func(token string) (IArvadosClient, IKeepClient, *arvados.Client, error)
        finalState    string
        parentTemp    string
+       costStartTime time.Time
 
+       keepstore        *exec.Cmd
        keepstoreLogger  io.WriteCloser
        keepstoreLogbuf  *bufThenWrite
        statLogger       io.WriteCloser
@@ -165,6 +171,7 @@ type ContainerRunner struct {
        enableMemoryLimit bool
        enableNetwork     string // one of "default" or "always"
        networkMode       string // "none", "host", or "" -- passed through to executor
+       brokenNodeHook    string // script to run if node appears to be broken
        arvMountLog       *ThrottledLogger
 
        containerWatchdogInterval time.Duration
@@ -208,10 +215,9 @@ var errorBlacklist = []string{
        "(?ms).*oci runtime error.*starting container process.*container init.*mounting.*to rootfs.*no such file or directory.*",
        "(?ms).*grpc: the connection is unavailable.*",
 }
-var brokenNodeHook *string = flag.String("broken-node-hook", "", "Script to run if node is detected to be broken (for example, Docker daemon is not running)")
 
 func (runner *ContainerRunner) runBrokenNodeHook() {
-       if *brokenNodeHook == "" {
+       if runner.brokenNodeHook == "" {
                path := filepath.Join(lockdir, brokenfile)
                runner.CrunchLog.Printf("Writing %s to mark node as broken", path)
                f, err := os.OpenFile(path, os.O_CREATE|os.O_RDWR, 0700)
@@ -221,9 +227,9 @@ func (runner *ContainerRunner) runBrokenNodeHook() {
                }
                f.Close()
        } else {
-               runner.CrunchLog.Printf("Running broken node hook %q", *brokenNodeHook)
+               runner.CrunchLog.Printf("Running broken node hook %q", runner.brokenNodeHook)
                // run killme script
-               c := exec.Command(*brokenNodeHook)
+               c := exec.Command(runner.brokenNodeHook)
                c.Stdout = runner.CrunchLog
                c.Stderr = runner.CrunchLog
                err := c.Run()
@@ -418,11 +424,17 @@ func (runner *ContainerRunner) SetupMounts() (map[string]bindmount, error) {
                "--storage-classes", strings.Join(runner.Container.OutputStorageClasses, ","),
                fmt.Sprintf("--crunchstat-interval=%v", runner.statInterval.Seconds())}
 
-       if runner.executor.Runtime() == "docker" {
+       if _, isdocker := runner.executor.(*dockerExecutor); isdocker {
                arvMountCmd = append(arvMountCmd, "--allow-other")
        }
 
-       if runner.Container.RuntimeConstraints.KeepCacheRAM > 0 {
+       if runner.Container.RuntimeConstraints.KeepCacheDisk > 0 {
+               keepcachedir, err := runner.MkTempDir(runner.parentTemp, "keepcache")
+               if err != nil {
+                       return nil, fmt.Errorf("while creating keep cache temp dir: %v", err)
+               }
+               arvMountCmd = append(arvMountCmd, "--disk-cache", "--disk-cache-dir", keepcachedir, "--file-cache", fmt.Sprintf("%d", runner.Container.RuntimeConstraints.KeepCacheDisk))
+       } else if runner.Container.RuntimeConstraints.KeepCacheRAM > 0 {
                arvMountCmd = append(arvMountCmd, "--file-cache", fmt.Sprintf("%d", runner.Container.RuntimeConstraints.KeepCacheRAM))
        }
 
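A minimal standalone sketch (not part of the diff) of the cache-flag selection this hunk introduces: prefer the new disk-backed cache when KeepCacheDisk is set, otherwise fall back to the RAM file cache. The struct here is a simplified stand-in for arvados.RuntimeConstraints, and the sizes and cache dir are invented for illustration.

```go
package main

import (
	"fmt"
	"strings"
)

// runtimeConstraints is a simplified stand-in for the relevant fields
// of arvados.RuntimeConstraints.
type runtimeConstraints struct {
	KeepCacheDisk int64 // bytes of disk-backed Keep block cache
	KeepCacheRAM  int64 // bytes of RAM-backed Keep block cache
}

// cacheFlags mirrors the selection above: use --disk-cache when
// KeepCacheDisk is set, otherwise the older RAM-backed --file-cache.
func cacheFlags(rc runtimeConstraints, cacheDir string) []string {
	switch {
	case rc.KeepCacheDisk > 0:
		return []string{"--disk-cache", "--disk-cache-dir", cacheDir,
			"--file-cache", fmt.Sprintf("%d", rc.KeepCacheDisk)}
	case rc.KeepCacheRAM > 0:
		return []string{"--file-cache", fmt.Sprintf("%d", rc.KeepCacheRAM)}
	default:
		return nil
	}
}

func main() {
	fmt.Println(strings.Join(cacheFlags(runtimeConstraints{KeepCacheDisk: 8 << 30}, "/tmp/keepcache"), " "))
	fmt.Println(strings.Join(cacheFlags(runtimeConstraints{KeepCacheRAM: 256 << 20}, "/tmp/keepcache"), " "))
}
```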
@@ -452,8 +464,8 @@ func (runner *ContainerRunner) SetupMounts() (map[string]bindmount, error) {
        sort.Strings(binds)
 
        for _, bind := range binds {
-               mnt, ok := runner.Container.Mounts[bind]
-               if !ok {
+               mnt, notSecret := runner.Container.Mounts[bind]
+               if !notSecret {
                        mnt = runner.SecretMounts[bind]
                }
                if bind == "stdout" || bind == "stderr" {
@@ -522,8 +534,7 @@ func (runner *ContainerRunner) SetupMounts() (map[string]bindmount, error) {
                                }
                        } else {
                                src = fmt.Sprintf("%s/tmp%d", runner.ArvMountPoint, tmpcount)
-                               arvMountCmd = append(arvMountCmd, "--mount-tmp")
-                               arvMountCmd = append(arvMountCmd, fmt.Sprintf("tmp%d", tmpcount))
+                               arvMountCmd = append(arvMountCmd, "--mount-tmp", fmt.Sprintf("tmp%d", tmpcount))
                                tmpcount++
                        }
                        if mnt.Writable {
@@ -583,9 +594,32 @@ func (runner *ContainerRunner) SetupMounts() (map[string]bindmount, error) {
                        if err != nil {
                                return nil, fmt.Errorf("writing temp file: %v", err)
                        }
-                       if strings.HasPrefix(bind, runner.Container.OutputPath+"/") {
+                       if strings.HasPrefix(bind, runner.Container.OutputPath+"/") && (notSecret || runner.Container.Mounts[runner.Container.OutputPath].Kind != "collection") {
+                               // In most cases, if the container
+                               // specifies a literal file inside the
+                               // output path, we copy it into the
+                               // output directory (either a mounted
+                               // collection or a staging area on the
+                               // host fs). If it's a secret, it will
+                               // be skipped when copying output from
+                               // staging to Keep later.
                                copyFiles = append(copyFiles, copyFile{tmpfn, runner.HostOutputDir + bind[len(runner.Container.OutputPath):]})
                        } else {
+                               // If a secret is outside OutputPath,
+                               // we bind mount the secret file
+                               // directly just like other mounts. We
+                               // also use this strategy when a
+                               // secret is inside OutputPath but
+                               // OutputPath is a live collection, to
+                               // avoid writing the secret to
+                               // Keep. Attempting to remove a
+                               // bind-mounted secret file from
+                               // inside the container will return a
+                               // "Device or resource busy" error
+                               // that might not be handled well by
+                               // the container, which is why we
+                               // don't use this strategy when
+                               // OutputPath is a staging directory.
                                bindmounts[bind] = bindmount{HostPath: tmpfn, ReadOnly: true}
                        }
 
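The two comment blocks above describe a placement decision for literal files. The following is a condensed sketch of that decision with simplified parameter names (not the runner's actual fields); the example paths in main are invented.

```go
package main

import (
	"fmt"
	"strings"
)

// placeLiteral mirrors the condition above: a literal file is copied
// into the host output staging dir unless it is a secret whose
// destination is a live collection-backed output mount, in which case
// it is bind-mounted read-only instead.
func placeLiteral(bind, outputPath string, isSecret, outputIsCollection bool) string {
	insideOutput := strings.HasPrefix(bind, outputPath+"/")
	if insideOutput && (!isSecret || !outputIsCollection) {
		return "copy into host output dir (secrets are skipped at upload time)"
	}
	return "bind-mount read-only into the container"
}

func main() {
	fmt.Println(placeLiteral("/out/config.json", "/out", false, false))
	fmt.Println(placeLiteral("/out/token", "/out", true, true))
	fmt.Println(placeLiteral("/secrets/token", "/out", true, false))
}
```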
@@ -617,10 +651,15 @@ func (runner *ContainerRunner) SetupMounts() (map[string]bindmount, error) {
        }
 
        if pdhOnly {
-               arvMountCmd = append(arvMountCmd, "--mount-by-pdh", "by_id")
+               // If we are only mounting collections by pdh, make
+               // sure we don't subscribe to websocket events to
+               // avoid putting undesired load on the API server
+               arvMountCmd = append(arvMountCmd, "--mount-by-pdh", "by_id", "--disable-event-listening")
        } else {
                arvMountCmd = append(arvMountCmd, "--mount-by-id", "by_id")
        }
+       // the by_uuid mount point is used by singularity when writing
+       // out docker images converted to SIF
        arvMountCmd = append(arvMountCmd, "--mount-by-id", "by_uuid")
        arvMountCmd = append(arvMountCmd, runner.ArvMountPoint)
 
@@ -628,6 +667,9 @@ func (runner *ContainerRunner) SetupMounts() (map[string]bindmount, error) {
        if err != nil {
                return nil, fmt.Errorf("while trying to start arv-mount: %v", err)
        }
+       if runner.hoststatReporter != nil && runner.ArvMount != nil {
+               runner.hoststatReporter.ReportPID("arv-mount", runner.ArvMount.Process.Pid)
+       }
 
        for _, p := range collectionPaths {
                _, err = os.Stat(p)
@@ -701,6 +743,7 @@ func (runner *ContainerRunner) startHoststat() error {
                PollPeriod: runner.statInterval,
        }
        runner.hoststatReporter.Start()
+       runner.hoststatReporter.ReportPID("crunch-run", os.Getpid())
        return nil
 }
 
@@ -968,6 +1011,7 @@ func (runner *ContainerRunner) CreateContainer(imageID string, bindmounts map[st
                env["ARVADOS_API_TOKEN"] = tok
                env["ARVADOS_API_HOST"] = os.Getenv("ARVADOS_API_HOST")
                env["ARVADOS_API_HOST_INSECURE"] = os.Getenv("ARVADOS_API_HOST_INSECURE")
+               env["ARVADOS_KEEP_SERVICES"] = os.Getenv("ARVADOS_KEEP_SERVICES")
        }
        workdir := runner.Container.Cwd
        if workdir == "." {
@@ -981,20 +1025,26 @@ func (runner *ContainerRunner) CreateContainer(imageID string, bindmounts map[st
        runner.executorStdin = stdin
        runner.executorStdout = stdout
        runner.executorStderr = stderr
+
+       if runner.Container.RuntimeConstraints.CUDA.DeviceCount > 0 {
+               nvidiaModprobe(runner.CrunchLog)
+       }
+
        return runner.executor.Create(containerSpec{
-               Image:         imageID,
-               VCPUs:         runner.Container.RuntimeConstraints.VCPUs,
-               RAM:           ram,
-               WorkingDir:    workdir,
-               Env:           env,
-               BindMounts:    bindmounts,
-               Command:       runner.Container.Command,
-               EnableNetwork: enableNetwork,
-               NetworkMode:   runner.networkMode,
-               CgroupParent:  runner.setCgroupParent,
-               Stdin:         stdin,
-               Stdout:        stdout,
-               Stderr:        stderr,
+               Image:           imageID,
+               VCPUs:           runner.Container.RuntimeConstraints.VCPUs,
+               RAM:             ram,
+               WorkingDir:      workdir,
+               Env:             env,
+               BindMounts:      bindmounts,
+               Command:         runner.Container.Command,
+               EnableNetwork:   enableNetwork,
+               CUDADeviceCount: runner.Container.RuntimeConstraints.CUDA.DeviceCount,
+               NetworkMode:     runner.networkMode,
+               CgroupParent:    runner.setCgroupParent,
+               Stdin:           stdin,
+               Stdout:          stdout,
+               Stderr:          stderr,
        })
 }
 
@@ -1045,6 +1095,26 @@ func (runner *ContainerRunner) WaitFinish() error {
        }
        runner.ExitCode = &exitcode
 
+       extra := ""
+       if exitcode&0x80 != 0 {
+               // Convert raw exit status (0x80 + signal number) to a
+               // string to log after the code, like " (signal 101)"
+               // or " (signal 9, killed)"
+               sig := syscall.WaitStatus(exitcode).Signal()
+               if name := unix.SignalName(sig); name != "" {
+                       extra = fmt.Sprintf(" (signal %d, %s)", sig, name)
+               } else {
+                       extra = fmt.Sprintf(" (signal %d)", sig)
+               }
+       }
+       runner.CrunchLog.Printf("Container exited with status code %d%s", exitcode, extra)
+       err = runner.DispatcherArvClient.Update("containers", runner.Container.UUID, arvadosclient.Dict{
+               "container": arvadosclient.Dict{"exit_code": exitcode},
+       }, nil)
+       if err != nil {
+               runner.CrunchLog.Printf("ignoring error updating exit_code: %s", err)
+       }
+
        var returnErr error
        if err = runner.executorStdin.Close(); err != nil {
                err = fmt.Errorf("error closing container stdin: %s", err)
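A standalone sketch of the exit-status decoding added above: a raw status of 0x80+N indicates the container process was terminated by signal N, and unix.SignalName turns that into a readable suffix for the log line. It needs the golang.org/x/sys/unix module, which the diff already imports; the sample exit codes are illustrative.

```go
package main

import (
	"fmt"
	"syscall"

	"golang.org/x/sys/unix"
)

// exitSuffix reproduces the logic above: decode the signal number from
// a raw exit status and append its name when one is known.
func exitSuffix(exitcode int) string {
	if exitcode&0x80 == 0 {
		return ""
	}
	sig := syscall.WaitStatus(exitcode).Signal()
	if name := unix.SignalName(sig); name != "" {
		return fmt.Sprintf(" (signal %d, %s)", sig, name)
	}
	return fmt.Sprintf(" (signal %d)", sig)
}

func main() {
	fmt.Printf("Container exited with status code %d%s\n", 137, exitSuffix(137)) // 0x80+9, SIGKILL
	fmt.Printf("Container exited with status code %d%s\n", 1, exitSuffix(1))     // ordinary failure
}
```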
@@ -1111,10 +1181,9 @@ func (runner *ContainerRunner) updateLogs() {
                        continue
                }
 
-               var updated arvados.Container
                err = runner.DispatcherArvClient.Update("containers", runner.Container.UUID, arvadosclient.Dict{
                        "container": arvadosclient.Dict{"log": saved.PortableDataHash},
-               }, &updated)
+               }, nil)
                if err != nil {
                        runner.CrunchLog.Printf("error updating container log to %s: %s", saved.PortableDataHash, err)
                        continue
@@ -1392,13 +1461,17 @@ func (runner *ContainerRunner) UpdateContainerFinal() error {
        if runner.LogsPDH != nil {
                update["log"] = *runner.LogsPDH
        }
-       if runner.finalState == "Complete" {
-               if runner.ExitCode != nil {
-                       update["exit_code"] = *runner.ExitCode
-               }
-               if runner.OutputPDH != nil {
-                       update["output"] = *runner.OutputPDH
-               }
+       if runner.ExitCode != nil {
+               update["exit_code"] = *runner.ExitCode
+       } else {
+               update["exit_code"] = nil
+       }
+       if runner.finalState == "Complete" && runner.OutputPDH != nil {
+               update["output"] = *runner.OutputPDH
+       }
+       var it arvados.InstanceType
+       if j := os.Getenv("InstanceType"); j != "" && json.Unmarshal([]byte(j), &it) == nil && it.Price > 0 {
+               update["cost"] = it.Price * time.Now().Sub(runner.costStartTime).Seconds() / time.Hour.Seconds()
        }
        return runner.DispatcherArvClient.Update("containers", runner.Container.UUID, arvadosclient.Dict{"container": update}, nil)
 }
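A sketch of the cost calculation added above: the dispatcher is expected to pass the assigned instance type as JSON in the InstanceType environment variable, and the container's cost is the hourly price times the elapsed wall-clock hours since costStartTime. The struct and sample JSON below are simplified stand-ins for arvados.InstanceType.

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// instanceType is a simplified stand-in for arvados.InstanceType.
type instanceType struct {
	Name  string  `json:"Name"`
	Price float64 `json:"Price"` // price per hour
}

// containerCost returns price-per-hour times elapsed hours, skipping
// the update when the JSON is missing, malformed, or has no price,
// just as the hunk above does.
func containerCost(instanceTypeJSON string, start, now time.Time) (float64, bool) {
	var it instanceType
	if instanceTypeJSON == "" || json.Unmarshal([]byte(instanceTypeJSON), &it) != nil || it.Price <= 0 {
		return 0, false
	}
	return it.Price * now.Sub(start).Seconds() / time.Hour.Seconds(), true
}

func main() {
	start := time.Now().Add(-90 * time.Minute)
	if cost, ok := containerCost(`{"Name":"m5.large","Price":0.096}`, start, time.Now()); ok {
		fmt.Printf("cost: $%.4f\n", cost) // roughly 0.096 * 1.5
	}
}
```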
@@ -1427,7 +1500,12 @@ func (runner *ContainerRunner) NewArvLogWriter(name string) (io.WriteCloser, err
 // Run the full container lifecycle.
 func (runner *ContainerRunner) Run() (err error) {
        runner.CrunchLog.Printf("crunch-run %s started", cmd.Version.String())
-       runner.CrunchLog.Printf("Executing container '%s' using %s runtime", runner.Container.UUID, runner.executor.Runtime())
+       runner.CrunchLog.Printf("%s", currentUserAndGroups())
+       v, _ := exec.Command("arv-mount", "--version").CombinedOutput()
+       runner.CrunchLog.Printf("Using FUSE mount: %s", v)
+       runner.CrunchLog.Printf("Using container runtime: %s", runner.executor.Runtime())
+       runner.CrunchLog.Printf("Executing container: %s", runner.Container.UUID)
+       runner.costStartTime = time.Now()
 
        hostname, hosterr := os.Hostname()
        if hosterr != nil {
@@ -1502,6 +1580,9 @@ func (runner *ContainerRunner) Run() (err error) {
        if err != nil {
                return
        }
+       if runner.keepstore != nil {
+               runner.hoststatReporter.ReportPID("keepstore", runner.keepstore.Process.Pid)
+       }
 
        // set up FUSE mount and binds
        bindmounts, err = runner.SetupMounts()
@@ -1672,6 +1753,7 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s
        caCertsPath := flags.String("ca-certs", "", "Path to TLS root certificates")
        detach := flags.Bool("detach", false, "Detach from parent process and run in the background")
        stdinConfig := flags.Bool("stdin-config", false, "Load config and environment variables from JSON message on stdin")
+       configFile := flags.String("config", arvados.DefaultConfigFile, "filename of cluster config file to try loading if -stdin-config=false (default is $ARVADOS_CONFIG)")
        sleep := flags.Duration("sleep", 0, "Delay before starting (testing use only)")
        kill := flags.Int("kill", -1, "Send signal to an existing crunch-run process for given UUID")
        list := flags.Bool("list", false, "List UUIDs of existing crunch-run processes")
@@ -1680,7 +1762,9 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s
        networkMode := flags.String("container-network-mode", "default", `Docker network mode for container (use any argument valid for docker --net)`)
        memprofile := flags.String("memprofile", "", "write memory profile to `file` after running container")
        runtimeEngine := flags.String("runtime-engine", "docker", "container runtime: docker or singularity")
+       brokenNodeHook := flags.String("broken-node-hook", "", "script to run if node is detected to be broken (for example, Docker daemon is not running)")
        flags.Duration("check-containerd", 0, "Ignored. Exists for compatibility with older versions.")
+       version := flags.Bool("version", false, "Write version information to stdout and exit 0.")
 
        ignoreDetachFlag := false
        if len(args) > 0 && args[0] == "-no-detach" {
@@ -1696,7 +1780,10 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s
 
        if ok, code := cmd.ParseFlags(flags, prog, args, "container-uuid", stderr); !ok {
                return code
-       } else if flags.NArg() != 1 {
+       } else if *version {
+               fmt.Fprintln(stdout, prog, cmd.Version.String())
+               return 0
+       } else if !*list && flags.NArg() != 1 {
                fmt.Fprintf(stderr, "missing required argument: container-uuid (try -help)\n")
                return 2
        }
@@ -1717,6 +1804,7 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s
                return 1
        }
 
+       var keepstoreLogbuf bufThenWrite
        var conf ConfigData
        if *stdinConfig {
                err := json.NewDecoder(stdin).Decode(&conf)
@@ -1738,6 +1826,8 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s
                        // fill it using the container UUID prefix.
                        conf.Cluster.ClusterID = containerUUID[:5]
                }
+       } else {
+               conf = hpcConfData(containerUUID, *configFile, io.MultiWriter(&keepstoreLogbuf, stderr))
        }
 
        log.Printf("crunch-run %s started", cmd.Version.String())
@@ -1747,7 +1837,6 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s
                arvadosclient.CertFiles = []string{*caCertsPath}
        }
 
-       var keepstoreLogbuf bufThenWrite
        keepstore, err := startLocalKeepstore(conf, io.MultiWriter(&keepstoreLogbuf, stderr))
        if err != nil {
                log.Print(err)
@@ -1778,6 +1867,7 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s
                return 1
        }
 
+       cr.keepstore = keepstore
        if keepstore == nil {
                // Log explanation (if any) for why we're not running
                // a local keepstore.
@@ -1833,24 +1923,34 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s
        }
        defer cr.executor.Close()
 
+       cr.brokenNodeHook = *brokenNodeHook
+
        gwAuthSecret := os.Getenv("GatewayAuthSecret")
        os.Unsetenv("GatewayAuthSecret")
        if gwAuthSecret == "" {
                // not safe to run a gateway service without an auth
                // secret
                cr.CrunchLog.Printf("Not starting a gateway server (GatewayAuthSecret was not provided by dispatcher)")
-       } else if gwListen := os.Getenv("GatewayAddress"); gwListen == "" {
-               // dispatcher did not tell us which external IP
-               // address to advertise --> no gateway service
-               cr.CrunchLog.Printf("Not starting a gateway server (GatewayAddress was not provided by dispatcher)")
-       } else if de, ok := cr.executor.(*dockerExecutor); ok {
+       } else {
+               gwListen := os.Getenv("GatewayAddress")
                cr.gateway = Gateway{
-                       Address:            gwListen,
-                       AuthSecret:         gwAuthSecret,
-                       ContainerUUID:      containerUUID,
-                       DockerContainerID:  &de.containerID,
-                       Log:                cr.CrunchLog,
-                       ContainerIPAddress: dockerContainerIPAddress(&de.containerID),
+                       Address:       gwListen,
+                       AuthSecret:    gwAuthSecret,
+                       ContainerUUID: containerUUID,
+                       Target:        cr.executor,
+                       Log:           cr.CrunchLog,
+               }
+               if gwListen == "" {
+                       // Direct connection won't work, so we use the
+                       // gateway_address field to indicate the
+                       // internalURL of the controller process that
+                       // has the current tunnel connection.
+                       cr.gateway.ArvadosClient = cr.dispatcherClient
+                       cr.gateway.UpdateTunnelURL = func(url string) {
+                               cr.gateway.Address = "tunnel " + url
+                               cr.DispatcherArvClient.Update("containers", containerUUID,
+                                       arvadosclient.Dict{"container": arvadosclient.Dict{"gateway_address": cr.gateway.Address}}, nil)
+                       }
                }
                err = cr.gateway.Start()
                if err != nil {
@@ -1873,7 +1973,11 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s
        cr.enableNetwork = *enableNetwork
        cr.networkMode = *networkMode
        if *cgroupParentSubsystem != "" {
-               p := findCgroup(*cgroupParentSubsystem)
+               p, err := findCgroup(*cgroupParentSubsystem)
+               if err != nil {
+                       log.Printf("fatal: cgroup parent subsystem: %s", err)
+                       return 1
+               }
                cr.setCgroupParent = p
                cr.expectCgroupParent = p
        }
@@ -1902,8 +2006,62 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s
        return 0
 }
 
+// Try to load ConfigData in hpc (slurm/lsf) environment. This means
+// loading the cluster config from the specified file and (if that
+// works) getting the runtime_constraints container field from
+// controller to determine # VCPUs so we can calculate KeepBuffers.
+func hpcConfData(uuid string, configFile string, stderr io.Writer) ConfigData {
+       var conf ConfigData
+       conf.Cluster = loadClusterConfigFile(configFile, stderr)
+       if conf.Cluster == nil {
+               // skip loading the container record -- we won't be
+               // able to start local keepstore anyway.
+               return conf
+       }
+       arv, err := arvadosclient.MakeArvadosClient()
+       if err != nil {
+               fmt.Fprintf(stderr, "error setting up arvadosclient: %s\n", err)
+               return conf
+       }
+       arv.Retries = 8
+       var ctr arvados.Container
+       err = arv.Call("GET", "containers", uuid, "", arvadosclient.Dict{"select": []string{"runtime_constraints"}}, &ctr)
+       if err != nil {
+               fmt.Fprintf(stderr, "error getting container record: %s\n", err)
+               return conf
+       }
+       if ctr.RuntimeConstraints.VCPUs > 0 {
+               conf.KeepBuffers = ctr.RuntimeConstraints.VCPUs * conf.Cluster.Containers.LocalKeepBlobBuffersPerVCPU
+       }
+       return conf
+}
+
+// Load cluster config file from given path. If an error occurs, log
+// the error to stderr and return nil.
+func loadClusterConfigFile(path string, stderr io.Writer) *arvados.Cluster {
+       ldr := config.NewLoader(&bytes.Buffer{}, ctxlog.New(stderr, "plain", "info"))
+       ldr.Path = path
+       cfg, err := ldr.Load()
+       if err != nil {
+               fmt.Fprintf(stderr, "could not load config file %s: %s\n", path, err)
+               return nil
+       }
+       cluster, err := cfg.GetCluster("")
+       if err != nil {
+               fmt.Fprintf(stderr, "could not use config file %s: %s\n", path, err)
+               return nil
+       }
+       fmt.Fprintf(stderr, "loaded config file %s\n", path)
+       return cluster
+}
+
 func startLocalKeepstore(configData ConfigData, logbuf io.Writer) (*exec.Cmd, error) {
-       if configData.Cluster == nil || configData.KeepBuffers < 1 {
+       if configData.KeepBuffers < 1 {
+               fmt.Fprintf(logbuf, "not starting a local keepstore process because KeepBuffers=%v in config\n", configData.KeepBuffers)
+               return nil, nil
+       }
+       if configData.Cluster == nil {
+               fmt.Fprint(logbuf, "not starting a local keepstore process because cluster config file was not loaded\n")
                return nil, nil
        }
        for uuid, vol := range configData.Cluster.Volumes {
@@ -1922,7 +2080,8 @@ func startLocalKeepstore(configData ConfigData, logbuf io.Writer) (*exec.Cmd, er
        // modify the cluster configuration that we feed it on stdin.
        configData.Cluster.API.MaxKeepBlobBuffers = configData.KeepBuffers
 
-       ln, err := net.Listen("tcp", "localhost:0")
+       localaddr := localKeepstoreAddr()
+       ln, err := net.Listen("tcp", net.JoinHostPort(localaddr, "0"))
        if err != nil {
                return nil, err
        }
@@ -1932,7 +2091,7 @@ func startLocalKeepstore(configData ConfigData, logbuf io.Writer) (*exec.Cmd, er
                return nil, err
        }
        ln.Close()
-       url := "http://localhost:" + port
+       url := "http://" + net.JoinHostPort(localaddr, port)
 
        fmt.Fprintf(logbuf, "starting keepstore on %s\n", url)
 
@@ -1997,3 +2156,70 @@ func startLocalKeepstore(configData ConfigData, logbuf io.Writer) (*exec.Cmd, er
        os.Setenv("ARVADOS_KEEP_SERVICES", url)
        return cmd, nil
 }
+
+// return current uid, gid, groups in a format suitable for logging:
+// "crunch-run process has uid=1234(arvados) gid=1234(arvados)
+// groups=1234(arvados),114(fuse)"
+func currentUserAndGroups() string {
+       u, err := user.Current()
+       if err != nil {
+               return fmt.Sprintf("error getting current user ID: %s", err)
+       }
+       s := fmt.Sprintf("crunch-run process has uid=%s(%s) gid=%s", u.Uid, u.Username, u.Gid)
+       if g, err := user.LookupGroupId(u.Gid); err == nil {
+               s += fmt.Sprintf("(%s)", g.Name)
+       }
+       s += " groups="
+       if gids, err := u.GroupIds(); err == nil {
+               for i, gid := range gids {
+                       if i > 0 {
+                               s += ","
+                       }
+                       s += gid
+                       if g, err := user.LookupGroupId(gid); err == nil {
+                               s += fmt.Sprintf("(%s)", g.Name)
+                       }
+               }
+       }
+       return s
+}
+
+// Return a suitable local interface address for a local keepstore
+// service. Currently this is the numerically lowest non-loopback ipv4
+// address assigned to a local interface that is not in any of the
+// link-local/vpn/loopback ranges 169.254/16, 100.64/10, or 127/8.
+func localKeepstoreAddr() string {
+       var ips []net.IP
+       // Ignore error (proceed with zero IPs)
+       addrs, _ := processIPs(os.Getpid())
+       for addr := range addrs {
+               ip := net.ParseIP(addr)
+               if ip == nil {
+                       // invalid
+                       continue
+               }
+               if ip.Mask(net.CIDRMask(8, 32)).Equal(net.IPv4(127, 0, 0, 0)) ||
+                       ip.Mask(net.CIDRMask(10, 32)).Equal(net.IPv4(100, 64, 0, 0)) ||
+                       ip.Mask(net.CIDRMask(16, 32)).Equal(net.IPv4(169, 254, 0, 0)) {
+                       // unsuitable
+                       continue
+               }
+               ips = append(ips, ip)
+       }
+       if len(ips) == 0 {
+               return "0.0.0.0"
+       }
+       sort.Slice(ips, func(ii, jj int) bool {
+               i, j := ips[ii], ips[jj]
+               if len(i) != len(j) {
+                       return len(i) < len(j)
+               }
+               for x := range i {
+                       if i[x] != j[x] {
+                               return i[x] < j[x]
+                       }
+               }
+               return false
+       })
+       return ips[0].String()
+}
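
A small sketch of the address filtering used by localKeepstoreAddr above: each candidate IP is masked and compared against the base of the excluded ranges 127.0.0.0/8, 100.64.0.0/10, and 169.254.0.0/16. The candidate addresses in main are invented for illustration.

```go
package main

import (
	"fmt"
	"net"
)

// unsuitable reports whether ip falls in one of the ranges excluded
// above (loopback, shared/CGNAT, or link-local).
func unsuitable(ip net.IP) bool {
	return ip.Mask(net.CIDRMask(8, 32)).Equal(net.IPv4(127, 0, 0, 0)) ||
		ip.Mask(net.CIDRMask(10, 32)).Equal(net.IPv4(100, 64, 0, 0)) ||
		ip.Mask(net.CIDRMask(16, 32)).Equal(net.IPv4(169, 254, 0, 0))
}

func main() {
	for _, addr := range []string{"127.0.0.1", "100.100.3.4", "169.254.10.2", "10.0.0.5", "192.168.1.20"} {
		fmt.Printf("%-15s unsuitable=%v\n", addr, unsuitable(net.ParseIP(addr)))
	}
}
```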