18903: Merge branch 'main' into 18903-fix-activity-script
[arvados.git] / services / keep-balance / balance_run_test.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package main
6
7 import (
8         "bytes"
9         "encoding/json"
10         "fmt"
11         "io"
12         "io/ioutil"
13         "net/http"
14         "net/http/httptest"
15         "os"
16         "strings"
17         "sync"
18         "time"
19
20         "git.arvados.org/arvados.git/lib/config"
21         "git.arvados.org/arvados.git/sdk/go/arvados"
22         "git.arvados.org/arvados.git/sdk/go/arvadostest"
23         "git.arvados.org/arvados.git/sdk/go/ctxlog"
24         "github.com/jmoiron/sqlx"
25         "github.com/prometheus/client_golang/prometheus"
26         "github.com/prometheus/common/expfmt"
27         check "gopkg.in/check.v1"
28 )
29
30 var _ = check.Suite(&runSuite{})
31
32 type reqTracker struct {
33         reqs []http.Request
34         sync.Mutex
35 }
36
37 func (rt *reqTracker) Count() int {
38         rt.Lock()
39         defer rt.Unlock()
40         return len(rt.reqs)
41 }
42
43 func (rt *reqTracker) Add(req *http.Request) int {
44         rt.Lock()
45         defer rt.Unlock()
46         rt.reqs = append(rt.reqs, *req)
47         return len(rt.reqs)
48 }
49
50 var stubServices = []arvados.KeepService{
51         {
52                 UUID:           "zzzzz-bi6l4-000000000000000",
53                 ServiceHost:    "keep0.zzzzz.arvadosapi.com",
54                 ServicePort:    25107,
55                 ServiceSSLFlag: false,
56                 ServiceType:    "disk",
57         },
58         {
59                 UUID:           "zzzzz-bi6l4-000000000000001",
60                 ServiceHost:    "keep1.zzzzz.arvadosapi.com",
61                 ServicePort:    25107,
62                 ServiceSSLFlag: false,
63                 ServiceType:    "disk",
64         },
65         {
66                 UUID:           "zzzzz-bi6l4-000000000000002",
67                 ServiceHost:    "keep2.zzzzz.arvadosapi.com",
68                 ServicePort:    25107,
69                 ServiceSSLFlag: false,
70                 ServiceType:    "disk",
71         },
72         {
73                 UUID:           "zzzzz-bi6l4-000000000000003",
74                 ServiceHost:    "keep3.zzzzz.arvadosapi.com",
75                 ServicePort:    25107,
76                 ServiceSSLFlag: false,
77                 ServiceType:    "disk",
78         },
79         {
80                 UUID:           "zzzzz-bi6l4-h0a0xwut9qa6g3a",
81                 ServiceHost:    "keep.zzzzz.arvadosapi.com",
82                 ServicePort:    25333,
83                 ServiceSSLFlag: true,
84                 ServiceType:    "proxy",
85         },
86 }
87
88 var stubMounts = map[string][]arvados.KeepMount{
89         "keep0.zzzzz.arvadosapi.com:25107": {{
90                 UUID:           "zzzzz-ivpuk-000000000000000",
91                 DeviceID:       "keep0-vol0",
92                 StorageClasses: map[string]bool{"default": true},
93         }},
94         "keep1.zzzzz.arvadosapi.com:25107": {{
95                 UUID:           "zzzzz-ivpuk-100000000000000",
96                 DeviceID:       "keep1-vol0",
97                 StorageClasses: map[string]bool{"default": true},
98         }},
99         "keep2.zzzzz.arvadosapi.com:25107": {{
100                 UUID:           "zzzzz-ivpuk-200000000000000",
101                 DeviceID:       "keep2-vol0",
102                 StorageClasses: map[string]bool{"default": true},
103         }},
104         "keep3.zzzzz.arvadosapi.com:25107": {{
105                 UUID:           "zzzzz-ivpuk-300000000000000",
106                 DeviceID:       "keep3-vol0",
107                 StorageClasses: map[string]bool{"default": true},
108         }},
109 }
110
111 // stubServer is an HTTP transport that intercepts and processes all
112 // requests using its own handlers.
113 type stubServer struct {
114         mux      *http.ServeMux
115         srv      *httptest.Server
116         mutex    sync.Mutex
117         Requests reqTracker
118         logf     func(string, ...interface{})
119 }
120
121 // Start initializes the stub server and returns an *http.Client that
122 // uses the stub server to handle all requests.
123 //
124 // A stubServer that has been started should eventually be shut down
125 // with Close().
126 func (s *stubServer) Start() *http.Client {
127         // Set up a config.Client that forwards all requests to s.mux
128         // via s.srv. Test cases will attach handlers to s.mux to get
129         // the desired responses.
130         s.mux = http.NewServeMux()
131         s.srv = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
132                 s.mutex.Lock()
133                 s.Requests.Add(r)
134                 s.mutex.Unlock()
135                 w.Header().Set("Content-Type", "application/json")
136                 s.mux.ServeHTTP(w, r)
137         }))
138         return &http.Client{Transport: s}
139 }
140
141 func (s *stubServer) RoundTrip(req *http.Request) (*http.Response, error) {
142         w := httptest.NewRecorder()
143         s.mux.ServeHTTP(w, req)
144         return &http.Response{
145                 StatusCode: w.Code,
146                 Status:     fmt.Sprintf("%d %s", w.Code, http.StatusText(w.Code)),
147                 Header:     w.HeaderMap,
148                 Body:       ioutil.NopCloser(w.Body)}, nil
149 }
150
151 // Close releases resources used by the server.
152 func (s *stubServer) Close() {
153         s.srv.Close()
154 }
155
156 func (s *stubServer) serveStatic(path, data string) *reqTracker {
157         rt := &reqTracker{}
158         s.mux.HandleFunc(path, func(w http.ResponseWriter, r *http.Request) {
159                 rt.Add(r)
160                 if r.Body != nil {
161                         ioutil.ReadAll(r.Body)
162                         r.Body.Close()
163                 }
164                 io.WriteString(w, data)
165         })
166         return rt
167 }
168
169 func (s *stubServer) serveCurrentUserAdmin() *reqTracker {
170         return s.serveStatic("/arvados/v1/users/current",
171                 `{"uuid":"zzzzz-tpzed-000000000000000","is_admin":true,"is_active":true}`)
172 }
173
174 func (s *stubServer) serveCurrentUserNotAdmin() *reqTracker {
175         return s.serveStatic("/arvados/v1/users/current",
176                 `{"uuid":"zzzzz-tpzed-000000000000000","is_admin":false,"is_active":true}`)
177 }
178
179 func (s *stubServer) serveDiscoveryDoc() *reqTracker {
180         return s.serveStatic("/discovery/v1/apis/arvados/v1/rest",
181                 `{"defaultCollectionReplication":2}`)
182 }
183
184 func (s *stubServer) serveZeroCollections() *reqTracker {
185         return s.serveStatic("/arvados/v1/collections",
186                 `{"items":[],"items_available":0}`)
187 }
188
189 func (s *stubServer) serveFooBarFileCollections() *reqTracker {
190         rt := &reqTracker{}
191         s.mux.HandleFunc("/arvados/v1/collections", func(w http.ResponseWriter, r *http.Request) {
192                 r.ParseForm()
193                 rt.Add(r)
194                 if strings.Contains(r.Form.Get("filters"), `modified_at`) {
195                         io.WriteString(w, `{"items_available":0,"items":[]}`)
196                 } else {
197                         io.WriteString(w, `{"items_available":3,"items":[
198                                 {"uuid":"zzzzz-4zz18-aaaaaaaaaaaaaaa","portable_data_hash":"fa7aeb5140e2848d39b416daeef4ffc5+45","manifest_text":". 37b51d194a7513e45b56f6524f2d51f2+3 0:3:bar\n","modified_at":"2014-02-03T17:22:54Z"},
199                                 {"uuid":"zzzzz-4zz18-ehbhgtheo8909or","portable_data_hash":"fa7aeb5140e2848d39b416daeef4ffc5+45","manifest_text":". 37b51d194a7513e45b56f6524f2d51f2+3 0:3:bar\n","modified_at":"2014-02-03T17:22:54Z"},
200                                 {"uuid":"zzzzz-4zz18-znfnqtbbv4spc3w","portable_data_hash":"1f4b0bc7583c2a7f9102c395f4ffc5e3+45","manifest_text":". acbd18db4cc2f85cedef654fccc4a4d8+3 0:3:foo\n","modified_at":"2014-02-03T17:22:54Z"}]}`)
201                 }
202         })
203         return rt
204 }
205
206 func (s *stubServer) serveCollectionsButSkipOne() *reqTracker {
207         rt := &reqTracker{}
208         s.mux.HandleFunc("/arvados/v1/collections", func(w http.ResponseWriter, r *http.Request) {
209                 r.ParseForm()
210                 rt.Add(r)
211                 if strings.Contains(r.Form.Get("filters"), `"modified_at","\u003c="`) {
212                         io.WriteString(w, `{"items_available":3,"items":[]}`)
213                 } else if strings.Contains(r.Form.Get("filters"), `"modified_at","\u003e`) {
214                         io.WriteString(w, `{"items_available":0,"items":[]}`)
215                 } else if strings.Contains(r.Form.Get("filters"), `"modified_at","="`) && strings.Contains(r.Form.Get("filters"), `"uuid","\u003e"`) {
216                         io.WriteString(w, `{"items_available":0,"items":[]}`)
217                 } else if strings.Contains(r.Form.Get("filters"), `"modified_at","=",null`) {
218                         io.WriteString(w, `{"items_available":0,"items":[]}`)
219                 } else {
220                         io.WriteString(w, `{"items_available":2,"items":[
221                                 {"uuid":"zzzzz-4zz18-ehbhgtheo8909or","portable_data_hash":"fa7aeb5140e2848d39b416daeef4ffc5+45","manifest_text":". 37b51d194a7513e45b56f6524f2d51f2+3 0:3:bar\n","modified_at":"2014-02-03T17:22:54Z"},
222                                 {"uuid":"zzzzz-4zz18-znfnqtbbv4spc3w","portable_data_hash":"1f4b0bc7583c2a7f9102c395f4ffc5e3+45","manifest_text":". acbd18db4cc2f85cedef654fccc4a4d8+3 0:3:foo\n","modified_at":"2014-02-03T17:22:54Z"}]}`)
223                 }
224         })
225         return rt
226 }
227
228 func (s *stubServer) serveZeroKeepServices() *reqTracker {
229         return s.serveJSON("/arvados/v1/keep_services", arvados.KeepServiceList{})
230 }
231
232 func (s *stubServer) serveKeepServices(svcs []arvados.KeepService) *reqTracker {
233         return s.serveJSON("/arvados/v1/keep_services", arvados.KeepServiceList{
234                 ItemsAvailable: len(svcs),
235                 Items:          svcs,
236         })
237 }
238
239 func (s *stubServer) serveJSON(path string, resp interface{}) *reqTracker {
240         rt := &reqTracker{}
241         s.mux.HandleFunc(path, func(w http.ResponseWriter, r *http.Request) {
242                 rt.Add(r)
243                 json.NewEncoder(w).Encode(resp)
244         })
245         return rt
246 }
247
248 func (s *stubServer) serveKeepstoreMounts() *reqTracker {
249         rt := &reqTracker{}
250         s.mux.HandleFunc("/mounts", func(w http.ResponseWriter, r *http.Request) {
251                 rt.Add(r)
252                 json.NewEncoder(w).Encode(stubMounts[r.Host])
253         })
254         return rt
255 }
256
257 func (s *stubServer) serveKeepstoreIndexFoo4Bar1() *reqTracker {
258         rt := &reqTracker{}
259         s.mux.HandleFunc("/index/", func(w http.ResponseWriter, r *http.Request) {
260                 count := rt.Add(r)
261                 if r.Host == "keep0.zzzzz.arvadosapi.com:25107" {
262                         io.WriteString(w, "37b51d194a7513e45b56f6524f2d51f2+3 12345678\n")
263                 }
264                 fmt.Fprintf(w, "acbd18db4cc2f85cedef654fccc4a4d8+3 %d\n\n", 12345678+count)
265         })
266         for _, mounts := range stubMounts {
267                 for i, mnt := range mounts {
268                         i := i
269                         s.mux.HandleFunc(fmt.Sprintf("/mounts/%s/blocks", mnt.UUID), func(w http.ResponseWriter, r *http.Request) {
270                                 count := rt.Add(r)
271                                 if i == 0 && r.Host == "keep0.zzzzz.arvadosapi.com:25107" {
272                                         io.WriteString(w, "37b51d194a7513e45b56f6524f2d51f2+3 12345678\n")
273                                 }
274                                 if i == 0 {
275                                         fmt.Fprintf(w, "acbd18db4cc2f85cedef654fccc4a4d8+3 %d\n", 12345678+count)
276                                 }
277                                 fmt.Fprintf(w, "\n")
278                         })
279                 }
280         }
281         return rt
282 }
283
284 func (s *stubServer) serveKeepstoreIndexFoo1() *reqTracker {
285         rt := &reqTracker{}
286         s.mux.HandleFunc("/index/", func(w http.ResponseWriter, r *http.Request) {
287                 rt.Add(r)
288                 io.WriteString(w, "acbd18db4cc2f85cedef654fccc4a4d8+3 12345678\n\n")
289         })
290         for _, mounts := range stubMounts {
291                 for i, mnt := range mounts {
292                         i := i
293                         s.mux.HandleFunc(fmt.Sprintf("/mounts/%s/blocks", mnt.UUID), func(w http.ResponseWriter, r *http.Request) {
294                                 rt.Add(r)
295                                 if i == 0 {
296                                         io.WriteString(w, "acbd18db4cc2f85cedef654fccc4a4d8+3 12345678\n\n")
297                                 } else {
298                                         io.WriteString(w, "\n")
299                                 }
300                         })
301                 }
302         }
303         return rt
304 }
305
306 func (s *stubServer) serveKeepstoreTrash() *reqTracker {
307         return s.serveStatic("/trash", `{}`)
308 }
309
310 func (s *stubServer) serveKeepstorePull() *reqTracker {
311         return s.serveStatic("/pull", `{}`)
312 }
313
314 type runSuite struct {
315         stub   stubServer
316         config *arvados.Cluster
317         db     *sqlx.DB
318         client *arvados.Client
319 }
320
321 func (s *runSuite) newServer(options *RunOptions) *Server {
322         srv := &Server{
323                 Cluster:    s.config,
324                 ArvClient:  s.client,
325                 RunOptions: *options,
326                 Metrics:    newMetrics(prometheus.NewRegistry()),
327                 Logger:     options.Logger,
328                 Dumper:     options.Dumper,
329                 DB:         s.db,
330         }
331         return srv
332 }
333
334 func (s *runSuite) SetUpTest(c *check.C) {
335         cfg, err := config.NewLoader(nil, ctxlog.TestLogger(c)).Load()
336         c.Assert(err, check.Equals, nil)
337         s.config, err = cfg.GetCluster("")
338         c.Assert(err, check.Equals, nil)
339         s.db, err = sqlx.Open("postgres", s.config.PostgreSQL.Connection.String())
340         c.Assert(err, check.IsNil)
341
342         s.config.Collections.BalancePeriod = arvados.Duration(time.Second)
343         arvadostest.SetServiceURL(&s.config.Services.Keepbalance, "http://localhost:/")
344
345         s.client = &arvados.Client{
346                 AuthToken: "xyzzy",
347                 APIHost:   "zzzzz.arvadosapi.com",
348                 Client:    s.stub.Start()}
349
350         s.stub.serveDiscoveryDoc()
351         s.stub.logf = c.Logf
352 }
353
354 func (s *runSuite) TearDownTest(c *check.C) {
355         s.stub.Close()
356 }
357
358 func (s *runSuite) TestRefuseZeroCollections(c *check.C) {
359         defer arvados.NewClientFromEnv().RequestAndDecode(nil, "POST", "database/reset", nil, nil)
360         _, err := s.db.Exec(`delete from collections`)
361         c.Assert(err, check.IsNil)
362         opts := RunOptions{
363                 CommitPulls: true,
364                 CommitTrash: true,
365                 Logger:      ctxlog.TestLogger(c),
366         }
367         s.stub.serveCurrentUserAdmin()
368         s.stub.serveZeroCollections()
369         s.stub.serveKeepServices(stubServices)
370         s.stub.serveKeepstoreMounts()
371         s.stub.serveKeepstoreIndexFoo4Bar1()
372         trashReqs := s.stub.serveKeepstoreTrash()
373         pullReqs := s.stub.serveKeepstorePull()
374         srv := s.newServer(&opts)
375         _, err = srv.runOnce()
376         c.Check(err, check.ErrorMatches, "received zero collections")
377         c.Check(trashReqs.Count(), check.Equals, 4)
378         c.Check(pullReqs.Count(), check.Equals, 0)
379 }
380
381 func (s *runSuite) TestRefuseNonAdmin(c *check.C) {
382         opts := RunOptions{
383                 CommitPulls: true,
384                 CommitTrash: true,
385                 Logger:      ctxlog.TestLogger(c),
386         }
387         s.stub.serveCurrentUserNotAdmin()
388         s.stub.serveZeroCollections()
389         s.stub.serveKeepServices(stubServices)
390         s.stub.serveKeepstoreMounts()
391         trashReqs := s.stub.serveKeepstoreTrash()
392         pullReqs := s.stub.serveKeepstorePull()
393         srv := s.newServer(&opts)
394         _, err := srv.runOnce()
395         c.Check(err, check.ErrorMatches, "current user .* is not .* admin user")
396         c.Check(trashReqs.Count(), check.Equals, 0)
397         c.Check(pullReqs.Count(), check.Equals, 0)
398 }
399
400 func (s *runSuite) TestRefuseSameDeviceDifferentVolumes(c *check.C) {
401         opts := RunOptions{
402                 CommitPulls: true,
403                 CommitTrash: true,
404                 Logger:      ctxlog.TestLogger(c),
405         }
406         s.stub.serveCurrentUserAdmin()
407         s.stub.serveZeroCollections()
408         s.stub.serveKeepServices(stubServices)
409         s.stub.mux.HandleFunc("/mounts", func(w http.ResponseWriter, r *http.Request) {
410                 hostid := r.Host[:5] // "keep0.zzzzz.arvadosapi.com:25107" => "keep0"
411                 json.NewEncoder(w).Encode([]arvados.KeepMount{{
412                         UUID:           "zzzzz-ivpuk-0000000000" + hostid,
413                         DeviceID:       "keep0-vol0",
414                         StorageClasses: map[string]bool{"default": true},
415                 }})
416         })
417         trashReqs := s.stub.serveKeepstoreTrash()
418         pullReqs := s.stub.serveKeepstorePull()
419         srv := s.newServer(&opts)
420         _, err := srv.runOnce()
421         c.Check(err, check.ErrorMatches, "cannot continue with config errors.*")
422         c.Check(trashReqs.Count(), check.Equals, 0)
423         c.Check(pullReqs.Count(), check.Equals, 0)
424 }
425
426 func (s *runSuite) TestWriteLostBlocks(c *check.C) {
427         lostf, err := ioutil.TempFile("", "keep-balance-lost-blocks-test-")
428         c.Assert(err, check.IsNil)
429         s.config.Collections.BlobMissingReport = lostf.Name()
430         defer os.Remove(lostf.Name())
431         opts := RunOptions{
432                 CommitPulls: true,
433                 CommitTrash: true,
434                 Logger:      ctxlog.TestLogger(c),
435         }
436         s.stub.serveCurrentUserAdmin()
437         s.stub.serveFooBarFileCollections()
438         s.stub.serveKeepServices(stubServices)
439         s.stub.serveKeepstoreMounts()
440         s.stub.serveKeepstoreIndexFoo1()
441         s.stub.serveKeepstoreTrash()
442         s.stub.serveKeepstorePull()
443         srv := s.newServer(&opts)
444         c.Assert(err, check.IsNil)
445         _, err = srv.runOnce()
446         c.Check(err, check.IsNil)
447         lost, err := ioutil.ReadFile(lostf.Name())
448         c.Assert(err, check.IsNil)
449         c.Check(string(lost), check.Matches, `(?ms).*37b51d194a7513e45b56f6524f2d51f2.* fa7aeb5140e2848d39b416daeef4ffc5\+45.*`)
450 }
451
452 func (s *runSuite) TestDryRun(c *check.C) {
453         opts := RunOptions{
454                 CommitPulls: false,
455                 CommitTrash: false,
456                 Logger:      ctxlog.TestLogger(c),
457         }
458         s.stub.serveCurrentUserAdmin()
459         collReqs := s.stub.serveFooBarFileCollections()
460         s.stub.serveKeepServices(stubServices)
461         s.stub.serveKeepstoreMounts()
462         s.stub.serveKeepstoreIndexFoo4Bar1()
463         trashReqs := s.stub.serveKeepstoreTrash()
464         pullReqs := s.stub.serveKeepstorePull()
465         srv := s.newServer(&opts)
466         bal, err := srv.runOnce()
467         c.Check(err, check.IsNil)
468         for _, req := range collReqs.reqs {
469                 c.Check(req.Form.Get("include_trash"), check.Equals, "true")
470                 c.Check(req.Form.Get("include_old_versions"), check.Equals, "true")
471         }
472         c.Check(trashReqs.Count(), check.Equals, 0)
473         c.Check(pullReqs.Count(), check.Equals, 0)
474         c.Check(bal.stats.pulls, check.Not(check.Equals), 0)
475         c.Check(bal.stats.underrep.replicas, check.Not(check.Equals), 0)
476         c.Check(bal.stats.overrep.replicas, check.Not(check.Equals), 0)
477 }
478
479 func (s *runSuite) TestCommit(c *check.C) {
480         s.config.Collections.BlobMissingReport = c.MkDir() + "/keep-balance-lost-blocks-test-"
481         s.config.ManagementToken = "xyzzy"
482         opts := RunOptions{
483                 CommitPulls: true,
484                 CommitTrash: true,
485                 Logger:      ctxlog.TestLogger(c),
486                 Dumper:      ctxlog.TestLogger(c),
487         }
488         s.stub.serveCurrentUserAdmin()
489         s.stub.serveFooBarFileCollections()
490         s.stub.serveKeepServices(stubServices)
491         s.stub.serveKeepstoreMounts()
492         s.stub.serveKeepstoreIndexFoo4Bar1()
493         trashReqs := s.stub.serveKeepstoreTrash()
494         pullReqs := s.stub.serveKeepstorePull()
495         srv := s.newServer(&opts)
496         bal, err := srv.runOnce()
497         c.Check(err, check.IsNil)
498         c.Check(trashReqs.Count(), check.Equals, 8)
499         c.Check(pullReqs.Count(), check.Equals, 4)
500         // "foo" block is overreplicated by 2
501         c.Check(bal.stats.trashes, check.Equals, 2)
502         // "bar" block is underreplicated by 1, and its only copy is
503         // in a poor rendezvous position
504         c.Check(bal.stats.pulls, check.Equals, 2)
505
506         lost, err := ioutil.ReadFile(s.config.Collections.BlobMissingReport)
507         c.Assert(err, check.IsNil)
508         c.Check(string(lost), check.Not(check.Matches), `(?ms).*acbd18db4cc2f85cedef654fccc4a4d8.*`)
509
510         buf, err := s.getMetrics(c, srv)
511         c.Check(err, check.IsNil)
512         bufstr := buf.String()
513         c.Check(bufstr, check.Matches, `(?ms).*\narvados_keep_total_bytes 15\n.*`)
514         c.Check(bufstr, check.Matches, `(?ms).*\narvados_keepbalance_changeset_compute_seconds_sum [0-9\.]+\n.*`)
515         c.Check(bufstr, check.Matches, `(?ms).*\narvados_keepbalance_changeset_compute_seconds_count 1\n.*`)
516         c.Check(bufstr, check.Matches, `(?ms).*\narvados_keep_dedup_byte_ratio [1-9].*`)
517         c.Check(bufstr, check.Matches, `(?ms).*\narvados_keep_dedup_block_ratio [1-9].*`)
518 }
519
520 func (s *runSuite) TestRunForever(c *check.C) {
521         s.config.ManagementToken = "xyzzy"
522         opts := RunOptions{
523                 CommitPulls: true,
524                 CommitTrash: true,
525                 Logger:      ctxlog.TestLogger(c),
526                 Dumper:      ctxlog.TestLogger(c),
527         }
528         s.stub.serveCurrentUserAdmin()
529         s.stub.serveFooBarFileCollections()
530         s.stub.serveKeepServices(stubServices)
531         s.stub.serveKeepstoreMounts()
532         s.stub.serveKeepstoreIndexFoo4Bar1()
533         trashReqs := s.stub.serveKeepstoreTrash()
534         pullReqs := s.stub.serveKeepstorePull()
535
536         stop := make(chan interface{})
537         s.config.Collections.BalancePeriod = arvados.Duration(time.Millisecond)
538         srv := s.newServer(&opts)
539
540         done := make(chan bool)
541         go func() {
542                 srv.runForever(stop)
543                 close(done)
544         }()
545
546         // Each run should send 4 pull lists + 4 trash lists. The
547         // first run should also send 4 empty trash lists at
548         // startup. We should complete all four runs in much less than
549         // a second.
550         for t0 := time.Now(); pullReqs.Count() < 16 && time.Since(t0) < 10*time.Second; {
551                 time.Sleep(time.Millisecond)
552         }
553         stop <- true
554         <-done
555         c.Check(pullReqs.Count() >= 16, check.Equals, true)
556         c.Check(trashReqs.Count(), check.Equals, pullReqs.Count()+4)
557
558         buf, err := s.getMetrics(c, srv)
559         c.Check(err, check.IsNil)
560         c.Check(buf, check.Matches, `(?ms).*\narvados_keepbalance_changeset_compute_seconds_count `+fmt.Sprintf("%d", pullReqs.Count()/4)+`\n.*`)
561 }
562
563 func (s *runSuite) getMetrics(c *check.C, srv *Server) (*bytes.Buffer, error) {
564         mfs, err := srv.Metrics.reg.Gather()
565         if err != nil {
566                 return nil, err
567         }
568
569         var buf bytes.Buffer
570         for _, mf := range mfs {
571                 if _, err := expfmt.MetricFamilyToText(&buf, mf); err != nil {
572                         return nil, err
573                 }
574         }
575
576         return &buf, nil
577 }