X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/1ebc07e853ac6fb44b5a8966d7381e771dd68898..4d3b8b299deaa4fff45102a26768e26129b17f10:/lib/crunchstat/crunchstat.go diff --git a/lib/crunchstat/crunchstat.go b/lib/crunchstat/crunchstat.go index 3ba3ce6b4b..028083fa0d 100644 --- a/lib/crunchstat/crunchstat.go +++ b/lib/crunchstat/crunchstat.go @@ -1,3 +1,9 @@ +// Copyright (C) The Arvados Authors. All rights reserved. +// +// SPDX-License-Identifier: AGPL-3.0 + +// Package crunchstat reports resource usage (CPU, memory, disk, +// network) for a cgroup. package crunchstat import ( @@ -11,10 +17,11 @@ import ( "os" "strconv" "strings" + "syscall" "time" ) -// This magically allows us to look up user_hz via _SC_CLK_TCK: +// This magically allows us to look up userHz via _SC_CLK_TCK: /* #include @@ -28,52 +35,70 @@ import "C" // log.Logger. type Reporter struct { // CID of the container to monitor. If empty, read the CID - // from CIDFile. + // from CIDFile (first waiting until a non-empty file appears + // at CIDFile). If CIDFile is also empty, report host + // statistics. CID string - // Where cgroup special files live on this system - CgroupRoot string - CgroupParent string - // Path to a file we can read CID from. If CIDFile is empty or - // nonexistent, wait for it to appear. + + // Path to a file we can read CID from. CIDFile string - // Interval between samples - Poll time.Duration + // Where cgroup accounting files live on this system, e.g., + // "/sys/fs/cgroup". + CgroupRoot string + + // Parent cgroup, e.g., "docker". + CgroupParent string + + // Interval between samples. Must be positive. + PollPeriod time.Duration + + // Temporary directory, will be monitored for available, used & total space. + TempDir string - // Where to write statistics. + // Where to write statistics. Must not be nil. Logger *log.Logger - reportedStatFile map[string]string - lastNetSample map[string]IoSample - lastDiskSample map[string]IoSample - lastCPUSample CpuSample + reportedStatFile map[string]string + lastNetSample map[string]ioSample + lastDiskIOSample map[string]ioSample + lastCPUSample cpuSample + lastDiskSpaceSample diskSpaceSample - done chan struct{} + done chan struct{} // closed when we should stop reporting + flushed chan struct{} // closed when we have made our last report } -// Wait (if necessary) for the CID to appear in CIDFile, then start -// reporting statistics. +// Start starts monitoring in a new goroutine, and returns +// immediately. // -// Start should not be called more than once on a Reporter. +// The monitoring goroutine waits for a non-empty CIDFile to appear +// (unless CID is non-empty). Then it waits for the accounting files +// to appear for the monitored container. Then it collects and reports +// statistics until Stop is called. // -// Public data fields should not be changed after calling Start. +// Callers should not call Start more than once. +// +// Callers should not modify public data fields after calling Start. func (r *Reporter) Start() { r.done = make(chan struct{}) + r.flushed = make(chan struct{}) go r.run() } -// Stop reporting statistics. Do not call more than once, or before -// calling Start. +// Stop reporting. Do not call more than once, or before calling +// Start. // // Nothing will be logged after Stop returns. func (r *Reporter) Stop() { close(r.done) + <-r.flushed } func (r *Reporter) readAllOrWarn(in io.Reader) ([]byte, error) { content, err := ioutil.ReadAll(in) if err != nil { - r.Logger.Print(err) + r.Logger.Printf("warning: %v", err) } return content, err } @@ -149,7 +174,7 @@ func (r *Reporter) getContainerNetStats() (io.Reader, error) { statsFilename := fmt.Sprintf("/proc/%s/net/dev", taskPid) stats, err := ioutil.ReadFile(statsFilename) if err != nil { - r.Logger.Print(err) + r.Logger.Printf("notice: %v", err) continue } return strings.NewReader(string(stats)), nil @@ -157,13 +182,13 @@ func (r *Reporter) getContainerNetStats() (io.Reader, error) { return nil, errors.New("Could not read stats for any proc in container") } -type IoSample struct { +type ioSample struct { sampleTime time.Time txBytes int64 rxBytes int64 } -func (r *Reporter) DoBlkIoStats() { +func (r *Reporter) doBlkIOStats() { c, err := r.openStatFile("blkio", "blkio.io_service_bytes", true) if err != nil { return @@ -171,17 +196,17 @@ func (r *Reporter) DoBlkIoStats() { defer c.Close() b := bufio.NewScanner(c) var sampleTime = time.Now() - newSamples := make(map[string]IoSample) + newSamples := make(map[string]ioSample) for b.Scan() { var device, op string var val int64 if _, err := fmt.Sscanf(string(b.Text()), "%s %s %d", &device, &op, &val); err != nil { continue } - var thisSample IoSample + var thisSample ioSample var ok bool if thisSample, ok = newSamples[device]; !ok { - thisSample = IoSample{sampleTime, -1, -1} + thisSample = ioSample{sampleTime, -1, -1} } switch op { case "Read": @@ -196,30 +221,30 @@ func (r *Reporter) DoBlkIoStats() { continue } delta := "" - if prev, ok := r.lastDiskSample[dev]; ok { + if prev, ok := r.lastDiskIOSample[dev]; ok { delta = fmt.Sprintf(" -- interval %.4f seconds %d write %d read", sample.sampleTime.Sub(prev.sampleTime).Seconds(), sample.txBytes-prev.txBytes, sample.rxBytes-prev.rxBytes) } r.Logger.Printf("blkio:%s %d write %d read%s\n", dev, sample.txBytes, sample.rxBytes, delta) - r.lastDiskSample[dev] = sample + r.lastDiskIOSample[dev] = sample } } -type MemSample struct { +type memSample struct { sampleTime time.Time memStat map[string]int64 } -func (r *Reporter) DoMemoryStats() { +func (r *Reporter) doMemoryStats() { c, err := r.openStatFile("memory", "memory.stat", true) if err != nil { return } defer c.Close() b := bufio.NewScanner(c) - thisSample := MemSample{time.Now(), make(map[string]int64)} + thisSample := memSample{time.Now(), make(map[string]int64)} wantStats := [...]string{"cache", "swap", "pgmajfault", "rss"} for b.Scan() { var stat string @@ -231,14 +256,19 @@ func (r *Reporter) DoMemoryStats() { } var outstat bytes.Buffer for _, key := range wantStats { - if val, ok := thisSample.memStat[key]; ok { - outstat.WriteString(fmt.Sprintf(" %d %s", val, key)) + // Use "total_X" stats (entire hierarchy) if enabled, + // otherwise just the single cgroup -- see + // https://www.kernel.org/doc/Documentation/cgroup-v1/memory.txt + if val, ok := thisSample.memStat["total_"+key]; ok { + fmt.Fprintf(&outstat, " %d %s", val, key) + } else if val, ok := thisSample.memStat[key]; ok { + fmt.Fprintf(&outstat, " %d %s", val, key) } } r.Logger.Printf("mem%s\n", outstat.String()) } -func (r *Reporter) DoNetworkStats() { +func (r *Reporter) doNetworkStats() { sampleTime := time.Now() stats, err := r.getContainerNetStats() if err != nil { @@ -265,7 +295,7 @@ func (r *Reporter) DoNetworkStats() { if rx, err = strconv.ParseInt(words[1], 10, 64); err != nil { continue } - nextSample := IoSample{} + nextSample := ioSample{} nextSample.sampleTime = sampleTime nextSample.txBytes = tx nextSample.rxBytes = rx @@ -282,7 +312,43 @@ func (r *Reporter) DoNetworkStats() { } } -type CpuSample struct { +type diskSpaceSample struct { + hasData bool + sampleTime time.Time + total uint64 + used uint64 + available uint64 +} + +func (r *Reporter) doDiskSpaceStats() { + s := syscall.Statfs_t{} + err := syscall.Statfs(r.TempDir, &s) + if err != nil { + return + } + bs := uint64(s.Bsize) + nextSample := diskSpaceSample{ + hasData: true, + sampleTime: time.Now(), + total: s.Blocks * bs, + used: (s.Blocks - s.Bfree) * bs, + available: s.Bavail * bs, + } + + var delta string + if r.lastDiskSpaceSample.hasData { + prev := r.lastDiskSpaceSample + interval := nextSample.sampleTime.Sub(prev.sampleTime).Seconds() + delta = fmt.Sprintf(" -- interval %.4f seconds %d used", + interval, + int64(nextSample.used-prev.used)) + } + r.Logger.Printf("statfs %d available %d used %d total%s\n", + nextSample.available, nextSample.used, nextSample.total, delta) + r.lastDiskSpaceSample = nextSample +} + +type cpuSample struct { hasData bool // to distinguish the zero value from real data sampleTime time.Time user float64 @@ -292,13 +358,16 @@ type CpuSample struct { // Return the number of CPUs available in the container. Return 0 if // we can't figure out the real number of CPUs. -func (r *Reporter) GetCpuCount() int64 { +func (r *Reporter) getCPUCount() int64 { cpusetFile, err := r.openStatFile("cpuset", "cpuset.cpus", true) if err != nil { return 0 } defer cpusetFile.Close() b, err := r.readAllOrWarn(cpusetFile) + if err != nil { + return 0 + } sp := strings.Split(string(b), ",") cpus := int64(0) for _, v := range sp { @@ -307,13 +376,13 @@ func (r *Reporter) GetCpuCount() int64 { if n == 2 { cpus += (max - min) + 1 } else { - cpus += 1 + cpus++ } } return cpus } -func (r *Reporter) DoCpuStats() { +func (r *Reporter) doCPUStats() { statFile, err := r.openStatFile("cpuacct", "cpuacct.stat", true) if err != nil { return @@ -324,12 +393,16 @@ func (r *Reporter) DoCpuStats() { return } - nextSample := CpuSample{true, time.Now(), 0, 0, r.GetCpuCount()} var userTicks, sysTicks int64 fmt.Sscanf(string(b), "user %d\nsystem %d", &userTicks, &sysTicks) - user_hz := float64(C.sysconf(C._SC_CLK_TCK)) - nextSample.user = float64(userTicks) / user_hz - nextSample.sys = float64(sysTicks) / user_hz + userHz := float64(C.sysconf(C._SC_CLK_TCK)) + nextSample := cpuSample{ + hasData: true, + sampleTime: time.Now(), + user: float64(userTicks) / userHz, + sys: float64(sysTicks) / userHz, + cpus: r.getCPUCount(), + } delta := "" if r.lastCPUSample.hasData { @@ -343,24 +416,35 @@ func (r *Reporter) DoCpuStats() { r.lastCPUSample = nextSample } -// Report stats periodically until r.done indicates someone called -// Stop. +// Report stats periodically until we learn (via r.done) that someone +// called Stop. func (r *Reporter) run() { + defer close(r.flushed) + r.reportedStatFile = make(map[string]string) if !r.waitForCIDFile() || !r.waitForCgroup() { return } - r.lastNetSample = make(map[string]IoSample) - r.lastDiskSample = make(map[string]IoSample) + r.lastNetSample = make(map[string]ioSample) + r.lastDiskIOSample = make(map[string]ioSample) - ticker := time.NewTicker(r.Poll) + if len(r.TempDir) == 0 { + // Temporary dir not provided, try to get it from the environment. + r.TempDir = os.Getenv("TMPDIR") + } + if len(r.TempDir) > 0 { + r.Logger.Printf("notice: monitoring temp dir %s\n", r.TempDir) + } + + ticker := time.NewTicker(r.PollPeriod) for { - r.DoMemoryStats() - r.DoCpuStats() - r.DoBlkIoStats() - r.DoNetworkStats() + r.doMemoryStats() + r.doCPUStats() + r.doBlkIOStats() + r.doNetworkStats() + r.doDiskSpaceStats() select { case <-r.done: return @@ -370,9 +454,9 @@ func (r *Reporter) run() { } // If CID is empty, wait for it to appear in CIDFile. Return true if -// we get it before r.done indicates someone called Stop. +// we get it before we learn (via r.done) that someone called Stop. func (r *Reporter) waitForCIDFile() bool { - if r.CID != "" { + if r.CID != "" || r.CIDFile == "" { return true } @@ -387,7 +471,7 @@ func (r *Reporter) waitForCIDFile() bool { select { case <-ticker.C: case <-r.done: - r.Logger.Printf("CID never appeared in %+q: %v", r.CIDFile, err) + r.Logger.Printf("warning: CID never appeared in %+q: %v", r.CIDFile, err) return false } } @@ -400,7 +484,7 @@ func (r *Reporter) waitForCIDFile() bool { func (r *Reporter) waitForCgroup() bool { ticker := time.NewTicker(100 * time.Millisecond) defer ticker.Stop() - warningTimer := time.After(r.Poll) + warningTimer := time.After(r.PollPeriod) for { c, err := r.openStatFile("cpuacct", "cgroup.procs", false) if err == nil { @@ -410,9 +494,9 @@ func (r *Reporter) waitForCgroup() bool { select { case <-ticker.C: case <-warningTimer: - r.Logger.Printf("cgroup stats files have not appeared after %v (config error?) -- still waiting...", r.Poll) + r.Logger.Printf("warning: cgroup stats files have not appeared after %v (config error?) -- still waiting...", r.PollPeriod) case <-r.done: - r.Logger.Printf("cgroup stats files never appeared for %v", r.CID) + r.Logger.Printf("warning: cgroup stats files never appeared for %v", r.CID) return false } }