19986: Log when a container uses nearly max RAM
[arvados.git] / lib / crunchstat / crunchstat.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 // Package crunchstat reports resource usage (CPU, memory, disk,
6 // network) for a cgroup.
7 package crunchstat
8
9 import (
10         "bufio"
11         "bytes"
12         "errors"
13         "fmt"
14         "io"
15         "io/ioutil"
16         "os"
17         "regexp"
18         "sort"
19         "strconv"
20         "strings"
21         "sync"
22         "syscall"
23         "time"
24 )
25
26 type logPrinter interface {
27         Printf(fmt string, args ...interface{})
28 }
29
30 // A Reporter gathers statistics for a cgroup and writes them to a
31 // log.Logger.
32 type Reporter struct {
33         // CID of the container to monitor. If empty, read the CID
34         // from CIDFile (first waiting until a non-empty file appears
35         // at CIDFile). If CIDFile is also empty, report host
36         // statistics.
37         CID string
38
39         // Path to a file we can read CID from.
40         CIDFile string
41
42         // Where cgroup accounting files live on this system, e.g.,
43         // "/sys/fs/cgroup".
44         CgroupRoot string
45
46         // Parent cgroup, e.g., "docker".
47         CgroupParent string
48
49         // Interval between samples. Must be positive.
50         PollPeriod time.Duration
51
52         // Temporary directory, will be monitored for available, used & total space.
53         TempDir string
54
55         // Where to write statistics. Must not be nil.
56         Logger logPrinter
57
58         // When stats cross thresholds configured in the fields below,
59         // they are reported to this logger.
60         ThresholdLogger logPrinter
61
62         // MemThresholds maps memory stat names to slices of thresholds.
63         // When the corresponding stat exceeds a threshold, that will be logged.
64         MemThresholds map[string][]Threshold
65
66         kernelPageSize      int64
67         reportedStatFile    map[string]string
68         lastNetSample       map[string]ioSample
69         lastDiskIOSample    map[string]ioSample
70         lastCPUSample       cpuSample
71         lastDiskSpaceSample diskSpaceSample
72         lastMemSample       memSample
73
74         reportPIDs   map[string]int
75         reportPIDsMu sync.Mutex
76
77         done    chan struct{} // closed when we should stop reporting
78         flushed chan struct{} // closed when we have made our last report
79 }
80
81 type Threshold struct {
82         percentage int64
83         threshold  int64
84         total      int64
85 }
86
87 func NewThresholdFromPercentage(total int64, percentage int64) Threshold {
88         return Threshold{
89                 percentage: percentage,
90                 threshold:  total * percentage / 100,
91                 total:      total,
92         }
93 }
94
95 func NewThresholdsFromPercentages(total int64, percentages []int64) (thresholds []Threshold) {
96         for _, percentage := range percentages {
97                 thresholds = append(thresholds, NewThresholdFromPercentage(total, percentage))
98         }
99         return
100 }
101
102 // Start starts monitoring in a new goroutine, and returns
103 // immediately.
104 //
105 // The monitoring goroutine waits for a non-empty CIDFile to appear
106 // (unless CID is non-empty). Then it waits for the accounting files
107 // to appear for the monitored container. Then it collects and reports
108 // statistics until Stop is called.
109 //
110 // Callers should not call Start more than once.
111 //
112 // Callers should not modify public data fields after calling Start.
113 func (r *Reporter) Start() {
114         r.done = make(chan struct{})
115         r.flushed = make(chan struct{})
116         go r.run()
117 }
118
119 // ReportPID starts reporting stats for a specified process.
120 func (r *Reporter) ReportPID(name string, pid int) {
121         r.reportPIDsMu.Lock()
122         defer r.reportPIDsMu.Unlock()
123         if r.reportPIDs == nil {
124                 r.reportPIDs = map[string]int{name: pid}
125         } else {
126                 r.reportPIDs[name] = pid
127         }
128 }
129
130 // Stop reporting. Do not call more than once, or before calling
131 // Start.
132 //
133 // Nothing will be logged after Stop returns.
134 func (r *Reporter) Stop() {
135         close(r.done)
136         <-r.flushed
137 }
138
139 func (r *Reporter) readAllOrWarn(in io.Reader) ([]byte, error) {
140         content, err := ioutil.ReadAll(in)
141         if err != nil {
142                 r.Logger.Printf("warning: %v", err)
143         }
144         return content, err
145 }
146
147 // Open the cgroup stats file in /sys/fs corresponding to the target
148 // cgroup, and return an io.ReadCloser. If no stats file is available,
149 // return nil.
150 //
151 // Log the file that was opened, if it isn't the same file opened on
152 // the last openStatFile for this stat.
153 //
154 // Log "not available" if no file is found and either this stat has
155 // been available in the past, or verbose==true.
156 //
157 // TODO: Instead of trying all options, choose a process in the
158 // container, and read /proc/PID/cgroup to determine the appropriate
159 // cgroup root for the given statgroup. (This will avoid falling back
160 // to host-level stats during container setup and teardown.)
161 func (r *Reporter) openStatFile(statgroup, stat string, verbose bool) (io.ReadCloser, error) {
162         var paths []string
163         if r.CID != "" {
164                 // Collect container's stats
165                 paths = []string{
166                         fmt.Sprintf("%s/%s/%s/%s/%s", r.CgroupRoot, statgroup, r.CgroupParent, r.CID, stat),
167                         fmt.Sprintf("%s/%s/%s/%s", r.CgroupRoot, r.CgroupParent, r.CID, stat),
168                 }
169         } else {
170                 // Collect this host's stats
171                 paths = []string{
172                         fmt.Sprintf("%s/%s/%s", r.CgroupRoot, statgroup, stat),
173                         fmt.Sprintf("%s/%s", r.CgroupRoot, stat),
174                 }
175         }
176         var path string
177         var file *os.File
178         var err error
179         for _, path = range paths {
180                 file, err = os.Open(path)
181                 if err == nil {
182                         break
183                 } else {
184                         path = ""
185                 }
186         }
187         if pathWas := r.reportedStatFile[stat]; pathWas != path {
188                 // Log whenever we start using a new/different cgroup
189                 // stat file for a given statistic. This typically
190                 // happens 1 to 3 times per statistic, depending on
191                 // whether we happen to collect stats [a] before any
192                 // processes have been created in the container and
193                 // [b] after all contained processes have exited.
194                 if path == "" && verbose {
195                         r.Logger.Printf("notice: stats not available: stat %s, statgroup %s, cid %s, parent %s, root %s\n", stat, statgroup, r.CID, r.CgroupParent, r.CgroupRoot)
196                 } else if pathWas != "" {
197                         r.Logger.Printf("notice: stats moved from %s to %s\n", r.reportedStatFile[stat], path)
198                 } else {
199                         r.Logger.Printf("notice: reading stats from %s\n", path)
200                 }
201                 r.reportedStatFile[stat] = path
202         }
203         return file, err
204 }
205
206 func (r *Reporter) getContainerNetStats() (io.Reader, error) {
207         procsFile, err := r.openStatFile("cpuacct", "cgroup.procs", true)
208         if err != nil {
209                 return nil, err
210         }
211         defer procsFile.Close()
212         reader := bufio.NewScanner(procsFile)
213         for reader.Scan() {
214                 taskPid := reader.Text()
215                 statsFilename := fmt.Sprintf("/proc/%s/net/dev", taskPid)
216                 stats, err := ioutil.ReadFile(statsFilename)
217                 if err != nil {
218                         r.Logger.Printf("notice: %v", err)
219                         continue
220                 }
221                 return strings.NewReader(string(stats)), nil
222         }
223         return nil, errors.New("Could not read stats for any proc in container")
224 }
225
226 type ioSample struct {
227         sampleTime time.Time
228         txBytes    int64
229         rxBytes    int64
230 }
231
232 func (r *Reporter) doBlkIOStats() {
233         c, err := r.openStatFile("blkio", "blkio.io_service_bytes", true)
234         if err != nil {
235                 return
236         }
237         defer c.Close()
238         b := bufio.NewScanner(c)
239         var sampleTime = time.Now()
240         newSamples := make(map[string]ioSample)
241         for b.Scan() {
242                 var device, op string
243                 var val int64
244                 if _, err := fmt.Sscanf(string(b.Text()), "%s %s %d", &device, &op, &val); err != nil {
245                         continue
246                 }
247                 var thisSample ioSample
248                 var ok bool
249                 if thisSample, ok = newSamples[device]; !ok {
250                         thisSample = ioSample{sampleTime, -1, -1}
251                 }
252                 switch op {
253                 case "Read":
254                         thisSample.rxBytes = val
255                 case "Write":
256                         thisSample.txBytes = val
257                 }
258                 newSamples[device] = thisSample
259         }
260         for dev, sample := range newSamples {
261                 if sample.txBytes < 0 || sample.rxBytes < 0 {
262                         continue
263                 }
264                 delta := ""
265                 if prev, ok := r.lastDiskIOSample[dev]; ok {
266                         delta = fmt.Sprintf(" -- interval %.4f seconds %d write %d read",
267                                 sample.sampleTime.Sub(prev.sampleTime).Seconds(),
268                                 sample.txBytes-prev.txBytes,
269                                 sample.rxBytes-prev.rxBytes)
270                 }
271                 r.Logger.Printf("blkio:%s %d write %d read%s\n", dev, sample.txBytes, sample.rxBytes, delta)
272                 r.lastDiskIOSample[dev] = sample
273         }
274 }
275
276 type memSample struct {
277         sampleTime time.Time
278         memStat    map[string]int64
279 }
280
281 func (r *Reporter) getMemSample() {
282         c, err := r.openStatFile("memory", "memory.stat", true)
283         if err != nil {
284                 return
285         }
286         defer c.Close()
287         b := bufio.NewScanner(c)
288         thisSample := memSample{time.Now(), make(map[string]int64)}
289         for b.Scan() {
290                 var stat string
291                 var val int64
292                 if _, err := fmt.Sscanf(string(b.Text()), "%s %d", &stat, &val); err != nil {
293                         continue
294                 }
295                 thisSample.memStat[stat] = val
296         }
297         r.lastMemSample = thisSample
298
299         if r.ThresholdLogger != nil {
300                 for statName, thresholds := range r.MemThresholds {
301                         statValue, ok := thisSample.memStat["total_"+statName]
302                         if !ok {
303                                 statValue, ok = thisSample.memStat[statName]
304                                 if !ok {
305                                         continue
306                                 }
307                         }
308                         var index int
309                         var statThreshold Threshold
310                         for index, statThreshold = range thresholds {
311                                 if statValue < statThreshold.threshold {
312                                         break
313                                 } else if statThreshold.percentage > 0 {
314                                         r.ThresholdLogger.Printf("Container using over %d%% of memory (%s %d/%d bytes)",
315                                                 statThreshold.percentage, statName, statValue, statThreshold.total)
316                                 } else {
317                                         r.ThresholdLogger.Printf("Container using over %d of memory (%s %s bytes)",
318                                                 statThreshold.threshold, statName, statValue)
319                                 }
320                         }
321                         r.MemThresholds[statName] = thresholds[index:]
322                 }
323         }
324 }
325
326 func (r *Reporter) reportMemSample() {
327         var outstat bytes.Buffer
328         wantStats := [...]string{"cache", "swap", "pgmajfault", "rss"}
329         for _, key := range wantStats {
330                 // Use "total_X" stats (entire hierarchy) if enabled,
331                 // otherwise just the single cgroup -- see
332                 // https://www.kernel.org/doc/Documentation/cgroup-v1/memory.txt
333                 if val, ok := r.lastMemSample.memStat["total_"+key]; ok {
334                         fmt.Fprintf(&outstat, " %d %s", val, key)
335                 } else if val, ok := r.lastMemSample.memStat[key]; ok {
336                         fmt.Fprintf(&outstat, " %d %s", val, key)
337                 }
338         }
339         r.Logger.Printf("mem%s\n", outstat.String())
340 }
341
342 func (r *Reporter) doProcmemStats() {
343         if r.kernelPageSize == 0 {
344                 // assign "don't try again" value in case we give up
345                 // and return without assigning the real value
346                 r.kernelPageSize = -1
347                 buf, err := os.ReadFile("/proc/self/smaps")
348                 if err != nil {
349                         r.Logger.Printf("error reading /proc/self/smaps: %s", err)
350                         return
351                 }
352                 m := regexp.MustCompile(`\nKernelPageSize:\s*(\d+) kB\n`).FindSubmatch(buf)
353                 if len(m) != 2 {
354                         r.Logger.Printf("error parsing /proc/self/smaps: KernelPageSize not found")
355                         return
356                 }
357                 size, err := strconv.ParseInt(string(m[1]), 10, 64)
358                 if err != nil {
359                         r.Logger.Printf("error parsing /proc/self/smaps: KernelPageSize %q: %s", m[1], err)
360                         return
361                 }
362                 r.kernelPageSize = size * 1024
363         } else if r.kernelPageSize < 0 {
364                 // already failed to determine page size, don't keep
365                 // trying/logging
366                 return
367         }
368
369         r.reportPIDsMu.Lock()
370         defer r.reportPIDsMu.Unlock()
371         procnames := make([]string, 0, len(r.reportPIDs))
372         for name := range r.reportPIDs {
373                 procnames = append(procnames, name)
374         }
375         sort.Strings(procnames)
376         procmem := ""
377         for _, procname := range procnames {
378                 pid := r.reportPIDs[procname]
379                 buf, err := os.ReadFile(fmt.Sprintf("/proc/%d/stat", pid))
380                 if err != nil {
381                         continue
382                 }
383                 // If the executable name contains a ')' char,
384                 // /proc/$pid/stat will look like '1234 (exec name)) S
385                 // 123 ...' -- the last ')' is the end of the 2nd
386                 // field.
387                 paren := bytes.LastIndexByte(buf, ')')
388                 if paren < 0 {
389                         continue
390                 }
391                 fields := bytes.SplitN(buf[paren:], []byte{' '}, 24)
392                 if len(fields) < 24 {
393                         continue
394                 }
395                 // rss is the 24th field in .../stat, and fields[0]
396                 // here is the last char ')' of the 2nd field, so
397                 // rss is fields[22]
398                 rss, err := strconv.ParseInt(string(fields[22]), 10, 64)
399                 if err != nil {
400                         continue
401                 }
402                 procmem += fmt.Sprintf(" %d %s", rss*r.kernelPageSize, procname)
403         }
404         if procmem != "" {
405                 r.Logger.Printf("procmem%s\n", procmem)
406         }
407 }
408
409 func (r *Reporter) doNetworkStats() {
410         sampleTime := time.Now()
411         stats, err := r.getContainerNetStats()
412         if err != nil {
413                 return
414         }
415
416         scanner := bufio.NewScanner(stats)
417         for scanner.Scan() {
418                 var ifName string
419                 var rx, tx int64
420                 words := strings.Fields(scanner.Text())
421                 if len(words) != 17 {
422                         // Skip lines with wrong format
423                         continue
424                 }
425                 ifName = strings.TrimRight(words[0], ":")
426                 if ifName == "lo" || ifName == "" {
427                         // Skip loopback interface and lines with wrong format
428                         continue
429                 }
430                 if tx, err = strconv.ParseInt(words[9], 10, 64); err != nil {
431                         continue
432                 }
433                 if rx, err = strconv.ParseInt(words[1], 10, 64); err != nil {
434                         continue
435                 }
436                 nextSample := ioSample{}
437                 nextSample.sampleTime = sampleTime
438                 nextSample.txBytes = tx
439                 nextSample.rxBytes = rx
440                 var delta string
441                 if prev, ok := r.lastNetSample[ifName]; ok {
442                         interval := nextSample.sampleTime.Sub(prev.sampleTime).Seconds()
443                         delta = fmt.Sprintf(" -- interval %.4f seconds %d tx %d rx",
444                                 interval,
445                                 tx-prev.txBytes,
446                                 rx-prev.rxBytes)
447                 }
448                 r.Logger.Printf("net:%s %d tx %d rx%s\n", ifName, tx, rx, delta)
449                 r.lastNetSample[ifName] = nextSample
450         }
451 }
452
453 type diskSpaceSample struct {
454         hasData    bool
455         sampleTime time.Time
456         total      uint64
457         used       uint64
458         available  uint64
459 }
460
461 func (r *Reporter) doDiskSpaceStats() {
462         s := syscall.Statfs_t{}
463         err := syscall.Statfs(r.TempDir, &s)
464         if err != nil {
465                 return
466         }
467         bs := uint64(s.Bsize)
468         nextSample := diskSpaceSample{
469                 hasData:    true,
470                 sampleTime: time.Now(),
471                 total:      s.Blocks * bs,
472                 used:       (s.Blocks - s.Bfree) * bs,
473                 available:  s.Bavail * bs,
474         }
475
476         var delta string
477         if r.lastDiskSpaceSample.hasData {
478                 prev := r.lastDiskSpaceSample
479                 interval := nextSample.sampleTime.Sub(prev.sampleTime).Seconds()
480                 delta = fmt.Sprintf(" -- interval %.4f seconds %d used",
481                         interval,
482                         int64(nextSample.used-prev.used))
483         }
484         r.Logger.Printf("statfs %d available %d used %d total%s\n",
485                 nextSample.available, nextSample.used, nextSample.total, delta)
486         r.lastDiskSpaceSample = nextSample
487 }
488
489 type cpuSample struct {
490         hasData    bool // to distinguish the zero value from real data
491         sampleTime time.Time
492         user       float64
493         sys        float64
494         cpus       int64
495 }
496
497 // Return the number of CPUs available in the container. Return 0 if
498 // we can't figure out the real number of CPUs.
499 func (r *Reporter) getCPUCount() int64 {
500         cpusetFile, err := r.openStatFile("cpuset", "cpuset.cpus", true)
501         if err != nil {
502                 return 0
503         }
504         defer cpusetFile.Close()
505         b, err := r.readAllOrWarn(cpusetFile)
506         if err != nil {
507                 return 0
508         }
509         sp := strings.Split(string(b), ",")
510         cpus := int64(0)
511         for _, v := range sp {
512                 var min, max int64
513                 n, _ := fmt.Sscanf(v, "%d-%d", &min, &max)
514                 if n == 2 {
515                         cpus += (max - min) + 1
516                 } else {
517                         cpus++
518                 }
519         }
520         return cpus
521 }
522
523 func (r *Reporter) doCPUStats() {
524         statFile, err := r.openStatFile("cpuacct", "cpuacct.stat", true)
525         if err != nil {
526                 return
527         }
528         defer statFile.Close()
529         b, err := r.readAllOrWarn(statFile)
530         if err != nil {
531                 return
532         }
533
534         var userTicks, sysTicks int64
535         fmt.Sscanf(string(b), "user %d\nsystem %d", &userTicks, &sysTicks)
536         userHz := float64(100)
537         nextSample := cpuSample{
538                 hasData:    true,
539                 sampleTime: time.Now(),
540                 user:       float64(userTicks) / userHz,
541                 sys:        float64(sysTicks) / userHz,
542                 cpus:       r.getCPUCount(),
543         }
544
545         delta := ""
546         if r.lastCPUSample.hasData {
547                 delta = fmt.Sprintf(" -- interval %.4f seconds %.4f user %.4f sys",
548                         nextSample.sampleTime.Sub(r.lastCPUSample.sampleTime).Seconds(),
549                         nextSample.user-r.lastCPUSample.user,
550                         nextSample.sys-r.lastCPUSample.sys)
551         }
552         r.Logger.Printf("cpu %.4f user %.4f sys %d cpus%s\n",
553                 nextSample.user, nextSample.sys, nextSample.cpus, delta)
554         r.lastCPUSample = nextSample
555 }
556
557 func (r *Reporter) doAllStats() {
558         r.reportMemSample()
559         r.doProcmemStats()
560         r.doCPUStats()
561         r.doBlkIOStats()
562         r.doNetworkStats()
563         r.doDiskSpaceStats()
564 }
565
566 // Report stats periodically until we learn (via r.done) that someone
567 // called Stop.
568 func (r *Reporter) run() {
569         defer close(r.flushed)
570
571         r.reportedStatFile = make(map[string]string)
572
573         if !r.waitForCIDFile() || !r.waitForCgroup() {
574                 return
575         }
576
577         r.lastNetSample = make(map[string]ioSample)
578         r.lastDiskIOSample = make(map[string]ioSample)
579
580         if len(r.TempDir) == 0 {
581                 // Temporary dir not provided, try to get it from the environment.
582                 r.TempDir = os.Getenv("TMPDIR")
583         }
584         if len(r.TempDir) > 0 {
585                 r.Logger.Printf("notice: monitoring temp dir %s\n", r.TempDir)
586         }
587
588         r.getMemSample()
589         r.doAllStats()
590
591         memTicker := time.NewTicker(time.Second)
592         mainTicker := time.NewTicker(r.PollPeriod)
593         for {
594                 select {
595                 case <-r.done:
596                         return
597                 case <-memTicker.C:
598                         r.getMemSample()
599                 case <-mainTicker.C:
600                         r.doAllStats()
601                 }
602         }
603 }
604
605 // If CID is empty, wait for it to appear in CIDFile. Return true if
606 // we get it before we learn (via r.done) that someone called Stop.
607 func (r *Reporter) waitForCIDFile() bool {
608         if r.CID != "" || r.CIDFile == "" {
609                 return true
610         }
611
612         ticker := time.NewTicker(100 * time.Millisecond)
613         defer ticker.Stop()
614         for {
615                 cid, err := ioutil.ReadFile(r.CIDFile)
616                 if err == nil && len(cid) > 0 {
617                         r.CID = string(cid)
618                         return true
619                 }
620                 select {
621                 case <-ticker.C:
622                 case <-r.done:
623                         r.Logger.Printf("warning: CID never appeared in %+q: %v", r.CIDFile, err)
624                         return false
625                 }
626         }
627 }
628
629 // Wait for the cgroup stats files to appear in cgroup_root. Return
630 // true if they appear before r.done indicates someone called Stop. If
631 // they don't appear within one poll interval, log a warning and keep
632 // waiting.
633 func (r *Reporter) waitForCgroup() bool {
634         ticker := time.NewTicker(100 * time.Millisecond)
635         defer ticker.Stop()
636         warningTimer := time.After(r.PollPeriod)
637         for {
638                 c, err := r.openStatFile("cpuacct", "cgroup.procs", false)
639                 if err == nil {
640                         c.Close()
641                         return true
642                 }
643                 select {
644                 case <-ticker.C:
645                 case <-warningTimer:
646                         r.Logger.Printf("warning: cgroup stats files have not appeared after %v (config error?) -- still waiting...", r.PollPeriod)
647                 case <-r.done:
648                         r.Logger.Printf("warning: cgroup stats files never appeared for %v", r.CID)
649                         return false
650                 }
651         }
652 }