19986: Log max resource usage after a container finishes
authorBrett Smith <brett.smith@curii.com>
Wed, 1 Mar 2023 20:24:10 +0000 (15:24 -0500)
committerBrett Smith <brett.smith@curii.com>
Wed, 1 Mar 2023 20:24:23 +0000 (15:24 -0500)
Arvados-DCO-1.1-Signed-off-by: Brett Smith <brett.smith@curii.com>

lib/crunchrun/crunchrun.go
lib/crunchrun/crunchrun_test.go
lib/crunchrun/testdata/fakestat/cgroupid/memory.stat
lib/crunchstat/crunchstat.go
lib/crunchstat/crunchstat_test.go
lib/crunchstat/testdata/fakestat/memory.stat

index 3607cafaf0149bc9763c9fc5679ec6ebb6d0a2e1..3708be0c2417d5e603a671d1c2058517b4d0a807 100644 (file)
@@ -734,6 +734,7 @@ func (runner *ContainerRunner) stopHoststat() error {
                return nil
        }
        runner.hoststatReporter.Stop()
+       runner.hoststatReporter.LogProcessMemMax(runner.CrunchLog)
        err := runner.hoststatLogger.Close()
        if err != nil {
                return fmt.Errorf("error closing hoststat logs: %v", err)
@@ -1152,6 +1153,9 @@ func (runner *ContainerRunner) WaitFinish() error {
 
        if runner.statReporter != nil {
                runner.statReporter.Stop()
+               runner.statReporter.LogMaxima(runner.CrunchLog, map[string]int64{
+                       "rss": runner.Container.RuntimeConstraints.RAM,
+               })
                err = runner.statLogger.Close()
                if err != nil {
                        runner.CrunchLog.Printf("error closing crunchstat logs: %v", err)
index 46a71e59297850b946809eea119916c55f97ed71..701be4517b631178924d2be2fbcdef5fdfd85139 100644 (file)
@@ -1025,6 +1025,33 @@ func (s *TestSuite) TestLogAllRSSThresholds(c *C) {
        s.testLogRSSThresholds(c, 734003299, []int{90, 95, 99}, 0)
 }
 
+func (s *TestSuite) TestLogMaximaAfterRun(c *C) {
+       s.runner.cgroupRoot = "testdata/fakestat"
+       s.runner.parentTemp = c.MkDir()
+       s.fullRunHelper(c, `{
+        "command": ["true"],
+        "container_image": "`+arvadostest.DockerImage112PDH+`",
+        "cwd": ".",
+        "environment": {},
+        "mounts": {"/tmp": {"kind": "tmp"} },
+        "output_path": "/tmp",
+        "priority": 1,
+        "runtime_constraints": {"ram": 7340032000},
+        "state": "Locked"
+    }`, nil, func() int { return 0 })
+       logs := s.api.Logs["crunch-run"].String()
+       for _, expected := range []string{
+               `Maximum disk usage was \d+%, \d+/\d+ bytes`,
+               `Maximum container memory cache usage was 73400320 bytes`,
+               `Maximum container memory swap usage was 320 bytes`,
+               `Maximum container memory pgmajfault usage was 20 faults`,
+               `Maximum container memory rss usage was 10%, 734003200/7340032000 bytes`,
+               `Maximum crunch-run memory rss usage was \d+ bytes`,
+       } {
+               c.Check(logs, Matches, logLineStart+expected)
+       }
+}
+
 func (s *TestSuite) TestCommitNodeInfoBeforeStart(c *C) {
        var collection_create, container_update arvadosclient.Dict
        s.fullRunHelper(c, `{
index fff3333c4fa8be8715d73d8a66c7cffc2d4698af..22f0e13fa96aed1904a787bef7589c10de1ecc1c 100644 (file)
@@ -1 +1,5 @@
 rss 734003200
+pgmajfault 3200
+total_cache 73400320
+total_pgmajfault 20
+total_swap 320
index 952e3975c22c810ec42e8ced812559da54f34e3d..4241f5fb5b4994629a78d3791fc7a467b37c152e 100644 (file)
@@ -23,6 +23,9 @@ import (
        "time"
 )
 
+// crunchstat collects all memory statistics, but only reports these.
+var memoryStats = [...]string{"cache", "swap", "pgmajfault", "rss"}
+
 type logPrinter interface {
        Printf(fmt string, args ...interface{})
 }
@@ -70,6 +73,8 @@ type Reporter struct {
        lastCPUSample       cpuSample
        lastDiskSpaceSample diskSpaceSample
        lastMemSample       memSample
+       maxDiskSpaceSample  diskSpaceSample
+       maxMemSample        map[memoryKey]int64
 
        reportPIDs   map[string]int
        reportPIDsMu sync.Mutex
@@ -99,6 +104,15 @@ func NewThresholdsFromPercentages(total int64, percentages []int64) (thresholds
        return
 }
 
+// memoryKey is a key into Reporter.maxMemSample.
+// Initialize it with just statName to get the host/cgroup maximum.
+// Initialize it with all fields to get that process' maximum.
+type memoryKey struct {
+       processID   int
+       processName string
+       statName    string
+}
+
 // Start starts monitoring in a new goroutine, and returns
 // immediately.
 //
@@ -130,12 +144,68 @@ func (r *Reporter) ReportPID(name string, pid int) {
 // Stop reporting. Do not call more than once, or before calling
 // Start.
 //
-// Nothing will be logged after Stop returns.
+// Nothing will be logged after Stop returns unless you call a Log* method.
 func (r *Reporter) Stop() {
        close(r.done)
        <-r.flushed
 }
 
+func (r *Reporter) reportMemoryMax(logger logPrinter, source, statName string, value, limit int64) {
+       var units string
+       switch statName {
+       case "pgmajfault":
+               units = "faults"
+       default:
+               units = "bytes"
+       }
+       if limit > 0 {
+               percentage := 100 * value / limit
+               logger.Printf("Maximum %s memory %s usage was %d%%, %d/%d %s",
+                       source, statName, percentage, value, limit, units)
+       } else {
+               logger.Printf("Maximum %s memory %s usage was %d %s",
+                       source, statName, value, units)
+       }
+}
+
+func (r *Reporter) LogMaxima(logger logPrinter, memLimits map[string]int64) {
+       if r.lastCPUSample.hasData {
+               logger.Printf("Total CPU usage was %f user and %f sys on %d CPUs",
+                       r.lastCPUSample.user, r.lastCPUSample.sys, r.lastCPUSample.cpus)
+       }
+       for disk, sample := range r.lastDiskIOSample {
+               logger.Printf("Total disk I/O on %s was %d bytes written and %d bytes read",
+                       disk, sample.txBytes, sample.rxBytes)
+       }
+       if r.maxDiskSpaceSample.hasData {
+               percentage := 100 * r.maxDiskSpaceSample.used / r.maxDiskSpaceSample.total
+               logger.Printf("Maximum disk usage was %d%%, %d/%d bytes",
+                       percentage, r.maxDiskSpaceSample.used, r.maxDiskSpaceSample.total)
+       }
+       for _, statName := range memoryStats {
+               value, ok := r.maxMemSample[memoryKey{statName: "total_" + statName}]
+               if !ok {
+                       value, ok = r.maxMemSample[memoryKey{statName: statName}]
+               }
+               if ok {
+                       r.reportMemoryMax(logger, "container", statName, value, memLimits[statName])
+               }
+       }
+       for ifname, sample := range r.lastNetSample {
+               logger.Printf("Total network I/O on %s was %d bytes written and %d bytes read",
+                       ifname, sample.txBytes, sample.rxBytes)
+       }
+}
+
+func (r *Reporter) LogProcessMemMax(logger logPrinter) {
+       for memKey, value := range r.maxMemSample {
+               if memKey.processName == "" {
+                       continue
+               }
+               r.reportMemoryMax(logger, memKey.processName, memKey.statName, value, 0)
+       }
+}
+
 func (r *Reporter) readAllOrWarn(in io.Reader) ([]byte, error) {
        content, err := ioutil.ReadAll(in)
        if err != nil {
@@ -293,6 +363,10 @@ func (r *Reporter) getMemSample() {
                        continue
                }
                thisSample.memStat[stat] = val
+               maxKey := memoryKey{statName: stat}
+               if val > r.maxMemSample[maxKey] {
+                       r.maxMemSample[maxKey] = val
+               }
        }
        r.lastMemSample = thisSample
 
@@ -325,8 +399,7 @@ func (r *Reporter) getMemSample() {
 
 func (r *Reporter) reportMemSample() {
        var outstat bytes.Buffer
-       wantStats := [...]string{"cache", "swap", "pgmajfault", "rss"}
-       for _, key := range wantStats {
+       for _, key := range memoryStats {
                // Use "total_X" stats (entire hierarchy) if enabled,
                // otherwise just the single cgroup -- see
                // https://www.kernel.org/doc/Documentation/cgroup-v1/memory.txt
@@ -399,7 +472,12 @@ func (r *Reporter) doProcmemStats() {
                if err != nil {
                        continue
                }
-               procmem += fmt.Sprintf(" %d %s", rss*r.kernelPageSize, procname)
+               value := rss * r.kernelPageSize
+               procmem += fmt.Sprintf(" %d %s", value, procname)
+               maxKey := memoryKey{pid, procname, "rss"}
+               if value > r.maxMemSample[maxKey] {
+                       r.maxMemSample[maxKey] = value
+               }
        }
        if procmem != "" {
                r.Logger.Printf("procmem%s\n", procmem)
@@ -472,6 +550,9 @@ func (r *Reporter) doDiskSpaceStats() {
                used:       (s.Blocks - s.Bfree) * bs,
                available:  s.Bavail * bs,
        }
+       if nextSample.used > r.maxDiskSpaceSample.used {
+               r.maxDiskSpaceSample = nextSample
+       }
 
        var delta string
        if r.lastDiskSpaceSample.hasData {
@@ -568,6 +649,7 @@ func (r *Reporter) doAllStats() {
 func (r *Reporter) run() {
        defer close(r.flushed)
 
+       r.maxMemSample = make(map[memoryKey]int64)
        r.reportedStatFile = make(map[string]string)
 
        if !r.waitForCIDFile() || !r.waitForCgroup() {
index d98fe10647b584671e0e5b779bf25300b687990e..88de12f076623f9ddde773ee1ae061c2bf041166 100644 (file)
@@ -6,8 +6,10 @@ package crunchstat
 
 import (
        "bytes"
+       "errors"
        "fmt"
        "os"
+       "path"
        "regexp"
        "strconv"
        "testing"
@@ -45,8 +47,9 @@ var _ = Suite(&suite{
 })
 
 type suite struct {
-       logbuf bytes.Buffer
-       logger *logrus.Logger
+       cgroupRoot string
+       logbuf     bytes.Buffer
+       logger     *logrus.Logger
 }
 
 func (s *suite) SetUpSuite(c *C) {
@@ -54,9 +57,49 @@ func (s *suite) SetUpSuite(c *C) {
 }
 
 func (s *suite) SetUpTest(c *C) {
+       s.cgroupRoot = ""
        s.logbuf.Reset()
 }
 
+func (s *suite) tempCgroup(c *C, sourceDir string) error {
+       tempDir := c.MkDir()
+       dirents, err := os.ReadDir(sourceDir)
+       if err != nil {
+               return err
+       }
+       for _, dirent := range dirents {
+               srcData, err := os.ReadFile(path.Join(sourceDir, dirent.Name()))
+               if err != nil {
+                       return err
+               }
+               destPath := path.Join(tempDir, dirent.Name())
+               err = os.WriteFile(destPath, srcData, 0o600)
+               if err != nil {
+                       return err
+               }
+       }
+       s.cgroupRoot = tempDir
+       return nil
+}
+
+func (s *suite) addPidToCgroup(pid int) error {
+       if s.cgroupRoot == "" {
+               return errors.New("cgroup has not been set up for this test")
+       }
+       procsPath := path.Join(s.cgroupRoot, "cgroup.procs")
+       procsFile, err := os.OpenFile(procsPath, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0o600)
+       if err != nil {
+               return err
+       }
+       pidLine := strconv.Itoa(pid) + "\n"
+       _, err = procsFile.Write([]byte(pidLine))
+       if err != nil {
+               procsFile.Close()
+               return err
+       }
+       return procsFile.Close()
+}
+
 func (s *suite) TestReadAllOrWarnFail(c *C) {
        rep := Reporter{Logger: s.logger}
 
@@ -163,3 +206,56 @@ func (s *suite) TestMultipleRSSThresholdsSomePassed(c *C) {
 func (s *suite) TestMultipleRSSThresholdsAllPassed(c *C) {
        s.testRSSThresholds(c, []int64{1, 2, 3}, 3)
 }
+
+func (s *suite) TestLogMaxima(c *C) {
+       err := s.tempCgroup(c, fakeRSS.cgroupRoot)
+       c.Assert(err, IsNil)
+       rep := Reporter{
+               CgroupRoot: s.cgroupRoot,
+               Logger:     s.logger,
+               PollPeriod: time.Second * 10,
+               TempDir:    s.cgroupRoot,
+       }
+       rep.Start()
+       rep.Stop()
+       rep.LogMaxima(s.logger, map[string]int64{"rss": GiB})
+       logs := s.logbuf.String()
+       c.Logf("%s", logs)
+
+       expectRSS := fmt.Sprintf(`Maximum container memory rss usage was %d%%, %d/%d bytes`,
+               100*fakeRSS.value/GiB, fakeRSS.value, GiB)
+       for _, expected := range []string{
+               `Maximum disk usage was \d+%, \d+/\d+ bytes`,
+               `Maximum container memory cache usage was 73400320 bytes`,
+               `Maximum container memory swap usage was 320 bytes`,
+               `Maximum container memory pgmajfault usage was 20 faults`,
+               expectRSS,
+       } {
+               pattern := logMsgPrefix + expected + `"`
+               c.Check(logs, Matches, pattern)
+       }
+}
+
+func (s *suite) TestLogProcessMemMax(c *C) {
+       err := s.tempCgroup(c, fakeRSS.cgroupRoot)
+       c.Assert(err, IsNil)
+       pid := os.Getpid()
+       err = s.addPidToCgroup(pid)
+       c.Assert(err, IsNil)
+
+       rep := Reporter{
+               CgroupRoot: s.cgroupRoot,
+               Logger:     s.logger,
+               PollPeriod: time.Second * 10,
+               TempDir:    s.cgroupRoot,
+       }
+       rep.ReportPID("test-run", pid)
+       rep.Start()
+       rep.Stop()
+       rep.LogProcessMemMax(s.logger)
+       logs := s.logbuf.String()
+       c.Logf("%s", logs)
+
+       pattern := logMsgPrefix + `Maximum test-run memory rss usage was \d+ bytes"`
+       c.Check(logs, Matches, pattern)
+}
index 0540eea230491a781b12908c198c49a66aaa5f9b..f1245211dea196e086d0bee44ce868ea536e176e 100644 (file)
@@ -1,2 +1,6 @@
 rss 990
 total_rss 786432000
+pgmajfault 3200
+total_cache 73400320
+total_pgmajfault 20
+total_swap 320