20561: crunch-run logs files/directories propagated from keep
[arvados.git] / lib / crunchrun / crunchrun.go
index 57eed84bacb11a9759e3e29b73f5b45e84d2be37..4a514f3d8966ecbdbc3e43f1a526f9427b99b7b6 100644 (file)
@@ -734,6 +734,7 @@ func (runner *ContainerRunner) stopHoststat() error {
                return nil
        }
        runner.hoststatReporter.Stop()
+       runner.hoststatReporter.LogProcessMemMax(runner.CrunchLog)
        err := runner.hoststatLogger.Close()
        if err != nil {
                return fmt.Errorf("error closing hoststat logs: %v", err)
@@ -764,12 +765,16 @@ func (runner *ContainerRunner) startCrunchstat() error {
        }
        runner.statLogger = NewThrottledLogger(w)
        runner.statReporter = &crunchstat.Reporter{
-               CID:          runner.executor.CgroupID(),
-               Logger:       log.New(runner.statLogger, "", 0),
                CgroupParent: runner.expectCgroupParent,
                CgroupRoot:   runner.cgroupRoot,
-               PollPeriod:   runner.statInterval,
-               TempDir:      runner.parentTemp,
+               CID:          runner.executor.CgroupID(),
+               Logger:       log.New(runner.statLogger, "", 0),
+               MemThresholds: map[string][]crunchstat.Threshold{
+                       "rss": crunchstat.NewThresholdsFromPercentages(runner.Container.RuntimeConstraints.RAM, []int64{90, 95, 99}),
+               },
+               PollPeriod:      runner.statInterval,
+               TempDir:         runner.parentTemp,
+               ThresholdLogger: runner.CrunchLog,
        }
        runner.statReporter.Start()
        return nil
@@ -1148,6 +1153,9 @@ func (runner *ContainerRunner) WaitFinish() error {
 
        if runner.statReporter != nil {
                runner.statReporter.Stop()
+               runner.statReporter.LogMaxima(runner.CrunchLog, map[string]int64{
+                       "rss": runner.Container.RuntimeConstraints.RAM,
+               })
                err = runner.statLogger.Close()
                if err != nil {
                        runner.CrunchLog.Printf("error closing crunchstat logs: %v", err)
@@ -1291,7 +1299,7 @@ func (runner *ContainerRunner) checkSpotInterruptionNotices() {
                failures = 0
                if metadata != lastmetadata {
                        lastmetadata = metadata
-                       text := fmt.Sprintf("Cloud provider indicates instance action %q scheduled for time %q", metadata.Action, metadata.Time.UTC().Format(time.RFC3339))
+                       text := fmt.Sprintf("Cloud provider scheduled instance %s at %s", metadata.Action, metadata.Time.UTC().Format(time.RFC3339))
                        runner.CrunchLog.Printf("%s", text)
                        runner.updateRuntimeStatus(arvadosclient.Dict{
                                "warning":          "preemption notice",
@@ -1513,7 +1521,13 @@ func (runner *ContainerRunner) saveLogCollection(final bool) (response arvados.C
        if final {
                updates["is_trashed"] = true
        } else {
-               exp := time.Now().Add(crunchLogUpdatePeriod * 24)
+               // We set trash_at so this collection gets
+               // automatically cleaned up eventually.  It used to be
+               // 12 hours but we had a situation where the API
+               // server was down over a weekend but the containers
+               // kept running such that the log collection got
+               // trashed, so now we make it 2 weeks.  refs #20378
+               exp := time.Now().Add(time.Duration(24*14) * time.Hour)
                updates["trash_at"] = exp
                updates["delete_at"] = exp
        }
@@ -1635,11 +1649,7 @@ func (runner *ContainerRunner) Run() (err error) {
        signal.Notify(sigusr2, syscall.SIGUSR2)
        defer signal.Stop(sigusr2)
        runner.loadPrices()
-       go func() {
-               for range sigusr2 {
-                       runner.loadPrices()
-               }
-       }()
+       go runner.handleSIGUSR2(sigusr2)
 
        runner.finalState = "Queued"
 
@@ -2073,6 +2083,7 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s
                        ContainerUUID: containerUUID,
                        Target:        cr.executor,
                        Log:           cr.CrunchLog,
+                       LogCollection: cr.LogCollection,
                }
                if gwListen == "" {
                        // Direct connection won't work, so we use the
@@ -2445,3 +2456,15 @@ func (cr *ContainerRunner) calculateCost(now time.Time) float64 {
 
        return cost
 }
+
+// handleSIGUSR2 runs until sigchan is closed. For each signal
+// received it calls loadPrices and then updates the container
+// record's "cost" field with calculateCost(time.Now()).
+//
+// The update is best-effort: a failure is logged to CrunchLog and
+// the loop keeps waiting, so the next signal sends a freshly
+// computed cost.
+func (runner *ContainerRunner) handleSIGUSR2(sigchan chan os.Signal) {
+       for range sigchan {
+               runner.loadPrices()
+               update := arvadosclient.Dict{
+                       "container": arvadosclient.Dict{
+                               "cost": runner.calculateCost(time.Now()),
+                       },
+               }
+               err := runner.DispatcherArvClient.Update("containers", runner.Container.UUID, update, nil)
+               if err != nil {
+                       // Not fatal: keep handling signals so a
+                       // later one can report an updated cost.
+                       runner.CrunchLog.Printf("error updating container cost: %v", err)
+               }
+       }
+}