X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/c138f58b21edb574b101588f6fc61dce8a98ed3e..ff22334d01e09b0074be6416f9285dae8d5af565:/lib/crunchrun/crunchrun.go diff --git a/lib/crunchrun/crunchrun.go b/lib/crunchrun/crunchrun.go index 1f130a5b10..4a514f3d89 100644 --- a/lib/crunchrun/crunchrun.go +++ b/lib/crunchrun/crunchrun.go @@ -50,9 +50,10 @@ var Command = command{} // ConfigData contains environment variables and (when needed) cluster // configuration, passed from dispatchcloud to crunch-run on stdin. type ConfigData struct { - Env map[string]string - KeepBuffers int - Cluster *arvados.Cluster + Env map[string]string + KeepBuffers int + EC2SpotCheck bool + Cluster *arvados.Cluster } // IArvadosClient is the minimal Arvados API methods used by crunch-run. @@ -319,7 +320,12 @@ func (runner *ContainerRunner) ArvMountCmd(cmdline []string, token string) (c *e "Block not found error", "Unhandled exception during FUSE operation", }, - ReportFunc: runner.reportArvMountWarning, + ReportFunc: func(pattern, text string) { + runner.updateRuntimeStatus(arvadosclient.Dict{ + "warning": "arv-mount: " + pattern, + "warningDetail": text, + }) + }, } c.Stdout = runner.arvMountLog c.Stderr = io.MultiWriter(runner.arvMountLog, os.Stderr, &scanner) @@ -728,6 +734,7 @@ func (runner *ContainerRunner) stopHoststat() error { return nil } runner.hoststatReporter.Stop() + runner.hoststatReporter.LogProcessMemMax(runner.CrunchLog) err := runner.hoststatLogger.Close() if err != nil { return fmt.Errorf("error closing hoststat logs: %v", err) @@ -758,12 +765,16 @@ func (runner *ContainerRunner) startCrunchstat() error { } runner.statLogger = NewThrottledLogger(w) runner.statReporter = &crunchstat.Reporter{ - CID: runner.executor.CgroupID(), - Logger: log.New(runner.statLogger, "", 0), CgroupParent: runner.expectCgroupParent, CgroupRoot: runner.cgroupRoot, - PollPeriod: runner.statInterval, - TempDir: runner.parentTemp, + CID: runner.executor.CgroupID(), + Logger: log.New(runner.statLogger, "", 0), + MemThresholds: map[string][]crunchstat.Threshold{ + "rss": crunchstat.NewThresholdsFromPercentages(runner.Container.RuntimeConstraints.RAM, []int64{90, 95, 99}), + }, + PollPeriod: runner.statInterval, + TempDir: runner.parentTemp, + ThresholdLogger: runner.CrunchLog, } runner.statReporter.Start() return nil @@ -1142,6 +1153,9 @@ func (runner *ContainerRunner) WaitFinish() error { if runner.statReporter != nil { runner.statReporter.Stop() + runner.statReporter.LogMaxima(runner.CrunchLog, map[string]int64{ + "rss": runner.Container.RuntimeConstraints.RAM, + }) err = runner.statLogger.Close() if err != nil { runner.CrunchLog.Printf("error closing crunchstat logs: %v", err) @@ -1197,16 +1211,115 @@ func (runner *ContainerRunner) updateLogs() { } } -func (runner *ContainerRunner) reportArvMountWarning(pattern, text string) { - var updated arvados.Container +var spotInterruptionCheckInterval = 5 * time.Second +var ec2MetadataBaseURL = "http://169.254.169.254" + +const ec2TokenTTL = time.Second * 21600 + +func (runner *ContainerRunner) checkSpotInterruptionNotices() { + type ec2metadata struct { + Action string `json:"action"` + Time time.Time `json:"time"` + } + runner.CrunchLog.Printf("Checking for spot interruptions every %v using instance metadata at %s", spotInterruptionCheckInterval, ec2MetadataBaseURL) + var metadata ec2metadata + var token string + var tokenExp time.Time + check := func() error { + ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(time.Minute)) + defer cancel() + if token == "" || tokenExp.Sub(time.Now()) < time.Minute { + req, err := http.NewRequestWithContext(ctx, http.MethodPut, ec2MetadataBaseURL+"/latest/api/token", nil) + if err != nil { + return err + } + req.Header.Set("X-aws-ec2-metadata-token-ttl-seconds", fmt.Sprintf("%d", int(ec2TokenTTL/time.Second))) + resp, err := http.DefaultClient.Do(req) + if err != nil { + return err + } + defer resp.Body.Close() + if resp.StatusCode != http.StatusOK { + return fmt.Errorf("%s", resp.Status) + } + newtoken, err := ioutil.ReadAll(resp.Body) + if err != nil { + return err + } + token = strings.TrimSpace(string(newtoken)) + tokenExp = time.Now().Add(ec2TokenTTL) + } + req, err := http.NewRequestWithContext(ctx, http.MethodGet, ec2MetadataBaseURL+"/latest/meta-data/spot/instance-action", nil) + if err != nil { + return err + } + req.Header.Set("X-aws-ec2-metadata-token", token) + resp, err := http.DefaultClient.Do(req) + if err != nil { + return err + } + defer resp.Body.Close() + metadata = ec2metadata{} + switch resp.StatusCode { + case http.StatusOK: + break + case http.StatusNotFound: + // "If Amazon EC2 is not preparing to stop or + // terminate the instance, or if you + // terminated the instance yourself, + // instance-action is not present in the + // instance metadata and you receive an HTTP + // 404 error when you try to retrieve it." + return nil + case http.StatusUnauthorized: + token = "" + return fmt.Errorf("%s", resp.Status) + default: + return fmt.Errorf("%s", resp.Status) + } + err = json.NewDecoder(resp.Body).Decode(&metadata) + if err != nil { + return err + } + return nil + } + failures := 0 + var lastmetadata ec2metadata + for range time.NewTicker(spotInterruptionCheckInterval).C { + err := check() + if err != nil { + runner.CrunchLog.Printf("Error checking spot interruptions: %s", err) + failures++ + if failures > 5 { + runner.CrunchLog.Printf("Giving up on checking spot interruptions after too many consecutive failures") + return + } + continue + } + failures = 0 + if metadata != lastmetadata { + lastmetadata = metadata + text := fmt.Sprintf("Cloud provider scheduled instance %s at %s", metadata.Action, metadata.Time.UTC().Format(time.RFC3339)) + runner.CrunchLog.Printf("%s", text) + runner.updateRuntimeStatus(arvadosclient.Dict{ + "warning": "preemption notice", + "warningDetail": text, + "preemptionNotice": text, + }) + if proc, err := os.FindProcess(os.Getpid()); err == nil { + // trigger updateLogs + proc.Signal(syscall.SIGUSR1) + } + } + } +} + +func (runner *ContainerRunner) updateRuntimeStatus(status arvadosclient.Dict) { err := runner.DispatcherArvClient.Update("containers", runner.Container.UUID, arvadosclient.Dict{ "container": arvadosclient.Dict{ - "runtime_status": arvadosclient.Dict{ - "warning": "arv-mount: " + pattern, - "warningDetail": text, - }, + "runtime_status": status, }, - }, &updated) + }, nil) if err != nil { runner.CrunchLog.Printf("error updating container runtime_status: %s", err) } @@ -1408,7 +1521,13 @@ func (runner *ContainerRunner) saveLogCollection(final bool) (response arvados.C if final { updates["is_trashed"] = true } else { - exp := time.Now().Add(crunchLogUpdatePeriod * 24) + // We set trash_at so this collection gets + // automatically cleaned up eventually. It used to be + // 12 hours but we had a situation where the API + // server was down over a weekend but the containers + // kept running such that the log collection got + // trashed, so now we make it 2 weeks. refs #20378 + exp := time.Now().Add(time.Duration(24*14) * time.Hour) updates["trash_at"] = exp updates["delete_at"] = exp } @@ -1431,14 +1550,25 @@ func (runner *ContainerRunner) saveLogCollection(final bool) (response arvados.C } // UpdateContainerRunning updates the container state to "Running" -func (runner *ContainerRunner) UpdateContainerRunning() error { +func (runner *ContainerRunner) UpdateContainerRunning(logId string) error { runner.cStateLock.Lock() defer runner.cStateLock.Unlock() if runner.cCancelled { return ErrCancelled } - return runner.DispatcherArvClient.Update("containers", runner.Container.UUID, - arvadosclient.Dict{"container": arvadosclient.Dict{"state": "Running", "gateway_address": runner.gateway.Address}}, nil) + updates := arvadosclient.Dict{ + "gateway_address": runner.gateway.Address, + "state": "Running", + } + if logId != "" { + updates["log"] = logId + } + return runner.DispatcherArvClient.Update( + "containers", + runner.Container.UUID, + arvadosclient.Dict{"container": updates}, + nil, + ) } // ContainerToken returns the api_token the container (and any @@ -1519,11 +1649,7 @@ func (runner *ContainerRunner) Run() (err error) { signal.Notify(sigusr2, syscall.SIGUSR2) defer signal.Stop(sigusr2) runner.loadPrices() - go func() { - for range sigusr2 { - runner.loadPrices() - } - }() + go runner.handleSIGUSR2(sigusr2) runner.finalState = "Queued" @@ -1636,7 +1762,14 @@ func (runner *ContainerRunner) Run() (err error) { return } - err = runner.UpdateContainerRunning() + logCollection, err := runner.saveLogCollection(false) + var logId string + if err == nil { + logId = logCollection.PortableDataHash + } else { + runner.CrunchLog.Printf("Error committing initial log collection: %v", err) + } + err = runner.UpdateContainerRunning(logId) if err != nil { return } @@ -1950,6 +2083,7 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s ContainerUUID: containerUUID, Target: cr.executor, Log: cr.CrunchLog, + LogCollection: cr.LogCollection, } if gwListen == "" { // Direct connection won't work, so we use the @@ -1993,6 +2127,10 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s cr.expectCgroupParent = p } + if conf.EC2SpotCheck { + go cr.checkSpotInterruptionNotices() + } + runerr := cr.Run() if *memprofile != "" { @@ -2298,6 +2436,12 @@ func (cr *ContainerRunner) calculateCost(now time.Time) float64 { spanEnd := now for _, ip := range prices { spanStart := ip.StartTime + if spanStart.After(now) { + // pricing information from the future -- not + // expected from AWS, but possible in + // principle, and exercised by tests. + continue + } last := false if spanStart.Before(cr.costStartTime) { spanStart = cr.costStartTime @@ -2309,5 +2453,18 @@ func (cr *ContainerRunner) calculateCost(now time.Time) float64 { } spanEnd = spanStart } + return cost } + +func (runner *ContainerRunner) handleSIGUSR2(sigchan chan os.Signal) { + for range sigchan { + runner.loadPrices() + update := arvadosclient.Dict{ + "container": arvadosclient.Dict{ + "cost": runner.calculateCost(time.Now()), + }, + } + runner.DispatcherArvClient.Update("containers", runner.Container.UUID, update, nil) + } +}