Merge branch '19099-singularity-container-shell'
[arvados.git] / lib / crunchrun / crunchrun.go
index 8f3a30203911187c28b71c405a92caac8cab14e5..30871e734911ea2e56fdd7172ef261b65c726ff2 100644 (file)
@@ -19,6 +19,7 @@ import (
        "os"
        "os/exec"
        "os/signal"
+       "os/user"
        "path"
        "path/filepath"
        "regexp"
@@ -31,11 +32,14 @@ import (
        "time"
 
        "git.arvados.org/arvados.git/lib/cmd"
+       "git.arvados.org/arvados.git/lib/config"
        "git.arvados.org/arvados.git/lib/crunchstat"
        "git.arvados.org/arvados.git/sdk/go/arvados"
        "git.arvados.org/arvados.git/sdk/go/arvadosclient"
+       "git.arvados.org/arvados.git/sdk/go/ctxlog"
        "git.arvados.org/arvados.git/sdk/go/keepclient"
        "git.arvados.org/arvados.git/sdk/go/manifest"
+       "golang.org/x/sys/unix"
 )
 
 type command struct{}
@@ -165,6 +169,7 @@ type ContainerRunner struct {
        enableMemoryLimit bool
        enableNetwork     string // one of "default" or "always"
        networkMode       string // "none", "host", or "" -- passed through to executor
+       brokenNodeHook    string // script to run if node appears to be broken
        arvMountLog       *ThrottledLogger
 
        containerWatchdogInterval time.Duration
@@ -208,10 +213,9 @@ var errorBlacklist = []string{
        "(?ms).*oci runtime error.*starting container process.*container init.*mounting.*to rootfs.*no such file or directory.*",
        "(?ms).*grpc: the connection is unavailable.*",
 }
-var brokenNodeHook *string = flag.String("broken-node-hook", "", "Script to run if node is detected to be broken (for example, Docker daemon is not running)")
 
 func (runner *ContainerRunner) runBrokenNodeHook() {
-       if *brokenNodeHook == "" {
+       if runner.brokenNodeHook == "" {
                path := filepath.Join(lockdir, brokenfile)
                runner.CrunchLog.Printf("Writing %s to mark node as broken", path)
                f, err := os.OpenFile(path, os.O_CREATE|os.O_RDWR, 0700)
@@ -221,9 +225,9 @@ func (runner *ContainerRunner) runBrokenNodeHook() {
                }
                f.Close()
        } else {
-               runner.CrunchLog.Printf("Running broken node hook %q", *brokenNodeHook)
+               runner.CrunchLog.Printf("Running broken node hook %q", runner.brokenNodeHook)
                // run killme script
-               c := exec.Command(*brokenNodeHook)
+               c := exec.Command(runner.brokenNodeHook)
                c.Stdout = runner.CrunchLog
                c.Stderr = runner.CrunchLog
                err := c.Run()
@@ -418,7 +422,7 @@ func (runner *ContainerRunner) SetupMounts() (map[string]bindmount, error) {
                "--storage-classes", strings.Join(runner.Container.OutputStorageClasses, ","),
                fmt.Sprintf("--crunchstat-interval=%v", runner.statInterval.Seconds())}
 
-       if runner.executor.Runtime() == "docker" {
+       if _, isdocker := runner.executor.(*dockerExecutor); isdocker {
                arvMountCmd = append(arvMountCmd, "--allow-other")
        }
 
@@ -452,8 +456,8 @@ func (runner *ContainerRunner) SetupMounts() (map[string]bindmount, error) {
        sort.Strings(binds)
 
        for _, bind := range binds {
-               mnt, ok := runner.Container.Mounts[bind]
-               if !ok {
+               mnt, notSecret := runner.Container.Mounts[bind]
+               if !notSecret {
                        mnt = runner.SecretMounts[bind]
                }
                if bind == "stdout" || bind == "stderr" {
@@ -522,8 +526,7 @@ func (runner *ContainerRunner) SetupMounts() (map[string]bindmount, error) {
                                }
                        } else {
                                src = fmt.Sprintf("%s/tmp%d", runner.ArvMountPoint, tmpcount)
-                               arvMountCmd = append(arvMountCmd, "--mount-tmp")
-                               arvMountCmd = append(arvMountCmd, fmt.Sprintf("tmp%d", tmpcount))
+                               arvMountCmd = append(arvMountCmd, "--mount-tmp", fmt.Sprintf("tmp%d", tmpcount))
                                tmpcount++
                        }
                        if mnt.Writable {
@@ -583,9 +586,32 @@ func (runner *ContainerRunner) SetupMounts() (map[string]bindmount, error) {
                        if err != nil {
                                return nil, fmt.Errorf("writing temp file: %v", err)
                        }
-                       if strings.HasPrefix(bind, runner.Container.OutputPath+"/") {
+                       if strings.HasPrefix(bind, runner.Container.OutputPath+"/") && (notSecret || runner.Container.Mounts[runner.Container.OutputPath].Kind != "collection") {
+                               // In most cases, if the container
+                               // specifies a literal file inside the
+                               // output path, we copy it into the
+                               // output directory (either a mounted
+                               // collection or a staging area on the
+                               // host fs). If it's a secret, it will
+                               // be skipped when copying output from
+                               // staging to Keep later.
                                copyFiles = append(copyFiles, copyFile{tmpfn, runner.HostOutputDir + bind[len(runner.Container.OutputPath):]})
                        } else {
+                               // If a secret is outside OutputPath,
+                               // we bind mount the secret file
+                               // directly just like other mounts. We
+                               // also use this strategy when a
+                               // secret is inside OutputPath but
+                               // OutputPath is a live collection, to
+                               // avoid writing the secret to
+                               // Keep. Attempting to remove a
+                               // bind-mounted secret file from
+                               // inside the container will return a
+                               // "Device or resource busy" error
+                               // that might not be handled well by
+                               // the container, which is why we
+                               // don't use this strategy when
+                               // OutputPath is a staging directory.
                                bindmounts[bind] = bindmount{HostPath: tmpfn, ReadOnly: true}
                        }
 
@@ -617,10 +643,15 @@ func (runner *ContainerRunner) SetupMounts() (map[string]bindmount, error) {
        }
 
        if pdhOnly {
-               arvMountCmd = append(arvMountCmd, "--mount-by-pdh", "by_id")
+               // If we are only mounting collections by pdh, make
+               // sure we don't subscribe to websocket events to
+               // avoid putting undesired load on the API server
+               arvMountCmd = append(arvMountCmd, "--mount-by-pdh", "by_id", "--disable-event-listening")
        } else {
                arvMountCmd = append(arvMountCmd, "--mount-by-id", "by_id")
        }
+       // the by_uuid mount point is used by singularity when writing
+       // out docker images converted to SIF
        arvMountCmd = append(arvMountCmd, "--mount-by-id", "by_uuid")
        arvMountCmd = append(arvMountCmd, runner.ArvMountPoint)
 
@@ -981,20 +1012,26 @@ func (runner *ContainerRunner) CreateContainer(imageID string, bindmounts map[st
        runner.executorStdin = stdin
        runner.executorStdout = stdout
        runner.executorStderr = stderr
+
+       if runner.Container.RuntimeConstraints.CUDA.DeviceCount > 0 {
+               nvidiaModprobe(runner.CrunchLog)
+       }
+
        return runner.executor.Create(containerSpec{
-               Image:         imageID,
-               VCPUs:         runner.Container.RuntimeConstraints.VCPUs,
-               RAM:           ram,
-               WorkingDir:    workdir,
-               Env:           env,
-               BindMounts:    bindmounts,
-               Command:       runner.Container.Command,
-               EnableNetwork: enableNetwork,
-               NetworkMode:   runner.networkMode,
-               CgroupParent:  runner.setCgroupParent,
-               Stdin:         stdin,
-               Stdout:        stdout,
-               Stderr:        stderr,
+               Image:           imageID,
+               VCPUs:           runner.Container.RuntimeConstraints.VCPUs,
+               RAM:             ram,
+               WorkingDir:      workdir,
+               Env:             env,
+               BindMounts:      bindmounts,
+               Command:         runner.Container.Command,
+               EnableNetwork:   enableNetwork,
+               CUDADeviceCount: runner.Container.RuntimeConstraints.CUDA.DeviceCount,
+               NetworkMode:     runner.networkMode,
+               CgroupParent:    runner.setCgroupParent,
+               Stdin:           stdin,
+               Stdout:          stdout,
+               Stderr:          stderr,
        })
 }
 
@@ -1045,6 +1082,20 @@ func (runner *ContainerRunner) WaitFinish() error {
        }
        runner.ExitCode = &exitcode
 
+       extra := ""
+       if exitcode&0x80 != 0 {
+               // Convert raw exit status (0x80 + signal number) to a
+               // string to log after the code, like " (signal 101)"
+               // or " (signal 9, killed)"
+               sig := syscall.WaitStatus(exitcode).Signal()
+               if name := unix.SignalName(sig); name != "" {
+                       extra = fmt.Sprintf(" (signal %d, %s)", sig, name)
+               } else {
+                       extra = fmt.Sprintf(" (signal %d)", sig)
+               }
+       }
+       runner.CrunchLog.Printf("Container exited with status code %d%s", exitcode, extra)
+
        var returnErr error
        if err = runner.executorStdin.Close(); err != nil {
                err = fmt.Errorf("error closing container stdin: %s", err)
@@ -1427,7 +1478,11 @@ func (runner *ContainerRunner) NewArvLogWriter(name string) (io.WriteCloser, err
 // Run the full container lifecycle.
 func (runner *ContainerRunner) Run() (err error) {
        runner.CrunchLog.Printf("crunch-run %s started", cmd.Version.String())
-       runner.CrunchLog.Printf("Executing container '%s' using %s runtime", runner.Container.UUID, runner.executor.Runtime())
+       runner.CrunchLog.Printf("%s", currentUserAndGroups())
+       v, _ := exec.Command("arv-mount", "--version").CombinedOutput()
+       runner.CrunchLog.Printf("Using FUSE mount: %s", v)
+       runner.CrunchLog.Printf("Using container runtime: %s", runner.executor.Runtime())
+       runner.CrunchLog.Printf("Executing container: %s", runner.Container.UUID)
 
        hostname, hosterr := os.Hostname()
        if hosterr != nil {
@@ -1672,6 +1727,7 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s
        caCertsPath := flags.String("ca-certs", "", "Path to TLS root certificates")
        detach := flags.Bool("detach", false, "Detach from parent process and run in the background")
        stdinConfig := flags.Bool("stdin-config", false, "Load config and environment variables from JSON message on stdin")
+       configFile := flags.String("config", arvados.DefaultConfigFile, "filename of cluster config file to try loading if -stdin-config=false (default is $ARVADOS_CONFIG)")
        sleep := flags.Duration("sleep", 0, "Delay before starting (testing use only)")
        kill := flags.Int("kill", -1, "Send signal to an existing crunch-run process for given UUID")
        list := flags.Bool("list", false, "List UUIDs of existing crunch-run processes")
@@ -1680,6 +1736,7 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s
        networkMode := flags.String("container-network-mode", "default", `Docker network mode for container (use any argument valid for docker --net)`)
        memprofile := flags.String("memprofile", "", "write memory profile to `file` after running container")
        runtimeEngine := flags.String("runtime-engine", "docker", "container runtime: docker or singularity")
+       brokenNodeHook := flags.String("broken-node-hook", "", "script to run if node is detected to be broken (for example, Docker daemon is not running)")
        flags.Duration("check-containerd", 0, "Ignored. Exists for compatibility with older versions.")
 
        ignoreDetachFlag := false
@@ -1694,11 +1751,11 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s
                ignoreDetachFlag = true
        }
 
-       if err := flags.Parse(args); err == flag.ErrHelp {
-               return 0
-       } else if err != nil {
-               log.Print(err)
-               return 1
+       if ok, code := cmd.ParseFlags(flags, prog, args, "container-uuid", stderr); !ok {
+               return code
+       } else if !*list && flags.NArg() != 1 {
+               fmt.Fprintf(stderr, "missing required argument: container-uuid (try -help)\n")
+               return 2
        }
 
        containerUUID := flags.Arg(0)
@@ -1717,6 +1774,7 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s
                return 1
        }
 
+       var keepstoreLogbuf bufThenWrite
        var conf ConfigData
        if *stdinConfig {
                err := json.NewDecoder(stdin).Decode(&conf)
@@ -1738,6 +1796,8 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s
                        // fill it using the container UUID prefix.
                        conf.Cluster.ClusterID = containerUUID[:5]
                }
+       } else {
+               conf = hpcConfData(containerUUID, *configFile, io.MultiWriter(&keepstoreLogbuf, stderr))
        }
 
        log.Printf("crunch-run %s started", cmd.Version.String())
@@ -1747,7 +1807,6 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s
                arvadosclient.CertFiles = []string{*caCertsPath}
        }
 
-       var keepstoreLogbuf bufThenWrite
        keepstore, err := startLocalKeepstore(conf, io.MultiWriter(&keepstoreLogbuf, stderr))
        if err != nil {
                log.Print(err)
@@ -1833,6 +1892,8 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s
        }
        defer cr.executor.Close()
 
+       cr.brokenNodeHook = *brokenNodeHook
+
        gwAuthSecret := os.Getenv("GatewayAuthSecret")
        os.Unsetenv("GatewayAuthSecret")
        if gwAuthSecret == "" {
@@ -1843,14 +1904,13 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s
                // dispatcher did not tell us which external IP
                // address to advertise --> no gateway service
                cr.CrunchLog.Printf("Not starting a gateway server (GatewayAddress was not provided by dispatcher)")
-       } else if de, ok := cr.executor.(*dockerExecutor); ok {
+       } else {
                cr.gateway = Gateway{
-                       Address:            gwListen,
-                       AuthSecret:         gwAuthSecret,
-                       ContainerUUID:      containerUUID,
-                       DockerContainerID:  &de.containerID,
-                       Log:                cr.CrunchLog,
-                       ContainerIPAddress: dockerContainerIPAddress(&de.containerID),
+                       Address:       gwListen,
+                       AuthSecret:    gwAuthSecret,
+                       ContainerUUID: containerUUID,
+                       Target:        cr.executor,
+                       Log:           cr.CrunchLog,
                }
                err = cr.gateway.Start()
                if err != nil {
@@ -1873,7 +1933,11 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s
        cr.enableNetwork = *enableNetwork
        cr.networkMode = *networkMode
        if *cgroupParentSubsystem != "" {
-               p := findCgroup(*cgroupParentSubsystem)
+               p, err := findCgroup(*cgroupParentSubsystem)
+               if err != nil {
+                       log.Printf("fatal: cgroup parent subsystem: %s", err)
+                       return 1
+               }
                cr.setCgroupParent = p
                cr.expectCgroupParent = p
        }
@@ -1902,8 +1966,62 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s
        return 0
 }
 
+// hpcConfData builds ConfigData for an hpc (slurm/lsf) environment: it
+// loads the cluster config from configFile and, if that works, fetches
+// the container's runtime_constraints to set KeepBuffers from VCPUs.
+// Errors are written to stderr and the partly filled conf is returned.
+func hpcConfData(uuid string, configFile string, stderr io.Writer) ConfigData {
+       var conf ConfigData
+       conf.Cluster = loadClusterConfigFile(configFile, stderr)
+       if conf.Cluster == nil {
+               // Skip fetching the container record: without a
+               // cluster config, local keepstore cannot start anyway.
+               return conf
+       }
+       arv, err := arvadosclient.MakeArvadosClient()
+       if err != nil {
+               fmt.Fprintf(stderr, "error setting up arvadosclient: %s\n", err)
+               return conf
+       }
+       arv.Retries = 8
+       var ctr arvados.Container
+       err = arv.Call("GET", "containers", uuid, "", arvadosclient.Dict{"select": []string{"runtime_constraints"}}, &ctr) // request only the runtime_constraints field
+       if err != nil {
+               fmt.Fprintf(stderr, "error getting container record: %s\n", err)
+               return conf
+       }
+       if ctr.RuntimeConstraints.VCPUs > 0 { // otherwise KeepBuffers keeps its zero value
+               conf.KeepBuffers = ctr.RuntimeConstraints.VCPUs * conf.Cluster.Containers.LocalKeepBlobBuffersPerVCPU
+       }
+       return conf
+}
+
+// loadClusterConfigFile loads the cluster config file at the given path.
+// If an error occurs, it writes the error to stderr and returns nil.
+func loadClusterConfigFile(path string, stderr io.Writer) *arvados.Cluster {
+       ldr := config.NewLoader(&bytes.Buffer{}, ctxlog.New(stderr, "plain", "info")) // NOTE(review): presumably an empty input plus a plain-format, info-level logger on stderr -- confirm
+       ldr.Path = path
+       cfg, err := ldr.Load()
+       if err != nil {
+               fmt.Fprintf(stderr, "could not load config file %s: %s\n", path, err)
+               return nil
+       }
+       cluster, err := cfg.GetCluster("") // NOTE(review): "" presumably selects the default/only cluster -- confirm
+       if err != nil {
+               fmt.Fprintf(stderr, "could not use config file %s: %s\n", path, err)
+               return nil
+       }
+       fmt.Fprintf(stderr, "loaded config file %s\n", path) // success is reported on stderr too
+       return cluster
+}
+
 func startLocalKeepstore(configData ConfigData, logbuf io.Writer) (*exec.Cmd, error) {
-       if configData.Cluster == nil || configData.KeepBuffers < 1 {
+       if configData.KeepBuffers < 1 {
+               fmt.Fprintf(logbuf, "not starting a local keepstore process because KeepBuffers=%v in config\n", configData.KeepBuffers)
+               return nil, nil
+       }
+       if configData.Cluster == nil {
+               fmt.Fprint(logbuf, "not starting a local keepstore process because cluster config file was not loaded\n")
                return nil, nil
        }
        for uuid, vol := range configData.Cluster.Volumes {
@@ -1997,3 +2115,30 @@ func startLocalKeepstore(configData ConfigData, logbuf io.Writer) (*exec.Cmd, er
        os.Setenv("ARVADOS_KEEP_SERVICES", url)
        return cmd, nil
 }
+
+// currentUserAndGroups returns the current uid, gid, and groups in a
+// format suitable for logging, e.g., "crunch-run process has
+// uid=1234(arvados) gid=1234(arvados) groups=1234(arvados),114(fuse)"
+func currentUserAndGroups() string {
+       u, err := user.Current()
+       if err != nil {
+               return fmt.Sprintf("error getting current user ID: %s", err) // the error text is returned in place of the summary
+       }
+       s := fmt.Sprintf("crunch-run process has uid=%s(%s) gid=%s", u.Uid, u.Username, u.Gid)
+       if g, err := user.LookupGroupId(u.Gid); err == nil { // the "(name)" suffix is omitted if the lookup fails
+               s += fmt.Sprintf("(%s)", g.Name)
+       }
+       s += " groups="
+       if gids, err := u.GroupIds(); err == nil { // on error the list after "groups=" is left empty
+               for i, gid := range gids {
+                       if i > 0 {
+                               s += ","
+                       }
+                       s += gid
+                       if g, err := user.LookupGroupId(gid); err == nil { // numeric gid only if the name is unknown
+                               s += fmt.Sprintf("(%s)", g.Name)
+                       }
+               }
+       }
+       return s
+}