3826: Fix up block IO stats.
authorTom Clegg <tom@curoverse.com>
Tue, 14 Oct 2014 18:44:01 +0000 (14:44 -0400)
committerTom Clegg <tom@curoverse.com>
Tue, 14 Oct 2014 18:44:01 +0000 (14:44 -0400)
services/crunchstat/crunchstat.go

index 80bf62dad2839c941efb411536179475b8530c88..9590bbbf5c42a6e0fa8b4b6032a1ae9715d5daa7 100644 (file)
@@ -2,6 +2,7 @@ package main
 
 import (
        "bufio"
+       "bytes"
        "flag"
        "errors"
        "fmt"
@@ -84,9 +85,9 @@ func FindStat(stderr chan<- string, cgroup Cgroup, statgroup string, stat string
                path = fmt.Sprintf("%s/%s", cgroup.root, stat)
        }
        if _, err := os.Stat(path); err != nil {
-               if _, ok := reportedNoStatFile[path]; !ok {
+               if _, ok := reportedNoStatFile[stat]; !ok {
                        stderr <- fmt.Sprintf("crunchstat: did not find stats file (root %s, parent %s, cid %s, statgroup %s, stat %s)", cgroup.root, cgroup.parent, cgroup.cid, statgroup, stat)
-                       reportedNoStatFile[path] = true
+                       reportedNoStatFile[stat] = true
                }
                return ""
        }
@@ -118,55 +119,69 @@ func GetContainerNetStats(stderr chan<- string, cgroup Cgroup) (io.Reader, error
        return nil, errors.New("Could not read stats for any proc in container")
 }
 
-type Disk struct {
-       last_read  int64
-       next_read  int64
-       last_write int64
-       next_write int64
+type IoSample struct {
+       sampleTime time.Time
+       txBytes    int64
+       rxBytes    int64
 }
-var disk map[string]*Disk
 
-func DoBlkIoStats(stderr chan<- string, cgroup Cgroup) {
+func DoBlkIoStats(stderr chan<- string, cgroup Cgroup, lastSample map[string]IoSample) (map[string]IoSample) {
        blkio_io_service_bytes := FindStat(stderr, cgroup, "blkio", "blkio.io_service_bytes")
        if blkio_io_service_bytes == "" {
-               return
-       }
-
-       if disk == nil {
-               disk = make(map[string]*Disk)
+               return lastSample
        }
 
        c, err := os.Open(blkio_io_service_bytes)
        if err != nil {
-               stderr <- fmt.Sprintf("open %s: %s", blkio_io_service_bytes, err)
-               return
+               stderr <- fmt.Sprintf("crunchstat: open %s: %s", blkio_io_service_bytes, err)
+               return lastSample
        }
        defer c.Close()
        b := bufio.NewScanner(c)
-       var device, op string
-       var next int64
+       var sampleTime = time.Now()
+       newSamples := make(map[string]IoSample)
        for b.Scan() {
-               if _, err := fmt.Sscanf(string(b.Text()), "%s %s %d", &device, &op, &next); err != nil {
+               var device, op string
+               var val int64
+               if _, err := fmt.Sscanf(string(b.Text()), "%s %s %d", &device, &op, &val); err != nil {
                        continue
                }
-               if disk[device] == nil {
-                       disk[device] = new(Disk)
+               var thisSample IoSample
+               var ok bool
+               if thisSample, ok = newSamples[device]; !ok {
+                       thisSample = IoSample{sampleTime, -1, -1}
                }
-               if op == "Read" {
-                       disk[device].last_read = disk[device].next_read
-                       disk[device].next_read = next
-                       if disk[device].last_read > 0 && (disk[device].next_read != disk[device].last_read) {
-                               stderr <- fmt.Sprintf("crunchstat: blkio.io_service_bytes %s read %v", device, disk[device].next_read-disk[device].last_read)
-                       }
+               switch op {
+               case "Read":
+                       thisSample.rxBytes = val
+               case "Write":
+                       thisSample.txBytes = val
                }
-               if op == "Write" {
-                       disk[device].last_write = disk[device].next_write
-                       disk[device].next_write = next
-                       if disk[device].last_write > 0 && (disk[device].next_write != disk[device].last_write) {
-                               stderr <- fmt.Sprintf("crunchstat: blkio.io_service_bytes %s write %v", device, disk[device].next_write-disk[device].last_write)
-                       }
+               newSamples[device] = thisSample
+       }
+       if lastSample == nil {
+               lastSample = make(map[string]IoSample)
+       }
+       for dev, sample := range newSamples {
+               if sample.txBytes < 0 || sample.rxBytes < 0 {
+                       continue
+               }
+               delta := ""
+               if prev, ok := lastSample[dev]; ok {
+                       delta = fmt.Sprintf(" -- interval %.4f seconds %d write %d read",
+                               sample.sampleTime.Sub(prev.sampleTime).Seconds(),
+                               sample.txBytes - prev.txBytes,
+                               sample.rxBytes - prev.rxBytes)
                }
+               stderr <- fmt.Sprintf("crunchstat: blkio:%s %d write %d read%s", dev, sample.txBytes, sample.rxBytes, delta)
+               lastSample[dev] = sample
        }
+       return lastSample
+}
+
+type MemSample struct {
+       sampleTime time.Time
+       memStat    map[string]int64
 }
 
 func DoMemoryStats(stderr chan<- string, cgroup Cgroup) {
@@ -176,36 +191,37 @@ func DoMemoryStats(stderr chan<- string, cgroup Cgroup) {
        }
        c, err := os.Open(memory_stat)
        if err != nil {
-               stderr <- fmt.Sprintf("open %s: %s", memory_stat, err)
+               stderr <- fmt.Sprintf("crunchstat: open %s: %s", memory_stat, err)
                return
        }
        defer c.Close()
        b := bufio.NewScanner(c)
-       var stat string
-       var val int64
+       thisSample := MemSample{time.Now(), make(map[string]int64)}
+       wantStats := [...]string{"cache", "pgmajfault", "rss"}
        for b.Scan() {
+               var stat string
+               var val int64
                if _, err := fmt.Sscanf(string(b.Text()), "%s %d", &stat, &val); err != nil {
                        continue
                }
-               if stat == "rss" {
-                       stderr <- fmt.Sprintf("crunchstat: mem %d rss", val)
+               thisSample.memStat[stat] = val
+       }
+       var outstat bytes.Buffer
+       for _, key := range wantStats {
+               if val, ok := thisSample.memStat[key]; ok {
+                       outstat.WriteString(fmt.Sprintf(" %d %s", val, key))
                }
        }
+       stderr <- fmt.Sprintf("crunchstat: mem%s", outstat.String())
 }
 
-type NetSample struct {
-       sampleTime time.Time
-       txBytes    int64
-       rxBytes    int64
-}
-
-func DoNetworkStats(stderr chan<- string, cgroup Cgroup, lastStat map[string]NetSample) (map[string]NetSample) {
+func DoNetworkStats(stderr chan<- string, cgroup Cgroup, lastSample map[string]IoSample) (map[string]IoSample) {
        sampleTime := time.Now()
        stats, err := GetContainerNetStats(stderr, cgroup)
-       if err != nil { return lastStat }
+       if err != nil { return lastSample }
 
-       if lastStat == nil {
-               lastStat = make(map[string]NetSample)
+       if lastSample == nil {
+               lastSample = make(map[string]IoSample)
        }
        scanner := bufio.NewScanner(stats)
        Iface: for scanner.Scan() {
@@ -234,12 +250,12 @@ func DoNetworkStats(stderr chan<- string, cgroup Cgroup, lastStat map[string]Net
                        // Skip loopback interface and lines with wrong format
                        continue
                }
-               nextSample := NetSample{}
+               nextSample := IoSample{}
                nextSample.sampleTime = sampleTime
                nextSample.txBytes = tx
                nextSample.rxBytes = rx
                var delta string
-               if lastSample, ok := lastStat[ifName]; ok {
+               if lastSample, ok := lastSample[ifName]; ok {
                        interval := nextSample.sampleTime.Sub(lastSample.sampleTime).Seconds()
                        delta = fmt.Sprintf(" -- interval %.4f seconds %d tx %d rx",
                                interval,
@@ -248,9 +264,9 @@ func DoNetworkStats(stderr chan<- string, cgroup Cgroup, lastStat map[string]Net
                }
                stderr <- fmt.Sprintf("crunchstat: net:%s %d tx %d rx%s",
                        ifName, tx, rx, delta)
-               lastStat[ifName] = nextSample
+               lastSample[ifName] = nextSample
        }
-       return lastStat
+       return lastSample
 }
 
 func PollCgroupStats(cgroup Cgroup, stderr chan string, poll int64, stop_poll_chan <-chan bool) {
@@ -260,7 +276,8 @@ func PollCgroupStats(cgroup Cgroup, stderr chan string, poll int64, stop_poll_ch
 
        user_hz := float64(C.sysconf(C._SC_CLK_TCK))
 
-       var lastNetStat map[string]NetSample = nil
+       var lastNetSample map[string]IoSample = nil
+       var lastDiskSample map[string]IoSample = nil
 
        poll_chan := make(chan bool, 1)
        go func() {
@@ -333,9 +350,9 @@ func PollCgroupStats(cgroup Cgroup, stderr chan string, poll int64, stop_poll_ch
                        last_sys = next_sys
                }
 
-               DoBlkIoStats(stderr, cgroup)
                DoMemoryStats(stderr, cgroup)
-               lastNetStat = DoNetworkStats(stderr, cgroup, lastNetStat)
+               lastDiskSample = DoBlkIoStats(stderr, cgroup, lastDiskSample)
+               lastNetSample = DoNetworkStats(stderr, cgroup, lastNetSample)
        }
 }