s.testLogRSSThresholds(c, 734003299, []int{90, 95, 99}, 0)
}
+func (s *TestSuite) TestLogMaximaAfterRun(c *C) {
+ s.runner.cgroupRoot = "testdata/fakestat"
+ s.runner.parentTemp = c.MkDir()
+ s.fullRunHelper(c, `{
+ "command": ["true"],
+ "container_image": "`+arvadostest.DockerImage112PDH+`",
+ "cwd": ".",
+ "environment": {},
+ "mounts": {"/tmp": {"kind": "tmp"} },
+ "output_path": "/tmp",
+ "priority": 1,
+ "runtime_constraints": {"ram": 7340032000},
+ "state": "Locked"
+ }`, nil, func() int { return 0 })
+ logs := s.api.Logs["crunch-run"].String()
+ for _, expected := range []string{
+ `Maximum disk usage was \d+%, \d+/\d+ bytes`,
+ `Maximum container memory cache usage was 73400320 bytes`,
+ `Maximum container memory swap usage was 320 bytes`,
+ `Maximum container memory pgmajfault usage was 20 faults`,
+ `Maximum container memory rss usage was 10%, 734003200/7340032000 bytes`,
+ `Maximum crunch-run memory rss usage was \d+ bytes`,
+ } {
+ c.Check(logs, Matches, logLineStart+expected)
+ }
+}
+
func (s *TestSuite) TestCommitNodeInfoBeforeStart(c *C) {
var collection_create, container_update arvadosclient.Dict
s.fullRunHelper(c, `{
"time"
)
// memoryStats lists the memory statistics crunchstat reports, in
// reporting order. Other statistics may be collected but are not logged.
var memoryStats = [...]string{
	"cache",
	"swap",
	"pgmajfault", // reported in units of "faults" rather than bytes
	"rss",
}
+
// logPrinter is the minimal logging interface used for reports: any
// value with a Printf method, such as the *logrus.Logger passed in by
// the tests.
type logPrinter interface {
	Printf(format string, args ...interface{})
}
lastCPUSample cpuSample
lastDiskSpaceSample diskSpaceSample
lastMemSample memSample
+ maxDiskSpaceSample diskSpaceSample
+ maxMemSample map[memoryKey]int64
reportPIDs map[string]int
reportPIDsMu sync.Mutex
return
}
// memoryKey identifies one entry in Reporter.maxMemSample.
//
// A key with only statName set holds the host/cgroup-wide maximum for
// that statistic. A key that also sets processID and processName holds
// the maximum for that single process.
//
// Field order matters: some callers build keys with positional literals.
type memoryKey struct {
	processID             int
	processName, statName string
}
+
// Start starts monitoring in a new goroutine, and returns
// immediately.
//
// Stop reporting. Do not call more than once, or before calling
// Start.
//
-// Nothing will be logged after Stop returns.
+// Nothing will be logged after Stop returns unless you call a Log* method.
func (r *Reporter) Stop() {
// Closing done is the stop signal. NOTE(review): presumably run()
// watches r.done; its loop is not visible here, so confirm.
close(r.done)
// run() closes flushed via defer when it returns, so this blocks until
// the monitoring goroutine has exited.
<-r.flushed
}
+func (r *Reporter) reportMemoryMax(logger logPrinter, source, statName string, value, limit int64) {
+ var units string
+ switch statName {
+ case "pgmajfault":
+ units = "faults"
+ default:
+ units = "bytes"
+ }
+ if limit > 0 {
+ percentage := 100 * value / limit
+ logger.Printf("Maximum %s memory %s usage was %d%%, %d/%d %s",
+ source, statName, percentage, value, limit, units)
+ } else {
+ logger.Printf("Maximum %s memory %s usage was %d %s",
+ source, statName, value, units)
+ }
+}
+
+func (r *Reporter) LogMaxima(logger logPrinter, memLimits map[string]int64) {
+ if r.lastCPUSample.hasData {
+ logger.Printf("Total CPU usage was %f user and %f sys on %d CPUs",
+ r.lastCPUSample.user, r.lastCPUSample.sys, r.lastCPUSample.cpus)
+ }
+ for disk, sample := range r.lastDiskIOSample {
+ logger.Printf("Total disk I/O on %s was %d bytes written and %d bytes read",
+ disk, sample.txBytes, sample.rxBytes)
+ }
+ if r.maxDiskSpaceSample.hasData {
+ percentage := 100 * r.maxDiskSpaceSample.used / r.maxDiskSpaceSample.total
+ logger.Printf("Maximum disk usage was %d%%, %d/%d bytes",
+ percentage, r.maxDiskSpaceSample.used, r.maxDiskSpaceSample.total)
+ }
+ for _, statName := range memoryStats {
+ value, ok := r.maxMemSample[memoryKey{statName: "total_" + statName}]
+ if !ok {
+ value, ok = r.maxMemSample[memoryKey{statName: statName}]
+ }
+ if ok {
+ r.reportMemoryMax(logger, "container", statName, value, memLimits[statName])
+ }
+ }
+ for ifname, sample := range r.lastNetSample {
+ logger.Printf("Total network I/O on %s was %d bytes written and %d bytes read",
+ ifname, sample.txBytes, sample.rxBytes)
+ }
+}
+
+func (r *Reporter) LogProcessMemMax(logger logPrinter) {
+ for memKey, value := range r.maxMemSample {
+ if memKey.processName == "" {
+ continue
+ }
+ r.reportMemoryMax(logger, memKey.processName, memKey.statName, value, 0)
+ }
+}
+
func (r *Reporter) readAllOrWarn(in io.Reader) ([]byte, error) {
content, err := ioutil.ReadAll(in)
if err != nil {
continue
}
thisSample.memStat[stat] = val
+ maxKey := memoryKey{statName: stat}
+ if val > r.maxMemSample[maxKey] {
+ r.maxMemSample[maxKey] = val
+ }
}
r.lastMemSample = thisSample
func (r *Reporter) reportMemSample() {
var outstat bytes.Buffer
- wantStats := [...]string{"cache", "swap", "pgmajfault", "rss"}
- for _, key := range wantStats {
+ for _, key := range memoryStats {
// Use "total_X" stats (entire hierarchy) if enabled,
// otherwise just the single cgroup -- see
// https://www.kernel.org/doc/Documentation/cgroup-v1/memory.txt
if err != nil {
continue
}
- procmem += fmt.Sprintf(" %d %s", rss*r.kernelPageSize, procname)
+ value := rss * r.kernelPageSize
+ procmem += fmt.Sprintf(" %d %s", value, procname)
+ maxKey := memoryKey{pid, procname, "rss"}
+ if value > r.maxMemSample[maxKey] {
+ r.maxMemSample[maxKey] = value
+ }
}
if procmem != "" {
r.Logger.Printf("procmem%s\n", procmem)
used: (s.Blocks - s.Bfree) * bs,
available: s.Bavail * bs,
}
+ if nextSample.used > r.maxDiskSpaceSample.used {
+ r.maxDiskSpaceSample = nextSample
+ }
var delta string
if r.lastDiskSpaceSample.hasData {
func (r *Reporter) run() {
defer close(r.flushed)
+ r.maxMemSample = make(map[memoryKey]int64)
r.reportedStatFile = make(map[string]string)
if !r.waitForCIDFile() || !r.waitForCgroup() {
import (
"bytes"
+ "errors"
"fmt"
"os"
+ "path"
"regexp"
"strconv"
"testing"
})
type suite struct {
- logbuf bytes.Buffer
- logger *logrus.Logger
+ cgroupRoot string
+ logbuf bytes.Buffer
+ logger *logrus.Logger
}
func (s *suite) SetUpSuite(c *C) {
}
// SetUpTest starts each test with an empty log buffer and no temporary
// cgroup directory.
func (s *suite) SetUpTest(c *C) {
	s.logbuf.Reset()
	s.cgroupRoot = ""
}
+func (s *suite) tempCgroup(c *C, sourceDir string) error {
+ tempDir := c.MkDir()
+ dirents, err := os.ReadDir(sourceDir)
+ if err != nil {
+ return err
+ }
+ for _, dirent := range dirents {
+ srcData, err := os.ReadFile(path.Join(sourceDir, dirent.Name()))
+ if err != nil {
+ return err
+ }
+ destPath := path.Join(tempDir, dirent.Name())
+ err = os.WriteFile(destPath, srcData, 0o600)
+ if err != nil {
+ return err
+ }
+ }
+ s.cgroupRoot = tempDir
+ return nil
+}
+
+func (s *suite) addPidToCgroup(pid int) error {
+ if s.cgroupRoot == "" {
+ return errors.New("cgroup has not been set up for this test")
+ }
+ procsPath := path.Join(s.cgroupRoot, "cgroup.procs")
+ procsFile, err := os.OpenFile(procsPath, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0o600)
+ if err != nil {
+ return err
+ }
+ pidLine := strconv.Itoa(pid) + "\n"
+ _, err = procsFile.Write([]byte(pidLine))
+ if err != nil {
+ procsFile.Close()
+ return err
+ }
+ return procsFile.Close()
+}
+
func (s *suite) TestReadAllOrWarnFail(c *C) {
rep := Reporter{Logger: s.logger}
// TestMultipleRSSThresholdsAllPassed runs testRSSThresholds with the
// thresholds 1, 2 and 3. Per the test name, all of them are expected to
// be passed.
func (s *suite) TestMultipleRSSThresholdsAllPassed(c *C) {
	thresholds := []int64{1, 2, 3}
	s.testRSSThresholds(c, thresholds, 3)
}
+
+func (s *suite) TestLogMaxima(c *C) {
+ err := s.tempCgroup(c, fakeRSS.cgroupRoot)
+ c.Assert(err, IsNil)
+ rep := Reporter{
+ CgroupRoot: s.cgroupRoot,
+ Logger: s.logger,
+ PollPeriod: time.Second * 10,
+ TempDir: s.cgroupRoot,
+ }
+ rep.Start()
+ rep.Stop()
+ rep.LogMaxima(s.logger, map[string]int64{"rss": GiB})
+ logs := s.logbuf.String()
+ c.Logf("%s", logs)
+
+ expectRSS := fmt.Sprintf(`Maximum container memory rss usage was %d%%, %d/%d bytes`,
+ 100*fakeRSS.value/GiB, fakeRSS.value, GiB)
+ for _, expected := range []string{
+ `Maximum disk usage was \d+%, \d+/\d+ bytes`,
+ `Maximum container memory cache usage was 73400320 bytes`,
+ `Maximum container memory swap usage was 320 bytes`,
+ `Maximum container memory pgmajfault usage was 20 faults`,
+ expectRSS,
+ } {
+ pattern := logMsgPrefix + expected + `"`
+ c.Check(logs, Matches, pattern)
+ }
+}
+
+func (s *suite) TestLogProcessMemMax(c *C) {
+ err := s.tempCgroup(c, fakeRSS.cgroupRoot)
+ c.Assert(err, IsNil)
+ pid := os.Getpid()
+ err = s.addPidToCgroup(pid)
+ c.Assert(err, IsNil)
+
+ rep := Reporter{
+ CgroupRoot: s.cgroupRoot,
+ Logger: s.logger,
+ PollPeriod: time.Second * 10,
+ TempDir: s.cgroupRoot,
+ }
+ rep.ReportPID("test-run", pid)
+ rep.Start()
+ rep.Stop()
+ rep.LogProcessMemMax(s.logger)
+ logs := s.logbuf.String()
+ c.Logf("%s", logs)
+
+ pattern := logMsgPrefix + `Maximum test-run memory rss usage was \d+ bytes"`
+ c.Check(logs, Matches, pattern)
+}