Merge branch '22132-scheduling-status-messages' refs #22132
[arvados.git] / lib / crunchstat / crunchstat_test.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package crunchstat
6
7 import (
8         "bytes"
9         "fmt"
10         "io/fs"
11         "os"
12         "regexp"
13         "runtime"
14         "strconv"
15         "testing"
16         "time"
17
18         "github.com/sirupsen/logrus"
19         . "gopkg.in/check.v1"
20 )
21
// logMsgPrefix is a regexp fragment that consumes any number of
// preceding log lines plus the leading fields of one log line, up to
// and including the opening quote of that line's msg field. Test
// patterns append the expected message text and a closing quote.
const logMsgPrefix = `(?m)(.*\n)*.* msg="`
23
24 func Test(t *testing.T) {
25         TestingT(t)
26 }
27
// Register the suite with gocheck so that TestingT (called from Test)
// runs its Test* methods.
var _ = Suite(&suite{})
29
// testdatasource describes one recorded filesystem snapshot used as
// Reporter input: fspath is the directory holding the snapshot, and
// pid is the process ID handed to the Reporter via Pid.
type testdatasource struct {
	fspath string
	pid    int
}

// Pid returns the process ID stored in the data source.
//
// The receiver is a value (not a pointer) so the method can be used
// directly on entries of the testdata map.
func (ds testdatasource) Pid() int {
	return ds.pid
}

// FS returns a filesystem rooted at the data source's fspath
// directory.
func (ds testdatasource) FS() fs.FS {
	root := os.DirFS(ds.fspath)
	return root
}
41
// testdata maps a platform name to the recorded snapshot (directory
// and PID) used to exercise the Reporter for that platform.
//
// To generate a test case for a new OS target, build
// cmd/arvados-server and run
//
//	arvados-server crunchstat -dump ./testdata/example1234 sleep 2
var testdata = map[string]testdatasource{
	"debian10":   {fspath: "testdata/debian10", pid: 3288},
	"debian11":   {fspath: "testdata/debian11", pid: 4153022},
	"debian12":   {fspath: "testdata/debian12", pid: 1115883},
	"ubuntu1804": {fspath: "testdata/ubuntu1804", pid: 2523},
	"ubuntu2004": {fspath: "testdata/ubuntu2004", pid: 1360},
	"ubuntu2204": {fspath: "testdata/ubuntu2204", pid: 1967},
}
54
// suite is the gocheck fixture for the tests in this file.
//
// logbuf receives everything written through logger (wired up in
// SetUpSuite) and is reset before each test (SetUpTest).
// debian12MemoryCurrent holds the value read in SetUpSuite from the
// debian12 fixture's memory.current file.
type suite struct {
	logbuf                bytes.Buffer
	logger                *logrus.Logger
	debian12MemoryCurrent int64
}
60
61 func (s *suite) SetUpSuite(c *C) {
62         s.logger = logrus.New()
63         s.logger.Out = &s.logbuf
64
65         buf, err := os.ReadFile("testdata/debian12/sys/fs/cgroup/user.slice/user-1000.slice/session-4.scope/memory.current")
66         c.Assert(err, IsNil)
67         _, err = fmt.Sscanf(string(buf), "%d", &s.debian12MemoryCurrent)
68         c.Assert(err, IsNil)
69 }
70
71 func (s *suite) SetUpTest(c *C) {
72         s.logbuf.Reset()
73 }
74
75 // Report stats for the current (go test) process's cgroup, using the
76 // test host's real procfs/sysfs.
77 func (s *suite) TestReportCurrent(c *C) {
78         r := Reporter{
79                 Pid:        os.Getpid,
80                 Logger:     s.logger,
81                 PollPeriod: time.Second,
82         }
83         r.Start()
84         defer r.Stop()
85         checkPatterns := []string{
86                 `(?ms).*rss.*`,
87                 `(?ms).*net:.*`,
88                 `(?ms).*blkio:.*`,
89                 `(?ms).* [\d.]+ user [\d.]+ sys ` + fmt.Sprintf("%.2f", float64(runtime.NumCPU())) + ` cpus -- .*`,
90         }
91         for deadline := time.Now().Add(4 * time.Second); !c.Failed(); time.Sleep(time.Millisecond) {
92                 done := true
93                 for _, pattern := range checkPatterns {
94                         if m := regexp.MustCompile(pattern).FindSubmatch(s.logbuf.Bytes()); len(m) == 0 {
95                                 done = false
96                                 if time.Now().After(deadline) {
97                                         c.Errorf("timed out waiting for %s", pattern)
98                                 }
99                         }
100                 }
101                 if done {
102                         break
103                 }
104         }
105         c.Logf("%s", s.logbuf.String())
106 }
107
108 // Report stats for a the current (go test) process.
109 func (s *suite) TestReportPIDs(c *C) {
110         r := Reporter{
111                 Pid:        func() int { return 1 },
112                 Logger:     s.logger,
113                 PollPeriod: time.Second,
114         }
115         r.Start()
116         defer r.Stop()
117         r.ReportPID("init", 1)
118         r.ReportPID("test_process", os.Getpid())
119         r.ReportPID("nonexistent", 12345) // should be silently ignored/omitted
120         for deadline := time.Now().Add(10 * time.Second); ; time.Sleep(time.Millisecond) {
121                 if time.Now().After(deadline) {
122                         c.Error("timed out")
123                         break
124                 }
125                 if m := regexp.MustCompile(`(?ms).*procmem \d+ init (\d+) test_process.*`).FindSubmatch(s.logbuf.Bytes()); len(m) > 0 {
126                         size, err := strconv.ParseInt(string(m[1]), 10, 64)
127                         c.Check(err, IsNil)
128                         // Expect >1 MiB and <100 MiB -- otherwise we
129                         // are probably misinterpreting /proc/N/stat
130                         // or multiplying by the wrong page size.
131                         c.Check(size > 1000000, Equals, true)
132                         c.Check(size < 100000000, Equals, true)
133                         break
134                 }
135         }
136         c.Logf("%s", s.logbuf.String())
137 }
138
139 func (s *suite) TestAllTestdata(c *C) {
140         for platform, datasource := range testdata {
141                 s.logbuf.Reset()
142                 c.Logf("=== %s", platform)
143                 rep := Reporter{
144                         Pid:             datasource.Pid,
145                         FS:              datasource.FS(),
146                         Logger:          s.logger,
147                         PollPeriod:      time.Second,
148                         ThresholdLogger: s.logger,
149                         Debug:           true,
150                 }
151                 rep.Start()
152                 rep.Stop()
153                 logs := s.logbuf.String()
154                 c.Logf("%s", logs)
155                 c.Check(logs, Matches, `(?ms).* \d\d+ rss\\n.*`)
156                 c.Check(logs, Matches, `(?ms).*blkio:\d+:\d+ \d+ write \d+ read\\n.*`)
157                 c.Check(logs, Matches, `(?ms).*net:\S+ \d+ tx \d+ rx\\n.*`)
158                 c.Check(logs, Matches, `(?ms).* [\d.]+ user [\d.]+ sys [2-9]\d*\.\d\d cpus.*`)
159         }
160 }
161
162 func (s *suite) testRSSThresholds(c *C, rssPercentages []int64, alertCount int) {
163         c.Assert(alertCount <= len(rssPercentages), Equals, true)
164         rep := Reporter{
165                 Pid:    testdata["debian12"].Pid,
166                 FS:     testdata["debian12"].FS(),
167                 Logger: s.logger,
168                 MemThresholds: map[string][]Threshold{
169                         "rss": NewThresholdsFromPercentages(s.debian12MemoryCurrent*3/2, rssPercentages),
170                 },
171                 PollPeriod:      time.Second * 10,
172                 ThresholdLogger: s.logger,
173         }
174         rep.Start()
175         rep.Stop()
176         logs := s.logbuf.String()
177         c.Logf("%s", logs)
178
179         for index, expectPercentage := range rssPercentages[:alertCount] {
180                 var logCheck Checker
181                 if index < alertCount {
182                         logCheck = Matches
183                 } else {
184                         logCheck = Not(Matches)
185                 }
186                 pattern := fmt.Sprintf(`%sContainer using over %d%% of memory \(rss %d/%d bytes\)"`,
187                         logMsgPrefix, expectPercentage, s.debian12MemoryCurrent, s.debian12MemoryCurrent*3/2)
188                 c.Check(logs, logCheck, pattern)
189         }
190 }
191
192 func (s *suite) TestZeroRSSThresholds(c *C) {
193         s.testRSSThresholds(c, []int64{}, 0)
194 }
195
196 func (s *suite) TestOneRSSThresholdPassed(c *C) {
197         s.testRSSThresholds(c, []int64{55}, 1)
198 }
199
200 func (s *suite) TestOneRSSThresholdNotPassed(c *C) {
201         s.testRSSThresholds(c, []int64{85}, 0)
202 }
203
204 func (s *suite) TestMultipleRSSThresholdsNonePassed(c *C) {
205         s.testRSSThresholds(c, []int64{95, 97, 99}, 0)
206 }
207
208 func (s *suite) TestMultipleRSSThresholdsSomePassed(c *C) {
209         s.testRSSThresholds(c, []int64{45, 60, 75, 90}, 2)
210 }
211
212 func (s *suite) TestMultipleRSSThresholdsAllPassed(c *C) {
213         s.testRSSThresholds(c, []int64{1, 2, 3}, 3)
214 }
215
216 func (s *suite) TestLogMaxima(c *C) {
217         rep := Reporter{
218                 Pid:        testdata["debian12"].Pid,
219                 FS:         testdata["debian12"].FS(),
220                 Logger:     s.logger,
221                 PollPeriod: time.Second * 10,
222                 TempDir:    "/",
223         }
224         rep.Start()
225         rep.Stop()
226         rep.LogMaxima(s.logger, map[string]int64{"rss": s.debian12MemoryCurrent * 3 / 2})
227         logs := s.logbuf.String()
228         c.Logf("%s", logs)
229
230         expectRSS := fmt.Sprintf(`Maximum container memory rss usage was %d%%, %d/%d bytes`,
231                 66, s.debian12MemoryCurrent, s.debian12MemoryCurrent*3/2)
232         for _, expected := range []string{
233                 `Maximum disk usage was \d+%, \d+/\d+ bytes`,
234                 `Maximum container memory swap usage was \d\d+ bytes`,
235                 `Maximum container memory pgmajfault usage was \d\d+ faults`,
236                 expectRSS,
237         } {
238                 pattern := logMsgPrefix + expected + `"`
239                 c.Check(logs, Matches, pattern)
240         }
241 }
242
243 func (s *suite) TestLogProcessMemMax(c *C) {
244         rep := Reporter{
245                 Pid:        os.Getpid,
246                 Logger:     s.logger,
247                 PollPeriod: time.Second * 10,
248         }
249         rep.ReportPID("test-run", os.Getpid())
250         rep.Start()
251         rep.Stop()
252         rep.LogProcessMemMax(s.logger)
253         logs := s.logbuf.String()
254         c.Logf("%s", logs)
255
256         pattern := logMsgPrefix + `Maximum test-run memory rss usage was \d+ bytes"`
257         c.Check(logs, Matches, pattern)
258 }