Arvados-DCO-1.1-Signed-off-by: Radhika Chippada <radhika@curoverse.com>
[arvados.git] / lib / crunchstat / crunchstat.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 // Package crunchstat reports resource usage (CPU, memory, disk,
6 // network) for a cgroup.
7 package crunchstat
8
9 import (
10         "bufio"
11         "bytes"
12         "errors"
13         "fmt"
14         "io"
15         "io/ioutil"
16         "log"
17         "os"
18         "strconv"
19         "strings"
20         "time"
21 )
22
23 // This magically allows us to look up userHz via _SC_CLK_TCK:
24
25 /*
26 #include <unistd.h>
27 #include <sys/types.h>
28 #include <pwd.h>
29 #include <stdlib.h>
30 */
31 import "C"
32
33 // A Reporter gathers statistics for a cgroup and writes them to a
34 // log.Logger.
35 type Reporter struct {
36         // CID of the container to monitor. If empty, read the CID
37         // from CIDFile (first waiting until a non-empty file appears
38         // at CIDFile). If CIDFile is also empty, report host
39         // statistics.
40         CID string
41
42         // Path to a file we can read CID from.
43         CIDFile string
44
45         // Where cgroup accounting files live on this system, e.g.,
46         // "/sys/fs/cgroup".
47         CgroupRoot string
48
49         // Parent cgroup, e.g., "docker".
50         CgroupParent string
51
52         // Interval between samples. Must be positive.
53         PollPeriod time.Duration
54
55         // Where to write statistics. Must not be nil.
56         Logger *log.Logger
57
58         reportedStatFile map[string]string
59         lastNetSample    map[string]ioSample
60         lastDiskSample   map[string]ioSample
61         lastCPUSample    cpuSample
62
63         done    chan struct{} // closed when we should stop reporting
64         flushed chan struct{} // closed when we have made our last report
65 }
66
67 // Start starts monitoring in a new goroutine, and returns
68 // immediately.
69 //
70 // The monitoring goroutine waits for a non-empty CIDFile to appear
71 // (unless CID is non-empty). Then it waits for the accounting files
72 // to appear for the monitored container. Then it collects and reports
73 // statistics until Stop is called.
74 //
75 // Callers should not call Start more than once.
76 //
77 // Callers should not modify public data fields after calling Start.
78 func (r *Reporter) Start() {
79         r.done = make(chan struct{})
80         r.flushed = make(chan struct{})
81         go r.run()
82 }
83
84 // Stop reporting. Do not call more than once, or before calling
85 // Start.
86 //
87 // Nothing will be logged after Stop returns.
88 func (r *Reporter) Stop() {
89         close(r.done)
90         <-r.flushed
91 }
92
93 func (r *Reporter) readAllOrWarn(in io.Reader) ([]byte, error) {
94         content, err := ioutil.ReadAll(in)
95         if err != nil {
96                 r.Logger.Print(err)
97         }
98         return content, err
99 }
100
101 // Open the cgroup stats file in /sys/fs corresponding to the target
102 // cgroup, and return an io.ReadCloser. If no stats file is available,
103 // return nil.
104 //
105 // Log the file that was opened, if it isn't the same file opened on
106 // the last openStatFile for this stat.
107 //
108 // Log "not available" if no file is found and either this stat has
109 // been available in the past, or verbose==true.
110 //
111 // TODO: Instead of trying all options, choose a process in the
112 // container, and read /proc/PID/cgroup to determine the appropriate
113 // cgroup root for the given statgroup. (This will avoid falling back
114 // to host-level stats during container setup and teardown.)
115 func (r *Reporter) openStatFile(statgroup, stat string, verbose bool) (io.ReadCloser, error) {
116         var paths []string
117         if r.CID != "" {
118                 // Collect container's stats
119                 paths = []string{
120                         fmt.Sprintf("%s/%s/%s/%s/%s", r.CgroupRoot, statgroup, r.CgroupParent, r.CID, stat),
121                         fmt.Sprintf("%s/%s/%s/%s", r.CgroupRoot, r.CgroupParent, r.CID, stat),
122                 }
123         } else {
124                 // Collect this host's stats
125                 paths = []string{
126                         fmt.Sprintf("%s/%s/%s", r.CgroupRoot, statgroup, stat),
127                         fmt.Sprintf("%s/%s", r.CgroupRoot, stat),
128                 }
129         }
130         var path string
131         var file *os.File
132         var err error
133         for _, path = range paths {
134                 file, err = os.Open(path)
135                 if err == nil {
136                         break
137                 } else {
138                         path = ""
139                 }
140         }
141         if pathWas := r.reportedStatFile[stat]; pathWas != path {
142                 // Log whenever we start using a new/different cgroup
143                 // stat file for a given statistic. This typically
144                 // happens 1 to 3 times per statistic, depending on
145                 // whether we happen to collect stats [a] before any
146                 // processes have been created in the container and
147                 // [b] after all contained processes have exited.
148                 if path == "" && verbose {
149                         r.Logger.Printf("notice: stats not available: stat %s, statgroup %s, cid %s, parent %s, root %s\n", stat, statgroup, r.CID, r.CgroupParent, r.CgroupRoot)
150                 } else if pathWas != "" {
151                         r.Logger.Printf("notice: stats moved from %s to %s\n", r.reportedStatFile[stat], path)
152                 } else {
153                         r.Logger.Printf("notice: reading stats from %s\n", path)
154                 }
155                 r.reportedStatFile[stat] = path
156         }
157         return file, err
158 }
159
160 func (r *Reporter) getContainerNetStats() (io.Reader, error) {
161         procsFile, err := r.openStatFile("cpuacct", "cgroup.procs", true)
162         if err != nil {
163                 return nil, err
164         }
165         defer procsFile.Close()
166         reader := bufio.NewScanner(procsFile)
167         for reader.Scan() {
168                 taskPid := reader.Text()
169                 statsFilename := fmt.Sprintf("/proc/%s/net/dev", taskPid)
170                 stats, err := ioutil.ReadFile(statsFilename)
171                 if err != nil {
172                         r.Logger.Print(err)
173                         continue
174                 }
175                 return strings.NewReader(string(stats)), nil
176         }
177         return nil, errors.New("Could not read stats for any proc in container")
178 }
179
180 type ioSample struct {
181         sampleTime time.Time
182         txBytes    int64
183         rxBytes    int64
184 }
185
186 func (r *Reporter) doBlkIOStats() {
187         c, err := r.openStatFile("blkio", "blkio.io_service_bytes", true)
188         if err != nil {
189                 return
190         }
191         defer c.Close()
192         b := bufio.NewScanner(c)
193         var sampleTime = time.Now()
194         newSamples := make(map[string]ioSample)
195         for b.Scan() {
196                 var device, op string
197                 var val int64
198                 if _, err := fmt.Sscanf(string(b.Text()), "%s %s %d", &device, &op, &val); err != nil {
199                         continue
200                 }
201                 var thisSample ioSample
202                 var ok bool
203                 if thisSample, ok = newSamples[device]; !ok {
204                         thisSample = ioSample{sampleTime, -1, -1}
205                 }
206                 switch op {
207                 case "Read":
208                         thisSample.rxBytes = val
209                 case "Write":
210                         thisSample.txBytes = val
211                 }
212                 newSamples[device] = thisSample
213         }
214         for dev, sample := range newSamples {
215                 if sample.txBytes < 0 || sample.rxBytes < 0 {
216                         continue
217                 }
218                 delta := ""
219                 if prev, ok := r.lastDiskSample[dev]; ok {
220                         delta = fmt.Sprintf(" -- interval %.4f seconds %d write %d read",
221                                 sample.sampleTime.Sub(prev.sampleTime).Seconds(),
222                                 sample.txBytes-prev.txBytes,
223                                 sample.rxBytes-prev.rxBytes)
224                 }
225                 r.Logger.Printf("blkio:%s %d write %d read%s\n", dev, sample.txBytes, sample.rxBytes, delta)
226                 r.lastDiskSample[dev] = sample
227         }
228 }
229
230 type memSample struct {
231         sampleTime time.Time
232         memStat    map[string]int64
233 }
234
235 func (r *Reporter) doMemoryStats() {
236         c, err := r.openStatFile("memory", "memory.stat", true)
237         if err != nil {
238                 return
239         }
240         defer c.Close()
241         b := bufio.NewScanner(c)
242         thisSample := memSample{time.Now(), make(map[string]int64)}
243         wantStats := [...]string{"cache", "swap", "pgmajfault", "rss"}
244         for b.Scan() {
245                 var stat string
246                 var val int64
247                 if _, err := fmt.Sscanf(string(b.Text()), "%s %d", &stat, &val); err != nil {
248                         continue
249                 }
250                 thisSample.memStat[stat] = val
251         }
252         var outstat bytes.Buffer
253         for _, key := range wantStats {
254                 if val, ok := thisSample.memStat[key]; ok {
255                         outstat.WriteString(fmt.Sprintf(" %d %s", val, key))
256                 }
257         }
258         r.Logger.Printf("mem%s\n", outstat.String())
259 }
260
261 func (r *Reporter) doNetworkStats() {
262         sampleTime := time.Now()
263         stats, err := r.getContainerNetStats()
264         if err != nil {
265                 return
266         }
267
268         scanner := bufio.NewScanner(stats)
269         for scanner.Scan() {
270                 var ifName string
271                 var rx, tx int64
272                 words := strings.Fields(scanner.Text())
273                 if len(words) != 17 {
274                         // Skip lines with wrong format
275                         continue
276                 }
277                 ifName = strings.TrimRight(words[0], ":")
278                 if ifName == "lo" || ifName == "" {
279                         // Skip loopback interface and lines with wrong format
280                         continue
281                 }
282                 if tx, err = strconv.ParseInt(words[9], 10, 64); err != nil {
283                         continue
284                 }
285                 if rx, err = strconv.ParseInt(words[1], 10, 64); err != nil {
286                         continue
287                 }
288                 nextSample := ioSample{}
289                 nextSample.sampleTime = sampleTime
290                 nextSample.txBytes = tx
291                 nextSample.rxBytes = rx
292                 var delta string
293                 if prev, ok := r.lastNetSample[ifName]; ok {
294                         interval := nextSample.sampleTime.Sub(prev.sampleTime).Seconds()
295                         delta = fmt.Sprintf(" -- interval %.4f seconds %d tx %d rx",
296                                 interval,
297                                 tx-prev.txBytes,
298                                 rx-prev.rxBytes)
299                 }
300                 r.Logger.Printf("net:%s %d tx %d rx%s\n", ifName, tx, rx, delta)
301                 r.lastNetSample[ifName] = nextSample
302         }
303 }
304
305 type cpuSample struct {
306         hasData    bool // to distinguish the zero value from real data
307         sampleTime time.Time
308         user       float64
309         sys        float64
310         cpus       int64
311 }
312
313 // Return the number of CPUs available in the container. Return 0 if
314 // we can't figure out the real number of CPUs.
315 func (r *Reporter) getCPUCount() int64 {
316         cpusetFile, err := r.openStatFile("cpuset", "cpuset.cpus", true)
317         if err != nil {
318                 return 0
319         }
320         defer cpusetFile.Close()
321         b, err := r.readAllOrWarn(cpusetFile)
322         if err != nil {
323                 return 0
324         }
325         sp := strings.Split(string(b), ",")
326         cpus := int64(0)
327         for _, v := range sp {
328                 var min, max int64
329                 n, _ := fmt.Sscanf(v, "%d-%d", &min, &max)
330                 if n == 2 {
331                         cpus += (max - min) + 1
332                 } else {
333                         cpus++
334                 }
335         }
336         return cpus
337 }
338
339 func (r *Reporter) doCPUStats() {
340         statFile, err := r.openStatFile("cpuacct", "cpuacct.stat", true)
341         if err != nil {
342                 return
343         }
344         defer statFile.Close()
345         b, err := r.readAllOrWarn(statFile)
346         if err != nil {
347                 return
348         }
349
350         var userTicks, sysTicks int64
351         fmt.Sscanf(string(b), "user %d\nsystem %d", &userTicks, &sysTicks)
352         userHz := float64(C.sysconf(C._SC_CLK_TCK))
353         nextSample := cpuSample{
354                 hasData:    true,
355                 sampleTime: time.Now(),
356                 user:       float64(userTicks) / userHz,
357                 sys:        float64(sysTicks) / userHz,
358                 cpus:       r.getCPUCount(),
359         }
360
361         delta := ""
362         if r.lastCPUSample.hasData {
363                 delta = fmt.Sprintf(" -- interval %.4f seconds %.4f user %.4f sys",
364                         nextSample.sampleTime.Sub(r.lastCPUSample.sampleTime).Seconds(),
365                         nextSample.user-r.lastCPUSample.user,
366                         nextSample.sys-r.lastCPUSample.sys)
367         }
368         r.Logger.Printf("cpu %.4f user %.4f sys %d cpus%s\n",
369                 nextSample.user, nextSample.sys, nextSample.cpus, delta)
370         r.lastCPUSample = nextSample
371 }
372
373 // Report stats periodically until we learn (via r.done) that someone
374 // called Stop.
375 func (r *Reporter) run() {
376         defer close(r.flushed)
377
378         r.reportedStatFile = make(map[string]string)
379
380         if !r.waitForCIDFile() || !r.waitForCgroup() {
381                 return
382         }
383
384         r.lastNetSample = make(map[string]ioSample)
385         r.lastDiskSample = make(map[string]ioSample)
386
387         ticker := time.NewTicker(r.PollPeriod)
388         for {
389                 r.doMemoryStats()
390                 r.doCPUStats()
391                 r.doBlkIOStats()
392                 r.doNetworkStats()
393                 select {
394                 case <-r.done:
395                         return
396                 case <-ticker.C:
397                 }
398         }
399 }
400
401 // If CID is empty, wait for it to appear in CIDFile. Return true if
402 // we get it before we learn (via r.done) that someone called Stop.
403 func (r *Reporter) waitForCIDFile() bool {
404         if r.CID != "" || r.CIDFile == "" {
405                 return true
406         }
407
408         ticker := time.NewTicker(100 * time.Millisecond)
409         defer ticker.Stop()
410         for {
411                 cid, err := ioutil.ReadFile(r.CIDFile)
412                 if err == nil && len(cid) > 0 {
413                         r.CID = string(cid)
414                         return true
415                 }
416                 select {
417                 case <-ticker.C:
418                 case <-r.done:
419                         r.Logger.Printf("CID never appeared in %+q: %v", r.CIDFile, err)
420                         return false
421                 }
422         }
423 }
424
425 // Wait for the cgroup stats files to appear in cgroup_root. Return
426 // true if they appear before r.done indicates someone called Stop. If
427 // they don't appear within one poll interval, log a warning and keep
428 // waiting.
429 func (r *Reporter) waitForCgroup() bool {
430         ticker := time.NewTicker(100 * time.Millisecond)
431         defer ticker.Stop()
432         warningTimer := time.After(r.PollPeriod)
433         for {
434                 c, err := r.openStatFile("cpuacct", "cgroup.procs", false)
435                 if err == nil {
436                         c.Close()
437                         return true
438                 }
439                 select {
440                 case <-ticker.C:
441                 case <-warningTimer:
442                         r.Logger.Printf("cgroup stats files have not appeared after %v (config error?) -- still waiting...", r.PollPeriod)
443                 case <-r.done:
444                         r.Logger.Printf("cgroup stats files never appeared for %v", r.CID)
445                         return false
446                 }
447         }
448 }