X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/c3b8da6f91b95570dde6cbf814b270dc3763f7f7..31779a06b28e21a9409ec7c6310f0871b65d13ff:/lib/crunchrun/crunchrun.go diff --git a/lib/crunchrun/crunchrun.go b/lib/crunchrun/crunchrun.go index 4a514f3d89..9add35335f 100644 --- a/lib/crunchrun/crunchrun.go +++ b/lib/crunchrun/crunchrun.go @@ -12,6 +12,7 @@ import ( "flag" "fmt" "io" + "io/fs" "io/ioutil" "log" "net" @@ -152,20 +153,12 @@ type ContainerRunner struct { hoststatLogger io.WriteCloser hoststatReporter *crunchstat.Reporter statInterval time.Duration - cgroupRoot string - // What we expect the container's cgroup parent to be. - expectCgroupParent string // What we tell docker to use as the container's cgroup - // parent. Note: Ideally we would use the same field for both - // expectCgroupParent and setCgroupParent, and just make it - // default to "docker". However, when using docker < 1.10 with - // systemd, specifying a non-empty cgroup parent (even the - // default value "docker") hits a docker bug - // (https://github.com/docker/docker/issues/17126). Using two - // separate fields makes it possible to use the "expect cgroup - // parent to be X" feature even on sites where the "specify - // cgroup parent" feature breaks. + // parent. setCgroupParent string + // Fake root dir where crunchstat.Reporter should read OS + // files, for testing. + crunchstatFakeFS fs.FS cStateLock sync.Mutex cCancelled bool // StopContainer() invoked @@ -638,7 +631,7 @@ func (runner *ContainerRunner) SetupMounts() (map[string]bindmount, error) { if err != nil { return nil, fmt.Errorf("creating temp dir: %v", err) } - err = gitMount(mnt).extractTree(runner.ContainerArvClient, tmpdir, token) + err = gitMount(mnt).extractTree(runner.containerClient, tmpdir, token) if err != nil { return nil, err } @@ -749,8 +742,16 @@ func (runner *ContainerRunner) startHoststat() error { } runner.hoststatLogger = NewThrottledLogger(w) runner.hoststatReporter = &crunchstat.Reporter{ - Logger: log.New(runner.hoststatLogger, "", 0), - CgroupRoot: runner.cgroupRoot, + Logger: log.New(runner.hoststatLogger, "", 0), + // Our own cgroup is the "host" cgroup, in the sense + // that it accounts for resource usage outside the + // container. It doesn't count _all_ resource usage on + // the system. + // + // TODO?: Use the furthest ancestor of our own cgroup + // that has stats available. (Currently crunchstat + // does not have that capability.) + Pid: os.Getpid, PollPeriod: runner.statInterval, } runner.hoststatReporter.Start() @@ -765,10 +766,9 @@ func (runner *ContainerRunner) startCrunchstat() error { } runner.statLogger = NewThrottledLogger(w) runner.statReporter = &crunchstat.Reporter{ - CgroupParent: runner.expectCgroupParent, - CgroupRoot: runner.cgroupRoot, - CID: runner.executor.CgroupID(), - Logger: log.New(runner.statLogger, "", 0), + Pid: runner.executor.Pid, + FS: runner.crunchstatFakeFS, + Logger: log.New(runner.statLogger, "", 0), MemThresholds: map[string][]crunchstat.Threshold{ "rss": crunchstat.NewThresholdsFromPercentages(runner.Container.RuntimeConstraints.RAM, []int64{90, 95, 99}), }, @@ -1124,6 +1124,7 @@ func (runner *ContainerRunner) WaitFinish() error { } runner.CrunchLog.Printf("Container exited with status code %d%s", exitcode, extra) err = runner.DispatcherArvClient.Update("containers", runner.Container.UUID, arvadosclient.Dict{ + "select": []string{"uuid"}, "container": arvadosclient.Dict{"exit_code": exitcode}, }, nil) if err != nil { @@ -1200,7 +1201,10 @@ func (runner *ContainerRunner) updateLogs() { } err = runner.DispatcherArvClient.Update("containers", runner.Container.UUID, arvadosclient.Dict{ - "container": arvadosclient.Dict{"log": saved.PortableDataHash}, + "select": []string{"uuid"}, + "container": arvadosclient.Dict{ + "log": saved.PortableDataHash, + }, }, nil) if err != nil { runner.CrunchLog.Printf("error updating container log to %s: %s", saved.PortableDataHash, err) @@ -1316,6 +1320,7 @@ func (runner *ContainerRunner) checkSpotInterruptionNotices() { func (runner *ContainerRunner) updateRuntimeStatus(status arvadosclient.Dict) { err := runner.DispatcherArvClient.Update("containers", runner.Container.UUID, arvadosclient.Dict{ + "select": []string{"uuid"}, "container": arvadosclient.Dict{ "runtime_status": status, }, @@ -1332,7 +1337,9 @@ func (runner *ContainerRunner) CaptureOutput(bindmounts map[string]bindmount) er // Output may have been set directly by the container, so // refresh the container record to check. err := runner.DispatcherArvClient.Get("containers", runner.Container.UUID, - nil, &runner.Container) + arvadosclient.Dict{ + "select": []string{"output"}, + }, &runner.Container) if err != nil { return err } @@ -1345,7 +1352,6 @@ func (runner *ContainerRunner) CaptureOutput(bindmounts map[string]bindmount) er txt, err := (&copier{ client: runner.containerClient, - arvClient: runner.ContainerArvClient, keepClient: runner.ContainerKeepClient, hostOutputDir: runner.HostOutputDir, ctrOutputDir: runner.Container.OutputPath, @@ -1371,6 +1377,7 @@ func (runner *ContainerRunner) CaptureOutput(bindmounts map[string]bindmount) er var resp arvados.Collection err = runner.ContainerArvClient.Create("collections", arvadosclient.Dict{ "ensure_unique_name": true, + "select": []string{"portable_data_hash"}, "collection": arvadosclient.Dict{ "is_trashed": true, "name": "output for " + runner.Container.UUID, @@ -1497,6 +1504,8 @@ func (runner *ContainerRunner) CommitLogs() error { return nil } +// Create/update the log collection. Return value has UUID and +// PortableDataHash fields populated, but others may be blank. func (runner *ContainerRunner) saveLogCollection(final bool) (response arvados.Collection, err error) { runner.logMtx.Lock() defer runner.logMtx.Unlock() @@ -1531,7 +1540,10 @@ func (runner *ContainerRunner) saveLogCollection(final bool) (response arvados.C updates["trash_at"] = exp updates["delete_at"] = exp } - reqBody := arvadosclient.Dict{"collection": updates} + reqBody := arvadosclient.Dict{ + "select": []string{"uuid", "portable_data_hash"}, + "collection": updates, + } var err2 error if runner.logUUID == "" { reqBody["ensure_unique_name"] = true @@ -1566,7 +1578,10 @@ func (runner *ContainerRunner) UpdateContainerRunning(logId string) error { return runner.DispatcherArvClient.Update( "containers", runner.Container.UUID, - arvadosclient.Dict{"container": updates}, + arvadosclient.Dict{ + "select": []string{"uuid"}, + "container": updates, + }, nil, ) } @@ -1604,7 +1619,10 @@ func (runner *ContainerRunner) UpdateContainerFinal() error { update["output"] = *runner.OutputPDH } update["cost"] = runner.calculateCost(time.Now()) - return runner.DispatcherArvClient.Update("containers", runner.Container.UUID, arvadosclient.Dict{"container": update}, nil) + return runner.DispatcherArvClient.Update("containers", runner.Container.UUID, arvadosclient.Dict{ + "select": []string{"uuid"}, + "container": update, + }, nil) } // IsCancelled returns the value of Cancelled, with goroutine safety. @@ -1891,9 +1909,9 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s log := log.New(stderr, "", 0) flags := flag.NewFlagSet(prog, flag.ContinueOnError) statInterval := flags.Duration("crunchstat-interval", 10*time.Second, "sampling period for periodic resource usage reporting") - cgroupRoot := flags.String("cgroup-root", "/sys/fs/cgroup", "path to sysfs cgroup tree") - cgroupParent := flags.String("cgroup-parent", "docker", "name of container's parent cgroup (ignored if -cgroup-parent-subsystem is used)") - cgroupParentSubsystem := flags.String("cgroup-parent-subsystem", "", "use current cgroup for given subsystem as parent cgroup for container") + flags.String("cgroup-root", "/sys/fs/cgroup", "path to sysfs cgroup tree (obsolete, ignored)") + flags.String("cgroup-parent", "docker", "name of container's parent cgroup (obsolete, ignored)") + cgroupParentSubsystem := flags.String("cgroup-parent-subsystem", "", "use current cgroup for given `subsystem` as parent cgroup for container (subsystem argument is only relevant for cgroups v1; in cgroups v2 / unified mode, any non-empty value means use current cgroup)") caCertsPath := flags.String("ca-certs", "", "Path to TLS root certificates") detach := flags.Bool("detach", false, "Detach from parent process and run in the background") stdinConfig := flags.Bool("stdin-config", false, "Load config and environment variables from JSON message on stdin") @@ -1995,7 +2013,9 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s log.Printf("%s: %v", containerUUID, err) return 1 } - api.Retries = 8 + // arvadosclient now interprets Retries=10 to mean + // Timeout=10m, retrying with exponential backoff + jitter. + api.Retries = 10 kc, err := keepclient.MakeKeepClient(api) if err != nil { @@ -2094,7 +2114,10 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s cr.gateway.UpdateTunnelURL = func(url string) { cr.gateway.Address = "tunnel " + url cr.DispatcherArvClient.Update("containers", containerUUID, - arvadosclient.Dict{"container": arvadosclient.Dict{"gateway_address": cr.gateway.Address}}, nil) + arvadosclient.Dict{ + "select": []string{"uuid"}, + "container": arvadosclient.Dict{"gateway_address": cr.gateway.Address}, + }, nil) } } err = cr.gateway.Start() @@ -2112,19 +2135,16 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s cr.parentTemp = parentTemp cr.statInterval = *statInterval - cr.cgroupRoot = *cgroupRoot - cr.expectCgroupParent = *cgroupParent cr.enableMemoryLimit = *enableMemoryLimit cr.enableNetwork = *enableNetwork cr.networkMode = *networkMode if *cgroupParentSubsystem != "" { - p, err := findCgroup(*cgroupParentSubsystem) + p, err := findCgroup(os.DirFS("/"), *cgroupParentSubsystem) if err != nil { log.Printf("fatal: cgroup parent subsystem: %s", err) return 1 } cr.setCgroupParent = p - cr.expectCgroupParent = p } if conf.EC2SpotCheck { @@ -2172,7 +2192,9 @@ func hpcConfData(uuid string, configFile string, stderr io.Writer) ConfigData { fmt.Fprintf(stderr, "error setting up arvadosclient: %s\n", err) return conf } - arv.Retries = 8 + // arvadosclient now interprets Retries=10 to mean + // Timeout=10m, retrying with exponential backoff + jitter. + arv.Retries = 10 var ctr arvados.Container err = arv.Call("GET", "containers", uuid, "", arvadosclient.Dict{"select": []string{"runtime_constraints"}}, &ctr) if err != nil { @@ -2461,6 +2483,7 @@ func (runner *ContainerRunner) handleSIGUSR2(sigchan chan os.Signal) { for range sigchan { runner.loadPrices() update := arvadosclient.Dict{ + "select": []string{"uuid"}, "container": arvadosclient.Dict{ "cost": runner.calculateCost(time.Now()), },