// Copyright (C) The Arvados Authors. All rights reserved.
//
// SPDX-License-Identifier: AGPL-3.0

// Package crunchstat reports resource usage (CPU, memory, disk,
// network) for a cgroup.
26 // A Reporter gathers statistics for a cgroup and writes them to a
28 type Reporter struct {
29 // CID of the container to monitor. If empty, read the CID
30 // from CIDFile (first waiting until a non-empty file appears
31 // at CIDFile). If CIDFile is also empty, report host
35 // Path to a file we can read CID from.
38 // Where cgroup accounting files live on this system, e.g.,
42 // Parent cgroup, e.g., "docker".
45 // Interval between samples. Must be positive.
46 PollPeriod time.Duration
48 // Temporary directory, will be monitored for available, used & total space.
51 // Where to write statistics. Must not be nil.
53 Printf(fmt string, args ...interface{})
57 reportedStatFile map[string]string
58 lastNetSample map[string]ioSample
59 lastDiskIOSample map[string]ioSample
60 lastCPUSample cpuSample
61 lastDiskSpaceSample diskSpaceSample
63 reportPIDs map[string]int
64 reportPIDsMu sync.Mutex
66 done chan struct{} // closed when we should stop reporting
67 flushed chan struct{} // closed when we have made our last report
70 // Start starts monitoring in a new goroutine, and returns
73 // The monitoring goroutine waits for a non-empty CIDFile to appear
74 // (unless CID is non-empty). Then it waits for the accounting files
75 // to appear for the monitored container. Then it collects and reports
76 // statistics until Stop is called.
78 // Callers should not call Start more than once.
80 // Callers should not modify public data fields after calling Start.
81 func (r *Reporter) Start() {
82 r.done = make(chan struct{})
83 r.flushed = make(chan struct{})
87 // ReportPID starts reporting stats for a specified process.
88 func (r *Reporter) ReportPID(name string, pid int) {
90 defer r.reportPIDsMu.Unlock()
91 if r.reportPIDs == nil {
92 r.reportPIDs = map[string]int{name: pid}
94 r.reportPIDs[name] = pid
98 // Stop reporting. Do not call more than once, or before calling
101 // Nothing will be logged after Stop returns.
102 func (r *Reporter) Stop() {
107 func (r *Reporter) readAllOrWarn(in io.Reader) ([]byte, error) {
108 content, err := ioutil.ReadAll(in)
110 r.Logger.Printf("warning: %v", err)
115 // Open the cgroup stats file in /sys/fs corresponding to the target
116 // cgroup, and return an io.ReadCloser. If no stats file is available,
119 // Log the file that was opened, if it isn't the same file opened on
120 // the last openStatFile for this stat.
122 // Log "not available" if no file is found and either this stat has
123 // been available in the past, or verbose==true.
125 // TODO: Instead of trying all options, choose a process in the
126 // container, and read /proc/PID/cgroup to determine the appropriate
127 // cgroup root for the given statgroup. (This will avoid falling back
128 // to host-level stats during container setup and teardown.)
129 func (r *Reporter) openStatFile(statgroup, stat string, verbose bool) (io.ReadCloser, error) {
132 // Collect container's stats
134 fmt.Sprintf("%s/%s/%s/%s/%s", r.CgroupRoot, statgroup, r.CgroupParent, r.CID, stat),
135 fmt.Sprintf("%s/%s/%s/%s", r.CgroupRoot, r.CgroupParent, r.CID, stat),
138 // Collect this host's stats
140 fmt.Sprintf("%s/%s/%s", r.CgroupRoot, statgroup, stat),
141 fmt.Sprintf("%s/%s", r.CgroupRoot, stat),
147 for _, path = range paths {
148 file, err = os.Open(path)
155 if pathWas := r.reportedStatFile[stat]; pathWas != path {
156 // Log whenever we start using a new/different cgroup
157 // stat file for a given statistic. This typically
158 // happens 1 to 3 times per statistic, depending on
159 // whether we happen to collect stats [a] before any
160 // processes have been created in the container and
161 // [b] after all contained processes have exited.
162 if path == "" && verbose {
163 r.Logger.Printf("notice: stats not available: stat %s, statgroup %s, cid %s, parent %s, root %s\n", stat, statgroup, r.CID, r.CgroupParent, r.CgroupRoot)
164 } else if pathWas != "" {
165 r.Logger.Printf("notice: stats moved from %s to %s\n", r.reportedStatFile[stat], path)
167 r.Logger.Printf("notice: reading stats from %s\n", path)
169 r.reportedStatFile[stat] = path
174 func (r *Reporter) getContainerNetStats() (io.Reader, error) {
175 procsFile, err := r.openStatFile("cpuacct", "cgroup.procs", true)
179 defer procsFile.Close()
180 reader := bufio.NewScanner(procsFile)
182 taskPid := reader.Text()
183 statsFilename := fmt.Sprintf("/proc/%s/net/dev", taskPid)
184 stats, err := ioutil.ReadFile(statsFilename)
186 r.Logger.Printf("notice: %v", err)
189 return strings.NewReader(string(stats)), nil
191 return nil, errors.New("Could not read stats for any proc in container")
// ioSample is one reading of cumulative byte counters for a device
// or network interface. For block devices, txBytes holds bytes
// written and rxBytes holds bytes read. Field order matters: the
// type is constructed positionally elsewhere in this file.
type ioSample struct {
	sampleTime time.Time
	txBytes    int64
	rxBytes    int64
}
200 func (r *Reporter) doBlkIOStats() {
201 c, err := r.openStatFile("blkio", "blkio.io_service_bytes", true)
206 b := bufio.NewScanner(c)
207 var sampleTime = time.Now()
208 newSamples := make(map[string]ioSample)
210 var device, op string
212 if _, err := fmt.Sscanf(string(b.Text()), "%s %s %d", &device, &op, &val); err != nil {
215 var thisSample ioSample
217 if thisSample, ok = newSamples[device]; !ok {
218 thisSample = ioSample{sampleTime, -1, -1}
222 thisSample.rxBytes = val
224 thisSample.txBytes = val
226 newSamples[device] = thisSample
228 for dev, sample := range newSamples {
229 if sample.txBytes < 0 || sample.rxBytes < 0 {
233 if prev, ok := r.lastDiskIOSample[dev]; ok {
234 delta = fmt.Sprintf(" -- interval %.4f seconds %d write %d read",
235 sample.sampleTime.Sub(prev.sampleTime).Seconds(),
236 sample.txBytes-prev.txBytes,
237 sample.rxBytes-prev.rxBytes)
239 r.Logger.Printf("blkio:%s %d write %d read%s\n", dev, sample.txBytes, sample.rxBytes, delta)
240 r.lastDiskIOSample[dev] = sample
// memSample is one reading of a cgroup's memory.stat file: the time
// it was taken and every "name value" pair that was parsed. Field
// order matters: the type is constructed positionally elsewhere in
// this file.
type memSample struct {
	sampleTime time.Time
	memStat    map[string]int64
}
249 func (r *Reporter) doMemoryStats() {
250 c, err := r.openStatFile("memory", "memory.stat", true)
255 b := bufio.NewScanner(c)
256 thisSample := memSample{time.Now(), make(map[string]int64)}
257 wantStats := [...]string{"cache", "swap", "pgmajfault", "rss"}
261 if _, err := fmt.Sscanf(string(b.Text()), "%s %d", &stat, &val); err != nil {
264 thisSample.memStat[stat] = val
266 var outstat bytes.Buffer
267 for _, key := range wantStats {
268 // Use "total_X" stats (entire hierarchy) if enabled,
269 // otherwise just the single cgroup -- see
270 // https://www.kernel.org/doc/Documentation/cgroup-v1/memory.txt
271 if val, ok := thisSample.memStat["total_"+key]; ok {
272 fmt.Fprintf(&outstat, " %d %s", val, key)
273 } else if val, ok := thisSample.memStat[key]; ok {
274 fmt.Fprintf(&outstat, " %d %s", val, key)
277 r.Logger.Printf("mem%s\n", outstat.String())
279 if r.kernelPageSize == 0 {
280 // assign "don't try again" value in case we give up
281 // and return without assigning the real value
282 r.kernelPageSize = -1
283 buf, err := os.ReadFile("/proc/self/smaps")
285 r.Logger.Printf("error reading /proc/self/smaps: %s", err)
288 m := regexp.MustCompile(`\nKernelPageSize:\s*(\d+) kB\n`).FindSubmatch(buf)
290 r.Logger.Printf("error parsing /proc/self/smaps: KernelPageSize not found")
293 size, err := strconv.ParseInt(string(m[1]), 10, 64)
295 r.Logger.Printf("error parsing /proc/self/smaps: KernelPageSize %q: %s", m[1], err)
298 r.kernelPageSize = size * 1024
299 } else if r.kernelPageSize < 0 {
300 // already failed to determine page size, don't keep
305 r.reportPIDsMu.Lock()
306 defer r.reportPIDsMu.Unlock()
307 procnames := make([]string, 0, len(r.reportPIDs))
308 for name := range r.reportPIDs {
309 procnames = append(procnames, name)
311 sort.Strings(procnames)
313 for _, procname := range procnames {
314 pid := r.reportPIDs[procname]
315 buf, err := os.ReadFile(fmt.Sprintf("/proc/%d/stat", pid))
319 // If the executable name contains a ')' char,
320 // /proc/$pid/stat will look like '1234 (exec name)) S
321 // 123 ...' -- the last ')' is the end of the 2nd
323 paren := bytes.LastIndexByte(buf, ')')
327 fields := bytes.SplitN(buf[paren:], []byte{' '}, 24)
328 if len(fields) < 24 {
331 // rss is the 24th field in .../stat, and fields[0]
332 // here is the last char ')' of the 2nd field, so
334 rss, err := strconv.ParseInt(string(fields[22]), 10, 64)
338 procmem += fmt.Sprintf(" %d %s", rss*r.kernelPageSize, procname)
341 r.Logger.Printf("procmem%s\n", procmem)
345 func (r *Reporter) doNetworkStats() {
346 sampleTime := time.Now()
347 stats, err := r.getContainerNetStats()
352 scanner := bufio.NewScanner(stats)
356 words := strings.Fields(scanner.Text())
357 if len(words) != 17 {
358 // Skip lines with wrong format
361 ifName = strings.TrimRight(words[0], ":")
362 if ifName == "lo" || ifName == "" {
363 // Skip loopback interface and lines with wrong format
366 if tx, err = strconv.ParseInt(words[9], 10, 64); err != nil {
369 if rx, err = strconv.ParseInt(words[1], 10, 64); err != nil {
372 nextSample := ioSample{}
373 nextSample.sampleTime = sampleTime
374 nextSample.txBytes = tx
375 nextSample.rxBytes = rx
377 if prev, ok := r.lastNetSample[ifName]; ok {
378 interval := nextSample.sampleTime.Sub(prev.sampleTime).Seconds()
379 delta = fmt.Sprintf(" -- interval %.4f seconds %d tx %d rx",
384 r.Logger.Printf("net:%s %d tx %d rx%s\n", ifName, tx, rx, delta)
385 r.lastNetSample[ifName] = nextSample
// diskSpaceSample is one reading of filesystem space for the
// monitored temporary directory. All sizes are in bytes.
type diskSpaceSample struct {
	hasData    bool // to distinguish the zero value from real data
	sampleTime time.Time
	total      uint64
	used       uint64
	available  uint64
}
397 func (r *Reporter) doDiskSpaceStats() {
398 s := syscall.Statfs_t{}
399 err := syscall.Statfs(r.TempDir, &s)
403 bs := uint64(s.Bsize)
404 nextSample := diskSpaceSample{
406 sampleTime: time.Now(),
407 total: s.Blocks * bs,
408 used: (s.Blocks - s.Bfree) * bs,
409 available: s.Bavail * bs,
413 if r.lastDiskSpaceSample.hasData {
414 prev := r.lastDiskSpaceSample
415 interval := nextSample.sampleTime.Sub(prev.sampleTime).Seconds()
416 delta = fmt.Sprintf(" -- interval %.4f seconds %d used",
418 int64(nextSample.used-prev.used))
420 r.Logger.Printf("statfs %d available %d used %d total%s\n",
421 nextSample.available, nextSample.used, nextSample.total, delta)
422 r.lastDiskSpaceSample = nextSample
// cpuSample is one reading of cumulative CPU usage: user and system
// time in seconds, and the number of CPUs available at that moment.
type cpuSample struct {
	hasData    bool // to distinguish the zero value from real data
	sampleTime time.Time
	user       float64
	sys        float64
	cpus       int64
}
433 // Return the number of CPUs available in the container. Return 0 if
434 // we can't figure out the real number of CPUs.
435 func (r *Reporter) getCPUCount() int64 {
436 cpusetFile, err := r.openStatFile("cpuset", "cpuset.cpus", true)
440 defer cpusetFile.Close()
441 b, err := r.readAllOrWarn(cpusetFile)
445 sp := strings.Split(string(b), ",")
447 for _, v := range sp {
449 n, _ := fmt.Sscanf(v, "%d-%d", &min, &max)
451 cpus += (max - min) + 1
459 func (r *Reporter) doCPUStats() {
460 statFile, err := r.openStatFile("cpuacct", "cpuacct.stat", true)
464 defer statFile.Close()
465 b, err := r.readAllOrWarn(statFile)
470 var userTicks, sysTicks int64
471 fmt.Sscanf(string(b), "user %d\nsystem %d", &userTicks, &sysTicks)
472 userHz := float64(100)
473 nextSample := cpuSample{
475 sampleTime: time.Now(),
476 user: float64(userTicks) / userHz,
477 sys: float64(sysTicks) / userHz,
478 cpus: r.getCPUCount(),
482 if r.lastCPUSample.hasData {
483 delta = fmt.Sprintf(" -- interval %.4f seconds %.4f user %.4f sys",
484 nextSample.sampleTime.Sub(r.lastCPUSample.sampleTime).Seconds(),
485 nextSample.user-r.lastCPUSample.user,
486 nextSample.sys-r.lastCPUSample.sys)
488 r.Logger.Printf("cpu %.4f user %.4f sys %d cpus%s\n",
489 nextSample.user, nextSample.sys, nextSample.cpus, delta)
490 r.lastCPUSample = nextSample
493 // Report stats periodically until we learn (via r.done) that someone
495 func (r *Reporter) run() {
496 defer close(r.flushed)
498 r.reportedStatFile = make(map[string]string)
500 if !r.waitForCIDFile() || !r.waitForCgroup() {
504 r.lastNetSample = make(map[string]ioSample)
505 r.lastDiskIOSample = make(map[string]ioSample)
507 if len(r.TempDir) == 0 {
508 // Temporary dir not provided, try to get it from the environment.
509 r.TempDir = os.Getenv("TMPDIR")
511 if len(r.TempDir) > 0 {
512 r.Logger.Printf("notice: monitoring temp dir %s\n", r.TempDir)
515 ticker := time.NewTicker(r.PollPeriod)
530 // If CID is empty, wait for it to appear in CIDFile. Return true if
531 // we get it before we learn (via r.done) that someone called Stop.
532 func (r *Reporter) waitForCIDFile() bool {
533 if r.CID != "" || r.CIDFile == "" {
537 ticker := time.NewTicker(100 * time.Millisecond)
540 cid, err := ioutil.ReadFile(r.CIDFile)
541 if err == nil && len(cid) > 0 {
548 r.Logger.Printf("warning: CID never appeared in %+q: %v", r.CIDFile, err)
554 // Wait for the cgroup stats files to appear in cgroup_root. Return
555 // true if they appear before r.done indicates someone called Stop. If
556 // they don't appear within one poll interval, log a warning and keep
558 func (r *Reporter) waitForCgroup() bool {
559 ticker := time.NewTicker(100 * time.Millisecond)
561 warningTimer := time.After(r.PollPeriod)
563 c, err := r.openStatFile("cpuacct", "cgroup.procs", false)
571 r.Logger.Printf("warning: cgroup stats files have not appeared after %v (config error?) -- still waiting...", r.PollPeriod)
573 r.Logger.Printf("warning: cgroup stats files never appeared for %v", r.CID)