Merge branch '20295-fix-collection-tree-caching-bug' refs #20295
[arvados.git] / lib / crunchrun / crunchrun.go
index a9c65cca422922dac2c5a43a658c7c874ff0ce58..3f254496ba488bb3aea553b65d50937e59652096 100644 (file)
@@ -321,7 +321,10 @@ func (runner *ContainerRunner) ArvMountCmd(cmdline []string, token string) (c *e
                        "Unhandled exception during FUSE operation",
                },
                ReportFunc: func(pattern, text string) {
-                       runner.updateRuntimeStatus("arv-mount: "+pattern, text)
+                       runner.updateRuntimeStatus(arvadosclient.Dict{
+                               "warning":       "arv-mount: " + pattern,
+                               "warningDetail": text,
+                       })
                },
        }
        c.Stdout = runner.arvMountLog
@@ -731,6 +734,7 @@ func (runner *ContainerRunner) stopHoststat() error {
                return nil
        }
        runner.hoststatReporter.Stop()
+       runner.hoststatReporter.LogProcessMemMax(runner.CrunchLog)
        err := runner.hoststatLogger.Close()
        if err != nil {
                return fmt.Errorf("error closing hoststat logs: %v", err)
@@ -761,12 +765,16 @@ func (runner *ContainerRunner) startCrunchstat() error {
        }
        runner.statLogger = NewThrottledLogger(w)
        runner.statReporter = &crunchstat.Reporter{
-               CID:          runner.executor.CgroupID(),
-               Logger:       log.New(runner.statLogger, "", 0),
                CgroupParent: runner.expectCgroupParent,
                CgroupRoot:   runner.cgroupRoot,
-               PollPeriod:   runner.statInterval,
-               TempDir:      runner.parentTemp,
+               CID:          runner.executor.CgroupID(),
+               Logger:       log.New(runner.statLogger, "", 0),
+               MemThresholds: map[string][]crunchstat.Threshold{
+                       "rss": crunchstat.NewThresholdsFromPercentages(runner.Container.RuntimeConstraints.RAM, []int64{90, 95, 99}),
+               },
+               PollPeriod:      runner.statInterval,
+               TempDir:         runner.parentTemp,
+               ThresholdLogger: runner.CrunchLog,
        }
        runner.statReporter.Start()
        return nil
@@ -1145,6 +1153,9 @@ func (runner *ContainerRunner) WaitFinish() error {
 
        if runner.statReporter != nil {
                runner.statReporter.Stop()
+               runner.statReporter.LogMaxima(runner.CrunchLog, map[string]int64{
+                       "rss": runner.Container.RuntimeConstraints.RAM,
+               })
                err = runner.statLogger.Close()
                if err != nil {
                        runner.CrunchLog.Printf("error closing crunchstat logs: %v", err)
@@ -1288,9 +1299,13 @@ func (runner *ContainerRunner) checkSpotInterruptionNotices() {
                failures = 0
                if metadata != lastmetadata {
                        lastmetadata = metadata
-                       text := fmt.Sprintf("Cloud provider indicates instance action %q scheduled for time %q", metadata.Action, metadata.Time.UTC().Format(time.RFC3339))
+                       text := fmt.Sprintf("Cloud provider scheduled instance %s at %s", metadata.Action, metadata.Time.UTC().Format(time.RFC3339))
                        runner.CrunchLog.Printf("%s", text)
-                       runner.updateRuntimeStatus("instance interruption", text)
+                       runner.updateRuntimeStatus(arvadosclient.Dict{
+                               "warning":          "preemption notice",
+                               "warningDetail":    text,
+                               "preemptionNotice": text,
+                       })
                        if proc, err := os.FindProcess(os.Getpid()); err == nil {
                                // trigger updateLogs
                                proc.Signal(syscall.SIGUSR1)
@@ -1299,13 +1314,10 @@ func (runner *ContainerRunner) checkSpotInterruptionNotices() {
        }
 }
 
-func (runner *ContainerRunner) updateRuntimeStatus(warning, detail string) {
+func (runner *ContainerRunner) updateRuntimeStatus(status arvadosclient.Dict) {
        err := runner.DispatcherArvClient.Update("containers", runner.Container.UUID, arvadosclient.Dict{
                "container": arvadosclient.Dict{
-                       "runtime_status": arvadosclient.Dict{
-                               "warning":       warning,
-                               "warningDetail": detail,
-                       },
+                       "runtime_status": status,
                },
        }, nil)
        if err != nil {
@@ -1631,11 +1643,7 @@ func (runner *ContainerRunner) Run() (err error) {
        signal.Notify(sigusr2, syscall.SIGUSR2)
        defer signal.Stop(sigusr2)
        runner.loadPrices()
-       go func() {
-               for range sigusr2 {
-                       runner.loadPrices()
-               }
-       }()
+       go runner.handleSIGUSR2(sigusr2)
 
        runner.finalState = "Queued"
 
@@ -2441,3 +2449,15 @@ func (cr *ContainerRunner) calculateCost(now time.Time) float64 {
 
        return cost
 }
+
+func (runner *ContainerRunner) handleSIGUSR2(sigchan chan os.Signal) {
+       for range sigchan {
+               runner.loadPrices()
+               update := arvadosclient.Dict{
+                       "container": arvadosclient.Dict{
+                               "cost": runner.calculateCost(time.Now()),
+                       },
+               }
+               runner.DispatcherArvClient.Update("containers", runner.Container.UUID, update, nil)
+       }
+}