X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/5f95bfc2c5c7706c7961aeca3aabd90ea5661f0a..2f344e8b8dde661e74307ed7e561a758809382e1:/lib/crunchrun/crunchrun.go diff --git a/lib/crunchrun/crunchrun.go b/lib/crunchrun/crunchrun.go index d70dd1c428..7e68dcd331 100644 --- a/lib/crunchrun/crunchrun.go +++ b/lib/crunchrun/crunchrun.go @@ -137,6 +137,8 @@ type ContainerRunner struct { finalState string parentTemp string + keepstoreLogger io.WriteCloser + keepstoreLogbuf *bufThenWrite statLogger io.WriteCloser statReporter *crunchstat.Reporter hoststatLogger io.WriteCloser @@ -412,11 +414,14 @@ func (runner *ContainerRunner) SetupMounts() (map[string]bindmount, error) { arvMountCmd := []string{ "arv-mount", "--foreground", - "--allow-other", "--read-write", "--storage-classes", strings.Join(runner.Container.OutputStorageClasses, ","), fmt.Sprintf("--crunchstat-interval=%v", runner.statInterval.Seconds())} + if runner.executor.Runtime() == "docker" { + arvMountCmd = append(arvMountCmd, "--allow-other") + } + if runner.Container.RuntimeConstraints.KeepCacheRAM > 0 { arvMountCmd = append(arvMountCmd, "--file-cache", fmt.Sprintf("%d", runner.Container.RuntimeConstraints.KeepCacheRAM)) } @@ -612,10 +617,15 @@ func (runner *ContainerRunner) SetupMounts() (map[string]bindmount, error) { } if pdhOnly { - arvMountCmd = append(arvMountCmd, "--mount-by-pdh", "by_id") + // If we are only mounting collections by pdh, make + // sure we don't subscribe to websocket events to + // avoid putting undesired load on the API server + arvMountCmd = append(arvMountCmd, "--mount-by-pdh", "by_id", "--disable-event-listening") } else { arvMountCmd = append(arvMountCmd, "--mount-by-id", "by_id") } + // the by_uuid mount point is used by singularity when writing + // out docker images converted to SIF arvMountCmd = append(arvMountCmd, "--mount-by-id", "by_uuid") arvMountCmd = append(arvMountCmd, runner.ArvMountPoint) @@ -976,20 +986,33 @@ func (runner *ContainerRunner) CreateContainer(imageID string, bindmounts map[st runner.executorStdin = stdin runner.executorStdout = stdout runner.executorStderr = stderr + + cudaDeviceCount := 0 + if runner.Container.RuntimeConstraints.CUDADriverVersion != "" || + runner.Container.RuntimeConstraints.CUDAHardwareCapability != "" || + runner.Container.RuntimeConstraints.CUDADeviceCount != 0 { + // if any of these are set, enable CUDA GPU support + cudaDeviceCount = runner.Container.RuntimeConstraints.CUDADeviceCount + if cudaDeviceCount == 0 { + cudaDeviceCount = 1 + } + } + return runner.executor.Create(containerSpec{ - Image: imageID, - VCPUs: runner.Container.RuntimeConstraints.VCPUs, - RAM: ram, - WorkingDir: workdir, - Env: env, - BindMounts: bindmounts, - Command: runner.Container.Command, - EnableNetwork: enableNetwork, - NetworkMode: runner.networkMode, - CgroupParent: runner.setCgroupParent, - Stdin: stdin, - Stdout: stdout, - Stderr: stderr, + Image: imageID, + VCPUs: runner.Container.RuntimeConstraints.VCPUs, + RAM: ram, + WorkingDir: workdir, + Env: env, + BindMounts: bindmounts, + Command: runner.Container.Command, + EnableNetwork: enableNetwork, + CUDADeviceCount: cudaDeviceCount, + NetworkMode: runner.networkMode, + CgroupParent: runner.setCgroupParent, + Stdin: stdin, + Stdout: stdout, + Stderr: stderr, }) } @@ -1277,6 +1300,16 @@ func (runner *ContainerRunner) CommitLogs() error { runner.CrunchLog.Immediate = log.New(os.Stderr, runner.Container.UUID+" ", 0) }() + if runner.keepstoreLogger != nil { + // Flush any buffered logs from our local keepstore + // process. Discard anything logged after this point + // -- it won't end up in the log collection, so + // there's no point writing it to the collectionfs. + runner.keepstoreLogbuf.SetWriter(io.Discard) + runner.keepstoreLogger.Close() + runner.keepstoreLogger = nil + } + if runner.LogsPDH != nil { // If we have already assigned something to LogsPDH, // we must be closing the re-opened log, which won't @@ -1285,6 +1318,7 @@ func (runner *ContainerRunner) CommitLogs() error { // -- it exists only to send logs to other channels. return nil } + saved, err := runner.saveLogCollection(true) if err != nil { return fmt.Errorf("error saving log collection: %s", err) @@ -1647,6 +1681,7 @@ func NewContainerRunner(dispatcherClient *arvados.Client, } func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) int { + log := log.New(stderr, "", 0) flags := flag.NewFlagSet(prog, flag.ContinueOnError) statInterval := flags.Duration("crunchstat-interval", 10*time.Second, "sampling period for periodic resource usage reporting") cgroupRoot := flags.String("cgroup-root", "/sys/fs/cgroup", "path to sysfs cgroup tree") @@ -1677,11 +1712,11 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s ignoreDetachFlag = true } - if err := flags.Parse(args); err == flag.ErrHelp { - return 0 - } else if err != nil { - log.Print(err) - return 1 + if ok, code := cmd.ParseFlags(flags, prog, args, "container-uuid", stderr); !ok { + return code + } else if !*list && flags.NArg() != 1 { + fmt.Fprintf(stderr, "missing required argument: container-uuid (try -help)\n") + return 2 } containerUUID := flags.Arg(0) @@ -1702,9 +1737,9 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s var conf ConfigData if *stdinConfig { - err := json.NewDecoder(os.Stdin).Decode(&conf) + err := json.NewDecoder(stdin).Decode(&conf) if err != nil { - log.Print(err) + log.Printf("decode stdin: %s", err) return 1 } for k, v := range conf.Env { @@ -1730,8 +1765,8 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s arvadosclient.CertFiles = []string{*caCertsPath} } - var keepstoreLog bufThenWrite - keepstore, err := startLocalKeepstore(conf, io.MultiWriter(&keepstoreLog, stderr)) + var keepstoreLogbuf bufThenWrite + keepstore, err := startLocalKeepstore(conf, io.MultiWriter(&keepstoreLogbuf, stderr)) if err != nil { log.Print(err) return 1 @@ -1747,9 +1782,9 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s } api.Retries = 8 - kc, kcerr := keepclient.MakeKeepClient(api) - if kcerr != nil { - log.Printf("%s: %v", containerUUID, kcerr) + kc, err := keepclient.MakeKeepClient(api) + if err != nil { + log.Printf("%s: %v", containerUUID, err) return 1 } kc.BlockCache = &keepclient.BlockCache{MaxBlocks: 2} @@ -1761,17 +1796,41 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s return 1 } - if keepstore != nil { - w, err := cr.NewLogWriter("keepstore") + if keepstore == nil { + // Log explanation (if any) for why we're not running + // a local keepstore. + var buf bytes.Buffer + keepstoreLogbuf.SetWriter(&buf) + if buf.Len() > 0 { + cr.CrunchLog.Printf("%s", strings.TrimSpace(buf.String())) + } + } else if logWhat := conf.Cluster.Containers.LocalKeepLogsToContainerLog; logWhat == "none" { + cr.CrunchLog.Printf("using local keepstore process (pid %d) at %s", keepstore.Process.Pid, os.Getenv("ARVADOS_KEEP_SERVICES")) + keepstoreLogbuf.SetWriter(io.Discard) + } else { + cr.CrunchLog.Printf("using local keepstore process (pid %d) at %s, writing logs to keepstore.txt in log collection", keepstore.Process.Pid, os.Getenv("ARVADOS_KEEP_SERVICES")) + logwriter, err := cr.NewLogWriter("keepstore") if err != nil { log.Print(err) return 1 } - err = keepstoreLog.SetWriter(NewThrottledLogger(w)) + cr.keepstoreLogger = NewThrottledLogger(logwriter) + + var writer io.WriteCloser = cr.keepstoreLogger + if logWhat == "errors" { + writer = &filterKeepstoreErrorsOnly{WriteCloser: writer} + } else if logWhat != "all" { + // should have been caught earlier by + // dispatcher's config loader + log.Printf("invalid value for Containers.LocalKeepLogsToContainerLog: %q", logWhat) + return 1 + } + err = keepstoreLogbuf.SetWriter(writer) if err != nil { log.Print(err) return 1 } + cr.keepstoreLogbuf = &keepstoreLogbuf } switch *runtimeEngine { @@ -1865,6 +1924,16 @@ func startLocalKeepstore(configData ConfigData, logbuf io.Writer) (*exec.Cmd, er if configData.Cluster == nil || configData.KeepBuffers < 1 { return nil, nil } + for uuid, vol := range configData.Cluster.Volumes { + if len(vol.AccessViaHosts) > 0 { + fmt.Fprintf(logbuf, "not starting a local keepstore process because a volume (%s) uses AccessViaHosts\n", uuid) + return nil, nil + } + if !vol.ReadOnly && vol.Replication < configData.Cluster.Collections.DefaultReplication { + fmt.Fprintf(logbuf, "not starting a local keepstore process because a writable volume (%s) has replication less than Collections.DefaultReplication (%d < %d)\n", uuid, vol.Replication, configData.Cluster.Collections.DefaultReplication) + return nil, nil + } + } // Rather than have an alternate way to tell keepstore how // many buffers to use when starting it this way, we just @@ -1895,13 +1964,20 @@ func startLocalKeepstore(configData ConfigData, logbuf io.Writer) (*exec.Cmd, er return nil, err } cmd := exec.Command("/proc/self/exe", "keepstore", "-config=-") + if target, err := os.Readlink(cmd.Path); err == nil && strings.HasSuffix(target, ".test") { + // If we're a 'go test' process, running + // /proc/self/exe would start the test suite in a + // child process, which is not what we want. + cmd.Path, _ = exec.LookPath("go") + cmd.Args = append([]string{"go", "run", "../../cmd/arvados-server"}, cmd.Args[1:]...) + cmd.Env = os.Environ() + } cmd.Stdin = &confJSON cmd.Stdout = logbuf cmd.Stderr = logbuf - cmd.Env = []string{ + cmd.Env = append(cmd.Env, "GOGC=10", - "ARVADOS_SERVICE_INTERNAL_URL=" + url, - } + "ARVADOS_SERVICE_INTERNAL_URL="+url) err = cmd.Start() if err != nil { return nil, fmt.Errorf("error starting keepstore process: %w", err)