X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/44c95f99098fa6c6acbfa82d4b6cbc6015eb6e39..db118cca358662e57a3dd0c1186dce1f0a62ca52:/lib/crunchstat/crunchstat.go diff --git a/lib/crunchstat/crunchstat.go b/lib/crunchstat/crunchstat.go index f4915c0e3e..ad1cc7a97a 100644 --- a/lib/crunchstat/crunchstat.go +++ b/lib/crunchstat/crunchstat.go @@ -13,22 +13,22 @@ import ( "fmt" "io" "io/ioutil" - "log" "os" + "regexp" + "sort" "strconv" "strings" + "sync" + "syscall" "time" ) -// This magically allows us to look up userHz via _SC_CLK_TCK: +// crunchstat collects all memory statistics, but only reports these. +var memoryStats = [...]string{"cache", "swap", "pgmajfault", "rss"} -/* -#include -#include -#include -#include -*/ -import "C" +type logPrinter interface { + Printf(fmt string, args ...interface{}) +} // A Reporter gathers statistics for a cgroup and writes them to a // log.Logger. @@ -52,18 +52,67 @@ type Reporter struct { // Interval between samples. Must be positive. PollPeriod time.Duration + // Temporary directory, will be monitored for available, used & total space. + TempDir string + // Where to write statistics. Must not be nil. - Logger *log.Logger + Logger logPrinter + + // When stats cross thresholds configured in the fields below, + // they are reported to this logger. + ThresholdLogger logPrinter - reportedStatFile map[string]string - lastNetSample map[string]ioSample - lastDiskSample map[string]ioSample - lastCPUSample cpuSample + // MemThresholds maps memory stat names to slices of thresholds. + // When the corresponding stat exceeds a threshold, that will be logged. + MemThresholds map[string][]Threshold + + kernelPageSize int64 + reportedStatFile map[string]string + lastNetSample map[string]ioSample + lastDiskIOSample map[string]ioSample + lastCPUSample cpuSample + lastDiskSpaceSample diskSpaceSample + lastMemSample memSample + maxDiskSpaceSample diskSpaceSample + maxMemSample map[memoryKey]int64 + + reportPIDs map[string]int + reportPIDsMu sync.Mutex done chan struct{} // closed when we should stop reporting flushed chan struct{} // closed when we have made our last report } +type Threshold struct { + percentage int64 + threshold int64 + total int64 +} + +func NewThresholdFromPercentage(total int64, percentage int64) Threshold { + return Threshold{ + percentage: percentage, + threshold: total * percentage / 100, + total: total, + } +} + +func NewThresholdsFromPercentages(total int64, percentages []int64) (thresholds []Threshold) { + for _, percentage := range percentages { + thresholds = append(thresholds, NewThresholdFromPercentage(total, percentage)) + } + return +} + +// memoryKey is a key into Reporter.maxMemSample. +// Initialize it with just statName to get the host/cgroup maximum. +// Initialize it with all fields to get that process' maximum. +type memoryKey struct { + processID int + processName string + statName string +} + // Start starts monitoring in a new goroutine, and returns // immediately. // @@ -81,19 +130,86 @@ func (r *Reporter) Start() { go r.run() } +// ReportPID starts reporting stats for a specified process. +func (r *Reporter) ReportPID(name string, pid int) { + r.reportPIDsMu.Lock() + defer r.reportPIDsMu.Unlock() + if r.reportPIDs == nil { + r.reportPIDs = map[string]int{name: pid} + } else { + r.reportPIDs[name] = pid + } +} + // Stop reporting. Do not call more than once, or before calling // Start. // -// Nothing will be logged after Stop returns. +// Nothing will be logged after Stop returns unless you call a Log* method. func (r *Reporter) Stop() { close(r.done) <-r.flushed } +func (r *Reporter) reportMemoryMax(logger logPrinter, source, statName string, value, limit int64) { + var units string + switch statName { + case "pgmajfault": + units = "faults" + default: + units = "bytes" + } + if limit > 0 { + percentage := 100 * value / limit + logger.Printf("Maximum %s memory %s usage was %d%%, %d/%d %s", + source, statName, percentage, value, limit, units) + } else { + logger.Printf("Maximum %s memory %s usage was %d %s", + source, statName, value, units) + } +} + +func (r *Reporter) LogMaxima(logger logPrinter, memLimits map[string]int64) { + if r.lastCPUSample.hasData { + logger.Printf("Total CPU usage was %f user and %f sys on %d CPUs", + r.lastCPUSample.user, r.lastCPUSample.sys, r.lastCPUSample.cpus) + } + for disk, sample := range r.lastDiskIOSample { + logger.Printf("Total disk I/O on %s was %d bytes written and %d bytes read", + disk, sample.txBytes, sample.rxBytes) + } + if r.maxDiskSpaceSample.total > 0 { + percentage := 100 * r.maxDiskSpaceSample.used / r.maxDiskSpaceSample.total + logger.Printf("Maximum disk usage was %d%%, %d/%d bytes", + percentage, r.maxDiskSpaceSample.used, r.maxDiskSpaceSample.total) + } + for _, statName := range memoryStats { + value, ok := r.maxMemSample[memoryKey{statName: "total_" + statName}] + if !ok { + value, ok = r.maxMemSample[memoryKey{statName: statName}] + } + if ok { + r.reportMemoryMax(logger, "container", statName, value, memLimits[statName]) + } + } + for ifname, sample := range r.lastNetSample { + logger.Printf("Total network I/O on %s was %d bytes written and %d bytes read", + ifname, sample.txBytes, sample.rxBytes) + } +} + +func (r *Reporter) LogProcessMemMax(logger logPrinter) { + for memKey, value := range r.maxMemSample { + if memKey.processName == "" { + continue + } + r.reportMemoryMax(logger, memKey.processName, memKey.statName, value, 0) + } +} + func (r *Reporter) readAllOrWarn(in io.Reader) ([]byte, error) { content, err := ioutil.ReadAll(in) if err != nil { - r.Logger.Print(err) + r.Logger.Printf("warning: %v", err) } return content, err } @@ -169,7 +285,7 @@ func (r *Reporter) getContainerNetStats() (io.Reader, error) { statsFilename := fmt.Sprintf("/proc/%s/net/dev", taskPid) stats, err := ioutil.ReadFile(statsFilename) if err != nil { - r.Logger.Print(err) + r.Logger.Printf("notice: %v", err) continue } return strings.NewReader(string(stats)), nil @@ -216,14 +332,14 @@ func (r *Reporter) doBlkIOStats() { continue } delta := "" - if prev, ok := r.lastDiskSample[dev]; ok { + if prev, ok := r.lastDiskIOSample[dev]; ok { delta = fmt.Sprintf(" -- interval %.4f seconds %d write %d read", sample.sampleTime.Sub(prev.sampleTime).Seconds(), sample.txBytes-prev.txBytes, sample.rxBytes-prev.rxBytes) } r.Logger.Printf("blkio:%s %d write %d read%s\n", dev, sample.txBytes, sample.rxBytes, delta) - r.lastDiskSample[dev] = sample + r.lastDiskIOSample[dev] = sample } } @@ -232,7 +348,7 @@ type memSample struct { memStat map[string]int64 } -func (r *Reporter) doMemoryStats() { +func (r *Reporter) getMemSample() { c, err := r.openStatFile("memory", "memory.stat", true) if err != nil { return @@ -240,7 +356,6 @@ func (r *Reporter) doMemoryStats() { defer c.Close() b := bufio.NewScanner(c) thisSample := memSample{time.Now(), make(map[string]int64)} - wantStats := [...]string{"cache", "swap", "pgmajfault", "rss"} for b.Scan() { var stat string var val int64 @@ -248,16 +363,127 @@ func (r *Reporter) doMemoryStats() { continue } thisSample.memStat[stat] = val + maxKey := memoryKey{statName: stat} + if val > r.maxMemSample[maxKey] { + r.maxMemSample[maxKey] = val + } } + r.lastMemSample = thisSample + + if r.ThresholdLogger != nil { + for statName, thresholds := range r.MemThresholds { + statValue, ok := thisSample.memStat["total_"+statName] + if !ok { + statValue, ok = thisSample.memStat[statName] + if !ok { + continue + } + } + var index int + var statThreshold Threshold + for index, statThreshold = range thresholds { + if statValue < statThreshold.threshold { + break + } else if statThreshold.percentage > 0 { + r.ThresholdLogger.Printf("Container using over %d%% of memory (%s %d/%d bytes)", + statThreshold.percentage, statName, statValue, statThreshold.total) + } else { + r.ThresholdLogger.Printf("Container using over %d of memory (%s %s bytes)", + statThreshold.threshold, statName, statValue) + } + } + r.MemThresholds[statName] = thresholds[index:] + } + } +} + +func (r *Reporter) reportMemSample() { var outstat bytes.Buffer - for _, key := range wantStats { - if val, ok := thisSample.memStat[key]; ok { - outstat.WriteString(fmt.Sprintf(" %d %s", val, key)) + for _, key := range memoryStats { + // Use "total_X" stats (entire hierarchy) if enabled, + // otherwise just the single cgroup -- see + // https://www.kernel.org/doc/Documentation/cgroup-v1/memory.txt + if val, ok := r.lastMemSample.memStat["total_"+key]; ok { + fmt.Fprintf(&outstat, " %d %s", val, key) + } else if val, ok := r.lastMemSample.memStat[key]; ok { + fmt.Fprintf(&outstat, " %d %s", val, key) } } r.Logger.Printf("mem%s\n", outstat.String()) } +func (r *Reporter) doProcmemStats() { + if r.kernelPageSize == 0 { + // assign "don't try again" value in case we give up + // and return without assigning the real value + r.kernelPageSize = -1 + buf, err := os.ReadFile("/proc/self/smaps") + if err != nil { + r.Logger.Printf("error reading /proc/self/smaps: %s", err) + return + } + m := regexp.MustCompile(`\nKernelPageSize:\s*(\d+) kB\n`).FindSubmatch(buf) + if len(m) != 2 { + r.Logger.Printf("error parsing /proc/self/smaps: KernelPageSize not found") + return + } + size, err := strconv.ParseInt(string(m[1]), 10, 64) + if err != nil { + r.Logger.Printf("error parsing /proc/self/smaps: KernelPageSize %q: %s", m[1], err) + return + } + r.kernelPageSize = size * 1024 + } else if r.kernelPageSize < 0 { + // already failed to determine page size, don't keep + // trying/logging + return + } + + r.reportPIDsMu.Lock() + defer r.reportPIDsMu.Unlock() + procnames := make([]string, 0, len(r.reportPIDs)) + for name := range r.reportPIDs { + procnames = append(procnames, name) + } + sort.Strings(procnames) + procmem := "" + for _, procname := range procnames { + pid := r.reportPIDs[procname] + buf, err := os.ReadFile(fmt.Sprintf("/proc/%d/stat", pid)) + if err != nil { + continue + } + // If the executable name contains a ')' char, + // /proc/$pid/stat will look like '1234 (exec name)) S + // 123 ...' -- the last ')' is the end of the 2nd + // field. + paren := bytes.LastIndexByte(buf, ')') + if paren < 0 { + continue + } + fields := bytes.SplitN(buf[paren:], []byte{' '}, 24) + if len(fields) < 24 { + continue + } + // rss is the 24th field in .../stat, and fields[0] + // here is the last char ')' of the 2nd field, so + // rss is fields[22] + rss, err := strconv.ParseInt(string(fields[22]), 10, 64) + if err != nil { + continue + } + value := rss * r.kernelPageSize + procmem += fmt.Sprintf(" %d %s", value, procname) + maxKey := memoryKey{pid, procname, "rss"} + if value > r.maxMemSample[maxKey] { + r.maxMemSample[maxKey] = value + } + } + if procmem != "" { + r.Logger.Printf("procmem%s\n", procmem) + } +} + func (r *Reporter) doNetworkStats() { sampleTime := time.Now() stats, err := r.getContainerNetStats() @@ -302,6 +528,45 @@ func (r *Reporter) doNetworkStats() { } } +type diskSpaceSample struct { + hasData bool + sampleTime time.Time + total uint64 + used uint64 + available uint64 +} + +func (r *Reporter) doDiskSpaceStats() { + s := syscall.Statfs_t{} + err := syscall.Statfs(r.TempDir, &s) + if err != nil { + return + } + bs := uint64(s.Bsize) + nextSample := diskSpaceSample{ + hasData: true, + sampleTime: time.Now(), + total: s.Blocks * bs, + used: (s.Blocks - s.Bfree) * bs, + available: s.Bavail * bs, + } + if nextSample.used > r.maxDiskSpaceSample.used { + r.maxDiskSpaceSample = nextSample + } + + var delta string + if r.lastDiskSpaceSample.hasData { + prev := r.lastDiskSpaceSample + interval := nextSample.sampleTime.Sub(prev.sampleTime).Seconds() + delta = fmt.Sprintf(" -- interval %.4f seconds %d used", + interval, + int64(nextSample.used-prev.used)) + } + r.Logger.Printf("statfs %d available %d used %d total%s\n", + nextSample.available, nextSample.used, nextSample.total, delta) + r.lastDiskSpaceSample = nextSample +} + type cpuSample struct { hasData bool // to distinguish the zero value from real data sampleTime time.Time @@ -349,7 +614,7 @@ func (r *Reporter) doCPUStats() { var userTicks, sysTicks int64 fmt.Sscanf(string(b), "user %d\nsystem %d", &userTicks, &sysTicks) - userHz := float64(C.sysconf(C._SC_CLK_TCK)) + userHz := float64(100) nextSample := cpuSample{ hasData: true, sampleTime: time.Now(), @@ -370,11 +635,21 @@ func (r *Reporter) doCPUStats() { r.lastCPUSample = nextSample } +func (r *Reporter) doAllStats() { + r.reportMemSample() + r.doProcmemStats() + r.doCPUStats() + r.doBlkIOStats() + r.doNetworkStats() + r.doDiskSpaceStats() +} + // Report stats periodically until we learn (via r.done) that someone // called Stop. func (r *Reporter) run() { defer close(r.flushed) + r.maxMemSample = make(map[memoryKey]int64) r.reportedStatFile = make(map[string]string) if !r.waitForCIDFile() || !r.waitForCgroup() { @@ -382,18 +657,29 @@ func (r *Reporter) run() { } r.lastNetSample = make(map[string]ioSample) - r.lastDiskSample = make(map[string]ioSample) + r.lastDiskIOSample = make(map[string]ioSample) - ticker := time.NewTicker(r.PollPeriod) + if len(r.TempDir) == 0 { + // Temporary dir not provided, try to get it from the environment. + r.TempDir = os.Getenv("TMPDIR") + } + if len(r.TempDir) > 0 { + r.Logger.Printf("notice: monitoring temp dir %s\n", r.TempDir) + } + + r.getMemSample() + r.doAllStats() + + memTicker := time.NewTicker(time.Second) + mainTicker := time.NewTicker(r.PollPeriod) for { - r.doMemoryStats() - r.doCPUStats() - r.doBlkIOStats() - r.doNetworkStats() select { case <-r.done: return - case <-ticker.C: + case <-memTicker.C: + r.getMemSample() + case <-mainTicker.C: + r.doAllStats() } } } @@ -416,7 +702,7 @@ func (r *Reporter) waitForCIDFile() bool { select { case <-ticker.C: case <-r.done: - r.Logger.Printf("CID never appeared in %+q: %v", r.CIDFile, err) + r.Logger.Printf("warning: CID never appeared in %+q: %v", r.CIDFile, err) return false } } @@ -439,9 +725,9 @@ func (r *Reporter) waitForCgroup() bool { select { case <-ticker.C: case <-warningTimer: - r.Logger.Printf("cgroup stats files have not appeared after %v (config error?) -- still waiting...", r.PollPeriod) + r.Logger.Printf("warning: cgroup stats files have not appeared after %v (config error?) -- still waiting...", r.PollPeriod) case <-r.done: - r.Logger.Printf("cgroup stats files never appeared for %v", r.CID) + r.Logger.Printf("warning: cgroup stats files never appeared for %v", r.CID) return false } }