X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/8fdac0b6f954b6265798390b95e61f8192b85630..09cbdc3074b3f1e69c9c537875146f6da0a6ed8f:/lib/crunchstat/crunchstat.go diff --git a/lib/crunchstat/crunchstat.go b/lib/crunchstat/crunchstat.go index 8afe828196..3a473cab87 100644 --- a/lib/crunchstat/crunchstat.go +++ b/lib/crunchstat/crunchstat.go @@ -13,24 +13,16 @@ import ( "fmt" "io" "io/ioutil" - "log" "os" + "regexp" + "sort" "strconv" "strings" + "sync" "syscall" "time" ) -// This magically allows us to look up userHz via _SC_CLK_TCK: - -/* -#include -#include -#include -#include -*/ -import "C" - // A Reporter gathers statistics for a cgroup and writes them to a // log.Logger. type Reporter struct { @@ -57,14 +49,20 @@ type Reporter struct { TempDir string // Where to write statistics. Must not be nil. - Logger *log.Logger + Logger interface { + Printf(fmt string, args ...interface{}) + } + kernelPageSize int64 reportedStatFile map[string]string lastNetSample map[string]ioSample lastDiskIOSample map[string]ioSample lastCPUSample cpuSample lastDiskSpaceSample diskSpaceSample + reportPIDs map[string]int + reportPIDsMu sync.Mutex + done chan struct{} // closed when we should stop reporting flushed chan struct{} // closed when we have made our last report } @@ -86,6 +84,17 @@ func (r *Reporter) Start() { go r.run() } +// ReportPID starts reporting stats for a specified process. +func (r *Reporter) ReportPID(name string, pid int) { + r.reportPIDsMu.Lock() + defer r.reportPIDsMu.Unlock() + if r.reportPIDs == nil { + r.reportPIDs = map[string]int{name: pid} + } else { + r.reportPIDs[name] = pid + } +} + // Stop reporting. Do not call more than once, or before calling // Start. // @@ -256,11 +265,81 @@ func (r *Reporter) doMemoryStats() { } var outstat bytes.Buffer for _, key := range wantStats { - if val, ok := thisSample.memStat[key]; ok { - outstat.WriteString(fmt.Sprintf(" %d %s", val, key)) + // Use "total_X" stats (entire hierarchy) if enabled, + // otherwise just the single cgroup -- see + // https://www.kernel.org/doc/Documentation/cgroup-v1/memory.txt + if val, ok := thisSample.memStat["total_"+key]; ok { + fmt.Fprintf(&outstat, " %d %s", val, key) + } else if val, ok := thisSample.memStat[key]; ok { + fmt.Fprintf(&outstat, " %d %s", val, key) } } r.Logger.Printf("mem%s\n", outstat.String()) + + if r.kernelPageSize == 0 { + // assign "don't try again" value in case we give up + // and return without assigning the real value + r.kernelPageSize = -1 + buf, err := os.ReadFile("/proc/self/smaps") + if err != nil { + r.Logger.Printf("error reading /proc/self/smaps: %s", err) + return + } + m := regexp.MustCompile(`\nKernelPageSize:\s*(\d+) kB\n`).FindSubmatch(buf) + if len(m) != 2 { + r.Logger.Printf("error parsing /proc/self/smaps: KernelPageSize not found") + return + } + size, err := strconv.ParseInt(string(m[1]), 10, 64) + if err != nil { + r.Logger.Printf("error parsing /proc/self/smaps: KernelPageSize %q: %s", m[1], err) + return + } + r.kernelPageSize = size * 1024 + } else if r.kernelPageSize < 0 { + // already failed to determine page size, don't keep + // trying/logging + return + } + + r.reportPIDsMu.Lock() + defer r.reportPIDsMu.Unlock() + procnames := make([]string, 0, len(r.reportPIDs)) + for name := range r.reportPIDs { + procnames = append(procnames, name) + } + sort.Strings(procnames) + procmem := "" + for _, procname := range procnames { + pid := r.reportPIDs[procname] + buf, err := os.ReadFile(fmt.Sprintf("/proc/%d/stat", pid)) + if err != nil { + continue + } + // If the executable name contains a ')' char, + // /proc/$pid/stat will look like '1234 (exec name)) S + // 123 ...' -- the last ')' is the end of the 2nd + // field. + paren := bytes.LastIndexByte(buf, ')') + if paren < 0 { + continue + } + fields := bytes.SplitN(buf[paren:], []byte{' '}, 24) + if len(fields) < 24 { + continue + } + // rss is the 24th field in .../stat, and fields[0] + // here is the last char ')' of the 2nd field, so + // rss is fields[22] + rss, err := strconv.ParseInt(string(fields[22]), 10, 64) + if err != nil { + continue + } + procmem += fmt.Sprintf(" %d %s", rss*r.kernelPageSize, procname) + } + if procmem != "" { + r.Logger.Printf("procmem%s\n", procmem) + } } func (r *Reporter) doNetworkStats() { @@ -390,7 +469,7 @@ func (r *Reporter) doCPUStats() { var userTicks, sysTicks int64 fmt.Sscanf(string(b), "user %d\nsystem %d", &userTicks, &sysTicks) - userHz := float64(C.sysconf(C._SC_CLK_TCK)) + userHz := float64(100) nextSample := cpuSample{ hasData: true, sampleTime: time.Now(),