From: Tom Clegg Date: Mon, 24 Oct 2022 20:45:45 +0000 (-0400) Subject: 19563: Log memory usage of arv-mount, crunch-run, and keepstore. X-Git-Tag: 2.5.0~51^2~2 X-Git-Url: https://git.arvados.org/arvados.git/commitdiff_plain/ca69f0bebc31124d9b61cec4b790d45a94bff379 19563: Log memory usage of arv-mount, crunch-run, and keepstore. Arvados-DCO-1.1-Signed-off-by: Tom Clegg --- diff --git a/lib/crunchrun/crunchrun.go b/lib/crunchrun/crunchrun.go index ee9115d8d8..55790f727a 100644 --- a/lib/crunchrun/crunchrun.go +++ b/lib/crunchrun/crunchrun.go @@ -142,6 +142,7 @@ type ContainerRunner struct { parentTemp string costStartTime time.Time + keepstore *exec.Cmd keepstoreLogger io.WriteCloser keepstoreLogbuf *bufThenWrite statLogger io.WriteCloser @@ -660,6 +661,9 @@ func (runner *ContainerRunner) SetupMounts() (map[string]bindmount, error) { if err != nil { return nil, fmt.Errorf("while trying to start arv-mount: %v", err) } + if runner.hoststatReporter != nil && runner.ArvMount != nil { + runner.hoststatReporter.ReportPID("arv-mount", runner.ArvMount.Process.Pid) + } for _, p := range collectionPaths { _, err = os.Stat(p) @@ -733,6 +737,7 @@ func (runner *ContainerRunner) startHoststat() error { PollPeriod: runner.statInterval, } runner.hoststatReporter.Start() + runner.hoststatReporter.ReportPID("crunch-run", os.Getpid()) return nil } @@ -1569,6 +1574,9 @@ func (runner *ContainerRunner) Run() (err error) { if err != nil { return } + if runner.keepstore != nil { + runner.hoststatReporter.ReportPID("keepstore", runner.keepstore.Process.Pid) + } // set up FUSE mount and binds bindmounts, err = runner.SetupMounts() @@ -1853,6 +1861,7 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s return 1 } + cr.keepstore = keepstore if keepstore == nil { // Log explanation (if any) for why we're not running // a local keepstore. 
diff --git a/lib/crunchstat/crunchstat.go b/lib/crunchstat/crunchstat.go index 10cd7cfce4..443d2202ce 100644 --- a/lib/crunchstat/crunchstat.go +++ b/lib/crunchstat/crunchstat.go @@ -13,10 +13,11 @@ import ( "fmt" "io" "io/ioutil" - "log" "os" + "sort" "strconv" "strings" + "sync" "syscall" "time" ) @@ -47,7 +48,9 @@ type Reporter struct { TempDir string // Where to write statistics. Must not be nil. - Logger *log.Logger + Logger interface { + Printf(fmt string, args ...interface{}) + } reportedStatFile map[string]string lastNetSample map[string]ioSample @@ -55,6 +58,9 @@ type Reporter struct { lastCPUSample cpuSample lastDiskSpaceSample diskSpaceSample + reportPIDs map[string]int + reportPIDsMu sync.Mutex + done chan struct{} // closed when we should stop reporting flushed chan struct{} // closed when we have made our last report } @@ -76,6 +82,17 @@ func (r *Reporter) Start() { go r.run() } +// ReportPID starts reporting stats for a specified process. +func (r *Reporter) ReportPID(name string, pid int) { + r.reportPIDsMu.Lock() + defer r.reportPIDsMu.Unlock() + if r.reportPIDs == nil { + r.reportPIDs = map[string]int{name: pid} + } else { + r.reportPIDs[name] = pid + } +} + // Stop reporting. Do not call more than once, or before calling // Start. // @@ -256,6 +273,45 @@ func (r *Reporter) doMemoryStats() { } } r.Logger.Printf("mem%s\n", outstat.String()) + + r.reportPIDsMu.Lock() + defer r.reportPIDsMu.Unlock() + procnames := make([]string, 0, len(r.reportPIDs)) + for name := range r.reportPIDs { + procnames = append(procnames, name) + } + sort.Strings(procnames) + procmem := "" + for _, procname := range procnames { + pid := r.reportPIDs[procname] + buf, err := os.ReadFile(fmt.Sprintf("/proc/%d/stat", pid)) + if err != nil { + continue + } + // If the executable name contains a ')' char, + // /proc/$pid/stat will look like '1234 (exec name)) S + // 123 ...' -- the last ')' is the end of the 2nd + // field. 
+ paren := bytes.LastIndexByte(buf, ')') + if paren < 0 { + continue + } + fields := bytes.SplitN(buf[paren:], []byte{' '}, 24) + if len(fields) < 24 { + continue + } + // rss is the 24th field in .../stat, and fields[0] + // here is the last char ')' of the 2nd field, so + // rss is fields[22] + rss, err := strconv.Atoi(string(fields[22])) + if err != nil { + continue + } + procmem += fmt.Sprintf(" %d %s", rss, procname) + } + if procmem != "" { + r.Logger.Printf("procmem%s\n", procmem) + } } func (r *Reporter) doNetworkStats() { diff --git a/lib/crunchstat/crunchstat_test.go b/lib/crunchstat/crunchstat_test.go index c27e39241d..f5e2f8662f 100644 --- a/lib/crunchstat/crunchstat_test.go +++ b/lib/crunchstat/crunchstat_test.go @@ -6,20 +6,33 @@ package crunchstat import ( "bufio" + "bytes" "io" "log" "os" "regexp" "testing" + "time" + + "github.com/sirupsen/logrus" + . "gopkg.in/check.v1" ) +func Test(t *testing.T) { + TestingT(t) +} + +var _ = Suite(&suite{}) + +type suite struct{} + func bufLogger() (*log.Logger, *bufio.Reader) { r, w := io.Pipe() logger := log.New(w, "", 0) return logger, bufio.NewReader(r) } -func TestReadAllOrWarnFail(t *testing.T) { +func (s *suite) TestReadAllOrWarnFail(c *C) { logger, rcv := bufLogger() rep := Reporter{Logger: logger} @@ -35,32 +48,57 @@ func TestReadAllOrWarnFail(t *testing.T) { // reading, but reading from byte 0 returns an error. 
f, err := os.Open("/proc/self/mem") if err != nil { - t.Fatalf("Opening /proc/self/mem: %s", err) + c.Fatalf("Opening /proc/self/mem: %s", err) } if x, err := rep.readAllOrWarn(f); err == nil { - t.Fatalf("Expected error, got %v", x) + c.Fatalf("Expected error, got %v", x) } } <-done if err != nil { - t.Fatal(err) + c.Fatal(err) } else if matched, err := regexp.MatchString("^warning: read /proc/self/mem: .*", string(msg)); err != nil || !matched { - t.Fatalf("Expected error message about unreadable file, got \"%s\"", msg) + c.Fatalf("Expected error message about unreadable file, got \"%s\"", msg) } } -func TestReadAllOrWarnSuccess(t *testing.T) { +func (s *suite) TestReadAllOrWarnSuccess(c *C) { rep := Reporter{Logger: log.New(os.Stderr, "", 0)} f, err := os.Open("./crunchstat_test.go") if err != nil { - t.Fatalf("Opening ./crunchstat_test.go: %s", err) + c.Fatalf("Opening ./crunchstat_test.go: %s", err) } data, err := rep.readAllOrWarn(f) if err != nil { - t.Fatalf("got error %s", err) + c.Fatalf("got error %s", err) } if matched, err := regexp.MatchString("\npackage crunchstat\n", string(data)); err != nil || !matched { - t.Fatalf("data failed regexp: err %v, matched %v", err, matched) + c.Fatalf("data failed regexp: err %v, matched %v", err, matched) + } +} + +func (s *suite) TestReportPIDs(c *C) { + var logbuf bytes.Buffer + logger := logrus.New() + logger.Out = &logbuf + r := Reporter{ + Logger: logger, + CgroupRoot: "/sys/fs/cgroup", + PollPeriod: time.Second, + } + r.Start() + r.ReportPID("init", 1) + r.ReportPID("test_process", os.Getpid()) + r.ReportPID("nonexistent", 12345) // should be silently ignored/omitted + for deadline := time.Now().Add(10 * time.Second); ; time.Sleep(time.Millisecond) { + if time.Now().After(deadline) { + c.Error("timed out") + break + } + if regexp.MustCompile(`(?ms).*procmem \d+ init \d+ test_process.*`).MatchString(logbuf.String()) { + break + } } + c.Logf("%s", logbuf.String()) } diff --git a/services/crunchstat/crunchstat.go 
b/services/crunchstat/crunchstat.go index 6383eae545..d28bee0f5e 100644 --- a/services/crunchstat/crunchstat.go +++ b/services/crunchstat/crunchstat.go @@ -28,6 +28,10 @@ var ( version = "dev" ) +type logger interface { + Printf(string, ...interface{}) +} + func main() { reporter := crunchstat.Reporter{ Logger: log.New(os.Stderr, "crunchstat: ", 0), @@ -55,9 +59,11 @@ func main() { reporter.Logger.Printf("crunchstat %s started", version) if reporter.CgroupRoot == "" { - reporter.Logger.Fatal("error: must provide -cgroup-root") + reporter.Logger.Printf("error: must provide -cgroup-root") + os.Exit(2) } else if signalOnDeadPPID < 0 { - reporter.Logger.Fatalf("-signal-on-dead-ppid=%d is invalid (use a positive signal number, or 0 to disable)", signalOnDeadPPID) + reporter.Logger.Printf("-signal-on-dead-ppid=%d is invalid (use a positive signal number, or 0 to disable)", signalOnDeadPPID) + os.Exit(2) } reporter.PollPeriod = time.Duration(*pollMsec) * time.Millisecond @@ -76,17 +82,19 @@ func main() { if status, ok := err.Sys().(syscall.WaitStatus); ok { os.Exit(status.ExitStatus()) } else { - reporter.Logger.Fatalln("ExitError without WaitStatus:", err) + reporter.Logger.Printf("ExitError without WaitStatus: %v", err) + os.Exit(1) } } else if err != nil { - reporter.Logger.Fatalln("error in cmd.Wait:", err) + reporter.Logger.Printf("error running command: %v", err) + os.Exit(1) } } -func runCommand(argv []string, logger *log.Logger) error { +func runCommand(argv []string, logger logger) error { cmd := exec.Command(argv[0], argv[1:]...) 
- logger.Println("Running", argv) + logger.Printf("Running %v", argv) // Child process will use our stdin and stdout pipes // (we close our copies below) @@ -100,7 +108,7 @@ func runCommand(argv []string, logger *log.Logger) error { if cmd.Process != nil { cmd.Process.Signal(catch) } - logger.Println("notice: caught signal:", catch) + logger.Printf("notice: caught signal: %v", catch) }(sigChan) signal.Notify(sigChan, syscall.SIGTERM) signal.Notify(sigChan, syscall.SIGINT) @@ -113,24 +121,30 @@ func runCommand(argv []string, logger *log.Logger) error { // Funnel stderr through our channel stderrPipe, err := cmd.StderrPipe() if err != nil { - logger.Fatalln("error in StderrPipe:", err) + logger.Printf("error in StderrPipe: %v", err) + return err } // Run subprocess if err := cmd.Start(); err != nil { - logger.Fatalln("error in cmd.Start:", err) + logger.Printf("error in cmd.Start: %v", err) + return err } // Close stdin/stdout in this (parent) process os.Stdin.Close() os.Stdout.Close() - copyPipeToChildLog(stderrPipe, log.New(os.Stderr, "", 0)) + err = copyPipeToChildLog(stderrPipe, log.New(os.Stderr, "", 0)) + if err != nil { + cmd.Process.Kill() + return err + } return cmd.Wait() } -func sendSignalOnDeadPPID(intvl time.Duration, signum, ppidOrig int, cmd *exec.Cmd, logger *log.Logger) { +func sendSignalOnDeadPPID(intvl time.Duration, signum, ppidOrig int, cmd *exec.Cmd, logger logger) { ticker := time.NewTicker(intvl) for range ticker.C { ppid := os.Getppid() @@ -152,7 +166,7 @@ func sendSignalOnDeadPPID(intvl time.Duration, signum, ppidOrig int, cmd *exec.C } } -func copyPipeToChildLog(in io.ReadCloser, logger *log.Logger) { +func copyPipeToChildLog(in io.ReadCloser, logger logger) error { reader := bufio.NewReaderSize(in, MaxLogLine) var prefix string for { @@ -160,13 +174,13 @@ func copyPipeToChildLog(in io.ReadCloser, logger *log.Logger) { if err == io.EOF { break } else if err != nil { - logger.Fatal("error reading child stderr:", err) + return 
fmt.Errorf("error reading child stderr: %w", err) } var suffix string if isPrefix { suffix = "[...]" } - logger.Print(prefix, string(line), suffix) + logger.Printf("%s%s%s", prefix, string(line), suffix) // Set up prefix for following line if isPrefix { prefix = "[...]" @@ -174,5 +188,5 @@ func copyPipeToChildLog(in io.ReadCloser, logger *log.Logger) { prefix = "" } } - in.Close() + return in.Close() }