X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/c9b8b9b9c78a77dd30b828914c8bee9fa8dcbb90..766d2d7ca8dbb5522a8b7de6409c83fbba4a36ca:/lib/crunchrun/crunchrun.go diff --git a/lib/crunchrun/crunchrun.go b/lib/crunchrun/crunchrun.go index eadf22876f..bde13424dd 100644 --- a/lib/crunchrun/crunchrun.go +++ b/lib/crunchrun/crunchrun.go @@ -12,6 +12,7 @@ import ( "flag" "fmt" "io" + "io/fs" "io/ioutil" "log" "net" @@ -31,6 +32,7 @@ import ( "syscall" "time" + "git.arvados.org/arvados.git/lib/cloud" "git.arvados.org/arvados.git/lib/cmd" "git.arvados.org/arvados.git/lib/config" "git.arvados.org/arvados.git/lib/crunchstat" @@ -44,14 +46,17 @@ import ( type command struct{} +var arvadosCertPath = "/etc/arvados/ca-certificates.crt" + var Command = command{} // ConfigData contains environment variables and (when needed) cluster // configuration, passed from dispatchcloud to crunch-run on stdin. type ConfigData struct { - Env map[string]string - KeepBuffers int - Cluster *arvados.Cluster + Env map[string]string + KeepBuffers int + EC2SpotCheck bool + Cluster *arvados.Cluster } // IArvadosClient is the minimal Arvados API methods used by crunch-run. @@ -73,7 +78,6 @@ type IKeepClient interface { ReadAt(locator string, p []byte, off int) (int, error) ManifestFileReader(m manifest.Manifest, filename string) (arvados.File, error) LocalLocator(locator string) (string, error) - ClearBlockCache() SetStorageClasses(sc []string) } @@ -140,7 +144,9 @@ type ContainerRunner struct { MkArvClient func(token string) (IArvadosClient, IKeepClient, *arvados.Client, error) finalState string parentTemp string + costStartTime time.Time + keepstore *exec.Cmd keepstoreLogger io.WriteCloser keepstoreLogbuf *bufThenWrite statLogger io.WriteCloser @@ -148,20 +154,12 @@ type ContainerRunner struct { hoststatLogger io.WriteCloser hoststatReporter *crunchstat.Reporter statInterval time.Duration - cgroupRoot string - // What we expect the container's cgroup parent to be. - expectCgroupParent string // What we tell docker to use as the container's cgroup - // parent. Note: Ideally we would use the same field for both - // expectCgroupParent and setCgroupParent, and just make it - // default to "docker". However, when using docker < 1.10 with - // systemd, specifying a non-empty cgroup parent (even the - // default value "docker") hits a docker bug - // (https://github.com/docker/docker/issues/17126). Using two - // separate fields makes it possible to use the "expect cgroup - // parent to be X" feature even on sites where the "specify - // cgroup parent" feature breaks. + // parent. setCgroupParent string + // Fake root dir where crunchstat.Reporter should read OS + // files, for testing. + crunchstatFakeFS fs.FS cStateLock sync.Mutex cCancelled bool // StopContainer() invoked @@ -175,6 +173,9 @@ type ContainerRunner struct { containerWatchdogInterval time.Duration gateway Gateway + + prices []cloud.InstancePrice + pricesLock sync.Mutex } // setupSignals sets up signal handling to gracefully terminate the @@ -313,7 +314,12 @@ func (runner *ContainerRunner) ArvMountCmd(cmdline []string, token string) (c *e "Block not found error", "Unhandled exception during FUSE operation", }, - ReportFunc: runner.reportArvMountWarning, + ReportFunc: func(pattern, text string) { + runner.updateRuntimeStatus(arvadosclient.Dict{ + "warning": "arv-mount: " + pattern, + "warningDetail": text, + }) + }, } c.Stdout = runner.arvMountLog c.Stderr = io.MultiWriter(runner.arvMountLog, os.Stderr, &scanner) @@ -426,8 +432,14 @@ func (runner *ContainerRunner) SetupMounts() (map[string]bindmount, error) { arvMountCmd = append(arvMountCmd, "--allow-other") } - if runner.Container.RuntimeConstraints.KeepCacheRAM > 0 { - arvMountCmd = append(arvMountCmd, "--file-cache", fmt.Sprintf("%d", runner.Container.RuntimeConstraints.KeepCacheRAM)) + if runner.Container.RuntimeConstraints.KeepCacheDisk > 0 { + keepcachedir, err := runner.MkTempDir(runner.parentTemp, "keepcache") + if err != nil { + return nil, fmt.Errorf("while creating keep cache temp dir: %v", err) + } + arvMountCmd = append(arvMountCmd, "--disk-cache", "--disk-cache-dir", keepcachedir, "--file-cache", fmt.Sprintf("%d", runner.Container.RuntimeConstraints.KeepCacheDisk)) + } else if runner.Container.RuntimeConstraints.KeepCacheRAM > 0 { + arvMountCmd = append(arvMountCmd, "--ram-cache", "--file-cache", fmt.Sprintf("%d", runner.Container.RuntimeConstraints.KeepCacheRAM)) } collectionPaths := []string{} @@ -483,7 +495,7 @@ func (runner *ContainerRunner) SetupMounts() (map[string]bindmount, error) { } } - if bind == "/etc/arvados/ca-certificates.crt" { + if bind == arvadosCertPath { needCertMount = false } @@ -620,7 +632,7 @@ func (runner *ContainerRunner) SetupMounts() (map[string]bindmount, error) { if err != nil { return nil, fmt.Errorf("creating temp dir: %v", err) } - err = gitMount(mnt).extractTree(runner.ContainerArvClient, tmpdir, token) + err = gitMount(mnt).extractTree(runner.containerClient, tmpdir, token) if err != nil { return nil, err } @@ -633,10 +645,19 @@ func (runner *ContainerRunner) SetupMounts() (map[string]bindmount, error) { } if needCertMount && runner.Container.RuntimeConstraints.API { - for _, certfile := range arvadosclient.CertFiles { - _, err := os.Stat(certfile) - if err == nil { - bindmounts["/etc/arvados/ca-certificates.crt"] = bindmount{HostPath: certfile, ReadOnly: true} + for _, certfile := range []string{ + // Populated by caller, or sdk/go/arvados init(), or test suite: + os.Getenv("SSL_CERT_FILE"), + // Copied from Go 1.21 stdlib (src/crypto/x509/root_linux.go): + "/etc/ssl/certs/ca-certificates.crt", // Debian/Ubuntu/Gentoo etc. + "/etc/pki/tls/certs/ca-bundle.crt", // Fedora/RHEL 6 + "/etc/ssl/ca-bundle.pem", // OpenSUSE + "/etc/pki/tls/cacert.pem", // OpenELEC + "/etc/pki/ca-trust/extracted/pem/tls-ca-bundle.pem", // CentOS/RHEL 7 + "/etc/ssl/cert.pem", // Alpine Linux + } { + if _, err := os.Stat(certfile); err == nil { + bindmounts[arvadosCertPath] = bindmount{HostPath: certfile, ReadOnly: true} break } } @@ -659,6 +680,9 @@ func (runner *ContainerRunner) SetupMounts() (map[string]bindmount, error) { if err != nil { return nil, fmt.Errorf("while trying to start arv-mount: %v", err) } + if runner.hoststatReporter != nil && runner.ArvMount != nil { + runner.hoststatReporter.ReportPID("arv-mount", runner.ArvMount.Process.Pid) + } for _, p := range collectionPaths { _, err = os.Stat(p) @@ -713,6 +737,7 @@ func (runner *ContainerRunner) stopHoststat() error { return nil } runner.hoststatReporter.Stop() + runner.hoststatReporter.LogProcessMemMax(runner.CrunchLog) err := runner.hoststatLogger.Close() if err != nil { return fmt.Errorf("error closing hoststat logs: %v", err) @@ -727,11 +752,20 @@ func (runner *ContainerRunner) startHoststat() error { } runner.hoststatLogger = NewThrottledLogger(w) runner.hoststatReporter = &crunchstat.Reporter{ - Logger: log.New(runner.hoststatLogger, "", 0), - CgroupRoot: runner.cgroupRoot, + Logger: log.New(runner.hoststatLogger, "", 0), + // Our own cgroup is the "host" cgroup, in the sense + // that it accounts for resource usage outside the + // container. It doesn't count _all_ resource usage on + // the system. + // + // TODO?: Use the furthest ancestor of our own cgroup + // that has stats available. (Currently crunchstat + // does not have that capability.) + Pid: os.Getpid, PollPeriod: runner.statInterval, } runner.hoststatReporter.Start() + runner.hoststatReporter.ReportPID("crunch-run", os.Getpid()) return nil } @@ -742,12 +776,15 @@ func (runner *ContainerRunner) startCrunchstat() error { } runner.statLogger = NewThrottledLogger(w) runner.statReporter = &crunchstat.Reporter{ - CID: runner.executor.CgroupID(), - Logger: log.New(runner.statLogger, "", 0), - CgroupParent: runner.expectCgroupParent, - CgroupRoot: runner.cgroupRoot, - PollPeriod: runner.statInterval, - TempDir: runner.parentTemp, + Pid: runner.executor.Pid, + FS: runner.crunchstatFakeFS, + Logger: log.New(runner.statLogger, "", 0), + MemThresholds: map[string][]crunchstat.Threshold{ + "rss": crunchstat.NewThresholdsFromPercentages(runner.Container.RuntimeConstraints.RAM, []int64{90, 95, 99}), + }, + PollPeriod: runner.statInterval, + TempDir: runner.parentTemp, + ThresholdLogger: runner.CrunchLog, } runner.statReporter.Start() return nil @@ -1097,6 +1134,7 @@ func (runner *ContainerRunner) WaitFinish() error { } runner.CrunchLog.Printf("Container exited with status code %d%s", exitcode, extra) err = runner.DispatcherArvClient.Update("containers", runner.Container.UUID, arvadosclient.Dict{ + "select": []string{"uuid"}, "container": arvadosclient.Dict{"exit_code": exitcode}, }, nil) if err != nil { @@ -1126,6 +1164,9 @@ func (runner *ContainerRunner) WaitFinish() error { if runner.statReporter != nil { runner.statReporter.Stop() + runner.statReporter.LogMaxima(runner.CrunchLog, map[string]int64{ + "rss": runner.Container.RuntimeConstraints.RAM, + }) err = runner.statLogger.Close() if err != nil { runner.CrunchLog.Printf("error closing crunchstat logs: %v", err) @@ -1170,7 +1211,10 @@ func (runner *ContainerRunner) updateLogs() { } err = runner.DispatcherArvClient.Update("containers", runner.Container.UUID, arvadosclient.Dict{ - "container": arvadosclient.Dict{"log": saved.PortableDataHash}, + "select": []string{"uuid"}, + "container": arvadosclient.Dict{ + "log": saved.PortableDataHash, + }, }, nil) if err != nil { runner.CrunchLog.Printf("error updating container log to %s: %s", saved.PortableDataHash, err) @@ -1181,16 +1225,116 @@ func (runner *ContainerRunner) updateLogs() { } } -func (runner *ContainerRunner) reportArvMountWarning(pattern, text string) { - var updated arvados.Container +var spotInterruptionCheckInterval = 5 * time.Second +var ec2MetadataBaseURL = "http://169.254.169.254" + +const ec2TokenTTL = time.Second * 21600 + +func (runner *ContainerRunner) checkSpotInterruptionNotices() { + type ec2metadata struct { + Action string `json:"action"` + Time time.Time `json:"time"` + } + runner.CrunchLog.Printf("Checking for spot interruptions every %v using instance metadata at %s", spotInterruptionCheckInterval, ec2MetadataBaseURL) + var metadata ec2metadata + var token string + var tokenExp time.Time + check := func() error { + ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(time.Minute)) + defer cancel() + if token == "" || tokenExp.Sub(time.Now()) < time.Minute { + req, err := http.NewRequestWithContext(ctx, http.MethodPut, ec2MetadataBaseURL+"/latest/api/token", nil) + if err != nil { + return err + } + req.Header.Set("X-aws-ec2-metadata-token-ttl-seconds", fmt.Sprintf("%d", int(ec2TokenTTL/time.Second))) + resp, err := http.DefaultClient.Do(req) + if err != nil { + return err + } + defer resp.Body.Close() + if resp.StatusCode != http.StatusOK { + return fmt.Errorf("%s", resp.Status) + } + newtoken, err := ioutil.ReadAll(resp.Body) + if err != nil { + return err + } + token = strings.TrimSpace(string(newtoken)) + tokenExp = time.Now().Add(ec2TokenTTL) + } + req, err := http.NewRequestWithContext(ctx, http.MethodGet, ec2MetadataBaseURL+"/latest/meta-data/spot/instance-action", nil) + if err != nil { + return err + } + req.Header.Set("X-aws-ec2-metadata-token", token) + resp, err := http.DefaultClient.Do(req) + if err != nil { + return err + } + defer resp.Body.Close() + metadata = ec2metadata{} + switch resp.StatusCode { + case http.StatusOK: + break + case http.StatusNotFound: + // "If Amazon EC2 is not preparing to stop or + // terminate the instance, or if you + // terminated the instance yourself, + // instance-action is not present in the + // instance metadata and you receive an HTTP + // 404 error when you try to retrieve it." + return nil + case http.StatusUnauthorized: + token = "" + return fmt.Errorf("%s", resp.Status) + default: + return fmt.Errorf("%s", resp.Status) + } + err = json.NewDecoder(resp.Body).Decode(&metadata) + if err != nil { + return err + } + return nil + } + failures := 0 + var lastmetadata ec2metadata + for range time.NewTicker(spotInterruptionCheckInterval).C { + err := check() + if err != nil { + runner.CrunchLog.Printf("Error checking spot interruptions: %s", err) + failures++ + if failures > 5 { + runner.CrunchLog.Printf("Giving up on checking spot interruptions after too many consecutive failures") + return + } + continue + } + failures = 0 + if metadata != lastmetadata { + lastmetadata = metadata + text := fmt.Sprintf("Cloud provider scheduled instance %s at %s", metadata.Action, metadata.Time.UTC().Format(time.RFC3339)) + runner.CrunchLog.Printf("%s", text) + runner.updateRuntimeStatus(arvadosclient.Dict{ + "warning": "preemption notice", + "warningDetail": text, + "preemptionNotice": text, + }) + if proc, err := os.FindProcess(os.Getpid()); err == nil { + // trigger updateLogs + proc.Signal(syscall.SIGUSR1) + } + } + } +} + +func (runner *ContainerRunner) updateRuntimeStatus(status arvadosclient.Dict) { err := runner.DispatcherArvClient.Update("containers", runner.Container.UUID, arvadosclient.Dict{ + "select": []string{"uuid"}, "container": arvadosclient.Dict{ - "runtime_status": arvadosclient.Dict{ - "warning": "arv-mount: " + pattern, - "warningDetail": text, - }, + "runtime_status": status, }, - }, &updated) + }, nil) if err != nil { runner.CrunchLog.Printf("error updating container runtime_status: %s", err) } @@ -1203,7 +1347,9 @@ func (runner *ContainerRunner) CaptureOutput(bindmounts map[string]bindmount) er // Output may have been set directly by the container, so // refresh the container record to check. err := runner.DispatcherArvClient.Get("containers", runner.Container.UUID, - nil, &runner.Container) + arvadosclient.Dict{ + "select": []string{"output"}, + }, &runner.Container) if err != nil { return err } @@ -1216,7 +1362,6 @@ func (runner *ContainerRunner) CaptureOutput(bindmounts map[string]bindmount) er txt, err := (&copier{ client: runner.containerClient, - arvClient: runner.ContainerArvClient, keepClient: runner.ContainerKeepClient, hostOutputDir: runner.HostOutputDir, ctrOutputDir: runner.Container.OutputPath, @@ -1242,6 +1387,7 @@ func (runner *ContainerRunner) CaptureOutput(bindmounts map[string]bindmount) er var resp arvados.Collection err = runner.ContainerArvClient.Create("collections", arvadosclient.Dict{ "ensure_unique_name": true, + "select": []string{"portable_data_hash"}, "collection": arvadosclient.Dict{ "is_trashed": true, "name": "output for " + runner.Container.UUID, @@ -1368,6 +1514,8 @@ func (runner *ContainerRunner) CommitLogs() error { return nil } +// Create/update the log collection. Return value has UUID and +// PortableDataHash fields populated, but others may be blank. func (runner *ContainerRunner) saveLogCollection(final bool) (response arvados.Collection, err error) { runner.logMtx.Lock() defer runner.logMtx.Unlock() @@ -1392,11 +1540,20 @@ func (runner *ContainerRunner) saveLogCollection(final bool) (response arvados.C if final { updates["is_trashed"] = true } else { - exp := time.Now().Add(crunchLogUpdatePeriod * 24) + // We set trash_at so this collection gets + // automatically cleaned up eventually. It used to be + // 12 hours but we had a situation where the API + // server was down over a weekend but the containers + // kept running such that the log collection got + // trashed, so now we make it 2 weeks. refs #20378 + exp := time.Now().Add(time.Duration(24*14) * time.Hour) updates["trash_at"] = exp updates["delete_at"] = exp } - reqBody := arvadosclient.Dict{"collection": updates} + reqBody := arvadosclient.Dict{ + "select": []string{"uuid", "portable_data_hash"}, + "collection": updates, + } var err2 error if runner.logUUID == "" { reqBody["ensure_unique_name"] = true @@ -1415,14 +1572,28 @@ func (runner *ContainerRunner) saveLogCollection(final bool) (response arvados.C } // UpdateContainerRunning updates the container state to "Running" -func (runner *ContainerRunner) UpdateContainerRunning() error { +func (runner *ContainerRunner) UpdateContainerRunning(logId string) error { runner.cStateLock.Lock() defer runner.cStateLock.Unlock() if runner.cCancelled { return ErrCancelled } - return runner.DispatcherArvClient.Update("containers", runner.Container.UUID, - arvadosclient.Dict{"container": arvadosclient.Dict{"state": "Running", "gateway_address": runner.gateway.Address}}, nil) + updates := arvadosclient.Dict{ + "gateway_address": runner.gateway.Address, + "state": "Running", + } + if logId != "" { + updates["log"] = logId + } + return runner.DispatcherArvClient.Update( + "containers", + runner.Container.UUID, + arvadosclient.Dict{ + "select": []string{"uuid"}, + "container": updates, + }, + nil, + ) } // ContainerToken returns the api_token the container (and any @@ -1457,7 +1628,11 @@ func (runner *ContainerRunner) UpdateContainerFinal() error { if runner.finalState == "Complete" && runner.OutputPDH != nil { update["output"] = *runner.OutputPDH } - return runner.DispatcherArvClient.Update("containers", runner.Container.UUID, arvadosclient.Dict{"container": update}, nil) + update["cost"] = runner.calculateCost(time.Now()) + return runner.DispatcherArvClient.Update("containers", runner.Container.UUID, arvadosclient.Dict{ + "select": []string{"uuid"}, + "container": update, + }, nil) } // IsCancelled returns the value of Cancelled, with goroutine safety. @@ -1489,6 +1664,7 @@ func (runner *ContainerRunner) Run() (err error) { runner.CrunchLog.Printf("Using FUSE mount: %s", v) runner.CrunchLog.Printf("Using container runtime: %s", runner.executor.Runtime()) runner.CrunchLog.Printf("Executing container: %s", runner.Container.UUID) + runner.costStartTime = time.Now() hostname, hosterr := os.Hostname() if hosterr != nil { @@ -1497,6 +1673,12 @@ func (runner *ContainerRunner) Run() (err error) { runner.CrunchLog.Printf("Executing on host '%s'", hostname) } + sigusr2 := make(chan os.Signal, 1) + signal.Notify(sigusr2, syscall.SIGUSR2) + defer signal.Stop(sigusr2) + runner.loadPrices() + go runner.handleSIGUSR2(sigusr2) + runner.finalState = "Queued" defer func() { @@ -1563,6 +1745,9 @@ func (runner *ContainerRunner) Run() (err error) { if err != nil { return } + if runner.keepstore != nil { + runner.hoststatReporter.ReportPID("keepstore", runner.keepstore.Process.Pid) + } // set up FUSE mount and binds bindmounts, err = runner.SetupMounts() @@ -1605,7 +1790,14 @@ func (runner *ContainerRunner) Run() (err error) { return } - err = runner.UpdateContainerRunning() + logCollection, err := runner.saveLogCollection(false) + var logId string + if err == nil { + logId = logCollection.PortableDataHash + } else { + runner.CrunchLog.Printf("Error committing initial log collection: %v", err) + } + err = runner.UpdateContainerRunning(logId) if err != nil { return } @@ -1727,16 +1919,16 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s log := log.New(stderr, "", 0) flags := flag.NewFlagSet(prog, flag.ContinueOnError) statInterval := flags.Duration("crunchstat-interval", 10*time.Second, "sampling period for periodic resource usage reporting") - cgroupRoot := flags.String("cgroup-root", "/sys/fs/cgroup", "path to sysfs cgroup tree") - cgroupParent := flags.String("cgroup-parent", "docker", "name of container's parent cgroup (ignored if -cgroup-parent-subsystem is used)") - cgroupParentSubsystem := flags.String("cgroup-parent-subsystem", "", "use current cgroup for given subsystem as parent cgroup for container") + flags.String("cgroup-root", "/sys/fs/cgroup", "path to sysfs cgroup tree (obsolete, ignored)") + flags.String("cgroup-parent", "docker", "name of container's parent cgroup (obsolete, ignored)") + cgroupParentSubsystem := flags.String("cgroup-parent-subsystem", "", "use current cgroup for given `subsystem` as parent cgroup for container (subsystem argument is only relevant for cgroups v1; in cgroups v2 / unified mode, any non-empty value means use current cgroup); if empty, use the docker daemon's default cgroup parent. See https://doc.arvados.org/install/crunch2-slurm/install-dispatch.html#CrunchRunCommand-cgroups") caCertsPath := flags.String("ca-certs", "", "Path to TLS root certificates") detach := flags.Bool("detach", false, "Detach from parent process and run in the background") stdinConfig := flags.Bool("stdin-config", false, "Load config and environment variables from JSON message on stdin") configFile := flags.String("config", arvados.DefaultConfigFile, "filename of cluster config file to try loading if -stdin-config=false (default is $ARVADOS_CONFIG)") sleep := flags.Duration("sleep", 0, "Delay before starting (testing use only)") kill := flags.Int("kill", -1, "Send signal to an existing crunch-run process for given UUID") - list := flags.Bool("list", false, "List UUIDs of existing crunch-run processes") + list := flags.Bool("list", false, "List UUIDs of existing crunch-run processes (and notify them to use price data passed on stdin)") enableMemoryLimit := flags.Bool("enable-memory-limit", true, "tell container runtime to limit container's memory usage") enableNetwork := flags.String("container-enable-networking", "default", "enable networking \"always\" (for all containers) or \"default\" (for containers that request it)") networkMode := flags.String("container-network-mode", "default", `Docker network mode for container (use any argument valid for docker --net)`) @@ -1772,11 +1964,11 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s switch { case *detach && !ignoreDetachFlag: - return Detach(containerUUID, prog, args, os.Stdin, os.Stdout, os.Stderr) + return Detach(containerUUID, prog, args, stdin, stdout, stderr) case *kill >= 0: - return KillProcess(containerUUID, syscall.Signal(*kill), os.Stdout, os.Stderr) + return KillProcess(containerUUID, syscall.Signal(*kill), stdout, stderr) case *list: - return ListProcesses(os.Stdout, os.Stderr) + return ListProcesses(stdin, stdout, stderr) } if len(containerUUID) != 27 { @@ -1814,7 +2006,7 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s time.Sleep(*sleep) if *caCertsPath != "" { - arvadosclient.CertFiles = []string{*caCertsPath} + os.Setenv("SSL_CERT_FILE", *caCertsPath) } keepstore, err := startLocalKeepstore(conf, io.MultiWriter(&keepstoreLogbuf, stderr)) @@ -1831,14 +2023,15 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s log.Printf("%s: %v", containerUUID, err) return 1 } - api.Retries = 8 + // arvadosclient now interprets Retries=10 to mean + // Timeout=10m, retrying with exponential backoff + jitter. + api.Retries = 10 kc, err := keepclient.MakeKeepClient(api) if err != nil { log.Printf("%s: %v", containerUUID, err) return 1 } - kc.BlockCache = &keepclient.BlockCache{MaxBlocks: 2} kc.Retries = 4 cr, err := NewContainerRunner(arvados.NewClientFromEnv(), api, kc, containerUUID) @@ -1847,6 +2040,7 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s return 1 } + cr.keepstore = keepstore if keepstore == nil { // Log explanation (if any) for why we're not running // a local keepstore. @@ -1918,6 +2112,7 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s ContainerUUID: containerUUID, Target: cr.executor, Log: cr.CrunchLog, + LogCollection: cr.LogCollection, } if gwListen == "" { // Direct connection won't work, so we use the @@ -1928,7 +2123,10 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s cr.gateway.UpdateTunnelURL = func(url string) { cr.gateway.Address = "tunnel " + url cr.DispatcherArvClient.Update("containers", containerUUID, - arvadosclient.Dict{"container": arvadosclient.Dict{"gateway_address": cr.gateway.Address}}, nil) + arvadosclient.Dict{ + "select": []string{"uuid"}, + "container": arvadosclient.Dict{"gateway_address": cr.gateway.Address}, + }, nil) } } err = cr.gateway.Start() @@ -1946,19 +2144,20 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s cr.parentTemp = parentTemp cr.statInterval = *statInterval - cr.cgroupRoot = *cgroupRoot - cr.expectCgroupParent = *cgroupParent cr.enableMemoryLimit = *enableMemoryLimit cr.enableNetwork = *enableNetwork cr.networkMode = *networkMode if *cgroupParentSubsystem != "" { - p, err := findCgroup(*cgroupParentSubsystem) + p, err := findCgroup(os.DirFS("/"), *cgroupParentSubsystem) if err != nil { log.Printf("fatal: cgroup parent subsystem: %s", err) return 1 } cr.setCgroupParent = p - cr.expectCgroupParent = p + } + + if conf.EC2SpotCheck { + go cr.checkSpotInterruptionNotices() } runerr := cr.Run() @@ -2002,7 +2201,9 @@ func hpcConfData(uuid string, configFile string, stderr io.Writer) ConfigData { fmt.Fprintf(stderr, "error setting up arvadosclient: %s\n", err) return conf } - arv.Retries = 8 + // arvadosclient now interprets Retries=10 to mean + // Timeout=10m, retrying with exponential backoff + jitter. + arv.Retries = 10 var ctr arvados.Container err = arv.Call("GET", "containers", uuid, "", arvadosclient.Dict{"select": []string{"runtime_constraints"}}, &ctr) if err != nil { @@ -2202,3 +2403,100 @@ func localKeepstoreAddr() string { }) return ips[0].String() } + +func (cr *ContainerRunner) loadPrices() { + buf, err := os.ReadFile(filepath.Join(lockdir, pricesfile)) + if err != nil { + if !os.IsNotExist(err) { + cr.CrunchLog.Printf("loadPrices: read: %s", err) + } + return + } + var prices []cloud.InstancePrice + err = json.Unmarshal(buf, &prices) + if err != nil { + cr.CrunchLog.Printf("loadPrices: decode: %s", err) + return + } + cr.pricesLock.Lock() + defer cr.pricesLock.Unlock() + var lastKnown time.Time + if len(cr.prices) > 0 { + lastKnown = cr.prices[0].StartTime + } + cr.prices = cloud.NormalizePriceHistory(append(prices, cr.prices...)) + for i := len(cr.prices) - 1; i >= 0; i-- { + price := cr.prices[i] + if price.StartTime.After(lastKnown) { + cr.CrunchLog.Printf("Instance price changed to %#.3g at %s", price.Price, price.StartTime.UTC()) + } + } +} + +func (cr *ContainerRunner) calculateCost(now time.Time) float64 { + cr.pricesLock.Lock() + defer cr.pricesLock.Unlock() + + // First, make a "prices" slice with the real data as far back + // as it goes, and (if needed) a "since the beginning of time" + // placeholder containing a reasonable guess about what the + // price was between cr.costStartTime and the earliest real + // data point. + prices := cr.prices + if len(prices) == 0 { + // use price info in InstanceType record initially + // provided by cloud dispatcher + var p float64 + var it arvados.InstanceType + if j := os.Getenv("InstanceType"); j != "" && json.Unmarshal([]byte(j), &it) == nil && it.Price > 0 { + p = it.Price + } + prices = []cloud.InstancePrice{{Price: p}} + } else if prices[len(prices)-1].StartTime.After(cr.costStartTime) { + // guess earlier pricing was the same as the earliest + // price we know about + filler := prices[len(prices)-1] + filler.StartTime = time.Time{} + prices = append(prices, filler) + } + + // Now that our history of price changes goes back at least as + // far as cr.costStartTime, add up the costs for each + // interval. + cost := 0.0 + spanEnd := now + for _, ip := range prices { + spanStart := ip.StartTime + if spanStart.After(now) { + // pricing information from the future -- not + // expected from AWS, but possible in + // principle, and exercised by tests. + continue + } + last := false + if spanStart.Before(cr.costStartTime) { + spanStart = cr.costStartTime + last = true + } + cost += ip.Price * spanEnd.Sub(spanStart).Seconds() / 3600 + if last { + break + } + spanEnd = spanStart + } + + return cost +} + +func (runner *ContainerRunner) handleSIGUSR2(sigchan chan os.Signal) { + for range sigchan { + runner.loadPrices() + update := arvadosclient.Dict{ + "select": []string{"uuid"}, + "container": arvadosclient.Dict{ + "cost": runner.calculateCost(time.Now()), + }, + } + runner.DispatcherArvClient.Update("containers", runner.Container.UUID, update, nil) + } +}