19563: Log memory usage of arv-mount, crunch-run, and keepstore.
authorTom Clegg <tom@curii.com>
Mon, 24 Oct 2022 20:45:45 +0000 (16:45 -0400)
committerTom Clegg <tom@curii.com>
Mon, 24 Oct 2022 20:45:45 +0000 (16:45 -0400)
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom@curii.com>

lib/crunchrun/crunchrun.go
lib/crunchstat/crunchstat.go
lib/crunchstat/crunchstat_test.go
services/crunchstat/crunchstat.go

index ee9115d8d809903be17cbaa10dc4010d1b7d87dc..55790f727a61d289d5b0d5080fa2911ee7789515 100644 (file)
@@ -142,6 +142,7 @@ type ContainerRunner struct {
        parentTemp    string
        costStartTime time.Time
 
+       keepstore        *exec.Cmd
        keepstoreLogger  io.WriteCloser
        keepstoreLogbuf  *bufThenWrite
        statLogger       io.WriteCloser
@@ -660,6 +661,9 @@ func (runner *ContainerRunner) SetupMounts() (map[string]bindmount, error) {
        if err != nil {
                return nil, fmt.Errorf("while trying to start arv-mount: %v", err)
        }
+       if runner.hoststatReporter != nil && runner.ArvMount != nil {
+               runner.hoststatReporter.ReportPID("arv-mount", runner.ArvMount.Process.Pid)
+       }
 
        for _, p := range collectionPaths {
                _, err = os.Stat(p)
@@ -733,6 +737,7 @@ func (runner *ContainerRunner) startHoststat() error {
                PollPeriod: runner.statInterval,
        }
        runner.hoststatReporter.Start()
+       runner.hoststatReporter.ReportPID("crunch-run", os.Getpid())
        return nil
 }
 
@@ -1569,6 +1574,9 @@ func (runner *ContainerRunner) Run() (err error) {
        if err != nil {
                return
        }
+       if runner.keepstore != nil {
+               runner.hoststatReporter.ReportPID("keepstore", runner.keepstore.Process.Pid)
+       }
 
        // set up FUSE mount and binds
        bindmounts, err = runner.SetupMounts()
@@ -1853,6 +1861,7 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s
                return 1
        }
 
+       cr.keepstore = keepstore
        if keepstore == nil {
                // Log explanation (if any) for why we're not running
                // a local keepstore.
index 10cd7cfce43a03472e2e942b68512efcdd7d0c61..443d2202cecb5420ad174ae56ca41009cd7bea1b 100644 (file)
@@ -13,10 +13,11 @@ import (
        "fmt"
        "io"
        "io/ioutil"
-       "log"
        "os"
+       "sort"
        "strconv"
        "strings"
+       "sync"
        "syscall"
        "time"
 )
@@ -47,7 +48,9 @@ type Reporter struct {
        TempDir string
 
        // Where to write statistics. Must not be nil.
-       Logger *log.Logger
+       Logger interface {
+               Printf(fmt string, args ...interface{})
+       }
 
        reportedStatFile    map[string]string
        lastNetSample       map[string]ioSample
@@ -55,6 +58,9 @@ type Reporter struct {
        lastCPUSample       cpuSample
        lastDiskSpaceSample diskSpaceSample
 
+       reportPIDs   map[string]int
+       reportPIDsMu sync.Mutex
+
        done    chan struct{} // closed when we should stop reporting
        flushed chan struct{} // closed when we have made our last report
 }
@@ -76,6 +82,17 @@ func (r *Reporter) Start() {
        go r.run()
 }
 
+// ReportPID starts reporting stats for a specified process.
+func (r *Reporter) ReportPID(name string, pid int) {
+       r.reportPIDsMu.Lock()
+       defer r.reportPIDsMu.Unlock()
+       if r.reportPIDs == nil {
+               r.reportPIDs = map[string]int{name: pid}
+       } else {
+               r.reportPIDs[name] = pid
+       }
+}
+
 // Stop reporting. Do not call more than once, or before calling
 // Start.
 //
@@ -256,6 +273,45 @@ func (r *Reporter) doMemoryStats() {
                }
        }
        r.Logger.Printf("mem%s\n", outstat.String())
+
+       r.reportPIDsMu.Lock()
+       defer r.reportPIDsMu.Unlock()
+       procnames := make([]string, 0, len(r.reportPIDs))
+       for name := range r.reportPIDs {
+               procnames = append(procnames, name)
+       }
+       sort.Strings(procnames)
+       procmem := ""
+       for _, procname := range procnames {
+               pid := r.reportPIDs[procname]
+               buf, err := os.ReadFile(fmt.Sprintf("/proc/%d/stat", pid))
+               if err != nil {
+                       continue
+               }
+               // If the executable name contains a ')' char,
+               // /proc/$pid/stat will look like '1234 (exec name)) S
+               // 123 ...' -- the last ')' is the end of the 2nd
+               // field.
+               paren := bytes.LastIndexByte(buf, ')')
+               if paren < 0 {
+                       continue
+               }
+               fields := bytes.SplitN(buf[paren:], []byte{' '}, 24)
+               if len(fields) < 24 {
+                       continue
+               }
+               // rss is the 24th field in .../stat, and fields[0]
+               // here is the last char ')' of the 2nd field, so
+               // rss is fields[22]
+               rss, err := strconv.Atoi(string(fields[22]))
+               if err != nil {
+                       continue
+               }
+               procmem += fmt.Sprintf(" %d %s", rss, procname)
+       }
+       if procmem != "" {
+               r.Logger.Printf("procmem%s\n", procmem)
+       }
 }
 
 func (r *Reporter) doNetworkStats() {
index c27e39241df08af2c925a791e6fd849afc496b90..f5e2f8662fb21c67e0d2883b58e9cf18beb52463 100644 (file)
@@ -6,20 +6,33 @@ package crunchstat
 
 import (
        "bufio"
+       "bytes"
        "io"
        "log"
        "os"
        "regexp"
        "testing"
+       "time"
+
+       "github.com/sirupsen/logrus"
+       . "gopkg.in/check.v1"
 )
 
+func Test(t *testing.T) {
+       TestingT(t)
+}
+
+var _ = Suite(&suite{})
+
+type suite struct{}
+
 func bufLogger() (*log.Logger, *bufio.Reader) {
        r, w := io.Pipe()
        logger := log.New(w, "", 0)
        return logger, bufio.NewReader(r)
 }
 
-func TestReadAllOrWarnFail(t *testing.T) {
+func (s *suite) TestReadAllOrWarnFail(c *C) {
        logger, rcv := bufLogger()
        rep := Reporter{Logger: logger}
 
@@ -35,32 +48,57 @@ func TestReadAllOrWarnFail(t *testing.T) {
                // reading, but reading from byte 0 returns an error.
                f, err := os.Open("/proc/self/mem")
                if err != nil {
-                       t.Fatalf("Opening /proc/self/mem: %s", err)
+                       c.Fatalf("Opening /proc/self/mem: %s", err)
                }
                if x, err := rep.readAllOrWarn(f); err == nil {
-                       t.Fatalf("Expected error, got %v", x)
+                       c.Fatalf("Expected error, got %v", x)
                }
        }
        <-done
        if err != nil {
-               t.Fatal(err)
+               c.Fatal(err)
        } else if matched, err := regexp.MatchString("^warning: read /proc/self/mem: .*", string(msg)); err != nil || !matched {
-               t.Fatalf("Expected error message about unreadable file, got \"%s\"", msg)
+               c.Fatalf("Expected error message about unreadable file, got \"%s\"", msg)
        }
 }
 
-func TestReadAllOrWarnSuccess(t *testing.T) {
+func (s *suite) TestReadAllOrWarnSuccess(c *C) {
        rep := Reporter{Logger: log.New(os.Stderr, "", 0)}
 
        f, err := os.Open("./crunchstat_test.go")
        if err != nil {
-               t.Fatalf("Opening ./crunchstat_test.go: %s", err)
+               c.Fatalf("Opening ./crunchstat_test.go: %s", err)
        }
        data, err := rep.readAllOrWarn(f)
        if err != nil {
-               t.Fatalf("got error %s", err)
+               c.Fatalf("got error %s", err)
        }
        if matched, err := regexp.MatchString("\npackage crunchstat\n", string(data)); err != nil || !matched {
-               t.Fatalf("data failed regexp: err %v, matched %v", err, matched)
+               c.Fatalf("data failed regexp: err %v, matched %v", err, matched)
+       }
+}
+
+func (s *suite) TestReportPIDs(c *C) {
+       var logbuf bytes.Buffer
+       logger := logrus.New()
+       logger.Out = &logbuf
+       r := Reporter{
+               Logger:     logger,
+               CgroupRoot: "/sys/fs/cgroup",
+               PollPeriod: time.Second,
+       }
+       r.Start()
+       r.ReportPID("init", 1)
+       r.ReportPID("test_process", os.Getpid())
+       r.ReportPID("nonexistent", 12345) // should be silently ignored/omitted
+       for deadline := time.Now().Add(10 * time.Second); ; time.Sleep(time.Millisecond) {
+               if time.Now().After(deadline) {
+                       c.Error("timed out")
+                       break
+               }
+               if regexp.MustCompile(`(!?ms).*procmem \d+ init \d+ test_process.*`).MatchString(logbuf.String()) {
+                       break
+               }
        }
+       c.Logf("%s", logbuf.String())
 }
index 6383eae5452dd1d145420e7da41ce773878b5cef..d28bee0f5e19591275eab2ae43d2a640d316de6d 100644 (file)
@@ -28,6 +28,10 @@ var (
        version               = "dev"
 )
 
+type logger interface {
+       Printf(string, ...interface{})
+}
+
 func main() {
        reporter := crunchstat.Reporter{
                Logger: log.New(os.Stderr, "crunchstat: ", 0),
@@ -55,9 +59,11 @@ func main() {
        reporter.Logger.Printf("crunchstat %s started", version)
 
        if reporter.CgroupRoot == "" {
-               reporter.Logger.Fatal("error: must provide -cgroup-root")
+               reporter.Logger.Printf("error: must provide -cgroup-root")
+               os.Exit(2)
        } else if signalOnDeadPPID < 0 {
-               reporter.Logger.Fatalf("-signal-on-dead-ppid=%d is invalid (use a positive signal number, or 0 to disable)", signalOnDeadPPID)
+               reporter.Logger.Printf("-signal-on-dead-ppid=%d is invalid (use a positive signal number, or 0 to disable)", signalOnDeadPPID)
+               os.Exit(2)
        }
        reporter.PollPeriod = time.Duration(*pollMsec) * time.Millisecond
 
@@ -76,17 +82,19 @@ func main() {
                if status, ok := err.Sys().(syscall.WaitStatus); ok {
                        os.Exit(status.ExitStatus())
                } else {
-                       reporter.Logger.Fatalln("ExitError without WaitStatus:", err)
+                       reporter.Logger.Printf("ExitError without WaitStatus: %v", err)
+                       os.Exit(1)
                }
        } else if err != nil {
-               reporter.Logger.Fatalln("error in cmd.Wait:", err)
+               reporter.Logger.Printf("error running command: %v", err)
+               os.Exit(1)
        }
 }
 
-func runCommand(argv []string, logger *log.Logger) error {
+func runCommand(argv []string, logger logger) error {
        cmd := exec.Command(argv[0], argv[1:]...)
 
-       logger.Println("Running", argv)
+       logger.Printf("Running %v", argv)
 
        // Child process will use our stdin and stdout pipes
        // (we close our copies below)
@@ -100,7 +108,7 @@ func runCommand(argv []string, logger *log.Logger) error {
                if cmd.Process != nil {
                        cmd.Process.Signal(catch)
                }
-               logger.Println("notice: caught signal:", catch)
+               logger.Printf("notice: caught signal: %v", catch)
        }(sigChan)
        signal.Notify(sigChan, syscall.SIGTERM)
        signal.Notify(sigChan, syscall.SIGINT)
@@ -113,24 +121,30 @@ func runCommand(argv []string, logger *log.Logger) error {
        // Funnel stderr through our channel
        stderrPipe, err := cmd.StderrPipe()
        if err != nil {
-               logger.Fatalln("error in StderrPipe:", err)
+               logger.Printf("error in StderrPipe: %v", err)
+               return err
        }
 
        // Run subprocess
        if err := cmd.Start(); err != nil {
-               logger.Fatalln("error in cmd.Start:", err)
+               logger.Printf("error in cmd.Start: %v", err)
+               return err
        }
 
        // Close stdin/stdout in this (parent) process
        os.Stdin.Close()
        os.Stdout.Close()
 
-       copyPipeToChildLog(stderrPipe, log.New(os.Stderr, "", 0))
+       err = copyPipeToChildLog(stderrPipe, log.New(os.Stderr, "", 0))
+       if err != nil {
+               cmd.Process.Kill()
+               return err
+       }
 
        return cmd.Wait()
 }
 
-func sendSignalOnDeadPPID(intvl time.Duration, signum, ppidOrig int, cmd *exec.Cmd, logger *log.Logger) {
+func sendSignalOnDeadPPID(intvl time.Duration, signum, ppidOrig int, cmd *exec.Cmd, logger logger) {
        ticker := time.NewTicker(intvl)
        for range ticker.C {
                ppid := os.Getppid()
@@ -152,7 +166,7 @@ func sendSignalOnDeadPPID(intvl time.Duration, signum, ppidOrig int, cmd *exec.C
        }
 }
 
-func copyPipeToChildLog(in io.ReadCloser, logger *log.Logger) {
+func copyPipeToChildLog(in io.ReadCloser, logger logger) error {
        reader := bufio.NewReaderSize(in, MaxLogLine)
        var prefix string
        for {
@@ -160,13 +174,13 @@ func copyPipeToChildLog(in io.ReadCloser, logger *log.Logger) {
                if err == io.EOF {
                        break
                } else if err != nil {
-                       logger.Fatal("error reading child stderr:", err)
+                       return fmt.Errorf("error reading child stderr: %w", err)
                }
                var suffix string
                if isPrefix {
                        suffix = "[...]"
                }
-               logger.Print(prefix, string(line), suffix)
+               logger.Printf("%s%s%s", prefix, string(line), suffix)
                // Set up prefix for following line
                if isPrefix {
                        prefix = "[...]"
@@ -174,5 +188,5 @@ func copyPipeToChildLog(in io.ReadCloser, logger *log.Logger) {
                        prefix = ""
                }
        }
-       in.Close()
+       return in.Close()
 }