Merge branch 'master' into 9369-arv-cwl-docs
[arvados.git] / lib / crunchstat / crunchstat.go
1 // Package crunchstat reports resource usage (CPU, memory, disk,
2 // network) for a cgroup.
3 package crunchstat
4
5 import (
6         "bufio"
7         "bytes"
8         "errors"
9         "fmt"
10         "io"
11         "io/ioutil"
12         "log"
13         "os"
14         "strconv"
15         "strings"
16         "time"
17 )
18
19 // This magically allows us to look up userHz via _SC_CLK_TCK:
20
21 /*
22 #include <unistd.h>
23 #include <sys/types.h>
24 #include <pwd.h>
25 #include <stdlib.h>
26 */
27 import "C"
28
29 // A Reporter gathers statistics for a cgroup and writes them to a
30 // log.Logger.
31 type Reporter struct {
32         // CID of the container to monitor. If empty, read the CID
33         // from CIDFile (first waiting until a non-empty file appears
34         // at CIDFile). If CIDFile is also empty, report host
35         // statistics.
36         CID string
37
38         // Path to a file we can read CID from.
39         CIDFile string
40
41         // Where cgroup accounting files live on this system, e.g.,
42         // "/sys/fs/cgroup".
43         CgroupRoot string
44
45         // Parent cgroup, e.g., "docker".
46         CgroupParent string
47
48         // Interval between samples. Must be positive.
49         PollPeriod time.Duration
50
51         // Where to write statistics. Must not be nil.
52         Logger *log.Logger
53
54         reportedStatFile map[string]string
55         lastNetSample    map[string]ioSample
56         lastDiskSample   map[string]ioSample
57         lastCPUSample    cpuSample
58
59         done chan struct{}
60 }
61
62 // Start starts monitoring in a new goroutine, and returns
63 // immediately.
64 //
65 // The monitoring goroutine waits for a non-empty CIDFile to appear
66 // (unless CID is non-empty). Then it waits for the accounting files
67 // to appear for the monitored container. Then it collects and reports
68 // statistics until Stop is called.
69 //
70 // Callers should not call Start more than once.
71 //
72 // Callers should not modify public data fields after calling Start.
73 func (r *Reporter) Start() {
74         r.done = make(chan struct{})
75         go r.run()
76 }
77
78 // Stop reporting. Do not call more than once, or before calling
79 // Start.
80 //
81 // Nothing will be logged after Stop returns.
82 func (r *Reporter) Stop() {
83         close(r.done)
84 }
85
86 func (r *Reporter) readAllOrWarn(in io.Reader) ([]byte, error) {
87         content, err := ioutil.ReadAll(in)
88         if err != nil {
89                 r.Logger.Print(err)
90         }
91         return content, err
92 }
93
94 // Open the cgroup stats file in /sys/fs corresponding to the target
95 // cgroup, and return an io.ReadCloser. If no stats file is available,
96 // return nil.
97 //
98 // Log the file that was opened, if it isn't the same file opened on
99 // the last openStatFile for this stat.
100 //
101 // Log "not available" if no file is found and either this stat has
102 // been available in the past, or verbose==true.
103 //
104 // TODO: Instead of trying all options, choose a process in the
105 // container, and read /proc/PID/cgroup to determine the appropriate
106 // cgroup root for the given statgroup. (This will avoid falling back
107 // to host-level stats during container setup and teardown.)
108 func (r *Reporter) openStatFile(statgroup, stat string, verbose bool) (io.ReadCloser, error) {
109         var paths []string
110         if r.CID != "" {
111                 // Collect container's stats
112                 paths = []string{
113                         fmt.Sprintf("%s/%s/%s/%s/%s", r.CgroupRoot, statgroup, r.CgroupParent, r.CID, stat),
114                         fmt.Sprintf("%s/%s/%s/%s", r.CgroupRoot, r.CgroupParent, r.CID, stat),
115                 }
116         } else {
117                 // Collect this host's stats
118                 paths = []string{
119                         fmt.Sprintf("%s/%s/%s", r.CgroupRoot, statgroup, stat),
120                         fmt.Sprintf("%s/%s", r.CgroupRoot, stat),
121                 }
122         }
123         var path string
124         var file *os.File
125         var err error
126         for _, path = range paths {
127                 file, err = os.Open(path)
128                 if err == nil {
129                         break
130                 } else {
131                         path = ""
132                 }
133         }
134         if pathWas := r.reportedStatFile[stat]; pathWas != path {
135                 // Log whenever we start using a new/different cgroup
136                 // stat file for a given statistic. This typically
137                 // happens 1 to 3 times per statistic, depending on
138                 // whether we happen to collect stats [a] before any
139                 // processes have been created in the container and
140                 // [b] after all contained processes have exited.
141                 if path == "" && verbose {
142                         r.Logger.Printf("notice: stats not available: stat %s, statgroup %s, cid %s, parent %s, root %s\n", stat, statgroup, r.CID, r.CgroupParent, r.CgroupRoot)
143                 } else if pathWas != "" {
144                         r.Logger.Printf("notice: stats moved from %s to %s\n", r.reportedStatFile[stat], path)
145                 } else {
146                         r.Logger.Printf("notice: reading stats from %s\n", path)
147                 }
148                 r.reportedStatFile[stat] = path
149         }
150         return file, err
151 }
152
153 func (r *Reporter) getContainerNetStats() (io.Reader, error) {
154         procsFile, err := r.openStatFile("cpuacct", "cgroup.procs", true)
155         if err != nil {
156                 return nil, err
157         }
158         defer procsFile.Close()
159         reader := bufio.NewScanner(procsFile)
160         for reader.Scan() {
161                 taskPid := reader.Text()
162                 statsFilename := fmt.Sprintf("/proc/%s/net/dev", taskPid)
163                 stats, err := ioutil.ReadFile(statsFilename)
164                 if err != nil {
165                         r.Logger.Print(err)
166                         continue
167                 }
168                 return strings.NewReader(string(stats)), nil
169         }
170         return nil, errors.New("Could not read stats for any proc in container")
171 }
172
173 type ioSample struct {
174         sampleTime time.Time
175         txBytes    int64
176         rxBytes    int64
177 }
178
179 func (r *Reporter) doBlkIOStats() {
180         c, err := r.openStatFile("blkio", "blkio.io_service_bytes", true)
181         if err != nil {
182                 return
183         }
184         defer c.Close()
185         b := bufio.NewScanner(c)
186         var sampleTime = time.Now()
187         newSamples := make(map[string]ioSample)
188         for b.Scan() {
189                 var device, op string
190                 var val int64
191                 if _, err := fmt.Sscanf(string(b.Text()), "%s %s %d", &device, &op, &val); err != nil {
192                         continue
193                 }
194                 var thisSample ioSample
195                 var ok bool
196                 if thisSample, ok = newSamples[device]; !ok {
197                         thisSample = ioSample{sampleTime, -1, -1}
198                 }
199                 switch op {
200                 case "Read":
201                         thisSample.rxBytes = val
202                 case "Write":
203                         thisSample.txBytes = val
204                 }
205                 newSamples[device] = thisSample
206         }
207         for dev, sample := range newSamples {
208                 if sample.txBytes < 0 || sample.rxBytes < 0 {
209                         continue
210                 }
211                 delta := ""
212                 if prev, ok := r.lastDiskSample[dev]; ok {
213                         delta = fmt.Sprintf(" -- interval %.4f seconds %d write %d read",
214                                 sample.sampleTime.Sub(prev.sampleTime).Seconds(),
215                                 sample.txBytes-prev.txBytes,
216                                 sample.rxBytes-prev.rxBytes)
217                 }
218                 r.Logger.Printf("blkio:%s %d write %d read%s\n", dev, sample.txBytes, sample.rxBytes, delta)
219                 r.lastDiskSample[dev] = sample
220         }
221 }
222
223 type memSample struct {
224         sampleTime time.Time
225         memStat    map[string]int64
226 }
227
228 func (r *Reporter) doMemoryStats() {
229         c, err := r.openStatFile("memory", "memory.stat", true)
230         if err != nil {
231                 return
232         }
233         defer c.Close()
234         b := bufio.NewScanner(c)
235         thisSample := memSample{time.Now(), make(map[string]int64)}
236         wantStats := [...]string{"cache", "swap", "pgmajfault", "rss"}
237         for b.Scan() {
238                 var stat string
239                 var val int64
240                 if _, err := fmt.Sscanf(string(b.Text()), "%s %d", &stat, &val); err != nil {
241                         continue
242                 }
243                 thisSample.memStat[stat] = val
244         }
245         var outstat bytes.Buffer
246         for _, key := range wantStats {
247                 if val, ok := thisSample.memStat[key]; ok {
248                         outstat.WriteString(fmt.Sprintf(" %d %s", val, key))
249                 }
250         }
251         r.Logger.Printf("mem%s\n", outstat.String())
252 }
253
254 func (r *Reporter) doNetworkStats() {
255         sampleTime := time.Now()
256         stats, err := r.getContainerNetStats()
257         if err != nil {
258                 return
259         }
260
261         scanner := bufio.NewScanner(stats)
262         for scanner.Scan() {
263                 var ifName string
264                 var rx, tx int64
265                 words := strings.Fields(scanner.Text())
266                 if len(words) != 17 {
267                         // Skip lines with wrong format
268                         continue
269                 }
270                 ifName = strings.TrimRight(words[0], ":")
271                 if ifName == "lo" || ifName == "" {
272                         // Skip loopback interface and lines with wrong format
273                         continue
274                 }
275                 if tx, err = strconv.ParseInt(words[9], 10, 64); err != nil {
276                         continue
277                 }
278                 if rx, err = strconv.ParseInt(words[1], 10, 64); err != nil {
279                         continue
280                 }
281                 nextSample := ioSample{}
282                 nextSample.sampleTime = sampleTime
283                 nextSample.txBytes = tx
284                 nextSample.rxBytes = rx
285                 var delta string
286                 if prev, ok := r.lastNetSample[ifName]; ok {
287                         interval := nextSample.sampleTime.Sub(prev.sampleTime).Seconds()
288                         delta = fmt.Sprintf(" -- interval %.4f seconds %d tx %d rx",
289                                 interval,
290                                 tx-prev.txBytes,
291                                 rx-prev.rxBytes)
292                 }
293                 r.Logger.Printf("net:%s %d tx %d rx%s\n", ifName, tx, rx, delta)
294                 r.lastNetSample[ifName] = nextSample
295         }
296 }
297
298 type cpuSample struct {
299         hasData    bool // to distinguish the zero value from real data
300         sampleTime time.Time
301         user       float64
302         sys        float64
303         cpus       int64
304 }
305
306 // Return the number of CPUs available in the container. Return 0 if
307 // we can't figure out the real number of CPUs.
308 func (r *Reporter) getCPUCount() int64 {
309         cpusetFile, err := r.openStatFile("cpuset", "cpuset.cpus", true)
310         if err != nil {
311                 return 0
312         }
313         defer cpusetFile.Close()
314         b, err := r.readAllOrWarn(cpusetFile)
315         if err != nil {
316                 return 0
317         }
318         sp := strings.Split(string(b), ",")
319         cpus := int64(0)
320         for _, v := range sp {
321                 var min, max int64
322                 n, _ := fmt.Sscanf(v, "%d-%d", &min, &max)
323                 if n == 2 {
324                         cpus += (max - min) + 1
325                 } else {
326                         cpus++
327                 }
328         }
329         return cpus
330 }
331
332 func (r *Reporter) doCPUStats() {
333         statFile, err := r.openStatFile("cpuacct", "cpuacct.stat", true)
334         if err != nil {
335                 return
336         }
337         defer statFile.Close()
338         b, err := r.readAllOrWarn(statFile)
339         if err != nil {
340                 return
341         }
342
343         var userTicks, sysTicks int64
344         fmt.Sscanf(string(b), "user %d\nsystem %d", &userTicks, &sysTicks)
345         userHz := float64(C.sysconf(C._SC_CLK_TCK))
346         nextSample := cpuSample{
347                 hasData:    true,
348                 sampleTime: time.Now(),
349                 user:       float64(userTicks) / userHz,
350                 sys:        float64(sysTicks) / userHz,
351                 cpus:       r.getCPUCount(),
352         }
353
354         delta := ""
355         if r.lastCPUSample.hasData {
356                 delta = fmt.Sprintf(" -- interval %.4f seconds %.4f user %.4f sys",
357                         nextSample.sampleTime.Sub(r.lastCPUSample.sampleTime).Seconds(),
358                         nextSample.user-r.lastCPUSample.user,
359                         nextSample.sys-r.lastCPUSample.sys)
360         }
361         r.Logger.Printf("cpu %.4f user %.4f sys %d cpus%s\n",
362                 nextSample.user, nextSample.sys, nextSample.cpus, delta)
363         r.lastCPUSample = nextSample
364 }
365
366 // Report stats periodically until we learn (via r.done) that someone
367 // called Stop.
368 func (r *Reporter) run() {
369         r.reportedStatFile = make(map[string]string)
370
371         if !r.waitForCIDFile() || !r.waitForCgroup() {
372                 return
373         }
374
375         r.lastNetSample = make(map[string]ioSample)
376         r.lastDiskSample = make(map[string]ioSample)
377
378         ticker := time.NewTicker(r.PollPeriod)
379         for {
380                 r.doMemoryStats()
381                 r.doCPUStats()
382                 r.doBlkIOStats()
383                 r.doNetworkStats()
384                 select {
385                 case <-r.done:
386                         return
387                 case <-ticker.C:
388                 }
389         }
390 }
391
392 // If CID is empty, wait for it to appear in CIDFile. Return true if
393 // we get it before we learn (via r.done) that someone called Stop.
394 func (r *Reporter) waitForCIDFile() bool {
395         if r.CID != "" || r.CIDFile == "" {
396                 return true
397         }
398
399         ticker := time.NewTicker(100 * time.Millisecond)
400         defer ticker.Stop()
401         for {
402                 cid, err := ioutil.ReadFile(r.CIDFile)
403                 if err == nil && len(cid) > 0 {
404                         r.CID = string(cid)
405                         return true
406                 }
407                 select {
408                 case <-ticker.C:
409                 case <-r.done:
410                         r.Logger.Printf("CID never appeared in %+q: %v", r.CIDFile, err)
411                         return false
412                 }
413         }
414 }
415
416 // Wait for the cgroup stats files to appear in cgroup_root. Return
417 // true if they appear before r.done indicates someone called Stop. If
418 // they don't appear within one poll interval, log a warning and keep
419 // waiting.
420 func (r *Reporter) waitForCgroup() bool {
421         ticker := time.NewTicker(100 * time.Millisecond)
422         defer ticker.Stop()
423         warningTimer := time.After(r.PollPeriod)
424         for {
425                 c, err := r.openStatFile("cpuacct", "cgroup.procs", false)
426                 if err == nil {
427                         c.Close()
428                         return true
429                 }
430                 select {
431                 case <-ticker.C:
432                 case <-warningTimer:
433                         r.Logger.Printf("cgroup stats files have not appeared after %v (config error?) -- still waiting...", r.PollPeriod)
434                 case <-r.done:
435                         r.Logger.Printf("cgroup stats files never appeared for %v", r.CID)
436                         return false
437                 }
438         }
439 }