+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
// Package crunchstat reports resource usage (CPU, memory, disk,
// network) for a cgroup.
package crunchstat
"fmt"
"io"
"io/ioutil"
- "log"
"os"
+ "regexp"
+ "sort"
"strconv"
"strings"
+ "sync"
+ "syscall"
"time"
)
-// This magically allows us to look up userHz via _SC_CLK_TCK:
-
-/*
-#include <unistd.h>
-#include <sys/types.h>
-#include <pwd.h>
-#include <stdlib.h>
-*/
-import "C"
-
// A Reporter gathers statistics for a cgroup and writes them to a
// Logger (any value with a Printf method).
type Reporter struct {
// Interval between samples. Must be positive.
PollPeriod time.Duration
+ // Temporary directory, will be monitored for available, used & total space.
+ TempDir string
+
// Where to write statistics. Must not be nil.
- Logger *log.Logger
+ Logger interface {
+ Printf(fmt string, args ...interface{})
+ }
+
+ kernelPageSize int64
+ reportedStatFile map[string]string
+ lastNetSample map[string]ioSample
+ lastDiskIOSample map[string]ioSample
+ lastCPUSample cpuSample
+ lastDiskSpaceSample diskSpaceSample
- reportedStatFile map[string]string
- lastNetSample map[string]ioSample
- lastDiskSample map[string]ioSample
- lastCPUSample cpuSample
+ reportPIDs map[string]int
+ reportPIDsMu sync.Mutex
- done chan struct{}
+ done chan struct{} // closed when we should stop reporting
+ flushed chan struct{} // closed when we have made our last report
}
// Start starts monitoring in a new goroutine, and returns immediately
// without waiting for any statistics to be collected. The goroutine
// keeps reporting until Stop is called.
//
// Callers should not modify public data fields after calling Start.
func (r *Reporter) Start() {
	// Both channels must exist before the goroutine starts: Stop closes
	// done and then blocks until run() closes flushed.
	r.flushed = make(chan struct{})
	r.done = make(chan struct{})
	go r.run()
}
+// ReportPID starts reporting stats for a specified process.
+func (r *Reporter) ReportPID(name string, pid int) {
+ r.reportPIDsMu.Lock()
+ defer r.reportPIDsMu.Unlock()
+ if r.reportPIDs == nil {
+ r.reportPIDs = map[string]int{name: pid}
+ } else {
+ r.reportPIDs[name] = pid
+ }
+}
+
// Stop reporting. Do not call more than once, or before calling
// Start.
//
// Stop signals the monitoring goroutine to finish and then blocks until
// that goroutine has made its final report.
//
// Nothing will be logged after Stop returns.
func (r *Reporter) Stop() {
	// Ask the monitoring goroutine to exit.
	close(r.done)
	// The monitoring goroutine closes flushed on its way out.
	<-r.flushed
}
// readAllOrWarn reads in until EOF and returns the bytes read. If the
// read fails, the error is logged as a warning and also returned,
// together with whatever data was read before the failure.
func (r *Reporter) readAllOrWarn(in io.Reader) ([]byte, error) {
	// io.ReadAll is the non-deprecated replacement for ioutil.ReadAll.
	content, err := io.ReadAll(in)
	if err != nil {
		r.Logger.Printf("warning: %v", err)
	}
	return content, err
}
statsFilename := fmt.Sprintf("/proc/%s/net/dev", taskPid)
stats, err := ioutil.ReadFile(statsFilename)
if err != nil {
- r.Logger.Print(err)
+ r.Logger.Printf("notice: %v", err)
continue
}
return strings.NewReader(string(stats)), nil
continue
}
delta := ""
- if prev, ok := r.lastDiskSample[dev]; ok {
+ if prev, ok := r.lastDiskIOSample[dev]; ok {
delta = fmt.Sprintf(" -- interval %.4f seconds %d write %d read",
sample.sampleTime.Sub(prev.sampleTime).Seconds(),
sample.txBytes-prev.txBytes,
sample.rxBytes-prev.rxBytes)
}
r.Logger.Printf("blkio:%s %d write %d read%s\n", dev, sample.txBytes, sample.rxBytes, delta)
- r.lastDiskSample[dev] = sample
+ r.lastDiskIOSample[dev] = sample
}
}
}
var outstat bytes.Buffer
for _, key := range wantStats {
- if val, ok := thisSample.memStat[key]; ok {
- outstat.WriteString(fmt.Sprintf(" %d %s", val, key))
+ // Use "total_X" stats (entire hierarchy) if enabled,
+ // otherwise just the single cgroup -- see
+ // https://www.kernel.org/doc/Documentation/cgroup-v1/memory.txt
+ if val, ok := thisSample.memStat["total_"+key]; ok {
+ fmt.Fprintf(&outstat, " %d %s", val, key)
+ } else if val, ok := thisSample.memStat[key]; ok {
+ fmt.Fprintf(&outstat, " %d %s", val, key)
}
}
r.Logger.Printf("mem%s\n", outstat.String())
+
+ if r.kernelPageSize == 0 {
+ // assign "don't try again" value in case we give up
+ // and return without assigning the real value
+ r.kernelPageSize = -1
+ buf, err := os.ReadFile("/proc/self/smaps")
+ if err != nil {
+ r.Logger.Printf("error reading /proc/self/smaps: %s", err)
+ return
+ }
+ m := regexp.MustCompile(`\nKernelPageSize:\s*(\d+) kB\n`).FindSubmatch(buf)
+ if len(m) != 2 {
+ r.Logger.Printf("error parsing /proc/self/smaps: KernelPageSize not found")
+ return
+ }
+ size, err := strconv.ParseInt(string(m[1]), 10, 64)
+ if err != nil {
+ r.Logger.Printf("error parsing /proc/self/smaps: KernelPageSize %q: %s", m[1], err)
+ return
+ }
+ r.kernelPageSize = size * 1024
+ } else if r.kernelPageSize < 0 {
+ // already failed to determine page size, don't keep
+ // trying/logging
+ return
+ }
+
+ r.reportPIDsMu.Lock()
+ defer r.reportPIDsMu.Unlock()
+ procnames := make([]string, 0, len(r.reportPIDs))
+ for name := range r.reportPIDs {
+ procnames = append(procnames, name)
+ }
+ sort.Strings(procnames)
+ procmem := ""
+ for _, procname := range procnames {
+ pid := r.reportPIDs[procname]
+ buf, err := os.ReadFile(fmt.Sprintf("/proc/%d/stat", pid))
+ if err != nil {
+ continue
+ }
+ // If the executable name contains a ')' char,
+ // /proc/$pid/stat will look like '1234 (exec name)) S
+ // 123 ...' -- the last ')' is the end of the 2nd
+ // field.
+ paren := bytes.LastIndexByte(buf, ')')
+ if paren < 0 {
+ continue
+ }
+ fields := bytes.SplitN(buf[paren:], []byte{' '}, 24)
+ if len(fields) < 24 {
+ continue
+ }
+ // rss is the 24th field in .../stat, and fields[0]
+ // here is the last char ')' of the 2nd field, so
+ // rss is fields[22]
+ rss, err := strconv.ParseInt(string(fields[22]), 10, 64)
+ if err != nil {
+ continue
+ }
+ procmem += fmt.Sprintf(" %d %s", rss*r.kernelPageSize, procname)
+ }
+ if procmem != "" {
+ r.Logger.Printf("procmem%s\n", procmem)
+ }
}
func (r *Reporter) doNetworkStats() {
}
}
// diskSpaceSample is a single statfs reading of the monitored temp
// directory's filesystem. The size fields are byte counts.
type diskSpaceSample struct {
	hasData    bool // false for the zero value, true once a real sample is stored
	sampleTime time.Time

	total, used, available uint64
}
+
+func (r *Reporter) doDiskSpaceStats() {
+ s := syscall.Statfs_t{}
+ err := syscall.Statfs(r.TempDir, &s)
+ if err != nil {
+ return
+ }
+ bs := uint64(s.Bsize)
+ nextSample := diskSpaceSample{
+ hasData: true,
+ sampleTime: time.Now(),
+ total: s.Blocks * bs,
+ used: (s.Blocks - s.Bfree) * bs,
+ available: s.Bavail * bs,
+ }
+
+ var delta string
+ if r.lastDiskSpaceSample.hasData {
+ prev := r.lastDiskSpaceSample
+ interval := nextSample.sampleTime.Sub(prev.sampleTime).Seconds()
+ delta = fmt.Sprintf(" -- interval %.4f seconds %d used",
+ interval,
+ int64(nextSample.used-prev.used))
+ }
+ r.Logger.Printf("statfs %d available %d used %d total%s\n",
+ nextSample.available, nextSample.used, nextSample.total, delta)
+ r.lastDiskSpaceSample = nextSample
+}
+
type cpuSample struct {
hasData bool // to distinguish the zero value from real data
sampleTime time.Time
var userTicks, sysTicks int64
fmt.Sscanf(string(b), "user %d\nsystem %d", &userTicks, &sysTicks)
- userHz := float64(C.sysconf(C._SC_CLK_TCK))
+ userHz := float64(100)
nextSample := cpuSample{
hasData: true,
sampleTime: time.Now(),
// Report stats periodically until we learn (via r.done) that someone
// called Stop.
func (r *Reporter) run() {
+ defer close(r.flushed)
+
r.reportedStatFile = make(map[string]string)
if !r.waitForCIDFile() || !r.waitForCgroup() {
}
r.lastNetSample = make(map[string]ioSample)
- r.lastDiskSample = make(map[string]ioSample)
+ r.lastDiskIOSample = make(map[string]ioSample)
+
+ if len(r.TempDir) == 0 {
+ // Temporary dir not provided, try to get it from the environment.
+ r.TempDir = os.Getenv("TMPDIR")
+ }
+ if len(r.TempDir) > 0 {
+ r.Logger.Printf("notice: monitoring temp dir %s\n", r.TempDir)
+ }
ticker := time.NewTicker(r.PollPeriod)
for {
r.doCPUStats()
r.doBlkIOStats()
r.doNetworkStats()
+ r.doDiskSpaceStats()
select {
case <-r.done:
return
select {
case <-ticker.C:
case <-r.done:
- r.Logger.Printf("CID never appeared in %+q: %v", r.CIDFile, err)
+ r.Logger.Printf("warning: CID never appeared in %+q: %v", r.CIDFile, err)
return false
}
}
select {
case <-ticker.C:
case <-warningTimer:
- r.Logger.Printf("cgroup stats files have not appeared after %v (config error?) -- still waiting...", r.PollPeriod)
+ r.Logger.Printf("warning: cgroup stats files have not appeared after %v (config error?) -- still waiting...", r.PollPeriod)
case <-r.done:
- r.Logger.Printf("cgroup stats files never appeared for %v", r.CID)
+ r.Logger.Printf("warning: cgroup stats files never appeared for %v", r.CID)
return false
}
}