Merge branch 'master' into 3408-production-datamanager
[arvados.git] / services / crunchstat / crunchstat.go
1 package main
2
3 import (
4         "bufio"
5         "bytes"
6         "errors"
7         "flag"
8         "fmt"
9         "io"
10         "io/ioutil"
11         "log"
12         "os"
13         "os/exec"
14         "os/signal"
15         "strconv"
16         "strings"
17         "syscall"
18         "time"
19 )
20
21 /*
22 #include <unistd.h>
23 #include <sys/types.h>
24 #include <pwd.h>
25 #include <stdlib.h>
26 */
27 import "C"
28
29 // The above block of magic allows us to look up user_hz via _SC_CLK_TCK.
30
31 type Cgroup struct {
32         root   string
33         parent string
34         cid    string
35 }
36
37 func CopyPipeToChan(in io.ReadCloser, out chan string, done chan<- bool) {
38         defer in.Close()
39
40         // TODO(twp): handle long input records gracefully, if possible
41         // without killing the child task (#4889)
42         //
43         s := bufio.NewScanner(in)
44         for s.Scan() {
45                 out <- s.Text()
46         }
47         if s.Err() != nil {
48                 out <- fmt.Sprintf("crunchstat: line buffering error: %s", s.Err())
49         }
50         done <- true
51 }
52
53 func CopyChanToPipe(in <-chan string, out io.Writer) {
54         for s := range in {
55                 fmt.Fprintln(out, s)
56         }
57 }
58
59 var logChan chan string
60
61 func LogPrintf(format string, args ...interface{}) {
62         if logChan == nil {
63                 return
64         }
65         logChan <- fmt.Sprintf("crunchstat: "+format, args...)
66 }
67
68 func ReadAllOrWarn(in *os.File) ([]byte, error) {
69         content, err := ioutil.ReadAll(in)
70         if err != nil {
71                 LogPrintf("read %s: %s", in.Name(), err)
72         }
73         return content, err
74 }
75
76 var reportedStatFile = map[string]string{}
77
78 // Open the cgroup stats file in /sys/fs corresponding to the target
79 // cgroup, and return an *os.File. If no stats file is available,
80 // return nil.
81 //
82 // TODO: Instead of trying all options, choose a process in the
83 // container, and read /proc/PID/cgroup to determine the appropriate
84 // cgroup root for the given statgroup. (This will avoid falling back
85 // to host-level stats during container setup and teardown.)
86 func OpenStatFile(cgroup Cgroup, statgroup string, stat string) (*os.File, error) {
87         var paths = []string{
88                 fmt.Sprintf("%s/%s/%s/%s/%s", cgroup.root, statgroup, cgroup.parent, cgroup.cid, stat),
89                 fmt.Sprintf("%s/%s/%s/%s", cgroup.root, cgroup.parent, cgroup.cid, stat),
90                 fmt.Sprintf("%s/%s/%s", cgroup.root, statgroup, stat),
91                 fmt.Sprintf("%s/%s", cgroup.root, stat),
92         }
93         var path string
94         var file *os.File
95         var err error
96         for _, path = range paths {
97                 file, err = os.Open(path)
98                 if err == nil {
99                         break
100                 } else {
101                         path = ""
102                 }
103         }
104         if pathWas, ok := reportedStatFile[stat]; !ok || pathWas != path {
105                 // Log whenever we start using a new/different cgroup
106                 // stat file for a given statistic. This typically
107                 // happens 1 to 3 times per statistic, depending on
108                 // whether we happen to collect stats [a] before any
109                 // processes have been created in the container and
110                 // [b] after all contained processes have exited.
111                 reportedStatFile[stat] = path
112                 if path == "" {
113                         LogPrintf("did not find stats file: stat %s, statgroup %s, cid %s, parent %s, root %s", stat, statgroup, cgroup.cid, cgroup.parent, cgroup.root)
114                 } else {
115                         LogPrintf("reading stats from %s", path)
116                 }
117         }
118         return file, err
119 }
120
121 func GetContainerNetStats(cgroup Cgroup) (io.Reader, error) {
122         procsFile, err := OpenStatFile(cgroup, "cpuacct", "cgroup.procs")
123         if err != nil {
124                 return nil, err
125         }
126         defer procsFile.Close()
127         reader := bufio.NewScanner(procsFile)
128         for reader.Scan() {
129                 taskPid := reader.Text()
130                 statsFilename := fmt.Sprintf("/proc/%s/net/dev", taskPid)
131                 stats, err := ioutil.ReadFile(statsFilename)
132                 if err != nil {
133                         LogPrintf("read %s: %s", statsFilename, err)
134                         continue
135                 }
136                 return strings.NewReader(string(stats)), nil
137         }
138         return nil, errors.New("Could not read stats for any proc in container")
139 }
140
141 type IoSample struct {
142         sampleTime time.Time
143         txBytes    int64
144         rxBytes    int64
145 }
146
147 func DoBlkIoStats(cgroup Cgroup, lastSample map[string]IoSample) {
148         c, err := OpenStatFile(cgroup, "blkio", "blkio.io_service_bytes")
149         if err != nil {
150                 return
151         }
152         defer c.Close()
153         b := bufio.NewScanner(c)
154         var sampleTime = time.Now()
155         newSamples := make(map[string]IoSample)
156         for b.Scan() {
157                 var device, op string
158                 var val int64
159                 if _, err := fmt.Sscanf(string(b.Text()), "%s %s %d", &device, &op, &val); err != nil {
160                         continue
161                 }
162                 var thisSample IoSample
163                 var ok bool
164                 if thisSample, ok = newSamples[device]; !ok {
165                         thisSample = IoSample{sampleTime, -1, -1}
166                 }
167                 switch op {
168                 case "Read":
169                         thisSample.rxBytes = val
170                 case "Write":
171                         thisSample.txBytes = val
172                 }
173                 newSamples[device] = thisSample
174         }
175         for dev, sample := range newSamples {
176                 if sample.txBytes < 0 || sample.rxBytes < 0 {
177                         continue
178                 }
179                 delta := ""
180                 if prev, ok := lastSample[dev]; ok {
181                         delta = fmt.Sprintf(" -- interval %.4f seconds %d write %d read",
182                                 sample.sampleTime.Sub(prev.sampleTime).Seconds(),
183                                 sample.txBytes-prev.txBytes,
184                                 sample.rxBytes-prev.rxBytes)
185                 }
186                 LogPrintf("blkio:%s %d write %d read%s", dev, sample.txBytes, sample.rxBytes, delta)
187                 lastSample[dev] = sample
188         }
189 }
190
191 type MemSample struct {
192         sampleTime time.Time
193         memStat    map[string]int64
194 }
195
196 func DoMemoryStats(cgroup Cgroup) {
197         c, err := OpenStatFile(cgroup, "memory", "memory.stat")
198         if err != nil {
199                 return
200         }
201         defer c.Close()
202         b := bufio.NewScanner(c)
203         thisSample := MemSample{time.Now(), make(map[string]int64)}
204         wantStats := [...]string{"cache", "swap", "pgmajfault", "rss"}
205         for b.Scan() {
206                 var stat string
207                 var val int64
208                 if _, err := fmt.Sscanf(string(b.Text()), "%s %d", &stat, &val); err != nil {
209                         continue
210                 }
211                 thisSample.memStat[stat] = val
212         }
213         var outstat bytes.Buffer
214         for _, key := range wantStats {
215                 if val, ok := thisSample.memStat[key]; ok {
216                         outstat.WriteString(fmt.Sprintf(" %d %s", val, key))
217                 }
218         }
219         LogPrintf("mem%s", outstat.String())
220 }
221
222 func DoNetworkStats(cgroup Cgroup, lastSample map[string]IoSample) {
223         sampleTime := time.Now()
224         stats, err := GetContainerNetStats(cgroup)
225         if err != nil {
226                 return
227         }
228
229         scanner := bufio.NewScanner(stats)
230         for scanner.Scan() {
231                 var ifName string
232                 var rx, tx int64
233                 words := strings.Fields(scanner.Text())
234                 if len(words) != 17 {
235                         // Skip lines with wrong format
236                         continue
237                 }
238                 ifName = strings.TrimRight(words[0], ":")
239                 if ifName == "lo" || ifName == "" {
240                         // Skip loopback interface and lines with wrong format
241                         continue
242                 }
243                 if tx, err = strconv.ParseInt(words[9], 10, 64); err != nil {
244                         continue
245                 }
246                 if rx, err = strconv.ParseInt(words[1], 10, 64); err != nil {
247                         continue
248                 }
249                 nextSample := IoSample{}
250                 nextSample.sampleTime = sampleTime
251                 nextSample.txBytes = tx
252                 nextSample.rxBytes = rx
253                 var delta string
254                 if prev, ok := lastSample[ifName]; ok {
255                         interval := nextSample.sampleTime.Sub(prev.sampleTime).Seconds()
256                         delta = fmt.Sprintf(" -- interval %.4f seconds %d tx %d rx",
257                                 interval,
258                                 tx-prev.txBytes,
259                                 rx-prev.rxBytes)
260                 }
261                 LogPrintf("net:%s %d tx %d rx%s", ifName, tx, rx, delta)
262                 lastSample[ifName] = nextSample
263         }
264 }
265
266 type CpuSample struct {
267         hasData    bool // to distinguish the zero value from real data
268         sampleTime time.Time
269         user       float64
270         sys        float64
271         cpus       int64
272 }
273
274 // Return the number of CPUs available in the container. Return 0 if
275 // we can't figure out the real number of CPUs.
276 func GetCpuCount(cgroup Cgroup) int64 {
277         cpusetFile, err := OpenStatFile(cgroup, "cpuset", "cpuset.cpus")
278         if err != nil {
279                 return 0
280         }
281         defer cpusetFile.Close()
282         b, err := ReadAllOrWarn(cpusetFile)
283         sp := strings.Split(string(b), ",")
284         cpus := int64(0)
285         for _, v := range sp {
286                 var min, max int64
287                 n, _ := fmt.Sscanf(v, "%d-%d", &min, &max)
288                 if n == 2 {
289                         cpus += (max - min) + 1
290                 } else {
291                         cpus += 1
292                 }
293         }
294         return cpus
295 }
296
297 func DoCpuStats(cgroup Cgroup, lastSample *CpuSample) {
298         statFile, err := OpenStatFile(cgroup, "cpuacct", "cpuacct.stat")
299         if err != nil {
300                 return
301         }
302         defer statFile.Close()
303         b, err := ReadAllOrWarn(statFile)
304         if err != nil {
305                 return
306         }
307
308         nextSample := CpuSample{true, time.Now(), 0, 0, GetCpuCount(cgroup)}
309         var userTicks, sysTicks int64
310         fmt.Sscanf(string(b), "user %d\nsystem %d", &userTicks, &sysTicks)
311         user_hz := float64(C.sysconf(C._SC_CLK_TCK))
312         nextSample.user = float64(userTicks) / user_hz
313         nextSample.sys = float64(sysTicks) / user_hz
314
315         delta := ""
316         if lastSample.hasData {
317                 delta = fmt.Sprintf(" -- interval %.4f seconds %.4f user %.4f sys",
318                         nextSample.sampleTime.Sub(lastSample.sampleTime).Seconds(),
319                         nextSample.user-lastSample.user,
320                         nextSample.sys-lastSample.sys)
321         }
322         LogPrintf("cpu %.4f user %.4f sys %d cpus%s",
323                 nextSample.user, nextSample.sys, nextSample.cpus, delta)
324         *lastSample = nextSample
325 }
326
327 func PollCgroupStats(cgroup Cgroup, poll int64, stop_poll_chan <-chan bool) {
328         var lastNetSample = map[string]IoSample{}
329         var lastDiskSample = map[string]IoSample{}
330         var lastCpuSample = CpuSample{}
331
332         poll_chan := make(chan bool, 1)
333         go func() {
334                 // Send periodic poll events.
335                 poll_chan <- true
336                 for {
337                         time.Sleep(time.Duration(poll) * time.Millisecond)
338                         poll_chan <- true
339                 }
340         }()
341         for {
342                 select {
343                 case <-stop_poll_chan:
344                         return
345                 case <-poll_chan:
346                         // Emit stats, then select again.
347                 }
348                 DoMemoryStats(cgroup)
349                 DoCpuStats(cgroup, &lastCpuSample)
350                 DoBlkIoStats(cgroup, lastDiskSample)
351                 DoNetworkStats(cgroup, lastNetSample)
352         }
353 }
354
355 func run(logger *log.Logger) error {
356
357         var (
358                 cgroup_root    string
359                 cgroup_parent  string
360                 cgroup_cidfile string
361                 wait           int64
362                 poll           int64
363         )
364
365         flag.StringVar(&cgroup_root, "cgroup-root", "", "Root of cgroup tree")
366         flag.StringVar(&cgroup_parent, "cgroup-parent", "", "Name of container parent under cgroup")
367         flag.StringVar(&cgroup_cidfile, "cgroup-cid", "", "Path to container id file")
368         flag.Int64Var(&wait, "wait", 5, "Maximum time (in seconds) to wait for cid file to show up")
369         flag.Int64Var(&poll, "poll", 1000, "Polling frequency, in milliseconds")
370
371         flag.Parse()
372
373         if cgroup_root == "" {
374                 logger.Fatal("Must provide -cgroup-root")
375         }
376
377         logChan = make(chan string, 1)
378         defer close(logChan)
379         finish_chan := make(chan bool)
380         defer close(finish_chan)
381
382         go CopyChanToPipe(logChan, os.Stderr)
383
384         var cmd *exec.Cmd
385
386         if len(flag.Args()) > 0 {
387                 // Set up subprocess
388                 cmd = exec.Command(flag.Args()[0], flag.Args()[1:]...)
389
390                 logger.Print("Running ", flag.Args())
391
392                 // Child process will use our stdin and stdout pipes
393                 // (we close our copies below)
394                 cmd.Stdin = os.Stdin
395                 cmd.Stdout = os.Stdout
396
397                 // Forward SIGINT and SIGTERM to inner process
398                 term := make(chan os.Signal, 1)
399                 go func(sig <-chan os.Signal) {
400                         catch := <-sig
401                         if cmd.Process != nil {
402                                 cmd.Process.Signal(catch)
403                         }
404                         logger.Print("caught signal: ", catch)
405                 }(term)
406                 signal.Notify(term, syscall.SIGTERM)
407                 signal.Notify(term, syscall.SIGINT)
408
409                 // Funnel stderr through our channel
410                 stderr_pipe, err := cmd.StderrPipe()
411                 if err != nil {
412                         logger.Fatal(err)
413                 }
414                 go CopyPipeToChan(stderr_pipe, logChan, finish_chan)
415
416                 // Run subprocess
417                 if err := cmd.Start(); err != nil {
418                         logger.Fatal(err)
419                 }
420
421                 // Close stdin/stdout in this (parent) process
422                 os.Stdin.Close()
423                 os.Stdout.Close()
424         }
425
426         // Read the cid file
427         var container_id string
428         if cgroup_cidfile != "" {
429                 // wait up to 'wait' seconds for the cid file to appear
430                 ok := false
431                 var i time.Duration
432                 for i = 0; i < time.Duration(wait)*time.Second; i += (100 * time.Millisecond) {
433                         cid, err := ioutil.ReadFile(cgroup_cidfile)
434                         if err == nil && len(cid) > 0 {
435                                 ok = true
436                                 container_id = string(cid)
437                                 break
438                         }
439                         time.Sleep(100 * time.Millisecond)
440                 }
441                 if !ok {
442                         logger.Printf("Could not read cid file %s", cgroup_cidfile)
443                 }
444         }
445
446         stop_poll_chan := make(chan bool, 1)
447         cgroup := Cgroup{cgroup_root, cgroup_parent, container_id}
448         go PollCgroupStats(cgroup, poll, stop_poll_chan)
449
450         // When the child exits, tell the polling goroutine to stop.
451         defer func() { stop_poll_chan <- true }()
452
453         // Wait for CopyPipeToChan to consume child's stderr pipe
454         <-finish_chan
455
456         return cmd.Wait()
457 }
458
459 func main() {
460         logger := log.New(os.Stderr, "crunchstat: ", 0)
461         if err := run(logger); err != nil {
462                 if exiterr, ok := err.(*exec.ExitError); ok {
463                         // The program has exited with an exit code != 0
464
465                         // This works on both Unix and
466                         // Windows. Although package syscall is
467                         // generally platform dependent, WaitStatus is
468                         // defined for both Unix and Windows and in
469                         // both cases has an ExitStatus() method with
470                         // the same signature.
471                         if status, ok := exiterr.Sys().(syscall.WaitStatus); ok {
472                                 os.Exit(status.ExitStatus())
473                         }
474                 } else {
475                         logger.Fatalf("cmd.Wait: %v", err)
476                 }
477         }
478 }