1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
5 // Package crunchstat reports resource usage (CPU, memory, disk,
6 // network) for a cgroup.
26 // crunchstat collects all memory statistics, but only reports these.
// The order here is also the order in which reportMemSample prints them.
27 var memoryStats = [...]string{"cache", "swap", "pgmajfault", "rss"}
// logPrinter is the minimal logging interface this package needs;
// *log.Logger (or anything with a compatible Printf) satisfies it.
29 type logPrinter interface {
// Printf formats and records one log message, fmt.Printf-style.
30 Printf(fmt string, args ...interface{})
33 // A Reporter gathers statistics for a cgroup and writes them to a
35 type Reporter struct {
36 // CID of the container to monitor. If empty, read the CID
37 // from CIDFile (first waiting until a non-empty file appears
38 // at CIDFile). If CIDFile is also empty, report host
42 // Path to a file we can read CID from.
45 // Where cgroup accounting files live on this system, e.g.,
49 // Parent cgroup, e.g., "docker".
52 // Interval between samples. Must be positive.
53 PollPeriod time.Duration
55 // Temporary directory, will be monitored for available, used & total space.
58 // Where to write statistics. Must not be nil.
61 // When stats cross thresholds configured in the fields below,
62 // they are reported to this logger.
63 ThresholdLogger logPrinter
65 // MemThresholds maps memory stat names to slices of thresholds.
66 // When the corresponding stat exceeds a threshold, that will be logged.
67 MemThresholds map[string][]Threshold
// Internal state below; initialized by run() or on first use.
// Callers must not touch these fields.
70 reportedStatFile map[string]string // stat name -> cgroup file last read (to log when it moves)
71 lastNetSample map[string]ioSample // interface name -> most recent network sample
72 lastDiskIOSample map[string]ioSample // device name -> most recent blkio sample
73 lastCPUSample cpuSample // most recent CPU sample (zero value until first read)
74 lastDiskSpaceSample diskSpaceSample // most recent statfs sample of TempDir
75 lastMemSample memSample // most recent memory.stat sample
76 maxDiskSpaceSample diskSpaceSample // sample with the highest "used" seen so far
77 maxMemSample map[memoryKey]int64 // maximum observed value per (process, stat)
79 reportPIDs map[string]int // name -> PID registered via ReportPID
80 reportPIDsMu sync.Mutex // guards reportPIDs
82 done chan struct{} // closed when we should stop reporting
83 flushed chan struct{} // closed when we have made our last report
// A Threshold is one memory-usage level that is reported via
// ThresholdLogger when it is first exceeded (see getMemSample).
86 type Threshold struct {
// NewThresholdFromPercentage returns a Threshold that triggers when
// usage reaches the given percentage of total (both in bytes).
92 func NewThresholdFromPercentage(total int64, percentage int64) Threshold {
94 percentage: percentage,
// Integer math; assumes total*percentage does not overflow int64.
95 threshold: total * percentage / 100,
// NewThresholdsFromPercentages returns one Threshold per element of
// percentages, each computed against the same total.
100 func NewThresholdsFromPercentages(total int64, percentages []int64) (thresholds []Threshold) {
101 for _, percentage := range percentages {
102 thresholds = append(thresholds, NewThresholdFromPercentage(total, percentage))
107 // memoryKey is a key into Reporter.maxMemSample.
108 // Initialize it with just statName to get the host/cgroup maximum.
109 // Initialize it with all fields to get that process' maximum.
// (The positional literal in doProcmemStats suggests the field order
// is {processID, processName, statName} — confirm against the full
// declaration.)
110 type memoryKey struct {
116 // Start starts monitoring in a new goroutine, and returns
119 // The monitoring goroutine waits for a non-empty CIDFile to appear
120 // (unless CID is non-empty). Then it waits for the accounting files
121 // to appear for the monitored container. Then it collects and reports
122 // statistics until Stop is called.
124 // Callers should not call Start more than once.
126 // Callers should not modify public data fields after calling Start.
127 func (r *Reporter) Start() {
// done is closed (by Stop) to request shutdown; flushed is closed
// by the monitoring goroutine after its final report.
128 r.done = make(chan struct{})
129 r.flushed = make(chan struct{})
133 // ReportPID starts reporting stats for a specified process.
// name is the label used for this process in "procmem" log lines
// (see doProcmemStats). Safe to call concurrently.
134 func (r *Reporter) ReportPID(name string, pid int) {
135 r.reportPIDsMu.Lock()
136 defer r.reportPIDsMu.Unlock()
// The map is created lazily so a zero-value Reporter works.
137 if r.reportPIDs == nil {
138 r.reportPIDs = map[string]int{name: pid}
140 r.reportPIDs[name] = pid
144 // Stop reporting. Do not call more than once, or before calling
147 // Nothing will be logged after Stop returns unless you call a Log* method.
// NOTE(review): presumably closes r.done and waits on r.flushed —
// confirm against the full function body.
148 func (r *Reporter) Stop() {
// reportMemoryMax logs the maximum observed value of one memory
// statistic. When a positive limit is known, the value is also
// expressed as a percentage of that limit.
153 func (r *Reporter) reportMemoryMax(logger logPrinter, source, statName string, value, limit int64) {
// Integer percentage, truncated toward zero.
162 percentage := 100 * value / limit
163 logger.Printf("Maximum %s memory %s usage was %d%%, %d/%d %s",
164 source, statName, percentage, value, limit, units)
166 logger.Printf("Maximum %s memory %s usage was %d %s",
167 source, statName, value, units)
// LogMaxima writes summary lines for the container-level maxima
// recorded during the run: total CPU, per-device disk I/O, peak disk
// space, peak memory stats (with limits looked up in memLimits by
// stat name), and per-interface network I/O.
171 func (r *Reporter) LogMaxima(logger logPrinter, memLimits map[string]int64) {
172 if r.lastCPUSample.hasData {
173 logger.Printf("Total CPU usage was %f user and %f sys on %d CPUs",
174 r.lastCPUSample.user, r.lastCPUSample.sys, r.lastCPUSample.cpus)
176 for disk, sample := range r.lastDiskIOSample {
177 logger.Printf("Total disk I/O on %s was %d bytes written and %d bytes read",
178 disk, sample.txBytes, sample.rxBytes)
180 if r.maxDiskSpaceSample.total > 0 {
181 percentage := 100 * r.maxDiskSpaceSample.used / r.maxDiskSpaceSample.total
182 logger.Printf("Maximum disk usage was %d%%, %d/%d bytes",
183 percentage, r.maxDiskSpaceSample.used, r.maxDiskSpaceSample.total)
185 for _, statName := range memoryStats {
// Prefer the hierarchy-wide "total_X" figure; fall back to the
// bare stat name if that was never recorded.
186 value, ok := r.maxMemSample[memoryKey{statName: "total_" + statName}]
188 value, ok = r.maxMemSample[memoryKey{statName: statName}]
191 r.reportMemoryMax(logger, "container", statName, value, memLimits[statName])
194 for ifname, sample := range r.lastNetSample {
195 logger.Printf("Total network I/O on %s was %d bytes written and %d bytes read",
196 ifname, sample.txBytes, sample.rxBytes)
// LogProcessMemMax logs the maximum memory usage recorded for each
// process registered via ReportPID. Container-level entries (empty
// processName) are skipped; LogMaxima reports those.
200 func (r *Reporter) LogProcessMemMax(logger logPrinter) {
201 for memKey, value := range r.maxMemSample {
202 if memKey.processName == "" {
// limit=0: no percentage is reported for per-process maxima.
205 r.reportMemoryMax(logger, memKey.processName, memKey.statName, value, 0)
// readAllOrWarn reads all of in, logging a warning on failure. The
// content and error are returned to the caller as well.
209 func (r *Reporter) readAllOrWarn(in io.Reader) ([]byte, error) {
210 content, err := ioutil.ReadAll(in)
212 r.Logger.Printf("warning: %v", err)
217 // Open the cgroup stats file in /sys/fs corresponding to the target
218 // cgroup, and return an io.ReadCloser. If no stats file is available,
221 // Log the file that was opened, if it isn't the same file opened on
222 // the last openStatFile for this stat.
224 // Log "not available" if no file is found and either this stat has
225 // been available in the past, or verbose==true.
227 // TODO: Instead of trying all options, choose a process in the
228 // container, and read /proc/PID/cgroup to determine the appropriate
229 // cgroup root for the given statgroup. (This will avoid falling back
230 // to host-level stats during container setup and teardown.)
231 func (r *Reporter) openStatFile(statgroup, stat string, verbose bool) (io.ReadCloser, error) {
234 // Collect container's stats
// Candidate paths are tried in order: container-level locations
// (with and without the statgroup component) first, then host-level.
236 fmt.Sprintf("%s/%s/%s/%s/%s", r.CgroupRoot, statgroup, r.CgroupParent, r.CID, stat),
237 fmt.Sprintf("%s/%s/%s/%s", r.CgroupRoot, r.CgroupParent, r.CID, stat),
240 // Collect this host's stats
242 fmt.Sprintf("%s/%s/%s", r.CgroupRoot, statgroup, stat),
243 fmt.Sprintf("%s/%s", r.CgroupRoot, stat),
249 for _, path = range paths {
250 file, err = os.Open(path)
257 if pathWas := r.reportedStatFile[stat]; pathWas != path {
258 // Log whenever we start using a new/different cgroup
259 // stat file for a given statistic. This typically
260 // happens 1 to 3 times per statistic, depending on
261 // whether we happen to collect stats [a] before any
262 // processes have been created in the container and
263 // [b] after all contained processes have exited.
264 if path == "" && verbose {
265 r.Logger.Printf("notice: stats not available: stat %s, statgroup %s, cid %s, parent %s, root %s\n", stat, statgroup, r.CID, r.CgroupParent, r.CgroupRoot)
266 } else if pathWas != "" {
267 r.Logger.Printf("notice: stats moved from %s to %s\n", r.reportedStatFile[stat], path)
269 r.Logger.Printf("notice: reading stats from %s\n", path)
// Remember which file served this stat so moves are logged once.
271 r.reportedStatFile[stat] = path
// getContainerNetStats returns a reader on the contents of
// /proc/<pid>/net/dev for some process <pid> inside the container.
// Candidate PIDs come from the container's cgroup.procs file; the
// first one whose net/dev can be read wins.
276 func (r *Reporter) getContainerNetStats() (io.Reader, error) {
277 procsFile, err := r.openStatFile("cpuacct", "cgroup.procs", true)
281 defer procsFile.Close()
282 reader := bufio.NewScanner(procsFile)
284 taskPid := reader.Text()
285 statsFilename := fmt.Sprintf("/proc/%s/net/dev", taskPid)
286 stats, err := ioutil.ReadFile(statsFilename)
// A PID may exit between listing and reading; just note it and
// try the next one.
288 r.Logger.Printf("notice: %v", err)
291 return strings.NewReader(string(stats)), nil
293 return nil, errors.New("Could not read stats for any proc in container")
// ioSample is one timestamped pair of byte counters. It is used for
// both disk I/O (txBytes=written, rxBytes=read) and network I/O
// (tx/rx).
296 type ioSample struct {
// doBlkIOStats samples per-device disk I/O from the blkio cgroup's
// io_service_bytes file and logs one "blkio:<dev>" line per device,
// with deltas since the previous sample when one exists.
302 func (r *Reporter) doBlkIOStats() {
303 c, err := r.openStatFile("blkio", "blkio.io_service_bytes", true)
308 b := bufio.NewScanner(c)
309 var sampleTime = time.Now()
310 newSamples := make(map[string]ioSample)
312 var device, op string
// Each data line is "<device> <op> <value>", e.g. "8:0 Read 4096".
314 if _, err := fmt.Sscanf(string(b.Text()), "%s %s %d", &device, &op, &val); err != nil {
317 var thisSample ioSample
// -1 marks counters not yet seen for this device.
319 if thisSample, ok = newSamples[device]; !ok {
320 thisSample = ioSample{sampleTime, -1, -1}
324 thisSample.rxBytes = val
326 thisSample.txBytes = val
328 newSamples[device] = thisSample
330 for dev, sample := range newSamples {
// Skip devices that did not report both counters.
331 if sample.txBytes < 0 || sample.rxBytes < 0 {
335 if prev, ok := r.lastDiskIOSample[dev]; ok {
336 delta = fmt.Sprintf(" -- interval %.4f seconds %d write %d read",
337 sample.sampleTime.Sub(prev.sampleTime).Seconds(),
338 sample.txBytes-prev.txBytes,
339 sample.rxBytes-prev.rxBytes)
341 r.Logger.Printf("blkio:%s %d write %d read%s\n", dev, sample.txBytes, sample.rxBytes, delta)
342 r.lastDiskIOSample[dev] = sample
// memSample is one timestamped snapshot of the cgroup's memory.stat
// counters.
346 type memSample struct {
// memStat maps stat names from memory.stat (e.g. "rss",
// "total_rss") to their values.
348 memStat map[string]int64
// getMemSample reads the cgroup's memory.stat, records the sample in
// r.lastMemSample, updates the running maxima in r.maxMemSample, and
// — if a ThresholdLogger is configured — reports any MemThresholds
// this sample has newly exceeded.
351 func (r *Reporter) getMemSample() {
352 c, err := r.openStatFile("memory", "memory.stat", true)
357 b := bufio.NewScanner(c)
358 thisSample := memSample{time.Now(), make(map[string]int64)}
// memory.stat lines are "<stat> <value>".
362 if _, err := fmt.Sscanf(string(b.Text()), "%s %d", &stat, &val); err != nil {
365 thisSample.memStat[stat] = val
366 maxKey := memoryKey{statName: stat}
367 if val > r.maxMemSample[maxKey] {
368 r.maxMemSample[maxKey] = val
371 r.lastMemSample = thisSample
373 if r.ThresholdLogger != nil {
374 for statName, thresholds := range r.MemThresholds {
// Prefer the hierarchy-wide "total_X" stat when present.
375 statValue, ok := thisSample.memStat["total_"+statName]
377 statValue, ok = thisSample.memStat[statName]
383 var statThreshold Threshold
384 for index, statThreshold = range thresholds {
385 if statValue < statThreshold.threshold {
387 } else if statThreshold.percentage > 0 {
388 r.ThresholdLogger.Printf("Container using over %d%% of memory (%s %d/%d bytes)",
389 statThreshold.percentage, statName, statValue, statThreshold.total)
// NOTE(review): format-verb bug — the second %s below is applied
// to statValue (int64) and will print "%!s(int64=...)"; it should
// be %d.
391 r.ThresholdLogger.Printf("Container using over %d of memory (%s %s bytes)",
392 statThreshold.threshold, statName, statValue)
// Trim the slice so already-crossed thresholds are not reported
// again on later samples.
395 r.MemThresholds[statName] = thresholds[index:]
// reportMemSample writes one "mem ..." log line listing the stats
// named in memoryStats, taken from the most recent memory sample.
400 func (r *Reporter) reportMemSample() {
401 var outstat bytes.Buffer
402 for _, key := range memoryStats {
403 // Use "total_X" stats (entire hierarchy) if enabled,
404 // otherwise just the single cgroup -- see
405 // https://www.kernel.org/doc/Documentation/cgroup-v1/memory.txt
406 if val, ok := r.lastMemSample.memStat["total_"+key]; ok {
407 fmt.Fprintf(&outstat, " %d %s", val, key)
408 } else if val, ok := r.lastMemSample.memStat[key]; ok {
409 fmt.Fprintf(&outstat, " %d %s", val, key)
412 r.Logger.Printf("mem%s\n", outstat.String())
// doProcmemStats logs one "procmem" line reporting resident set size
// (RSS, in bytes) for each process registered via ReportPID, and
// updates the per-process maxima in r.maxMemSample.
415 func (r *Reporter) doProcmemStats() {
416 if r.kernelPageSize == 0 {
417 // assign "don't try again" value in case we give up
418 // and return without assigning the real value
419 r.kernelPageSize = -1
// The kernel page size is published per-mapping in smaps; any
// mapping will do, so read our own process's file.
420 buf, err := os.ReadFile("/proc/self/smaps")
422 r.Logger.Printf("error reading /proc/self/smaps: %s", err)
425 m := regexp.MustCompile(`\nKernelPageSize:\s*(\d+) kB\n`).FindSubmatch(buf)
427 r.Logger.Printf("error parsing /proc/self/smaps: KernelPageSize not found")
430 size, err := strconv.ParseInt(string(m[1]), 10, 64)
432 r.Logger.Printf("error parsing /proc/self/smaps: KernelPageSize %q: %s", m[1], err)
// smaps reports kB; convert to bytes.
435 r.kernelPageSize = size * 1024
436 } else if r.kernelPageSize < 0 {
437 // already failed to determine page size, don't keep
442 r.reportPIDsMu.Lock()
443 defer r.reportPIDsMu.Unlock()
// Sort names so the procmem report order is deterministic.
444 procnames := make([]string, 0, len(r.reportPIDs))
445 for name := range r.reportPIDs {
446 procnames = append(procnames, name)
448 sort.Strings(procnames)
450 for _, procname := range procnames {
451 pid := r.reportPIDs[procname]
452 buf, err := os.ReadFile(fmt.Sprintf("/proc/%d/stat", pid))
456 // If the executable name contains a ')' char,
457 // /proc/$pid/stat will look like '1234 (exec name)) S
458 // 123 ...' -- the last ')' is the end of the 2nd
460 paren := bytes.LastIndexByte(buf, ')')
464 fields := bytes.SplitN(buf[paren:], []byte{' '}, 24)
465 if len(fields) < 24 {
468 // rss is the 24th field in .../stat, and fields[0]
469 // here is the last char ')' of the 2nd field, so
471 rss, err := strconv.ParseInt(string(fields[22]), 10, 64)
// /proc/pid/stat reports RSS in pages; convert to bytes.
475 value := rss * r.kernelPageSize
476 procmem += fmt.Sprintf(" %d %s", value, procname)
477 maxKey := memoryKey{pid, procname, "rss"}
478 if value > r.maxMemSample[maxKey] {
479 r.maxMemSample[maxKey] = value
483 r.Logger.Printf("procmem%s\n", procmem)
// doNetworkStats logs one "net:<ifname>" line per non-loopback
// network interface, with cumulative tx/rx byte counts and deltas
// since the previous sample when one exists.
487 func (r *Reporter) doNetworkStats() {
488 sampleTime := time.Now()
489 stats, err := r.getContainerNetStats()
494 scanner := bufio.NewScanner(stats)
// /proc/net/dev data lines split into 17 whitespace-separated
// fields: "<if>:" followed by 16 counters; words[1] is rx bytes
// and words[9] is tx bytes.
498 words := strings.Fields(scanner.Text())
499 if len(words) != 17 {
500 // Skip lines with wrong format
503 ifName = strings.TrimRight(words[0], ":")
504 if ifName == "lo" || ifName == "" {
505 // Skip loopback interface and lines with wrong format
508 if tx, err = strconv.ParseInt(words[9], 10, 64); err != nil {
511 if rx, err = strconv.ParseInt(words[1], 10, 64); err != nil {
514 nextSample := ioSample{}
515 nextSample.sampleTime = sampleTime
516 nextSample.txBytes = tx
517 nextSample.rxBytes = rx
519 if prev, ok := r.lastNetSample[ifName]; ok {
520 interval := nextSample.sampleTime.Sub(prev.sampleTime).Seconds()
521 delta = fmt.Sprintf(" -- interval %.4f seconds %d tx %d rx",
526 r.Logger.Printf("net:%s %d tx %d rx%s\n", ifName, tx, rx, delta)
527 r.lastNetSample[ifName] = nextSample
// diskSpaceSample records one statfs(2) snapshot of the TempDir
// filesystem (sizes in bytes).
531 type diskSpaceSample struct {
// doDiskSpaceStats samples filesystem usage of r.TempDir via
// statfs(2), logs a "statfs" line (with a used-space delta when a
// previous sample exists), and tracks the peak usage seen so far.
539 func (r *Reporter) doDiskSpaceStats() {
540 s := syscall.Statfs_t{}
541 err := syscall.Statfs(r.TempDir, &s)
// statfs reports sizes in filesystem blocks; convert to bytes.
545 bs := uint64(s.Bsize)
546 nextSample := diskSpaceSample{
548 sampleTime: time.Now(),
549 total: s.Blocks * bs,
550 used: (s.Blocks - s.Bfree) * bs,
// Bavail counts only blocks usable by unprivileged users, so
// available + used may be less than total.
551 available: s.Bavail * bs,
553 if nextSample.used > r.maxDiskSpaceSample.used {
554 r.maxDiskSpaceSample = nextSample
558 if r.lastDiskSpaceSample.hasData {
559 prev := r.lastDiskSpaceSample
560 interval := nextSample.sampleTime.Sub(prev.sampleTime).Seconds()
561 delta = fmt.Sprintf(" -- interval %.4f seconds %d used",
// Signed conversion: usage can shrink between samples.
563 int64(nextSample.used-prev.used))
565 r.Logger.Printf("statfs %d available %d used %d total%s\n",
566 nextSample.available, nextSample.used, nextSample.total, delta)
567 r.lastDiskSpaceSample = nextSample
// cpuSample is one snapshot of cumulative CPU usage.
570 type cpuSample struct {
571 hasData bool // to distinguish the zero value from real data
578 // Return the number of CPUs available in the container. Return 0 if
579 // we can't figure out the real number of CPUs.
580 func (r *Reporter) getCPUCount() int64 {
581 cpusetFile, err := r.openStatFile("cpuset", "cpuset.cpus", true)
585 defer cpusetFile.Close()
586 b, err := r.readAllOrWarn(cpusetFile)
// cpuset.cpus is a comma-separated list of CPU IDs and ranges,
// e.g. "0-3,8,10-11".
590 sp := strings.Split(string(b), ",")
592 for _, v := range sp {
// n is the number of values parsed: 2 for a "min-max" range,
// 1 for a single CPU ID.
594 n, _ := fmt.Sscanf(v, "%d-%d", &min, &max)
596 cpus += (max - min) + 1
// doCPUStats logs one "cpu" line with cumulative user/system CPU
// seconds and the CPU count, plus deltas since the previous sample
// when one exists.
604 func (r *Reporter) doCPUStats() {
605 statFile, err := r.openStatFile("cpuacct", "cpuacct.stat", true)
609 defer statFile.Close()
610 b, err := r.readAllOrWarn(statFile)
// cpuacct.stat reports clock ticks: "user <n>\nsystem <n>".
615 var userTicks, sysTicks int64
616 fmt.Sscanf(string(b), "user %d\nsystem %d", &userTicks, &sysTicks)
// Convert ticks to seconds assuming USER_HZ == 100 (the value on
// essentially all modern Linux systems).
617 userHz := float64(100)
618 nextSample := cpuSample{
620 sampleTime: time.Now(),
621 user: float64(userTicks) / userHz,
622 sys: float64(sysTicks) / userHz,
623 cpus: r.getCPUCount(),
627 if r.lastCPUSample.hasData {
628 delta = fmt.Sprintf(" -- interval %.4f seconds %.4f user %.4f sys",
629 nextSample.sampleTime.Sub(r.lastCPUSample.sampleTime).Seconds(),
630 nextSample.user-r.lastCPUSample.user,
631 nextSample.sys-r.lastCPUSample.sys)
633 r.Logger.Printf("cpu %.4f user %.4f sys %d cpus%s\n",
634 nextSample.user, nextSample.sys, nextSample.cpus, delta)
635 r.lastCPUSample = nextSample
// doAllStats collects and reports one round of samples; presumably
// it invokes each of the do*Stats / report* helpers in turn —
// confirm against the full body.
638 func (r *Reporter) doAllStats() {
647 // Report stats periodically until we learn (via r.done) that someone
649 func (r *Reporter) run() {
// Guarantee Stop's wait terminates even if we bail out early.
650 defer close(r.flushed)
652 r.maxMemSample = make(map[memoryKey]int64)
653 r.reportedStatFile = make(map[string]string)
// Give up without reporting if Stop is called before the CID file
// or the cgroup accounting files appear.
655 if !r.waitForCIDFile() || !r.waitForCgroup() {
659 r.lastNetSample = make(map[string]ioSample)
660 r.lastDiskIOSample = make(map[string]ioSample)
662 if len(r.TempDir) == 0 {
663 // Temporary dir not provided, try to get it from the environment.
664 r.TempDir = os.Getenv("TMPDIR")
666 if len(r.TempDir) > 0 {
667 r.Logger.Printf("notice: monitoring temp dir %s\n", r.TempDir)
// Memory is sampled every second (for accurate maxima/threshold
// detection); everything else at the configured PollPeriod.
673 memTicker := time.NewTicker(time.Second)
674 mainTicker := time.NewTicker(r.PollPeriod)
687 // If CID is empty, wait for it to appear in CIDFile. Return true if
688 // we get it before we learn (via r.done) that someone called Stop.
689 func (r *Reporter) waitForCIDFile() bool {
// Nothing to wait for if the CID is already known or no CIDFile
// was configured.
690 if r.CID != "" || r.CIDFile == "" {
// Poll the file every 100ms until it exists and is non-empty.
694 ticker := time.NewTicker(100 * time.Millisecond)
697 cid, err := ioutil.ReadFile(r.CIDFile)
698 if err == nil && len(cid) > 0 {
705 r.Logger.Printf("warning: CID never appeared in %+q: %v", r.CIDFile, err)
711 // Wait for the cgroup stats files to appear in cgroup_root. Return
712 // true if they appear before r.done indicates someone called Stop. If
713 // they don't appear within one poll interval, log a warning and keep
715 func (r *Reporter) waitForCgroup() bool {
// Poll every 100ms; verbose=false keeps openStatFile quiet while
// failures are still expected.
716 ticker := time.NewTicker(100 * time.Millisecond)
718 warningTimer := time.After(r.PollPeriod)
720 c, err := r.openStatFile("cpuacct", "cgroup.procs", false)
728 r.Logger.Printf("warning: cgroup stats files have not appeared after %v (config error?) -- still waiting...", r.PollPeriod)
730 r.Logger.Printf("warning: cgroup stats files never appeared for %v", r.CID)