10346: Start documenting how to access Keep.
[arvados.git] / lib / crunchstat / crunchstat.go
1 // Package crunchstat reports resource usage (CPU, memory, disk,
2 // network) for a cgroup.
3 package crunchstat
4
5 import (
6         "bufio"
7         "bytes"
8         "errors"
9         "fmt"
10         "io"
11         "io/ioutil"
12         "log"
13         "os"
14         "strconv"
15         "strings"
16         "time"
17 )
18
19 // This magically allows us to look up userHz via _SC_CLK_TCK:
20
21 /*
22 #include <unistd.h>
23 #include <sys/types.h>
24 #include <pwd.h>
25 #include <stdlib.h>
26 */
27 import "C"
28
29 // A Reporter gathers statistics for a cgroup and writes them to a
30 // log.Logger.
31 type Reporter struct {
32         // CID of the container to monitor. If empty, read the CID
33         // from CIDFile (first waiting until a non-empty file appears
34         // at CIDFile). If CIDFile is also empty, report host
35         // statistics.
36         CID string
37
38         // Path to a file we can read CID from.
39         CIDFile string
40
41         // Where cgroup accounting files live on this system, e.g.,
42         // "/sys/fs/cgroup".
43         CgroupRoot string
44
45         // Parent cgroup, e.g., "docker".
46         CgroupParent string
47
48         // Interval between samples. Must be positive.
49         PollPeriod time.Duration
50
51         // Where to write statistics. Must not be nil.
52         Logger *log.Logger
53
54         reportedStatFile map[string]string
55         lastNetSample    map[string]ioSample
56         lastDiskSample   map[string]ioSample
57         lastCPUSample    cpuSample
58
59         done    chan struct{} // closed when we should stop reporting
60         flushed chan struct{} // closed when we have made our last report
61 }
62
63 // Start starts monitoring in a new goroutine, and returns
64 // immediately.
65 //
66 // The monitoring goroutine waits for a non-empty CIDFile to appear
67 // (unless CID is non-empty). Then it waits for the accounting files
68 // to appear for the monitored container. Then it collects and reports
69 // statistics until Stop is called.
70 //
71 // Callers should not call Start more than once.
72 //
73 // Callers should not modify public data fields after calling Start.
74 func (r *Reporter) Start() {
75         r.done = make(chan struct{})
76         r.flushed = make(chan struct{})
77         go r.run()
78 }
79
80 // Stop reporting. Do not call more than once, or before calling
81 // Start.
82 //
83 // Nothing will be logged after Stop returns.
84 func (r *Reporter) Stop() {
85         close(r.done)
86         <-r.flushed
87 }
88
89 func (r *Reporter) readAllOrWarn(in io.Reader) ([]byte, error) {
90         content, err := ioutil.ReadAll(in)
91         if err != nil {
92                 r.Logger.Print(err)
93         }
94         return content, err
95 }
96
97 // Open the cgroup stats file in /sys/fs corresponding to the target
98 // cgroup, and return an io.ReadCloser. If no stats file is available,
99 // return nil.
100 //
101 // Log the file that was opened, if it isn't the same file opened on
102 // the last openStatFile for this stat.
103 //
104 // Log "not available" if no file is found and either this stat has
105 // been available in the past, or verbose==true.
106 //
107 // TODO: Instead of trying all options, choose a process in the
108 // container, and read /proc/PID/cgroup to determine the appropriate
109 // cgroup root for the given statgroup. (This will avoid falling back
110 // to host-level stats during container setup and teardown.)
111 func (r *Reporter) openStatFile(statgroup, stat string, verbose bool) (io.ReadCloser, error) {
112         var paths []string
113         if r.CID != "" {
114                 // Collect container's stats
115                 paths = []string{
116                         fmt.Sprintf("%s/%s/%s/%s/%s", r.CgroupRoot, statgroup, r.CgroupParent, r.CID, stat),
117                         fmt.Sprintf("%s/%s/%s/%s", r.CgroupRoot, r.CgroupParent, r.CID, stat),
118                 }
119         } else {
120                 // Collect this host's stats
121                 paths = []string{
122                         fmt.Sprintf("%s/%s/%s", r.CgroupRoot, statgroup, stat),
123                         fmt.Sprintf("%s/%s", r.CgroupRoot, stat),
124                 }
125         }
126         var path string
127         var file *os.File
128         var err error
129         for _, path = range paths {
130                 file, err = os.Open(path)
131                 if err == nil {
132                         break
133                 } else {
134                         path = ""
135                 }
136         }
137         if pathWas := r.reportedStatFile[stat]; pathWas != path {
138                 // Log whenever we start using a new/different cgroup
139                 // stat file for a given statistic. This typically
140                 // happens 1 to 3 times per statistic, depending on
141                 // whether we happen to collect stats [a] before any
142                 // processes have been created in the container and
143                 // [b] after all contained processes have exited.
144                 if path == "" && verbose {
145                         r.Logger.Printf("notice: stats not available: stat %s, statgroup %s, cid %s, parent %s, root %s\n", stat, statgroup, r.CID, r.CgroupParent, r.CgroupRoot)
146                 } else if pathWas != "" {
147                         r.Logger.Printf("notice: stats moved from %s to %s\n", r.reportedStatFile[stat], path)
148                 } else {
149                         r.Logger.Printf("notice: reading stats from %s\n", path)
150                 }
151                 r.reportedStatFile[stat] = path
152         }
153         return file, err
154 }
155
156 func (r *Reporter) getContainerNetStats() (io.Reader, error) {
157         procsFile, err := r.openStatFile("cpuacct", "cgroup.procs", true)
158         if err != nil {
159                 return nil, err
160         }
161         defer procsFile.Close()
162         reader := bufio.NewScanner(procsFile)
163         for reader.Scan() {
164                 taskPid := reader.Text()
165                 statsFilename := fmt.Sprintf("/proc/%s/net/dev", taskPid)
166                 stats, err := ioutil.ReadFile(statsFilename)
167                 if err != nil {
168                         r.Logger.Print(err)
169                         continue
170                 }
171                 return strings.NewReader(string(stats)), nil
172         }
173         return nil, errors.New("Could not read stats for any proc in container")
174 }
175
176 type ioSample struct {
177         sampleTime time.Time
178         txBytes    int64
179         rxBytes    int64
180 }
181
182 func (r *Reporter) doBlkIOStats() {
183         c, err := r.openStatFile("blkio", "blkio.io_service_bytes", true)
184         if err != nil {
185                 return
186         }
187         defer c.Close()
188         b := bufio.NewScanner(c)
189         var sampleTime = time.Now()
190         newSamples := make(map[string]ioSample)
191         for b.Scan() {
192                 var device, op string
193                 var val int64
194                 if _, err := fmt.Sscanf(string(b.Text()), "%s %s %d", &device, &op, &val); err != nil {
195                         continue
196                 }
197                 var thisSample ioSample
198                 var ok bool
199                 if thisSample, ok = newSamples[device]; !ok {
200                         thisSample = ioSample{sampleTime, -1, -1}
201                 }
202                 switch op {
203                 case "Read":
204                         thisSample.rxBytes = val
205                 case "Write":
206                         thisSample.txBytes = val
207                 }
208                 newSamples[device] = thisSample
209         }
210         for dev, sample := range newSamples {
211                 if sample.txBytes < 0 || sample.rxBytes < 0 {
212                         continue
213                 }
214                 delta := ""
215                 if prev, ok := r.lastDiskSample[dev]; ok {
216                         delta = fmt.Sprintf(" -- interval %.4f seconds %d write %d read",
217                                 sample.sampleTime.Sub(prev.sampleTime).Seconds(),
218                                 sample.txBytes-prev.txBytes,
219                                 sample.rxBytes-prev.rxBytes)
220                 }
221                 r.Logger.Printf("blkio:%s %d write %d read%s\n", dev, sample.txBytes, sample.rxBytes, delta)
222                 r.lastDiskSample[dev] = sample
223         }
224 }
225
226 type memSample struct {
227         sampleTime time.Time
228         memStat    map[string]int64
229 }
230
231 func (r *Reporter) doMemoryStats() {
232         c, err := r.openStatFile("memory", "memory.stat", true)
233         if err != nil {
234                 return
235         }
236         defer c.Close()
237         b := bufio.NewScanner(c)
238         thisSample := memSample{time.Now(), make(map[string]int64)}
239         wantStats := [...]string{"cache", "swap", "pgmajfault", "rss"}
240         for b.Scan() {
241                 var stat string
242                 var val int64
243                 if _, err := fmt.Sscanf(string(b.Text()), "%s %d", &stat, &val); err != nil {
244                         continue
245                 }
246                 thisSample.memStat[stat] = val
247         }
248         var outstat bytes.Buffer
249         for _, key := range wantStats {
250                 if val, ok := thisSample.memStat[key]; ok {
251                         outstat.WriteString(fmt.Sprintf(" %d %s", val, key))
252                 }
253         }
254         r.Logger.Printf("mem%s\n", outstat.String())
255 }
256
257 func (r *Reporter) doNetworkStats() {
258         sampleTime := time.Now()
259         stats, err := r.getContainerNetStats()
260         if err != nil {
261                 return
262         }
263
264         scanner := bufio.NewScanner(stats)
265         for scanner.Scan() {
266                 var ifName string
267                 var rx, tx int64
268                 words := strings.Fields(scanner.Text())
269                 if len(words) != 17 {
270                         // Skip lines with wrong format
271                         continue
272                 }
273                 ifName = strings.TrimRight(words[0], ":")
274                 if ifName == "lo" || ifName == "" {
275                         // Skip loopback interface and lines with wrong format
276                         continue
277                 }
278                 if tx, err = strconv.ParseInt(words[9], 10, 64); err != nil {
279                         continue
280                 }
281                 if rx, err = strconv.ParseInt(words[1], 10, 64); err != nil {
282                         continue
283                 }
284                 nextSample := ioSample{}
285                 nextSample.sampleTime = sampleTime
286                 nextSample.txBytes = tx
287                 nextSample.rxBytes = rx
288                 var delta string
289                 if prev, ok := r.lastNetSample[ifName]; ok {
290                         interval := nextSample.sampleTime.Sub(prev.sampleTime).Seconds()
291                         delta = fmt.Sprintf(" -- interval %.4f seconds %d tx %d rx",
292                                 interval,
293                                 tx-prev.txBytes,
294                                 rx-prev.rxBytes)
295                 }
296                 r.Logger.Printf("net:%s %d tx %d rx%s\n", ifName, tx, rx, delta)
297                 r.lastNetSample[ifName] = nextSample
298         }
299 }
300
301 type cpuSample struct {
302         hasData    bool // to distinguish the zero value from real data
303         sampleTime time.Time
304         user       float64
305         sys        float64
306         cpus       int64
307 }
308
309 // Return the number of CPUs available in the container. Return 0 if
310 // we can't figure out the real number of CPUs.
311 func (r *Reporter) getCPUCount() int64 {
312         cpusetFile, err := r.openStatFile("cpuset", "cpuset.cpus", true)
313         if err != nil {
314                 return 0
315         }
316         defer cpusetFile.Close()
317         b, err := r.readAllOrWarn(cpusetFile)
318         if err != nil {
319                 return 0
320         }
321         sp := strings.Split(string(b), ",")
322         cpus := int64(0)
323         for _, v := range sp {
324                 var min, max int64
325                 n, _ := fmt.Sscanf(v, "%d-%d", &min, &max)
326                 if n == 2 {
327                         cpus += (max - min) + 1
328                 } else {
329                         cpus++
330                 }
331         }
332         return cpus
333 }
334
335 func (r *Reporter) doCPUStats() {
336         statFile, err := r.openStatFile("cpuacct", "cpuacct.stat", true)
337         if err != nil {
338                 return
339         }
340         defer statFile.Close()
341         b, err := r.readAllOrWarn(statFile)
342         if err != nil {
343                 return
344         }
345
346         var userTicks, sysTicks int64
347         fmt.Sscanf(string(b), "user %d\nsystem %d", &userTicks, &sysTicks)
348         userHz := float64(C.sysconf(C._SC_CLK_TCK))
349         nextSample := cpuSample{
350                 hasData:    true,
351                 sampleTime: time.Now(),
352                 user:       float64(userTicks) / userHz,
353                 sys:        float64(sysTicks) / userHz,
354                 cpus:       r.getCPUCount(),
355         }
356
357         delta := ""
358         if r.lastCPUSample.hasData {
359                 delta = fmt.Sprintf(" -- interval %.4f seconds %.4f user %.4f sys",
360                         nextSample.sampleTime.Sub(r.lastCPUSample.sampleTime).Seconds(),
361                         nextSample.user-r.lastCPUSample.user,
362                         nextSample.sys-r.lastCPUSample.sys)
363         }
364         r.Logger.Printf("cpu %.4f user %.4f sys %d cpus%s\n",
365                 nextSample.user, nextSample.sys, nextSample.cpus, delta)
366         r.lastCPUSample = nextSample
367 }
368
369 // Report stats periodically until we learn (via r.done) that someone
370 // called Stop.
371 func (r *Reporter) run() {
372         defer close(r.flushed)
373
374         r.reportedStatFile = make(map[string]string)
375
376         if !r.waitForCIDFile() || !r.waitForCgroup() {
377                 return
378         }
379
380         r.lastNetSample = make(map[string]ioSample)
381         r.lastDiskSample = make(map[string]ioSample)
382
383         ticker := time.NewTicker(r.PollPeriod)
384         for {
385                 r.doMemoryStats()
386                 r.doCPUStats()
387                 r.doBlkIOStats()
388                 r.doNetworkStats()
389                 select {
390                 case <-r.done:
391                         return
392                 case <-ticker.C:
393                 }
394         }
395 }
396
397 // If CID is empty, wait for it to appear in CIDFile. Return true if
398 // we get it before we learn (via r.done) that someone called Stop.
399 func (r *Reporter) waitForCIDFile() bool {
400         if r.CID != "" || r.CIDFile == "" {
401                 return true
402         }
403
404         ticker := time.NewTicker(100 * time.Millisecond)
405         defer ticker.Stop()
406         for {
407                 cid, err := ioutil.ReadFile(r.CIDFile)
408                 if err == nil && len(cid) > 0 {
409                         r.CID = string(cid)
410                         return true
411                 }
412                 select {
413                 case <-ticker.C:
414                 case <-r.done:
415                         r.Logger.Printf("CID never appeared in %+q: %v", r.CIDFile, err)
416                         return false
417                 }
418         }
419 }
420
421 // Wait for the cgroup stats files to appear in cgroup_root. Return
422 // true if they appear before r.done indicates someone called Stop. If
423 // they don't appear within one poll interval, log a warning and keep
424 // waiting.
425 func (r *Reporter) waitForCgroup() bool {
426         ticker := time.NewTicker(100 * time.Millisecond)
427         defer ticker.Stop()
428         warningTimer := time.After(r.PollPeriod)
429         for {
430                 c, err := r.openStatFile("cpuacct", "cgroup.procs", false)
431                 if err == nil {
432                         c.Close()
433                         return true
434                 }
435                 select {
436                 case <-ticker.C:
437                 case <-warningTimer:
438                         r.Logger.Printf("cgroup stats files have not appeared after %v (config error?) -- still waiting...", r.PollPeriod)
439                 case <-r.done:
440                         r.Logger.Printf("cgroup stats files never appeared for %v", r.CID)
441                         return false
442                 }
443         }
444 }