3826: Use /proc/PID/net/dev to get container net stats.
[arvados.git] / services / crunchstat / crunchstat.go
1 package main
2
3 import (
4         "bufio"
5         "flag"
6         "errors"
7         "fmt"
8         "io"
9         "io/ioutil"
10         "log"
11         "os"
12         "os/exec"
13         "os/signal"
14         "strings"
15         "syscall"
16         "time"
17 )
18
19 /*
20 #include <unistd.h>
21 #include <sys/types.h>
22 #include <pwd.h>
23 #include <stdlib.h>
24 */
25 import "C"
26 // The above block of magic allows us to look up user_hz via _SC_CLK_TCK.
27
28 type Cgroup struct {
29         root   string
30         parent string
31         cid    string
32 }
33
34 func CopyPipeToChan(in io.Reader, out chan string, done chan<- bool) {
35         s := bufio.NewScanner(in)
36         for s.Scan() {
37                 out <- s.Text()
38         }
39         done <- true
40 }
41
42 func CopyChanToPipe(in <-chan string, out io.Writer) {
43         for s := range in {
44                 fmt.Fprintln(out, s)
45         }
46 }
47
48 func OpenAndReadAll(filename string, log_chan chan<- string) ([]byte, error) {
49         in, err := os.Open(filename)
50         if err != nil {
51                 if log_chan != nil {
52                         log_chan <- fmt.Sprintf("open %s: %s", filename, err)
53                 }
54                 return nil, err
55         }
56         defer in.Close()
57         {
58                 content, err := ioutil.ReadAll(in)
59                 if err != nil && log_chan != nil {
60                         log_chan <- fmt.Sprintf("read %s: %s", filename, err)
61                 }
62                 return content, err
63         }
64 }
65
66 func FindStat(stderr chan<- string, cgroup Cgroup, statgroup string, stat string, verbose bool) string {
67         var path string
68         path = fmt.Sprintf("%s/%s/%s/%s/%s", cgroup.root, statgroup, cgroup.parent, cgroup.cid, stat)
69         if _, err := os.Stat(path); err != nil {
70                 path = fmt.Sprintf("%s/%s/%s/%s", cgroup.root, cgroup.parent, cgroup.cid, stat)
71         }
72         if _, err := os.Stat(path); err != nil {
73                 path = fmt.Sprintf("%s/%s/%s", cgroup.root, statgroup, stat)
74         }
75         if _, err := os.Stat(path); err != nil {
76                 path = fmt.Sprintf("%s/%s", cgroup.root, stat)
77         }
78         if _, err := os.Stat(path); err != nil {
79                 stderr <- fmt.Sprintf("crunchstat: did not find stats file (root %s, parent %s, cid %s, statgroup %s, stat %s)", cgroup.root, cgroup.parent, cgroup.cid, statgroup, stat)
80                 return ""
81         }
82         if verbose {
83                 stderr <- fmt.Sprintf("crunchstat: reading stats from %s", path)
84         }
85         return path
86 }
87
88 func GetContainerNetStats(stderr chan<- string, cgroup Cgroup) (io.Reader, error) {
89         procsFilename := FindStat(stderr, cgroup, "cpuacct", "cgroup.procs", false)
90         procsFile, err := os.Open(procsFilename)
91         if err != nil {
92                 stderr <- fmt.Sprintf("crunchstat: open %s: %s", procsFilename, err)
93                 return nil, err
94         }
95         defer procsFile.Close()
96         reader := bufio.NewScanner(procsFile)
97         for reader.Scan() {
98                 taskPid := reader.Text()
99                 statsFilename := fmt.Sprintf("/proc/%s/net/dev", taskPid)
100                 stats, err := OpenAndReadAll(statsFilename, stderr)
101                 if err != nil {
102                         stderr <- fmt.Sprintf("crunchstat: open %s: %s", statsFilename, err)
103                         continue
104                 }
105                 return strings.NewReader(string(stats)), nil
106         }
107         return nil, errors.New("Could not read stats for any proc in container")
108 }
109
110 type NetSample struct {
111         sampleTime time.Time
112         txBytes    int64
113         rxBytes    int64
114 }
115
116 func DoNetworkStats(stderr chan<- string, cgroup Cgroup, lastStat map[string]NetSample) (map[string]NetSample) {
117         stats, err := GetContainerNetStats(stderr, cgroup)
118         if err != nil { return lastStat }
119
120         if lastStat == nil {
121                 lastStat = make(map[string]NetSample)
122         }
123         scanner := bufio.NewScanner(stats)
124         Iface: for scanner.Scan() {
125                 var ifName string
126                 var rx, tx int64
127                 words := bufio.NewScanner(strings.NewReader(scanner.Text()))
128                 words.Split(bufio.ScanWords)
129                 wordIndex := 0
130                 for words.Scan() {
131                         word := words.Text()
132                         switch wordIndex {
133                         case 0:
134                                 ifName = strings.TrimRight(word, ":")
135                         case 1:
136                                 if _, err := fmt.Sscanf(word, "%d", &rx); err != nil {
137                                         continue Iface
138                                 }
139                         case 9:
140                                 if _, err := fmt.Sscanf(word, "%d", &tx); err != nil {
141                                         continue Iface
142                                 }
143                         }
144                         wordIndex++
145                 }
146                 if ifName == "lo" || ifName == "" { continue }
147                 nextSample := NetSample{}
148                 nextSample.sampleTime = time.Now()
149                 nextSample.txBytes = tx
150                 nextSample.rxBytes = rx
151                 if lastSample, ok := lastStat[ifName]; ok {
152                         stderr <- fmt.Sprintf("crunchstat: task net %s tx %d rx %d interval %.4f",
153                                 ifName,
154                                 nextSample.txBytes - lastSample.txBytes,
155                                 nextSample.rxBytes - lastSample.rxBytes,
156                                 nextSample.sampleTime.Sub(lastSample.sampleTime).Seconds())
157                 }
158                 lastStat[ifName] = nextSample
159         }
160         return lastStat
161 }
162
163 func PollCgroupStats(cgroup Cgroup, stderr chan string, poll int64, stop_poll_chan <-chan bool) {
164         var last_user int64 = -1
165         var last_sys int64 = -1
166         var last_cpucount int64 = 0
167
168         type Disk struct {
169                 last_read  int64
170                 next_read  int64
171                 last_write int64
172                 next_write int64
173         }
174
175         disk := make(map[string]*Disk)
176
177         user_hz := float64(C.sysconf(C._SC_CLK_TCK))
178
179         cpuacct_stat := FindStat(stderr, cgroup, "cpuacct", "cpuacct.stat", true)
180         blkio_io_service_bytes := FindStat(stderr, cgroup, "blkio", "blkio.io_service_bytes", true)
181         cpuset_cpus := FindStat(stderr, cgroup, "cpuset", "cpuset.cpus", true)
182         memory_stat := FindStat(stderr, cgroup, "memory", "memory.stat", true)
183         lastNetStat := DoNetworkStats(stderr, cgroup, nil)
184
185         poll_chan := make(chan bool, 1)
186         go func() {
187                 // Send periodic poll events.
188                 poll_chan <- true
189                 for {
190                         time.Sleep(time.Duration(poll) * time.Millisecond)
191                         poll_chan <- true
192                 }
193         }()
194         for {
195                 bedtime := time.Now()
196                 select {
197                 case <-stop_poll_chan:
198                         return
199                 case <-poll_chan:
200                         // Emit stats, then select again.
201                 }
202                 morning := time.Now()
203                 elapsed := morning.Sub(bedtime).Seconds()
204                 if cpuset_cpus != "" {
205                         b, err := OpenAndReadAll(cpuset_cpus, stderr)
206                         if err != nil {
207                                 // cgroup probably gone -- skip other stats too.
208                                 continue
209                         }
210                         sp := strings.Split(string(b), ",")
211                         cpus := int64(0)
212                         for _, v := range sp {
213                                 var min, max int64
214                                 n, _ := fmt.Sscanf(v, "%d-%d", &min, &max)
215                                 if n == 2 {
216                                         cpus += (max - min) + 1
217                                 } else {
218                                         cpus += 1
219                                 }
220                         }
221                         last_cpucount = cpus
222                 }
223                 if cpuacct_stat != "" {
224                         b, err := OpenAndReadAll(cpuacct_stat, stderr)
225                         if err != nil {
226                                 // Next time around, last_user would
227                                 // be >1 interval old, so stats will
228                                 // be incorrect. Start over instead.
229                                 last_user = -1
230
231                                 // cgroup probably gone -- skip other stats too.
232                                 continue
233                         }
234                         var next_user int64
235                         var next_sys int64
236                         fmt.Sscanf(string(b), "user %d\nsystem %d", &next_user, &next_sys)
237
238                         if elapsed > 0 && last_user != -1 {
239                                 user_diff := next_user - last_user
240                                 sys_diff := next_sys - last_sys
241                                 // {*_diff} == {1/user_hz}-second
242                                 // ticks of CPU core consumed in an
243                                 // {elapsed}-second interval.
244                                 //
245                                 // We report this as CPU core usage
246                                 // (i.e., 1.0 == one pegged core). We
247                                 // also report the number of cores
248                                 // (maximum possible usage).
249                                 user := float64(user_diff) / elapsed / user_hz
250                                 sys := float64(sys_diff) / elapsed / user_hz
251
252                                 stderr <- fmt.Sprintf("crunchstat: cpuacct.stat user %.4f sys %.4f cpus %d interval %.4f", user, sys, last_cpucount, elapsed)
253                         }
254
255                         last_user = next_user
256                         last_sys = next_sys
257                 }
258                 if blkio_io_service_bytes != "" {
259                         c, err := os.Open(blkio_io_service_bytes)
260                         if err != nil {
261                                 stderr <- fmt.Sprintf("open %s: %s", blkio_io_service_bytes, err)
262                                 // cgroup probably gone -- skip other stats too.
263                                 continue
264                         }
265                         defer c.Close()
266                         b := bufio.NewScanner(c)
267                         var device, op string
268                         var next int64
269                         for b.Scan() {
270                                 if _, err := fmt.Sscanf(string(b.Text()), "%s %s %d", &device, &op, &next); err != nil {
271                                         continue
272                                 }
273                                 if disk[device] == nil {
274                                         disk[device] = new(Disk)
275                                 }
276                                 if op == "Read" {
277                                         disk[device].last_read = disk[device].next_read
278                                         disk[device].next_read = next
279                                         if disk[device].last_read > 0 && (disk[device].next_read != disk[device].last_read) {
280                                                 stderr <- fmt.Sprintf("crunchstat: blkio.io_service_bytes %s read %v", device, disk[device].next_read-disk[device].last_read)
281                                         }
282                                 }
283                                 if op == "Write" {
284                                         disk[device].last_write = disk[device].next_write
285                                         disk[device].next_write = next
286                                         if disk[device].last_write > 0 && (disk[device].next_write != disk[device].last_write) {
287                                                 stderr <- fmt.Sprintf("crunchstat: blkio.io_service_bytes %s write %v", device, disk[device].next_write-disk[device].last_write)
288                                         }
289                                 }
290                         }
291                 }
292
293                 if memory_stat != "" {
294                         c, err := os.Open(memory_stat)
295                         if err != nil {
296                                 stderr <- fmt.Sprintf("open %s: %s", memory_stat, err)
297                                 // cgroup probably gone -- skip other stats too.
298                                 continue
299                         }
300                         b := bufio.NewScanner(c)
301                         var stat string
302                         var val int64
303                         for b.Scan() {
304                                 if _, err := fmt.Sscanf(string(b.Text()), "%s %d", &stat, &val); err == nil {
305                                         if stat == "rss" {
306                                                 stderr <- fmt.Sprintf("crunchstat: memory.stat rss %v", val)
307                                         }
308                                 }
309                         }
310                         c.Close()
311                 }
312
313                 lastNetStat = DoNetworkStats(stderr, cgroup, lastNetStat)
314         }
315 }
316
317 func run(logger *log.Logger) error {
318
319         var (
320                 cgroup_root    string
321                 cgroup_parent  string
322                 cgroup_cidfile string
323                 wait           int64
324                 poll           int64
325         )
326
327         flag.StringVar(&cgroup_root, "cgroup-root", "", "Root of cgroup tree")
328         flag.StringVar(&cgroup_parent, "cgroup-parent", "", "Name of container parent under cgroup")
329         flag.StringVar(&cgroup_cidfile, "cgroup-cid", "", "Path to container id file")
330         flag.Int64Var(&wait, "wait", 5, "Maximum time (in seconds) to wait for cid file to show up")
331         flag.Int64Var(&poll, "poll", 1000, "Polling frequency, in milliseconds")
332
333         flag.Parse()
334
335         if cgroup_root == "" {
336                 logger.Fatal("Must provide -cgroup-root")
337         }
338
339         stderr_chan := make(chan string, 1)
340         defer close(stderr_chan)
341         finish_chan := make(chan bool)
342         defer close(finish_chan)
343
344         go CopyChanToPipe(stderr_chan, os.Stderr)
345
346         var cmd *exec.Cmd
347
348         if len(flag.Args()) > 0 {
349                 // Set up subprocess
350                 cmd = exec.Command(flag.Args()[0], flag.Args()[1:]...)
351
352                 logger.Print("Running ", flag.Args())
353
354                 // Child process will use our stdin and stdout pipes
355                 // (we close our copies below)
356                 cmd.Stdin = os.Stdin
357                 cmd.Stdout = os.Stdout
358
359                 // Forward SIGINT and SIGTERM to inner process
360                 term := make(chan os.Signal, 1)
361                 go func(sig <-chan os.Signal) {
362                         catch := <-sig
363                         if cmd.Process != nil {
364                                 cmd.Process.Signal(catch)
365                         }
366                         logger.Print("caught signal: ", catch)
367                 }(term)
368                 signal.Notify(term, syscall.SIGTERM)
369                 signal.Notify(term, syscall.SIGINT)
370
371                 // Funnel stderr through our channel
372                 stderr_pipe, err := cmd.StderrPipe()
373                 if err != nil {
374                         logger.Fatal(err)
375                 }
376                 go CopyPipeToChan(stderr_pipe, stderr_chan, finish_chan)
377
378                 // Run subprocess
379                 if err := cmd.Start(); err != nil {
380                         logger.Fatal(err)
381                 }
382
383                 // Close stdin/stdout in this (parent) process
384                 os.Stdin.Close()
385                 os.Stdout.Close()
386         }
387
388         // Read the cid file
389         var container_id string
390         if cgroup_cidfile != "" {
391                 // wait up to 'wait' seconds for the cid file to appear
392                 ok := false
393                 var i time.Duration
394                 for i = 0; i < time.Duration(wait)*time.Second; i += (100 * time.Millisecond) {
395                         cid, err := OpenAndReadAll(cgroup_cidfile, nil)
396                         if err == nil && len(cid) > 0 {
397                                 ok = true
398                                 container_id = string(cid)
399                                 break
400                         }
401                         time.Sleep(100 * time.Millisecond)
402                 }
403                 if !ok {
404                         logger.Printf("Could not read cid file %s", cgroup_cidfile)
405                 }
406         }
407
408         stop_poll_chan := make(chan bool, 1)
409         cgroup := Cgroup{cgroup_root, cgroup_parent, container_id}
410         go PollCgroupStats(cgroup, stderr_chan, poll, stop_poll_chan)
411
412         // When the child exits, tell the polling goroutine to stop.
413         defer func() { stop_poll_chan <- true }()
414
415         // Wait for CopyPipeToChan to consume child's stderr pipe
416         <-finish_chan
417
418         return cmd.Wait()
419 }
420
421 func main() {
422         logger := log.New(os.Stderr, "crunchstat: ", 0)
423         if err := run(logger); err != nil {
424                 if exiterr, ok := err.(*exec.ExitError); ok {
425                         // The program has exited with an exit code != 0
426
427                         // This works on both Unix and
428                         // Windows. Although package syscall is
429                         // generally platform dependent, WaitStatus is
430                         // defined for both Unix and Windows and in
431                         // both cases has an ExitStatus() method with
432                         // the same signature.
433                         if status, ok := exiterr.Sys().(syscall.WaitStatus); ok {
434                                 os.Exit(status.ExitStatus())
435                         }
436                 } else {
437                         logger.Fatalf("cmd.Wait: %v", err)
438                 }
439         }
440 }