1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
5 // Package crunchstat reports resource usage (CPU, memory, disk,
6 // network) for a cgroup.
24 // This magically allows us to look up userHz via _SC_CLK_TCK:
28 #include <sys/types.h>
34 // A Reporter gathers statistics for a cgroup and writes them to a
36 type Reporter struct {
37 // CID of the container to monitor. If empty, read the CID
38 // from CIDFile (first waiting until a non-empty file appears
39 // at CIDFile). If CIDFile is also empty, report host
43 // Path to a file we can read CID from.
46 // Where cgroup accounting files live on this system, e.g.,
50 // Parent cgroup, e.g., "docker".
53 // Interval between samples. Must be positive.
54 PollPeriod time.Duration
56 // Temporary directory, will be monitored for available, used & total space.
59 // Where to write statistics. Must not be nil.
// Path of the file most recently used for each stat, keyed by stat
// file name. openStatFile compares against it so that it only logs
// when the source of a stat changes.
62 reportedStatFile map[string]string
// Previous network sample for each interface name; used to report
// the change since the last sample.
63 lastNetSample map[string]ioSample
// Previous block I/O sample for each device; used to report the
// change since the last sample.
64 lastDiskIOSample map[string]ioSample
// Previous CPU sample. Its hasData flag tells whether a sample has
// been stored yet.
65 lastCPUSample cpuSample
// Previous disk space sample. Its hasData flag tells whether a
// sample has been stored yet.
66 lastDiskSpaceSample diskSpaceSample
68 done chan struct{} // closed when we should stop reporting
69 flushed chan struct{} // closed when we have made our last report
72 // Start starts monitoring in a new goroutine, and returns
75 // The monitoring goroutine waits for a non-empty CIDFile to appear
76 // (unless CID is non-empty). Then it waits for the accounting files
77 // to appear for the monitored container. Then it collects and reports
78 // statistics until Stop is called.
80 // Callers should not call Start more than once.
82 // Callers should not modify public data fields after calling Start.
83 func (r *Reporter) Start() {
// Fresh signalling channels for this run. Neither carries values:
// done tells the monitoring goroutine to stop, and run closes
// flushed (via defer) when it returns.
84 r.done = make(chan struct{})
85 r.flushed = make(chan struct{})
89 // Stop reporting. Do not call more than once, or before calling
92 // Nothing will be logged after Stop returns.
93 func (r *Reporter) Stop() {
// NOTE(review): presumably closes r.done and then waits for
// r.flushed, which is what would make "nothing will be logged after
// Stop returns" hold -- confirm against the body.
// readAllOrWarn reads everything from in using ioutil.ReadAll and
// returns the bytes read together with any error. A read error is
// also written to r.Logger as a "warning: ..." line, so callers do
// not need to log it themselves.
98 func (r *Reporter) readAllOrWarn(in io.Reader) ([]byte, error) {
99 content, err := ioutil.ReadAll(in)
// NOTE(review): confirm this warning is guarded by err != nil.
101 r.Logger.Printf("warning: %v", err)
106 // Open the cgroup stats file in /sys/fs corresponding to the target
107 // cgroup, and return an io.ReadCloser. If no stats file is available,
110 // Log the file that was opened, if it isn't the same file opened on
111 // the last openStatFile for this stat.
113 // Log "not available" if no file is found and either this stat has
114 // been available in the past, or verbose==true.
116 // TODO: Instead of trying all options, choose a process in the
117 // container, and read /proc/PID/cgroup to determine the appropriate
118 // cgroup root for the given statgroup. (This will avoid falling back
119 // to host-level stats during container setup and teardown.)
120 func (r *Reporter) openStatFile(statgroup, stat string, verbose bool) (io.ReadCloser, error) {
123 // Collect container's stats
// Two directory layouts are tried for the container: one with a
// per-statgroup directory directly under CgroupRoot, and one
// without.
125 fmt.Sprintf("%s/%s/%s/%s/%s", r.CgroupRoot, statgroup, r.CgroupParent, r.CID, stat),
126 fmt.Sprintf("%s/%s/%s/%s", r.CgroupRoot, r.CgroupParent, r.CID, stat),
129 // Collect this host's stats
// Same two layouts, but at the root of the hierarchy (no parent
// cgroup or CID component).
131 fmt.Sprintf("%s/%s/%s", r.CgroupRoot, statgroup, stat),
132 fmt.Sprintf("%s/%s", r.CgroupRoot, stat),
// Try the candidates in the order listed above.
// NOTE(review): presumably stops at the first successful os.Open
// and leaves path == "" when none can be opened -- confirm.
138 for _, path = range paths {
139 file, err = os.Open(path)
// pathWas is the path recorded for this stat by the previous call
// ("" if nothing has been recorded yet). Only log when it differs
// from the path chosen now.
146 if pathWas := r.reportedStatFile[stat]; pathWas != path {
147 // Log whenever we start using a new/different cgroup
148 // stat file for a given statistic. This typically
149 // happens 1 to 3 times per statistic, depending on
150 // whether we happen to collect stats [a] before any
151 // processes have been created in the container and
152 // [b] after all contained processes have exited.
// NOTE(review): if path == "" while verbose is false and pathWas
// is non-empty, the next branch is taken and logs "stats moved
// from <old> to " with an empty destination, rather than the
// "not available" message the function comment describes.
153 if path == "" && verbose {
154 r.Logger.Printf("notice: stats not available: stat %s, statgroup %s, cid %s, parent %s, root %s\n", stat, statgroup, r.CID, r.CgroupParent, r.CgroupRoot)
155 } else if pathWas != "" {
156 r.Logger.Printf("notice: stats moved from %s to %s\n", r.reportedStatFile[stat], path)
158 r.Logger.Printf("notice: reading stats from %s\n", path)
// Remember the choice so the next call stays quiet unless it
// changes again.
160 r.reportedStatFile[stat] = path
// getContainerNetStats returns a reader over the contents of
// /proc/PID/net/dev, where PID is taken from the cgroup.procs file
// found via openStatFile. A PID whose net/dev file cannot be read is
// logged as a notice. If no PID yields a readable file, an error is
// returned.
165 func (r *Reporter) getContainerNetStats() (io.Reader, error) {
166 procsFile, err := r.openStatFile("cpuacct", "cgroup.procs", true)
170 defer procsFile.Close()
// cgroup.procs is scanned line by line (the bufio.Scanner default);
// each line is used as a PID.
171 reader := bufio.NewScanner(procsFile)
173 taskPid := reader.Text()
174 statsFilename := fmt.Sprintf("/proc/%s/net/dev", taskPid)
175 stats, err := ioutil.ReadFile(statsFilename)
// NOTE(review): presumably logged only when ReadFile failed, after
// which the next PID is tried -- confirm.
177 r.Logger.Printf("notice: %v", err)
// stats holds the whole file, so the returned reader is served from
// memory.
180 return strings.NewReader(string(stats)), nil
182 return nil, errors.New("Could not read stats for any proc in container")
// ioSample is one timestamped pair of byte counters. It is used for
// both block I/O statistics (one sample per device) and network
// statistics (one sample per interface), through its sampleTime,
// txBytes and rxBytes fields.
185 type ioSample struct {
// doBlkIOStats reads the blkio.io_service_bytes stat file, collects a
// read/write byte count per device, and logs one "blkio:DEVICE ..."
// line per device. When a previous sample exists for the device, the
// line also reports the interval and the change in both counters.
191 func (r *Reporter) doBlkIOStats() {
192 c, err := r.openStatFile("blkio", "blkio.io_service_bytes", true)
197 b := bufio.NewScanner(c)
// One timestamp is taken before parsing, so every device in this
// pass shares the same sample time.
198 var sampleTime = time.Now()
199 newSamples := make(map[string]ioSample)
201 var device, op string
// Each line is expected to look like "DEVICE OP VALUE".
// NOTE(review): presumably lines that fail to parse are skipped --
// confirm.
203 if _, err := fmt.Sscanf(string(b.Text()), "%s %s %d", &device, &op, &val); err != nil {
206 var thisSample ioSample
208 if thisSample, ok = newSamples[device]; !ok {
// First line seen for this device: -1 marks a counter that has
// not been filled in yet.
209 thisSample = ioSample{sampleTime, -1, -1}
// NOTE(review): which op values select rxBytes vs. txBytes is
// decided by code not shown next to these assignments -- confirm.
213 thisSample.rxBytes = val
215 thisSample.txBytes = val
217 newSamples[device] = thisSample
219 for dev, sample := range newSamples {
// A negative counter means the file did not supply both values for
// this device.
// NOTE(review): presumably such devices are skipped -- confirm.
220 if sample.txBytes < 0 || sample.rxBytes < 0 {
224 if prev, ok := r.lastDiskIOSample[dev]; ok {
225 delta = fmt.Sprintf(" -- interval %.4f seconds %d write %d read",
226 sample.sampleTime.Sub(prev.sampleTime).Seconds(),
227 sample.txBytes-prev.txBytes,
228 sample.rxBytes-prev.rxBytes)
// txBytes is reported as "write" and rxBytes as "read".
230 r.Logger.Printf("blkio:%s %d write %d read%s\n", dev, sample.txBytes, sample.rxBytes, delta)
231 r.lastDiskIOSample[dev] = sample
// memSample holds the values parsed from one read of memory.stat,
// together with the time the sample was taken.
235 type memSample struct {
// Parsed counters, keyed by the stat name as it appears in the file.
237 memStat map[string]int64
// doMemoryStats reads the memory.stat file, parses every "NAME VALUE"
// line into a memSample, and logs a single "mem ..." line containing
// the stats listed in wantStats, in that order.
240 func (r *Reporter) doMemoryStats() {
241 c, err := r.openStatFile("memory", "memory.stat", true)
246 b := bufio.NewScanner(c)
247 thisSample := memSample{time.Now(), make(map[string]int64)}
// Only these stats are reported; everything else that was parsed is
// ignored when building the log line.
248 wantStats := [...]string{"cache", "swap", "pgmajfault", "rss"}
// NOTE(review): presumably lines that fail to parse are skipped --
// confirm.
252 if _, err := fmt.Sscanf(string(b.Text()), "%s %d", &stat, &val); err != nil {
255 thisSample.memStat[stat] = val
257 var outstat bytes.Buffer
258 for _, key := range wantStats {
259 // Use "total_X" stats (entire hierarchy) if enabled,
260 // otherwise just the single cgroup -- see
261 // https://www.kernel.org/doc/Documentation/cgroup-v1/memory.txt
262 if val, ok := thisSample.memStat["total_"+key]; ok {
263 fmt.Fprintf(&outstat, " %d %s", val, key)
264 } else if val, ok := thisSample.memStat[key]; ok {
265 fmt.Fprintf(&outstat, " %d %s", val, key)
// Each entry is written with a leading space, so "mem" followed by
// the buffer yields "mem V1 K1 V2 K2 ...".
268 r.Logger.Printf("mem%s\n", outstat.String())
// doNetworkStats parses the per-interface table obtained from
// getContainerNetStats and logs one "net:IFACE ..." line for each
// non-loopback interface. When a previous sample exists for the
// interface, the line also reports the interval and change since then.
271 func (r *Reporter) doNetworkStats() {
272 sampleTime := time.Now()
273 stats, err := r.getContainerNetStats()
278 scanner := bufio.NewScanner(stats)
282 words := strings.Fields(scanner.Text())
// A data row has 17 whitespace-separated fields: the interface name
// followed by 16 counters. Header rows do not match.
283 if len(words) != 17 {
284 // Skip lines with wrong format
// The first field is the interface name with a trailing colon.
287 ifName = strings.TrimRight(words[0], ":")
288 if ifName == "lo" || ifName == "" {
289 // Skip loopback interface and lines with wrong format
// Field 9 is used as the transmitted byte count and field 1 as the
// received byte count.
292 if tx, err = strconv.ParseInt(words[9], 10, 64); err != nil {
295 if rx, err = strconv.ParseInt(words[1], 10, 64); err != nil {
298 nextSample := ioSample{}
299 nextSample.sampleTime = sampleTime
300 nextSample.txBytes = tx
301 nextSample.rxBytes = rx
303 if prev, ok := r.lastNetSample[ifName]; ok {
304 interval := nextSample.sampleTime.Sub(prev.sampleTime).Seconds()
305 delta = fmt.Sprintf(" -- interval %.4f seconds %d tx %d rx",
310 r.Logger.Printf("net:%s %d tx %d rx%s\n", ifName, tx, rx, delta)
311 r.lastNetSample[ifName] = nextSample
// diskSpaceSample is one timestamped statfs reading for the monitored
// temporary directory: total, used and available bytes, plus a
// hasData flag that is checked before a stored sample is used.
315 type diskSpaceSample struct {
// doDiskSpaceStats calls statfs on r.TempDir and logs a "statfs ..."
// line with the available, used and total byte counts. When a
// previous sample exists, the line also reports the interval and the
// change in used bytes.
323 func (r *Reporter) doDiskSpaceStats() {
324 s := syscall.Statfs_t{}
325 err := syscall.Statfs(r.TempDir, &s)
// statfs reports sizes in blocks; multiply by the block size to get
// bytes.
329 bs := uint64(s.Bsize)
330 nextSample := diskSpaceSample{
332 sampleTime: time.Now(),
333 total: s.Blocks * bs,
// Used space counts every block that is not free (Bfree), while
// available space uses Bavail.
334 used: (s.Blocks - s.Bfree) * bs,
335 available: s.Bavail * bs,
339 if r.lastDiskSpaceSample.hasData {
340 prev := r.lastDiskSpaceSample
341 interval := nextSample.sampleTime.Sub(prev.sampleTime).Seconds()
342 delta = fmt.Sprintf(" -- interval %.4f seconds %d used",
// Converted to int64 so that a decrease in usage prints as a
// negative number.
// NOTE(review): this assumes used is an unsigned field -- confirm.
344 int64(nextSample.used-prev.used))
346 r.Logger.Printf("statfs %d available %d used %d total%s\n",
347 nextSample.available, nextSample.used, nextSample.total, delta)
348 r.lastDiskSpaceSample = nextSample
// cpuSample is one timestamped CPU usage reading: user and system
// time in seconds, and the number of CPUs available to the container.
351 type cpuSample struct {
352 hasData bool // to distinguish the zero value from real data
359 // Return the number of CPUs available in the container. Return 0 if
360 // we can't figure out the real number of CPUs.
361 func (r *Reporter) getCPUCount() int64 {
362 cpusetFile, err := r.openStatFile("cpuset", "cpuset.cpus", true)
366 defer cpusetFile.Close()
367 b, err := r.readAllOrWarn(cpusetFile)
// cpuset.cpus is a comma-separated list whose entries are either a
// single CPU number or an inclusive "first-last" range.
371 sp := strings.Split(string(b), ",")
373 for _, v := range sp {
// n is the number of values scanned: 2 means v was a range. The
// scan error is deliberately ignored, because a single-number entry
// does not match the whole format.
// NOTE(review): presumably any other n counts the entry as one
// CPU -- confirm.
375 n, _ := fmt.Sscanf(v, "%d-%d", &min, &max)
// Ranges are inclusive at both ends, hence the +1.
377 cpus += (max - min) + 1
// doCPUStats reads the cpuacct.stat file, converts the user and
// system tick counts to seconds, and logs a "cpu ..." line that also
// includes the CPU count from getCPUCount. When a previous sample
// exists, the line also reports the interval and the change in user
// and system time.
385 func (r *Reporter) doCPUStats() {
386 statFile, err := r.openStatFile("cpuacct", "cpuacct.stat", true)
390 defer statFile.Close()
391 b, err := r.readAllOrWarn(statFile)
396 var userTicks, sysTicks int64
// NOTE(review): the Sscanf result is discarded, so if the file does
// not match this format the tick counts keep whatever was parsed
// (zero by default) and are reported as if valid.
397 fmt.Sscanf(string(b), "user %d\nsystem %d", &userTicks, &sysTicks)
// Ticks per second, as reported by sysconf(_SC_CLK_TCK). Dividing a
// tick count by it gives seconds.
398 userHz := float64(C.sysconf(C._SC_CLK_TCK))
399 nextSample := cpuSample{
401 sampleTime: time.Now(),
402 user: float64(userTicks) / userHz,
403 sys: float64(sysTicks) / userHz,
404 cpus: r.getCPUCount(),
408 if r.lastCPUSample.hasData {
409 delta = fmt.Sprintf(" -- interval %.4f seconds %.4f user %.4f sys",
410 nextSample.sampleTime.Sub(r.lastCPUSample.sampleTime).Seconds(),
411 nextSample.user-r.lastCPUSample.user,
412 nextSample.sys-r.lastCPUSample.sys)
414 r.Logger.Printf("cpu %.4f user %.4f sys %d cpus%s\n",
415 nextSample.user, nextSample.sys, nextSample.cpus, delta)
416 r.lastCPUSample = nextSample
419 // Report stats periodically until we learn (via r.done) that someone
421 func (r *Reporter) run() {
// Closing flushed on every exit path signals that no further
// reports will be made.
422 defer close(r.flushed)
// Must exist before waitForCgroup runs, because openStatFile records
// into this map.
424 r.reportedStatFile = make(map[string]string)
// Both waits report false if Stop was called before they finished.
// NOTE(review): presumably run returns in that case -- confirm.
426 if !r.waitForCIDFile() || !r.waitForCgroup() {
430 r.lastNetSample = make(map[string]ioSample)
431 r.lastDiskIOSample = make(map[string]ioSample)
433 if len(r.TempDir) == 0 {
434 // Temporary dir not provided, try to get it from the environment.
435 r.TempDir = os.Getenv("TMPDIR")
437 if len(r.TempDir) > 0 {
438 r.Logger.Printf("notice: monitoring temp dir %s\n", r.TempDir)
// time.NewTicker panics on a non-positive duration, which is why
// PollPeriod must be positive.
441 ticker := time.NewTicker(r.PollPeriod)
456 // If CID is empty, wait for it to appear in CIDFile. Return true if
457 // we get it before we learn (via r.done) that someone called Stop.
458 func (r *Reporter) waitForCIDFile() bool {
// Nothing to wait for when the CID is already known or there is no
// CIDFile to read it from.
459 if r.CID != "" || r.CIDFile == "" {
// Poll the file every 100ms.
463 ticker := time.NewTicker(100 * time.Millisecond)
466 cid, err := ioutil.ReadFile(r.CIDFile)
// A readable, non-empty file means the CID has been written.
// NOTE(review): presumably its content is stored in r.CID here --
// confirm.
467 if err == nil && len(cid) > 0 {
// err is the most recent ReadFile error (nil if the file was read
// but empty).
474 r.Logger.Printf("warning: CID never appeared in %+q: %v", r.CIDFile, err)
480 // Wait for the cgroup stats files to appear in cgroup_root. Return
481 // true if they appear before r.done indicates someone called Stop. If
482 // they don't appear within one poll interval, log a warning and keep
484 func (r *Reporter) waitForCgroup() bool {
// Poll every 100ms for the stats file to become available.
485 ticker := time.NewTicker(100 * time.Millisecond)
// Fires once, after one poll period, to trigger the "still waiting"
// warning below.
487 warningTimer := time.After(r.PollPeriod)
// verbose is false here so that openStatFile does not log "stats
// not available" on each unsuccessful poll.
489 c, err := r.openStatFile("cpuacct", "cgroup.procs", false)
497 r.Logger.Printf("warning: cgroup stats files have not appeared after %v (config error?) -- still waiting...", r.PollPeriod)
499 r.Logger.Printf("warning: cgroup stats files never appeared for %v", r.CID)