Merge branch 'master' of git.curoverse.com:arvados into 3408-production-datamanager
[arvados.git] / services / crunchstat / crunchstat.go
1 package main
2
3 import (
4         "bufio"
5         "flag"
6         "fmt"
7         "io"
8         "io/ioutil"
9         "log"
10         "os"
11         "os/exec"
12         "os/signal"
13         "strings"
14         "syscall"
15         "time"
16 )
17
// CopyPipeToChan reads in line by line and sends each line, without its
// trailing "\n" or "\r\n", on out. When in is exhausted (EOF or a read
// error) it sends a single true on done.
//
// Lines may be arbitrarily long: a bufio.Reader is used instead of a
// bufio.Scanner because a Scanner gives up at the first line exceeding its
// token limit, which would silently discard the rest of the stream.
func CopyPipeToChan(in io.Reader, out chan string, done chan<- bool) {
	r := bufio.NewReader(in)
	for {
		line, err := r.ReadString('\n')
		// ReadString returns whatever it read before hitting an error, so
		// a final unterminated line is still forwarded.
		if len(line) > 0 {
			line = strings.TrimSuffix(line, "\n")
			line = strings.TrimSuffix(line, "\r")
			out <- line
		}
		if err != nil {
			break
		}
	}
	done <- true
}
25
// CopyChanToPipe drains in, writing every received string to out followed
// by a newline. It returns once in has been closed and emptied.
func CopyChanToPipe(in <-chan string, out io.Writer) {
	for {
		line, open := <-in
		if !open {
			return
		}
		fmt.Fprintln(out, line)
	}
}
31
// FindStat locates the cgroup pseudo-file "<statgroup>.<stat>" and returns
// the first of the following paths that exists (as reported by os.Stat):
//
//	<cgroup_root>/<statgroup>/<cgroup_parent>/<container_id>/<statgroup>.<stat>
//	<cgroup_root>/<cgroup_parent>/<container_id>/<statgroup>.<stat>
//	<cgroup_root>/<statgroup>/<statgroup>.<stat>
//	<cgroup_root>/<statgroup>.<stat>
//
// It returns "" when none of them exists.
func FindStat(cgroup_root string, cgroup_parent string, container_id string, statgroup string, stat string) string {
	leaf := statgroup + "." + stat
	container := cgroup_parent + "/" + container_id

	// Most specific location first; the order decides which file wins.
	candidates := [...]string{
		cgroup_root + "/" + statgroup + "/" + container + "/" + leaf,
		cgroup_root + "/" + container + "/" + leaf,
		cgroup_root + "/" + statgroup + "/" + leaf,
		cgroup_root + "/" + leaf,
	}

	for _, candidate := range candidates {
		_, err := os.Stat(candidate)
		if err != nil {
			continue
		}
		return candidate
	}
	return ""
}
52
53 func PollCgroupStats(cgroup_root string, cgroup_parent string, container_id string, stderr chan string, poll int64, stop_poll_chan <-chan bool) {
54         //var last_usage int64 = 0
55         var last_user int64 = -1
56         var last_sys int64 = -1
57         var last_cpucount int64 = 0
58
59         type Disk struct {
60                 last_read  int64
61                 next_read  int64
62                 last_write int64
63                 next_write int64
64         }
65
66         disk := make(map[string]*Disk)
67
68         //cpuacct_usage := FindStat(cgroup_path, "cpuacct", "usage")
69         cpuacct_stat := FindStat(cgroup_root, cgroup_parent, container_id, "cpuacct", "stat")
70         blkio_io_service_bytes := FindStat(cgroup_root, cgroup_parent, container_id, "blkio", "io_service_bytes")
71         cpuset_cpus := FindStat(cgroup_root, cgroup_parent, container_id, "cpuset", "cpus")
72         memory_stat := FindStat(cgroup_root, cgroup_parent, container_id, "memory", "stat")
73
74         if cpuacct_stat != "" {
75                 stderr <- fmt.Sprintf("crunchstat: reading stats from %s", cpuacct_stat)
76         }
77         if blkio_io_service_bytes != "" {
78                 stderr <- fmt.Sprintf("crunchstat: reading stats from %s", blkio_io_service_bytes)
79         }
80         if cpuset_cpus != "" {
81                 stderr <- fmt.Sprintf("crunchstat: reading stats from %s", cpuset_cpus)
82         }
83         if memory_stat != "" {
84                 stderr <- fmt.Sprintf("crunchstat: reading stats from %s", memory_stat)
85         }
86
87         poll_chan := make(chan bool, 1)
88         go func() {
89                 // Send periodic poll events.
90                 poll_chan <- true
91                 for {
92                         time.Sleep(time.Duration(poll) * time.Millisecond)
93                         poll_chan <- true
94                 }
95         }()
96         for {
97                 bedtime := time.Now()
98                 select {
99                 case <-stop_poll_chan:
100                         return
101                 case <-poll_chan:
102                         // Emit stats, then select again.
103                 }
104                 morning := time.Now()
105                 elapsed := morning.Sub(bedtime).Nanoseconds() / int64(time.Millisecond)
106                 /*{
107                         c, _ := os.Open(cpuacct_usage)
108                         b, _ := ioutil.ReadAll(c)
109                         var next int64
110                         fmt.Sscanf(string(b), "%d", &next)
111                         if last_usage != 0 {
112                                 stderr <- fmt.Sprintf("crunchstat: cpuacct.usage %v", (next-last_usage)/10000000)
113                         }
114                         //fmt.Printf("usage %d %d %d %d%%\n", last_usage, next, next-last_usage, (next-last_usage)/10000000)
115                         last_usage = next
116                         c.Close()
117                 }*/
118                 var cpus int64 = 0
119                 if cpuset_cpus != "" {
120                         c, err := os.Open(cpuset_cpus)
121                         if err != nil {
122                                 stderr <- fmt.Sprintf("open %s: %s", cpuset_cpus, err)
123                                 // cgroup probably gone -- skip other stats too.
124                                 continue
125                         }
126                         b, _ := ioutil.ReadAll(c)
127                         sp := strings.Split(string(b), ",")
128                         for _, v := range sp {
129                                 var min, max int64
130                                 n, _ := fmt.Sscanf(v, "%d-%d", &min, &max)
131                                 if n == 2 {
132                                         cpus += (max - min) + 1
133                                 } else {
134                                         cpus += 1
135                                 }
136                         }
137
138                         if cpus != last_cpucount {
139                                 stderr <- fmt.Sprintf("crunchstat: cpuset.cpus %v", cpus)
140                         }
141                         last_cpucount = cpus
142
143                         c.Close()
144                 }
145                 if cpus == 0 {
146                         cpus = 1
147                 }
148                 if cpuacct_stat != "" {
149                         c, err := os.Open(cpuacct_stat)
150                         if err != nil {
151                                 stderr <- fmt.Sprintf("open %s: %s", cpuacct_stat, err)
152                                 // Next time around, last_user would
153                                 // be >1 interval old, so stats will
154                                 // be incorrect. Start over instead.
155                                 last_user = -1
156
157                                 // cgroup probably gone -- skip other stats too.
158                                 continue
159                         }
160                         b, _ := ioutil.ReadAll(c)
161                         var next_user int64
162                         var next_sys int64
163                         fmt.Sscanf(string(b), "user %d\nsystem %d", &next_user, &next_sys)
164                         c.Close()
165
166                         if elapsed > 0 && last_user != -1 {
167                                 user_diff := next_user - last_user
168                                 sys_diff := next_sys - last_sys
169                                 // Assume we're reading stats based on 100
170                                 // jiffies per second.  Because the elapsed
171                                 // time is in milliseconds, we need to boost
172                                 // that to 1000 jiffies per second, then boost
173                                 // it by another 100x to get a percentage, then
174                                 // finally divide by the actual elapsed time
175                                 // and the number of cpus to get average load
176                                 // over the polling period.
177                                 user_pct := (user_diff * 10 * 100) / (elapsed * cpus)
178                                 sys_pct := (sys_diff * 10 * 100) / (elapsed * cpus)
179
180                                 stderr <- fmt.Sprintf("crunchstat: cpuacct.stat user %v", user_pct)
181                                 stderr <- fmt.Sprintf("crunchstat: cpuacct.stat sys %v", sys_pct)
182                         }
183
184                         /*fmt.Printf("user %d %d %d%%\n", last_user, next_user, next_user-last_user)
185                         fmt.Printf("sys %d %d %d%%\n", last_sys, next_sys, next_sys-last_sys)
186                         fmt.Printf("sum %d%%\n", (next_user-last_user)+(next_sys-last_sys))*/
187                         last_user = next_user
188                         last_sys = next_sys
189                 }
190                 if blkio_io_service_bytes != "" {
191                         c, err := os.Open(blkio_io_service_bytes)
192                         if err != nil {
193                                 stderr <- fmt.Sprintf("open %s: %s", blkio_io_service_bytes, err)
194                                 // cgroup probably gone -- skip other stats too.
195                                 continue
196                         }
197                         b := bufio.NewScanner(c)
198                         var device, op string
199                         var next int64
200                         for b.Scan() {
201                                 if _, err := fmt.Sscanf(string(b.Text()), "%s %s %d", &device, &op, &next); err == nil {
202                                         if disk[device] == nil {
203                                                 disk[device] = new(Disk)
204                                         }
205                                         if op == "Read" {
206                                                 disk[device].last_read = disk[device].next_read
207                                                 disk[device].next_read = next
208                                                 if disk[device].last_read > 0 && (disk[device].next_read != disk[device].last_read) {
209                                                         stderr <- fmt.Sprintf("crunchstat: blkio.io_service_bytes %s read %v", device, disk[device].next_read-disk[device].last_read)
210                                                 }
211                                         }
212                                         if op == "Write" {
213                                                 disk[device].last_write = disk[device].next_write
214                                                 disk[device].next_write = next
215                                                 if disk[device].last_write > 0 && (disk[device].next_write != disk[device].last_write) {
216                                                         stderr <- fmt.Sprintf("crunchstat: blkio.io_service_bytes %s write %v", device, disk[device].next_write-disk[device].last_write)
217                                                 }
218                                         }
219                                 }
220                         }
221                         c.Close()
222                 }
223
224                 if memory_stat != "" {
225                         c, err := os.Open(memory_stat)
226                         if err != nil {
227                                 stderr <- fmt.Sprintf("open %s: %s", memory_stat, err)
228                                 // cgroup probably gone -- skip other stats too.
229                                 continue
230                         }
231                         b := bufio.NewScanner(c)
232                         var stat string
233                         var val int64
234                         for b.Scan() {
235                                 if _, err := fmt.Sscanf(string(b.Text()), "%s %d", &stat, &val); err == nil {
236                                         if stat == "rss" {
237                                                 stderr <- fmt.Sprintf("crunchstat: memory.stat rss %v", val)
238                                         }
239                                 }
240                         }
241                         c.Close()
242                 }
243         }
244 }
245
246 func run(logger *log.Logger) error {
247
248         var (
249                 cgroup_root    string
250                 cgroup_parent  string
251                 cgroup_cidfile string
252                 wait           int64
253                 poll           int64
254         )
255
256         flag.StringVar(&cgroup_root, "cgroup-root", "", "Root of cgroup tree")
257         flag.StringVar(&cgroup_parent, "cgroup-parent", "", "Name of container parent under cgroup")
258         flag.StringVar(&cgroup_cidfile, "cgroup-cid", "", "Path to container id file")
259         flag.Int64Var(&wait, "wait", 5, "Maximum time (in seconds) to wait for cid file to show up")
260         flag.Int64Var(&poll, "poll", 1000, "Polling frequency, in milliseconds")
261
262         flag.Parse()
263
264         if cgroup_root == "" {
265                 logger.Fatal("Must provide -cgroup-root")
266         }
267
268         stderr_chan := make(chan string, 1)
269         defer close(stderr_chan)
270         finish_chan := make(chan bool)
271         defer close(finish_chan)
272
273         go CopyChanToPipe(stderr_chan, os.Stderr)
274
275         var cmd *exec.Cmd
276
277         if len(flag.Args()) > 0 {
278                 // Set up subprocess
279                 cmd = exec.Command(flag.Args()[0], flag.Args()[1:]...)
280
281                 logger.Print("Running ", flag.Args())
282
283                 // Child process will use our stdin and stdout pipes
284                 // (we close our copies below)
285                 cmd.Stdin = os.Stdin
286                 cmd.Stdout = os.Stdout
287
288                 // Forward SIGINT and SIGTERM to inner process
289                 term := make(chan os.Signal, 1)
290                 go func(sig <-chan os.Signal) {
291                         catch := <-sig
292                         if cmd.Process != nil {
293                                 cmd.Process.Signal(catch)
294                         }
295                         logger.Print("caught signal: ", catch)
296                 }(term)
297                 signal.Notify(term, syscall.SIGTERM)
298                 signal.Notify(term, syscall.SIGINT)
299
300                 // Funnel stderr through our channel
301                 stderr_pipe, err := cmd.StderrPipe()
302                 if err != nil {
303                         logger.Fatal(err)
304                 }
305                 go CopyPipeToChan(stderr_pipe, stderr_chan, finish_chan)
306
307                 // Run subprocess
308                 if err := cmd.Start(); err != nil {
309                         logger.Fatal(err)
310                 }
311
312                 // Close stdin/stdout in this (parent) process
313                 os.Stdin.Close()
314                 os.Stdout.Close()
315         }
316
317         // Read the cid file
318         var container_id string
319         if cgroup_cidfile != "" {
320                 // wait up to 'wait' seconds for the cid file to appear
321                 ok := false
322                 var i time.Duration
323                 for i = 0; i < time.Duration(wait)*time.Second; i += (100 * time.Millisecond) {
324                         f, err := os.Open(cgroup_cidfile)
325                         if err == nil {
326                                 defer f.Close()
327                                 cid, err2 := ioutil.ReadAll(f)
328                                 if err2 == nil && len(cid) > 0 {
329                                         ok = true
330                                         container_id = string(cid)
331                                         break
332                                 }
333                         }
334                         time.Sleep(100 * time.Millisecond)
335                 }
336                 if !ok {
337                         logger.Printf("Could not read cid file %s", cgroup_cidfile)
338                 }
339         }
340
341         stop_poll_chan := make(chan bool, 1)
342         go PollCgroupStats(cgroup_root, cgroup_parent, container_id, stderr_chan, poll, stop_poll_chan)
343
344         // When the child exits, tell the polling goroutine to stop.
345         defer func() { stop_poll_chan <- true }()
346
347         // Wait for CopyPipeToChan to consume child's stderr pipe
348         <-finish_chan
349
350         return cmd.Wait()
351 }
352
353 func main() {
354         logger := log.New(os.Stderr, "crunchstat: ", 0)
355         if err := run(logger); err != nil {
356                 if exiterr, ok := err.(*exec.ExitError); ok {
357                         // The program has exited with an exit code != 0
358
359                         // This works on both Unix and
360                         // Windows. Although package syscall is
361                         // generally platform dependent, WaitStatus is
362                         // defined for both Unix and Windows and in
363                         // both cases has an ExitStatus() method with
364                         // the same signature.
365                         if status, ok := exiterr.Sys().(syscall.WaitStatus); ok {
366                                 os.Exit(status.ExitStatus())
367                         }
368                 } else {
369                         logger.Fatalf("cmd.Wait: %v", err)
370                 }
371         }
372 }