Merge branch '18600-copy-by-ref'
[arvados.git] / lib / crunchrun / crunchrun.go
index 7a2afeacca3272b07df5a1bad4ea1cad8b9b8755..4fa3f26ab54dc4fb5a0fba4cdc1a12fe4a3a94d0 100644 (file)
@@ -36,6 +36,7 @@ import (
        "git.arvados.org/arvados.git/sdk/go/arvadosclient"
        "git.arvados.org/arvados.git/sdk/go/keepclient"
        "git.arvados.org/arvados.git/sdk/go/manifest"
+       "golang.org/x/sys/unix"
 )
 
 type command struct{}
@@ -137,6 +138,8 @@ type ContainerRunner struct {
        finalState    string
        parentTemp    string
 
+       keepstoreLogger  io.WriteCloser
+       keepstoreLogbuf  *bufThenWrite
        statLogger       io.WriteCloser
        statReporter     *crunchstat.Reporter
        hoststatLogger   io.WriteCloser
@@ -412,11 +415,14 @@ func (runner *ContainerRunner) SetupMounts() (map[string]bindmount, error) {
        arvMountCmd := []string{
                "arv-mount",
                "--foreground",
-               "--allow-other",
                "--read-write",
                "--storage-classes", strings.Join(runner.Container.OutputStorageClasses, ","),
                fmt.Sprintf("--crunchstat-interval=%v", runner.statInterval.Seconds())}
 
+       if runner.executor.Runtime() == "docker" {
+               arvMountCmd = append(arvMountCmd, "--allow-other")
+       }
+
        if runner.Container.RuntimeConstraints.KeepCacheRAM > 0 {
                arvMountCmd = append(arvMountCmd, "--file-cache", fmt.Sprintf("%d", runner.Container.RuntimeConstraints.KeepCacheRAM))
        }
@@ -447,8 +453,8 @@ func (runner *ContainerRunner) SetupMounts() (map[string]bindmount, error) {
        sort.Strings(binds)
 
        for _, bind := range binds {
-               mnt, ok := runner.Container.Mounts[bind]
-               if !ok {
+               mnt, notSecret := runner.Container.Mounts[bind]
+               if !notSecret {
                        mnt = runner.SecretMounts[bind]
                }
                if bind == "stdout" || bind == "stderr" {
@@ -517,8 +523,7 @@ func (runner *ContainerRunner) SetupMounts() (map[string]bindmount, error) {
                                }
                        } else {
                                src = fmt.Sprintf("%s/tmp%d", runner.ArvMountPoint, tmpcount)
-                               arvMountCmd = append(arvMountCmd, "--mount-tmp")
-                               arvMountCmd = append(arvMountCmd, fmt.Sprintf("tmp%d", tmpcount))
+                               arvMountCmd = append(arvMountCmd, "--mount-tmp", fmt.Sprintf("tmp%d", tmpcount))
                                tmpcount++
                        }
                        if mnt.Writable {
@@ -578,9 +583,32 @@ func (runner *ContainerRunner) SetupMounts() (map[string]bindmount, error) {
                        if err != nil {
                                return nil, fmt.Errorf("writing temp file: %v", err)
                        }
-                       if strings.HasPrefix(bind, runner.Container.OutputPath+"/") {
+                       if strings.HasPrefix(bind, runner.Container.OutputPath+"/") && (notSecret || runner.Container.Mounts[runner.Container.OutputPath].Kind != "collection") {
+                               // In most cases, if the container
+                               // specifies a literal file inside the
+                               // output path, we copy it into the
+                               // output directory (either a mounted
+                               // collection or a staging area on the
+                               // host fs). If it's a secret, it will
+                               // be skipped when copying output from
+                               // staging to Keep later.
                                copyFiles = append(copyFiles, copyFile{tmpfn, runner.HostOutputDir + bind[len(runner.Container.OutputPath):]})
                        } else {
+                               // If a secret is outside OutputPath,
+                               // we bind mount the secret file
+                               // directly just like other mounts. We
+                               // also use this strategy when a
+                               // secret is inside OutputPath but
+                               // OutputPath is a live collection, to
+                               // avoid writing the secret to
+                               // Keep. Attempting to remove a
+                               // bind-mounted secret file from
+                               // inside the container will return a
+                               // "Device or resource busy" error
+                               // that might not be handled well by
+                               // the container, which is why we
+                               // don't use this strategy when
+                               // OutputPath is a staging directory.
                                bindmounts[bind] = bindmount{HostPath: tmpfn, ReadOnly: true}
                        }
 
@@ -612,10 +640,15 @@ func (runner *ContainerRunner) SetupMounts() (map[string]bindmount, error) {
        }
 
        if pdhOnly {
-               arvMountCmd = append(arvMountCmd, "--mount-by-pdh", "by_id")
+               // If we are only mounting collections by pdh, make
+               // sure we don't subscribe to websocket events to
+               // avoid putting undesired load on the API server
+               arvMountCmd = append(arvMountCmd, "--mount-by-pdh", "by_id", "--disable-event-listening")
        } else {
                arvMountCmd = append(arvMountCmd, "--mount-by-id", "by_id")
        }
+       // the by_uuid mount point is used by singularity when writing
+       // out docker images converted to SIF
        arvMountCmd = append(arvMountCmd, "--mount-by-id", "by_uuid")
        arvMountCmd = append(arvMountCmd, runner.ArvMountPoint)
 
@@ -976,20 +1009,26 @@ func (runner *ContainerRunner) CreateContainer(imageID string, bindmounts map[st
        runner.executorStdin = stdin
        runner.executorStdout = stdout
        runner.executorStderr = stderr
+
+       if runner.Container.RuntimeConstraints.CUDA.DeviceCount > 0 {
+               nvidiaModprobe(runner.CrunchLog)
+       }
+
        return runner.executor.Create(containerSpec{
-               Image:         imageID,
-               VCPUs:         runner.Container.RuntimeConstraints.VCPUs,
-               RAM:           ram,
-               WorkingDir:    workdir,
-               Env:           env,
-               BindMounts:    bindmounts,
-               Command:       runner.Container.Command,
-               EnableNetwork: enableNetwork,
-               NetworkMode:   runner.networkMode,
-               CgroupParent:  runner.setCgroupParent,
-               Stdin:         stdin,
-               Stdout:        stdout,
-               Stderr:        stderr,
+               Image:           imageID,
+               VCPUs:           runner.Container.RuntimeConstraints.VCPUs,
+               RAM:             ram,
+               WorkingDir:      workdir,
+               Env:             env,
+               BindMounts:      bindmounts,
+               Command:         runner.Container.Command,
+               EnableNetwork:   enableNetwork,
+               CUDADeviceCount: runner.Container.RuntimeConstraints.CUDA.DeviceCount,
+               NetworkMode:     runner.networkMode,
+               CgroupParent:    runner.setCgroupParent,
+               Stdin:           stdin,
+               Stdout:          stdout,
+               Stderr:          stderr,
        })
 }
 
@@ -1040,6 +1079,20 @@ func (runner *ContainerRunner) WaitFinish() error {
        }
        runner.ExitCode = &exitcode
 
+       extra := ""
+       if exitcode&0x80 != 0 {
+               // Convert raw exit status (0x80 + signal number) to a
+               // string to log after the code, like " (signal 101)"
+               // or " (signal 9, killed)"
+               sig := syscall.WaitStatus(exitcode).Signal()
+               if name := unix.SignalName(sig); name != "" {
+                       extra = fmt.Sprintf(" (signal %d, %s)", sig, name)
+               } else {
+                       extra = fmt.Sprintf(" (signal %d)", sig)
+               }
+       }
+       runner.CrunchLog.Printf("Container exited with status code %d%s", exitcode, extra)
+
        var returnErr error
        if err = runner.executorStdin.Close(); err != nil {
                err = fmt.Errorf("error closing container stdin: %s", err)
@@ -1277,6 +1330,16 @@ func (runner *ContainerRunner) CommitLogs() error {
                runner.CrunchLog.Immediate = log.New(os.Stderr, runner.Container.UUID+" ", 0)
        }()
 
+       if runner.keepstoreLogger != nil {
+               // Flush any buffered logs from our local keepstore
+               // process.  Discard anything logged after this point
+               // -- it won't end up in the log collection, so
+               // there's no point writing it to the collectionfs.
+               runner.keepstoreLogbuf.SetWriter(io.Discard)
+               runner.keepstoreLogger.Close()
+               runner.keepstoreLogger = nil
+       }
+
        if runner.LogsPDH != nil {
                // If we have already assigned something to LogsPDH,
                // we must be closing the re-opened log, which won't
@@ -1285,6 +1348,7 @@ func (runner *ContainerRunner) CommitLogs() error {
                // -- it exists only to send logs to other channels.
                return nil
        }
+
        saved, err := runner.saveLogCollection(true)
        if err != nil {
                return fmt.Errorf("error saving log collection: %s", err)
@@ -1647,6 +1711,7 @@ func NewContainerRunner(dispatcherClient *arvados.Client,
 }
 
 func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) int {
+       log := log.New(stderr, "", 0)
        flags := flag.NewFlagSet(prog, flag.ContinueOnError)
        statInterval := flags.Duration("crunchstat-interval", 10*time.Second, "sampling period for periodic resource usage reporting")
        cgroupRoot := flags.String("cgroup-root", "/sys/fs/cgroup", "path to sysfs cgroup tree")
@@ -1677,11 +1742,11 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s
                ignoreDetachFlag = true
        }
 
-       if err := flags.Parse(args); err == flag.ErrHelp {
-               return 0
-       } else if err != nil {
-               log.Print(err)
-               return 1
+       if ok, code := cmd.ParseFlags(flags, prog, args, "container-uuid", stderr); !ok {
+               return code
+       } else if !*list && flags.NArg() != 1 {
+               fmt.Fprintf(stderr, "missing required argument: container-uuid (try -help)\n")
+               return 2
        }
 
        containerUUID := flags.Arg(0)
@@ -1702,9 +1767,9 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s
 
        var conf ConfigData
        if *stdinConfig {
-               err := json.NewDecoder(os.Stdin).Decode(&conf)
+               err := json.NewDecoder(stdin).Decode(&conf)
                if err != nil {
-                       log.Print(err)
+                       log.Printf("decode stdin: %s", err)
                        return 1
                }
                for k, v := range conf.Env {
@@ -1730,8 +1795,8 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s
                arvadosclient.CertFiles = []string{*caCertsPath}
        }
 
-       var keepstoreLog bufThenWrite
-       keepstore, err := startLocalKeepstore(conf, io.MultiWriter(&keepstoreLog, stderr))
+       var keepstoreLogbuf bufThenWrite
+       keepstore, err := startLocalKeepstore(conf, io.MultiWriter(&keepstoreLogbuf, stderr))
        if err != nil {
                log.Print(err)
                return 1
@@ -1747,9 +1812,9 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s
        }
        api.Retries = 8
 
-       kc, kcerr := keepclient.MakeKeepClient(api)
-       if kcerr != nil {
-               log.Printf("%s: %v", containerUUID, kcerr)
+       kc, err := keepclient.MakeKeepClient(api)
+       if err != nil {
+               log.Printf("%s: %v", containerUUID, err)
                return 1
        }
        kc.BlockCache = &keepclient.BlockCache{MaxBlocks: 2}
@@ -1761,17 +1826,41 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s
                return 1
        }
 
-       if keepstore != nil {
-               w, err := cr.NewLogWriter("keepstore")
+       if keepstore == nil {
+               // Log explanation (if any) for why we're not running
+               // a local keepstore.
+               var buf bytes.Buffer
+               keepstoreLogbuf.SetWriter(&buf)
+               if buf.Len() > 0 {
+                       cr.CrunchLog.Printf("%s", strings.TrimSpace(buf.String()))
+               }
+       } else if logWhat := conf.Cluster.Containers.LocalKeepLogsToContainerLog; logWhat == "none" {
+               cr.CrunchLog.Printf("using local keepstore process (pid %d) at %s", keepstore.Process.Pid, os.Getenv("ARVADOS_KEEP_SERVICES"))
+               keepstoreLogbuf.SetWriter(io.Discard)
+       } else {
+               cr.CrunchLog.Printf("using local keepstore process (pid %d) at %s, writing logs to keepstore.txt in log collection", keepstore.Process.Pid, os.Getenv("ARVADOS_KEEP_SERVICES"))
+               logwriter, err := cr.NewLogWriter("keepstore")
                if err != nil {
                        log.Print(err)
                        return 1
                }
-               err = keepstoreLog.SetWriter(NewThrottledLogger(w))
+               cr.keepstoreLogger = NewThrottledLogger(logwriter)
+
+               var writer io.WriteCloser = cr.keepstoreLogger
+               if logWhat == "errors" {
+                       writer = &filterKeepstoreErrorsOnly{WriteCloser: writer}
+               } else if logWhat != "all" {
+                       // should have been caught earlier by
+                       // dispatcher's config loader
+                       log.Printf("invalid value for Containers.LocalKeepLogsToContainerLog: %q", logWhat)
+                       return 1
+               }
+               err = keepstoreLogbuf.SetWriter(writer)
                if err != nil {
                        log.Print(err)
                        return 1
                }
+               cr.keepstoreLogbuf = &keepstoreLogbuf
        }
 
        switch *runtimeEngine {
@@ -1865,6 +1954,16 @@ func startLocalKeepstore(configData ConfigData, logbuf io.Writer) (*exec.Cmd, er
        if configData.Cluster == nil || configData.KeepBuffers < 1 {
                return nil, nil
        }
+       for uuid, vol := range configData.Cluster.Volumes {
+               if len(vol.AccessViaHosts) > 0 {
+                       fmt.Fprintf(logbuf, "not starting a local keepstore process because a volume (%s) uses AccessViaHosts\n", uuid)
+                       return nil, nil
+               }
+               if !vol.ReadOnly && vol.Replication < configData.Cluster.Collections.DefaultReplication {
+                       fmt.Fprintf(logbuf, "not starting a local keepstore process because a writable volume (%s) has replication less than Collections.DefaultReplication (%d < %d)\n", uuid, vol.Replication, configData.Cluster.Collections.DefaultReplication)
+                       return nil, nil
+               }
+       }
 
        // Rather than have an alternate way to tell keepstore how
        // many buffers to use when starting it this way, we just
@@ -1895,37 +1994,52 @@ func startLocalKeepstore(configData ConfigData, logbuf io.Writer) (*exec.Cmd, er
                return nil, err
        }
        cmd := exec.Command("/proc/self/exe", "keepstore", "-config=-")
+       if target, err := os.Readlink(cmd.Path); err == nil && strings.HasSuffix(target, ".test") {
+               // If we're a 'go test' process, running
+               // /proc/self/exe would start the test suite in a
+               // child process, which is not what we want.
+               cmd.Path, _ = exec.LookPath("go")
+               cmd.Args = append([]string{"go", "run", "../../cmd/arvados-server"}, cmd.Args[1:]...)
+               cmd.Env = os.Environ()
+       }
        cmd.Stdin = &confJSON
        cmd.Stdout = logbuf
        cmd.Stderr = logbuf
-       cmd.Env = []string{
+       cmd.Env = append(cmd.Env,
                "GOGC=10",
-               "ARVADOS_SERVICE_INTERNAL_URL=" + url,
-       }
+               "ARVADOS_SERVICE_INTERNAL_URL="+url)
        err = cmd.Start()
        if err != nil {
                return nil, fmt.Errorf("error starting keepstore process: %w", err)
        }
+       cmdExited := false
+       go func() {
+               cmd.Wait()
+               cmdExited = true
+       }()
        ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(time.Second*10))
        defer cancel()
        poll := time.NewTicker(time.Second / 10)
        defer poll.Stop()
        client := http.Client{}
        for range poll.C {
-               testReq, err := http.NewRequestWithContext(ctx, "GET", url, nil)
+               testReq, err := http.NewRequestWithContext(ctx, "GET", url+"/_health/ping", nil)
+               testReq.Header.Set("Authorization", "Bearer "+configData.Cluster.ManagementToken)
                if err != nil {
                        return nil, err
                }
                resp, err := client.Do(testReq)
                if err == nil {
-                       // Success -- don't need to check the
-                       // response, we just need to know it's
-                       // accepting requests.
                        resp.Body.Close()
-                       break
+                       if resp.StatusCode == http.StatusOK {
+                               break
+                       }
+               }
+               if cmdExited {
+                       return nil, fmt.Errorf("keepstore child process exited")
                }
                if ctx.Err() != nil {
-                       return nil, fmt.Errorf("timed out waiting for new keepstore process to accept a request")
+                       return nil, fmt.Errorf("timed out waiting for new keepstore process to report healthy")
                }
        }
        os.Setenv("ARVADOS_KEEP_SERVICES", url)