X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/7a0626a6ca34771ffd45d2b669a39690acf9a8b0..96d8b9e1afecccae803ec4b956ada745dbe71d9f:/lib/crunchrun/crunchrun.go

diff --git a/lib/crunchrun/crunchrun.go b/lib/crunchrun/crunchrun.go
index ea48512181..8e7d3b0d6e 100644
--- a/lib/crunchrun/crunchrun.go
+++ b/lib/crunchrun/crunchrun.go
@@ -321,7 +321,10 @@ func (runner *ContainerRunner) ArvMountCmd(cmdline []string, token string) (c *e
             "Unhandled exception during FUSE operation",
         },
         ReportFunc: func(pattern, text string) {
-            runner.updateRuntimeStatus("arv-mount: "+pattern, text)
+            runner.updateRuntimeStatus(arvadosclient.Dict{
+                "warning":       "arv-mount: " + pattern,
+                "warningDetail": text,
+            })
         },
     }
     c.Stdout = runner.arvMountLog
@@ -635,7 +638,7 @@ func (runner *ContainerRunner) SetupMounts() (map[string]bindmount, error) {
             if err != nil {
                 return nil, fmt.Errorf("creating temp dir: %v", err)
             }
-            err = gitMount(mnt).extractTree(runner.ContainerArvClient, tmpdir, token)
+            err = gitMount(mnt).extractTree(runner.containerClient, tmpdir, token)
             if err != nil {
                 return nil, err
             }
@@ -731,6 +734,7 @@ func (runner *ContainerRunner) stopHoststat() error {
         return nil
     }
     runner.hoststatReporter.Stop()
+    runner.hoststatReporter.LogProcessMemMax(runner.CrunchLog)
     err := runner.hoststatLogger.Close()
     if err != nil {
         return fmt.Errorf("error closing hoststat logs: %v", err)
@@ -761,12 +765,16 @@ func (runner *ContainerRunner) startCrunchstat() error {
     }
     runner.statLogger = NewThrottledLogger(w)
     runner.statReporter = &crunchstat.Reporter{
-        CID:          runner.executor.CgroupID(),
-        Logger:       log.New(runner.statLogger, "", 0),
         CgroupParent: runner.expectCgroupParent,
         CgroupRoot:   runner.cgroupRoot,
-        PollPeriod:   runner.statInterval,
-        TempDir:      runner.parentTemp,
+        CID:          runner.executor.CgroupID(),
+        Logger:       log.New(runner.statLogger, "", 0),
+        MemThresholds: map[string][]crunchstat.Threshold{
+            "rss": crunchstat.NewThresholdsFromPercentages(runner.Container.RuntimeConstraints.RAM, []int64{90, 95, 99}),
+        },
+        PollPeriod:      runner.statInterval,
+        TempDir:         runner.parentTemp,
+        ThresholdLogger: runner.CrunchLog,
     }
     runner.statReporter.Start()
     return nil
@@ -1116,6 +1124,7 @@ func (runner *ContainerRunner) WaitFinish() error {
     }
     runner.CrunchLog.Printf("Container exited with status code %d%s", exitcode, extra)
     err = runner.DispatcherArvClient.Update("containers", runner.Container.UUID, arvadosclient.Dict{
+        "select":    []string{"uuid"},
        "container": arvadosclient.Dict{"exit_code": exitcode},
     }, nil)
     if err != nil {
@@ -1145,6 +1154,9 @@ func (runner *ContainerRunner) WaitFinish() error {
 
     if runner.statReporter != nil {
         runner.statReporter.Stop()
+        runner.statReporter.LogMaxima(runner.CrunchLog, map[string]int64{
+            "rss": runner.Container.RuntimeConstraints.RAM,
+        })
         err = runner.statLogger.Close()
         if err != nil {
             runner.CrunchLog.Printf("error closing crunchstat logs: %v", err)
@@ -1189,7 +1201,10 @@ func (runner *ContainerRunner) updateLogs() {
         }
 
         err = runner.DispatcherArvClient.Update("containers", runner.Container.UUID, arvadosclient.Dict{
-            "container": arvadosclient.Dict{"log": saved.PortableDataHash},
+            "select": []string{"uuid"},
+            "container": arvadosclient.Dict{
+                "log": saved.PortableDataHash,
+            },
         }, nil)
         if err != nil {
             runner.CrunchLog.Printf("error updating container log to %s: %s", saved.PortableDataHash, err)
@@ -1203,6 +1218,8 @@ func (runner *ContainerRunner) updateLogs() {
 var spotInterruptionCheckInterval = 5 * time.Second
 var ec2MetadataBaseURL = "http://169.254.169.254"
 
+const ec2TokenTTL = time.Second * 21600
+
 func (runner *ContainerRunner) checkSpotInterruptionNotices() {
     type ec2metadata struct {
         Action string `json:"action"`
@@ -1210,38 +1227,47 @@ func (runner *ContainerRunner) checkSpotInterruptionNotices() {
     }
     runner.CrunchLog.Printf("Checking for spot interruptions every %v using instance metadata at %s", spotInterruptionCheckInterval, ec2MetadataBaseURL)
     var metadata ec2metadata
+    var token string
+    var tokenExp time.Time
     check := func() error {
         ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(time.Minute))
         defer cancel()
-        req, err := http.NewRequestWithContext(ctx, http.MethodPut, ec2MetadataBaseURL+"/latest/api/token", nil)
-        if err != nil {
-            return err
-        }
-        req.Header.Set("X-aws-ec2-metadata-token-ttl-seconds", "21600")
-        resp, err := http.DefaultClient.Do(req)
-        if err != nil {
-            return err
-        }
-        defer resp.Body.Close()
-        if resp.StatusCode != http.StatusOK {
-            return fmt.Errorf("%s", resp.Status)
-        }
-        token, err := ioutil.ReadAll(resp.Body)
-        if err != nil {
-            return err
+        if token == "" || tokenExp.Sub(time.Now()) < time.Minute {
+            req, err := http.NewRequestWithContext(ctx, http.MethodPut, ec2MetadataBaseURL+"/latest/api/token", nil)
+            if err != nil {
+                return err
+            }
+            req.Header.Set("X-aws-ec2-metadata-token-ttl-seconds", fmt.Sprintf("%d", int(ec2TokenTTL/time.Second)))
+            resp, err := http.DefaultClient.Do(req)
+            if err != nil {
+                return err
+            }
+            defer resp.Body.Close()
+            if resp.StatusCode != http.StatusOK {
+                return fmt.Errorf("%s", resp.Status)
+            }
+            newtoken, err := ioutil.ReadAll(resp.Body)
+            if err != nil {
+                return err
+            }
+            token = strings.TrimSpace(string(newtoken))
+            tokenExp = time.Now().Add(ec2TokenTTL)
         }
-        req, err = http.NewRequestWithContext(ctx, http.MethodGet, ec2MetadataBaseURL+"/latest/meta-data/spot/instance-action", nil)
+        req, err := http.NewRequestWithContext(ctx, http.MethodGet, ec2MetadataBaseURL+"/latest/meta-data/spot/instance-action", nil)
         if err != nil {
             return err
         }
-        req.Header.Set("X-aws-ec2-metadata-token", strings.TrimSpace(string(token)))
-        resp, err = http.DefaultClient.Do(req)
+        req.Header.Set("X-aws-ec2-metadata-token", token)
+        resp, err := http.DefaultClient.Do(req)
         if err != nil {
             return err
         }
         defer resp.Body.Close()
         metadata = ec2metadata{}
-        if resp.StatusCode == http.StatusNotFound {
+        switch resp.StatusCode {
+        case http.StatusOK:
+            break
+        case http.StatusNotFound:
             // "If Amazon EC2 is not preparing to stop or
             // terminate the instance, or if you
             // terminated the instance yourself,
@@ -1249,7 +1275,10 @@ func (runner *ContainerRunner) checkSpotInterruptionNotices() {
             // instance metadata and you receive an HTTP
             // 404 error when you try to retrieve it."
             return nil
-        } else if resp.StatusCode != http.StatusOK {
+        case http.StatusUnauthorized:
+            token = ""
+            return fmt.Errorf("%s", resp.Status)
+        default:
             return fmt.Errorf("%s", resp.Status)
         }
         err = json.NewDecoder(resp.Body).Decode(&metadata)
@@ -1265,17 +1294,22 @@ func (runner *ContainerRunner) checkSpotInterruptionNotices() {
         if err != nil {
             runner.CrunchLog.Printf("Error checking spot interruptions: %s", err)
             failures++
-            if failures > 3 {
-                runner.CrunchLog.Printf("Giving up on checking spot interruptions after too many errors")
+            if failures > 5 {
+                runner.CrunchLog.Printf("Giving up on checking spot interruptions after too many consecutive failures")
                 return
             }
             continue
         }
+        failures = 0
         if metadata != lastmetadata {
             lastmetadata = metadata
-            text := fmt.Sprintf("Cloud provider indicates instance action %q scheduled for time %q", metadata.Action, metadata.Time.UTC().Format(time.RFC3339))
+            text := fmt.Sprintf("Cloud provider scheduled instance %s at %s", metadata.Action, metadata.Time.UTC().Format(time.RFC3339))
             runner.CrunchLog.Printf("%s", text)
-            runner.updateRuntimeStatus("instance interruption", text)
+            runner.updateRuntimeStatus(arvadosclient.Dict{
+                "warning":          "preemption notice",
+                "warningDetail":    text,
+                "preemptionNotice": text,
+            })
             if proc, err := os.FindProcess(os.Getpid()); err == nil {
                 // trigger updateLogs
                 proc.Signal(syscall.SIGUSR1)
@@ -1284,13 +1318,11 @@ func (runner *ContainerRunner) checkSpotInterruptionNotices() {
         }
     }
 }
-func (runner *ContainerRunner) updateRuntimeStatus(warning, detail string) {
+func (runner *ContainerRunner) updateRuntimeStatus(status arvadosclient.Dict) {
     err := runner.DispatcherArvClient.Update("containers", runner.Container.UUID, arvadosclient.Dict{
+        "select": []string{"uuid"},
         "container": arvadosclient.Dict{
-            "runtime_status": arvadosclient.Dict{
-                "warning":       warning,
-                "warningDetail": detail,
-            },
+            "runtime_status": status,
         },
     }, nil)
     if err != nil {
@@ -1305,7 +1337,9 @@ func (runner *ContainerRunner) CaptureOutput(bindmounts map[string]bindmount) er
     // Output may have been set directly by the container, so
     // refresh the container record to check.
     err := runner.DispatcherArvClient.Get("containers", runner.Container.UUID,
-        nil, &runner.Container)
+        arvadosclient.Dict{
+            "select": []string{"output"},
+        }, &runner.Container)
     if err != nil {
         return err
     }
@@ -1318,7 +1352,6 @@ func (runner *ContainerRunner) CaptureOutput(bindmounts map[string]bindmount) er
 
     txt, err := (&copier{
         client:        runner.containerClient,
-        arvClient:     runner.ContainerArvClient,
         keepClient:    runner.ContainerKeepClient,
         hostOutputDir: runner.HostOutputDir,
         ctrOutputDir:  runner.Container.OutputPath,
@@ -1344,6 +1377,7 @@ func (runner *ContainerRunner) CaptureOutput(bindmounts map[string]bindmount) er
         var resp arvados.Collection
         err = runner.ContainerArvClient.Create("collections", arvadosclient.Dict{
             "ensure_unique_name": true,
+            "select":             []string{"portable_data_hash"},
             "collection": arvadosclient.Dict{
                 "is_trashed": true,
                 "name":       "output for " + runner.Container.UUID,
@@ -1470,6 +1504,8 @@ func (runner *ContainerRunner) CommitLogs() error {
     return nil
 }
 
+// Create/update the log collection. Return value has UUID and
+// PortableDataHash fields populated, but others may be blank.
 func (runner *ContainerRunner) saveLogCollection(final bool) (response arvados.Collection, err error) {
     runner.logMtx.Lock()
     defer runner.logMtx.Unlock()
@@ -1494,11 +1530,20 @@ func (runner *ContainerRunner) saveLogCollection(final bool) (response arvados.C
     if final {
         updates["is_trashed"] = true
     } else {
-        exp := time.Now().Add(crunchLogUpdatePeriod * 24)
+        // We set trash_at so this collection gets
+        // automatically cleaned up eventually. It used to be
+        // 12 hours but we had a situation where the API
+        // server was down over a weekend but the containers
+        // kept running such that the log collection got
+        // trashed, so now we make it 2 weeks. refs #20378
+        exp := time.Now().Add(time.Duration(24*14) * time.Hour)
         updates["trash_at"] = exp
         updates["delete_at"] = exp
     }
-    reqBody := arvadosclient.Dict{"collection": updates}
+    reqBody := arvadosclient.Dict{
+        "select":     []string{"uuid", "portable_data_hash"},
+        "collection": updates,
+    }
     var err2 error
     if runner.logUUID == "" {
         reqBody["ensure_unique_name"] = true
@@ -1533,7 +1578,10 @@ func (runner *ContainerRunner) UpdateContainerRunning(logId string) error {
     return runner.DispatcherArvClient.Update(
         "containers",
         runner.Container.UUID,
-        arvadosclient.Dict{"container": updates},
+        arvadosclient.Dict{
+            "select":    []string{"uuid"},
+            "container": updates,
+        },
         nil,
     )
 }
@@ -1571,7 +1619,10 @@ func (runner *ContainerRunner) UpdateContainerFinal() error {
         update["output"] = *runner.OutputPDH
     }
     update["cost"] = runner.calculateCost(time.Now())
-    return runner.DispatcherArvClient.Update("containers", runner.Container.UUID, arvadosclient.Dict{"container": update}, nil)
+    return runner.DispatcherArvClient.Update("containers", runner.Container.UUID, arvadosclient.Dict{
+        "select":    []string{"uuid"},
+        "container": update,
+    }, nil)
 }
 
 // IsCancelled returns the value of Cancelled, with goroutine safety.
@@ -1616,11 +1667,7 @@ func (runner *ContainerRunner) Run() (err error) {
     signal.Notify(sigusr2, syscall.SIGUSR2)
     defer signal.Stop(sigusr2)
     runner.loadPrices()
-    go func() {
-        for range sigusr2 {
-            runner.loadPrices()
-        }
-    }()
+    go runner.handleSIGUSR2(sigusr2)
 
     runner.finalState = "Queued"
 
@@ -1966,7 +2013,9 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s
         log.Printf("%s: %v", containerUUID, err)
         return 1
     }
-    api.Retries = 8
+    // arvadosclient now interprets Retries=10 to mean
+    // Timeout=10m, retrying with exponential backoff + jitter.
+    api.Retries = 10
 
     kc, err := keepclient.MakeKeepClient(api)
     if err != nil {
@@ -2054,6 +2103,7 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s
         ContainerUUID: containerUUID,
         Target:        cr.executor,
         Log:           cr.CrunchLog,
+        LogCollection: cr.LogCollection,
     }
     if gwListen == "" {
         // Direct connection won't work, so we use the
@@ -2064,7 +2114,10 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s
         cr.gateway.UpdateTunnelURL = func(url string) {
             cr.gateway.Address = "tunnel " + url
             cr.DispatcherArvClient.Update("containers", containerUUID,
-                arvadosclient.Dict{"container": arvadosclient.Dict{"gateway_address": cr.gateway.Address}}, nil)
+                arvadosclient.Dict{
+                    "select":    []string{"uuid"},
+                    "container": arvadosclient.Dict{"gateway_address": cr.gateway.Address},
+                }, nil)
         }
     }
     err = cr.gateway.Start()
@@ -2142,7 +2195,9 @@ func hpcConfData(uuid string, configFile string, stderr io.Writer) ConfigData {
         fmt.Fprintf(stderr, "error setting up arvadosclient: %s\n", err)
         return conf
     }
-    arv.Retries = 8
+    // arvadosclient now interprets Retries=10 to mean
+    // Timeout=10m, retrying with exponential backoff + jitter.
+    arv.Retries = 10
     var ctr arvados.Container
     err = arv.Call("GET", "containers", uuid, "", arvadosclient.Dict{"select": []string{"runtime_constraints"}}, &ctr)
     if err != nil {
@@ -2426,3 +2481,16 @@ func (cr *ContainerRunner) calculateCost(now time.Time) float64 {
 
     return cost
 }
+
+func (runner *ContainerRunner) handleSIGUSR2(sigchan chan os.Signal) {
+    for range sigchan {
+        runner.loadPrices()
+        update := arvadosclient.Dict{
+            "select": []string{"uuid"},
+            "container": arvadosclient.Dict{
+                "cost": runner.calculateCost(time.Now()),
+            },
+        }
+        runner.DispatcherArvClient.Update("containers", runner.Container.UUID, update, nil)
+    }
+}
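
Reviewer note (not part of the patch): the checkSpotInterruptionNotices() hunks above stop requesting a fresh IMDSv2 session token on every poll and instead cache one, refreshing it when it is missing or within a minute of expiring, and discarding it on a 401 so the next poll re-authenticates. The standalone sketch below restates that pattern outside crunchrun; the imdsClient type and helper names are hypothetical, while the endpoints, headers, TTL handling, and status-code handling mirror the patch.

package main

import (
    "context"
    "fmt"
    "io"
    "net/http"
    "strings"
    "time"
)

const (
    metadataBaseURL = "http://169.254.169.254" // EC2 instance metadata service
    tokenTTL        = 21600 * time.Second      // same 6-hour TTL as ec2TokenTTL in the patch
)

// imdsClient caches an IMDSv2 session token together with its expiry time.
type imdsClient struct {
    token    string
    tokenExp time.Time
}

// getToken returns the cached token, refreshing it when it is missing or
// within a minute of expiring (the same condition the patch uses).
func (c *imdsClient) getToken(ctx context.Context) (string, error) {
    if c.token != "" && time.Until(c.tokenExp) >= time.Minute {
        return c.token, nil
    }
    req, err := http.NewRequestWithContext(ctx, http.MethodPut, metadataBaseURL+"/latest/api/token", nil)
    if err != nil {
        return "", err
    }
    req.Header.Set("X-aws-ec2-metadata-token-ttl-seconds", fmt.Sprintf("%d", int(tokenTTL/time.Second)))
    resp, err := http.DefaultClient.Do(req)
    if err != nil {
        return "", err
    }
    defer resp.Body.Close()
    if resp.StatusCode != http.StatusOK {
        return "", fmt.Errorf("token request: %s", resp.Status)
    }
    body, err := io.ReadAll(resp.Body)
    if err != nil {
        return "", err
    }
    c.token = strings.TrimSpace(string(body))
    c.tokenExp = time.Now().Add(tokenTTL)
    return c.token, nil
}

// spotInstanceAction polls /latest/meta-data/spot/instance-action.
// A 404 means no interruption is scheduled; a 401 drops the cached token
// so the next poll fetches a fresh one.
func (c *imdsClient) spotInstanceAction(ctx context.Context) (string, error) {
    token, err := c.getToken(ctx)
    if err != nil {
        return "", err
    }
    req, err := http.NewRequestWithContext(ctx, http.MethodGet, metadataBaseURL+"/latest/meta-data/spot/instance-action", nil)
    if err != nil {
        return "", err
    }
    req.Header.Set("X-aws-ec2-metadata-token", token)
    resp, err := http.DefaultClient.Do(req)
    if err != nil {
        return "", err
    }
    defer resp.Body.Close()
    switch resp.StatusCode {
    case http.StatusOK:
        body, err := io.ReadAll(resp.Body)
        return string(body), err
    case http.StatusNotFound:
        return "", nil // no stop/terminate action pending
    case http.StatusUnauthorized:
        c.token = "" // force a token refresh on the next check
        return "", fmt.Errorf("%s", resp.Status)
    default:
        return "", fmt.Errorf("%s", resp.Status)
    }
}

func main() {
    ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
    defer cancel()
    var c imdsClient
    action, err := c.spotInstanceAction(ctx)
    fmt.Println(action, err)
}

Storing the token alongside its expiry is what keeps the 401 branch cheap: clearing the cached token is enough to force a fresh PUT to /latest/api/token on the next check, which is exactly what the switch on resp.StatusCode does in the patch.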