Merge branch '17244-cgroup2'
[arvados.git] / lib / crunchstat / crunchstat.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 // Package crunchstat reports resource usage (CPU, memory, disk,
6 // network) for a cgroup.
7 package crunchstat
8
9 import (
10         "bufio"
11         "bytes"
12         "errors"
13         "fmt"
14         "io"
15         "io/fs"
16         "io/ioutil"
17         "os"
18         "path/filepath"
19         "regexp"
20         "sort"
21         "strconv"
22         "strings"
23         "sync"
24         "syscall"
25         "time"
26 )
27
28 // crunchstat collects all memory statistics, but only reports these.
29 var memoryStats = [...]string{"cache", "swap", "pgmajfault", "rss"}
30
31 type logPrinter interface {
32         Printf(fmt string, args ...interface{})
33 }
34
35 // A Reporter gathers statistics for a cgroup and writes them to a
36 // log.Logger.
37 type Reporter struct {
38         // Func that returns the pid of a process inside the desired
39         // cgroup. Reporter will call Pid periodically until it
40         // returns a positive number, then start reporting stats for
41         // the cgroup that process belongs to.
42         //
43         // Pid is used when cgroups v2 is available. For cgroups v1,
44         // see below.
45         Pid func() int
46
47         // Interval between samples. Must be positive.
48         PollPeriod time.Duration
49
50         // Temporary directory, will be monitored for available, used
51         // & total space.
52         TempDir string
53
54         // Where to write statistics. Must not be nil.
55         Logger logPrinter
56
57         // When stats cross thresholds configured in the fields below,
58         // they are reported to this logger.
59         ThresholdLogger logPrinter
60
61         // MemThresholds maps memory stat names to slices of thresholds.
62         // When the corresponding stat exceeds a threshold, that will be logged.
63         MemThresholds map[string][]Threshold
64
65         // Filesystem to read /proc entries and cgroup stats from.
66         // Non-nil for testing, nil for real root filesystem.
67         FS fs.FS
68
69         // Enable debug messages.
70         Debug bool
71
72         // available cgroup hierarchies
73         statFiles struct {
74                 cpusetCpus        string // v1,v2 (via /proc/$PID/cpuset)
75                 cpuacctStat       string // v1 (via /proc/$PID/cgroup => cpuacct)
76                 cpuStat           string // v2
77                 ioServiceBytes    string // v1 (via /proc/$PID/cgroup => blkio)
78                 ioStat            string // v2
79                 memoryStat        string // v1 and v2 (but v2 is missing some entries)
80                 memoryCurrent     string // v2
81                 memorySwapCurrent string // v2
82                 netDev            string // /proc/$PID/net/dev
83         }
84
85         kernelPageSize      int64
86         lastNetSample       map[string]ioSample
87         lastDiskIOSample    map[string]ioSample
88         lastCPUSample       cpuSample
89         lastDiskSpaceSample diskSpaceSample
90         lastMemSample       memSample
91         maxDiskSpaceSample  diskSpaceSample
92         maxMemSample        map[memoryKey]int64
93
94         // process returned by Pid(), whose cgroup stats we are
95         // reporting
96         pid int
97
98         // individual processes whose memory size we are reporting
99         reportPIDs   map[string]int
100         reportPIDsMu sync.Mutex
101
102         done    chan struct{} // closed when we should stop reporting
103         ready   chan struct{} // have pid and stat files
104         flushed chan struct{} // closed when we have made our last report
105 }
106
107 type Threshold struct {
108         percentage int64
109         threshold  int64
110         total      int64
111 }
112
113 func NewThresholdFromPercentage(total int64, percentage int64) Threshold {
114         return Threshold{
115                 percentage: percentage,
116                 threshold:  total * percentage / 100,
117                 total:      total,
118         }
119 }
120
121 func NewThresholdsFromPercentages(total int64, percentages []int64) (thresholds []Threshold) {
122         for _, percentage := range percentages {
123                 thresholds = append(thresholds, NewThresholdFromPercentage(total, percentage))
124         }
125         return
126 }
127
128 // memoryKey is a key into Reporter.maxMemSample.
129 // Initialize it with just statName to get the host/cgroup maximum.
130 // Initialize it with all fields to get that process' maximum.
131 type memoryKey struct {
132         processID   int
133         processName string
134         statName    string
135 }
136
137 // Start starts monitoring in a new goroutine, and returns
138 // immediately.
139 //
140 // The monitoring goroutine waits for a non-empty CIDFile to appear
141 // (unless CID is non-empty). Then it waits for the accounting files
142 // to appear for the monitored container. Then it collects and reports
143 // statistics until Stop is called.
144 //
145 // Callers should not call Start more than once.
146 //
147 // Callers should not modify public data fields after calling Start.
148 func (r *Reporter) Start() {
149         r.done = make(chan struct{})
150         r.ready = make(chan struct{})
151         r.flushed = make(chan struct{})
152         if r.FS == nil {
153                 r.FS = os.DirFS("/")
154         }
155         go r.run()
156 }
157
158 // ReportPID starts reporting stats for a specified process.
159 func (r *Reporter) ReportPID(name string, pid int) {
160         r.reportPIDsMu.Lock()
161         defer r.reportPIDsMu.Unlock()
162         if r.reportPIDs == nil {
163                 r.reportPIDs = map[string]int{name: pid}
164         } else {
165                 r.reportPIDs[name] = pid
166         }
167 }
168
169 // Stop reporting. Do not call more than once, or before calling
170 // Start.
171 //
172 // Nothing will be logged after Stop returns unless you call a Log* method.
173 func (r *Reporter) Stop() {
174         close(r.done)
175         <-r.flushed
176 }
177
178 var v1keys = map[string]bool{
179         "blkio":   true,
180         "cpuacct": true,
181         "cpuset":  true,
182         "memory":  true,
183 }
184
185 // Find cgroup hierarchies in /proc/mounts, e.g.,
186 //
187 //      {
188 //              "blkio": "/sys/fs/cgroup/blkio",
189 //              "unified": "/sys/fs/cgroup/unified",
190 //      }
191 func (r *Reporter) cgroupMounts() map[string]string {
192         procmounts, err := fs.ReadFile(r.FS, "proc/mounts")
193         if err != nil {
194                 r.Logger.Printf("error reading /proc/mounts: %s", err)
195                 return nil
196         }
197         mounts := map[string]string{}
198         for _, line := range bytes.Split(procmounts, []byte{'\n'}) {
199                 fields := bytes.SplitN(line, []byte{' '}, 6)
200                 if len(fields) != 6 {
201                         continue
202                 }
203                 switch string(fields[2]) {
204                 case "cgroup2":
205                         // cgroup /sys/fs/cgroup/unified cgroup2 rw,nosuid,nodev,noexec,relatime 0 0
206                         mounts["unified"] = string(fields[1])
207                 case "cgroup":
208                         // cgroup /sys/fs/cgroup/blkio cgroup rw,nosuid,nodev,noexec,relatime,blkio 0 0
209                         options := bytes.Split(fields[3], []byte{','})
210                         for _, option := range options {
211                                 option := string(option)
212                                 if v1keys[option] {
213                                         mounts[option] = string(fields[1])
214                                         break
215                                 }
216                         }
217                 }
218         }
219         return mounts
220 }
221
222 // generate map of cgroup controller => path for r.pid.
223 //
224 // the "unified" controller represents cgroups v2.
225 func (r *Reporter) cgroupPaths(mounts map[string]string) map[string]string {
226         if len(mounts) == 0 {
227                 return nil
228         }
229         procdir := fmt.Sprintf("proc/%d", r.pid)
230         buf, err := fs.ReadFile(r.FS, procdir+"/cgroup")
231         if err != nil {
232                 r.Logger.Printf("error reading cgroup file: %s", err)
233                 return nil
234         }
235         paths := map[string]string{}
236         for _, line := range bytes.Split(buf, []byte{'\n'}) {
237                 // The entry for cgroup v2 is always in the format
238                 // "0::$PATH" --
239                 // https://docs.kernel.org/admin-guide/cgroup-v2.html
240                 if bytes.HasPrefix(line, []byte("0::/")) && mounts["unified"] != "" {
241                         paths["unified"] = mounts["unified"] + string(line[3:])
242                         continue
243                 }
244                 // cgroups v1 entries look like
245                 // "6:cpu,cpuacct:/user.slice"
246                 fields := bytes.SplitN(line, []byte{':'}, 3)
247                 if len(fields) != 3 {
248                         continue
249                 }
250                 for _, key := range bytes.Split(fields[1], []byte{','}) {
251                         key := string(key)
252                         if mounts[key] != "" {
253                                 paths[key] = mounts[key] + string(fields[2])
254                         }
255                 }
256         }
257         // In unified mode, /proc/$PID/cgroup doesn't have a cpuset
258         // entry, but we still need it -- there's no cpuset.cpus file
259         // in the cgroup2 subtree indicated by the 0::$PATH entry. We
260         // have to get the right path from /proc/$PID/cpuset.
261         if _, found := paths["cpuset"]; !found && mounts["unified"] != "" {
262                 buf, _ := fs.ReadFile(r.FS, procdir+"/cpuset")
263                 cpusetPath := string(bytes.TrimRight(buf, "\n"))
264                 paths["cpuset"] = mounts["unified"] + cpusetPath
265         }
266         return paths
267 }
268
269 func (r *Reporter) findStatFiles() {
270         mounts := r.cgroupMounts()
271         paths := r.cgroupPaths(mounts)
272         done := map[*string]bool{}
273         for _, try := range []struct {
274                 statFile *string
275                 pathkey  string
276                 file     string
277         }{
278                 {&r.statFiles.cpusetCpus, "cpuset", "cpuset.cpus.effective"},
279                 {&r.statFiles.cpusetCpus, "cpuset", "cpuset.cpus"},
280                 {&r.statFiles.cpuacctStat, "cpuacct", "cpuacct.stat"},
281                 {&r.statFiles.cpuStat, "unified", "cpu.stat"},
282                 // blkio.throttle.io_service_bytes must precede
283                 // blkio.io_service_bytes -- on ubuntu1804, the latter
284                 // is present but reports 0
285                 {&r.statFiles.ioServiceBytes, "blkio", "blkio.throttle.io_service_bytes"},
286                 {&r.statFiles.ioServiceBytes, "blkio", "blkio.io_service_bytes"},
287                 {&r.statFiles.ioStat, "unified", "io.stat"},
288                 {&r.statFiles.memoryStat, "unified", "memory.stat"},
289                 {&r.statFiles.memoryStat, "memory", "memory.stat"},
290                 {&r.statFiles.memoryCurrent, "unified", "memory.current"},
291                 {&r.statFiles.memorySwapCurrent, "unified", "memory.swap.current"},
292         } {
293                 startpath, ok := paths[try.pathkey]
294                 if !ok || done[try.statFile] {
295                         continue
296                 }
297                 // /proc/$PID/cgroup says cgroup path is
298                 // /exa/mple/exa/mple, however, sometimes the file we
299                 // need is not under that path, it's only available in
300                 // a parent cgroup's dir.  So we start at
301                 // /sys/fs/cgroup/unified/exa/mple/exa/mple/ and walk
302                 // up to /sys/fs/cgroup/unified/ until we find the
303                 // desired file.
304                 //
305                 // This might mean our reported stats include more
306                 // cgroups in the cgroup tree, but it's the best we
307                 // can do.
308                 for path := startpath; path != "" && path != "/" && (path == startpath || strings.HasPrefix(path, mounts[try.pathkey])); path, _ = filepath.Split(strings.TrimRight(path, "/")) {
309                         target := strings.TrimLeft(filepath.Join(path, try.file), "/")
310                         buf, err := fs.ReadFile(r.FS, target)
311                         if err != nil || len(buf) == 0 || bytes.Equal(buf, []byte{'\n'}) {
312                                 if r.Debug {
313                                         if os.IsNotExist(err) {
314                                                 // don't stutter
315                                                 err = os.ErrNotExist
316                                         }
317                                         r.Logger.Printf("skip /%s: %s", target, err)
318                                 }
319                                 continue
320                         }
321                         *try.statFile = target
322                         done[try.statFile] = true
323                         r.Logger.Printf("notice: reading stats from /%s", target)
324                         break
325                 }
326         }
327
328         netdev := fmt.Sprintf("proc/%d/net/dev", r.pid)
329         if buf, err := fs.ReadFile(r.FS, netdev); err == nil && len(buf) > 0 {
330                 r.statFiles.netDev = netdev
331                 r.Logger.Printf("using /%s", netdev)
332         }
333 }
334
335 func (r *Reporter) reportMemoryMax(logger logPrinter, source, statName string, value, limit int64) {
336         var units string
337         switch statName {
338         case "pgmajfault":
339                 units = "faults"
340         default:
341                 units = "bytes"
342         }
343         if limit > 0 {
344                 percentage := 100 * value / limit
345                 logger.Printf("Maximum %s memory %s usage was %d%%, %d/%d %s",
346                         source, statName, percentage, value, limit, units)
347         } else {
348                 logger.Printf("Maximum %s memory %s usage was %d %s",
349                         source, statName, value, units)
350         }
351 }
352
353 func (r *Reporter) LogMaxima(logger logPrinter, memLimits map[string]int64) {
354         if r.lastCPUSample.hasData {
355                 logger.Printf("Total CPU usage was %f user and %f sys on %d CPUs",
356                         r.lastCPUSample.user, r.lastCPUSample.sys, r.lastCPUSample.cpus)
357         }
358         for disk, sample := range r.lastDiskIOSample {
359                 logger.Printf("Total disk I/O on %s was %d bytes written and %d bytes read",
360                         disk, sample.txBytes, sample.rxBytes)
361         }
362         if r.maxDiskSpaceSample.total > 0 {
363                 percentage := 100 * r.maxDiskSpaceSample.used / r.maxDiskSpaceSample.total
364                 logger.Printf("Maximum disk usage was %d%%, %d/%d bytes",
365                         percentage, r.maxDiskSpaceSample.used, r.maxDiskSpaceSample.total)
366         }
367         for _, statName := range memoryStats {
368                 value, ok := r.maxMemSample[memoryKey{statName: "total_" + statName}]
369                 if !ok {
370                         value, ok = r.maxMemSample[memoryKey{statName: statName}]
371                 }
372                 if ok {
373                         r.reportMemoryMax(logger, "container", statName, value, memLimits[statName])
374                 }
375         }
376         for ifname, sample := range r.lastNetSample {
377                 logger.Printf("Total network I/O on %s was %d bytes written and %d bytes read",
378                         ifname, sample.txBytes, sample.rxBytes)
379         }
380 }
381
382 func (r *Reporter) LogProcessMemMax(logger logPrinter) {
383         for memKey, value := range r.maxMemSample {
384                 if memKey.processName == "" {
385                         continue
386                 }
387                 r.reportMemoryMax(logger, memKey.processName, memKey.statName, value, 0)
388         }
389 }
390
391 func (r *Reporter) readAllOrWarn(in io.Reader) ([]byte, error) {
392         content, err := ioutil.ReadAll(in)
393         if err != nil {
394                 r.Logger.Printf("warning: %v", err)
395         }
396         return content, err
397 }
398
399 type ioSample struct {
400         sampleTime time.Time
401         txBytes    int64
402         rxBytes    int64
403 }
404
405 func (r *Reporter) doBlkIOStats() {
406         var sampleTime = time.Now()
407         newSamples := make(map[string]ioSample)
408
409         if r.statFiles.ioStat != "" {
410                 statfile, err := fs.ReadFile(r.FS, r.statFiles.ioStat)
411                 if err != nil {
412                         return
413                 }
414                 for _, line := range bytes.Split(statfile, []byte{'\n'}) {
415                         // 254:16 rbytes=72163328 wbytes=117370880 rios=3811 wios=3906 dbytes=0 dios=0
416                         words := bytes.Split(line, []byte{' '})
417                         if len(words) < 2 {
418                                 continue
419                         }
420                         thisSample := ioSample{sampleTime, -1, -1}
421                         for _, kv := range words[1:] {
422                                 if bytes.HasPrefix(kv, []byte("rbytes=")) {
423                                         fmt.Sscanf(string(kv[7:]), "%d", &thisSample.rxBytes)
424                                 } else if bytes.HasPrefix(kv, []byte("wbytes=")) {
425                                         fmt.Sscanf(string(kv[7:]), "%d", &thisSample.txBytes)
426                                 }
427                         }
428                         if thisSample.rxBytes >= 0 && thisSample.txBytes >= 0 {
429                                 newSamples[string(words[0])] = thisSample
430                         }
431                 }
432         } else if r.statFiles.ioServiceBytes != "" {
433                 statfile, err := fs.ReadFile(r.FS, r.statFiles.ioServiceBytes)
434                 if err != nil {
435                         return
436                 }
437                 for _, line := range bytes.Split(statfile, []byte{'\n'}) {
438                         var device, op string
439                         var val int64
440                         if _, err := fmt.Sscanf(string(line), "%s %s %d", &device, &op, &val); err != nil {
441                                 continue
442                         }
443                         var thisSample ioSample
444                         var ok bool
445                         if thisSample, ok = newSamples[device]; !ok {
446                                 thisSample = ioSample{sampleTime, -1, -1}
447                         }
448                         switch op {
449                         case "Read":
450                                 thisSample.rxBytes = val
451                         case "Write":
452                                 thisSample.txBytes = val
453                         }
454                         newSamples[device] = thisSample
455                 }
456         }
457
458         for dev, sample := range newSamples {
459                 if sample.txBytes < 0 || sample.rxBytes < 0 {
460                         continue
461                 }
462                 delta := ""
463                 if prev, ok := r.lastDiskIOSample[dev]; ok {
464                         delta = fmt.Sprintf(" -- interval %.4f seconds %d write %d read",
465                                 sample.sampleTime.Sub(prev.sampleTime).Seconds(),
466                                 sample.txBytes-prev.txBytes,
467                                 sample.rxBytes-prev.rxBytes)
468                 }
469                 r.Logger.Printf("blkio:%s %d write %d read%s\n", dev, sample.txBytes, sample.rxBytes, delta)
470                 r.lastDiskIOSample[dev] = sample
471         }
472 }
473
474 type memSample struct {
475         sampleTime time.Time
476         memStat    map[string]int64
477 }
478
479 func (r *Reporter) getMemSample() {
480         thisSample := memSample{time.Now(), make(map[string]int64)}
481
482         // memory.stat contains "pgmajfault" in cgroups v1 and v2. It
483         // also contains "rss", "swap", and "cache" in cgroups v1.
484         c, err := r.FS.Open(r.statFiles.memoryStat)
485         if err != nil {
486                 return
487         }
488         defer c.Close()
489         b := bufio.NewScanner(c)
490         for b.Scan() {
491                 var stat string
492                 var val int64
493                 if _, err := fmt.Sscanf(string(b.Text()), "%s %d", &stat, &val); err != nil {
494                         continue
495                 }
496                 thisSample.memStat[stat] = val
497         }
498
499         // In cgroups v2, we need to read "memory.current" and
500         // "memory.swap.current" as well.
501         for stat, fnm := range map[string]string{
502                 // memory.current includes cache. We don't get
503                 // separate rss/cache values, so we call
504                 // memory usage "rss" for compatibility, and
505                 // omit "cache".
506                 "rss":  r.statFiles.memoryCurrent,
507                 "swap": r.statFiles.memorySwapCurrent,
508         } {
509                 if fnm == "" {
510                         continue
511                 }
512                 buf, err := fs.ReadFile(r.FS, fnm)
513                 if err != nil {
514                         continue
515                 }
516                 var val int64
517                 _, err = fmt.Sscanf(string(buf), "%d", &val)
518                 if err != nil {
519                         continue
520                 }
521                 thisSample.memStat[stat] = val
522         }
523         for stat, val := range thisSample.memStat {
524                 maxKey := memoryKey{statName: stat}
525                 if val > r.maxMemSample[maxKey] {
526                         r.maxMemSample[maxKey] = val
527                 }
528         }
529         r.lastMemSample = thisSample
530
531         if r.ThresholdLogger != nil {
532                 for statName, thresholds := range r.MemThresholds {
533                         statValue, ok := thisSample.memStat["total_"+statName]
534                         if !ok {
535                                 statValue, ok = thisSample.memStat[statName]
536                                 if !ok {
537                                         continue
538                                 }
539                         }
540                         var index int
541                         var statThreshold Threshold
542                         for index, statThreshold = range thresholds {
543                                 if statValue < statThreshold.threshold {
544                                         break
545                                 } else if statThreshold.percentage > 0 {
546                                         r.ThresholdLogger.Printf("Container using over %d%% of memory (%s %d/%d bytes)",
547                                                 statThreshold.percentage, statName, statValue, statThreshold.total)
548                                 } else {
549                                         r.ThresholdLogger.Printf("Container using over %d of memory (%s %s bytes)",
550                                                 statThreshold.threshold, statName, statValue)
551                                 }
552                         }
553                         r.MemThresholds[statName] = thresholds[index:]
554                 }
555         }
556 }
557
558 func (r *Reporter) reportMemSample() {
559         var outstat bytes.Buffer
560         for _, key := range memoryStats {
561                 // Use "total_X" stats (entire hierarchy) if enabled,
562                 // otherwise just the single cgroup -- see
563                 // https://www.kernel.org/doc/Documentation/cgroup-v1/memory.txt
564                 if val, ok := r.lastMemSample.memStat["total_"+key]; ok {
565                         fmt.Fprintf(&outstat, " %d %s", val, key)
566                 } else if val, ok := r.lastMemSample.memStat[key]; ok {
567                         fmt.Fprintf(&outstat, " %d %s", val, key)
568                 }
569         }
570         r.Logger.Printf("mem%s\n", outstat.String())
571 }
572
573 func (r *Reporter) doProcmemStats() {
574         if r.kernelPageSize == 0 {
575                 // assign "don't try again" value in case we give up
576                 // and return without assigning the real value
577                 r.kernelPageSize = -1
578                 buf, err := fs.ReadFile(r.FS, "proc/self/smaps")
579                 if err != nil {
580                         r.Logger.Printf("error reading /proc/self/smaps: %s", err)
581                         return
582                 }
583                 m := regexp.MustCompile(`\nKernelPageSize:\s*(\d+) kB\n`).FindSubmatch(buf)
584                 if len(m) != 2 {
585                         r.Logger.Printf("error parsing /proc/self/smaps: KernelPageSize not found")
586                         return
587                 }
588                 size, err := strconv.ParseInt(string(m[1]), 10, 64)
589                 if err != nil {
590                         r.Logger.Printf("error parsing /proc/self/smaps: KernelPageSize %q: %s", m[1], err)
591                         return
592                 }
593                 r.kernelPageSize = size * 1024
594         } else if r.kernelPageSize < 0 {
595                 // already failed to determine page size, don't keep
596                 // trying/logging
597                 return
598         }
599
600         r.reportPIDsMu.Lock()
601         defer r.reportPIDsMu.Unlock()
602         procnames := make([]string, 0, len(r.reportPIDs))
603         for name := range r.reportPIDs {
604                 procnames = append(procnames, name)
605         }
606         sort.Strings(procnames)
607         procmem := ""
608         for _, procname := range procnames {
609                 pid := r.reportPIDs[procname]
610                 buf, err := fs.ReadFile(r.FS, fmt.Sprintf("proc/%d/stat", pid))
611                 if err != nil {
612                         continue
613                 }
614                 // If the executable name contains a ')' char,
615                 // /proc/$pid/stat will look like '1234 (exec name)) S
616                 // 123 ...' -- the last ')' is the end of the 2nd
617                 // field.
618                 paren := bytes.LastIndexByte(buf, ')')
619                 if paren < 0 {
620                         continue
621                 }
622                 fields := bytes.SplitN(buf[paren:], []byte{' '}, 24)
623                 if len(fields) < 24 {
624                         continue
625                 }
626                 // rss is the 24th field in .../stat, and fields[0]
627                 // here is the last char ')' of the 2nd field, so
628                 // rss is fields[22]
629                 rss, err := strconv.ParseInt(string(fields[22]), 10, 64)
630                 if err != nil {
631                         continue
632                 }
633                 value := rss * r.kernelPageSize
634                 procmem += fmt.Sprintf(" %d %s", value, procname)
635                 maxKey := memoryKey{pid, procname, "rss"}
636                 if value > r.maxMemSample[maxKey] {
637                         r.maxMemSample[maxKey] = value
638                 }
639         }
640         if procmem != "" {
641                 r.Logger.Printf("procmem%s\n", procmem)
642         }
643 }
644
645 func (r *Reporter) doNetworkStats() {
646         if r.statFiles.netDev == "" {
647                 return
648         }
649         sampleTime := time.Now()
650         stats, err := r.FS.Open(r.statFiles.netDev)
651         if err != nil {
652                 return
653         }
654         defer stats.Close()
655         scanner := bufio.NewScanner(stats)
656         for scanner.Scan() {
657                 var ifName string
658                 var rx, tx int64
659                 words := strings.Fields(scanner.Text())
660                 if len(words) != 17 {
661                         // Skip lines with wrong format
662                         continue
663                 }
664                 ifName = strings.TrimRight(words[0], ":")
665                 if ifName == "lo" || ifName == "" {
666                         // Skip loopback interface and lines with wrong format
667                         continue
668                 }
669                 if tx, err = strconv.ParseInt(words[9], 10, 64); err != nil {
670                         continue
671                 }
672                 if rx, err = strconv.ParseInt(words[1], 10, 64); err != nil {
673                         continue
674                 }
675                 nextSample := ioSample{}
676                 nextSample.sampleTime = sampleTime
677                 nextSample.txBytes = tx
678                 nextSample.rxBytes = rx
679                 var delta string
680                 if prev, ok := r.lastNetSample[ifName]; ok {
681                         interval := nextSample.sampleTime.Sub(prev.sampleTime).Seconds()
682                         delta = fmt.Sprintf(" -- interval %.4f seconds %d tx %d rx",
683                                 interval,
684                                 tx-prev.txBytes,
685                                 rx-prev.rxBytes)
686                 }
687                 r.Logger.Printf("net:%s %d tx %d rx%s\n", ifName, tx, rx, delta)
688                 r.lastNetSample[ifName] = nextSample
689         }
690 }
691
692 type diskSpaceSample struct {
693         hasData    bool
694         sampleTime time.Time
695         total      uint64
696         used       uint64
697         available  uint64
698 }
699
700 func (r *Reporter) doDiskSpaceStats() {
701         s := syscall.Statfs_t{}
702         err := syscall.Statfs(r.TempDir, &s)
703         if err != nil {
704                 return
705         }
706         bs := uint64(s.Bsize)
707         nextSample := diskSpaceSample{
708                 hasData:    true,
709                 sampleTime: time.Now(),
710                 total:      s.Blocks * bs,
711                 used:       (s.Blocks - s.Bfree) * bs,
712                 available:  s.Bavail * bs,
713         }
714         if nextSample.used > r.maxDiskSpaceSample.used {
715                 r.maxDiskSpaceSample = nextSample
716         }
717
718         var delta string
719         if r.lastDiskSpaceSample.hasData {
720                 prev := r.lastDiskSpaceSample
721                 interval := nextSample.sampleTime.Sub(prev.sampleTime).Seconds()
722                 delta = fmt.Sprintf(" -- interval %.4f seconds %d used",
723                         interval,
724                         int64(nextSample.used-prev.used))
725         }
726         r.Logger.Printf("statfs %d available %d used %d total%s\n",
727                 nextSample.available, nextSample.used, nextSample.total, delta)
728         r.lastDiskSpaceSample = nextSample
729 }
730
731 type cpuSample struct {
732         hasData    bool // to distinguish the zero value from real data
733         sampleTime time.Time
734         user       float64
735         sys        float64
736         cpus       int64
737 }
738
739 // Return the number of CPUs available in the container. Return 0 if
740 // we can't figure out the real number of CPUs.
741 func (r *Reporter) getCPUCount() int64 {
742         buf, err := fs.ReadFile(r.FS, r.statFiles.cpusetCpus)
743         if err != nil {
744                 return 0
745         }
746         cpus := int64(0)
747         for _, v := range bytes.Split(buf, []byte{','}) {
748                 var min, max int64
749                 n, _ := fmt.Sscanf(string(v), "%d-%d", &min, &max)
750                 if n == 2 {
751                         cpus += (max - min) + 1
752                 } else {
753                         cpus++
754                 }
755         }
756         return cpus
757 }
758
759 func (r *Reporter) doCPUStats() {
760         var nextSample cpuSample
761         if r.statFiles.cpuStat != "" {
762                 // v2
763                 f, err := r.FS.Open(r.statFiles.cpuStat)
764                 if err != nil {
765                         return
766                 }
767                 defer f.Close()
768                 nextSample = cpuSample{
769                         hasData:    true,
770                         sampleTime: time.Now(),
771                         cpus:       r.getCPUCount(),
772                 }
773                 for {
774                         var stat string
775                         var val int64
776                         n, err := fmt.Fscanf(f, "%s %d\n", &stat, &val)
777                         if err != nil || n != 2 {
778                                 break
779                         }
780                         if stat == "user_usec" {
781                                 nextSample.user = float64(val) / 1000000
782                         } else if stat == "system_usec" {
783                                 nextSample.sys = float64(val) / 1000000
784                         }
785                 }
786         } else if r.statFiles.cpuacctStat != "" {
787                 // v1
788                 b, err := fs.ReadFile(r.FS, r.statFiles.cpuacctStat)
789                 if err != nil {
790                         return
791                 }
792
793                 var userTicks, sysTicks int64
794                 fmt.Sscanf(string(b), "user %d\nsystem %d", &userTicks, &sysTicks)
795                 userHz := float64(100)
796                 nextSample = cpuSample{
797                         hasData:    true,
798                         sampleTime: time.Now(),
799                         user:       float64(userTicks) / userHz,
800                         sys:        float64(sysTicks) / userHz,
801                         cpus:       r.getCPUCount(),
802                 }
803         }
804
805         delta := ""
806         if r.lastCPUSample.hasData {
807                 delta = fmt.Sprintf(" -- interval %.4f seconds %.4f user %.4f sys",
808                         nextSample.sampleTime.Sub(r.lastCPUSample.sampleTime).Seconds(),
809                         nextSample.user-r.lastCPUSample.user,
810                         nextSample.sys-r.lastCPUSample.sys)
811         }
812         r.Logger.Printf("cpu %.4f user %.4f sys %d cpus%s\n",
813                 nextSample.user, nextSample.sys, nextSample.cpus, delta)
814         r.lastCPUSample = nextSample
815 }
816
817 func (r *Reporter) doAllStats() {
818         r.reportMemSample()
819         r.doProcmemStats()
820         r.doCPUStats()
821         r.doBlkIOStats()
822         r.doNetworkStats()
823         r.doDiskSpaceStats()
824 }
825
826 // Report stats periodically until we learn (via r.done) that someone
827 // called Stop.
828 func (r *Reporter) run() {
829         defer close(r.flushed)
830
831         r.maxMemSample = make(map[memoryKey]int64)
832
833         if !r.waitForPid() {
834                 return
835         }
836         r.findStatFiles()
837         close(r.ready)
838
839         r.lastNetSample = make(map[string]ioSample)
840         r.lastDiskIOSample = make(map[string]ioSample)
841
842         if len(r.TempDir) == 0 {
843                 // Temporary dir not provided, try to get it from the environment.
844                 r.TempDir = os.Getenv("TMPDIR")
845         }
846         if len(r.TempDir) > 0 {
847                 r.Logger.Printf("notice: monitoring temp dir %s\n", r.TempDir)
848         }
849
850         r.getMemSample()
851         r.doAllStats()
852
853         if r.PollPeriod < 1 {
854                 r.PollPeriod = time.Second * 10
855         }
856
857         memTicker := time.NewTicker(time.Second)
858         mainTicker := time.NewTicker(r.PollPeriod)
859         for {
860                 select {
861                 case <-r.done:
862                         return
863                 case <-memTicker.C:
864                         r.getMemSample()
865                 case <-mainTicker.C:
866                         r.doAllStats()
867                 }
868         }
869 }
870
871 // Wait for Pid() to return a real pid.  Return true if this succeeds
872 // before Stop is called.
873 func (r *Reporter) waitForPid() bool {
874         ticker := time.NewTicker(100 * time.Millisecond)
875         defer ticker.Stop()
876         warningTimer := time.After(r.PollPeriod)
877         for {
878                 r.pid = r.Pid()
879                 if r.pid > 0 {
880                         break
881                 }
882                 select {
883                 case <-ticker.C:
884                 case <-warningTimer:
885                         r.Logger.Printf("warning: Pid() did not return a process ID after %v (config error?) -- still waiting...", r.PollPeriod)
886                 case <-r.done:
887                         r.Logger.Printf("warning: Pid() never returned a process ID")
888                         return false
889                 }
890         }
891         return true
892 }
893
894 func (r *Reporter) dumpSourceFiles(destdir string) error {
895         select {
896         case <-r.done:
897                 return errors.New("reporter was never ready")
898         case <-r.ready:
899         }
900         todo := []string{
901                 fmt.Sprintf("proc/%d/cgroup", r.pid),
902                 fmt.Sprintf("proc/%d/cpuset", r.pid),
903                 "proc/mounts",
904                 "proc/self/smaps",
905                 r.statFiles.cpusetCpus,
906                 r.statFiles.cpuacctStat,
907                 r.statFiles.cpuStat,
908                 r.statFiles.ioServiceBytes,
909                 r.statFiles.ioStat,
910                 r.statFiles.memoryStat,
911                 r.statFiles.memoryCurrent,
912                 r.statFiles.memorySwapCurrent,
913                 r.statFiles.netDev,
914         }
915         for _, path := range todo {
916                 if path == "" {
917                         continue
918                 }
919                 err := r.createParentsAndCopyFile(destdir, path)
920                 if err != nil {
921                         return err
922                 }
923         }
924         r.reportPIDsMu.Lock()
925         r.reportPIDsMu.Unlock()
926         for _, pid := range r.reportPIDs {
927                 path := fmt.Sprintf("proc/%d/stat", pid)
928                 err := r.createParentsAndCopyFile(destdir, path)
929                 if err != nil {
930                         return err
931                 }
932         }
933         if proc, err := os.FindProcess(r.pid); err != nil || proc.Signal(syscall.Signal(0)) != nil {
934                 return fmt.Errorf("process %d no longer exists, snapshot is probably broken", r.pid)
935         }
936         return nil
937 }
938
939 func (r *Reporter) createParentsAndCopyFile(destdir, path string) error {
940         buf, err := fs.ReadFile(r.FS, path)
941         if os.IsNotExist(err) {
942                 return nil
943         } else if err != nil {
944                 return err
945         }
946         if parent, _ := filepath.Split(path); parent != "" {
947                 err = os.MkdirAll(destdir+"/"+parent, 0777)
948                 if err != nil {
949                         return fmt.Errorf("mkdir %s: %s", destdir+"/"+parent, err)
950                 }
951         }
952         destfile := destdir + "/" + path
953         r.Logger.Printf("copy %s to %s -- size %d", path, destfile, len(buf))
954         return os.WriteFile(destfile, buf, 0777)
955 }