X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/b7b9ea44ada30b1251fb10c872cb1da1d7c29bd0..09cbdc3074b3f1e69c9c537875146f6da0a6ed8f:/lib/crunchstat/crunchstat.go diff --git a/lib/crunchstat/crunchstat.go b/lib/crunchstat/crunchstat.go index 03cfa7d3ef..3a473cab87 100644 --- a/lib/crunchstat/crunchstat.go +++ b/lib/crunchstat/crunchstat.go @@ -1,3 +1,7 @@ +// Copyright (C) The Arvados Authors. All rights reserved. +// +// SPDX-License-Identifier: AGPL-3.0 + // Package crunchstat reports resource usage (CPU, memory, disk, // network) for a cgroup. package crunchstat @@ -9,23 +13,16 @@ import ( "fmt" "io" "io/ioutil" - "log" "os" + "regexp" + "sort" "strconv" "strings" + "sync" + "syscall" "time" ) -// This magically allows us to look up userHz via _SC_CLK_TCK: - -/* -#include -#include -#include -#include -*/ -import "C" - // A Reporter gathers statistics for a cgroup and writes them to a // log.Logger. type Reporter struct { @@ -48,15 +45,26 @@ type Reporter struct { // Interval between samples. Must be positive. PollPeriod time.Duration + // Temporary directory, will be monitored for available, used & total space. + TempDir string + // Where to write statistics. Must not be nil. - Logger *log.Logger + Logger interface { + Printf(fmt string, args ...interface{}) + } + + kernelPageSize int64 + reportedStatFile map[string]string + lastNetSample map[string]ioSample + lastDiskIOSample map[string]ioSample + lastCPUSample cpuSample + lastDiskSpaceSample diskSpaceSample - reportedStatFile map[string]string - lastNetSample map[string]ioSample - lastDiskSample map[string]ioSample - lastCPUSample cpuSample + reportPIDs map[string]int + reportPIDsMu sync.Mutex - done chan struct{} + done chan struct{} // closed when we should stop reporting + flushed chan struct{} // closed when we have made our last report } // Start starts monitoring in a new goroutine, and returns @@ -72,21 +80,34 @@ type Reporter struct { // Callers should not modify public data fields after calling Start. func (r *Reporter) Start() { r.done = make(chan struct{}) + r.flushed = make(chan struct{}) go r.run() } +// ReportPID starts reporting stats for a specified process. +func (r *Reporter) ReportPID(name string, pid int) { + r.reportPIDsMu.Lock() + defer r.reportPIDsMu.Unlock() + if r.reportPIDs == nil { + r.reportPIDs = map[string]int{name: pid} + } else { + r.reportPIDs[name] = pid + } +} + // Stop reporting. Do not call more than once, or before calling // Start. // // Nothing will be logged after Stop returns. func (r *Reporter) Stop() { close(r.done) + <-r.flushed } func (r *Reporter) readAllOrWarn(in io.Reader) ([]byte, error) { content, err := ioutil.ReadAll(in) if err != nil { - r.Logger.Print(err) + r.Logger.Printf("warning: %v", err) } return content, err } @@ -162,7 +183,7 @@ func (r *Reporter) getContainerNetStats() (io.Reader, error) { statsFilename := fmt.Sprintf("/proc/%s/net/dev", taskPid) stats, err := ioutil.ReadFile(statsFilename) if err != nil { - r.Logger.Print(err) + r.Logger.Printf("notice: %v", err) continue } return strings.NewReader(string(stats)), nil @@ -209,14 +230,14 @@ func (r *Reporter) doBlkIOStats() { continue } delta := "" - if prev, ok := r.lastDiskSample[dev]; ok { + if prev, ok := r.lastDiskIOSample[dev]; ok { delta = fmt.Sprintf(" -- interval %.4f seconds %d write %d read", sample.sampleTime.Sub(prev.sampleTime).Seconds(), sample.txBytes-prev.txBytes, sample.rxBytes-prev.rxBytes) } r.Logger.Printf("blkio:%s %d write %d read%s\n", dev, sample.txBytes, sample.rxBytes, delta) - r.lastDiskSample[dev] = sample + r.lastDiskIOSample[dev] = sample } } @@ -244,11 +265,81 @@ func (r *Reporter) doMemoryStats() { } var outstat bytes.Buffer for _, key := range wantStats { - if val, ok := thisSample.memStat[key]; ok { - outstat.WriteString(fmt.Sprintf(" %d %s", val, key)) + // Use "total_X" stats (entire hierarchy) if enabled, + // otherwise just the single cgroup -- see + // https://www.kernel.org/doc/Documentation/cgroup-v1/memory.txt + if val, ok := thisSample.memStat["total_"+key]; ok { + fmt.Fprintf(&outstat, " %d %s", val, key) + } else if val, ok := thisSample.memStat[key]; ok { + fmt.Fprintf(&outstat, " %d %s", val, key) } } r.Logger.Printf("mem%s\n", outstat.String()) + + if r.kernelPageSize == 0 { + // assign "don't try again" value in case we give up + // and return without assigning the real value + r.kernelPageSize = -1 + buf, err := os.ReadFile("/proc/self/smaps") + if err != nil { + r.Logger.Printf("error reading /proc/self/smaps: %s", err) + return + } + m := regexp.MustCompile(`\nKernelPageSize:\s*(\d+) kB\n`).FindSubmatch(buf) + if len(m) != 2 { + r.Logger.Printf("error parsing /proc/self/smaps: KernelPageSize not found") + return + } + size, err := strconv.ParseInt(string(m[1]), 10, 64) + if err != nil { + r.Logger.Printf("error parsing /proc/self/smaps: KernelPageSize %q: %s", m[1], err) + return + } + r.kernelPageSize = size * 1024 + } else if r.kernelPageSize < 0 { + // already failed to determine page size, don't keep + // trying/logging + return + } + + r.reportPIDsMu.Lock() + defer r.reportPIDsMu.Unlock() + procnames := make([]string, 0, len(r.reportPIDs)) + for name := range r.reportPIDs { + procnames = append(procnames, name) + } + sort.Strings(procnames) + procmem := "" + for _, procname := range procnames { + pid := r.reportPIDs[procname] + buf, err := os.ReadFile(fmt.Sprintf("/proc/%d/stat", pid)) + if err != nil { + continue + } + // If the executable name contains a ')' char, + // /proc/$pid/stat will look like '1234 (exec name)) S + // 123 ...' -- the last ')' is the end of the 2nd + // field. + paren := bytes.LastIndexByte(buf, ')') + if paren < 0 { + continue + } + fields := bytes.SplitN(buf[paren:], []byte{' '}, 24) + if len(fields) < 24 { + continue + } + // rss is the 24th field in .../stat, and fields[0] + // here is the last char ')' of the 2nd field, so + // rss is fields[22] + rss, err := strconv.ParseInt(string(fields[22]), 10, 64) + if err != nil { + continue + } + procmem += fmt.Sprintf(" %d %s", rss*r.kernelPageSize, procname) + } + if procmem != "" { + r.Logger.Printf("procmem%s\n", procmem) + } } func (r *Reporter) doNetworkStats() { @@ -295,6 +386,42 @@ func (r *Reporter) doNetworkStats() { } } +type diskSpaceSample struct { + hasData bool + sampleTime time.Time + total uint64 + used uint64 + available uint64 +} + +func (r *Reporter) doDiskSpaceStats() { + s := syscall.Statfs_t{} + err := syscall.Statfs(r.TempDir, &s) + if err != nil { + return + } + bs := uint64(s.Bsize) + nextSample := diskSpaceSample{ + hasData: true, + sampleTime: time.Now(), + total: s.Blocks * bs, + used: (s.Blocks - s.Bfree) * bs, + available: s.Bavail * bs, + } + + var delta string + if r.lastDiskSpaceSample.hasData { + prev := r.lastDiskSpaceSample + interval := nextSample.sampleTime.Sub(prev.sampleTime).Seconds() + delta = fmt.Sprintf(" -- interval %.4f seconds %d used", + interval, + int64(nextSample.used-prev.used)) + } + r.Logger.Printf("statfs %d available %d used %d total%s\n", + nextSample.available, nextSample.used, nextSample.total, delta) + r.lastDiskSpaceSample = nextSample +} + type cpuSample struct { hasData bool // to distinguish the zero value from real data sampleTime time.Time @@ -342,7 +469,7 @@ func (r *Reporter) doCPUStats() { var userTicks, sysTicks int64 fmt.Sscanf(string(b), "user %d\nsystem %d", &userTicks, &sysTicks) - userHz := float64(C.sysconf(C._SC_CLK_TCK)) + userHz := float64(100) nextSample := cpuSample{ hasData: true, sampleTime: time.Now(), @@ -366,6 +493,8 @@ func (r *Reporter) doCPUStats() { // Report stats periodically until we learn (via r.done) that someone // called Stop. func (r *Reporter) run() { + defer close(r.flushed) + r.reportedStatFile = make(map[string]string) if !r.waitForCIDFile() || !r.waitForCgroup() { @@ -373,7 +502,15 @@ func (r *Reporter) run() { } r.lastNetSample = make(map[string]ioSample) - r.lastDiskSample = make(map[string]ioSample) + r.lastDiskIOSample = make(map[string]ioSample) + + if len(r.TempDir) == 0 { + // Temporary dir not provided, try to get it from the environment. + r.TempDir = os.Getenv("TMPDIR") + } + if len(r.TempDir) > 0 { + r.Logger.Printf("notice: monitoring temp dir %s\n", r.TempDir) + } ticker := time.NewTicker(r.PollPeriod) for { @@ -381,6 +518,7 @@ func (r *Reporter) run() { r.doCPUStats() r.doBlkIOStats() r.doNetworkStats() + r.doDiskSpaceStats() select { case <-r.done: return @@ -407,7 +545,7 @@ func (r *Reporter) waitForCIDFile() bool { select { case <-ticker.C: case <-r.done: - r.Logger.Printf("CID never appeared in %+q: %v", r.CIDFile, err) + r.Logger.Printf("warning: CID never appeared in %+q: %v", r.CIDFile, err) return false } } @@ -430,9 +568,9 @@ func (r *Reporter) waitForCgroup() bool { select { case <-ticker.C: case <-warningTimer: - r.Logger.Printf("cgroup stats files have not appeared after %v (config error?) -- still waiting...", r.PollPeriod) + r.Logger.Printf("warning: cgroup stats files have not appeared after %v (config error?) -- still waiting...", r.PollPeriod) case <-r.done: - r.Logger.Printf("cgroup stats files never appeared for %v", r.CID) + r.Logger.Printf("warning: cgroup stats files never appeared for %v", r.CID) return false } }