3826: Move cpu stats into a function, fix interval reporting.
[arvados.git] / services / crunchstat / crunchstat.go
1 package main
2
3 import (
4         "bufio"
5         "bytes"
6         "flag"
7         "errors"
8         "fmt"
9         "io"
10         "io/ioutil"
11         "log"
12         "os"
13         "os/exec"
14         "os/signal"
15         "strings"
16         "syscall"
17         "time"
18 )
19
20 /*
21 #include <unistd.h>
22 #include <sys/types.h>
23 #include <pwd.h>
24 #include <stdlib.h>
25 */
26 import "C"
27 // The above block of magic allows us to look up user_hz via _SC_CLK_TCK.
28
29 type Cgroup struct {
30         root   string
31         parent string
32         cid    string
33 }
34
35 func CopyPipeToChan(in io.Reader, out chan string, done chan<- bool) {
36         s := bufio.NewScanner(in)
37         for s.Scan() {
38                 out <- s.Text()
39         }
40         done <- true
41 }
42
43 func CopyChanToPipe(in <-chan string, out io.Writer) {
44         for s := range in {
45                 fmt.Fprintln(out, s)
46         }
47 }
48
49 func OpenAndReadAll(filename string, log_chan chan<- string) ([]byte, error) {
50         in, err := os.Open(filename)
51         if err != nil {
52                 if log_chan != nil {
53                         log_chan <- fmt.Sprintf("crunchstat: open %s: %s", filename, err)
54                 }
55                 return nil, err
56         }
57         defer in.Close()
58         {
59                 content, err := ioutil.ReadAll(in)
60                 if err != nil && log_chan != nil {
61                         log_chan <- fmt.Sprintf("crunchstat: read %s: %s", filename, err)
62                 }
63                 return content, err
64         }
65 }
66
67 var reportedStatFile map[string]bool
68 var reportedNoStatFile map[string]bool
69
70 // Find the cgroup stats file in /sys/fs corresponding to the target
71 // cgroup.
72 //
73 // TODO: Instead of trying all options, choose a process in the
74 // container, and read /proc/PID/cgroup to determine the appropriate
75 // cgroup root for the given statgroup. (This will avoid falling back
76 // to host-level stats during container setup and teardown.)
77 func FindStat(stderr chan<- string, cgroup Cgroup, statgroup string, stat string) string {
78         if reportedStatFile == nil {
79                 reportedStatFile = make(map[string]bool)
80                 reportedNoStatFile = make(map[string]bool)
81         }
82
83         var path string
84         path = fmt.Sprintf("%s/%s/%s/%s/%s", cgroup.root, statgroup, cgroup.parent, cgroup.cid, stat)
85         if _, err := os.Stat(path); err != nil {
86                 path = fmt.Sprintf("%s/%s/%s/%s", cgroup.root, cgroup.parent, cgroup.cid, stat)
87         }
88         if _, err := os.Stat(path); err != nil {
89                 path = fmt.Sprintf("%s/%s/%s", cgroup.root, statgroup, stat)
90         }
91         if _, err := os.Stat(path); err != nil {
92                 path = fmt.Sprintf("%s/%s", cgroup.root, stat)
93         }
94         if _, err := os.Stat(path); err != nil {
95                 if _, ok := reportedNoStatFile[stat]; !ok {
96                         stderr <- fmt.Sprintf("crunchstat: did not find stats file (root %s, parent %s, cid %s, statgroup %s, stat %s)", cgroup.root, cgroup.parent, cgroup.cid, statgroup, stat)
97                         reportedNoStatFile[stat] = true
98                 }
99                 return ""
100         }
101         if _, ok := reportedStatFile[path]; !ok {
102                 stderr <- fmt.Sprintf("crunchstat: reading stats from %s", path)
103                 reportedStatFile[path] = true
104         }
105         return path
106 }
107
108 func GetContainerNetStats(stderr chan<- string, cgroup Cgroup) (io.Reader, error) {
109         procsFilename := FindStat(stderr, cgroup, "cpuacct", "cgroup.procs")
110         procsFile, err := os.Open(procsFilename)
111         if err != nil {
112                 stderr <- fmt.Sprintf("crunchstat: open %s: %s", procsFilename, err)
113                 return nil, err
114         }
115         defer procsFile.Close()
116         reader := bufio.NewScanner(procsFile)
117         for reader.Scan() {
118                 taskPid := reader.Text()
119                 statsFilename := fmt.Sprintf("/proc/%s/net/dev", taskPid)
120                 stats, err := OpenAndReadAll(statsFilename, stderr)
121                 if err != nil {
122                         continue
123                 }
124                 return strings.NewReader(string(stats)), nil
125         }
126         return nil, errors.New("Could not read stats for any proc in container")
127 }
128
129 type IoSample struct {
130         sampleTime time.Time
131         txBytes    int64
132         rxBytes    int64
133 }
134
135 func DoBlkIoStats(stderr chan<- string, cgroup Cgroup, lastSample map[string]IoSample) (map[string]IoSample) {
136         blkio_io_service_bytes := FindStat(stderr, cgroup, "blkio", "blkio.io_service_bytes")
137         if blkio_io_service_bytes == "" {
138                 return lastSample
139         }
140
141         c, err := os.Open(blkio_io_service_bytes)
142         if err != nil {
143                 stderr <- fmt.Sprintf("crunchstat: open %s: %s", blkio_io_service_bytes, err)
144                 return lastSample
145         }
146         defer c.Close()
147         b := bufio.NewScanner(c)
148         var sampleTime = time.Now()
149         newSamples := make(map[string]IoSample)
150         for b.Scan() {
151                 var device, op string
152                 var val int64
153                 if _, err := fmt.Sscanf(string(b.Text()), "%s %s %d", &device, &op, &val); err != nil {
154                         continue
155                 }
156                 var thisSample IoSample
157                 var ok bool
158                 if thisSample, ok = newSamples[device]; !ok {
159                         thisSample = IoSample{sampleTime, -1, -1}
160                 }
161                 switch op {
162                 case "Read":
163                         thisSample.rxBytes = val
164                 case "Write":
165                         thisSample.txBytes = val
166                 }
167                 newSamples[device] = thisSample
168         }
169         if lastSample == nil {
170                 lastSample = make(map[string]IoSample)
171         }
172         for dev, sample := range newSamples {
173                 if sample.txBytes < 0 || sample.rxBytes < 0 {
174                         continue
175                 }
176                 delta := ""
177                 if prev, ok := lastSample[dev]; ok {
178                         delta = fmt.Sprintf(" -- interval %.4f seconds %d write %d read",
179                                 sample.sampleTime.Sub(prev.sampleTime).Seconds(),
180                                 sample.txBytes - prev.txBytes,
181                                 sample.rxBytes - prev.rxBytes)
182                 }
183                 stderr <- fmt.Sprintf("crunchstat: blkio:%s %d write %d read%s", dev, sample.txBytes, sample.rxBytes, delta)
184                 lastSample[dev] = sample
185         }
186         return lastSample
187 }
188
189 type MemSample struct {
190         sampleTime time.Time
191         memStat    map[string]int64
192 }
193
194 func DoMemoryStats(stderr chan<- string, cgroup Cgroup) {
195         memory_stat := FindStat(stderr, cgroup, "memory", "memory.stat")
196         if memory_stat == "" {
197                 return
198         }
199         c, err := os.Open(memory_stat)
200         if err != nil {
201                 stderr <- fmt.Sprintf("crunchstat: open %s: %s", memory_stat, err)
202                 return
203         }
204         defer c.Close()
205         b := bufio.NewScanner(c)
206         thisSample := MemSample{time.Now(), make(map[string]int64)}
207         wantStats := [...]string{"cache", "pgmajfault", "rss"}
208         for b.Scan() {
209                 var stat string
210                 var val int64
211                 if _, err := fmt.Sscanf(string(b.Text()), "%s %d", &stat, &val); err != nil {
212                         continue
213                 }
214                 thisSample.memStat[stat] = val
215         }
216         var outstat bytes.Buffer
217         for _, key := range wantStats {
218                 if val, ok := thisSample.memStat[key]; ok {
219                         outstat.WriteString(fmt.Sprintf(" %d %s", val, key))
220                 }
221         }
222         stderr <- fmt.Sprintf("crunchstat: mem%s", outstat.String())
223 }
224
225 func DoNetworkStats(stderr chan<- string, cgroup Cgroup, lastSample map[string]IoSample) (map[string]IoSample) {
226         sampleTime := time.Now()
227         stats, err := GetContainerNetStats(stderr, cgroup)
228         if err != nil { return lastSample }
229
230         if lastSample == nil {
231                 lastSample = make(map[string]IoSample)
232         }
233         scanner := bufio.NewScanner(stats)
234         Iface: for scanner.Scan() {
235                 var ifName string
236                 var rx, tx int64
237                 words := bufio.NewScanner(strings.NewReader(scanner.Text()))
238                 words.Split(bufio.ScanWords)
239                 wordIndex := 0
240                 for words.Scan() {
241                         word := words.Text()
242                         switch wordIndex {
243                         case 0:
244                                 ifName = strings.TrimRight(word, ":")
245                         case 1:
246                                 if _, err := fmt.Sscanf(word, "%d", &rx); err != nil {
247                                         continue Iface
248                                 }
249                         case 9:
250                                 if _, err := fmt.Sscanf(word, "%d", &tx); err != nil {
251                                         continue Iface
252                                 }
253                         }
254                         wordIndex++
255                 }
256                 if ifName == "lo" || ifName == "" || wordIndex != 17 {
257                         // Skip loopback interface and lines with wrong format
258                         continue
259                 }
260                 nextSample := IoSample{}
261                 nextSample.sampleTime = sampleTime
262                 nextSample.txBytes = tx
263                 nextSample.rxBytes = rx
264                 var delta string
265                 if lastSample, ok := lastSample[ifName]; ok {
266                         interval := nextSample.sampleTime.Sub(lastSample.sampleTime).Seconds()
267                         delta = fmt.Sprintf(" -- interval %.4f seconds %d tx %d rx",
268                                 interval,
269                                 tx - lastSample.txBytes,
270                                 rx - lastSample.rxBytes)
271                 }
272                 stderr <- fmt.Sprintf("crunchstat: net:%s %d tx %d rx%s",
273                         ifName, tx, rx, delta)
274                 lastSample[ifName] = nextSample
275         }
276         return lastSample
277 }
278
279 type CpuSample struct {
280         sampleTime time.Time
281         user       float64
282         sys        float64
283         cpus       int64
284 }
285
286 func DoCpuStats(stderr chan<- string, cgroup Cgroup, cpus int64, user_hz float64, lastSample *CpuSample) (*CpuSample) {
287         cpuacct_stat := FindStat(stderr, cgroup, "cpuacct", "cpuacct.stat")
288         if cpuacct_stat == "" {
289                 return lastSample
290         }
291         b, err := OpenAndReadAll(cpuacct_stat, stderr)
292         if err != nil {
293                 return lastSample
294         }
295         nextSample := &CpuSample{time.Now(), 0, 0, cpus}
296         var userTicks, sysTicks int64
297         fmt.Sscanf(string(b), "user %d\nsystem %d", &nextSample.user, &nextSample.sys)
298         nextSample.user = float64(userTicks) / user_hz
299         nextSample.sys = float64(sysTicks) / user_hz
300
301         delta := ""
302         if lastSample != nil {
303                 delta = fmt.Sprintf(" -- interval %.4f seconds %.4f user %.4f sys",
304                         nextSample.sampleTime.Sub(lastSample.sampleTime).Seconds(),
305                         nextSample.user - lastSample.user,
306                         nextSample.sys - lastSample.sys)
307         }
308         stderr <- fmt.Sprintf("crunchstat: cpu %.4f user %.4f sys %d cpus%s",
309                 nextSample.user, nextSample.sys, cpus, delta)
310         return nextSample
311 }
312
313 func PollCgroupStats(cgroup Cgroup, stderr chan string, poll int64, stop_poll_chan <-chan bool) {
314         var last_cpucount int64 = 0
315
316         user_hz := float64(C.sysconf(C._SC_CLK_TCK))
317
318         var lastNetSample map[string]IoSample = nil
319         var lastDiskSample map[string]IoSample = nil
320         var lastCpuSample *CpuSample = nil
321
322         poll_chan := make(chan bool, 1)
323         go func() {
324                 // Send periodic poll events.
325                 poll_chan <- true
326                 for {
327                         time.Sleep(time.Duration(poll) * time.Millisecond)
328                         poll_chan <- true
329                 }
330         }()
331         for {
332                 select {
333                 case <-stop_poll_chan:
334                         return
335                 case <-poll_chan:
336                         // Emit stats, then select again.
337                 }
338                 cpuset_cpus := FindStat(stderr, cgroup, "cpuset", "cpuset.cpus")
339                 if cpuset_cpus != "" {
340                         b, err := OpenAndReadAll(cpuset_cpus, stderr)
341                         if err != nil {
342                                 // cgroup probably gone -- skip other stats too.
343                                 continue
344                         }
345                         sp := strings.Split(string(b), ",")
346                         cpus := int64(0)
347                         for _, v := range sp {
348                                 var min, max int64
349                                 n, _ := fmt.Sscanf(v, "%d-%d", &min, &max)
350                                 if n == 2 {
351                                         cpus += (max - min) + 1
352                                 } else {
353                                         cpus += 1
354                                 }
355                         }
356                         last_cpucount = cpus
357                 }
358
359                 DoMemoryStats(stderr, cgroup)
360                 lastCpuSample = DoCpuStats(stderr, cgroup, last_cpucount, user_hz, lastCpuSample)
361                 lastDiskSample = DoBlkIoStats(stderr, cgroup, lastDiskSample)
362                 lastNetSample = DoNetworkStats(stderr, cgroup, lastNetSample)
363         }
364 }
365
366 func run(logger *log.Logger) error {
367
368         var (
369                 cgroup_root    string
370                 cgroup_parent  string
371                 cgroup_cidfile string
372                 wait           int64
373                 poll           int64
374         )
375
376         flag.StringVar(&cgroup_root, "cgroup-root", "", "Root of cgroup tree")
377         flag.StringVar(&cgroup_parent, "cgroup-parent", "", "Name of container parent under cgroup")
378         flag.StringVar(&cgroup_cidfile, "cgroup-cid", "", "Path to container id file")
379         flag.Int64Var(&wait, "wait", 5, "Maximum time (in seconds) to wait for cid file to show up")
380         flag.Int64Var(&poll, "poll", 1000, "Polling frequency, in milliseconds")
381
382         flag.Parse()
383
384         if cgroup_root == "" {
385                 logger.Fatal("Must provide -cgroup-root")
386         }
387
388         stderr_chan := make(chan string, 1)
389         defer close(stderr_chan)
390         finish_chan := make(chan bool)
391         defer close(finish_chan)
392
393         go CopyChanToPipe(stderr_chan, os.Stderr)
394
395         var cmd *exec.Cmd
396
397         if len(flag.Args()) > 0 {
398                 // Set up subprocess
399                 cmd = exec.Command(flag.Args()[0], flag.Args()[1:]...)
400
401                 logger.Print("Running ", flag.Args())
402
403                 // Child process will use our stdin and stdout pipes
404                 // (we close our copies below)
405                 cmd.Stdin = os.Stdin
406                 cmd.Stdout = os.Stdout
407
408                 // Forward SIGINT and SIGTERM to inner process
409                 term := make(chan os.Signal, 1)
410                 go func(sig <-chan os.Signal) {
411                         catch := <-sig
412                         if cmd.Process != nil {
413                                 cmd.Process.Signal(catch)
414                         }
415                         logger.Print("caught signal: ", catch)
416                 }(term)
417                 signal.Notify(term, syscall.SIGTERM)
418                 signal.Notify(term, syscall.SIGINT)
419
420                 // Funnel stderr through our channel
421                 stderr_pipe, err := cmd.StderrPipe()
422                 if err != nil {
423                         logger.Fatal(err)
424                 }
425                 go CopyPipeToChan(stderr_pipe, stderr_chan, finish_chan)
426
427                 // Run subprocess
428                 if err := cmd.Start(); err != nil {
429                         logger.Fatal(err)
430                 }
431
432                 // Close stdin/stdout in this (parent) process
433                 os.Stdin.Close()
434                 os.Stdout.Close()
435         }
436
437         // Read the cid file
438         var container_id string
439         if cgroup_cidfile != "" {
440                 // wait up to 'wait' seconds for the cid file to appear
441                 ok := false
442                 var i time.Duration
443                 for i = 0; i < time.Duration(wait)*time.Second; i += (100 * time.Millisecond) {
444                         cid, err := OpenAndReadAll(cgroup_cidfile, nil)
445                         if err == nil && len(cid) > 0 {
446                                 ok = true
447                                 container_id = string(cid)
448                                 break
449                         }
450                         time.Sleep(100 * time.Millisecond)
451                 }
452                 if !ok {
453                         logger.Printf("Could not read cid file %s", cgroup_cidfile)
454                 }
455         }
456
457         stop_poll_chan := make(chan bool, 1)
458         cgroup := Cgroup{cgroup_root, cgroup_parent, container_id}
459         go PollCgroupStats(cgroup, stderr_chan, poll, stop_poll_chan)
460
461         // When the child exits, tell the polling goroutine to stop.
462         defer func() { stop_poll_chan <- true }()
463
464         // Wait for CopyPipeToChan to consume child's stderr pipe
465         <-finish_chan
466
467         return cmd.Wait()
468 }
469
470 func main() {
471         logger := log.New(os.Stderr, "crunchstat: ", 0)
472         if err := run(logger); err != nil {
473                 if exiterr, ok := err.(*exec.ExitError); ok {
474                         // The program has exited with an exit code != 0
475
476                         // This works on both Unix and
477                         // Windows. Although package syscall is
478                         // generally platform dependent, WaitStatus is
479                         // defined for both Unix and Windows and in
480                         // both cases has an ExitStatus() method with
481                         // the same signature.
482                         if status, ok := exiterr.Sys().(syscall.WaitStatus); ok {
483                                 os.Exit(status.ExitStatus())
484                         }
485                 } else {
486                         logger.Fatalf("cmd.Wait: %v", err)
487                 }
488         }
489 }