Merge branch '8784-dir-listings'
[arvados.git] / lib / crunchstat / crunchstat.go
index 8d7621c2dc07f7258efdc53e53a8beecf4769015..f4915c0e3e9f8e34a61ba20d488bd8edd3428190 100644 (file)
@@ -1,3 +1,9 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+// Package crunchstat reports resource usage (CPU, memory, disk,
+// network) for a cgroup.
 package crunchstat
 
 import (
@@ -14,7 +20,7 @@ import (
        "time"
 )
 
-// This magically allows us to look up user_hz via _SC_CLK_TCK:
+// This magically allows us to look up userHz via _SC_CLK_TCK:
 
 /*
 #include <unistd.h>
@@ -28,49 +34,63 @@ import "C"
 // log.Logger.
 type Reporter struct {
        // CID of the container to monitor. If empty, read the CID
-       // from CIDFile.
+       // from CIDFile (first waiting until a non-empty file appears
+       // at CIDFile). If CIDFile is also empty, report host
+       // statistics.
        CID string
-       // Where cgroup special files live on this system
-       CgroupRoot   string
-       CgroupParent string
-       // Path to a file we can read CID from. If CIDFile is empty or
-       // nonexistent, wait for it to appear.
+
+       // Path to a file we can read CID from.
        CIDFile string
 
-       // Interval between samples
-       Poll time.Duration
+       // Where cgroup accounting files live on this system, e.g.,
+       // "/sys/fs/cgroup".
+       CgroupRoot string
 
-       // Where to write statistics.
+       // Parent cgroup, e.g., "docker".
+       CgroupParent string
+
+       // Interval between samples. Must be positive.
+       PollPeriod time.Duration
+
+       // Where to write statistics. Must not be nil.
        Logger *log.Logger
 
        reportedStatFile map[string]string
-       lastNetSample    map[string]IoSample
-       lastDiskSample   map[string]IoSample
-       lastCPUSample    CpuSample
+       lastNetSample    map[string]ioSample
+       lastDiskSample   map[string]ioSample
+       lastCPUSample    cpuSample
 
-       done chan struct{}
+       done    chan struct{} // closed when we should stop reporting
+       flushed chan struct{} // closed when we have made our last report
 }
 
-// Wait (if necessary) for the CID to appear in CIDFile, then start
-// reporting statistics.
+// Start starts monitoring in a new goroutine, and returns
+// immediately.
 //
-// Start should not be called more than once on a Reporter.
+// The monitoring goroutine waits for a non-empty CIDFile to appear
+// (unless CID is non-empty). Then it waits for the accounting files
+// to appear for the monitored container. Then it collects and reports
+// statistics until Stop is called.
 //
-// Public data fields should not be changed after calling Start.
+// Callers should not call Start more than once.
+//
+// Callers should not modify public data fields after calling Start.
 func (r *Reporter) Start() {
        r.done = make(chan struct{})
+       r.flushed = make(chan struct{})
        go r.run()
 }
 
-// Stop reporting statistics. Do not call more than once, or before
-// calling Start.
+// Stop reporting. Do not call more than once, or before calling
+// Start.
 //
 // Nothing will be logged after Stop returns.
 func (r *Reporter) Stop() {
        close(r.done)
+       <-r.flushed
 }
 
-func (r *Reporter) readAllOrWarn(in *os.File) ([]byte, error) {
+func (r *Reporter) readAllOrWarn(in io.Reader) ([]byte, error) {
        content, err := ioutil.ReadAll(in)
        if err != nil {
                r.Logger.Print(err)
@@ -79,14 +99,20 @@ func (r *Reporter) readAllOrWarn(in *os.File) ([]byte, error) {
 }
 
 // Open the cgroup stats file in /sys/fs corresponding to the target
-// cgroup, and return an *os.File. If no stats file is available,
+// cgroup, and return an io.ReadCloser. If no stats file is available,
 // return nil.
 //
+// Log the file that was opened, if it isn't the same file opened on
+// the last openStatFile for this stat.
+//
+// Log "not available" if no file is found and either this stat has
+// been available in the past, or verbose==true.
+//
 // TODO: Instead of trying all options, choose a process in the
 // container, and read /proc/PID/cgroup to determine the appropriate
 // cgroup root for the given statgroup. (This will avoid falling back
 // to host-level stats during container setup and teardown.)
-func (r *Reporter) openStatFile(statgroup string, stat string) (*os.File, error) {
+func (r *Reporter) openStatFile(statgroup, stat string, verbose bool) (io.ReadCloser, error) {
        var paths []string
        if r.CID != "" {
                // Collect container's stats
@@ -112,16 +138,16 @@ func (r *Reporter) openStatFile(statgroup string, stat string) (*os.File, error)
                        path = ""
                }
        }
-       if pathWas, ok := r.reportedStatFile[stat]; !ok || pathWas != path {
+       if pathWas := r.reportedStatFile[stat]; pathWas != path {
                // Log whenever we start using a new/different cgroup
                // stat file for a given statistic. This typically
                // happens 1 to 3 times per statistic, depending on
                // whether we happen to collect stats [a] before any
                // processes have been created in the container and
                // [b] after all contained processes have exited.
-               if path == "" {
+               if path == "" && verbose {
                        r.Logger.Printf("notice: stats not available: stat %s, statgroup %s, cid %s, parent %s, root %s\n", stat, statgroup, r.CID, r.CgroupParent, r.CgroupRoot)
-               } else if ok {
+               } else if pathWas != "" {
                        r.Logger.Printf("notice: stats moved from %s to %s\n", r.reportedStatFile[stat], path)
                } else {
                        r.Logger.Printf("notice: reading stats from %s\n", path)
@@ -132,7 +158,7 @@ func (r *Reporter) openStatFile(statgroup string, stat string) (*os.File, error)
 }
 
 func (r *Reporter) getContainerNetStats() (io.Reader, error) {
-       procsFile, err := r.openStatFile("cpuacct", "cgroup.procs")
+       procsFile, err := r.openStatFile("cpuacct", "cgroup.procs", true)
        if err != nil {
                return nil, err
        }
@@ -151,31 +177,31 @@ func (r *Reporter) getContainerNetStats() (io.Reader, error) {
        return nil, errors.New("Could not read stats for any proc in container")
 }
 
-type IoSample struct {
+type ioSample struct {
        sampleTime time.Time
        txBytes    int64
        rxBytes    int64
 }
 
-func (r *Reporter) DoBlkIoStats() {
-       c, err := r.openStatFile("blkio", "blkio.io_service_bytes")
+func (r *Reporter) doBlkIOStats() {
+       c, err := r.openStatFile("blkio", "blkio.io_service_bytes", true)
        if err != nil {
                return
        }
        defer c.Close()
        b := bufio.NewScanner(c)
        var sampleTime = time.Now()
-       newSamples := make(map[string]IoSample)
+       newSamples := make(map[string]ioSample)
        for b.Scan() {
                var device, op string
                var val int64
                if _, err := fmt.Sscanf(string(b.Text()), "%s %s %d", &device, &op, &val); err != nil {
                        continue
                }
-               var thisSample IoSample
+               var thisSample ioSample
                var ok bool
                if thisSample, ok = newSamples[device]; !ok {
-                       thisSample = IoSample{sampleTime, -1, -1}
+                       thisSample = ioSample{sampleTime, -1, -1}
                }
                switch op {
                case "Read":
@@ -201,19 +227,19 @@ func (r *Reporter) DoBlkIoStats() {
        }
 }
 
-type MemSample struct {
+type memSample struct {
        sampleTime time.Time
        memStat    map[string]int64
 }
 
-func (r *Reporter) DoMemoryStats() {
-       c, err := r.openStatFile("memory", "memory.stat")
+func (r *Reporter) doMemoryStats() {
+       c, err := r.openStatFile("memory", "memory.stat", true)
        if err != nil {
                return
        }
        defer c.Close()
        b := bufio.NewScanner(c)
-       thisSample := MemSample{time.Now(), make(map[string]int64)}
+       thisSample := memSample{time.Now(), make(map[string]int64)}
        wantStats := [...]string{"cache", "swap", "pgmajfault", "rss"}
        for b.Scan() {
                var stat string
@@ -232,7 +258,7 @@ func (r *Reporter) DoMemoryStats() {
        r.Logger.Printf("mem%s\n", outstat.String())
 }
 
-func (r *Reporter) DoNetworkStats() {
+func (r *Reporter) doNetworkStats() {
        sampleTime := time.Now()
        stats, err := r.getContainerNetStats()
        if err != nil {
@@ -259,7 +285,7 @@ func (r *Reporter) DoNetworkStats() {
                if rx, err = strconv.ParseInt(words[1], 10, 64); err != nil {
                        continue
                }
-               nextSample := IoSample{}
+               nextSample := ioSample{}
                nextSample.sampleTime = sampleTime
                nextSample.txBytes = tx
                nextSample.rxBytes = rx
@@ -276,7 +302,7 @@ func (r *Reporter) DoNetworkStats() {
        }
 }
 
-type CpuSample struct {
+type cpuSample struct {
        hasData    bool // to distinguish the zero value from real data
        sampleTime time.Time
        user       float64
@@ -286,13 +312,16 @@ type CpuSample struct {
 
 // Return the number of CPUs available in the container. Return 0 if
 // we can't figure out the real number of CPUs.
-func (r *Reporter) GetCpuCount() int64 {
-       cpusetFile, err := r.openStatFile("cpuset", "cpuset.cpus")
+func (r *Reporter) getCPUCount() int64 {
+       cpusetFile, err := r.openStatFile("cpuset", "cpuset.cpus", true)
        if err != nil {
                return 0
        }
        defer cpusetFile.Close()
        b, err := r.readAllOrWarn(cpusetFile)
+       if err != nil {
+               return 0
+       }
        sp := strings.Split(string(b), ",")
        cpus := int64(0)
        for _, v := range sp {
@@ -301,14 +330,14 @@ func (r *Reporter) GetCpuCount() int64 {
                if n == 2 {
                        cpus += (max - min) + 1
                } else {
-                       cpus += 1
+                       cpus++
                }
        }
        return cpus
 }
 
-func (r *Reporter) DoCpuStats() {
-       statFile, err := r.openStatFile("cpuacct", "cpuacct.stat")
+func (r *Reporter) doCPUStats() {
+       statFile, err := r.openStatFile("cpuacct", "cpuacct.stat", true)
        if err != nil {
                return
        }
@@ -318,12 +347,16 @@ func (r *Reporter) DoCpuStats() {
                return
        }
 
-       nextSample := CpuSample{true, time.Now(), 0, 0, r.GetCpuCount()}
        var userTicks, sysTicks int64
        fmt.Sscanf(string(b), "user %d\nsystem %d", &userTicks, &sysTicks)
-       user_hz := float64(C.sysconf(C._SC_CLK_TCK))
-       nextSample.user = float64(userTicks) / user_hz
-       nextSample.sys = float64(sysTicks) / user_hz
+       userHz := float64(C.sysconf(C._SC_CLK_TCK))
+       nextSample := cpuSample{
+               hasData:    true,
+               sampleTime: time.Now(),
+               user:       float64(userTicks) / userHz,
+               sys:        float64(sysTicks) / userHz,
+               cpus:       r.getCPUCount(),
+       }
 
        delta := ""
        if r.lastCPUSample.hasData {
@@ -337,22 +370,26 @@ func (r *Reporter) DoCpuStats() {
        r.lastCPUSample = nextSample
 }
 
-// Report stats periodically until someone closes or sends to r.done.
+// Report stats periodically until we learn (via r.done) that someone
+// called Stop.
 func (r *Reporter) run() {
-       if !r.waitForCIDFile() {
+       defer close(r.flushed)
+
+       r.reportedStatFile = make(map[string]string)
+
+       if !r.waitForCIDFile() || !r.waitForCgroup() {
                return
        }
 
-       r.reportedStatFile = make(map[string]string)
-       r.lastNetSample = make(map[string]IoSample)
-       r.lastDiskSample = make(map[string]IoSample)
+       r.lastNetSample = make(map[string]ioSample)
+       r.lastDiskSample = make(map[string]ioSample)
 
-       ticker := time.NewTicker(r.Poll)
+       ticker := time.NewTicker(r.PollPeriod)
        for {
-               r.DoMemoryStats()
-               r.DoCpuStats()
-               r.DoBlkIoStats()
-               r.DoNetworkStats()
+               r.doMemoryStats()
+               r.doCPUStats()
+               r.doBlkIOStats()
+               r.doNetworkStats()
                select {
                case <-r.done:
                        return
@@ -362,9 +399,9 @@ func (r *Reporter) run() {
 }
 
 // If CID is empty, wait for it to appear in CIDFile. Return true if
-// we get it before someone calls Stop().
+// we get it before we learn (via r.done) that someone called Stop.
 func (r *Reporter) waitForCIDFile() bool {
-       if r.CID != "" {
+       if r.CID != "" || r.CIDFile == "" {
                return true
        }
 
@@ -384,3 +421,28 @@ func (r *Reporter) waitForCIDFile() bool {
                }
        }
 }
+
+// Wait for the cgroup stats files to appear in cgroup_root. Return
+// true if they appear before r.done indicates someone called Stop. If
+// they don't appear within one poll interval, log a warning and keep
+// waiting.
+func (r *Reporter) waitForCgroup() bool {
+       ticker := time.NewTicker(100 * time.Millisecond)
+       defer ticker.Stop()
+       warningTimer := time.After(r.PollPeriod)
+       for {
+               c, err := r.openStatFile("cpuacct", "cgroup.procs", false)
+               if err == nil {
+                       c.Close()
+                       return true
+               }
+               select {
+               case <-ticker.C:
+               case <-warningTimer:
+                       r.Logger.Printf("cgroup stats files have not appeared after %v (config error?) -- still waiting...", r.PollPeriod)
+               case <-r.done:
+                       r.Logger.Printf("cgroup stats files never appeared for %v", r.CID)
+                       return false
+               }
+       }
+}