12444: Adds tmpdir stats to crunchstat reporting
[arvados.git] / lib / crunchstat / crunchstat.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 // Package crunchstat reports resource usage (CPU, memory, disk,
6 // network) for a cgroup.
7 package crunchstat
8
9 import (
10         "bufio"
11         "bytes"
12         "errors"
13         "fmt"
14         "io"
15         "io/ioutil"
16         "log"
17         "os"
18         "strconv"
19         "strings"
20         "syscall"
21         "time"
22 )
23
24 // This magically allows us to look up userHz via _SC_CLK_TCK:
25
26 /*
27 #include <unistd.h>
28 #include <sys/types.h>
29 #include <pwd.h>
30 #include <stdlib.h>
31 */
32 import "C"
33
34 // A Reporter gathers statistics for a cgroup and writes them to a
35 // log.Logger.
36 type Reporter struct {
37         // CID of the container to monitor. If empty, read the CID
38         // from CIDFile (first waiting until a non-empty file appears
39         // at CIDFile). If CIDFile is also empty, report host
40         // statistics.
41         CID string
42
43         // Path to a file we can read CID from.
44         CIDFile string
45
46         // Where cgroup accounting files live on this system, e.g.,
47         // "/sys/fs/cgroup".
48         CgroupRoot string
49
50         // Parent cgroup, e.g., "docker".
51         CgroupParent string
52
53         // Interval between samples. Must be positive.
54         PollPeriod time.Duration
55
56         // Where to write statistics. Must not be nil.
57         Logger *log.Logger
58
59         reportedStatFile    map[string]string
60         lastNetSample       map[string]ioSample
61         lastDiskIOSample    map[string]ioSample
62         lastCPUSample       cpuSample
63         lastDiskSpaceSample diskSpaceSample
64
65         done    chan struct{} // closed when we should stop reporting
66         flushed chan struct{} // closed when we have made our last report
67 }
68
69 // Start starts monitoring in a new goroutine, and returns
70 // immediately.
71 //
72 // The monitoring goroutine waits for a non-empty CIDFile to appear
73 // (unless CID is non-empty). Then it waits for the accounting files
74 // to appear for the monitored container. Then it collects and reports
75 // statistics until Stop is called.
76 //
77 // Callers should not call Start more than once.
78 //
79 // Callers should not modify public data fields after calling Start.
80 func (r *Reporter) Start() {
81         r.done = make(chan struct{})
82         r.flushed = make(chan struct{})
83         go r.run()
84 }
85
86 // Stop reporting. Do not call more than once, or before calling
87 // Start.
88 //
89 // Nothing will be logged after Stop returns.
90 func (r *Reporter) Stop() {
91         close(r.done)
92         <-r.flushed
93 }
94
95 func (r *Reporter) readAllOrWarn(in io.Reader) ([]byte, error) {
96         content, err := ioutil.ReadAll(in)
97         if err != nil {
98                 r.Logger.Printf("warning: %v", err)
99         }
100         return content, err
101 }
102
103 // Open the cgroup stats file in /sys/fs corresponding to the target
104 // cgroup, and return an io.ReadCloser. If no stats file is available,
105 // return nil.
106 //
107 // Log the file that was opened, if it isn't the same file opened on
108 // the last openStatFile for this stat.
109 //
110 // Log "not available" if no file is found and either this stat has
111 // been available in the past, or verbose==true.
112 //
113 // TODO: Instead of trying all options, choose a process in the
114 // container, and read /proc/PID/cgroup to determine the appropriate
115 // cgroup root for the given statgroup. (This will avoid falling back
116 // to host-level stats during container setup and teardown.)
117 func (r *Reporter) openStatFile(statgroup, stat string, verbose bool) (io.ReadCloser, error) {
118         var paths []string
119         if r.CID != "" {
120                 // Collect container's stats
121                 paths = []string{
122                         fmt.Sprintf("%s/%s/%s/%s/%s", r.CgroupRoot, statgroup, r.CgroupParent, r.CID, stat),
123                         fmt.Sprintf("%s/%s/%s/%s", r.CgroupRoot, r.CgroupParent, r.CID, stat),
124                 }
125         } else {
126                 // Collect this host's stats
127                 paths = []string{
128                         fmt.Sprintf("%s/%s/%s", r.CgroupRoot, statgroup, stat),
129                         fmt.Sprintf("%s/%s", r.CgroupRoot, stat),
130                 }
131         }
132         var path string
133         var file *os.File
134         var err error
135         for _, path = range paths {
136                 file, err = os.Open(path)
137                 if err == nil {
138                         break
139                 } else {
140                         path = ""
141                 }
142         }
143         if pathWas := r.reportedStatFile[stat]; pathWas != path {
144                 // Log whenever we start using a new/different cgroup
145                 // stat file for a given statistic. This typically
146                 // happens 1 to 3 times per statistic, depending on
147                 // whether we happen to collect stats [a] before any
148                 // processes have been created in the container and
149                 // [b] after all contained processes have exited.
150                 if path == "" && verbose {
151                         r.Logger.Printf("notice: stats not available: stat %s, statgroup %s, cid %s, parent %s, root %s\n", stat, statgroup, r.CID, r.CgroupParent, r.CgroupRoot)
152                 } else if pathWas != "" {
153                         r.Logger.Printf("notice: stats moved from %s to %s\n", r.reportedStatFile[stat], path)
154                 } else {
155                         r.Logger.Printf("notice: reading stats from %s\n", path)
156                 }
157                 r.reportedStatFile[stat] = path
158         }
159         return file, err
160 }
161
162 func (r *Reporter) getContainerNetStats() (io.Reader, error) {
163         procsFile, err := r.openStatFile("cpuacct", "cgroup.procs", true)
164         if err != nil {
165                 return nil, err
166         }
167         defer procsFile.Close()
168         reader := bufio.NewScanner(procsFile)
169         for reader.Scan() {
170                 taskPid := reader.Text()
171                 statsFilename := fmt.Sprintf("/proc/%s/net/dev", taskPid)
172                 stats, err := ioutil.ReadFile(statsFilename)
173                 if err != nil {
174                         r.Logger.Printf("notice: %v", err)
175                         continue
176                 }
177                 return strings.NewReader(string(stats)), nil
178         }
179         return nil, errors.New("Could not read stats for any proc in container")
180 }
181
182 type ioSample struct {
183         sampleTime time.Time
184         txBytes    int64
185         rxBytes    int64
186 }
187
188 func (r *Reporter) doBlkIOStats() {
189         c, err := r.openStatFile("blkio", "blkio.io_service_bytes", true)
190         if err != nil {
191                 return
192         }
193         defer c.Close()
194         b := bufio.NewScanner(c)
195         var sampleTime = time.Now()
196         newSamples := make(map[string]ioSample)
197         for b.Scan() {
198                 var device, op string
199                 var val int64
200                 if _, err := fmt.Sscanf(string(b.Text()), "%s %s %d", &device, &op, &val); err != nil {
201                         continue
202                 }
203                 var thisSample ioSample
204                 var ok bool
205                 if thisSample, ok = newSamples[device]; !ok {
206                         thisSample = ioSample{sampleTime, -1, -1}
207                 }
208                 switch op {
209                 case "Read":
210                         thisSample.rxBytes = val
211                 case "Write":
212                         thisSample.txBytes = val
213                 }
214                 newSamples[device] = thisSample
215         }
216         for dev, sample := range newSamples {
217                 if sample.txBytes < 0 || sample.rxBytes < 0 {
218                         continue
219                 }
220                 delta := ""
221                 if prev, ok := r.lastDiskIOSample[dev]; ok {
222                         delta = fmt.Sprintf(" -- interval %.4f seconds %d write %d read",
223                                 sample.sampleTime.Sub(prev.sampleTime).Seconds(),
224                                 sample.txBytes-prev.txBytes,
225                                 sample.rxBytes-prev.rxBytes)
226                 }
227                 r.Logger.Printf("blkio:%s %d write %d read%s\n", dev, sample.txBytes, sample.rxBytes, delta)
228                 r.lastDiskIOSample[dev] = sample
229         }
230 }
231
232 type memSample struct {
233         sampleTime time.Time
234         memStat    map[string]int64
235 }
236
237 func (r *Reporter) doMemoryStats() {
238         c, err := r.openStatFile("memory", "memory.stat", true)
239         if err != nil {
240                 return
241         }
242         defer c.Close()
243         b := bufio.NewScanner(c)
244         thisSample := memSample{time.Now(), make(map[string]int64)}
245         wantStats := [...]string{"cache", "swap", "pgmajfault", "rss"}
246         for b.Scan() {
247                 var stat string
248                 var val int64
249                 if _, err := fmt.Sscanf(string(b.Text()), "%s %d", &stat, &val); err != nil {
250                         continue
251                 }
252                 thisSample.memStat[stat] = val
253         }
254         var outstat bytes.Buffer
255         for _, key := range wantStats {
256                 if val, ok := thisSample.memStat[key]; ok {
257                         outstat.WriteString(fmt.Sprintf(" %d %s", val, key))
258                 }
259         }
260         r.Logger.Printf("mem%s\n", outstat.String())
261 }
262
263 func (r *Reporter) doNetworkStats() {
264         sampleTime := time.Now()
265         stats, err := r.getContainerNetStats()
266         if err != nil {
267                 return
268         }
269
270         scanner := bufio.NewScanner(stats)
271         for scanner.Scan() {
272                 var ifName string
273                 var rx, tx int64
274                 words := strings.Fields(scanner.Text())
275                 if len(words) != 17 {
276                         // Skip lines with wrong format
277                         continue
278                 }
279                 ifName = strings.TrimRight(words[0], ":")
280                 if ifName == "lo" || ifName == "" {
281                         // Skip loopback interface and lines with wrong format
282                         continue
283                 }
284                 if tx, err = strconv.ParseInt(words[9], 10, 64); err != nil {
285                         continue
286                 }
287                 if rx, err = strconv.ParseInt(words[1], 10, 64); err != nil {
288                         continue
289                 }
290                 nextSample := ioSample{}
291                 nextSample.sampleTime = sampleTime
292                 nextSample.txBytes = tx
293                 nextSample.rxBytes = rx
294                 var delta string
295                 if prev, ok := r.lastNetSample[ifName]; ok {
296                         interval := nextSample.sampleTime.Sub(prev.sampleTime).Seconds()
297                         delta = fmt.Sprintf(" -- interval %.4f seconds %d tx %d rx",
298                                 interval,
299                                 tx-prev.txBytes,
300                                 rx-prev.rxBytes)
301                 }
302                 r.Logger.Printf("net:%s %d tx %d rx%s\n", ifName, tx, rx, delta)
303                 r.lastNetSample[ifName] = nextSample
304         }
305 }
306
307 type diskSpaceSample struct {
308         hasData    bool
309         sampleTime time.Time
310         total      uint64
311         used       uint64
312         available  uint64
313 }
314
315 func (r *Reporter) doDiskSpaceStats() {
316         s := syscall.Statfs_t{}
317         err := syscall.Statfs("/tmp", &s)
318         if err != nil {
319                 return
320         }
321         bs := uint64(s.Bsize)
322         nextSample := diskSpaceSample{
323                 hasData:    true,
324                 sampleTime: time.Now(),
325                 total:      s.Blocks * bs,
326                 used:       (s.Blocks - s.Bavail) * bs,
327                 available:  s.Bavail * bs,
328         }
329
330         var delta string
331         if r.lastDiskSpaceSample.hasData {
332                 prev := r.lastDiskSpaceSample
333                 interval := nextSample.sampleTime.Sub(prev.sampleTime).Seconds()
334                 delta = fmt.Sprintf(" -- interval %.4f seconds %d used",
335                         interval,
336                         int64(nextSample.used-prev.used))
337         }
338         r.Logger.Printf("tmpdir available:%d used:%d total:%d%s\n",
339                 nextSample.available, nextSample.used, nextSample.total, delta)
340         r.lastDiskSpaceSample = nextSample
341 }
342
343 type cpuSample struct {
344         hasData    bool // to distinguish the zero value from real data
345         sampleTime time.Time
346         user       float64
347         sys        float64
348         cpus       int64
349 }
350
351 // Return the number of CPUs available in the container. Return 0 if
352 // we can't figure out the real number of CPUs.
353 func (r *Reporter) getCPUCount() int64 {
354         cpusetFile, err := r.openStatFile("cpuset", "cpuset.cpus", true)
355         if err != nil {
356                 return 0
357         }
358         defer cpusetFile.Close()
359         b, err := r.readAllOrWarn(cpusetFile)
360         if err != nil {
361                 return 0
362         }
363         sp := strings.Split(string(b), ",")
364         cpus := int64(0)
365         for _, v := range sp {
366                 var min, max int64
367                 n, _ := fmt.Sscanf(v, "%d-%d", &min, &max)
368                 if n == 2 {
369                         cpus += (max - min) + 1
370                 } else {
371                         cpus++
372                 }
373         }
374         return cpus
375 }
376
377 func (r *Reporter) doCPUStats() {
378         statFile, err := r.openStatFile("cpuacct", "cpuacct.stat", true)
379         if err != nil {
380                 return
381         }
382         defer statFile.Close()
383         b, err := r.readAllOrWarn(statFile)
384         if err != nil {
385                 return
386         }
387
388         var userTicks, sysTicks int64
389         fmt.Sscanf(string(b), "user %d\nsystem %d", &userTicks, &sysTicks)
390         userHz := float64(C.sysconf(C._SC_CLK_TCK))
391         nextSample := cpuSample{
392                 hasData:    true,
393                 sampleTime: time.Now(),
394                 user:       float64(userTicks) / userHz,
395                 sys:        float64(sysTicks) / userHz,
396                 cpus:       r.getCPUCount(),
397         }
398
399         delta := ""
400         if r.lastCPUSample.hasData {
401                 delta = fmt.Sprintf(" -- interval %.4f seconds %.4f user %.4f sys",
402                         nextSample.sampleTime.Sub(r.lastCPUSample.sampleTime).Seconds(),
403                         nextSample.user-r.lastCPUSample.user,
404                         nextSample.sys-r.lastCPUSample.sys)
405         }
406         r.Logger.Printf("cpu %.4f user %.4f sys %d cpus%s\n",
407                 nextSample.user, nextSample.sys, nextSample.cpus, delta)
408         r.lastCPUSample = nextSample
409 }
410
411 // Report stats periodically until we learn (via r.done) that someone
412 // called Stop.
413 func (r *Reporter) run() {
414         defer close(r.flushed)
415
416         r.reportedStatFile = make(map[string]string)
417
418         if !r.waitForCIDFile() || !r.waitForCgroup() {
419                 return
420         }
421
422         r.lastNetSample = make(map[string]ioSample)
423         r.lastDiskIOSample = make(map[string]ioSample)
424
425         ticker := time.NewTicker(r.PollPeriod)
426         for {
427                 r.doMemoryStats()
428                 r.doCPUStats()
429                 r.doBlkIOStats()
430                 r.doNetworkStats()
431                 r.doDiskSpaceStats()
432                 select {
433                 case <-r.done:
434                         return
435                 case <-ticker.C:
436                 }
437         }
438 }
439
440 // If CID is empty, wait for it to appear in CIDFile. Return true if
441 // we get it before we learn (via r.done) that someone called Stop.
442 func (r *Reporter) waitForCIDFile() bool {
443         if r.CID != "" || r.CIDFile == "" {
444                 return true
445         }
446
447         ticker := time.NewTicker(100 * time.Millisecond)
448         defer ticker.Stop()
449         for {
450                 cid, err := ioutil.ReadFile(r.CIDFile)
451                 if err == nil && len(cid) > 0 {
452                         r.CID = string(cid)
453                         return true
454                 }
455                 select {
456                 case <-ticker.C:
457                 case <-r.done:
458                         r.Logger.Printf("warning: CID never appeared in %+q: %v", r.CIDFile, err)
459                         return false
460                 }
461         }
462 }
463
464 // Wait for the cgroup stats files to appear in cgroup_root. Return
465 // true if they appear before r.done indicates someone called Stop. If
466 // they don't appear within one poll interval, log a warning and keep
467 // waiting.
468 func (r *Reporter) waitForCgroup() bool {
469         ticker := time.NewTicker(100 * time.Millisecond)
470         defer ticker.Stop()
471         warningTimer := time.After(r.PollPeriod)
472         for {
473                 c, err := r.openStatFile("cpuacct", "cgroup.procs", false)
474                 if err == nil {
475                         c.Close()
476                         return true
477                 }
478                 select {
479                 case <-ticker.C:
480                 case <-warningTimer:
481                         r.Logger.Printf("warning: cgroup stats files have not appeared after %v (config error?) -- still waiting...", r.PollPeriod)
482                 case <-r.done:
483                         r.Logger.Printf("warning: cgroup stats files never appeared for %v", r.CID)
484                         return false
485                 }
486         }
487 }