5043: Accept long stderr lines from crunch tasks.
[arvados.git] / services / crunchstat / crunchstat.go
1 package main
2
3 import (
4         "bufio"
5         "bytes"
6         "errors"
7         "flag"
8         "fmt"
9         "io"
10         "io/ioutil"
11         "log"
12         "os"
13         "os/exec"
14         "os/signal"
15         "strconv"
16         "strings"
17         "syscall"
18         "time"
19 )
20
21 /*
22 #include <unistd.h>
23 #include <sys/types.h>
24 #include <pwd.h>
25 #include <stdlib.h>
26 */
27 import "C"
28
29 // The above block of magic allows us to look up user_hz via _SC_CLK_TCK.
30
31 type Cgroup struct {
32         root   string
33         parent string
34         cid    string
35 }
36
37 func CopyPipeToChan(in io.ReadCloser, out chan string, done chan<- bool) {
38         reader := bufio.NewReader(in)
39         for {
40                 line, err := reader.ReadBytes('\n')
41                 if len(line) > 0 {
42                         if err == nil {
43                                 // err == nil IFF line ends in \n
44                                 line = line[:len(line)-1]
45                         }
46                         out <- string(line)
47                 }
48                 if err != nil {
49                         if err != io.EOF {
50                                 out <- fmt.Sprintf("crunchstat: line buffering error: %s", err)
51                         }
52                         break
53                 }
54         }
55         done <- true
56         in.Close()
57 }
58
59 func CopyChanToPipe(in <-chan string, out io.Writer) {
60         for s := range in {
61                 fmt.Fprintln(out, s)
62         }
63 }
64
65 var logChan chan string
66
67 func LogPrintf(format string, args ...interface{}) {
68         if logChan == nil {
69                 return
70         }
71         logChan <- fmt.Sprintf("crunchstat: "+format, args...)
72 }
73
74 func ReadAllOrWarn(in *os.File) ([]byte, error) {
75         content, err := ioutil.ReadAll(in)
76         if err != nil {
77                 LogPrintf("read %s: %s", in.Name(), err)
78         }
79         return content, err
80 }
81
82 var reportedStatFile = map[string]string{}
83
84 // Open the cgroup stats file in /sys/fs corresponding to the target
85 // cgroup, and return an *os.File. If no stats file is available,
86 // return nil.
87 //
88 // TODO: Instead of trying all options, choose a process in the
89 // container, and read /proc/PID/cgroup to determine the appropriate
90 // cgroup root for the given statgroup. (This will avoid falling back
91 // to host-level stats during container setup and teardown.)
92 func OpenStatFile(cgroup Cgroup, statgroup string, stat string) (*os.File, error) {
93         var paths = []string{
94                 fmt.Sprintf("%s/%s/%s/%s/%s", cgroup.root, statgroup, cgroup.parent, cgroup.cid, stat),
95                 fmt.Sprintf("%s/%s/%s/%s", cgroup.root, cgroup.parent, cgroup.cid, stat),
96                 fmt.Sprintf("%s/%s/%s", cgroup.root, statgroup, stat),
97                 fmt.Sprintf("%s/%s", cgroup.root, stat),
98         }
99         var path string
100         var file *os.File
101         var err error
102         for _, path = range paths {
103                 file, err = os.Open(path)
104                 if err == nil {
105                         break
106                 } else {
107                         path = ""
108                 }
109         }
110         if pathWas, ok := reportedStatFile[stat]; !ok || pathWas != path {
111                 // Log whenever we start using a new/different cgroup
112                 // stat file for a given statistic. This typically
113                 // happens 1 to 3 times per statistic, depending on
114                 // whether we happen to collect stats [a] before any
115                 // processes have been created in the container and
116                 // [b] after all contained processes have exited.
117                 reportedStatFile[stat] = path
118                 if path == "" {
119                         LogPrintf("did not find stats file: stat %s, statgroup %s, cid %s, parent %s, root %s", stat, statgroup, cgroup.cid, cgroup.parent, cgroup.root)
120                 } else {
121                         LogPrintf("reading stats from %s", path)
122                 }
123         }
124         return file, err
125 }
126
127 func GetContainerNetStats(cgroup Cgroup) (io.Reader, error) {
128         procsFile, err := OpenStatFile(cgroup, "cpuacct", "cgroup.procs")
129         if err != nil {
130                 return nil, err
131         }
132         defer procsFile.Close()
133         reader := bufio.NewScanner(procsFile)
134         for reader.Scan() {
135                 taskPid := reader.Text()
136                 statsFilename := fmt.Sprintf("/proc/%s/net/dev", taskPid)
137                 stats, err := ioutil.ReadFile(statsFilename)
138                 if err != nil {
139                         LogPrintf("read %s: %s", statsFilename, err)
140                         continue
141                 }
142                 return strings.NewReader(string(stats)), nil
143         }
144         return nil, errors.New("Could not read stats for any proc in container")
145 }
146
147 type IoSample struct {
148         sampleTime time.Time
149         txBytes    int64
150         rxBytes    int64
151 }
152
153 func DoBlkIoStats(cgroup Cgroup, lastSample map[string]IoSample) {
154         c, err := OpenStatFile(cgroup, "blkio", "blkio.io_service_bytes")
155         if err != nil {
156                 return
157         }
158         defer c.Close()
159         b := bufio.NewScanner(c)
160         var sampleTime = time.Now()
161         newSamples := make(map[string]IoSample)
162         for b.Scan() {
163                 var device, op string
164                 var val int64
165                 if _, err := fmt.Sscanf(string(b.Text()), "%s %s %d", &device, &op, &val); err != nil {
166                         continue
167                 }
168                 var thisSample IoSample
169                 var ok bool
170                 if thisSample, ok = newSamples[device]; !ok {
171                         thisSample = IoSample{sampleTime, -1, -1}
172                 }
173                 switch op {
174                 case "Read":
175                         thisSample.rxBytes = val
176                 case "Write":
177                         thisSample.txBytes = val
178                 }
179                 newSamples[device] = thisSample
180         }
181         for dev, sample := range newSamples {
182                 if sample.txBytes < 0 || sample.rxBytes < 0 {
183                         continue
184                 }
185                 delta := ""
186                 if prev, ok := lastSample[dev]; ok {
187                         delta = fmt.Sprintf(" -- interval %.4f seconds %d write %d read",
188                                 sample.sampleTime.Sub(prev.sampleTime).Seconds(),
189                                 sample.txBytes-prev.txBytes,
190                                 sample.rxBytes-prev.rxBytes)
191                 }
192                 LogPrintf("blkio:%s %d write %d read%s", dev, sample.txBytes, sample.rxBytes, delta)
193                 lastSample[dev] = sample
194         }
195 }
196
197 type MemSample struct {
198         sampleTime time.Time
199         memStat    map[string]int64
200 }
201
202 func DoMemoryStats(cgroup Cgroup) {
203         c, err := OpenStatFile(cgroup, "memory", "memory.stat")
204         if err != nil {
205                 return
206         }
207         defer c.Close()
208         b := bufio.NewScanner(c)
209         thisSample := MemSample{time.Now(), make(map[string]int64)}
210         wantStats := [...]string{"cache", "swap", "pgmajfault", "rss"}
211         for b.Scan() {
212                 var stat string
213                 var val int64
214                 if _, err := fmt.Sscanf(string(b.Text()), "%s %d", &stat, &val); err != nil {
215                         continue
216                 }
217                 thisSample.memStat[stat] = val
218         }
219         var outstat bytes.Buffer
220         for _, key := range wantStats {
221                 if val, ok := thisSample.memStat[key]; ok {
222                         outstat.WriteString(fmt.Sprintf(" %d %s", val, key))
223                 }
224         }
225         LogPrintf("mem%s", outstat.String())
226 }
227
228 func DoNetworkStats(cgroup Cgroup, lastSample map[string]IoSample) {
229         sampleTime := time.Now()
230         stats, err := GetContainerNetStats(cgroup)
231         if err != nil {
232                 return
233         }
234
235         scanner := bufio.NewScanner(stats)
236         for scanner.Scan() {
237                 var ifName string
238                 var rx, tx int64
239                 words := strings.Fields(scanner.Text())
240                 if len(words) != 17 {
241                         // Skip lines with wrong format
242                         continue
243                 }
244                 ifName = strings.TrimRight(words[0], ":")
245                 if ifName == "lo" || ifName == "" {
246                         // Skip loopback interface and lines with wrong format
247                         continue
248                 }
249                 if tx, err = strconv.ParseInt(words[9], 10, 64); err != nil {
250                         continue
251                 }
252                 if rx, err = strconv.ParseInt(words[1], 10, 64); err != nil {
253                         continue
254                 }
255                 nextSample := IoSample{}
256                 nextSample.sampleTime = sampleTime
257                 nextSample.txBytes = tx
258                 nextSample.rxBytes = rx
259                 var delta string
260                 if prev, ok := lastSample[ifName]; ok {
261                         interval := nextSample.sampleTime.Sub(prev.sampleTime).Seconds()
262                         delta = fmt.Sprintf(" -- interval %.4f seconds %d tx %d rx",
263                                 interval,
264                                 tx-prev.txBytes,
265                                 rx-prev.rxBytes)
266                 }
267                 LogPrintf("net:%s %d tx %d rx%s", ifName, tx, rx, delta)
268                 lastSample[ifName] = nextSample
269         }
270 }
271
272 type CpuSample struct {
273         hasData    bool // to distinguish the zero value from real data
274         sampleTime time.Time
275         user       float64
276         sys        float64
277         cpus       int64
278 }
279
280 // Return the number of CPUs available in the container. Return 0 if
281 // we can't figure out the real number of CPUs.
282 func GetCpuCount(cgroup Cgroup) int64 {
283         cpusetFile, err := OpenStatFile(cgroup, "cpuset", "cpuset.cpus")
284         if err != nil {
285                 return 0
286         }
287         defer cpusetFile.Close()
288         b, err := ReadAllOrWarn(cpusetFile)
289         sp := strings.Split(string(b), ",")
290         cpus := int64(0)
291         for _, v := range sp {
292                 var min, max int64
293                 n, _ := fmt.Sscanf(v, "%d-%d", &min, &max)
294                 if n == 2 {
295                         cpus += (max - min) + 1
296                 } else {
297                         cpus += 1
298                 }
299         }
300         return cpus
301 }
302
303 func DoCpuStats(cgroup Cgroup, lastSample *CpuSample) {
304         statFile, err := OpenStatFile(cgroup, "cpuacct", "cpuacct.stat")
305         if err != nil {
306                 return
307         }
308         defer statFile.Close()
309         b, err := ReadAllOrWarn(statFile)
310         if err != nil {
311                 return
312         }
313
314         nextSample := CpuSample{true, time.Now(), 0, 0, GetCpuCount(cgroup)}
315         var userTicks, sysTicks int64
316         fmt.Sscanf(string(b), "user %d\nsystem %d", &userTicks, &sysTicks)
317         user_hz := float64(C.sysconf(C._SC_CLK_TCK))
318         nextSample.user = float64(userTicks) / user_hz
319         nextSample.sys = float64(sysTicks) / user_hz
320
321         delta := ""
322         if lastSample.hasData {
323                 delta = fmt.Sprintf(" -- interval %.4f seconds %.4f user %.4f sys",
324                         nextSample.sampleTime.Sub(lastSample.sampleTime).Seconds(),
325                         nextSample.user-lastSample.user,
326                         nextSample.sys-lastSample.sys)
327         }
328         LogPrintf("cpu %.4f user %.4f sys %d cpus%s",
329                 nextSample.user, nextSample.sys, nextSample.cpus, delta)
330         *lastSample = nextSample
331 }
332
333 func PollCgroupStats(cgroup Cgroup, poll int64, stop_poll_chan <-chan bool) {
334         var lastNetSample = map[string]IoSample{}
335         var lastDiskSample = map[string]IoSample{}
336         var lastCpuSample = CpuSample{}
337
338         poll_chan := make(chan bool, 1)
339         go func() {
340                 // Send periodic poll events.
341                 poll_chan <- true
342                 for {
343                         time.Sleep(time.Duration(poll) * time.Millisecond)
344                         poll_chan <- true
345                 }
346         }()
347         for {
348                 select {
349                 case <-stop_poll_chan:
350                         return
351                 case <-poll_chan:
352                         // Emit stats, then select again.
353                 }
354                 DoMemoryStats(cgroup)
355                 DoCpuStats(cgroup, &lastCpuSample)
356                 DoBlkIoStats(cgroup, lastDiskSample)
357                 DoNetworkStats(cgroup, lastNetSample)
358         }
359 }
360
361 func run(logger *log.Logger) error {
362
363         var (
364                 cgroup_root    string
365                 cgroup_parent  string
366                 cgroup_cidfile string
367                 wait           int64
368                 poll           int64
369         )
370
371         flag.StringVar(&cgroup_root, "cgroup-root", "", "Root of cgroup tree")
372         flag.StringVar(&cgroup_parent, "cgroup-parent", "", "Name of container parent under cgroup")
373         flag.StringVar(&cgroup_cidfile, "cgroup-cid", "", "Path to container id file")
374         flag.Int64Var(&wait, "wait", 5, "Maximum time (in seconds) to wait for cid file to show up")
375         flag.Int64Var(&poll, "poll", 1000, "Polling frequency, in milliseconds")
376
377         flag.Parse()
378
379         if cgroup_root == "" {
380                 logger.Fatal("Must provide -cgroup-root")
381         }
382
383         logChan = make(chan string, 1)
384         defer close(logChan)
385         finish_chan := make(chan bool)
386         defer close(finish_chan)
387
388         go CopyChanToPipe(logChan, os.Stderr)
389
390         var cmd *exec.Cmd
391
392         if len(flag.Args()) > 0 {
393                 // Set up subprocess
394                 cmd = exec.Command(flag.Args()[0], flag.Args()[1:]...)
395
396                 logger.Print("Running ", flag.Args())
397
398                 // Child process will use our stdin and stdout pipes
399                 // (we close our copies below)
400                 cmd.Stdin = os.Stdin
401                 cmd.Stdout = os.Stdout
402
403                 // Forward SIGINT and SIGTERM to inner process
404                 term := make(chan os.Signal, 1)
405                 go func(sig <-chan os.Signal) {
406                         catch := <-sig
407                         if cmd.Process != nil {
408                                 cmd.Process.Signal(catch)
409                         }
410                         logger.Print("caught signal: ", catch)
411                 }(term)
412                 signal.Notify(term, syscall.SIGTERM)
413                 signal.Notify(term, syscall.SIGINT)
414
415                 // Funnel stderr through our channel
416                 stderr_pipe, err := cmd.StderrPipe()
417                 if err != nil {
418                         logger.Fatal(err)
419                 }
420                 go CopyPipeToChan(stderr_pipe, logChan, finish_chan)
421
422                 // Run subprocess
423                 if err := cmd.Start(); err != nil {
424                         logger.Fatal(err)
425                 }
426
427                 // Close stdin/stdout in this (parent) process
428                 os.Stdin.Close()
429                 os.Stdout.Close()
430         }
431
432         // Read the cid file
433         var container_id string
434         if cgroup_cidfile != "" {
435                 // wait up to 'wait' seconds for the cid file to appear
436                 ok := false
437                 var i time.Duration
438                 for i = 0; i < time.Duration(wait)*time.Second; i += (100 * time.Millisecond) {
439                         cid, err := ioutil.ReadFile(cgroup_cidfile)
440                         if err == nil && len(cid) > 0 {
441                                 ok = true
442                                 container_id = string(cid)
443                                 break
444                         }
445                         time.Sleep(100 * time.Millisecond)
446                 }
447                 if !ok {
448                         logger.Printf("Could not read cid file %s", cgroup_cidfile)
449                 }
450         }
451
452         stop_poll_chan := make(chan bool, 1)
453         cgroup := Cgroup{cgroup_root, cgroup_parent, container_id}
454         go PollCgroupStats(cgroup, poll, stop_poll_chan)
455
456         // When the child exits, tell the polling goroutine to stop.
457         defer func() { stop_poll_chan <- true }()
458
459         // Wait for CopyPipeToChan to consume child's stderr pipe
460         <-finish_chan
461
462         return cmd.Wait()
463 }
464
465 func main() {
466         logger := log.New(os.Stderr, "crunchstat: ", 0)
467         if err := run(logger); err != nil {
468                 if exiterr, ok := err.(*exec.ExitError); ok {
469                         // The program has exited with an exit code != 0
470
471                         // This works on both Unix and
472                         // Windows. Although package syscall is
473                         // generally platform dependent, WaitStatus is
474                         // defined for both Unix and Windows and in
475                         // both cases has an ExitStatus() method with
476                         // the same signature.
477                         if status, ok := exiterr.Sys().(syscall.WaitStatus); ok {
478                                 os.Exit(status.ExitStatus())
479                         }
480                 } else {
481                         logger.Fatalf("cmd.Wait: %v", err)
482                 }
483         }
484 }