443d2202cecb5420ad174ae56ca41009cd7bea1b
[arvados.git] / lib / crunchstat / crunchstat.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 // Package crunchstat reports resource usage (CPU, memory, disk,
6 // network) for a cgroup.
7 package crunchstat
8
9 import (
10         "bufio"
11         "bytes"
12         "errors"
13         "fmt"
14         "io"
15         "io/ioutil"
16         "os"
17         "sort"
18         "strconv"
19         "strings"
20         "sync"
21         "syscall"
22         "time"
23 )
24
25 // A Reporter gathers statistics for a cgroup and writes them to a
26 // log.Logger.
27 type Reporter struct {
28         // CID of the container to monitor. If empty, read the CID
29         // from CIDFile (first waiting until a non-empty file appears
30         // at CIDFile). If CIDFile is also empty, report host
31         // statistics.
32         CID string
33
34         // Path to a file we can read CID from.
35         CIDFile string
36
37         // Where cgroup accounting files live on this system, e.g.,
38         // "/sys/fs/cgroup".
39         CgroupRoot string
40
41         // Parent cgroup, e.g., "docker".
42         CgroupParent string
43
44         // Interval between samples. Must be positive.
45         PollPeriod time.Duration
46
47         // Temporary directory, will be monitored for available, used & total space.
48         TempDir string
49
50         // Where to write statistics. Must not be nil.
51         Logger interface {
52                 Printf(fmt string, args ...interface{})
53         }
54
55         reportedStatFile    map[string]string
56         lastNetSample       map[string]ioSample
57         lastDiskIOSample    map[string]ioSample
58         lastCPUSample       cpuSample
59         lastDiskSpaceSample diskSpaceSample
60
61         reportPIDs   map[string]int
62         reportPIDsMu sync.Mutex
63
64         done    chan struct{} // closed when we should stop reporting
65         flushed chan struct{} // closed when we have made our last report
66 }
67
68 // Start starts monitoring in a new goroutine, and returns
69 // immediately.
70 //
71 // The monitoring goroutine waits for a non-empty CIDFile to appear
72 // (unless CID is non-empty). Then it waits for the accounting files
73 // to appear for the monitored container. Then it collects and reports
74 // statistics until Stop is called.
75 //
76 // Callers should not call Start more than once.
77 //
78 // Callers should not modify public data fields after calling Start.
79 func (r *Reporter) Start() {
80         r.done = make(chan struct{})
81         r.flushed = make(chan struct{})
82         go r.run()
83 }
84
85 // ReportPID starts reporting stats for a specified process.
86 func (r *Reporter) ReportPID(name string, pid int) {
87         r.reportPIDsMu.Lock()
88         defer r.reportPIDsMu.Unlock()
89         if r.reportPIDs == nil {
90                 r.reportPIDs = map[string]int{name: pid}
91         } else {
92                 r.reportPIDs[name] = pid
93         }
94 }
95
96 // Stop reporting. Do not call more than once, or before calling
97 // Start.
98 //
99 // Nothing will be logged after Stop returns.
100 func (r *Reporter) Stop() {
101         close(r.done)
102         <-r.flushed
103 }
104
105 func (r *Reporter) readAllOrWarn(in io.Reader) ([]byte, error) {
106         content, err := ioutil.ReadAll(in)
107         if err != nil {
108                 r.Logger.Printf("warning: %v", err)
109         }
110         return content, err
111 }
112
113 // Open the cgroup stats file in /sys/fs corresponding to the target
114 // cgroup, and return an io.ReadCloser. If no stats file is available,
115 // return nil.
116 //
117 // Log the file that was opened, if it isn't the same file opened on
118 // the last openStatFile for this stat.
119 //
120 // Log "not available" if no file is found and either this stat has
121 // been available in the past, or verbose==true.
122 //
123 // TODO: Instead of trying all options, choose a process in the
124 // container, and read /proc/PID/cgroup to determine the appropriate
125 // cgroup root for the given statgroup. (This will avoid falling back
126 // to host-level stats during container setup and teardown.)
127 func (r *Reporter) openStatFile(statgroup, stat string, verbose bool) (io.ReadCloser, error) {
128         var paths []string
129         if r.CID != "" {
130                 // Collect container's stats
131                 paths = []string{
132                         fmt.Sprintf("%s/%s/%s/%s/%s", r.CgroupRoot, statgroup, r.CgroupParent, r.CID, stat),
133                         fmt.Sprintf("%s/%s/%s/%s", r.CgroupRoot, r.CgroupParent, r.CID, stat),
134                 }
135         } else {
136                 // Collect this host's stats
137                 paths = []string{
138                         fmt.Sprintf("%s/%s/%s", r.CgroupRoot, statgroup, stat),
139                         fmt.Sprintf("%s/%s", r.CgroupRoot, stat),
140                 }
141         }
142         var path string
143         var file *os.File
144         var err error
145         for _, path = range paths {
146                 file, err = os.Open(path)
147                 if err == nil {
148                         break
149                 } else {
150                         path = ""
151                 }
152         }
153         if pathWas := r.reportedStatFile[stat]; pathWas != path {
154                 // Log whenever we start using a new/different cgroup
155                 // stat file for a given statistic. This typically
156                 // happens 1 to 3 times per statistic, depending on
157                 // whether we happen to collect stats [a] before any
158                 // processes have been created in the container and
159                 // [b] after all contained processes have exited.
160                 if path == "" && verbose {
161                         r.Logger.Printf("notice: stats not available: stat %s, statgroup %s, cid %s, parent %s, root %s\n", stat, statgroup, r.CID, r.CgroupParent, r.CgroupRoot)
162                 } else if pathWas != "" {
163                         r.Logger.Printf("notice: stats moved from %s to %s\n", r.reportedStatFile[stat], path)
164                 } else {
165                         r.Logger.Printf("notice: reading stats from %s\n", path)
166                 }
167                 r.reportedStatFile[stat] = path
168         }
169         return file, err
170 }
171
172 func (r *Reporter) getContainerNetStats() (io.Reader, error) {
173         procsFile, err := r.openStatFile("cpuacct", "cgroup.procs", true)
174         if err != nil {
175                 return nil, err
176         }
177         defer procsFile.Close()
178         reader := bufio.NewScanner(procsFile)
179         for reader.Scan() {
180                 taskPid := reader.Text()
181                 statsFilename := fmt.Sprintf("/proc/%s/net/dev", taskPid)
182                 stats, err := ioutil.ReadFile(statsFilename)
183                 if err != nil {
184                         r.Logger.Printf("notice: %v", err)
185                         continue
186                 }
187                 return strings.NewReader(string(stats)), nil
188         }
189         return nil, errors.New("Could not read stats for any proc in container")
190 }
191
192 type ioSample struct {
193         sampleTime time.Time
194         txBytes    int64
195         rxBytes    int64
196 }
197
198 func (r *Reporter) doBlkIOStats() {
199         c, err := r.openStatFile("blkio", "blkio.io_service_bytes", true)
200         if err != nil {
201                 return
202         }
203         defer c.Close()
204         b := bufio.NewScanner(c)
205         var sampleTime = time.Now()
206         newSamples := make(map[string]ioSample)
207         for b.Scan() {
208                 var device, op string
209                 var val int64
210                 if _, err := fmt.Sscanf(string(b.Text()), "%s %s %d", &device, &op, &val); err != nil {
211                         continue
212                 }
213                 var thisSample ioSample
214                 var ok bool
215                 if thisSample, ok = newSamples[device]; !ok {
216                         thisSample = ioSample{sampleTime, -1, -1}
217                 }
218                 switch op {
219                 case "Read":
220                         thisSample.rxBytes = val
221                 case "Write":
222                         thisSample.txBytes = val
223                 }
224                 newSamples[device] = thisSample
225         }
226         for dev, sample := range newSamples {
227                 if sample.txBytes < 0 || sample.rxBytes < 0 {
228                         continue
229                 }
230                 delta := ""
231                 if prev, ok := r.lastDiskIOSample[dev]; ok {
232                         delta = fmt.Sprintf(" -- interval %.4f seconds %d write %d read",
233                                 sample.sampleTime.Sub(prev.sampleTime).Seconds(),
234                                 sample.txBytes-prev.txBytes,
235                                 sample.rxBytes-prev.rxBytes)
236                 }
237                 r.Logger.Printf("blkio:%s %d write %d read%s\n", dev, sample.txBytes, sample.rxBytes, delta)
238                 r.lastDiskIOSample[dev] = sample
239         }
240 }
241
242 type memSample struct {
243         sampleTime time.Time
244         memStat    map[string]int64
245 }
246
247 func (r *Reporter) doMemoryStats() {
248         c, err := r.openStatFile("memory", "memory.stat", true)
249         if err != nil {
250                 return
251         }
252         defer c.Close()
253         b := bufio.NewScanner(c)
254         thisSample := memSample{time.Now(), make(map[string]int64)}
255         wantStats := [...]string{"cache", "swap", "pgmajfault", "rss"}
256         for b.Scan() {
257                 var stat string
258                 var val int64
259                 if _, err := fmt.Sscanf(string(b.Text()), "%s %d", &stat, &val); err != nil {
260                         continue
261                 }
262                 thisSample.memStat[stat] = val
263         }
264         var outstat bytes.Buffer
265         for _, key := range wantStats {
266                 // Use "total_X" stats (entire hierarchy) if enabled,
267                 // otherwise just the single cgroup -- see
268                 // https://www.kernel.org/doc/Documentation/cgroup-v1/memory.txt
269                 if val, ok := thisSample.memStat["total_"+key]; ok {
270                         fmt.Fprintf(&outstat, " %d %s", val, key)
271                 } else if val, ok := thisSample.memStat[key]; ok {
272                         fmt.Fprintf(&outstat, " %d %s", val, key)
273                 }
274         }
275         r.Logger.Printf("mem%s\n", outstat.String())
276
277         r.reportPIDsMu.Lock()
278         defer r.reportPIDsMu.Unlock()
279         procnames := make([]string, 0, len(r.reportPIDs))
280         for name := range r.reportPIDs {
281                 procnames = append(procnames, name)
282         }
283         sort.Strings(procnames)
284         procmem := ""
285         for _, procname := range procnames {
286                 pid := r.reportPIDs[procname]
287                 buf, err := os.ReadFile(fmt.Sprintf("/proc/%d/stat", pid))
288                 if err != nil {
289                         continue
290                 }
291                 // If the executable name contains a ')' char,
292                 // /proc/$pid/stat will look like '1234 (exec name)) S
293                 // 123 ...' -- the last ')' is the end of the 2nd
294                 // field.
295                 paren := bytes.LastIndexByte(buf, ')')
296                 if paren < 0 {
297                         continue
298                 }
299                 fields := bytes.SplitN(buf[paren:], []byte{' '}, 24)
300                 if len(fields) < 24 {
301                         continue
302                 }
303                 // rss is the 24th field in .../stat, and fields[0]
304                 // here is the last char ')' of the 2nd field, so
305                 // rss is fields[22]
306                 rss, err := strconv.Atoi(string(fields[22]))
307                 if err != nil {
308                         continue
309                 }
310                 procmem += fmt.Sprintf(" %d %s", rss, procname)
311         }
312         if procmem != "" {
313                 r.Logger.Printf("procmem%s\n", procmem)
314         }
315 }
316
317 func (r *Reporter) doNetworkStats() {
318         sampleTime := time.Now()
319         stats, err := r.getContainerNetStats()
320         if err != nil {
321                 return
322         }
323
324         scanner := bufio.NewScanner(stats)
325         for scanner.Scan() {
326                 var ifName string
327                 var rx, tx int64
328                 words := strings.Fields(scanner.Text())
329                 if len(words) != 17 {
330                         // Skip lines with wrong format
331                         continue
332                 }
333                 ifName = strings.TrimRight(words[0], ":")
334                 if ifName == "lo" || ifName == "" {
335                         // Skip loopback interface and lines with wrong format
336                         continue
337                 }
338                 if tx, err = strconv.ParseInt(words[9], 10, 64); err != nil {
339                         continue
340                 }
341                 if rx, err = strconv.ParseInt(words[1], 10, 64); err != nil {
342                         continue
343                 }
344                 nextSample := ioSample{}
345                 nextSample.sampleTime = sampleTime
346                 nextSample.txBytes = tx
347                 nextSample.rxBytes = rx
348                 var delta string
349                 if prev, ok := r.lastNetSample[ifName]; ok {
350                         interval := nextSample.sampleTime.Sub(prev.sampleTime).Seconds()
351                         delta = fmt.Sprintf(" -- interval %.4f seconds %d tx %d rx",
352                                 interval,
353                                 tx-prev.txBytes,
354                                 rx-prev.rxBytes)
355                 }
356                 r.Logger.Printf("net:%s %d tx %d rx%s\n", ifName, tx, rx, delta)
357                 r.lastNetSample[ifName] = nextSample
358         }
359 }
360
361 type diskSpaceSample struct {
362         hasData    bool
363         sampleTime time.Time
364         total      uint64
365         used       uint64
366         available  uint64
367 }
368
369 func (r *Reporter) doDiskSpaceStats() {
370         s := syscall.Statfs_t{}
371         err := syscall.Statfs(r.TempDir, &s)
372         if err != nil {
373                 return
374         }
375         bs := uint64(s.Bsize)
376         nextSample := diskSpaceSample{
377                 hasData:    true,
378                 sampleTime: time.Now(),
379                 total:      s.Blocks * bs,
380                 used:       (s.Blocks - s.Bfree) * bs,
381                 available:  s.Bavail * bs,
382         }
383
384         var delta string
385         if r.lastDiskSpaceSample.hasData {
386                 prev := r.lastDiskSpaceSample
387                 interval := nextSample.sampleTime.Sub(prev.sampleTime).Seconds()
388                 delta = fmt.Sprintf(" -- interval %.4f seconds %d used",
389                         interval,
390                         int64(nextSample.used-prev.used))
391         }
392         r.Logger.Printf("statfs %d available %d used %d total%s\n",
393                 nextSample.available, nextSample.used, nextSample.total, delta)
394         r.lastDiskSpaceSample = nextSample
395 }
396
397 type cpuSample struct {
398         hasData    bool // to distinguish the zero value from real data
399         sampleTime time.Time
400         user       float64
401         sys        float64
402         cpus       int64
403 }
404
405 // Return the number of CPUs available in the container. Return 0 if
406 // we can't figure out the real number of CPUs.
407 func (r *Reporter) getCPUCount() int64 {
408         cpusetFile, err := r.openStatFile("cpuset", "cpuset.cpus", true)
409         if err != nil {
410                 return 0
411         }
412         defer cpusetFile.Close()
413         b, err := r.readAllOrWarn(cpusetFile)
414         if err != nil {
415                 return 0
416         }
417         sp := strings.Split(string(b), ",")
418         cpus := int64(0)
419         for _, v := range sp {
420                 var min, max int64
421                 n, _ := fmt.Sscanf(v, "%d-%d", &min, &max)
422                 if n == 2 {
423                         cpus += (max - min) + 1
424                 } else {
425                         cpus++
426                 }
427         }
428         return cpus
429 }
430
431 func (r *Reporter) doCPUStats() {
432         statFile, err := r.openStatFile("cpuacct", "cpuacct.stat", true)
433         if err != nil {
434                 return
435         }
436         defer statFile.Close()
437         b, err := r.readAllOrWarn(statFile)
438         if err != nil {
439                 return
440         }
441
442         var userTicks, sysTicks int64
443         fmt.Sscanf(string(b), "user %d\nsystem %d", &userTicks, &sysTicks)
444         userHz := float64(100)
445         nextSample := cpuSample{
446                 hasData:    true,
447                 sampleTime: time.Now(),
448                 user:       float64(userTicks) / userHz,
449                 sys:        float64(sysTicks) / userHz,
450                 cpus:       r.getCPUCount(),
451         }
452
453         delta := ""
454         if r.lastCPUSample.hasData {
455                 delta = fmt.Sprintf(" -- interval %.4f seconds %.4f user %.4f sys",
456                         nextSample.sampleTime.Sub(r.lastCPUSample.sampleTime).Seconds(),
457                         nextSample.user-r.lastCPUSample.user,
458                         nextSample.sys-r.lastCPUSample.sys)
459         }
460         r.Logger.Printf("cpu %.4f user %.4f sys %d cpus%s\n",
461                 nextSample.user, nextSample.sys, nextSample.cpus, delta)
462         r.lastCPUSample = nextSample
463 }
464
465 // Report stats periodically until we learn (via r.done) that someone
466 // called Stop.
467 func (r *Reporter) run() {
468         defer close(r.flushed)
469
470         r.reportedStatFile = make(map[string]string)
471
472         if !r.waitForCIDFile() || !r.waitForCgroup() {
473                 return
474         }
475
476         r.lastNetSample = make(map[string]ioSample)
477         r.lastDiskIOSample = make(map[string]ioSample)
478
479         if len(r.TempDir) == 0 {
480                 // Temporary dir not provided, try to get it from the environment.
481                 r.TempDir = os.Getenv("TMPDIR")
482         }
483         if len(r.TempDir) > 0 {
484                 r.Logger.Printf("notice: monitoring temp dir %s\n", r.TempDir)
485         }
486
487         ticker := time.NewTicker(r.PollPeriod)
488         for {
489                 r.doMemoryStats()
490                 r.doCPUStats()
491                 r.doBlkIOStats()
492                 r.doNetworkStats()
493                 r.doDiskSpaceStats()
494                 select {
495                 case <-r.done:
496                         return
497                 case <-ticker.C:
498                 }
499         }
500 }
501
502 // If CID is empty, wait for it to appear in CIDFile. Return true if
503 // we get it before we learn (via r.done) that someone called Stop.
504 func (r *Reporter) waitForCIDFile() bool {
505         if r.CID != "" || r.CIDFile == "" {
506                 return true
507         }
508
509         ticker := time.NewTicker(100 * time.Millisecond)
510         defer ticker.Stop()
511         for {
512                 cid, err := ioutil.ReadFile(r.CIDFile)
513                 if err == nil && len(cid) > 0 {
514                         r.CID = string(cid)
515                         return true
516                 }
517                 select {
518                 case <-ticker.C:
519                 case <-r.done:
520                         r.Logger.Printf("warning: CID never appeared in %+q: %v", r.CIDFile, err)
521                         return false
522                 }
523         }
524 }
525
526 // Wait for the cgroup stats files to appear in cgroup_root. Return
527 // true if they appear before r.done indicates someone called Stop. If
528 // they don't appear within one poll interval, log a warning and keep
529 // waiting.
530 func (r *Reporter) waitForCgroup() bool {
531         ticker := time.NewTicker(100 * time.Millisecond)
532         defer ticker.Stop()
533         warningTimer := time.After(r.PollPeriod)
534         for {
535                 c, err := r.openStatFile("cpuacct", "cgroup.procs", false)
536                 if err == nil {
537                         c.Close()
538                         return true
539                 }
540                 select {
541                 case <-ticker.C:
542                 case <-warningTimer:
543                         r.Logger.Printf("warning: cgroup stats files have not appeared after %v (config error?) -- still waiting...", r.PollPeriod)
544                 case <-r.done:
545                         r.Logger.Printf("warning: cgroup stats files never appeared for %v", r.CID)
546                         return false
547                 }
548         }
549 }