12416: Merge Put() into PutReader().
[arvados.git] / lib / crunchstat / crunchstat.go
index 03cfa7d3ef2570826ff4b1279783711d1e00597d..056ef0d185e61c7bbc52b692abd21ea61d9afdd4 100644 (file)
@@ -1,3 +1,7 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
 // Package crunchstat reports resource usage (CPU, memory, disk,
 // network) for a cgroup.
 package crunchstat
@@ -56,7 +60,8 @@ type Reporter struct {
        lastDiskSample   map[string]ioSample
        lastCPUSample    cpuSample
 
-       done chan struct{}
+       done    chan struct{} // closed when we should stop reporting
+       flushed chan struct{} // closed when we have made our last report
 }
 
 // Start starts monitoring in a new goroutine, and returns
@@ -72,6 +77,7 @@ type Reporter struct {
 // Callers should not modify public data fields after calling Start.
 func (r *Reporter) Start() {
        r.done = make(chan struct{})
+       r.flushed = make(chan struct{})
        go r.run()
 }
 
@@ -81,12 +87,13 @@ func (r *Reporter) Start() {
 // Nothing will be logged after Stop returns.
 func (r *Reporter) Stop() {
        close(r.done)
+       <-r.flushed
 }
 
 func (r *Reporter) readAllOrWarn(in io.Reader) ([]byte, error) {
        content, err := ioutil.ReadAll(in)
        if err != nil {
-               r.Logger.Print(err)
+               r.Logger.Printf("warning: %v", err)
        }
        return content, err
 }
@@ -162,7 +169,7 @@ func (r *Reporter) getContainerNetStats() (io.Reader, error) {
                statsFilename := fmt.Sprintf("/proc/%s/net/dev", taskPid)
                stats, err := ioutil.ReadFile(statsFilename)
                if err != nil {
-                       r.Logger.Print(err)
+                       r.Logger.Printf("notice: %v", err)
                        continue
                }
                return strings.NewReader(string(stats)), nil
@@ -366,6 +373,8 @@ func (r *Reporter) doCPUStats() {
 // Report stats periodically until we learn (via r.done) that someone
 // called Stop.
 func (r *Reporter) run() {
+       defer close(r.flushed)
+
        r.reportedStatFile = make(map[string]string)
 
        if !r.waitForCIDFile() || !r.waitForCgroup() {
@@ -407,7 +416,7 @@ func (r *Reporter) waitForCIDFile() bool {
                select {
                case <-ticker.C:
                case <-r.done:
-                       r.Logger.Printf("CID never appeared in %+q: %v", r.CIDFile, err)
+                       r.Logger.Printf("warning: CID never appeared in %+q: %v", r.CIDFile, err)
                        return false
                }
        }
@@ -430,9 +439,9 @@ func (r *Reporter) waitForCgroup() bool {
                select {
                case <-ticker.C:
                case <-warningTimer:
-                       r.Logger.Printf("cgroup stats files have not appeared after %v (config error?) -- still waiting...", r.PollPeriod)
+                       r.Logger.Printf("warning: cgroup stats files have not appeared after %v (config error?) -- still waiting...", r.PollPeriod)
                case <-r.done:
-                       r.Logger.Printf("cgroup stats files never appeared for %v", r.CID)
+                       r.Logger.Printf("warning: cgroup stats files never appeared for %v", r.CID)
                        return false
                }
        }