Merge branch '19678-job-loader' refs #19678
[arvados.git] / services / keep-balance / balance_run_test.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package keepbalance
6
7 import (
8         "bytes"
9         "context"
10         "encoding/json"
11         "fmt"
12         "io"
13         "io/ioutil"
14         "net/http"
15         "net/http/httptest"
16         "os"
17         "strings"
18         "sync"
19         "time"
20
21         "git.arvados.org/arvados.git/lib/config"
22         "git.arvados.org/arvados.git/sdk/go/arvados"
23         "git.arvados.org/arvados.git/sdk/go/arvadostest"
24         "git.arvados.org/arvados.git/sdk/go/ctxlog"
25         "github.com/jmoiron/sqlx"
26         "github.com/prometheus/client_golang/prometheus"
27         "github.com/prometheus/common/expfmt"
28         check "gopkg.in/check.v1"
29 )
30
31 var _ = check.Suite(&runSuite{})
32
33 type reqTracker struct {
34         reqs []http.Request
35         sync.Mutex
36 }
37
38 func (rt *reqTracker) Count() int {
39         rt.Lock()
40         defer rt.Unlock()
41         return len(rt.reqs)
42 }
43
44 func (rt *reqTracker) Add(req *http.Request) int {
45         rt.Lock()
46         defer rt.Unlock()
47         rt.reqs = append(rt.reqs, *req)
48         return len(rt.reqs)
49 }
50
51 var stubServices = []arvados.KeepService{
52         {
53                 UUID:           "zzzzz-bi6l4-000000000000000",
54                 ServiceHost:    "keep0.zzzzz.arvadosapi.com",
55                 ServicePort:    25107,
56                 ServiceSSLFlag: false,
57                 ServiceType:    "disk",
58         },
59         {
60                 UUID:           "zzzzz-bi6l4-000000000000001",
61                 ServiceHost:    "keep1.zzzzz.arvadosapi.com",
62                 ServicePort:    25107,
63                 ServiceSSLFlag: false,
64                 ServiceType:    "disk",
65         },
66         {
67                 UUID:           "zzzzz-bi6l4-000000000000002",
68                 ServiceHost:    "keep2.zzzzz.arvadosapi.com",
69                 ServicePort:    25107,
70                 ServiceSSLFlag: false,
71                 ServiceType:    "disk",
72         },
73         {
74                 UUID:           "zzzzz-bi6l4-000000000000003",
75                 ServiceHost:    "keep3.zzzzz.arvadosapi.com",
76                 ServicePort:    25107,
77                 ServiceSSLFlag: false,
78                 ServiceType:    "disk",
79         },
80         {
81                 UUID:           "zzzzz-bi6l4-h0a0xwut9qa6g3a",
82                 ServiceHost:    "keep.zzzzz.arvadosapi.com",
83                 ServicePort:    25333,
84                 ServiceSSLFlag: true,
85                 ServiceType:    "proxy",
86         },
87 }
88
89 var stubMounts = map[string][]arvados.KeepMount{
90         "keep0.zzzzz.arvadosapi.com:25107": {{
91                 UUID:           "zzzzz-ivpuk-000000000000000",
92                 DeviceID:       "keep0-vol0",
93                 StorageClasses: map[string]bool{"default": true},
94         }},
95         "keep1.zzzzz.arvadosapi.com:25107": {{
96                 UUID:           "zzzzz-ivpuk-100000000000000",
97                 DeviceID:       "keep1-vol0",
98                 StorageClasses: map[string]bool{"default": true},
99         }},
100         "keep2.zzzzz.arvadosapi.com:25107": {{
101                 UUID:           "zzzzz-ivpuk-200000000000000",
102                 DeviceID:       "keep2-vol0",
103                 StorageClasses: map[string]bool{"default": true},
104         }},
105         "keep3.zzzzz.arvadosapi.com:25107": {{
106                 UUID:           "zzzzz-ivpuk-300000000000000",
107                 DeviceID:       "keep3-vol0",
108                 StorageClasses: map[string]bool{"default": true},
109         }},
110 }
111
112 // stubServer is an HTTP transport that intercepts and processes all
113 // requests using its own handlers.
114 type stubServer struct {
115         mux      *http.ServeMux
116         srv      *httptest.Server
117         mutex    sync.Mutex
118         Requests reqTracker
119         logf     func(string, ...interface{})
120 }
121
122 // Start initializes the stub server and returns an *http.Client that
123 // uses the stub server to handle all requests.
124 //
125 // A stubServer that has been started should eventually be shut down
126 // with Close().
127 func (s *stubServer) Start() *http.Client {
128         // Set up a config.Client that forwards all requests to s.mux
129         // via s.srv. Test cases will attach handlers to s.mux to get
130         // the desired responses.
131         s.mux = http.NewServeMux()
132         s.srv = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
133                 s.mutex.Lock()
134                 s.Requests.Add(r)
135                 s.mutex.Unlock()
136                 w.Header().Set("Content-Type", "application/json")
137                 s.mux.ServeHTTP(w, r)
138         }))
139         return &http.Client{Transport: s}
140 }
141
142 func (s *stubServer) RoundTrip(req *http.Request) (*http.Response, error) {
143         w := httptest.NewRecorder()
144         s.mux.ServeHTTP(w, req)
145         return &http.Response{
146                 StatusCode: w.Code,
147                 Status:     fmt.Sprintf("%d %s", w.Code, http.StatusText(w.Code)),
148                 Header:     w.HeaderMap,
149                 Body:       ioutil.NopCloser(w.Body)}, nil
150 }
151
152 // Close releases resources used by the server.
153 func (s *stubServer) Close() {
154         s.srv.Close()
155 }
156
157 func (s *stubServer) serveStatic(path, data string) *reqTracker {
158         rt := &reqTracker{}
159         s.mux.HandleFunc(path, func(w http.ResponseWriter, r *http.Request) {
160                 rt.Add(r)
161                 if r.Body != nil {
162                         ioutil.ReadAll(r.Body)
163                         r.Body.Close()
164                 }
165                 io.WriteString(w, data)
166         })
167         return rt
168 }
169
170 func (s *stubServer) serveCurrentUserAdmin() *reqTracker {
171         return s.serveStatic("/arvados/v1/users/current",
172                 `{"uuid":"zzzzz-tpzed-000000000000000","is_admin":true,"is_active":true}`)
173 }
174
175 func (s *stubServer) serveCurrentUserNotAdmin() *reqTracker {
176         return s.serveStatic("/arvados/v1/users/current",
177                 `{"uuid":"zzzzz-tpzed-000000000000000","is_admin":false,"is_active":true}`)
178 }
179
180 func (s *stubServer) serveDiscoveryDoc() *reqTracker {
181         return s.serveStatic("/discovery/v1/apis/arvados/v1/rest",
182                 `{"defaultCollectionReplication":2}`)
183 }
184
185 func (s *stubServer) serveZeroCollections() *reqTracker {
186         return s.serveStatic("/arvados/v1/collections",
187                 `{"items":[],"items_available":0}`)
188 }
189
190 func (s *stubServer) serveFooBarFileCollections() *reqTracker {
191         rt := &reqTracker{}
192         s.mux.HandleFunc("/arvados/v1/collections", func(w http.ResponseWriter, r *http.Request) {
193                 r.ParseForm()
194                 rt.Add(r)
195                 if strings.Contains(r.Form.Get("filters"), `modified_at`) {
196                         io.WriteString(w, `{"items_available":0,"items":[]}`)
197                 } else {
198                         io.WriteString(w, `{"items_available":3,"items":[
199                                 {"uuid":"zzzzz-4zz18-aaaaaaaaaaaaaaa","portable_data_hash":"fa7aeb5140e2848d39b416daeef4ffc5+45","manifest_text":". 37b51d194a7513e45b56f6524f2d51f2+3 0:3:bar\n","modified_at":"2014-02-03T17:22:54Z"},
200                                 {"uuid":"zzzzz-4zz18-ehbhgtheo8909or","portable_data_hash":"fa7aeb5140e2848d39b416daeef4ffc5+45","manifest_text":". 37b51d194a7513e45b56f6524f2d51f2+3 0:3:bar\n","modified_at":"2014-02-03T17:22:54Z"},
201                                 {"uuid":"zzzzz-4zz18-znfnqtbbv4spc3w","portable_data_hash":"1f4b0bc7583c2a7f9102c395f4ffc5e3+45","manifest_text":". acbd18db4cc2f85cedef654fccc4a4d8+3 0:3:foo\n","modified_at":"2014-02-03T17:22:54Z"}]}`)
202                 }
203         })
204         return rt
205 }
206
207 func (s *stubServer) serveCollectionsButSkipOne() *reqTracker {
208         rt := &reqTracker{}
209         s.mux.HandleFunc("/arvados/v1/collections", func(w http.ResponseWriter, r *http.Request) {
210                 r.ParseForm()
211                 rt.Add(r)
212                 if strings.Contains(r.Form.Get("filters"), `"modified_at","\u003c="`) {
213                         io.WriteString(w, `{"items_available":3,"items":[]}`)
214                 } else if strings.Contains(r.Form.Get("filters"), `"modified_at","\u003e`) {
215                         io.WriteString(w, `{"items_available":0,"items":[]}`)
216                 } else if strings.Contains(r.Form.Get("filters"), `"modified_at","="`) && strings.Contains(r.Form.Get("filters"), `"uuid","\u003e"`) {
217                         io.WriteString(w, `{"items_available":0,"items":[]}`)
218                 } else if strings.Contains(r.Form.Get("filters"), `"modified_at","=",null`) {
219                         io.WriteString(w, `{"items_available":0,"items":[]}`)
220                 } else {
221                         io.WriteString(w, `{"items_available":2,"items":[
222                                 {"uuid":"zzzzz-4zz18-ehbhgtheo8909or","portable_data_hash":"fa7aeb5140e2848d39b416daeef4ffc5+45","manifest_text":". 37b51d194a7513e45b56f6524f2d51f2+3 0:3:bar\n","modified_at":"2014-02-03T17:22:54Z"},
223                                 {"uuid":"zzzzz-4zz18-znfnqtbbv4spc3w","portable_data_hash":"1f4b0bc7583c2a7f9102c395f4ffc5e3+45","manifest_text":". acbd18db4cc2f85cedef654fccc4a4d8+3 0:3:foo\n","modified_at":"2014-02-03T17:22:54Z"}]}`)
224                 }
225         })
226         return rt
227 }
228
229 func (s *stubServer) serveZeroKeepServices() *reqTracker {
230         return s.serveJSON("/arvados/v1/keep_services", arvados.KeepServiceList{})
231 }
232
233 func (s *stubServer) serveKeepServices(svcs []arvados.KeepService) *reqTracker {
234         return s.serveJSON("/arvados/v1/keep_services", arvados.KeepServiceList{
235                 ItemsAvailable: len(svcs),
236                 Items:          svcs,
237         })
238 }
239
240 func (s *stubServer) serveJSON(path string, resp interface{}) *reqTracker {
241         rt := &reqTracker{}
242         s.mux.HandleFunc(path, func(w http.ResponseWriter, r *http.Request) {
243                 rt.Add(r)
244                 json.NewEncoder(w).Encode(resp)
245         })
246         return rt
247 }
248
249 func (s *stubServer) serveKeepstoreMounts() *reqTracker {
250         rt := &reqTracker{}
251         s.mux.HandleFunc("/mounts", func(w http.ResponseWriter, r *http.Request) {
252                 rt.Add(r)
253                 json.NewEncoder(w).Encode(stubMounts[r.Host])
254         })
255         return rt
256 }
257
258 func (s *stubServer) serveKeepstoreIndexFoo4Bar1() *reqTracker {
259         rt := &reqTracker{}
260         s.mux.HandleFunc("/index/", func(w http.ResponseWriter, r *http.Request) {
261                 count := rt.Add(r)
262                 if r.Host == "keep0.zzzzz.arvadosapi.com:25107" {
263                         io.WriteString(w, "37b51d194a7513e45b56f6524f2d51f2+3 12345678\n")
264                 }
265                 fmt.Fprintf(w, "acbd18db4cc2f85cedef654fccc4a4d8+3 %d\n\n", 12345678+count)
266         })
267         for _, mounts := range stubMounts {
268                 for i, mnt := range mounts {
269                         i := i
270                         s.mux.HandleFunc(fmt.Sprintf("/mounts/%s/blocks", mnt.UUID), func(w http.ResponseWriter, r *http.Request) {
271                                 count := rt.Add(r)
272                                 if i == 0 && r.Host == "keep0.zzzzz.arvadosapi.com:25107" {
273                                         io.WriteString(w, "37b51d194a7513e45b56f6524f2d51f2+3 12345678\n")
274                                 }
275                                 if i == 0 {
276                                         fmt.Fprintf(w, "acbd18db4cc2f85cedef654fccc4a4d8+3 %d\n", 12345678+count)
277                                 }
278                                 fmt.Fprintf(w, "\n")
279                         })
280                 }
281         }
282         return rt
283 }
284
285 func (s *stubServer) serveKeepstoreIndexFoo1() *reqTracker {
286         rt := &reqTracker{}
287         s.mux.HandleFunc("/index/", func(w http.ResponseWriter, r *http.Request) {
288                 rt.Add(r)
289                 io.WriteString(w, "acbd18db4cc2f85cedef654fccc4a4d8+3 12345678\n\n")
290         })
291         for _, mounts := range stubMounts {
292                 for i, mnt := range mounts {
293                         i := i
294                         s.mux.HandleFunc(fmt.Sprintf("/mounts/%s/blocks", mnt.UUID), func(w http.ResponseWriter, r *http.Request) {
295                                 rt.Add(r)
296                                 if i == 0 {
297                                         io.WriteString(w, "acbd18db4cc2f85cedef654fccc4a4d8+3 12345678\n\n")
298                                 } else {
299                                         io.WriteString(w, "\n")
300                                 }
301                         })
302                 }
303         }
304         return rt
305 }
306
307 func (s *stubServer) serveKeepstoreTrash() *reqTracker {
308         return s.serveStatic("/trash", `{}`)
309 }
310
311 func (s *stubServer) serveKeepstorePull() *reqTracker {
312         return s.serveStatic("/pull", `{}`)
313 }
314
315 type runSuite struct {
316         stub   stubServer
317         config *arvados.Cluster
318         db     *sqlx.DB
319         client *arvados.Client
320 }
321
322 func (s *runSuite) newServer(options *RunOptions) *Server {
323         srv := &Server{
324                 Cluster:    s.config,
325                 ArvClient:  s.client,
326                 RunOptions: *options,
327                 Metrics:    newMetrics(prometheus.NewRegistry()),
328                 Logger:     options.Logger,
329                 Dumper:     options.Dumper,
330                 DB:         s.db,
331         }
332         return srv
333 }
334
335 func (s *runSuite) SetUpTest(c *check.C) {
336         cfg, err := config.NewLoader(nil, ctxlog.TestLogger(c)).Load()
337         c.Assert(err, check.Equals, nil)
338         s.config, err = cfg.GetCluster("")
339         c.Assert(err, check.Equals, nil)
340         s.db, err = sqlx.Open("postgres", s.config.PostgreSQL.Connection.String())
341         c.Assert(err, check.IsNil)
342
343         s.config.Collections.BalancePeriod = arvados.Duration(time.Second)
344         arvadostest.SetServiceURL(&s.config.Services.Keepbalance, "http://localhost:/")
345
346         s.client = &arvados.Client{
347                 AuthToken: "xyzzy",
348                 APIHost:   "zzzzz.arvadosapi.com",
349                 Client:    s.stub.Start()}
350
351         s.stub.serveDiscoveryDoc()
352         s.stub.logf = c.Logf
353 }
354
355 func (s *runSuite) TearDownTest(c *check.C) {
356         s.stub.Close()
357 }
358
359 func (s *runSuite) TestRefuseZeroCollections(c *check.C) {
360         defer arvados.NewClientFromEnv().RequestAndDecode(nil, "POST", "database/reset", nil, nil)
361         _, err := s.db.Exec(`delete from collections`)
362         c.Assert(err, check.IsNil)
363         opts := RunOptions{
364                 CommitPulls: true,
365                 CommitTrash: true,
366                 Logger:      ctxlog.TestLogger(c),
367         }
368         s.stub.serveCurrentUserAdmin()
369         s.stub.serveZeroCollections()
370         s.stub.serveKeepServices(stubServices)
371         s.stub.serveKeepstoreMounts()
372         s.stub.serveKeepstoreIndexFoo4Bar1()
373         trashReqs := s.stub.serveKeepstoreTrash()
374         pullReqs := s.stub.serveKeepstorePull()
375         srv := s.newServer(&opts)
376         _, err = srv.runOnce(context.Background())
377         c.Check(err, check.ErrorMatches, "received zero collections")
378         c.Check(trashReqs.Count(), check.Equals, 4)
379         c.Check(pullReqs.Count(), check.Equals, 0)
380 }
381
382 func (s *runSuite) TestRefuseNonAdmin(c *check.C) {
383         opts := RunOptions{
384                 CommitPulls: true,
385                 CommitTrash: true,
386                 Logger:      ctxlog.TestLogger(c),
387         }
388         s.stub.serveCurrentUserNotAdmin()
389         s.stub.serveZeroCollections()
390         s.stub.serveKeepServices(stubServices)
391         s.stub.serveKeepstoreMounts()
392         trashReqs := s.stub.serveKeepstoreTrash()
393         pullReqs := s.stub.serveKeepstorePull()
394         srv := s.newServer(&opts)
395         _, err := srv.runOnce(context.Background())
396         c.Check(err, check.ErrorMatches, "current user .* is not .* admin user")
397         c.Check(trashReqs.Count(), check.Equals, 0)
398         c.Check(pullReqs.Count(), check.Equals, 0)
399 }
400
401 func (s *runSuite) TestRefuseSameDeviceDifferentVolumes(c *check.C) {
402         opts := RunOptions{
403                 CommitPulls: true,
404                 CommitTrash: true,
405                 Logger:      ctxlog.TestLogger(c),
406         }
407         s.stub.serveCurrentUserAdmin()
408         s.stub.serveZeroCollections()
409         s.stub.serveKeepServices(stubServices)
410         s.stub.mux.HandleFunc("/mounts", func(w http.ResponseWriter, r *http.Request) {
411                 hostid := r.Host[:5] // "keep0.zzzzz.arvadosapi.com:25107" => "keep0"
412                 json.NewEncoder(w).Encode([]arvados.KeepMount{{
413                         UUID:           "zzzzz-ivpuk-0000000000" + hostid,
414                         DeviceID:       "keep0-vol0",
415                         StorageClasses: map[string]bool{"default": true},
416                 }})
417         })
418         trashReqs := s.stub.serveKeepstoreTrash()
419         pullReqs := s.stub.serveKeepstorePull()
420         srv := s.newServer(&opts)
421         _, err := srv.runOnce(context.Background())
422         c.Check(err, check.ErrorMatches, "cannot continue with config errors.*")
423         c.Check(trashReqs.Count(), check.Equals, 0)
424         c.Check(pullReqs.Count(), check.Equals, 0)
425 }
426
427 func (s *runSuite) TestWriteLostBlocks(c *check.C) {
428         lostf, err := ioutil.TempFile("", "keep-balance-lost-blocks-test-")
429         c.Assert(err, check.IsNil)
430         s.config.Collections.BlobMissingReport = lostf.Name()
431         defer os.Remove(lostf.Name())
432         opts := RunOptions{
433                 CommitPulls: true,
434                 CommitTrash: true,
435                 Logger:      ctxlog.TestLogger(c),
436         }
437         s.stub.serveCurrentUserAdmin()
438         s.stub.serveFooBarFileCollections()
439         s.stub.serveKeepServices(stubServices)
440         s.stub.serveKeepstoreMounts()
441         s.stub.serveKeepstoreIndexFoo1()
442         s.stub.serveKeepstoreTrash()
443         s.stub.serveKeepstorePull()
444         srv := s.newServer(&opts)
445         c.Assert(err, check.IsNil)
446         _, err = srv.runOnce(context.Background())
447         c.Check(err, check.IsNil)
448         lost, err := ioutil.ReadFile(lostf.Name())
449         c.Assert(err, check.IsNil)
450         c.Check(string(lost), check.Matches, `(?ms).*37b51d194a7513e45b56f6524f2d51f2.* fa7aeb5140e2848d39b416daeef4ffc5\+45.*`)
451 }
452
453 func (s *runSuite) TestDryRun(c *check.C) {
454         opts := RunOptions{
455                 CommitPulls: false,
456                 CommitTrash: false,
457                 Logger:      ctxlog.TestLogger(c),
458         }
459         s.stub.serveCurrentUserAdmin()
460         collReqs := s.stub.serveFooBarFileCollections()
461         s.stub.serveKeepServices(stubServices)
462         s.stub.serveKeepstoreMounts()
463         s.stub.serveKeepstoreIndexFoo4Bar1()
464         trashReqs := s.stub.serveKeepstoreTrash()
465         pullReqs := s.stub.serveKeepstorePull()
466         srv := s.newServer(&opts)
467         bal, err := srv.runOnce(context.Background())
468         c.Check(err, check.IsNil)
469         for _, req := range collReqs.reqs {
470                 c.Check(req.Form.Get("include_trash"), check.Equals, "true")
471                 c.Check(req.Form.Get("include_old_versions"), check.Equals, "true")
472         }
473         c.Check(trashReqs.Count(), check.Equals, 0)
474         c.Check(pullReqs.Count(), check.Equals, 0)
475         c.Check(bal.stats.pulls, check.Not(check.Equals), 0)
476         c.Check(bal.stats.underrep.replicas, check.Not(check.Equals), 0)
477         c.Check(bal.stats.overrep.replicas, check.Not(check.Equals), 0)
478 }
479
480 func (s *runSuite) TestCommit(c *check.C) {
481         s.config.Collections.BlobMissingReport = c.MkDir() + "/keep-balance-lost-blocks-test-"
482         s.config.ManagementToken = "xyzzy"
483         opts := RunOptions{
484                 CommitPulls: true,
485                 CommitTrash: true,
486                 Logger:      ctxlog.TestLogger(c),
487                 Dumper:      ctxlog.TestLogger(c),
488         }
489         s.stub.serveCurrentUserAdmin()
490         s.stub.serveFooBarFileCollections()
491         s.stub.serveKeepServices(stubServices)
492         s.stub.serveKeepstoreMounts()
493         s.stub.serveKeepstoreIndexFoo4Bar1()
494         trashReqs := s.stub.serveKeepstoreTrash()
495         pullReqs := s.stub.serveKeepstorePull()
496         srv := s.newServer(&opts)
497         bal, err := srv.runOnce(context.Background())
498         c.Check(err, check.IsNil)
499         c.Check(trashReqs.Count(), check.Equals, 8)
500         c.Check(pullReqs.Count(), check.Equals, 4)
501         // "foo" block is overreplicated by 2
502         c.Check(bal.stats.trashes, check.Equals, 2)
503         // "bar" block is underreplicated by 1, and its only copy is
504         // in a poor rendezvous position
505         c.Check(bal.stats.pulls, check.Equals, 2)
506
507         lost, err := ioutil.ReadFile(s.config.Collections.BlobMissingReport)
508         c.Assert(err, check.IsNil)
509         c.Check(string(lost), check.Not(check.Matches), `(?ms).*acbd18db4cc2f85cedef654fccc4a4d8.*`)
510
511         buf, err := s.getMetrics(c, srv)
512         c.Check(err, check.IsNil)
513         bufstr := buf.String()
514         c.Check(bufstr, check.Matches, `(?ms).*\narvados_keep_total_bytes 15\n.*`)
515         c.Check(bufstr, check.Matches, `(?ms).*\narvados_keepbalance_changeset_compute_seconds_sum [0-9\.]+\n.*`)
516         c.Check(bufstr, check.Matches, `(?ms).*\narvados_keepbalance_changeset_compute_seconds_count 1\n.*`)
517         c.Check(bufstr, check.Matches, `(?ms).*\narvados_keep_dedup_byte_ratio [1-9].*`)
518         c.Check(bufstr, check.Matches, `(?ms).*\narvados_keep_dedup_block_ratio [1-9].*`)
519 }
520
521 func (s *runSuite) TestRunForever(c *check.C) {
522         s.config.ManagementToken = "xyzzy"
523         opts := RunOptions{
524                 CommitPulls: true,
525                 CommitTrash: true,
526                 Logger:      ctxlog.TestLogger(c),
527                 Dumper:      ctxlog.TestLogger(c),
528         }
529         s.stub.serveCurrentUserAdmin()
530         s.stub.serveFooBarFileCollections()
531         s.stub.serveKeepServices(stubServices)
532         s.stub.serveKeepstoreMounts()
533         s.stub.serveKeepstoreIndexFoo4Bar1()
534         trashReqs := s.stub.serveKeepstoreTrash()
535         pullReqs := s.stub.serveKeepstorePull()
536
537         ctx, cancel := context.WithCancel(context.Background())
538         defer cancel()
539         s.config.Collections.BalancePeriod = arvados.Duration(time.Millisecond)
540         srv := s.newServer(&opts)
541
542         done := make(chan bool)
543         go func() {
544                 srv.runForever(ctx)
545                 close(done)
546         }()
547
548         // Each run should send 4 pull lists + 4 trash lists. The
549         // first run should also send 4 empty trash lists at
550         // startup. We should complete all four runs in much less than
551         // a second.
552         for t0 := time.Now(); pullReqs.Count() < 16 && time.Since(t0) < 10*time.Second; {
553                 time.Sleep(time.Millisecond)
554         }
555         cancel()
556         <-done
557         c.Check(pullReqs.Count() >= 16, check.Equals, true)
558         c.Check(trashReqs.Count(), check.Equals, pullReqs.Count()+4)
559
560         buf, err := s.getMetrics(c, srv)
561         c.Check(err, check.IsNil)
562         c.Check(buf, check.Matches, `(?ms).*\narvados_keepbalance_changeset_compute_seconds_count `+fmt.Sprintf("%d", pullReqs.Count()/4)+`\n.*`)
563 }
564
565 func (s *runSuite) getMetrics(c *check.C, srv *Server) (*bytes.Buffer, error) {
566         mfs, err := srv.Metrics.reg.Gather()
567         if err != nil {
568                 return nil, err
569         }
570
571         var buf bytes.Buffer
572         for _, mf := range mfs {
573                 if _, err := expfmt.MetricFamilyToText(&buf, mf); err != nil {
574                         return nil, err
575                 }
576         }
577
578         return &buf, nil
579 }