Merge branch 'main' into 19582-aws-s3v2-driver
[arvados.git] / lib / crunchstat / crunchstat.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 // Package crunchstat reports resource usage (CPU, memory, disk,
6 // network) for a cgroup.
7 package crunchstat
8
9 import (
10         "bufio"
11         "bytes"
12         "errors"
13         "fmt"
14         "io"
15         "io/ioutil"
16         "os"
17         "regexp"
18         "sort"
19         "strconv"
20         "strings"
21         "sync"
22         "syscall"
23         "time"
24 )
25
26 // A Reporter gathers statistics for a cgroup and writes them to a
27 // log.Logger.
28 type Reporter struct {
29         // CID of the container to monitor. If empty, read the CID
30         // from CIDFile (first waiting until a non-empty file appears
31         // at CIDFile). If CIDFile is also empty, report host
32         // statistics.
33         CID string
34
35         // Path to a file we can read CID from.
36         CIDFile string
37
38         // Where cgroup accounting files live on this system, e.g.,
39         // "/sys/fs/cgroup".
40         CgroupRoot string
41
42         // Parent cgroup, e.g., "docker".
43         CgroupParent string
44
45         // Interval between samples. Must be positive.
46         PollPeriod time.Duration
47
48         // Temporary directory, will be monitored for available, used & total space.
49         TempDir string
50
51         // Where to write statistics. Must not be nil.
52         Logger interface {
53                 Printf(fmt string, args ...interface{})
54         }
55
56         kernelPageSize      int64
57         reportedStatFile    map[string]string
58         lastNetSample       map[string]ioSample
59         lastDiskIOSample    map[string]ioSample
60         lastCPUSample       cpuSample
61         lastDiskSpaceSample diskSpaceSample
62
63         reportPIDs   map[string]int
64         reportPIDsMu sync.Mutex
65
66         done    chan struct{} // closed when we should stop reporting
67         flushed chan struct{} // closed when we have made our last report
68 }
69
70 // Start starts monitoring in a new goroutine, and returns
71 // immediately.
72 //
73 // The monitoring goroutine waits for a non-empty CIDFile to appear
74 // (unless CID is non-empty). Then it waits for the accounting files
75 // to appear for the monitored container. Then it collects and reports
76 // statistics until Stop is called.
77 //
78 // Callers should not call Start more than once.
79 //
80 // Callers should not modify public data fields after calling Start.
81 func (r *Reporter) Start() {
82         r.done = make(chan struct{})
83         r.flushed = make(chan struct{})
84         go r.run()
85 }
86
87 // ReportPID starts reporting stats for a specified process.
88 func (r *Reporter) ReportPID(name string, pid int) {
89         r.reportPIDsMu.Lock()
90         defer r.reportPIDsMu.Unlock()
91         if r.reportPIDs == nil {
92                 r.reportPIDs = map[string]int{name: pid}
93         } else {
94                 r.reportPIDs[name] = pid
95         }
96 }
97
98 // Stop reporting. Do not call more than once, or before calling
99 // Start.
100 //
101 // Nothing will be logged after Stop returns.
102 func (r *Reporter) Stop() {
103         close(r.done)
104         <-r.flushed
105 }
106
107 func (r *Reporter) readAllOrWarn(in io.Reader) ([]byte, error) {
108         content, err := ioutil.ReadAll(in)
109         if err != nil {
110                 r.Logger.Printf("warning: %v", err)
111         }
112         return content, err
113 }
114
115 // Open the cgroup stats file in /sys/fs corresponding to the target
116 // cgroup, and return an io.ReadCloser. If no stats file is available,
117 // return nil.
118 //
119 // Log the file that was opened, if it isn't the same file opened on
120 // the last openStatFile for this stat.
121 //
122 // Log "not available" if no file is found and either this stat has
123 // been available in the past, or verbose==true.
124 //
125 // TODO: Instead of trying all options, choose a process in the
126 // container, and read /proc/PID/cgroup to determine the appropriate
127 // cgroup root for the given statgroup. (This will avoid falling back
128 // to host-level stats during container setup and teardown.)
129 func (r *Reporter) openStatFile(statgroup, stat string, verbose bool) (io.ReadCloser, error) {
130         var paths []string
131         if r.CID != "" {
132                 // Collect container's stats
133                 paths = []string{
134                         fmt.Sprintf("%s/%s/%s/%s/%s", r.CgroupRoot, statgroup, r.CgroupParent, r.CID, stat),
135                         fmt.Sprintf("%s/%s/%s/%s", r.CgroupRoot, r.CgroupParent, r.CID, stat),
136                 }
137         } else {
138                 // Collect this host's stats
139                 paths = []string{
140                         fmt.Sprintf("%s/%s/%s", r.CgroupRoot, statgroup, stat),
141                         fmt.Sprintf("%s/%s", r.CgroupRoot, stat),
142                 }
143         }
144         var path string
145         var file *os.File
146         var err error
147         for _, path = range paths {
148                 file, err = os.Open(path)
149                 if err == nil {
150                         break
151                 } else {
152                         path = ""
153                 }
154         }
155         if pathWas := r.reportedStatFile[stat]; pathWas != path {
156                 // Log whenever we start using a new/different cgroup
157                 // stat file for a given statistic. This typically
158                 // happens 1 to 3 times per statistic, depending on
159                 // whether we happen to collect stats [a] before any
160                 // processes have been created in the container and
161                 // [b] after all contained processes have exited.
162                 if path == "" && verbose {
163                         r.Logger.Printf("notice: stats not available: stat %s, statgroup %s, cid %s, parent %s, root %s\n", stat, statgroup, r.CID, r.CgroupParent, r.CgroupRoot)
164                 } else if pathWas != "" {
165                         r.Logger.Printf("notice: stats moved from %s to %s\n", r.reportedStatFile[stat], path)
166                 } else {
167                         r.Logger.Printf("notice: reading stats from %s\n", path)
168                 }
169                 r.reportedStatFile[stat] = path
170         }
171         return file, err
172 }
173
174 func (r *Reporter) getContainerNetStats() (io.Reader, error) {
175         procsFile, err := r.openStatFile("cpuacct", "cgroup.procs", true)
176         if err != nil {
177                 return nil, err
178         }
179         defer procsFile.Close()
180         reader := bufio.NewScanner(procsFile)
181         for reader.Scan() {
182                 taskPid := reader.Text()
183                 statsFilename := fmt.Sprintf("/proc/%s/net/dev", taskPid)
184                 stats, err := ioutil.ReadFile(statsFilename)
185                 if err != nil {
186                         r.Logger.Printf("notice: %v", err)
187                         continue
188                 }
189                 return strings.NewReader(string(stats)), nil
190         }
191         return nil, errors.New("Could not read stats for any proc in container")
192 }
193
194 type ioSample struct {
195         sampleTime time.Time
196         txBytes    int64
197         rxBytes    int64
198 }
199
200 func (r *Reporter) doBlkIOStats() {
201         c, err := r.openStatFile("blkio", "blkio.io_service_bytes", true)
202         if err != nil {
203                 return
204         }
205         defer c.Close()
206         b := bufio.NewScanner(c)
207         var sampleTime = time.Now()
208         newSamples := make(map[string]ioSample)
209         for b.Scan() {
210                 var device, op string
211                 var val int64
212                 if _, err := fmt.Sscanf(string(b.Text()), "%s %s %d", &device, &op, &val); err != nil {
213                         continue
214                 }
215                 var thisSample ioSample
216                 var ok bool
217                 if thisSample, ok = newSamples[device]; !ok {
218                         thisSample = ioSample{sampleTime, -1, -1}
219                 }
220                 switch op {
221                 case "Read":
222                         thisSample.rxBytes = val
223                 case "Write":
224                         thisSample.txBytes = val
225                 }
226                 newSamples[device] = thisSample
227         }
228         for dev, sample := range newSamples {
229                 if sample.txBytes < 0 || sample.rxBytes < 0 {
230                         continue
231                 }
232                 delta := ""
233                 if prev, ok := r.lastDiskIOSample[dev]; ok {
234                         delta = fmt.Sprintf(" -- interval %.4f seconds %d write %d read",
235                                 sample.sampleTime.Sub(prev.sampleTime).Seconds(),
236                                 sample.txBytes-prev.txBytes,
237                                 sample.rxBytes-prev.rxBytes)
238                 }
239                 r.Logger.Printf("blkio:%s %d write %d read%s\n", dev, sample.txBytes, sample.rxBytes, delta)
240                 r.lastDiskIOSample[dev] = sample
241         }
242 }
243
244 type memSample struct {
245         sampleTime time.Time
246         memStat    map[string]int64
247 }
248
249 func (r *Reporter) doMemoryStats() {
250         c, err := r.openStatFile("memory", "memory.stat", true)
251         if err != nil {
252                 return
253         }
254         defer c.Close()
255         b := bufio.NewScanner(c)
256         thisSample := memSample{time.Now(), make(map[string]int64)}
257         wantStats := [...]string{"cache", "swap", "pgmajfault", "rss"}
258         for b.Scan() {
259                 var stat string
260                 var val int64
261                 if _, err := fmt.Sscanf(string(b.Text()), "%s %d", &stat, &val); err != nil {
262                         continue
263                 }
264                 thisSample.memStat[stat] = val
265         }
266         var outstat bytes.Buffer
267         for _, key := range wantStats {
268                 // Use "total_X" stats (entire hierarchy) if enabled,
269                 // otherwise just the single cgroup -- see
270                 // https://www.kernel.org/doc/Documentation/cgroup-v1/memory.txt
271                 if val, ok := thisSample.memStat["total_"+key]; ok {
272                         fmt.Fprintf(&outstat, " %d %s", val, key)
273                 } else if val, ok := thisSample.memStat[key]; ok {
274                         fmt.Fprintf(&outstat, " %d %s", val, key)
275                 }
276         }
277         r.Logger.Printf("mem%s\n", outstat.String())
278
279         if r.kernelPageSize == 0 {
280                 // assign "don't try again" value in case we give up
281                 // and return without assigning the real value
282                 r.kernelPageSize = -1
283                 buf, err := os.ReadFile("/proc/self/smaps")
284                 if err != nil {
285                         r.Logger.Printf("error reading /proc/self/smaps: %s", err)
286                         return
287                 }
288                 m := regexp.MustCompile(`\nKernelPageSize:\s*(\d+) kB\n`).FindSubmatch(buf)
289                 if len(m) != 2 {
290                         r.Logger.Printf("error parsing /proc/self/smaps: KernelPageSize not found")
291                         return
292                 }
293                 size, err := strconv.ParseInt(string(m[1]), 10, 64)
294                 if err != nil {
295                         r.Logger.Printf("error parsing /proc/self/smaps: KernelPageSize %q: %s", m[1], err)
296                         return
297                 }
298                 r.kernelPageSize = size * 1024
299         } else if r.kernelPageSize < 0 {
300                 // already failed to determine page size, don't keep
301                 // trying/logging
302                 return
303         }
304
305         r.reportPIDsMu.Lock()
306         defer r.reportPIDsMu.Unlock()
307         procnames := make([]string, 0, len(r.reportPIDs))
308         for name := range r.reportPIDs {
309                 procnames = append(procnames, name)
310         }
311         sort.Strings(procnames)
312         procmem := ""
313         for _, procname := range procnames {
314                 pid := r.reportPIDs[procname]
315                 buf, err := os.ReadFile(fmt.Sprintf("/proc/%d/stat", pid))
316                 if err != nil {
317                         continue
318                 }
319                 // If the executable name contains a ')' char,
320                 // /proc/$pid/stat will look like '1234 (exec name)) S
321                 // 123 ...' -- the last ')' is the end of the 2nd
322                 // field.
323                 paren := bytes.LastIndexByte(buf, ')')
324                 if paren < 0 {
325                         continue
326                 }
327                 fields := bytes.SplitN(buf[paren:], []byte{' '}, 24)
328                 if len(fields) < 24 {
329                         continue
330                 }
331                 // rss is the 24th field in .../stat, and fields[0]
332                 // here is the last char ')' of the 2nd field, so
333                 // rss is fields[22]
334                 rss, err := strconv.ParseInt(string(fields[22]), 10, 64)
335                 if err != nil {
336                         continue
337                 }
338                 procmem += fmt.Sprintf(" %d %s", rss*r.kernelPageSize, procname)
339         }
340         if procmem != "" {
341                 r.Logger.Printf("procmem%s\n", procmem)
342         }
343 }
344
345 func (r *Reporter) doNetworkStats() {
346         sampleTime := time.Now()
347         stats, err := r.getContainerNetStats()
348         if err != nil {
349                 return
350         }
351
352         scanner := bufio.NewScanner(stats)
353         for scanner.Scan() {
354                 var ifName string
355                 var rx, tx int64
356                 words := strings.Fields(scanner.Text())
357                 if len(words) != 17 {
358                         // Skip lines with wrong format
359                         continue
360                 }
361                 ifName = strings.TrimRight(words[0], ":")
362                 if ifName == "lo" || ifName == "" {
363                         // Skip loopback interface and lines with wrong format
364                         continue
365                 }
366                 if tx, err = strconv.ParseInt(words[9], 10, 64); err != nil {
367                         continue
368                 }
369                 if rx, err = strconv.ParseInt(words[1], 10, 64); err != nil {
370                         continue
371                 }
372                 nextSample := ioSample{}
373                 nextSample.sampleTime = sampleTime
374                 nextSample.txBytes = tx
375                 nextSample.rxBytes = rx
376                 var delta string
377                 if prev, ok := r.lastNetSample[ifName]; ok {
378                         interval := nextSample.sampleTime.Sub(prev.sampleTime).Seconds()
379                         delta = fmt.Sprintf(" -- interval %.4f seconds %d tx %d rx",
380                                 interval,
381                                 tx-prev.txBytes,
382                                 rx-prev.rxBytes)
383                 }
384                 r.Logger.Printf("net:%s %d tx %d rx%s\n", ifName, tx, rx, delta)
385                 r.lastNetSample[ifName] = nextSample
386         }
387 }
388
389 type diskSpaceSample struct {
390         hasData    bool
391         sampleTime time.Time
392         total      uint64
393         used       uint64
394         available  uint64
395 }
396
397 func (r *Reporter) doDiskSpaceStats() {
398         s := syscall.Statfs_t{}
399         err := syscall.Statfs(r.TempDir, &s)
400         if err != nil {
401                 return
402         }
403         bs := uint64(s.Bsize)
404         nextSample := diskSpaceSample{
405                 hasData:    true,
406                 sampleTime: time.Now(),
407                 total:      s.Blocks * bs,
408                 used:       (s.Blocks - s.Bfree) * bs,
409                 available:  s.Bavail * bs,
410         }
411
412         var delta string
413         if r.lastDiskSpaceSample.hasData {
414                 prev := r.lastDiskSpaceSample
415                 interval := nextSample.sampleTime.Sub(prev.sampleTime).Seconds()
416                 delta = fmt.Sprintf(" -- interval %.4f seconds %d used",
417                         interval,
418                         int64(nextSample.used-prev.used))
419         }
420         r.Logger.Printf("statfs %d available %d used %d total%s\n",
421                 nextSample.available, nextSample.used, nextSample.total, delta)
422         r.lastDiskSpaceSample = nextSample
423 }
424
425 type cpuSample struct {
426         hasData    bool // to distinguish the zero value from real data
427         sampleTime time.Time
428         user       float64
429         sys        float64
430         cpus       int64
431 }
432
433 // Return the number of CPUs available in the container. Return 0 if
434 // we can't figure out the real number of CPUs.
435 func (r *Reporter) getCPUCount() int64 {
436         cpusetFile, err := r.openStatFile("cpuset", "cpuset.cpus", true)
437         if err != nil {
438                 return 0
439         }
440         defer cpusetFile.Close()
441         b, err := r.readAllOrWarn(cpusetFile)
442         if err != nil {
443                 return 0
444         }
445         sp := strings.Split(string(b), ",")
446         cpus := int64(0)
447         for _, v := range sp {
448                 var min, max int64
449                 n, _ := fmt.Sscanf(v, "%d-%d", &min, &max)
450                 if n == 2 {
451                         cpus += (max - min) + 1
452                 } else {
453                         cpus++
454                 }
455         }
456         return cpus
457 }
458
459 func (r *Reporter) doCPUStats() {
460         statFile, err := r.openStatFile("cpuacct", "cpuacct.stat", true)
461         if err != nil {
462                 return
463         }
464         defer statFile.Close()
465         b, err := r.readAllOrWarn(statFile)
466         if err != nil {
467                 return
468         }
469
470         var userTicks, sysTicks int64
471         fmt.Sscanf(string(b), "user %d\nsystem %d", &userTicks, &sysTicks)
472         userHz := float64(100)
473         nextSample := cpuSample{
474                 hasData:    true,
475                 sampleTime: time.Now(),
476                 user:       float64(userTicks) / userHz,
477                 sys:        float64(sysTicks) / userHz,
478                 cpus:       r.getCPUCount(),
479         }
480
481         delta := ""
482         if r.lastCPUSample.hasData {
483                 delta = fmt.Sprintf(" -- interval %.4f seconds %.4f user %.4f sys",
484                         nextSample.sampleTime.Sub(r.lastCPUSample.sampleTime).Seconds(),
485                         nextSample.user-r.lastCPUSample.user,
486                         nextSample.sys-r.lastCPUSample.sys)
487         }
488         r.Logger.Printf("cpu %.4f user %.4f sys %d cpus%s\n",
489                 nextSample.user, nextSample.sys, nextSample.cpus, delta)
490         r.lastCPUSample = nextSample
491 }
492
493 // Report stats periodically until we learn (via r.done) that someone
494 // called Stop.
495 func (r *Reporter) run() {
496         defer close(r.flushed)
497
498         r.reportedStatFile = make(map[string]string)
499
500         if !r.waitForCIDFile() || !r.waitForCgroup() {
501                 return
502         }
503
504         r.lastNetSample = make(map[string]ioSample)
505         r.lastDiskIOSample = make(map[string]ioSample)
506
507         if len(r.TempDir) == 0 {
508                 // Temporary dir not provided, try to get it from the environment.
509                 r.TempDir = os.Getenv("TMPDIR")
510         }
511         if len(r.TempDir) > 0 {
512                 r.Logger.Printf("notice: monitoring temp dir %s\n", r.TempDir)
513         }
514
515         ticker := time.NewTicker(r.PollPeriod)
516         for {
517                 r.doMemoryStats()
518                 r.doCPUStats()
519                 r.doBlkIOStats()
520                 r.doNetworkStats()
521                 r.doDiskSpaceStats()
522                 select {
523                 case <-r.done:
524                         return
525                 case <-ticker.C:
526                 }
527         }
528 }
529
530 // If CID is empty, wait for it to appear in CIDFile. Return true if
531 // we get it before we learn (via r.done) that someone called Stop.
532 func (r *Reporter) waitForCIDFile() bool {
533         if r.CID != "" || r.CIDFile == "" {
534                 return true
535         }
536
537         ticker := time.NewTicker(100 * time.Millisecond)
538         defer ticker.Stop()
539         for {
540                 cid, err := ioutil.ReadFile(r.CIDFile)
541                 if err == nil && len(cid) > 0 {
542                         r.CID = string(cid)
543                         return true
544                 }
545                 select {
546                 case <-ticker.C:
547                 case <-r.done:
548                         r.Logger.Printf("warning: CID never appeared in %+q: %v", r.CIDFile, err)
549                         return false
550                 }
551         }
552 }
553
554 // Wait for the cgroup stats files to appear in cgroup_root. Return
555 // true if they appear before r.done indicates someone called Stop. If
556 // they don't appear within one poll interval, log a warning and keep
557 // waiting.
558 func (r *Reporter) waitForCgroup() bool {
559         ticker := time.NewTicker(100 * time.Millisecond)
560         defer ticker.Stop()
561         warningTimer := time.After(r.PollPeriod)
562         for {
563                 c, err := r.openStatFile("cpuacct", "cgroup.procs", false)
564                 if err == nil {
565                         c.Close()
566                         return true
567                 }
568                 select {
569                 case <-ticker.C:
570                 case <-warningTimer:
571                         r.Logger.Printf("warning: cgroup stats files have not appeared after %v (config error?) -- still waiting...", r.PollPeriod)
572                 case <-r.done:
573                         r.Logger.Printf("warning: cgroup stats files never appeared for %v", r.CID)
574                         return false
575                 }
576         }
577 }