18805: replace unnecessary lookup of a constant.
[arvados.git] / lib / crunchstat / crunchstat.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 // Package crunchstat reports resource usage (CPU, memory, disk,
6 // network) for a cgroup.
7 package crunchstat
8
9 import (
10         "bufio"
11         "bytes"
12         "errors"
13         "fmt"
14         "io"
15         "io/ioutil"
16         "log"
17         "os"
18         "strconv"
19         "strings"
20         "syscall"
21         "time"
22 )
23
24 // A Reporter gathers statistics for a cgroup and writes them to a
25 // log.Logger.
26 type Reporter struct {
27         // CID of the container to monitor. If empty, read the CID
28         // from CIDFile (first waiting until a non-empty file appears
29         // at CIDFile). If CIDFile is also empty, report host
30         // statistics.
31         CID string
32
33         // Path to a file we can read CID from.
34         CIDFile string
35
36         // Where cgroup accounting files live on this system, e.g.,
37         // "/sys/fs/cgroup".
38         CgroupRoot string
39
40         // Parent cgroup, e.g., "docker".
41         CgroupParent string
42
43         // Interval between samples. Must be positive.
44         PollPeriod time.Duration
45
46         // Temporary directory, will be monitored for available, used & total space.
47         TempDir string
48
49         // Where to write statistics. Must not be nil.
50         Logger *log.Logger
51
52         reportedStatFile    map[string]string
53         lastNetSample       map[string]ioSample
54         lastDiskIOSample    map[string]ioSample
55         lastCPUSample       cpuSample
56         lastDiskSpaceSample diskSpaceSample
57
58         done    chan struct{} // closed when we should stop reporting
59         flushed chan struct{} // closed when we have made our last report
60 }
61
62 // Start starts monitoring in a new goroutine, and returns
63 // immediately.
64 //
65 // The monitoring goroutine waits for a non-empty CIDFile to appear
66 // (unless CID is non-empty). Then it waits for the accounting files
67 // to appear for the monitored container. Then it collects and reports
68 // statistics until Stop is called.
69 //
70 // Callers should not call Start more than once.
71 //
72 // Callers should not modify public data fields after calling Start.
73 func (r *Reporter) Start() {
74         r.done = make(chan struct{})
75         r.flushed = make(chan struct{})
76         go r.run()
77 }
78
79 // Stop reporting. Do not call more than once, or before calling
80 // Start.
81 //
82 // Nothing will be logged after Stop returns.
83 func (r *Reporter) Stop() {
84         close(r.done)
85         <-r.flushed
86 }
87
88 func (r *Reporter) readAllOrWarn(in io.Reader) ([]byte, error) {
89         content, err := ioutil.ReadAll(in)
90         if err != nil {
91                 r.Logger.Printf("warning: %v", err)
92         }
93         return content, err
94 }
95
96 // Open the cgroup stats file in /sys/fs corresponding to the target
97 // cgroup, and return an io.ReadCloser. If no stats file is available,
98 // return nil.
99 //
100 // Log the file that was opened, if it isn't the same file opened on
101 // the last openStatFile for this stat.
102 //
103 // Log "not available" if no file is found and either this stat has
104 // been available in the past, or verbose==true.
105 //
106 // TODO: Instead of trying all options, choose a process in the
107 // container, and read /proc/PID/cgroup to determine the appropriate
108 // cgroup root for the given statgroup. (This will avoid falling back
109 // to host-level stats during container setup and teardown.)
110 func (r *Reporter) openStatFile(statgroup, stat string, verbose bool) (io.ReadCloser, error) {
111         var paths []string
112         if r.CID != "" {
113                 // Collect container's stats
114                 paths = []string{
115                         fmt.Sprintf("%s/%s/%s/%s/%s", r.CgroupRoot, statgroup, r.CgroupParent, r.CID, stat),
116                         fmt.Sprintf("%s/%s/%s/%s", r.CgroupRoot, r.CgroupParent, r.CID, stat),
117                 }
118         } else {
119                 // Collect this host's stats
120                 paths = []string{
121                         fmt.Sprintf("%s/%s/%s", r.CgroupRoot, statgroup, stat),
122                         fmt.Sprintf("%s/%s", r.CgroupRoot, stat),
123                 }
124         }
125         var path string
126         var file *os.File
127         var err error
128         for _, path = range paths {
129                 file, err = os.Open(path)
130                 if err == nil {
131                         break
132                 } else {
133                         path = ""
134                 }
135         }
136         if pathWas := r.reportedStatFile[stat]; pathWas != path {
137                 // Log whenever we start using a new/different cgroup
138                 // stat file for a given statistic. This typically
139                 // happens 1 to 3 times per statistic, depending on
140                 // whether we happen to collect stats [a] before any
141                 // processes have been created in the container and
142                 // [b] after all contained processes have exited.
143                 if path == "" && verbose {
144                         r.Logger.Printf("notice: stats not available: stat %s, statgroup %s, cid %s, parent %s, root %s\n", stat, statgroup, r.CID, r.CgroupParent, r.CgroupRoot)
145                 } else if pathWas != "" {
146                         r.Logger.Printf("notice: stats moved from %s to %s\n", r.reportedStatFile[stat], path)
147                 } else {
148                         r.Logger.Printf("notice: reading stats from %s\n", path)
149                 }
150                 r.reportedStatFile[stat] = path
151         }
152         return file, err
153 }
154
155 func (r *Reporter) getContainerNetStats() (io.Reader, error) {
156         procsFile, err := r.openStatFile("cpuacct", "cgroup.procs", true)
157         if err != nil {
158                 return nil, err
159         }
160         defer procsFile.Close()
161         reader := bufio.NewScanner(procsFile)
162         for reader.Scan() {
163                 taskPid := reader.Text()
164                 statsFilename := fmt.Sprintf("/proc/%s/net/dev", taskPid)
165                 stats, err := ioutil.ReadFile(statsFilename)
166                 if err != nil {
167                         r.Logger.Printf("notice: %v", err)
168                         continue
169                 }
170                 return strings.NewReader(string(stats)), nil
171         }
172         return nil, errors.New("Could not read stats for any proc in container")
173 }
174
175 type ioSample struct {
176         sampleTime time.Time
177         txBytes    int64
178         rxBytes    int64
179 }
180
181 func (r *Reporter) doBlkIOStats() {
182         c, err := r.openStatFile("blkio", "blkio.io_service_bytes", true)
183         if err != nil {
184                 return
185         }
186         defer c.Close()
187         b := bufio.NewScanner(c)
188         var sampleTime = time.Now()
189         newSamples := make(map[string]ioSample)
190         for b.Scan() {
191                 var device, op string
192                 var val int64
193                 if _, err := fmt.Sscanf(string(b.Text()), "%s %s %d", &device, &op, &val); err != nil {
194                         continue
195                 }
196                 var thisSample ioSample
197                 var ok bool
198                 if thisSample, ok = newSamples[device]; !ok {
199                         thisSample = ioSample{sampleTime, -1, -1}
200                 }
201                 switch op {
202                 case "Read":
203                         thisSample.rxBytes = val
204                 case "Write":
205                         thisSample.txBytes = val
206                 }
207                 newSamples[device] = thisSample
208         }
209         for dev, sample := range newSamples {
210                 if sample.txBytes < 0 || sample.rxBytes < 0 {
211                         continue
212                 }
213                 delta := ""
214                 if prev, ok := r.lastDiskIOSample[dev]; ok {
215                         delta = fmt.Sprintf(" -- interval %.4f seconds %d write %d read",
216                                 sample.sampleTime.Sub(prev.sampleTime).Seconds(),
217                                 sample.txBytes-prev.txBytes,
218                                 sample.rxBytes-prev.rxBytes)
219                 }
220                 r.Logger.Printf("blkio:%s %d write %d read%s\n", dev, sample.txBytes, sample.rxBytes, delta)
221                 r.lastDiskIOSample[dev] = sample
222         }
223 }
224
225 type memSample struct {
226         sampleTime time.Time
227         memStat    map[string]int64
228 }
229
230 func (r *Reporter) doMemoryStats() {
231         c, err := r.openStatFile("memory", "memory.stat", true)
232         if err != nil {
233                 return
234         }
235         defer c.Close()
236         b := bufio.NewScanner(c)
237         thisSample := memSample{time.Now(), make(map[string]int64)}
238         wantStats := [...]string{"cache", "swap", "pgmajfault", "rss"}
239         for b.Scan() {
240                 var stat string
241                 var val int64
242                 if _, err := fmt.Sscanf(string(b.Text()), "%s %d", &stat, &val); err != nil {
243                         continue
244                 }
245                 thisSample.memStat[stat] = val
246         }
247         var outstat bytes.Buffer
248         for _, key := range wantStats {
249                 // Use "total_X" stats (entire hierarchy) if enabled,
250                 // otherwise just the single cgroup -- see
251                 // https://www.kernel.org/doc/Documentation/cgroup-v1/memory.txt
252                 if val, ok := thisSample.memStat["total_"+key]; ok {
253                         fmt.Fprintf(&outstat, " %d %s", val, key)
254                 } else if val, ok := thisSample.memStat[key]; ok {
255                         fmt.Fprintf(&outstat, " %d %s", val, key)
256                 }
257         }
258         r.Logger.Printf("mem%s\n", outstat.String())
259 }
260
261 func (r *Reporter) doNetworkStats() {
262         sampleTime := time.Now()
263         stats, err := r.getContainerNetStats()
264         if err != nil {
265                 return
266         }
267
268         scanner := bufio.NewScanner(stats)
269         for scanner.Scan() {
270                 var ifName string
271                 var rx, tx int64
272                 words := strings.Fields(scanner.Text())
273                 if len(words) != 17 {
274                         // Skip lines with wrong format
275                         continue
276                 }
277                 ifName = strings.TrimRight(words[0], ":")
278                 if ifName == "lo" || ifName == "" {
279                         // Skip loopback interface and lines with wrong format
280                         continue
281                 }
282                 if tx, err = strconv.ParseInt(words[9], 10, 64); err != nil {
283                         continue
284                 }
285                 if rx, err = strconv.ParseInt(words[1], 10, 64); err != nil {
286                         continue
287                 }
288                 nextSample := ioSample{}
289                 nextSample.sampleTime = sampleTime
290                 nextSample.txBytes = tx
291                 nextSample.rxBytes = rx
292                 var delta string
293                 if prev, ok := r.lastNetSample[ifName]; ok {
294                         interval := nextSample.sampleTime.Sub(prev.sampleTime).Seconds()
295                         delta = fmt.Sprintf(" -- interval %.4f seconds %d tx %d rx",
296                                 interval,
297                                 tx-prev.txBytes,
298                                 rx-prev.rxBytes)
299                 }
300                 r.Logger.Printf("net:%s %d tx %d rx%s\n", ifName, tx, rx, delta)
301                 r.lastNetSample[ifName] = nextSample
302         }
303 }
304
305 type diskSpaceSample struct {
306         hasData    bool
307         sampleTime time.Time
308         total      uint64
309         used       uint64
310         available  uint64
311 }
312
313 func (r *Reporter) doDiskSpaceStats() {
314         s := syscall.Statfs_t{}
315         err := syscall.Statfs(r.TempDir, &s)
316         if err != nil {
317                 return
318         }
319         bs := uint64(s.Bsize)
320         nextSample := diskSpaceSample{
321                 hasData:    true,
322                 sampleTime: time.Now(),
323                 total:      s.Blocks * bs,
324                 used:       (s.Blocks - s.Bfree) * bs,
325                 available:  s.Bavail * bs,
326         }
327
328         var delta string
329         if r.lastDiskSpaceSample.hasData {
330                 prev := r.lastDiskSpaceSample
331                 interval := nextSample.sampleTime.Sub(prev.sampleTime).Seconds()
332                 delta = fmt.Sprintf(" -- interval %.4f seconds %d used",
333                         interval,
334                         int64(nextSample.used-prev.used))
335         }
336         r.Logger.Printf("statfs %d available %d used %d total%s\n",
337                 nextSample.available, nextSample.used, nextSample.total, delta)
338         r.lastDiskSpaceSample = nextSample
339 }
340
341 type cpuSample struct {
342         hasData    bool // to distinguish the zero value from real data
343         sampleTime time.Time
344         user       float64
345         sys        float64
346         cpus       int64
347 }
348
349 // Return the number of CPUs available in the container. Return 0 if
350 // we can't figure out the real number of CPUs.
351 func (r *Reporter) getCPUCount() int64 {
352         cpusetFile, err := r.openStatFile("cpuset", "cpuset.cpus", true)
353         if err != nil {
354                 return 0
355         }
356         defer cpusetFile.Close()
357         b, err := r.readAllOrWarn(cpusetFile)
358         if err != nil {
359                 return 0
360         }
361         sp := strings.Split(string(b), ",")
362         cpus := int64(0)
363         for _, v := range sp {
364                 var min, max int64
365                 n, _ := fmt.Sscanf(v, "%d-%d", &min, &max)
366                 if n == 2 {
367                         cpus += (max - min) + 1
368                 } else {
369                         cpus++
370                 }
371         }
372         return cpus
373 }
374
375 func (r *Reporter) doCPUStats() {
376         statFile, err := r.openStatFile("cpuacct", "cpuacct.stat", true)
377         if err != nil {
378                 return
379         }
380         defer statFile.Close()
381         b, err := r.readAllOrWarn(statFile)
382         if err != nil {
383                 return
384         }
385
386         var userTicks, sysTicks int64
387         fmt.Sscanf(string(b), "user %d\nsystem %d", &userTicks, &sysTicks)
388         userHz := float64(100)
389         nextSample := cpuSample{
390                 hasData:    true,
391                 sampleTime: time.Now(),
392                 user:       float64(userTicks) / userHz,
393                 sys:        float64(sysTicks) / userHz,
394                 cpus:       r.getCPUCount(),
395         }
396
397         delta := ""
398         if r.lastCPUSample.hasData {
399                 delta = fmt.Sprintf(" -- interval %.4f seconds %.4f user %.4f sys",
400                         nextSample.sampleTime.Sub(r.lastCPUSample.sampleTime).Seconds(),
401                         nextSample.user-r.lastCPUSample.user,
402                         nextSample.sys-r.lastCPUSample.sys)
403         }
404         r.Logger.Printf("cpu %.4f user %.4f sys %d cpus%s\n",
405                 nextSample.user, nextSample.sys, nextSample.cpus, delta)
406         r.lastCPUSample = nextSample
407 }
408
409 // Report stats periodically until we learn (via r.done) that someone
410 // called Stop.
411 func (r *Reporter) run() {
412         defer close(r.flushed)
413
414         r.reportedStatFile = make(map[string]string)
415
416         if !r.waitForCIDFile() || !r.waitForCgroup() {
417                 return
418         }
419
420         r.lastNetSample = make(map[string]ioSample)
421         r.lastDiskIOSample = make(map[string]ioSample)
422
423         if len(r.TempDir) == 0 {
424                 // Temporary dir not provided, try to get it from the environment.
425                 r.TempDir = os.Getenv("TMPDIR")
426         }
427         if len(r.TempDir) > 0 {
428                 r.Logger.Printf("notice: monitoring temp dir %s\n", r.TempDir)
429         }
430
431         ticker := time.NewTicker(r.PollPeriod)
432         for {
433                 r.doMemoryStats()
434                 r.doCPUStats()
435                 r.doBlkIOStats()
436                 r.doNetworkStats()
437                 r.doDiskSpaceStats()
438                 select {
439                 case <-r.done:
440                         return
441                 case <-ticker.C:
442                 }
443         }
444 }
445
446 // If CID is empty, wait for it to appear in CIDFile. Return true if
447 // we get it before we learn (via r.done) that someone called Stop.
448 func (r *Reporter) waitForCIDFile() bool {
449         if r.CID != "" || r.CIDFile == "" {
450                 return true
451         }
452
453         ticker := time.NewTicker(100 * time.Millisecond)
454         defer ticker.Stop()
455         for {
456                 cid, err := ioutil.ReadFile(r.CIDFile)
457                 if err == nil && len(cid) > 0 {
458                         r.CID = string(cid)
459                         return true
460                 }
461                 select {
462                 case <-ticker.C:
463                 case <-r.done:
464                         r.Logger.Printf("warning: CID never appeared in %+q: %v", r.CIDFile, err)
465                         return false
466                 }
467         }
468 }
469
470 // Wait for the cgroup stats files to appear in cgroup_root. Return
471 // true if they appear before r.done indicates someone called Stop. If
472 // they don't appear within one poll interval, log a warning and keep
473 // waiting.
474 func (r *Reporter) waitForCgroup() bool {
475         ticker := time.NewTicker(100 * time.Millisecond)
476         defer ticker.Stop()
477         warningTimer := time.After(r.PollPeriod)
478         for {
479                 c, err := r.openStatFile("cpuacct", "cgroup.procs", false)
480                 if err == nil {
481                         c.Close()
482                         return true
483                 }
484                 select {
485                 case <-ticker.C:
486                 case <-warningTimer:
487                         r.Logger.Printf("warning: cgroup stats files have not appeared after %v (config error?) -- still waiting...", r.PollPeriod)
488                 case <-r.done:
489                         r.Logger.Printf("warning: cgroup stats files never appeared for %v", r.CID)
490                         return false
491                 }
492         }
493 }