X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/ae6fe3864ca6b254dfa3345985568c1cc94358fe..HEAD:/lib/crunchrun/crunchrun.go

diff --git a/lib/crunchrun/crunchrun.go b/lib/crunchrun/crunchrun.go
index 57eed84bac..7782a9c5a6 100644
--- a/lib/crunchrun/crunchrun.go
+++ b/lib/crunchrun/crunchrun.go
@@ -12,6 +12,7 @@ import (
 	"flag"
 	"fmt"
 	"io"
+	"io/fs"
 	"io/ioutil"
 	"log"
 	"net"
@@ -45,6 +46,8 @@ import (
 
 type command struct{}
 
+var arvadosCertPath = "/etc/arvados/ca-certificates.crt"
+
 var Command = command{}
 
 // ConfigData contains environment variables and (when needed) cluster
@@ -75,13 +78,9 @@ type IKeepClient interface {
 	ReadAt(locator string, p []byte, off int) (int, error)
 	ManifestFileReader(m manifest.Manifest, filename string) (arvados.File, error)
 	LocalLocator(locator string) (string, error)
-	ClearBlockCache()
 	SetStorageClasses(sc []string)
 }
 
-// NewLogWriter is a factory function to create a new log writer.
-type NewLogWriter func(name string) (io.WriteCloser, error)
-
 type RunArvMount func(cmdline []string, tok string) (*exec.Cmd, error)
 
 type MkTempDir func(string, string) (string, error)
@@ -123,8 +122,7 @@ type ContainerRunner struct {
 	Container     arvados.Container
 	token         string
 	ExitCode      *int
-	NewLogWriter  NewLogWriter
-	CrunchLog     *ThrottledLogger
+	CrunchLog     *logWriter
 	logUUID       string
 	logMtx        sync.Mutex
 	LogCollection arvados.CollectionFileSystem
@@ -152,20 +150,12 @@ type ContainerRunner struct {
 	hoststatLogger   io.WriteCloser
 	hoststatReporter *crunchstat.Reporter
 	statInterval     time.Duration
-	cgroupRoot       string
-	// What we expect the container's cgroup parent to be.
-	expectCgroupParent string
 	// What we tell docker to use as the container's cgroup
-	// parent. Note: Ideally we would use the same field for both
-	// expectCgroupParent and setCgroupParent, and just make it
-	// default to "docker". However, when using docker < 1.10 with
-	// systemd, specifying a non-empty cgroup parent (even the
-	// default value "docker") hits a docker bug
-	// (https://github.com/docker/docker/issues/17126). Using two
-	// separate fields makes it possible to use the "expect cgroup
-	// parent to be X" feature even on sites where the "specify
-	// cgroup parent" feature breaks.
+	// parent.
 	setCgroupParent string
+	// Fake root dir where crunchstat.Reporter should read OS
+	// files, for testing.
+	crunchstatFakeFS fs.FS
 
 	cStateLock sync.Mutex
 	cCancelled bool // StopContainer() invoked
@@ -174,7 +164,7 @@ type ContainerRunner struct {
 	enableNetwork  string // one of "default" or "always"
 	networkMode    string // "none", "host", or "" -- passed through to executor
 	brokenNodeHook string // script to run if node appears to be broken
-	arvMountLog    *ThrottledLogger
+	arvMountLog io.WriteCloser
 
 	containerWatchdogInterval time.Duration
 
@@ -309,11 +299,10 @@ func (runner *ContainerRunner) ArvMountCmd(cmdline []string, token string) (c *e
 	}
 	c.Env = append(c.Env, "ARVADOS_API_TOKEN="+token)
 
-	w, err := runner.NewLogWriter("arv-mount")
+	runner.arvMountLog, err = runner.openLogFile("arv-mount")
 	if err != nil {
 		return nil, err
 	}
-	runner.arvMountLog = NewThrottledLogger(w)
 	scanner := logScanner{
 		Patterns: []string{
 			"Keep write error",
@@ -501,7 +490,7 @@ func (runner *ContainerRunner) SetupMounts() (map[string]bindmount, error) {
 			}
 		}
 
-		if bind == "/etc/arvados/ca-certificates.crt" {
+		if bind == arvadosCertPath {
 			needCertMount = false
 		}
 
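A note on the new crunchstatFakeFS field above: giving crunchstat.Reporter an fs.FS lets tests substitute a fake root for the real /proc and /sys trees. A minimal standalone sketch of that injection pattern, assuming nothing from crunchrun (readMemInfo is a hypothetical helper, not code from this diff):

package main

import (
	"fmt"
	"io/fs"
	"testing/fstest"
)

// readMemInfo reads a stat file from the given root: os.DirFS("/") in
// production, a fake filesystem in tests.
func readMemInfo(root fs.FS) (string, error) {
	b, err := fs.ReadFile(root, "proc/meminfo")
	if err != nil {
		return "", err
	}
	return string(b), nil
}

func main() {
	// A test can hand the reader a synthetic /proc without touching
	// the real one.
	fake := fstest.MapFS{
		"proc/meminfo": &fstest.MapFile{Data: []byte("MemTotal: 16384 kB\n")},
	}
	s, err := readMemInfo(fake)
	fmt.Println(s, err)
}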
@@ -632,17 +621,6 @@ func (runner *ContainerRunner) SetupMounts() (map[string]bindmount, error) {
 				// OutputPath is a staging directory.
 				bindmounts[bind] = bindmount{HostPath: tmpfn, ReadOnly: true}
 			}
 
-		case mnt.Kind == "git_tree":
-			tmpdir, err := runner.MkTempDir(runner.parentTemp, "git_tree")
-			if err != nil {
-				return nil, fmt.Errorf("creating temp dir: %v", err)
-			}
-			err = gitMount(mnt).extractTree(runner.ContainerArvClient, tmpdir, token)
-			if err != nil {
-				return nil, err
-			}
-			bindmounts[bind] = bindmount{HostPath: tmpdir, ReadOnly: true}
 		}
 	}
 
@@ -651,10 +629,19 @@ func (runner *ContainerRunner) SetupMounts() (map[string]bindmount, error) {
 	}
 
 	if needCertMount && runner.Container.RuntimeConstraints.API {
-		for _, certfile := range arvadosclient.CertFiles {
-			_, err := os.Stat(certfile)
-			if err == nil {
-				bindmounts["/etc/arvados/ca-certificates.crt"] = bindmount{HostPath: certfile, ReadOnly: true}
+		for _, certfile := range []string{
+			// Populated by caller, or sdk/go/arvados init(), or test suite:
+			os.Getenv("SSL_CERT_FILE"),
+			// Copied from Go 1.21 stdlib (src/crypto/x509/root_linux.go):
+			"/etc/ssl/certs/ca-certificates.crt",                // Debian/Ubuntu/Gentoo etc.
+			"/etc/pki/tls/certs/ca-bundle.crt",                  // Fedora/RHEL 6
+			"/etc/ssl/ca-bundle.pem",                            // OpenSUSE
+			"/etc/pki/tls/cacert.pem",                           // OpenELEC
+			"/etc/pki/ca-trust/extracted/pem/tls-ca-bundle.pem", // CentOS/RHEL 7
+			"/etc/ssl/cert.pem",                                 // Alpine Linux
+		} {
+			if _, err := os.Stat(certfile); err == nil {
+				bindmounts[arvadosCertPath] = bindmount{HostPath: certfile, ReadOnly: true}
 				break
 			}
 		}
@@ -734,6 +721,7 @@ func (runner *ContainerRunner) stopHoststat() error {
 		return nil
 	}
 	runner.hoststatReporter.Stop()
+	runner.hoststatReporter.LogProcessMemMax(runner.CrunchLog)
 	err := runner.hoststatLogger.Close()
 	if err != nil {
 		return fmt.Errorf("error closing hoststat logs: %v", err)
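The CA-bundle hunk above replaces the arvadosclient.CertFiles global with an inline "first existing file wins" search, preferring $SSL_CERT_FILE. A standalone sketch of the same pattern with a trimmed path list (findCertBundle is a hypothetical name, not from this diff):

package main

import (
	"fmt"
	"os"
)

// findCertBundle returns the first path in the candidate list that
// exists on this host. An empty $SSL_CERT_FILE is harmless: os.Stat("")
// simply fails and the loop moves on.
func findCertBundle() (string, bool) {
	for _, certfile := range []string{
		os.Getenv("SSL_CERT_FILE"),
		"/etc/ssl/certs/ca-certificates.crt", // Debian/Ubuntu
		"/etc/pki/tls/certs/ca-bundle.crt",   // Fedora/RHEL
	} {
		if _, err := os.Stat(certfile); err == nil {
			return certfile, true
		}
	}
	return "", false
}

func main() {
	if path, ok := findCertBundle(); ok {
		fmt.Println("would bind-mount", path, "at /etc/arvados/ca-certificates.crt")
	}
}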
@@ -742,14 +730,22 @@ func (runner *ContainerRunner) stopHoststat() error {
 }
 
 func (runner *ContainerRunner) startHoststat() error {
-	w, err := runner.NewLogWriter("hoststat")
+	var err error
+	runner.hoststatLogger, err = runner.openLogFile("hoststat")
 	if err != nil {
 		return err
 	}
-	runner.hoststatLogger = NewThrottledLogger(w)
 	runner.hoststatReporter = &crunchstat.Reporter{
-		Logger:     log.New(runner.hoststatLogger, "", 0),
-		CgroupRoot: runner.cgroupRoot,
+		Logger: newLogWriter(runner.hoststatLogger),
+		// Our own cgroup is the "host" cgroup, in the sense
+		// that it accounts for resource usage outside the
+		// container. It doesn't count _all_ resource usage on
+		// the system.
+		//
+		// TODO?: Use the furthest ancestor of our own cgroup
+		// that has stats available. (Currently crunchstat
+		// does not have that capability.)
+		Pid:        os.Getpid,
 		PollPeriod: runner.statInterval,
 	}
 	runner.hoststatReporter.Start()
@@ -758,18 +754,21 @@ func (runner *ContainerRunner) startHoststat() error {
 }
 
 func (runner *ContainerRunner) startCrunchstat() error {
-	w, err := runner.NewLogWriter("crunchstat")
+	var err error
+	runner.statLogger, err = runner.openLogFile("crunchstat")
 	if err != nil {
 		return err
 	}
-	runner.statLogger = NewThrottledLogger(w)
 	runner.statReporter = &crunchstat.Reporter{
-		CID:          runner.executor.CgroupID(),
-		Logger:       log.New(runner.statLogger, "", 0),
-		CgroupParent: runner.expectCgroupParent,
-		CgroupRoot:   runner.cgroupRoot,
-		PollPeriod:   runner.statInterval,
-		TempDir:      runner.parentTemp,
+		Pid:    runner.executor.Pid,
+		FS:     runner.crunchstatFakeFS,
+		Logger: newLogWriter(runner.statLogger),
+		MemThresholds: map[string][]crunchstat.Threshold{
+			"rss": crunchstat.NewThresholdsFromPercentages(runner.Container.RuntimeConstraints.RAM, []int64{90, 95, 99}),
+		},
+		PollPeriod:      runner.statInterval,
+		TempDir:         runner.parentTemp,
+		ThresholdLogger: runner.CrunchLog,
 	}
 	runner.statReporter.Start()
 	return nil
@@ -786,7 +785,7 @@ type infoCommand struct {
 // might differ from what's described in the node record (see
 // LogNodeRecord).
 func (runner *ContainerRunner) LogHostInfo() (err error) {
-	w, err := runner.NewLogWriter("node-info")
+	w, err := runner.openLogFile("node-info")
 	if err != nil {
 		return
 	}
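The startCrunchstat hunk above asks crunchstat to warn when the container's RSS crosses 90/95/99% of its RAM constraint. A guess at the arithmetic behind crunchstat.NewThresholdsFromPercentages — a hypothetical standalone version, not crunchstat's actual code:

package main

import "fmt"

// thresholdsFromPercentages converts a total (here, the container's
// RuntimeConstraints.RAM in bytes) and a list of percentages into
// absolute byte thresholds.
func thresholdsFromPercentages(total int64, percentages []int64) []int64 {
	var out []int64
	for _, pct := range percentages {
		out = append(out, total*pct/100)
	}
	return out
}

func main() {
	ram := int64(8 << 30) // e.g. an 8 GiB RAM constraint
	// Prints the three byte counts at which warnings would fire.
	fmt.Println(thresholdsFromPercentages(ram, []int64{90, 95, 99}))
}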
@@ -837,62 +836,40 @@ func (runner *ContainerRunner) LogHostInfo() (err error) {
 
 // LogContainerRecord gets and saves the raw JSON container record from the API server
 func (runner *ContainerRunner) LogContainerRecord() error {
-	logged, err := runner.logAPIResponse("container", "containers", map[string]interface{}{"filters": [][]string{{"uuid", "=", runner.Container.UUID}}}, nil)
+	logged, err := runner.logAPIResponse("container", "containers", map[string]interface{}{"filters": [][]string{{"uuid", "=", runner.Container.UUID}}})
 	if !logged && err == nil {
 		err = fmt.Errorf("error: no container record found for %s", runner.Container.UUID)
 	}
 	return err
 }
 
-// LogNodeRecord logs the current host's InstanceType config entry (or
-// the arvados#node record, if running via crunch-dispatch-slurm).
+// LogNodeRecord logs the current host's InstanceType config entry, if
+// running via arvados-dispatch-cloud.
 func (runner *ContainerRunner) LogNodeRecord() error {
-	if it := os.Getenv("InstanceType"); it != "" {
-		// Dispatched via arvados-dispatch-cloud. Save
-		// InstanceType config fragment received from
-		// dispatcher on stdin.
-		w, err := runner.LogCollection.OpenFile("node.json", os.O_CREATE|os.O_WRONLY, 0666)
-		if err != nil {
-			return err
-		}
-		defer w.Close()
-		_, err = io.WriteString(w, it)
-		if err != nil {
-			return err
-		}
-		return w.Close()
+	it := os.Getenv("InstanceType")
+	if it == "" {
+		// Not dispatched by arvados-dispatch-cloud.
+		return nil
 	}
-	// Dispatched via crunch-dispatch-slurm. Look up
-	// apiserver's node record corresponding to
-	// $SLURMD_NODENAME.
-	hostname := os.Getenv("SLURMD_NODENAME")
-	if hostname == "" {
-		hostname, _ = os.Hostname()
+	// Save InstanceType config fragment received from dispatcher
+	// on stdin.
+	w, err := runner.LogCollection.OpenFile("node.json", os.O_CREATE|os.O_WRONLY, 0666)
+	if err != nil {
+		return err
 	}
-	_, err := runner.logAPIResponse("node", "nodes", map[string]interface{}{"filters": [][]string{{"hostname", "=", hostname}}}, func(resp interface{}) {
-		// The "info" field has admin-only info when
-		// obtained with a privileged token, and
-		// should not be logged.
-		node, ok := resp.(map[string]interface{})
-		if ok {
-			delete(node, "info")
-		}
-	})
-	return err
+	defer w.Close()
+	_, err = io.WriteString(w, it)
+	if err != nil {
+		return err
+	}
+	return w.Close()
 }
 
-func (runner *ContainerRunner) logAPIResponse(label, path string, params map[string]interface{}, munge func(interface{})) (logged bool, err error) {
+func (runner *ContainerRunner) logAPIResponse(label, path string, params map[string]interface{}) (logged bool, err error) {
 	writer, err := runner.LogCollection.OpenFile(label+".json", os.O_CREATE|os.O_WRONLY, 0666)
 	if err != nil {
 		return false, err
 	}
-	w := &ArvLogWriter{
-		ArvClient:     runner.DispatcherArvClient,
-		UUID:          runner.Container.UUID,
-		loggingStream: label,
-		writeCloser:   writer,
-	}
-
 	reader, err := runner.DispatcherArvClient.CallRaw("GET", path, "", "", arvadosclient.Dict(params))
 	if err != nil {
 		return false, fmt.Errorf("error getting %s record: %v", label, err)
@@ -911,16 +888,13 @@ func (runner *ContainerRunner) logAPIResponse(label, path string, params map[str
 	} else if len(items) < 1 {
 		return false, nil
 	}
-	if munge != nil {
-		munge(items[0])
-	}
 	// Re-encode it using indentation to improve readability
-	enc := json.NewEncoder(w)
+	enc := json.NewEncoder(writer)
 	enc.SetIndent("", "    ")
 	if err = enc.Encode(items[0]); err != nil {
 		return false, fmt.Errorf("error logging %s record: %v", label, err)
 	}
-	err = w.Close()
+	err = writer.Close()
 	if err != nil {
 		return false, fmt.Errorf("error closing %s.json in log collection: %v", label, err)
 	}
@@ -954,7 +928,7 @@ func (runner *ContainerRunner) getStdoutFile(mntPath string) (*os.File, error) {
 
 // CreateContainer creates the docker container.
 func (runner *ContainerRunner) CreateContainer(imageID string, bindmounts map[string]bindmount) error {
-	var stdin io.ReadCloser = ioutil.NopCloser(bytes.NewReader(nil))
+	var stdin io.Reader
 	if mnt, ok := runner.Container.Mounts["stdin"]; ok {
 		switch mnt.Kind {
 		case "collection":
@@ -970,28 +944,35 @@ func (runner *ContainerRunner) CreateContainer(imageID string, bindmounts map[st
 				return err
 			}
 			stdin = f
+			runner.executorStdin = f
 		case "json":
 			j, err := json.Marshal(mnt.Content)
 			if err != nil {
 				return fmt.Errorf("error encoding stdin json data: %v", err)
 			}
-			stdin = ioutil.NopCloser(bytes.NewReader(j))
+			stdin = bytes.NewReader(j)
+			runner.executorStdin = io.NopCloser(nil)
 		default:
 			return fmt.Errorf("stdin mount has unsupported kind %q", mnt.Kind)
 		}
+	} else {
+		stdin = bytes.NewReader(nil)
+		runner.executorStdin = ioutil.NopCloser(nil)
 	}
 
-	var stdout, stderr io.WriteCloser
+	var stdout, stderr io.Writer
 	if mnt, ok := runner.Container.Mounts["stdout"]; ok {
 		f, err := runner.getStdoutFile(mnt.Path)
 		if err != nil {
 			return err
 		}
 		stdout = f
-	} else if w, err := runner.NewLogWriter("stdout"); err != nil {
+		runner.executorStdout = f
+	} else if w, err := runner.openLogFile("stdout"); err != nil {
 		return err
 	} else {
-		stdout = NewThrottledLogger(w)
+		stdout = newTimestamper(w)
+		runner.executorStdout = w
 	}
 
 	if mnt, ok := runner.Container.Mounts["stderr"]; ok {
@@ -1000,10 +981,12 @@ func (runner *ContainerRunner) CreateContainer(imageID string, bindmounts map[st
 			return err
 		}
 		stderr = f
-	} else if w, err := runner.NewLogWriter("stderr"); err != nil {
+		runner.executorStderr = f
+	} else if w, err := runner.openLogFile("stderr"); err != nil {
 		return err
 	} else {
-		stderr = NewThrottledLogger(w)
+		stderr = newTimestamper(w)
+		runner.executorStderr = w
 	}
 
 	env := runner.Container.Environment
@@ -1032,9 +1015,6 @@ func (runner *ContainerRunner) CreateContainer(imageID string, bindmounts map[st
 	if !runner.enableMemoryLimit {
 		ram = 0
 	}
-	runner.executorStdin = stdin
-	runner.executorStdout = stdout
-	runner.executorStderr = stderr
 
 	if runner.Container.RuntimeConstraints.CUDA.DeviceCount > 0 {
 		nvidiaModprobe(runner.CrunchLog)
@@ -1119,6 +1099,7 @@ func (runner *ContainerRunner) WaitFinish() error {
 	}
 	runner.CrunchLog.Printf("Container exited with status code %d%s", exitcode, extra)
 	err = runner.DispatcherArvClient.Update("containers", runner.Container.UUID, arvadosclient.Dict{
+		"select":    []string{"uuid"},
 		"container": arvadosclient.Dict{"exit_code": exitcode},
 	}, nil)
 	if err != nil {
@@ -1148,6 +1129,9 @@ func (runner *ContainerRunner) WaitFinish() error {
 
 	if runner.statReporter != nil {
 		runner.statReporter.Stop()
+		runner.statReporter.LogMaxima(runner.CrunchLog, map[string]int64{
+			"rss": runner.Container.RuntimeConstraints.RAM,
+		})
 		err = runner.statLogger.Close()
 		if err != nil {
 			runner.CrunchLog.Printf("error closing crunchstat logs: %v", err)
@@ -1192,7 +1176,10 @@ func (runner *ContainerRunner) updateLogs() {
 		}
 
 		err = runner.DispatcherArvClient.Update("containers", runner.Container.UUID, arvadosclient.Dict{
-			"container": arvadosclient.Dict{"log": saved.PortableDataHash},
+			"select": []string{"uuid"},
+			"container": arvadosclient.Dict{
+				"log": saved.PortableDataHash,
+			},
 		}, nil)
 		if err != nil {
 			runner.CrunchLog.Printf("error updating container log to %s: %s", saved.PortableDataHash, err)
@@ -1291,7 +1278,7 @@ func (runner *ContainerRunner) checkSpotInterruptionNotices() {
 		failures = 0
 		if metadata != lastmetadata {
 			lastmetadata = metadata
-			text := fmt.Sprintf("Cloud provider indicates instance action %q scheduled for time %q", metadata.Action, metadata.Time.UTC().Format(time.RFC3339))
+			text := fmt.Sprintf("Cloud provider scheduled instance %s at %s", metadata.Action, metadata.Time.UTC().Format(time.RFC3339))
 			runner.CrunchLog.Printf("%s", text)
 			runner.updateRuntimeStatus(arvadosclient.Dict{
 				"warning": "preemption notice",
@@ -1308,6 +1295,7 @@ func (runner *ContainerRunner) checkSpotInterruptionNotices() {
 
 func (runner *ContainerRunner) updateRuntimeStatus(status arvadosclient.Dict) {
 	err := runner.DispatcherArvClient.Update("containers", runner.Container.UUID, arvadosclient.Dict{
+		"select": []string{"uuid"},
 		"container": arvadosclient.Dict{
 			"runtime_status": status,
 		},
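The logAPIResponse hunks above drop the munge callback and write the pretty-printed record straight to the collection file. A standalone illustration of the re-encode step (decode compact API JSON, re-encode with SetIndent), using an obviously fake UUID:

package main

import (
	"encoding/json"
	"os"
	"strings"
)

func main() {
	// Compact JSON, as an API server would return it.
	compact := `{"uuid":"zzzzz-dz642-0123456789abcde","state":"Running"}`
	var item map[string]interface{}
	if err := json.NewDecoder(strings.NewReader(compact)).Decode(&item); err != nil {
		panic(err)
	}
	// Re-encode with indentation, as logAPIResponse does before
	// writing <label>.json into the log collection.
	enc := json.NewEncoder(os.Stdout)
	enc.SetIndent("", "    ")
	enc.Encode(item)
}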
@@ -1324,7 +1312,9 @@ func (runner *ContainerRunner) CaptureOutput(bindmounts map[string]bindmount) er
 		// Output may have been set directly by the container, so
 		// refresh the container record to check.
 		err := runner.DispatcherArvClient.Get("containers", runner.Container.UUID,
-			nil, &runner.Container)
+			arvadosclient.Dict{
+				"select": []string{"output"},
+			}, &runner.Container)
 		if err != nil {
 			return err
 		}
@@ -1337,10 +1327,10 @@ func (runner *ContainerRunner) CaptureOutput(bindmounts map[string]bindmount) er
 
 	txt, err := (&copier{
 		client:        runner.containerClient,
-		arvClient:     runner.ContainerArvClient,
 		keepClient:    runner.ContainerKeepClient,
 		hostOutputDir: runner.HostOutputDir,
 		ctrOutputDir:  runner.Container.OutputPath,
+		globs:         runner.Container.OutputGlob,
 		bindmounts:    bindmounts,
 		mounts:        runner.Container.Mounts,
 		secretMounts:  runner.SecretMounts,
@@ -1363,6 +1353,7 @@ func (runner *ContainerRunner) CaptureOutput(bindmounts map[string]bindmount) er
 	var resp arvados.Collection
 	err = runner.ContainerArvClient.Create("collections", arvadosclient.Dict{
 		"ensure_unique_name": true,
+		"select":             []string{"portable_data_hash"},
 		"collection": arvadosclient.Dict{
 			"is_trashed": true,
 			"name":       "output for " + runner.Container.UUID,
@@ -1445,19 +1436,11 @@ func (runner *ContainerRunner) CommitLogs() error {
 		if runner.arvMountLog != nil {
 			runner.arvMountLog.Close()
 		}
-		runner.CrunchLog.Close()
-
-		// Closing CrunchLog above allows them to be committed to Keep at this
-		// point, but re-open crunch log with ArvClient in case there are any
-		// other further errors (such as failing to write the log to Keep!)
-		// while shutting down
-		runner.CrunchLog = NewThrottledLogger(&ArvLogWriter{
-			ArvClient:     runner.DispatcherArvClient,
-			UUID:          runner.Container.UUID,
-			loggingStream: "crunch-run",
-			writeCloser:   nil,
-		})
-		runner.CrunchLog.Immediate = log.New(os.Stderr, runner.Container.UUID+" ", 0)
+
+		// From now on just log to stderr, in case there are
+		// any other further errors (such as failing to write
+		// the log to Keep!) while shutting down
+		runner.CrunchLog = newLogWriter(newTimestamper(newStringPrefixer(os.Stderr, runner.Container.UUID+" ")))
 	}()
 
 	if runner.keepstoreLogger != nil {
@@ -1489,6 +1472,8 @@ func (runner *ContainerRunner) CommitLogs() error {
 	return nil
 }
 
+// Create/update the log collection. Return value has UUID and
+// PortableDataHash fields populated, but others may be blank.
 func (runner *ContainerRunner) saveLogCollection(final bool) (response arvados.Collection, err error) {
 	runner.logMtx.Lock()
 	defer runner.logMtx.Unlock()
@@ -1513,11 +1498,20 @@ func (runner *ContainerRunner) saveLogCollection(final bool) (response arvados.C
 	if final {
 		updates["is_trashed"] = true
 	} else {
-		exp := time.Now().Add(crunchLogUpdatePeriod * 24)
+		// We set trash_at so this collection gets
+		// automatically cleaned up eventually. It used to be
+		// 12 hours but we had a situation where the API
+		// server was down over a weekend but the containers
+		// kept running such that the log collection got
+		// trashed, so now we make it 2 weeks. refs #20378
+		exp := time.Now().Add(time.Duration(24*14) * time.Hour)
 		updates["trash_at"] = exp
 		updates["delete_at"] = exp
 	}
-	reqBody := arvadosclient.Dict{"collection": updates}
+	reqBody := arvadosclient.Dict{
+		"select":     []string{"uuid", "portable_data_hash"},
+		"collection": updates,
+	}
 	var err2 error
 	if runner.logUUID == "" {
 		reqBody["ensure_unique_name"] = true
@@ -1552,7 +1546,10 @@ func (runner *ContainerRunner) UpdateContainerRunning(logId string) error {
 	return runner.DispatcherArvClient.Update(
 		"containers",
 		runner.Container.UUID,
-		arvadosclient.Dict{"container": updates},
+		arvadosclient.Dict{
+			"select":    []string{"uuid"},
+			"container": updates,
+		},
 		nil,
 	)
 }
@@ -1590,7 +1587,10 @@ func (runner *ContainerRunner) UpdateContainerFinal() error {
 		update["output"] = *runner.OutputPDH
 	}
 	update["cost"] = runner.calculateCost(time.Now())
-	return runner.DispatcherArvClient.Update("containers", runner.Container.UUID, arvadosclient.Dict{"container": update}, nil)
+	return runner.DispatcherArvClient.Update("containers", runner.Container.UUID, arvadosclient.Dict{
+		"select":    []string{"uuid"},
+		"container": update,
+	}, nil)
 }
 
 // IsCancelled returns the value of Cancelled, with goroutine safety.
@@ -1600,18 +1600,8 @@ func (runner *ContainerRunner) IsCancelled() bool {
 	return runner.cCancelled
}
 
-// NewArvLogWriter creates an ArvLogWriter
-func (runner *ContainerRunner) NewArvLogWriter(name string) (io.WriteCloser, error) {
-	writer, err := runner.LogCollection.OpenFile(name+".txt", os.O_CREATE|os.O_WRONLY, 0666)
-	if err != nil {
-		return nil, err
-	}
-	return &ArvLogWriter{
-		ArvClient:     runner.DispatcherArvClient,
-		UUID:          runner.Container.UUID,
-		loggingStream: name,
-		writeCloser:   writer,
-	}, nil
+func (runner *ContainerRunner) openLogFile(name string) (io.WriteCloser, error) {
+	return runner.LogCollection.OpenFile(name+".txt", os.O_CREATE|os.O_WRONLY, 0666)
 }
 
 // Run the full container lifecycle.
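A recurring change in this diff is adding a "select" list to create/update calls so the API server returns only the named fields, shrinking response payloads. A sketch of the request-body shape used in saveLogCollection, including the two-week trash_at (Dict here stands in for arvadosclient.Dict; this only prints the JSON, it does not call an API):

package main

import (
	"encoding/json"
	"os"
	"time"
)

// Dict stands in for arvadosclient.Dict (map[string]interface{}).
type Dict map[string]interface{}

func main() {
	// Two-week expiry, matching the trash_at change above.
	exp := time.Now().Add(14 * 24 * time.Hour)
	req := Dict{
		"select": []string{"uuid", "portable_data_hash"},
		"collection": Dict{
			"trash_at":  exp,
			"delete_at": exp,
		},
	}
	enc := json.NewEncoder(os.Stdout)
	enc.SetIndent("", "  ")
	enc.Encode(req) // the request body sent to the API server
}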
@@ -1635,19 +1625,13 @@ func (runner *ContainerRunner) Run() (err error) {
 	signal.Notify(sigusr2, syscall.SIGUSR2)
 	defer signal.Stop(sigusr2)
 	runner.loadPrices()
-	go func() {
-		for range sigusr2 {
-			runner.loadPrices()
-		}
-	}()
+	go runner.handleSIGUSR2(sigusr2)
 
 	runner.finalState = "Queued"
 
 	defer func() {
 		runner.CleanupDirs()
-		runner.CrunchLog.Printf("crunch-run finished")
-		runner.CrunchLog.Close()
 	}()
 
 	err = runner.fetchContainerRecord()
@@ -1841,7 +1825,6 @@ func NewContainerRunner(dispatcherClient *arvados.Client,
 		DispatcherArvClient:  dispatcherArvClient,
 		DispatcherKeepClient: dispatcherKeepClient,
 	}
-	cr.NewLogWriter = cr.NewArvLogWriter
 	cr.RunArvMount = cr.ArvMountCmd
 	cr.MkTempDir = ioutil.TempDir
 	cr.MkArvClient = func(token string) (IArvadosClient, IKeepClient, *arvados.Client, error) {
@@ -1864,14 +1847,12 @@ func NewContainerRunner(dispatcherClient *arvados.Client,
 		return nil, err
 	}
 	cr.Container.UUID = containerUUID
-	w, err := cr.NewLogWriter("crunch-run")
+	f, err := cr.openLogFile("crunch-run")
 	if err != nil {
 		return nil, err
 	}
-	cr.CrunchLog = NewThrottledLogger(w)
-	cr.CrunchLog.Immediate = log.New(os.Stderr, containerUUID+" ", 0)
+	cr.CrunchLog = newLogWriter(newTimestamper(io.MultiWriter(f, newStringPrefixer(os.Stderr, cr.Container.UUID+" "))))
 
-	loadLogThrottleParams(dispatcherArvClient)
 	go cr.updateLogs()
 
 	return cr, nil
@@ -1881,9 +1862,9 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s
 	log := log.New(stderr, "", 0)
 	flags := flag.NewFlagSet(prog, flag.ContinueOnError)
 	statInterval := flags.Duration("crunchstat-interval", 10*time.Second, "sampling period for periodic resource usage reporting")
-	cgroupRoot := flags.String("cgroup-root", "/sys/fs/cgroup", "path to sysfs cgroup tree")
-	cgroupParent := flags.String("cgroup-parent", "docker", "name of container's parent cgroup (ignored if -cgroup-parent-subsystem is used)")
-	cgroupParentSubsystem := flags.String("cgroup-parent-subsystem", "", "use current cgroup for given subsystem as parent cgroup for container")
+	flags.String("cgroup-root", "/sys/fs/cgroup", "path to sysfs cgroup tree (obsolete, ignored)")
+	flags.String("cgroup-parent", "docker", "name of container's parent cgroup (obsolete, ignored)")
+	cgroupParentSubsystem := flags.String("cgroup-parent-subsystem", "", "use current cgroup for given `subsystem` as parent cgroup for container (subsystem argument is only relevant for cgroups v1; in cgroups v2 / unified mode, any non-empty value means use current cgroup); if empty, use the docker daemon's default cgroup parent. See https://doc.arvados.org/install/crunch2-slurm/install-dispatch.html#CrunchRunCommand-cgroups")
 	caCertsPath := flags.String("ca-certs", "", "Path to TLS root certificates")
 	detach := flags.Bool("detach", false, "Detach from parent process and run in the background")
 	stdinConfig := flags.Bool("stdin-config", false, "Load config and environment variables from JSON message on stdin")
@@ -1968,7 +1949,7 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s
 	time.Sleep(*sleep)
 
 	if *caCertsPath != "" {
-		arvadosclient.CertFiles = []string{*caCertsPath}
+		os.Setenv("SSL_CERT_FILE", *caCertsPath)
 	}
 
 	keepstore, err := startLocalKeepstore(conf, io.MultiWriter(&keepstoreLogbuf, stderr))
@@ -1985,14 +1966,15 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s
 		log.Printf("%s: %v", containerUUID, err)
 		return 1
 	}
-	api.Retries = 8
+	// arvadosclient now interprets Retries=10 to mean
+	// Timeout=10m, retrying with exponential backoff + jitter.
+	api.Retries = 10
 
 	kc, err := keepclient.MakeKeepClient(api)
 	if err != nil {
 		log.Printf("%s: %v", containerUUID, err)
 		return 1
 	}
-	kc.BlockCache = &keepclient.BlockCache{MaxBlocks: 2}
 	kc.Retries = 4
 
 	cr, err := NewContainerRunner(arvados.NewClientFromEnv(), api, kc, containerUUID)
@@ -2015,12 +1997,11 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s
 		keepstoreLogbuf.SetWriter(io.Discard)
 	} else {
 		cr.CrunchLog.Printf("using local keepstore process (pid %d) at %s, writing logs to keepstore.txt in log collection", keepstore.Process.Pid, os.Getenv("ARVADOS_KEEP_SERVICES"))
-		logwriter, err := cr.NewLogWriter("keepstore")
+		cr.keepstoreLogger, err = cr.openLogFile("keepstore")
 		if err != nil {
 			log.Print(err)
 			return 1
 		}
-		cr.keepstoreLogger = NewThrottledLogger(logwriter)
 
 		var writer io.WriteCloser = cr.keepstoreLogger
 		if logWhat == "errors" {
@@ -2046,13 +2027,11 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s
 		cr.executor, err = newSingularityExecutor(cr.CrunchLog.Printf)
 	default:
 		cr.CrunchLog.Printf("%s: unsupported RuntimeEngine %q", containerUUID, *runtimeEngine)
-		cr.CrunchLog.Close()
 		return 1
 	}
 	if err != nil {
 		cr.CrunchLog.Printf("%s: %v", containerUUID, err)
 		cr.checkBrokenNode(err)
-		cr.CrunchLog.Close()
 		return 1
 	}
 	defer cr.executor.Close()
@@ -2073,6 +2052,7 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s
 		ContainerUUID: containerUUID,
 		Target:        cr.executor,
 		Log:           cr.CrunchLog,
+		LogCollection: cr.LogCollection,
 	}
 	if gwListen == "" {
 		// Direct connection won't work, so we use the
@@ -2083,7 +2063,10 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s
 		cr.gateway.UpdateTunnelURL = func(url string) {
 			cr.gateway.Address = "tunnel " + url
 			cr.DispatcherArvClient.Update("containers", containerUUID,
-				arvadosclient.Dict{"container": arvadosclient.Dict{"gateway_address": cr.gateway.Address}}, nil)
+				arvadosclient.Dict{
+					"select":    []string{"uuid"},
+					"container": arvadosclient.Dict{"gateway_address": cr.gateway.Address},
+				}, nil)
 		}
 	}
 	err = cr.gateway.Start()
@@ -2101,19 +2084,16 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s
 
 	cr.parentTemp = parentTemp
 	cr.statInterval = *statInterval
-	cr.cgroupRoot = *cgroupRoot
-	cr.expectCgroupParent = *cgroupParent
 	cr.enableMemoryLimit = *enableMemoryLimit
 	cr.enableNetwork = *enableNetwork
 	cr.networkMode = *networkMode
 	if *cgroupParentSubsystem != "" {
-		p, err := findCgroup(*cgroupParentSubsystem)
+		p, err := findCgroup(os.DirFS("/"), *cgroupParentSubsystem)
 		if err != nil {
 			log.Printf("fatal: cgroup parent subsystem: %s", err)
 			return 1
 		}
 		cr.setCgroupParent = p
-		cr.expectCgroupParent = p
 	}
 
 	if conf.EC2SpotCheck {
@@ -2161,7 +2141,9 @@ func hpcConfData(uuid string, configFile string, stderr io.Writer) ConfigData {
 		fmt.Fprintf(stderr, "error setting up arvadosclient: %s\n", err)
 		return conf
 	}
-	arv.Retries = 8
+	// arvadosclient now interprets Retries=10 to mean
+	// Timeout=10m, retrying with exponential backoff + jitter.
+	arv.Retries = 10
 	var ctr arvados.Container
 	err = arv.Call("GET", "containers", uuid, "", arvadosclient.Dict{"select": []string{"runtime_constraints"}}, &ctr)
 	if err != nil {
@@ -2214,9 +2196,14 @@ func startLocalKeepstore(configData ConfigData, logbuf io.Writer) (*exec.Cmd, er
 	}
 
 	// Rather than have an alternate way to tell keepstore how
-	// many buffers to use when starting it this way, we just
-	// modify the cluster configuration that we feed it on stdin.
-	configData.Cluster.API.MaxKeepBlobBuffers = configData.KeepBuffers
+	// many buffers to use, etc., when starting it this way, we
+	// just modify the cluster configuration that we feed it on
+	// stdin.
+	ccfg := *configData.Cluster
+	ccfg.API.MaxKeepBlobBuffers = configData.KeepBuffers
+	ccfg.Collections.BlobTrash = false
+	ccfg.Collections.BlobTrashConcurrency = 0
+	ccfg.Collections.BlobDeleteConcurrency = 0
 
 	localaddr := localKeepstoreAddr()
 	ln, err := net.Listen("tcp", net.JoinHostPort(localaddr, "0"))
@@ -2236,7 +2223,7 @@ func startLocalKeepstore(configData ConfigData, logbuf io.Writer) (*exec.Cmd, er
 	var confJSON bytes.Buffer
 	err = json.NewEncoder(&confJSON).Encode(arvados.Config{
 		Clusters: map[string]arvados.Cluster{
-			configData.Cluster.ClusterID: *configData.Cluster,
+			ccfg.ClusterID: ccfg,
 		},
 	})
 	if err != nil {
@@ -2445,3 +2432,16 @@ func (cr *ContainerRunner) calculateCost(now time.Time) float64 {
 
 	return cost
 }
+
+func (runner *ContainerRunner) handleSIGUSR2(sigchan chan os.Signal) {
+	for range sigchan {
+		runner.loadPrices()
+		update := arvadosclient.Dict{
+			"select": []string{"uuid"},
+			"container": arvadosclient.Dict{
+				"cost": runner.calculateCost(time.Now()),
+			},
+		}
+		runner.DispatcherArvClient.Update("containers", runner.Container.UUID, update, nil)
+	}
+}
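The new handleSIGUSR2 above lets an operator (or dispatcher) send SIGUSR2 to make a running crunch-run reload spot prices and push a fresh cost to the API server. A minimal standalone sketch of the signal wiring, Unix-only, with the price reload reduced to a print:

package main

import (
	"fmt"
	"os"
	"os/signal"
	"syscall"
	"time"
)

func main() {
	sigusr2 := make(chan os.Signal, 1)
	signal.Notify(sigusr2, syscall.SIGUSR2)
	defer signal.Stop(sigusr2)

	go func() {
		for range sigusr2 {
			// crunch-run reloads instance prices here and
			// updates the container's cost field.
			fmt.Println("SIGUSR2: reloading prices")
		}
	}()

	// Simulate the operator's `kill -USR2 <pid>` against ourselves.
	syscall.Kill(os.Getpid(), syscall.SIGUSR2)
	time.Sleep(100 * time.Millisecond) // give the handler time to run
}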