Merge branch 'main' into 18842-arv-mount-disk-config
[arvados.git] / lib / crunchstat / crunchstat.go
index 056ef0d185e61c7bbc52b692abd21ea61d9afdd4..3a473cab8715c49eec14d5e0565b61daf9d71a5e 100644 (file)
@@ -13,23 +13,16 @@ import (
        "fmt"
        "io"
        "io/ioutil"
-       "log"
        "os"
+       "regexp"
+       "sort"
        "strconv"
        "strings"
+       "sync"
+       "syscall"
        "time"
 )
 
-// This magically allows us to look up userHz via _SC_CLK_TCK:
-
-/*
-#include <unistd.h>
-#include <sys/types.h>
-#include <pwd.h>
-#include <stdlib.h>
-*/
-import "C"
-
 // A Reporter gathers statistics for a cgroup and writes them to a
 // log.Logger.
 type Reporter struct {
@@ -52,13 +45,23 @@ type Reporter struct {
        // Interval between samples. Must be positive.
        PollPeriod time.Duration
 
+       // Temporary directory, will be monitored for available, used & total space.
+       TempDir string
+
        // Where to write statistics. Must not be nil.
-       Logger *log.Logger
+       Logger interface {
+               Printf(fmt string, args ...interface{})
+       }
 
-       reportedStatFile map[string]string
-       lastNetSample    map[string]ioSample
-       lastDiskSample   map[string]ioSample
-       lastCPUSample    cpuSample
+       kernelPageSize      int64
+       reportedStatFile    map[string]string
+       lastNetSample       map[string]ioSample
+       lastDiskIOSample    map[string]ioSample
+       lastCPUSample       cpuSample
+       lastDiskSpaceSample diskSpaceSample
+
+       reportPIDs   map[string]int
+       reportPIDsMu sync.Mutex
 
        done    chan struct{} // closed when we should stop reporting
        flushed chan struct{} // closed when we have made our last report
@@ -81,6 +84,17 @@ func (r *Reporter) Start() {
        go r.run()
 }
 
+// ReportPID starts reporting stats for a specified process.
+func (r *Reporter) ReportPID(name string, pid int) {
+       r.reportPIDsMu.Lock()
+       defer r.reportPIDsMu.Unlock()
+       if r.reportPIDs == nil {
+               r.reportPIDs = map[string]int{name: pid}
+       } else {
+               r.reportPIDs[name] = pid
+       }
+}
+
 // Stop reporting. Do not call more than once, or before calling
 // Start.
 //
@@ -216,14 +230,14 @@ func (r *Reporter) doBlkIOStats() {
                        continue
                }
                delta := ""
-               if prev, ok := r.lastDiskSample[dev]; ok {
+               if prev, ok := r.lastDiskIOSample[dev]; ok {
                        delta = fmt.Sprintf(" -- interval %.4f seconds %d write %d read",
                                sample.sampleTime.Sub(prev.sampleTime).Seconds(),
                                sample.txBytes-prev.txBytes,
                                sample.rxBytes-prev.rxBytes)
                }
                r.Logger.Printf("blkio:%s %d write %d read%s\n", dev, sample.txBytes, sample.rxBytes, delta)
-               r.lastDiskSample[dev] = sample
+               r.lastDiskIOSample[dev] = sample
        }
 }
 
@@ -251,11 +265,81 @@ func (r *Reporter) doMemoryStats() {
        }
        var outstat bytes.Buffer
        for _, key := range wantStats {
-               if val, ok := thisSample.memStat[key]; ok {
-                       outstat.WriteString(fmt.Sprintf(" %d %s", val, key))
+               // Use "total_X" stats (entire hierarchy) if enabled,
+               // otherwise just the single cgroup -- see
+               // https://www.kernel.org/doc/Documentation/cgroup-v1/memory.txt
+               if val, ok := thisSample.memStat["total_"+key]; ok {
+                       fmt.Fprintf(&outstat, " %d %s", val, key)
+               } else if val, ok := thisSample.memStat[key]; ok {
+                       fmt.Fprintf(&outstat, " %d %s", val, key)
                }
        }
        r.Logger.Printf("mem%s\n", outstat.String())
+
+       if r.kernelPageSize == 0 {
+               // assign "don't try again" value in case we give up
+               // and return without assigning the real value
+               r.kernelPageSize = -1
+               buf, err := os.ReadFile("/proc/self/smaps")
+               if err != nil {
+                       r.Logger.Printf("error reading /proc/self/smaps: %s", err)
+                       return
+               }
+               m := regexp.MustCompile(`\nKernelPageSize:\s*(\d+) kB\n`).FindSubmatch(buf)
+               if len(m) != 2 {
+                       r.Logger.Printf("error parsing /proc/self/smaps: KernelPageSize not found")
+                       return
+               }
+               size, err := strconv.ParseInt(string(m[1]), 10, 64)
+               if err != nil {
+                       r.Logger.Printf("error parsing /proc/self/smaps: KernelPageSize %q: %s", m[1], err)
+                       return
+               }
+               r.kernelPageSize = size * 1024
+       } else if r.kernelPageSize < 0 {
+               // already failed to determine page size, don't keep
+               // trying/logging
+               return
+       }
+
+       r.reportPIDsMu.Lock()
+       defer r.reportPIDsMu.Unlock()
+       procnames := make([]string, 0, len(r.reportPIDs))
+       for name := range r.reportPIDs {
+               procnames = append(procnames, name)
+       }
+       sort.Strings(procnames)
+       procmem := ""
+       for _, procname := range procnames {
+               pid := r.reportPIDs[procname]
+               buf, err := os.ReadFile(fmt.Sprintf("/proc/%d/stat", pid))
+               if err != nil {
+                       continue
+               }
+               // If the executable name contains a ')' char,
+               // /proc/$pid/stat will look like '1234 (exec name)) S
+               // 123 ...' -- the last ')' is the end of the 2nd
+               // field.
+               paren := bytes.LastIndexByte(buf, ')')
+               if paren < 0 {
+                       continue
+               }
+               fields := bytes.SplitN(buf[paren:], []byte{' '}, 24)
+               if len(fields) < 24 {
+                       continue
+               }
+               // rss is the 24th field in .../stat, and fields[0]
+               // here is the last char ')' of the 2nd field, so
+               // rss is fields[22]
+               rss, err := strconv.ParseInt(string(fields[22]), 10, 64)
+               if err != nil {
+                       continue
+               }
+               procmem += fmt.Sprintf(" %d %s", rss*r.kernelPageSize, procname)
+       }
+       if procmem != "" {
+               r.Logger.Printf("procmem%s\n", procmem)
+       }
 }
 
 func (r *Reporter) doNetworkStats() {
@@ -302,6 +386,42 @@ func (r *Reporter) doNetworkStats() {
        }
 }
 
+type diskSpaceSample struct {
+       hasData    bool
+       sampleTime time.Time
+       total      uint64
+       used       uint64
+       available  uint64
+}
+
+func (r *Reporter) doDiskSpaceStats() {
+       s := syscall.Statfs_t{}
+       err := syscall.Statfs(r.TempDir, &s)
+       if err != nil {
+               return
+       }
+       bs := uint64(s.Bsize)
+       nextSample := diskSpaceSample{
+               hasData:    true,
+               sampleTime: time.Now(),
+               total:      s.Blocks * bs,
+               used:       (s.Blocks - s.Bfree) * bs,
+               available:  s.Bavail * bs,
+       }
+
+       var delta string
+       if r.lastDiskSpaceSample.hasData {
+               prev := r.lastDiskSpaceSample
+               interval := nextSample.sampleTime.Sub(prev.sampleTime).Seconds()
+               delta = fmt.Sprintf(" -- interval %.4f seconds %d used",
+                       interval,
+                       int64(nextSample.used-prev.used))
+       }
+       r.Logger.Printf("statfs %d available %d used %d total%s\n",
+               nextSample.available, nextSample.used, nextSample.total, delta)
+       r.lastDiskSpaceSample = nextSample
+}
+
 type cpuSample struct {
        hasData    bool // to distinguish the zero value from real data
        sampleTime time.Time
@@ -349,7 +469,7 @@ func (r *Reporter) doCPUStats() {
 
        var userTicks, sysTicks int64
        fmt.Sscanf(string(b), "user %d\nsystem %d", &userTicks, &sysTicks)
-       userHz := float64(C.sysconf(C._SC_CLK_TCK))
+       userHz := float64(100)
        nextSample := cpuSample{
                hasData:    true,
                sampleTime: time.Now(),
@@ -382,7 +502,15 @@ func (r *Reporter) run() {
        }
 
        r.lastNetSample = make(map[string]ioSample)
-       r.lastDiskSample = make(map[string]ioSample)
+       r.lastDiskIOSample = make(map[string]ioSample)
+
+       if len(r.TempDir) == 0 {
+               // Temporary dir not provided, try to get it from the environment.
+               r.TempDir = os.Getenv("TMPDIR")
+       }
+       if len(r.TempDir) > 0 {
+               r.Logger.Printf("notice: monitoring temp dir %s\n", r.TempDir)
+       }
 
        ticker := time.NewTicker(r.PollPeriod)
        for {
@@ -390,6 +518,7 @@ func (r *Reporter) run() {
                r.doCPUStats()
                r.doBlkIOStats()
                r.doNetworkStats()
+               r.doDiskSpaceStats()
                select {
                case <-r.done:
                        return