X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/08078f621c8dbc1ecbd6e030bb0fac848cb6a01c..cc1c83c261d289c7fa049637f8ae1fabe352059c:/lib/crunchstat/crunchstat.go?ds=sidebyside

diff --git a/lib/crunchstat/crunchstat.go b/lib/crunchstat/crunchstat.go
index 3a473cab87..ad1cc7a97a 100644
--- a/lib/crunchstat/crunchstat.go
+++ b/lib/crunchstat/crunchstat.go
@@ -23,6 +23,13 @@ import (
 	"time"
 )
 
+// crunchstat collects all memory statistics, but only reports these.
+var memoryStats = [...]string{"cache", "swap", "pgmajfault", "rss"}
+
+type logPrinter interface {
+	Printf(fmt string, args ...interface{})
+}
+
 // A Reporter gathers statistics for a cgroup and writes them to a
 // log.Logger.
 type Reporter struct {
@@ -49,9 +56,15 @@ type Reporter struct {
 	TempDir string
 
 	// Where to write statistics. Must not be nil.
-	Logger interface {
-		Printf(fmt string, args ...interface{})
-	}
+	Logger logPrinter
+
+	// When stats cross thresholds configured in the fields below,
+	// they are reported to this logger.
+	ThresholdLogger logPrinter
+
+	// MemThresholds maps memory stat names to slices of thresholds.
+	// When the corresponding stat exceeds a threshold, that will be logged.
+	MemThresholds map[string][]Threshold
 
 	kernelPageSize   int64
 	reportedStatFile map[string]string
@@ -59,6 +72,9 @@ type Reporter struct {
 	lastDiskIOSample    map[string]ioSample
 	lastCPUSample       cpuSample
 	lastDiskSpaceSample diskSpaceSample
+	lastMemSample       memSample
+	maxDiskSpaceSample  diskSpaceSample
+	maxMemSample        map[memoryKey]int64
 
 	reportPIDs   map[string]int
 	reportPIDsMu sync.Mutex
@@ -67,6 +83,36 @@ type Reporter struct {
 	flushed chan struct{} // closed when we have made our last report
 }
 
+type Threshold struct {
+	percentage int64
+	threshold  int64
+	total      int64
+}
+
+func NewThresholdFromPercentage(total int64, percentage int64) Threshold {
+	return Threshold{
+		percentage: percentage,
+		threshold:  total * percentage / 100,
+		total:      total,
+	}
+}
+
+func NewThresholdsFromPercentages(total int64, percentages []int64) (thresholds []Threshold) {
+	for _, percentage := range percentages {
+		thresholds = append(thresholds, NewThresholdFromPercentage(total, percentage))
+	}
+	return
+}
+
+// memoryKey is a key into Reporter.maxMemSample.
+// Initialize it with just statName to get the host/cgroup maximum.
+// Initialize it with all fields to get that process' maximum.
+type memoryKey struct {
+	processID   int
+	processName string
+	statName    string
+}
+
 // Start starts monitoring in a new goroutine, and returns
 // immediately.
 //
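The hunks above add the exported Threshold helpers and the ThresholdLogger/MemThresholds fields. As a point of reference, here is a minimal sketch of how a caller might fill them in alongside the existing Logger and PollPeriod fields. The stderr logger, the 8 GiB limit, and the 90/95/99 percentages are assumptions for illustration, not values taken from this change; the cgroup/CID fields needed to actually start monitoring are omitted, and the import path assumes the usual Arvados module layout.

// Sketch only. The memory limit, logger, and threshold percentages below are
// hypothetical; cgroup/CID configuration, Start/Stop, and error handling are omitted.
package main

import (
	"log"
	"os"
	"time"

	"git.arvados.org/arvados.git/lib/crunchstat"
)

func main() {
	logger := log.New(os.Stderr, "crunchstat: ", 0)
	memLimit := int64(8) << 30 // assumed container memory limit: 8 GiB

	reporter := crunchstat.Reporter{
		Logger:          logger,
		ThresholdLogger: logger,
		PollPeriod:      10 * time.Second,
		// Warn when the container's rss crosses 90%, 95%, and 99% of the assumed limit.
		MemThresholds: map[string][]crunchstat.Threshold{
			"rss": crunchstat.NewThresholdsFromPercentages(memLimit, []int64{90, 95, 99}),
		},
	}
	_ = reporter // cgroup-locating fields and Start/Stop are left out of this sketch
}

NewThresholdsFromPercentages precomputes absolute thresholds from the total, so the per-sample check in the reporter is a plain integer comparison.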
@@ -98,12 +144,68 @@ func (r *Reporter) ReportPID(name string, pid int) {
 // Stop reporting. Do not call more than once, or before calling
 // Start.
 //
-// Nothing will be logged after Stop returns.
+// Nothing will be logged after Stop returns unless you call a Log* method.
 func (r *Reporter) Stop() {
 	close(r.done)
 	<-r.flushed
 }
 
+func (r *Reporter) reportMemoryMax(logger logPrinter, source, statName string, value, limit int64) {
+	var units string
+	switch statName {
+	case "pgmajfault":
+		units = "faults"
+	default:
+		units = "bytes"
+	}
+	if limit > 0 {
+		percentage := 100 * value / limit
+		logger.Printf("Maximum %s memory %s usage was %d%%, %d/%d %s",
+			source, statName, percentage, value, limit, units)
+	} else {
+		logger.Printf("Maximum %s memory %s usage was %d %s",
+			source, statName, value, units)
+	}
+}
+
+func (r *Reporter) LogMaxima(logger logPrinter, memLimits map[string]int64) {
+	if r.lastCPUSample.hasData {
+		logger.Printf("Total CPU usage was %f user and %f sys on %d CPUs",
+			r.lastCPUSample.user, r.lastCPUSample.sys, r.lastCPUSample.cpus)
+	}
+	for disk, sample := range r.lastDiskIOSample {
+		logger.Printf("Total disk I/O on %s was %d bytes written and %d bytes read",
+			disk, sample.txBytes, sample.rxBytes)
+	}
+	if r.maxDiskSpaceSample.total > 0 {
+		percentage := 100 * r.maxDiskSpaceSample.used / r.maxDiskSpaceSample.total
+		logger.Printf("Maximum disk usage was %d%%, %d/%d bytes",
+			percentage, r.maxDiskSpaceSample.used, r.maxDiskSpaceSample.total)
+	}
+	for _, statName := range memoryStats {
+		value, ok := r.maxMemSample[memoryKey{statName: "total_" + statName}]
+		if !ok {
+			value, ok = r.maxMemSample[memoryKey{statName: statName}]
+		}
+		if ok {
+			r.reportMemoryMax(logger, "container", statName, value, memLimits[statName])
+		}
+	}
+	for ifname, sample := range r.lastNetSample {
+		logger.Printf("Total network I/O on %s was %d bytes written and %d bytes read",
+			ifname, sample.txBytes, sample.rxBytes)
+	}
+}
+
+func (r *Reporter) LogProcessMemMax(logger logPrinter) {
+	for memKey, value := range r.maxMemSample {
+		if memKey.processName == "" {
+			continue
+		}
+		r.reportMemoryMax(logger, memKey.processName, memKey.statName, value, 0)
+	}
+}
+
 func (r *Reporter) readAllOrWarn(in io.Reader) ([]byte, error) {
 	content, err := ioutil.ReadAll(in)
 	if err != nil {
@@ -246,7 +348,7 @@ type memSample struct {
 	memStat map[string]int64
 }
 
-func (r *Reporter) doMemoryStats() {
+func (r *Reporter) getMemSample() {
 	c, err := r.openStatFile("memory", "memory.stat", true)
 	if err != nil {
 		return
@@ -254,7 +356,6 @@ func (r *Reporter) doMemoryStats() {
 	defer c.Close()
 	b := bufio.NewScanner(c)
 	thisSample := memSample{time.Now(), make(map[string]int64)}
-	wantStats := [...]string{"cache", "swap", "pgmajfault", "rss"}
 	for b.Scan() {
 		var stat string
 		var val int64
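The new Log* methods are intended for use after Stop, per the revised doc comment above. A hedged sketch of one possible shutdown sequence follows; the helper name, the logger, and the single rss entry in the limits map are illustrative, and the import path assumes the usual Arvados module layout.

// Sketch only: a hypothetical shutdown helper showing how the Log* methods
// added above might be used once monitoring is finished.
package main

import (
	"log"
	"os"

	"git.arvados.org/arvados.git/lib/crunchstat"
)

// drainAndSummarize assumes the reporter was started earlier and that
// memLimit is the container's memory limit in bytes (caller-supplied).
func drainAndSummarize(reporter *crunchstat.Reporter, memLimit int64) {
	logger := log.New(os.Stderr, "", 0)
	// Stop blocks until the reporter has written its last periodic report;
	// after that, only the Log* methods produce output.
	reporter.Stop()
	// Container-wide maxima: rss is reported as a percentage of the supplied
	// limit, stats with no entry in the map are reported in absolute units.
	reporter.LogMaxima(logger, map[string]int64{"rss": memLimit})
	// Per-process rss maxima for PIDs registered earlier via ReportPID.
	reporter.LogProcessMemMax(logger)
}

func main() {} // placeholder so the sketch builds on its own

Passing a limit lets LogMaxima report rss as a percentage; stats missing from the map fall back to the absolute-value message in reportMemoryMax.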
@@ -262,20 +363,56 @@ func (r *Reporter) doMemoryStats() {
 			continue
 		}
 		thisSample.memStat[stat] = val
+		maxKey := memoryKey{statName: stat}
+		if val > r.maxMemSample[maxKey] {
+			r.maxMemSample[maxKey] = val
+		}
 	}
+	r.lastMemSample = thisSample
+
+	if r.ThresholdLogger != nil {
+		for statName, thresholds := range r.MemThresholds {
+			statValue, ok := thisSample.memStat["total_"+statName]
+			if !ok {
+				statValue, ok = thisSample.memStat[statName]
+				if !ok {
+					continue
+				}
+			}
+			var index int
+			var statThreshold Threshold
+			for index, statThreshold = range thresholds {
+				if statValue < statThreshold.threshold {
+					break
+				} else if statThreshold.percentage > 0 {
+					r.ThresholdLogger.Printf("Container using over %d%% of memory (%s %d/%d bytes)",
+						statThreshold.percentage, statName, statValue, statThreshold.total)
+				} else {
+					r.ThresholdLogger.Printf("Container using over %d of memory (%s %d bytes)",
+						statThreshold.threshold, statName, statValue)
+				}
+			}
+			r.MemThresholds[statName] = thresholds[index:]
+		}
+	}
+}
+
+func (r *Reporter) reportMemSample() {
 	var outstat bytes.Buffer
-	for _, key := range wantStats {
+	for _, key := range memoryStats {
 		// Use "total_X" stats (entire hierarchy) if enabled,
 		// otherwise just the single cgroup -- see
 		// https://www.kernel.org/doc/Documentation/cgroup-v1/memory.txt
-		if val, ok := thisSample.memStat["total_"+key]; ok {
+		if val, ok := r.lastMemSample.memStat["total_"+key]; ok {
 			fmt.Fprintf(&outstat, " %d %s", val, key)
-		} else if val, ok := thisSample.memStat[key]; ok {
+		} else if val, ok := r.lastMemSample.memStat[key]; ok {
 			fmt.Fprintf(&outstat, " %d %s", val, key)
 		}
 	}
 	r.Logger.Printf("mem%s\n", outstat.String())
+}
 
+func (r *Reporter) doProcmemStats() {
 	if r.kernelPageSize == 0 {
 		// assign "don't try again" value in case we give up
 		// and return without assigning the real value
@@ -335,7 +472,12 @@ func (r *Reporter) doMemoryStats() {
 		if err != nil {
 			continue
 		}
-		procmem += fmt.Sprintf(" %d %s", rss*r.kernelPageSize, procname)
+		value := rss * r.kernelPageSize
+		procmem += fmt.Sprintf(" %d %s", value, procname)
+		maxKey := memoryKey{pid, procname, "rss"}
+		if value > r.maxMemSample[maxKey] {
+			r.maxMemSample[maxKey] = value
+		}
 	}
 	if procmem != "" {
 		r.Logger.Printf("procmem%s\n", procmem)
@@ -408,6 +550,9 @@ func (r *Reporter) doDiskSpaceStats() {
 		used:       (s.Blocks - s.Bfree) * bs,
 		available:  s.Bavail * bs,
 	}
+	if nextSample.used > r.maxDiskSpaceSample.used {
+		r.maxDiskSpaceSample = nextSample
+	}
 
 	var delta string
 	if r.lastDiskSpaceSample.hasData {
@@ -490,11 +635,21 @@ func (r *Reporter) doCPUStats() {
 	r.lastCPUSample = nextSample
 }
 
+func (r *Reporter) doAllStats() {
+	r.reportMemSample()
+	r.doProcmemStats()
+	r.doCPUStats()
+	r.doBlkIOStats()
+	r.doNetworkStats()
+	r.doDiskSpaceStats()
+}
+
 // Report stats periodically until we learn (via r.done) that someone
 // called Stop.
 func (r *Reporter) run() {
 	defer close(r.flushed)
 
+	r.maxMemSample = make(map[memoryKey]int64)
 	r.reportedStatFile = make(map[string]string)
 
 	if !r.waitForCIDFile() || !r.waitForCgroup() {
@@ -512,17 +667,19 @@ func (r *Reporter) run() {
 		r.Logger.Printf("notice: monitoring temp dir %s\n", r.TempDir)
 	}
 
-	ticker := time.NewTicker(r.PollPeriod)
+	r.getMemSample()
+	r.doAllStats()
+
+	memTicker := time.NewTicker(time.Second)
+	mainTicker := time.NewTicker(r.PollPeriod)
 	for {
-		r.doMemoryStats()
-		r.doCPUStats()
-		r.doBlkIOStats()
-		r.doNetworkStats()
-		r.doDiskSpaceStats()
 		select {
 		case <-r.done:
 			return
-		case <-ticker.C:
+		case <-memTicker.C:
+			r.getMemSample()
+		case <-mainTicker.C:
+			r.doAllStats()
 		}
 	}
 }
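The reworked run loop above decouples sampling from reporting: getMemSample runs every second, so short memory spikes still reach maxMemSample and the threshold checks, while the full report is only emitted every PollPeriod. A standalone sketch of that two-ticker select pattern, with arbitrary durations and fmt.Println standing in for the real sampling and reporting calls:

// Sketch only: the two-ticker pattern from run() in isolation. The durations
// and print statements are placeholders, not values from this change.
package main

import (
	"fmt"
	"time"
)

func main() {
	done := make(chan struct{})
	time.AfterFunc(5*time.Second, func() { close(done) }) // stand-in for Stop()

	memTicker := time.NewTicker(time.Second)      // frequent, cheap sampling
	mainTicker := time.NewTicker(2 * time.Second) // full report, like PollPeriod
	defer memTicker.Stop()
	defer mainTicker.Stop()

	for {
		select {
		case <-done:
			return
		case <-memTicker.C:
			fmt.Println("sample memory, update maxima")
		case <-mainTicker.C:
			fmt.Println("report all stats")
		}
	}
}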