3826: Merge branch 'master' into 3826-crunchstat-netstats
[arvados.git] / services / crunchstat / crunchstat.go
1 package main
2
3 import (
4         "bufio"
5         "bytes"
6         "errors"
7         "flag"
8         "fmt"
9         "io"
10         "io/ioutil"
11         "log"
12         "os"
13         "os/exec"
14         "os/signal"
15         "strconv"
16         "strings"
17         "syscall"
18         "time"
19 )
20
21 /*
22 #include <unistd.h>
23 #include <sys/types.h>
24 #include <pwd.h>
25 #include <stdlib.h>
26 */
27 import "C"
28
29 // The above block of magic allows us to look up user_hz via _SC_CLK_TCK.
30
31 type Cgroup struct {
32         root   string
33         parent string
34         cid    string
35 }
36
37 func CopyPipeToChan(in io.Reader, out chan string, done chan<- bool) {
38         s := bufio.NewScanner(in)
39         for s.Scan() {
40                 out <- s.Text()
41         }
42         done <- true
43 }
44
45 func CopyChanToPipe(in <-chan string, out io.Writer) {
46         for s := range in {
47                 fmt.Fprintln(out, s)
48         }
49 }
50
51 var logChan chan string
52 func LogPrintf(format string, args ...interface{}) {
53         if logChan == nil {
54                 return
55         }
56         logChan <- fmt.Sprintf("crunchstat: " + format, args...)
57 }
58
59 func ReadAllOrWarn(in *os.File) ([]byte, error) {
60         content, err := ioutil.ReadAll(in)
61         if err != nil {
62                 LogPrintf("read %s: %s", in.Name(), err)
63         }
64         return content, err
65 }
66
67 var reportedStatFile = map[string]string{}
68
69 // Open the cgroup stats file in /sys/fs corresponding to the target
70 // cgroup, and return an *os.File. If no stats file is available,
71 // return nil.
72 //
73 // TODO: Instead of trying all options, choose a process in the
74 // container, and read /proc/PID/cgroup to determine the appropriate
75 // cgroup root for the given statgroup. (This will avoid falling back
76 // to host-level stats during container setup and teardown.)
77 func OpenStatFile(cgroup Cgroup, statgroup string, stat string) (*os.File, error) {
78         var paths = []string{
79                 fmt.Sprintf("%s/%s/%s/%s/%s", cgroup.root, statgroup, cgroup.parent, cgroup.cid, stat),
80                 fmt.Sprintf("%s/%s/%s/%s", cgroup.root, cgroup.parent, cgroup.cid, stat),
81                 fmt.Sprintf("%s/%s/%s", cgroup.root, statgroup, stat),
82                 fmt.Sprintf("%s/%s", cgroup.root, stat),
83         }
84         var path string
85         var file *os.File
86         var err error
87         for _, path = range paths {
88                 file, err = os.Open(path)
89                 if err == nil {
90                         break
91                 } else {
92                         path = ""
93                 }
94         }
95         if pathWas, ok := reportedStatFile[stat]; !ok || pathWas != path {
96                 // Log whenever we start using a new/different cgroup
97                 // stat file for a given statistic. This typically
98                 // happens 1 to 3 times per statistic, depending on
99                 // whether we happen to collect stats [a] before any
100                 // processes have been created in the container and
101                 // [b] after all contained processes have exited.
102                 reportedStatFile[stat] = path
103                 if path == "" {
104                         LogPrintf("did not find stats file: stat %s, statgroup %s, cid %s, parent %s, root %s", stat, statgroup, cgroup.cid, cgroup.parent, cgroup.root)
105                 } else {
106                         LogPrintf("reading stats from %s", path)
107                 }
108         }
109         return file, err
110 }
111
112 func GetContainerNetStats(cgroup Cgroup) (io.Reader, error) {
113         procsFile, err := OpenStatFile(cgroup, "cpuacct", "cgroup.procs")
114         if err != nil {
115                 return nil, err
116         }
117         defer procsFile.Close()
118         reader := bufio.NewScanner(procsFile)
119         for reader.Scan() {
120                 taskPid := reader.Text()
121                 statsFilename := fmt.Sprintf("/proc/%s/net/dev", taskPid)
122                 stats, err := ioutil.ReadFile(statsFilename)
123                 if err != nil {
124                         LogPrintf("read %s: %s", statsFilename, err)
125                         continue
126                 }
127                 return strings.NewReader(string(stats)), nil
128         }
129         return nil, errors.New("Could not read stats for any proc in container")
130 }
131
132 type IoSample struct {
133         sampleTime time.Time
134         txBytes    int64
135         rxBytes    int64
136 }
137
138 func DoBlkIoStats(cgroup Cgroup, lastSample map[string]IoSample) {
139         c, err := OpenStatFile(cgroup, "blkio", "blkio.io_service_bytes")
140         if err != nil {
141                 return
142         }
143         defer c.Close()
144         b := bufio.NewScanner(c)
145         var sampleTime = time.Now()
146         newSamples := make(map[string]IoSample)
147         for b.Scan() {
148                 var device, op string
149                 var val int64
150                 if _, err := fmt.Sscanf(string(b.Text()), "%s %s %d", &device, &op, &val); err != nil {
151                         continue
152                 }
153                 var thisSample IoSample
154                 var ok bool
155                 if thisSample, ok = newSamples[device]; !ok {
156                         thisSample = IoSample{sampleTime, -1, -1}
157                 }
158                 switch op {
159                 case "Read":
160                         thisSample.rxBytes = val
161                 case "Write":
162                         thisSample.txBytes = val
163                 }
164                 newSamples[device] = thisSample
165         }
166         for dev, sample := range newSamples {
167                 if sample.txBytes < 0 || sample.rxBytes < 0 {
168                         continue
169                 }
170                 delta := ""
171                 if prev, ok := lastSample[dev]; ok {
172                         delta = fmt.Sprintf(" -- interval %.4f seconds %d write %d read",
173                                 sample.sampleTime.Sub(prev.sampleTime).Seconds(),
174                                 sample.txBytes-prev.txBytes,
175                                 sample.rxBytes-prev.rxBytes)
176                 }
177                 LogPrintf("blkio:%s %d write %d read%s", dev, sample.txBytes, sample.rxBytes, delta)
178                 lastSample[dev] = sample
179         }
180 }
181
182 type MemSample struct {
183         sampleTime time.Time
184         memStat    map[string]int64
185 }
186
187 func DoMemoryStats(cgroup Cgroup) {
188         c, err := OpenStatFile(cgroup, "memory", "memory.stat")
189         if err != nil {
190                 return
191         }
192         defer c.Close()
193         b := bufio.NewScanner(c)
194         thisSample := MemSample{time.Now(), make(map[string]int64)}
195         wantStats := [...]string{"cache", "pgmajfault", "rss"}
196         for b.Scan() {
197                 var stat string
198                 var val int64
199                 if _, err := fmt.Sscanf(string(b.Text()), "%s %d", &stat, &val); err != nil {
200                         continue
201                 }
202                 thisSample.memStat[stat] = val
203         }
204         var outstat bytes.Buffer
205         for _, key := range wantStats {
206                 if val, ok := thisSample.memStat[key]; ok {
207                         outstat.WriteString(fmt.Sprintf(" %d %s", val, key))
208                 }
209         }
210         LogPrintf("mem%s", outstat.String())
211 }
212
213 func DoNetworkStats(cgroup Cgroup, lastSample map[string]IoSample) {
214         sampleTime := time.Now()
215         stats, err := GetContainerNetStats(cgroup)
216         if err != nil {
217                 return
218         }
219
220         scanner := bufio.NewScanner(stats)
221         for scanner.Scan() {
222                 var ifName string
223                 var rx, tx int64
224                 words := strings.Fields(scanner.Text())
225                 if len(words) != 17 {
226                         // Skip lines with wrong format
227                         continue
228                 }
229                 ifName = strings.TrimRight(words[0], ":")
230                 if ifName == "lo" || ifName == "" {
231                         // Skip loopback interface and lines with wrong format
232                         continue
233                 }
234                 if tx, err = strconv.ParseInt(words[9], 10, 64); err != nil {
235                         continue
236                 }
237                 if rx, err = strconv.ParseInt(words[1], 10, 64); err != nil {
238                         continue
239                 }
240                 nextSample := IoSample{}
241                 nextSample.sampleTime = sampleTime
242                 nextSample.txBytes = tx
243                 nextSample.rxBytes = rx
244                 var delta string
245                 if prev, ok := lastSample[ifName]; ok {
246                         interval := nextSample.sampleTime.Sub(prev.sampleTime).Seconds()
247                         delta = fmt.Sprintf(" -- interval %.4f seconds %d tx %d rx",
248                                 interval,
249                                 tx-prev.txBytes,
250                                 rx-prev.rxBytes)
251                 }
252                 LogPrintf("net:%s %d tx %d rx%s", ifName, tx, rx, delta)
253                 lastSample[ifName] = nextSample
254         }
255 }
256
257 type CpuSample struct {
258         hasData    bool // to distinguish the zero value from real data
259         sampleTime time.Time
260         user       float64
261         sys        float64
262         cpus       int64
263 }
264
265 // Return the number of CPUs available in the container. Return 0 if
266 // we can't figure out the real number of CPUs.
267 func GetCpuCount(cgroup Cgroup) int64 {
268         cpusetFile, err := OpenStatFile(cgroup, "cpuset", "cpuset.cpus")
269         if err != nil {
270                 return 0
271         }
272         defer cpusetFile.Close()
273         b, err := ReadAllOrWarn(cpusetFile)
274         sp := strings.Split(string(b), ",")
275         cpus := int64(0)
276         for _, v := range sp {
277                 var min, max int64
278                 n, _ := fmt.Sscanf(v, "%d-%d", &min, &max)
279                 if n == 2 {
280                         cpus += (max - min) + 1
281                 } else {
282                         cpus += 1
283                 }
284         }
285         return cpus
286 }
287
288 func DoCpuStats(cgroup Cgroup, lastSample *CpuSample) {
289         statFile, err := OpenStatFile(cgroup, "cpuacct", "cpuacct.stat")
290         if err != nil {
291                 return
292         }
293         defer statFile.Close()
294         b, err := ReadAllOrWarn(statFile)
295         if err != nil {
296                 return
297         }
298
299         nextSample := CpuSample{true, time.Now(), 0, 0, GetCpuCount(cgroup)}
300         var userTicks, sysTicks int64
301         fmt.Sscanf(string(b), "user %d\nsystem %d", &userTicks, &sysTicks)
302         user_hz := float64(C.sysconf(C._SC_CLK_TCK))
303         nextSample.user = float64(userTicks) / user_hz
304         nextSample.sys = float64(sysTicks) / user_hz
305
306         delta := ""
307         if lastSample.hasData {
308                 delta = fmt.Sprintf(" -- interval %.4f seconds %.4f user %.4f sys",
309                         nextSample.sampleTime.Sub(lastSample.sampleTime).Seconds(),
310                         nextSample.user-lastSample.user,
311                         nextSample.sys-lastSample.sys)
312         }
313         LogPrintf("cpu %.4f user %.4f sys %d cpus%s",
314                 nextSample.user, nextSample.sys, nextSample.cpus, delta)
315         *lastSample = nextSample
316 }
317
318 func PollCgroupStats(cgroup Cgroup, poll int64, stop_poll_chan <-chan bool) {
319         var lastNetSample = map[string]IoSample{}
320         var lastDiskSample = map[string]IoSample{}
321         var lastCpuSample = CpuSample{}
322
323         poll_chan := make(chan bool, 1)
324         go func() {
325                 // Send periodic poll events.
326                 poll_chan <- true
327                 for {
328                         time.Sleep(time.Duration(poll) * time.Millisecond)
329                         poll_chan <- true
330                 }
331         }()
332         for {
333                 select {
334                 case <-stop_poll_chan:
335                         return
336                 case <-poll_chan:
337                         // Emit stats, then select again.
338                 }
339                 DoMemoryStats(cgroup)
340                 DoCpuStats(cgroup, &lastCpuSample)
341                 DoBlkIoStats(cgroup, lastDiskSample)
342                 DoNetworkStats(cgroup, lastNetSample)
343         }
344 }
345
346 func run(logger *log.Logger) error {
347
348         var (
349                 cgroup_root    string
350                 cgroup_parent  string
351                 cgroup_cidfile string
352                 wait           int64
353                 poll           int64
354         )
355
356         flag.StringVar(&cgroup_root, "cgroup-root", "", "Root of cgroup tree")
357         flag.StringVar(&cgroup_parent, "cgroup-parent", "", "Name of container parent under cgroup")
358         flag.StringVar(&cgroup_cidfile, "cgroup-cid", "", "Path to container id file")
359         flag.Int64Var(&wait, "wait", 5, "Maximum time (in seconds) to wait for cid file to show up")
360         flag.Int64Var(&poll, "poll", 1000, "Polling frequency, in milliseconds")
361
362         flag.Parse()
363
364         if cgroup_root == "" {
365                 logger.Fatal("Must provide -cgroup-root")
366         }
367
368         logChan = make(chan string, 1)
369         defer close(logChan)
370         finish_chan := make(chan bool)
371         defer close(finish_chan)
372
373         go CopyChanToPipe(logChan, os.Stderr)
374
375         var cmd *exec.Cmd
376
377         if len(flag.Args()) > 0 {
378                 // Set up subprocess
379                 cmd = exec.Command(flag.Args()[0], flag.Args()[1:]...)
380
381                 logger.Print("Running ", flag.Args())
382
383                 // Child process will use our stdin and stdout pipes
384                 // (we close our copies below)
385                 cmd.Stdin = os.Stdin
386                 cmd.Stdout = os.Stdout
387
388                 // Forward SIGINT and SIGTERM to inner process
389                 term := make(chan os.Signal, 1)
390                 go func(sig <-chan os.Signal) {
391                         catch := <-sig
392                         if cmd.Process != nil {
393                                 cmd.Process.Signal(catch)
394                         }
395                         logger.Print("caught signal: ", catch)
396                 }(term)
397                 signal.Notify(term, syscall.SIGTERM)
398                 signal.Notify(term, syscall.SIGINT)
399
400                 // Funnel stderr through our channel
401                 stderr_pipe, err := cmd.StderrPipe()
402                 if err != nil {
403                         logger.Fatal(err)
404                 }
405                 go CopyPipeToChan(stderr_pipe, logChan, finish_chan)
406
407                 // Run subprocess
408                 if err := cmd.Start(); err != nil {
409                         logger.Fatal(err)
410                 }
411
412                 // Close stdin/stdout in this (parent) process
413                 os.Stdin.Close()
414                 os.Stdout.Close()
415         }
416
417         // Read the cid file
418         var container_id string
419         if cgroup_cidfile != "" {
420                 // wait up to 'wait' seconds for the cid file to appear
421                 ok := false
422                 var i time.Duration
423                 for i = 0; i < time.Duration(wait)*time.Second; i += (100 * time.Millisecond) {
424                         cid, err := ioutil.ReadFile(cgroup_cidfile)
425                         if err == nil && len(cid) > 0 {
426                                 ok = true
427                                 container_id = string(cid)
428                                 break
429                         }
430                         time.Sleep(100 * time.Millisecond)
431                 }
432                 if !ok {
433                         logger.Printf("Could not read cid file %s", cgroup_cidfile)
434                 }
435         }
436
437         stop_poll_chan := make(chan bool, 1)
438         cgroup := Cgroup{cgroup_root, cgroup_parent, container_id}
439         go PollCgroupStats(cgroup, poll, stop_poll_chan)
440
441         // When the child exits, tell the polling goroutine to stop.
442         defer func() { stop_poll_chan <- true }()
443
444         // Wait for CopyPipeToChan to consume child's stderr pipe
445         <-finish_chan
446
447         return cmd.Wait()
448 }
449
450 func main() {
451         logger := log.New(os.Stderr, "crunchstat: ", 0)
452         if err := run(logger); err != nil {
453                 if exiterr, ok := err.(*exec.ExitError); ok {
454                         // The program has exited with an exit code != 0
455
456                         // This works on both Unix and
457                         // Windows. Although package syscall is
458                         // generally platform dependent, WaitStatus is
459                         // defined for both Unix and Windows and in
460                         // both cases has an ExitStatus() method with
461                         // the same signature.
462                         if status, ok := exiterr.Sys().(syscall.WaitStatus); ok {
463                                 os.Exit(status.ExitStatus())
464                         }
465                 } else {
466                         logger.Fatalf("cmd.Wait: %v", err)
467                 }
468         }
469 }