8016: Reduce logging noise by waiting for cgroup files to appear before polling.
[arvados.git] / lib / crunchstat / crunchstat.go
1 package crunchstat
2
3 import (
4         "bufio"
5         "bytes"
6         "errors"
7         "fmt"
8         "io"
9         "io/ioutil"
10         "log"
11         "os"
12         "strconv"
13         "strings"
14         "time"
15 )
16
17 // This magically allows us to look up user_hz via _SC_CLK_TCK:
18
19 /*
20 #include <unistd.h>
21 #include <sys/types.h>
22 #include <pwd.h>
23 #include <stdlib.h>
24 */
25 import "C"
26
27 // A Reporter gathers statistics for a cgroup and writes them to a
28 // log.Logger.
29 type Reporter struct {
30         // CID of the container to monitor. If empty, read the CID
31         // from CIDFile.
32         CID string
33         // Where cgroup special files live on this system
34         CgroupRoot   string
35         CgroupParent string
36         // Path to a file we can read CID from. If CIDFile is empty or
37         // nonexistent, wait for it to appear.
38         CIDFile string
39
40         // Interval between samples
41         Poll time.Duration
42
43         // Where to write statistics.
44         Logger *log.Logger
45
46         reportedStatFile map[string]string
47         lastNetSample    map[string]IoSample
48         lastDiskSample   map[string]IoSample
49         lastCPUSample    CpuSample
50
51         done chan struct{}
52 }
53
54 // Wait (if necessary) for the CID to appear in CIDFile, then start
55 // reporting statistics.
56 //
57 // Start should not be called more than once on a Reporter.
58 //
59 // Public data fields should not be changed after calling Start.
60 func (r *Reporter) Start() {
61         r.done = make(chan struct{})
62         go r.run()
63 }
64
65 // Stop reporting statistics. Do not call more than once, or before
66 // calling Start.
67 //
68 // Nothing will be logged after Stop returns.
69 func (r *Reporter) Stop() {
70         close(r.done)
71 }
72
73 func (r *Reporter) readAllOrWarn(in io.Reader) ([]byte, error) {
74         content, err := ioutil.ReadAll(in)
75         if err != nil {
76                 r.Logger.Print(err)
77         }
78         return content, err
79 }
80
81 // Open the cgroup stats file in /sys/fs corresponding to the target
82 // cgroup, and return an io.ReadCloser. If no stats file is available,
83 // return nil.
84 //
85 // Log the file that was opened, if it isn't the same file opened on
86 // the last openStatFile for this stat.
87 //
88 // Log "not available" if no file is found and either this stat has
89 // been available in the past, or verbose==true.
90 //
91 // TODO: Instead of trying all options, choose a process in the
92 // container, and read /proc/PID/cgroup to determine the appropriate
93 // cgroup root for the given statgroup. (This will avoid falling back
94 // to host-level stats during container setup and teardown.)
95 func (r *Reporter) openStatFile(statgroup, stat string, verbose bool) (io.ReadCloser, error) {
96         var paths []string
97         if r.CID != "" {
98                 // Collect container's stats
99                 paths = []string{
100                         fmt.Sprintf("%s/%s/%s/%s/%s", r.CgroupRoot, statgroup, r.CgroupParent, r.CID, stat),
101                         fmt.Sprintf("%s/%s/%s/%s", r.CgroupRoot, r.CgroupParent, r.CID, stat),
102                 }
103         } else {
104                 // Collect this host's stats
105                 paths = []string{
106                         fmt.Sprintf("%s/%s/%s", r.CgroupRoot, statgroup, stat),
107                         fmt.Sprintf("%s/%s", r.CgroupRoot, stat),
108                 }
109         }
110         var path string
111         var file *os.File
112         var err error
113         for _, path = range paths {
114                 file, err = os.Open(path)
115                 if err == nil {
116                         break
117                 } else {
118                         path = ""
119                 }
120         }
121         if pathWas := r.reportedStatFile[stat]; pathWas != path {
122                 // Log whenever we start using a new/different cgroup
123                 // stat file for a given statistic. This typically
124                 // happens 1 to 3 times per statistic, depending on
125                 // whether we happen to collect stats [a] before any
126                 // processes have been created in the container and
127                 // [b] after all contained processes have exited.
128                 if path == "" && verbose {
129                         r.Logger.Printf("notice: stats not available: stat %s, statgroup %s, cid %s, parent %s, root %s\n", stat, statgroup, r.CID, r.CgroupParent, r.CgroupRoot)
130                 } else if pathWas != "" {
131                         r.Logger.Printf("notice: stats moved from %s to %s\n", r.reportedStatFile[stat], path)
132                 } else {
133                         r.Logger.Printf("notice: reading stats from %s\n", path)
134                 }
135                 r.reportedStatFile[stat] = path
136         }
137         return file, err
138 }
139
140 func (r *Reporter) getContainerNetStats() (io.Reader, error) {
141         procsFile, err := r.openStatFile("cpuacct", "cgroup.procs", true)
142         if err != nil {
143                 return nil, err
144         }
145         defer procsFile.Close()
146         reader := bufio.NewScanner(procsFile)
147         for reader.Scan() {
148                 taskPid := reader.Text()
149                 statsFilename := fmt.Sprintf("/proc/%s/net/dev", taskPid)
150                 stats, err := ioutil.ReadFile(statsFilename)
151                 if err != nil {
152                         r.Logger.Print(err)
153                         continue
154                 }
155                 return strings.NewReader(string(stats)), nil
156         }
157         return nil, errors.New("Could not read stats for any proc in container")
158 }
159
160 type IoSample struct {
161         sampleTime time.Time
162         txBytes    int64
163         rxBytes    int64
164 }
165
166 func (r *Reporter) DoBlkIoStats() {
167         c, err := r.openStatFile("blkio", "blkio.io_service_bytes", true)
168         if err != nil {
169                 return
170         }
171         defer c.Close()
172         b := bufio.NewScanner(c)
173         var sampleTime = time.Now()
174         newSamples := make(map[string]IoSample)
175         for b.Scan() {
176                 var device, op string
177                 var val int64
178                 if _, err := fmt.Sscanf(string(b.Text()), "%s %s %d", &device, &op, &val); err != nil {
179                         continue
180                 }
181                 var thisSample IoSample
182                 var ok bool
183                 if thisSample, ok = newSamples[device]; !ok {
184                         thisSample = IoSample{sampleTime, -1, -1}
185                 }
186                 switch op {
187                 case "Read":
188                         thisSample.rxBytes = val
189                 case "Write":
190                         thisSample.txBytes = val
191                 }
192                 newSamples[device] = thisSample
193         }
194         for dev, sample := range newSamples {
195                 if sample.txBytes < 0 || sample.rxBytes < 0 {
196                         continue
197                 }
198                 delta := ""
199                 if prev, ok := r.lastDiskSample[dev]; ok {
200                         delta = fmt.Sprintf(" -- interval %.4f seconds %d write %d read",
201                                 sample.sampleTime.Sub(prev.sampleTime).Seconds(),
202                                 sample.txBytes-prev.txBytes,
203                                 sample.rxBytes-prev.rxBytes)
204                 }
205                 r.Logger.Printf("blkio:%s %d write %d read%s\n", dev, sample.txBytes, sample.rxBytes, delta)
206                 r.lastDiskSample[dev] = sample
207         }
208 }
209
210 type MemSample struct {
211         sampleTime time.Time
212         memStat    map[string]int64
213 }
214
215 func (r *Reporter) DoMemoryStats() {
216         c, err := r.openStatFile("memory", "memory.stat", true)
217         if err != nil {
218                 return
219         }
220         defer c.Close()
221         b := bufio.NewScanner(c)
222         thisSample := MemSample{time.Now(), make(map[string]int64)}
223         wantStats := [...]string{"cache", "swap", "pgmajfault", "rss"}
224         for b.Scan() {
225                 var stat string
226                 var val int64
227                 if _, err := fmt.Sscanf(string(b.Text()), "%s %d", &stat, &val); err != nil {
228                         continue
229                 }
230                 thisSample.memStat[stat] = val
231         }
232         var outstat bytes.Buffer
233         for _, key := range wantStats {
234                 if val, ok := thisSample.memStat[key]; ok {
235                         outstat.WriteString(fmt.Sprintf(" %d %s", val, key))
236                 }
237         }
238         r.Logger.Printf("mem%s\n", outstat.String())
239 }
240
241 func (r *Reporter) DoNetworkStats() {
242         sampleTime := time.Now()
243         stats, err := r.getContainerNetStats()
244         if err != nil {
245                 return
246         }
247
248         scanner := bufio.NewScanner(stats)
249         for scanner.Scan() {
250                 var ifName string
251                 var rx, tx int64
252                 words := strings.Fields(scanner.Text())
253                 if len(words) != 17 {
254                         // Skip lines with wrong format
255                         continue
256                 }
257                 ifName = strings.TrimRight(words[0], ":")
258                 if ifName == "lo" || ifName == "" {
259                         // Skip loopback interface and lines with wrong format
260                         continue
261                 }
262                 if tx, err = strconv.ParseInt(words[9], 10, 64); err != nil {
263                         continue
264                 }
265                 if rx, err = strconv.ParseInt(words[1], 10, 64); err != nil {
266                         continue
267                 }
268                 nextSample := IoSample{}
269                 nextSample.sampleTime = sampleTime
270                 nextSample.txBytes = tx
271                 nextSample.rxBytes = rx
272                 var delta string
273                 if prev, ok := r.lastNetSample[ifName]; ok {
274                         interval := nextSample.sampleTime.Sub(prev.sampleTime).Seconds()
275                         delta = fmt.Sprintf(" -- interval %.4f seconds %d tx %d rx",
276                                 interval,
277                                 tx-prev.txBytes,
278                                 rx-prev.rxBytes)
279                 }
280                 r.Logger.Printf("net:%s %d tx %d rx%s\n", ifName, tx, rx, delta)
281                 r.lastNetSample[ifName] = nextSample
282         }
283 }
284
285 type CpuSample struct {
286         hasData    bool // to distinguish the zero value from real data
287         sampleTime time.Time
288         user       float64
289         sys        float64
290         cpus       int64
291 }
292
293 // Return the number of CPUs available in the container. Return 0 if
294 // we can't figure out the real number of CPUs.
295 func (r *Reporter) GetCpuCount() int64 {
296         cpusetFile, err := r.openStatFile("cpuset", "cpuset.cpus", true)
297         if err != nil {
298                 return 0
299         }
300         defer cpusetFile.Close()
301         b, err := r.readAllOrWarn(cpusetFile)
302         sp := strings.Split(string(b), ",")
303         cpus := int64(0)
304         for _, v := range sp {
305                 var min, max int64
306                 n, _ := fmt.Sscanf(v, "%d-%d", &min, &max)
307                 if n == 2 {
308                         cpus += (max - min) + 1
309                 } else {
310                         cpus += 1
311                 }
312         }
313         return cpus
314 }
315
316 func (r *Reporter) DoCpuStats() {
317         statFile, err := r.openStatFile("cpuacct", "cpuacct.stat", true)
318         if err != nil {
319                 return
320         }
321         defer statFile.Close()
322         b, err := r.readAllOrWarn(statFile)
323         if err != nil {
324                 return
325         }
326
327         nextSample := CpuSample{true, time.Now(), 0, 0, r.GetCpuCount()}
328         var userTicks, sysTicks int64
329         fmt.Sscanf(string(b), "user %d\nsystem %d", &userTicks, &sysTicks)
330         user_hz := float64(C.sysconf(C._SC_CLK_TCK))
331         nextSample.user = float64(userTicks) / user_hz
332         nextSample.sys = float64(sysTicks) / user_hz
333
334         delta := ""
335         if r.lastCPUSample.hasData {
336                 delta = fmt.Sprintf(" -- interval %.4f seconds %.4f user %.4f sys",
337                         nextSample.sampleTime.Sub(r.lastCPUSample.sampleTime).Seconds(),
338                         nextSample.user-r.lastCPUSample.user,
339                         nextSample.sys-r.lastCPUSample.sys)
340         }
341         r.Logger.Printf("cpu %.4f user %.4f sys %d cpus%s\n",
342                 nextSample.user, nextSample.sys, nextSample.cpus, delta)
343         r.lastCPUSample = nextSample
344 }
345
346 // Report stats periodically until r.done indicates someone called
347 // Stop.
348 func (r *Reporter) run() {
349         r.reportedStatFile = make(map[string]string)
350
351         if !r.waitForCIDFile() || !r.waitForCgroup() {
352                 return
353         }
354
355         r.lastNetSample = make(map[string]IoSample)
356         r.lastDiskSample = make(map[string]IoSample)
357
358         ticker := time.NewTicker(r.Poll)
359         for {
360                 r.DoMemoryStats()
361                 r.DoCpuStats()
362                 r.DoBlkIoStats()
363                 r.DoNetworkStats()
364                 select {
365                 case <-r.done:
366                         return
367                 case <-ticker.C:
368                 }
369         }
370 }
371
372 // If CID is empty, wait for it to appear in CIDFile. Return true if
373 // we get it before r.done indicates someone called Stop.
374 func (r *Reporter) waitForCIDFile() bool {
375         if r.CID != "" {
376                 return true
377         }
378
379         ticker := time.NewTicker(100 * time.Millisecond)
380         defer ticker.Stop()
381         for {
382                 cid, err := ioutil.ReadFile(r.CIDFile)
383                 if err == nil && len(cid) > 0 {
384                         r.CID = string(cid)
385                         return true
386                 }
387                 select {
388                 case <-ticker.C:
389                 case <-r.done:
390                         r.Logger.Printf("CID never appeared in %+q: %v", r.CIDFile, err)
391                         return false
392                 }
393         }
394 }
395
396 // Wait for the cgroup stats files to appear in cgroup_root. Return
397 // true if they appear before r.done indicates someone called Stop. If
398 // they don't appear within one poll interval, log a warning and keep
399 // waiting.
400 func (r *Reporter) waitForCgroup() bool {
401         ticker := time.NewTicker(100 * time.Millisecond)
402         defer ticker.Stop()
403         warningTimer := time.After(r.Poll)
404         for {
405                 c, err := r.openStatFile("cpuacct", "cgroup.procs", false)
406                 if err == nil {
407                         c.Close()
408                         return true
409                 }
410                 select {
411                 case <-ticker.C:
412                 case <-warningTimer:
413                         r.Logger.Printf("cgroup stats files have not appeared after %v (config error?) -- still waiting...", r.Poll)
414                 case <-r.done:
415                         r.Logger.Printf("cgroup stats files never appeared for %v", r.CID)
416                         return false
417                 }
418         }
419 }