Merge branch '14670-new-java-sdk-docs' of git.curoverse.com:arvados into 14670-new...
[arvados.git] / lib / crunchstat / crunchstat.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 // Package crunchstat reports resource usage (CPU, memory, disk,
6 // network) for a cgroup.
7 package crunchstat
8
9 import (
10         "bufio"
11         "bytes"
12         "errors"
13         "fmt"
14         "io"
15         "io/ioutil"
16         "log"
17         "os"
18         "strconv"
19         "strings"
20         "syscall"
21         "time"
22 )
23
24 // This magically allows us to look up userHz via _SC_CLK_TCK:
25
26 /*
27 #include <unistd.h>
28 #include <sys/types.h>
29 #include <pwd.h>
30 #include <stdlib.h>
31 */
32 import "C"
33
34 // A Reporter gathers statistics for a cgroup and writes them to a
35 // log.Logger.
36 type Reporter struct {
37         // CID of the container to monitor. If empty, read the CID
38         // from CIDFile (first waiting until a non-empty file appears
39         // at CIDFile). If CIDFile is also empty, report host
40         // statistics.
41         CID string
42
43         // Path to a file we can read CID from.
44         CIDFile string
45
46         // Where cgroup accounting files live on this system, e.g.,
47         // "/sys/fs/cgroup".
48         CgroupRoot string
49
50         // Parent cgroup, e.g., "docker".
51         CgroupParent string
52
53         // Interval between samples. Must be positive.
54         PollPeriod time.Duration
55
56         // Temporary directory, will be monitored for available, used & total space.
57         TempDir string
58
59         // Where to write statistics. Must not be nil.
60         Logger *log.Logger
61
62         reportedStatFile    map[string]string
63         lastNetSample       map[string]ioSample
64         lastDiskIOSample    map[string]ioSample
65         lastCPUSample       cpuSample
66         lastDiskSpaceSample diskSpaceSample
67
68         done    chan struct{} // closed when we should stop reporting
69         flushed chan struct{} // closed when we have made our last report
70 }
71
72 // Start starts monitoring in a new goroutine, and returns
73 // immediately.
74 //
75 // The monitoring goroutine waits for a non-empty CIDFile to appear
76 // (unless CID is non-empty). Then it waits for the accounting files
77 // to appear for the monitored container. Then it collects and reports
78 // statistics until Stop is called.
79 //
80 // Callers should not call Start more than once.
81 //
82 // Callers should not modify public data fields after calling Start.
83 func (r *Reporter) Start() {
84         r.done = make(chan struct{})
85         r.flushed = make(chan struct{})
86         go r.run()
87 }
88
89 // Stop reporting. Do not call more than once, or before calling
90 // Start.
91 //
92 // Nothing will be logged after Stop returns.
93 func (r *Reporter) Stop() {
94         close(r.done)
95         <-r.flushed
96 }
97
98 func (r *Reporter) readAllOrWarn(in io.Reader) ([]byte, error) {
99         content, err := ioutil.ReadAll(in)
100         if err != nil {
101                 r.Logger.Printf("warning: %v", err)
102         }
103         return content, err
104 }
105
106 // Open the cgroup stats file in /sys/fs corresponding to the target
107 // cgroup, and return an io.ReadCloser. If no stats file is available,
108 // return nil.
109 //
110 // Log the file that was opened, if it isn't the same file opened on
111 // the last openStatFile for this stat.
112 //
113 // Log "not available" if no file is found and either this stat has
114 // been available in the past, or verbose==true.
115 //
116 // TODO: Instead of trying all options, choose a process in the
117 // container, and read /proc/PID/cgroup to determine the appropriate
118 // cgroup root for the given statgroup. (This will avoid falling back
119 // to host-level stats during container setup and teardown.)
120 func (r *Reporter) openStatFile(statgroup, stat string, verbose bool) (io.ReadCloser, error) {
121         var paths []string
122         if r.CID != "" {
123                 // Collect container's stats
124                 paths = []string{
125                         fmt.Sprintf("%s/%s/%s/%s/%s", r.CgroupRoot, statgroup, r.CgroupParent, r.CID, stat),
126                         fmt.Sprintf("%s/%s/%s/%s", r.CgroupRoot, r.CgroupParent, r.CID, stat),
127                 }
128         } else {
129                 // Collect this host's stats
130                 paths = []string{
131                         fmt.Sprintf("%s/%s/%s", r.CgroupRoot, statgroup, stat),
132                         fmt.Sprintf("%s/%s", r.CgroupRoot, stat),
133                 }
134         }
135         var path string
136         var file *os.File
137         var err error
138         for _, path = range paths {
139                 file, err = os.Open(path)
140                 if err == nil {
141                         break
142                 } else {
143                         path = ""
144                 }
145         }
146         if pathWas := r.reportedStatFile[stat]; pathWas != path {
147                 // Log whenever we start using a new/different cgroup
148                 // stat file for a given statistic. This typically
149                 // happens 1 to 3 times per statistic, depending on
150                 // whether we happen to collect stats [a] before any
151                 // processes have been created in the container and
152                 // [b] after all contained processes have exited.
153                 if path == "" && verbose {
154                         r.Logger.Printf("notice: stats not available: stat %s, statgroup %s, cid %s, parent %s, root %s\n", stat, statgroup, r.CID, r.CgroupParent, r.CgroupRoot)
155                 } else if pathWas != "" {
156                         r.Logger.Printf("notice: stats moved from %s to %s\n", r.reportedStatFile[stat], path)
157                 } else {
158                         r.Logger.Printf("notice: reading stats from %s\n", path)
159                 }
160                 r.reportedStatFile[stat] = path
161         }
162         return file, err
163 }
164
165 func (r *Reporter) getContainerNetStats() (io.Reader, error) {
166         procsFile, err := r.openStatFile("cpuacct", "cgroup.procs", true)
167         if err != nil {
168                 return nil, err
169         }
170         defer procsFile.Close()
171         reader := bufio.NewScanner(procsFile)
172         for reader.Scan() {
173                 taskPid := reader.Text()
174                 statsFilename := fmt.Sprintf("/proc/%s/net/dev", taskPid)
175                 stats, err := ioutil.ReadFile(statsFilename)
176                 if err != nil {
177                         r.Logger.Printf("notice: %v", err)
178                         continue
179                 }
180                 return strings.NewReader(string(stats)), nil
181         }
182         return nil, errors.New("Could not read stats for any proc in container")
183 }
184
185 type ioSample struct {
186         sampleTime time.Time
187         txBytes    int64
188         rxBytes    int64
189 }
190
191 func (r *Reporter) doBlkIOStats() {
192         c, err := r.openStatFile("blkio", "blkio.io_service_bytes", true)
193         if err != nil {
194                 return
195         }
196         defer c.Close()
197         b := bufio.NewScanner(c)
198         var sampleTime = time.Now()
199         newSamples := make(map[string]ioSample)
200         for b.Scan() {
201                 var device, op string
202                 var val int64
203                 if _, err := fmt.Sscanf(string(b.Text()), "%s %s %d", &device, &op, &val); err != nil {
204                         continue
205                 }
206                 var thisSample ioSample
207                 var ok bool
208                 if thisSample, ok = newSamples[device]; !ok {
209                         thisSample = ioSample{sampleTime, -1, -1}
210                 }
211                 switch op {
212                 case "Read":
213                         thisSample.rxBytes = val
214                 case "Write":
215                         thisSample.txBytes = val
216                 }
217                 newSamples[device] = thisSample
218         }
219         for dev, sample := range newSamples {
220                 if sample.txBytes < 0 || sample.rxBytes < 0 {
221                         continue
222                 }
223                 delta := ""
224                 if prev, ok := r.lastDiskIOSample[dev]; ok {
225                         delta = fmt.Sprintf(" -- interval %.4f seconds %d write %d read",
226                                 sample.sampleTime.Sub(prev.sampleTime).Seconds(),
227                                 sample.txBytes-prev.txBytes,
228                                 sample.rxBytes-prev.rxBytes)
229                 }
230                 r.Logger.Printf("blkio:%s %d write %d read%s\n", dev, sample.txBytes, sample.rxBytes, delta)
231                 r.lastDiskIOSample[dev] = sample
232         }
233 }
234
235 type memSample struct {
236         sampleTime time.Time
237         memStat    map[string]int64
238 }
239
240 func (r *Reporter) doMemoryStats() {
241         c, err := r.openStatFile("memory", "memory.stat", true)
242         if err != nil {
243                 return
244         }
245         defer c.Close()
246         b := bufio.NewScanner(c)
247         thisSample := memSample{time.Now(), make(map[string]int64)}
248         wantStats := [...]string{"cache", "swap", "pgmajfault", "rss"}
249         for b.Scan() {
250                 var stat string
251                 var val int64
252                 if _, err := fmt.Sscanf(string(b.Text()), "%s %d", &stat, &val); err != nil {
253                         continue
254                 }
255                 thisSample.memStat[stat] = val
256         }
257         var outstat bytes.Buffer
258         for _, key := range wantStats {
259                 // Use "total_X" stats (entire hierarchy) if enabled,
260                 // otherwise just the single cgroup -- see
261                 // https://www.kernel.org/doc/Documentation/cgroup-v1/memory.txt
262                 if val, ok := thisSample.memStat["total_"+key]; ok {
263                         fmt.Fprintf(&outstat, " %d %s", val, key)
264                 } else if val, ok := thisSample.memStat[key]; ok {
265                         fmt.Fprintf(&outstat, " %d %s", val, key)
266                 }
267         }
268         r.Logger.Printf("mem%s\n", outstat.String())
269 }
270
271 func (r *Reporter) doNetworkStats() {
272         sampleTime := time.Now()
273         stats, err := r.getContainerNetStats()
274         if err != nil {
275                 return
276         }
277
278         scanner := bufio.NewScanner(stats)
279         for scanner.Scan() {
280                 var ifName string
281                 var rx, tx int64
282                 words := strings.Fields(scanner.Text())
283                 if len(words) != 17 {
284                         // Skip lines with wrong format
285                         continue
286                 }
287                 ifName = strings.TrimRight(words[0], ":")
288                 if ifName == "lo" || ifName == "" {
289                         // Skip loopback interface and lines with wrong format
290                         continue
291                 }
292                 if tx, err = strconv.ParseInt(words[9], 10, 64); err != nil {
293                         continue
294                 }
295                 if rx, err = strconv.ParseInt(words[1], 10, 64); err != nil {
296                         continue
297                 }
298                 nextSample := ioSample{}
299                 nextSample.sampleTime = sampleTime
300                 nextSample.txBytes = tx
301                 nextSample.rxBytes = rx
302                 var delta string
303                 if prev, ok := r.lastNetSample[ifName]; ok {
304                         interval := nextSample.sampleTime.Sub(prev.sampleTime).Seconds()
305                         delta = fmt.Sprintf(" -- interval %.4f seconds %d tx %d rx",
306                                 interval,
307                                 tx-prev.txBytes,
308                                 rx-prev.rxBytes)
309                 }
310                 r.Logger.Printf("net:%s %d tx %d rx%s\n", ifName, tx, rx, delta)
311                 r.lastNetSample[ifName] = nextSample
312         }
313 }
314
315 type diskSpaceSample struct {
316         hasData    bool
317         sampleTime time.Time
318         total      uint64
319         used       uint64
320         available  uint64
321 }
322
323 func (r *Reporter) doDiskSpaceStats() {
324         s := syscall.Statfs_t{}
325         err := syscall.Statfs(r.TempDir, &s)
326         if err != nil {
327                 return
328         }
329         bs := uint64(s.Bsize)
330         nextSample := diskSpaceSample{
331                 hasData:    true,
332                 sampleTime: time.Now(),
333                 total:      s.Blocks * bs,
334                 used:       (s.Blocks - s.Bfree) * bs,
335                 available:  s.Bavail * bs,
336         }
337
338         var delta string
339         if r.lastDiskSpaceSample.hasData {
340                 prev := r.lastDiskSpaceSample
341                 interval := nextSample.sampleTime.Sub(prev.sampleTime).Seconds()
342                 delta = fmt.Sprintf(" -- interval %.4f seconds %d used",
343                         interval,
344                         int64(nextSample.used-prev.used))
345         }
346         r.Logger.Printf("statfs %d available %d used %d total%s\n",
347                 nextSample.available, nextSample.used, nextSample.total, delta)
348         r.lastDiskSpaceSample = nextSample
349 }
350
351 type cpuSample struct {
352         hasData    bool // to distinguish the zero value from real data
353         sampleTime time.Time
354         user       float64
355         sys        float64
356         cpus       int64
357 }
358
359 // Return the number of CPUs available in the container. Return 0 if
360 // we can't figure out the real number of CPUs.
361 func (r *Reporter) getCPUCount() int64 {
362         cpusetFile, err := r.openStatFile("cpuset", "cpuset.cpus", true)
363         if err != nil {
364                 return 0
365         }
366         defer cpusetFile.Close()
367         b, err := r.readAllOrWarn(cpusetFile)
368         if err != nil {
369                 return 0
370         }
371         sp := strings.Split(string(b), ",")
372         cpus := int64(0)
373         for _, v := range sp {
374                 var min, max int64
375                 n, _ := fmt.Sscanf(v, "%d-%d", &min, &max)
376                 if n == 2 {
377                         cpus += (max - min) + 1
378                 } else {
379                         cpus++
380                 }
381         }
382         return cpus
383 }
384
385 func (r *Reporter) doCPUStats() {
386         statFile, err := r.openStatFile("cpuacct", "cpuacct.stat", true)
387         if err != nil {
388                 return
389         }
390         defer statFile.Close()
391         b, err := r.readAllOrWarn(statFile)
392         if err != nil {
393                 return
394         }
395
396         var userTicks, sysTicks int64
397         fmt.Sscanf(string(b), "user %d\nsystem %d", &userTicks, &sysTicks)
398         userHz := float64(C.sysconf(C._SC_CLK_TCK))
399         nextSample := cpuSample{
400                 hasData:    true,
401                 sampleTime: time.Now(),
402                 user:       float64(userTicks) / userHz,
403                 sys:        float64(sysTicks) / userHz,
404                 cpus:       r.getCPUCount(),
405         }
406
407         delta := ""
408         if r.lastCPUSample.hasData {
409                 delta = fmt.Sprintf(" -- interval %.4f seconds %.4f user %.4f sys",
410                         nextSample.sampleTime.Sub(r.lastCPUSample.sampleTime).Seconds(),
411                         nextSample.user-r.lastCPUSample.user,
412                         nextSample.sys-r.lastCPUSample.sys)
413         }
414         r.Logger.Printf("cpu %.4f user %.4f sys %d cpus%s\n",
415                 nextSample.user, nextSample.sys, nextSample.cpus, delta)
416         r.lastCPUSample = nextSample
417 }
418
419 // Report stats periodically until we learn (via r.done) that someone
420 // called Stop.
421 func (r *Reporter) run() {
422         defer close(r.flushed)
423
424         r.reportedStatFile = make(map[string]string)
425
426         if !r.waitForCIDFile() || !r.waitForCgroup() {
427                 return
428         }
429
430         r.lastNetSample = make(map[string]ioSample)
431         r.lastDiskIOSample = make(map[string]ioSample)
432
433         if len(r.TempDir) == 0 {
434                 // Temporary dir not provided, try to get it from the environment.
435                 r.TempDir = os.Getenv("TMPDIR")
436         }
437         if len(r.TempDir) > 0 {
438                 r.Logger.Printf("notice: monitoring temp dir %s\n", r.TempDir)
439         }
440
441         ticker := time.NewTicker(r.PollPeriod)
442         for {
443                 r.doMemoryStats()
444                 r.doCPUStats()
445                 r.doBlkIOStats()
446                 r.doNetworkStats()
447                 r.doDiskSpaceStats()
448                 select {
449                 case <-r.done:
450                         return
451                 case <-ticker.C:
452                 }
453         }
454 }
455
456 // If CID is empty, wait for it to appear in CIDFile. Return true if
457 // we get it before we learn (via r.done) that someone called Stop.
458 func (r *Reporter) waitForCIDFile() bool {
459         if r.CID != "" || r.CIDFile == "" {
460                 return true
461         }
462
463         ticker := time.NewTicker(100 * time.Millisecond)
464         defer ticker.Stop()
465         for {
466                 cid, err := ioutil.ReadFile(r.CIDFile)
467                 if err == nil && len(cid) > 0 {
468                         r.CID = string(cid)
469                         return true
470                 }
471                 select {
472                 case <-ticker.C:
473                 case <-r.done:
474                         r.Logger.Printf("warning: CID never appeared in %+q: %v", r.CIDFile, err)
475                         return false
476                 }
477         }
478 }
479
480 // Wait for the cgroup stats files to appear in cgroup_root. Return
481 // true if they appear before r.done indicates someone called Stop. If
482 // they don't appear within one poll interval, log a warning and keep
483 // waiting.
484 func (r *Reporter) waitForCgroup() bool {
485         ticker := time.NewTicker(100 * time.Millisecond)
486         defer ticker.Stop()
487         warningTimer := time.After(r.PollPeriod)
488         for {
489                 c, err := r.openStatFile("cpuacct", "cgroup.procs", false)
490                 if err == nil {
491                         c.Close()
492                         return true
493                 }
494                 select {
495                 case <-ticker.C:
496                 case <-warningTimer:
497                         r.Logger.Printf("warning: cgroup stats files have not appeared after %v (config error?) -- still waiting...", r.PollPeriod)
498                 case <-r.done:
499                         r.Logger.Printf("warning: cgroup stats files never appeared for %v", r.CID)
500                         return false
501                 }
502         }
503 }