X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/1b92f8cec11d04b5f9464c6ad100f579660f70b9..32c63d03d45d231768eb1497dfc5e9f4a0d23c16:/lib/crunchstat/crunchstat.go

diff --git a/lib/crunchstat/crunchstat.go b/lib/crunchstat/crunchstat.go
index 5d2059f7e0..ad1cc7a97a 100644
--- a/lib/crunchstat/crunchstat.go
+++ b/lib/crunchstat/crunchstat.go
@@ -23,6 +23,13 @@ import (
 	"time"
 )
 
+// crunchstat collects all memory statistics, but only reports these.
+var memoryStats = [...]string{"cache", "swap", "pgmajfault", "rss"}
+
+type logPrinter interface {
+	Printf(fmt string, args ...interface{})
+}
+
 // A Reporter gathers statistics for a cgroup and writes them to a
 // log.Logger.
 type Reporter struct {
@@ -49,9 +56,15 @@ type Reporter struct {
 	TempDir string
 
 	// Where to write statistics. Must not be nil.
-	Logger interface {
-		Printf(fmt string, args ...interface{})
-	}
+	Logger logPrinter
+
+	// When stats cross thresholds configured in the fields below,
+	// they are reported to this logger.
+	ThresholdLogger logPrinter
+
+	// MemThresholds maps memory stat names to slices of thresholds.
+	// When the corresponding stat exceeds a threshold, that will be logged.
+	MemThresholds map[string][]Threshold
 
 	kernelPageSize      int64
 	reportedStatFile    map[string]string
@@ -60,6 +73,8 @@ type Reporter struct {
 	lastCPUSample       cpuSample
 	lastDiskSpaceSample diskSpaceSample
 	lastMemSample       memSample
+	maxDiskSpaceSample  diskSpaceSample
+	maxMemSample        map[memoryKey]int64
 
 	reportPIDs   map[string]int
 	reportPIDsMu sync.Mutex
@@ -68,6 +83,36 @@ type Reporter struct {
 	flushed chan struct{} // closed when we have made our last report
 }
 
+type Threshold struct {
+	percentage int64
+	threshold  int64
+	total      int64
+}
+
+func NewThresholdFromPercentage(total int64, percentage int64) Threshold {
+	return Threshold{
+		percentage: percentage,
+		threshold:  total * percentage / 100,
+		total:      total,
+	}
+}
+
+func NewThresholdsFromPercentages(total int64, percentages []int64) (thresholds []Threshold) {
+	for _, percentage := range percentages {
+		thresholds = append(thresholds, NewThresholdFromPercentage(total, percentage))
+	}
+	return
+}
+
+// memoryKey is a key into Reporter.maxMemSample.
+// Initialize it with just statName to get the host/cgroup maximum.
+// Initialize it with all fields to get that process' maximum.
+type memoryKey struct {
+	processID   int
+	processName string
+	statName    string
+}
+
 // Start starts monitoring in a new goroutine, and returns
 // immediately.
 //
@@ -99,12 +144,68 @@ func (r *Reporter) ReportPID(name string, pid int) {
 // Stop reporting. Do not call more than once, or before calling
 // Start.
 //
-// Nothing will be logged after Stop returns.
+// Nothing will be logged after Stop returns unless you call a Log* method.
 func (r *Reporter) Stop() {
 	close(r.done)
 	<-r.flushed
 }
 
+func (r *Reporter) reportMemoryMax(logger logPrinter, source, statName string, value, limit int64) {
+	var units string
+	switch statName {
+	case "pgmajfault":
+		units = "faults"
+	default:
+		units = "bytes"
+	}
+	if limit > 0 {
+		percentage := 100 * value / limit
+		logger.Printf("Maximum %s memory %s usage was %d%%, %d/%d %s",
+			source, statName, percentage, value, limit, units)
+	} else {
+		logger.Printf("Maximum %s memory %s usage was %d %s",
+			source, statName, value, units)
+	}
+}
+
+func (r *Reporter) LogMaxima(logger logPrinter, memLimits map[string]int64) {
+	if r.lastCPUSample.hasData {
+		logger.Printf("Total CPU usage was %f user and %f sys on %d CPUs",
+			r.lastCPUSample.user, r.lastCPUSample.sys, r.lastCPUSample.cpus)
+	}
+	for disk, sample := range r.lastDiskIOSample {
+		logger.Printf("Total disk I/O on %s was %d bytes written and %d bytes read",
+			disk, sample.txBytes, sample.rxBytes)
+	}
+	if r.maxDiskSpaceSample.total > 0 {
+		percentage := 100 * r.maxDiskSpaceSample.used / r.maxDiskSpaceSample.total
+		logger.Printf("Maximum disk usage was %d%%, %d/%d bytes",
+			percentage, r.maxDiskSpaceSample.used, r.maxDiskSpaceSample.total)
+	}
+	for _, statName := range memoryStats {
+		value, ok := r.maxMemSample[memoryKey{statName: "total_" + statName}]
+		if !ok {
+			value, ok = r.maxMemSample[memoryKey{statName: statName}]
+		}
+		if ok {
+			r.reportMemoryMax(logger, "container", statName, value, memLimits[statName])
+		}
+	}
+	for ifname, sample := range r.lastNetSample {
+		logger.Printf("Total network I/O on %s was %d bytes written and %d bytes read",
+			ifname, sample.txBytes, sample.rxBytes)
+	}
+}
+
+func (r *Reporter) LogProcessMemMax(logger logPrinter) {
+	for memKey, value := range r.maxMemSample {
+		if memKey.processName == "" {
+			continue
+		}
+		r.reportMemoryMax(logger, memKey.processName, memKey.statName, value, 0)
+	}
+}
+
 func (r *Reporter) readAllOrWarn(in io.Reader) ([]byte, error) {
 	content, err := ioutil.ReadAll(in)
 	if err != nil {
@@ -262,14 +363,43 @@ func (r *Reporter) getMemSample() {
 			continue
 		}
 		thisSample.memStat[stat] = val
+		maxKey := memoryKey{statName: stat}
+		if val > r.maxMemSample[maxKey] {
+			r.maxMemSample[maxKey] = val
+		}
 	}
 	r.lastMemSample = thisSample
+
+	if r.ThresholdLogger != nil {
+		for statName, thresholds := range r.MemThresholds {
+			statValue, ok := thisSample.memStat["total_"+statName]
+			if !ok {
+				statValue, ok = thisSample.memStat[statName]
+				if !ok {
+					continue
+				}
+			}
+			var index int
+			var statThreshold Threshold
+			for index, statThreshold = range thresholds {
+				if statValue < statThreshold.threshold {
+					break
+				} else if statThreshold.percentage > 0 {
+					r.ThresholdLogger.Printf("Container using over %d%% of memory (%s %d/%d bytes)",
+						statThreshold.percentage, statName, statValue, statThreshold.total)
+				} else {
+					r.ThresholdLogger.Printf("Container using over %d of memory (%s %d bytes)",
+						statThreshold.threshold, statName, statValue)
+				}
+			}
+			r.MemThresholds[statName] = thresholds[index:]
+		}
+	}
 }
 
 func (r *Reporter) reportMemSample() {
 	var outstat bytes.Buffer
-	wantStats := [...]string{"cache", "swap", "pgmajfault", "rss"}
-	for _, key := range wantStats {
+	for _, key := range memoryStats {
 		// Use "total_X" stats (entire hierarchy) if enabled,
 		// otherwise just the single cgroup -- see
 		// https://www.kernel.org/doc/Documentation/cgroup-v1/memory.txt
@@ -342,7 +472,12 @@ func (r *Reporter) doProcmemStats() {
 		if err != nil {
 			continue
 		}
-		procmem += fmt.Sprintf(" %d %s", rss*r.kernelPageSize, procname)
+		value := rss * r.kernelPageSize
+		procmem += fmt.Sprintf(" %d %s", value, procname)
+		maxKey := memoryKey{pid, procname, "rss"}
+		if value > r.maxMemSample[maxKey] {
+			r.maxMemSample[maxKey] = value
+		}
 	}
 	if procmem != "" {
 		r.Logger.Printf("procmem%s\n", procmem)
@@ -415,6 +550,9 @@ func (r *Reporter) doDiskSpaceStats() {
 		used:      (s.Blocks - s.Bfree) * bs,
 		available: s.Bavail * bs,
 	}
+	if nextSample.used > r.maxDiskSpaceSample.used {
+		r.maxDiskSpaceSample = nextSample
+	}
 
 	var delta string
 	if r.lastDiskSpaceSample.hasData {
@@ -511,6 +649,7 @@ func (r *Reporter) doAllStats() {
 
 func (r *Reporter) run() {
 	defer close(r.flushed)
 
+	r.maxMemSample = make(map[memoryKey]int64)
 	r.reportedStatFile = make(map[string]string)
 
 	if !r.waitForCIDFile() || !r.waitForCgroup() {
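
Usage note (not part of the diff above): the sketch below shows one way a caller might wire up the new ThresholdLogger/MemThresholds fields and the Log* methods introduced in this change. It is an illustrative assumption, not code from this commit: the 4 GiB limit, the stderr logger, the use of os.TempDir(), and the omission of the Reporter's cgroup/CID configuration fields are all placeholders, Start() is assumed to take no arguments per its doc comment, and the import path is inferred from the repository URL above.

package main

import (
	"log"
	"os"

	"git.arvados.org/arvados.git/lib/crunchstat"
)

func main() {
	// Hypothetical 4 GiB memory limit for the monitored container.
	const memLimit = int64(4) << 30

	// *log.Logger has Printf, so it satisfies the package's logPrinter interface.
	logger := log.New(os.Stderr, "crunchstat: ", 0)

	rep := &crunchstat.Reporter{
		// Cgroup/CID configuration fields are omitted here; they depend on the deployment.
		TempDir: os.TempDir(), // placeholder
		Logger:  logger,
		// Warn when the container's RSS crosses 90%, 95%, and 99% of the limit.
		ThresholdLogger: logger,
		MemThresholds: map[string][]crunchstat.Threshold{
			"rss": crunchstat.NewThresholdsFromPercentages(memLimit, []int64{90, 95, 99}),
		},
	}

	rep.Start()
	// ... run the workload; optionally call rep.ReportPID("my-child", pid) for per-process maxima ...
	rep.Stop()

	// After Stop, the Log* methods report the maxima collected while running.
	rep.LogMaxima(logger, map[string]int64{"rss": memLimit})
	rep.LogProcessMemMax(logger)
}

Because NewThresholdsFromPercentages precomputes each threshold in bytes from the total, getMemSample only compares the sampled value against threshold, and thresholds[index:] discards thresholds that have already been crossed so each one is reported at most once.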