Merge branch '20182-supervisor-limit' refs #20182
[arvados.git] / lib / crunchstat / crunchstat.go
index 10cd7cfce43a03472e2e942b68512efcdd7d0c61..ad1cc7a97a47eba4423bd3be704b0c21807fb62b 100644 (file)
@@ -13,14 +13,23 @@ import (
        "fmt"
        "io"
        "io/ioutil"
-       "log"
        "os"
+       "regexp"
+       "sort"
        "strconv"
        "strings"
+       "sync"
        "syscall"
        "time"
 )
 
+// crunchstat collects all memory statistics, but only reports these.
+var memoryStats = [...]string{"cache", "swap", "pgmajfault", "rss"}
+
+type logPrinter interface {
+       Printf(fmt string, args ...interface{})
+}
+
 // A Reporter gathers statistics for a cgroup and writes them to a
 // log.Logger.
 type Reporter struct {
@@ -47,18 +56,63 @@ type Reporter struct {
        TempDir string
 
        // Where to write statistics. Must not be nil.
-       Logger *log.Logger
+       Logger logPrinter
+
+       // When stats cross thresholds configured in the fields below,
+       // they are reported to this logger.
+       ThresholdLogger logPrinter
+
+       // MemThresholds maps memory stat names to slices of thresholds.
+       // When the corresponding stat exceeds a threshold, that will be logged.
+       MemThresholds map[string][]Threshold
 
+       kernelPageSize      int64
        reportedStatFile    map[string]string
        lastNetSample       map[string]ioSample
        lastDiskIOSample    map[string]ioSample
        lastCPUSample       cpuSample
        lastDiskSpaceSample diskSpaceSample
+       lastMemSample       memSample
+       maxDiskSpaceSample  diskSpaceSample
+       maxMemSample        map[memoryKey]int64
+
+       reportPIDs   map[string]int
+       reportPIDsMu sync.Mutex
 
        done    chan struct{} // closed when we should stop reporting
        flushed chan struct{} // closed when we have made our last report
 }
 
+type Threshold struct {
+       percentage int64
+       threshold  int64
+       total      int64
+}
+
+func NewThresholdFromPercentage(total int64, percentage int64) Threshold {
+       return Threshold{
+               percentage: percentage,
+               threshold:  total * percentage / 100,
+               total:      total,
+       }
+}
+
+func NewThresholdsFromPercentages(total int64, percentages []int64) (thresholds []Threshold) {
+       for _, percentage := range percentages {
+               thresholds = append(thresholds, NewThresholdFromPercentage(total, percentage))
+       }
+       return
+}
+
+// memoryKey is a key into Reporter.maxMemSample.
+// Initialize it with just statName to get the host/cgroup maximum.
+// Initialize it with all fields to get that process' maximum.
+type memoryKey struct {
+       processID   int
+       processName string
+       statName    string
+}
+
 // Start starts monitoring in a new goroutine, and returns
 // immediately.
 //
@@ -76,15 +130,82 @@ func (r *Reporter) Start() {
        go r.run()
 }
 
+// ReportPID starts reporting stats for a specified process.
+func (r *Reporter) ReportPID(name string, pid int) {
+       r.reportPIDsMu.Lock()
+       defer r.reportPIDsMu.Unlock()
+       if r.reportPIDs == nil {
+               r.reportPIDs = map[string]int{name: pid}
+       } else {
+               r.reportPIDs[name] = pid
+       }
+}
+
 // Stop reporting. Do not call more than once, or before calling
 // Start.
 //
-// Nothing will be logged after Stop returns.
+// Nothing will be logged after Stop returns unless you call a Log* method.
 func (r *Reporter) Stop() {
        close(r.done)
        <-r.flushed
 }
 
+func (r *Reporter) reportMemoryMax(logger logPrinter, source, statName string, value, limit int64) {
+       var units string
+       switch statName {
+       case "pgmajfault":
+               units = "faults"
+       default:
+               units = "bytes"
+       }
+       if limit > 0 {
+               percentage := 100 * value / limit
+               logger.Printf("Maximum %s memory %s usage was %d%%, %d/%d %s",
+                       source, statName, percentage, value, limit, units)
+       } else {
+               logger.Printf("Maximum %s memory %s usage was %d %s",
+                       source, statName, value, units)
+       }
+}
+
+func (r *Reporter) LogMaxima(logger logPrinter, memLimits map[string]int64) {
+       if r.lastCPUSample.hasData {
+               logger.Printf("Total CPU usage was %f user and %f sys on %d CPUs",
+                       r.lastCPUSample.user, r.lastCPUSample.sys, r.lastCPUSample.cpus)
+       }
+       for disk, sample := range r.lastDiskIOSample {
+               logger.Printf("Total disk I/O on %s was %d bytes written and %d bytes read",
+                       disk, sample.txBytes, sample.rxBytes)
+       }
+       if r.maxDiskSpaceSample.total > 0 {
+               percentage := 100 * r.maxDiskSpaceSample.used / r.maxDiskSpaceSample.total
+               logger.Printf("Maximum disk usage was %d%%, %d/%d bytes",
+                       percentage, r.maxDiskSpaceSample.used, r.maxDiskSpaceSample.total)
+       }
+       for _, statName := range memoryStats {
+               value, ok := r.maxMemSample[memoryKey{statName: "total_" + statName}]
+               if !ok {
+                       value, ok = r.maxMemSample[memoryKey{statName: statName}]
+               }
+               if ok {
+                       r.reportMemoryMax(logger, "container", statName, value, memLimits[statName])
+               }
+       }
+       for ifname, sample := range r.lastNetSample {
+               logger.Printf("Total network I/O on %s was %d bytes written and %d bytes read",
+                       ifname, sample.txBytes, sample.rxBytes)
+       }
+}
+
+func (r *Reporter) LogProcessMemMax(logger logPrinter) {
+       for memKey, value := range r.maxMemSample {
+               if memKey.processName == "" {
+                       continue
+               }
+               r.reportMemoryMax(logger, memKey.processName, memKey.statName, value, 0)
+       }
+}
+
 func (r *Reporter) readAllOrWarn(in io.Reader) ([]byte, error) {
        content, err := ioutil.ReadAll(in)
        if err != nil {
@@ -227,7 +348,7 @@ type memSample struct {
        memStat    map[string]int64
 }
 
-func (r *Reporter) doMemoryStats() {
+func (r *Reporter) getMemSample() {
        c, err := r.openStatFile("memory", "memory.stat", true)
        if err != nil {
                return
@@ -235,7 +356,6 @@ func (r *Reporter) doMemoryStats() {
        defer c.Close()
        b := bufio.NewScanner(c)
        thisSample := memSample{time.Now(), make(map[string]int64)}
-       wantStats := [...]string{"cache", "swap", "pgmajfault", "rss"}
        for b.Scan() {
                var stat string
                var val int64
@@ -243,21 +363,127 @@ func (r *Reporter) doMemoryStats() {
                        continue
                }
                thisSample.memStat[stat] = val
+               maxKey := memoryKey{statName: stat}
+               if val > r.maxMemSample[maxKey] {
+                       r.maxMemSample[maxKey] = val
+               }
        }
+       r.lastMemSample = thisSample
+
+       if r.ThresholdLogger != nil {
+               for statName, thresholds := range r.MemThresholds {
+                       statValue, ok := thisSample.memStat["total_"+statName]
+                       if !ok {
+                               statValue, ok = thisSample.memStat[statName]
+                               if !ok {
+                                       continue
+                               }
+                       }
+                       var index int
+                       var statThreshold Threshold
+                       for index, statThreshold = range thresholds {
+                               if statValue < statThreshold.threshold {
+                                       break
+                               } else if statThreshold.percentage > 0 {
+                                       r.ThresholdLogger.Printf("Container using over %d%% of memory (%s %d/%d bytes)",
+                                               statThreshold.percentage, statName, statValue, statThreshold.total)
+                               } else {
+                                       r.ThresholdLogger.Printf("Container using over %d of memory (%s %s bytes)",
+                                               statThreshold.threshold, statName, statValue)
+                               }
+                       }
+                       r.MemThresholds[statName] = thresholds[index:]
+               }
+       }
+}
+
+func (r *Reporter) reportMemSample() {
        var outstat bytes.Buffer
-       for _, key := range wantStats {
+       for _, key := range memoryStats {
                // Use "total_X" stats (entire hierarchy) if enabled,
                // otherwise just the single cgroup -- see
                // https://www.kernel.org/doc/Documentation/cgroup-v1/memory.txt
-               if val, ok := thisSample.memStat["total_"+key]; ok {
+               if val, ok := r.lastMemSample.memStat["total_"+key]; ok {
                        fmt.Fprintf(&outstat, " %d %s", val, key)
-               } else if val, ok := thisSample.memStat[key]; ok {
+               } else if val, ok := r.lastMemSample.memStat[key]; ok {
                        fmt.Fprintf(&outstat, " %d %s", val, key)
                }
        }
        r.Logger.Printf("mem%s\n", outstat.String())
 }
 
+func (r *Reporter) doProcmemStats() {
+       if r.kernelPageSize == 0 {
+               // assign "don't try again" value in case we give up
+               // and return without assigning the real value
+               r.kernelPageSize = -1
+               buf, err := os.ReadFile("/proc/self/smaps")
+               if err != nil {
+                       r.Logger.Printf("error reading /proc/self/smaps: %s", err)
+                       return
+               }
+               m := regexp.MustCompile(`\nKernelPageSize:\s*(\d+) kB\n`).FindSubmatch(buf)
+               if len(m) != 2 {
+                       r.Logger.Printf("error parsing /proc/self/smaps: KernelPageSize not found")
+                       return
+               }
+               size, err := strconv.ParseInt(string(m[1]), 10, 64)
+               if err != nil {
+                       r.Logger.Printf("error parsing /proc/self/smaps: KernelPageSize %q: %s", m[1], err)
+                       return
+               }
+               r.kernelPageSize = size * 1024
+       } else if r.kernelPageSize < 0 {
+               // already failed to determine page size, don't keep
+               // trying/logging
+               return
+       }
+
+       r.reportPIDsMu.Lock()
+       defer r.reportPIDsMu.Unlock()
+       procnames := make([]string, 0, len(r.reportPIDs))
+       for name := range r.reportPIDs {
+               procnames = append(procnames, name)
+       }
+       sort.Strings(procnames)
+       procmem := ""
+       for _, procname := range procnames {
+               pid := r.reportPIDs[procname]
+               buf, err := os.ReadFile(fmt.Sprintf("/proc/%d/stat", pid))
+               if err != nil {
+                       continue
+               }
+               // If the executable name contains a ')' char,
+               // /proc/$pid/stat will look like '1234 (exec name)) S
+               // 123 ...' -- the last ')' is the end of the 2nd
+               // field.
+               paren := bytes.LastIndexByte(buf, ')')
+               if paren < 0 {
+                       continue
+               }
+               fields := bytes.SplitN(buf[paren:], []byte{' '}, 24)
+               if len(fields) < 24 {
+                       continue
+               }
+               // rss is the 24th field in .../stat, and fields[0]
+               // here is the last char ')' of the 2nd field, so
+               // rss is fields[22]
+               rss, err := strconv.ParseInt(string(fields[22]), 10, 64)
+               if err != nil {
+                       continue
+               }
+               value := rss * r.kernelPageSize
+               procmem += fmt.Sprintf(" %d %s", value, procname)
+               maxKey := memoryKey{pid, procname, "rss"}
+               if value > r.maxMemSample[maxKey] {
+                       r.maxMemSample[maxKey] = value
+               }
+       }
+       if procmem != "" {
+               r.Logger.Printf("procmem%s\n", procmem)
+       }
+}
+
 func (r *Reporter) doNetworkStats() {
        sampleTime := time.Now()
        stats, err := r.getContainerNetStats()
@@ -324,6 +550,9 @@ func (r *Reporter) doDiskSpaceStats() {
                used:       (s.Blocks - s.Bfree) * bs,
                available:  s.Bavail * bs,
        }
+       if nextSample.used > r.maxDiskSpaceSample.used {
+               r.maxDiskSpaceSample = nextSample
+       }
 
        var delta string
        if r.lastDiskSpaceSample.hasData {
@@ -406,11 +635,21 @@ func (r *Reporter) doCPUStats() {
        r.lastCPUSample = nextSample
 }
 
+func (r *Reporter) doAllStats() {
+       r.reportMemSample()
+       r.doProcmemStats()
+       r.doCPUStats()
+       r.doBlkIOStats()
+       r.doNetworkStats()
+       r.doDiskSpaceStats()
+}
+
 // Report stats periodically until we learn (via r.done) that someone
 // called Stop.
 func (r *Reporter) run() {
        defer close(r.flushed)
 
+       r.maxMemSample = make(map[memoryKey]int64)
        r.reportedStatFile = make(map[string]string)
 
        if !r.waitForCIDFile() || !r.waitForCgroup() {
@@ -428,17 +667,19 @@ func (r *Reporter) run() {
                r.Logger.Printf("notice: monitoring temp dir %s\n", r.TempDir)
        }
 
-       ticker := time.NewTicker(r.PollPeriod)
+       r.getMemSample()
+       r.doAllStats()
+
+       memTicker := time.NewTicker(time.Second)
+       mainTicker := time.NewTicker(r.PollPeriod)
        for {
-               r.doMemoryStats()
-               r.doCPUStats()
-               r.doBlkIOStats()
-               r.doNetworkStats()
-               r.doDiskSpaceStats()
                select {
                case <-r.done:
                        return
-               case <-ticker.C:
+               case <-memTicker.C:
+                       r.getMemSample()
+               case <-mainTicker.C:
+                       r.doAllStats()
                }
        }
 }