1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
21 "git.arvados.org/arvados.git/lib/config"
22 "git.arvados.org/arvados.git/sdk/go/arvados"
23 "git.arvados.org/arvados.git/sdk/go/arvadostest"
24 "git.arvados.org/arvados.git/sdk/go/ctxlog"
25 "github.com/jmoiron/sqlx"
26 "github.com/prometheus/client_golang/prometheus"
27 "github.com/prometheus/common/expfmt"
28 check "gopkg.in/check.v1"
// Register runSuite with the gocheck runner so its Test* methods are run.
31 var _ = check.Suite(&runSuite{})
// reqTracker accumulates copies of the HTTP requests passed to Add in
// its reqs field, so that tests can count or inspect them afterwards.
33 type reqTracker struct {
// Count returns an int that the tests compare against the number of
// requests they expect a stub endpoint to have received.
// NOTE(review): presumably this is the length of rt.reqs -- confirm
// against the method body.
38 func (rt *reqTracker) Count() int {
// Add records req in rt.reqs and returns an int.
// NOTE(review): the return value is presumably the updated request
// count -- confirm against the rest of the body.
44 func (rt *reqTracker) Add(req *http.Request) int {
// The request is stored by value (a copy of *req), not as a pointer.
47 rt.reqs = append(rt.reqs, *req)
// stubServices is the keep service list that the tests pass to
// serveKeepServices. The entries shown are hosts keep0 through keep3
// (each with ServiceSSLFlag false) plus one further entry for host
// keep.zzzzz.arvadosapi.com.
51 var stubServices = []arvados.KeepService{
53 UUID: "zzzzz-bi6l4-000000000000000",
54 ServiceHost: "keep0.zzzzz.arvadosapi.com",
56 ServiceSSLFlag: false,
60 UUID: "zzzzz-bi6l4-000000000000001",
61 ServiceHost: "keep1.zzzzz.arvadosapi.com",
63 ServiceSSLFlag: false,
67 UUID: "zzzzz-bi6l4-000000000000002",
68 ServiceHost: "keep2.zzzzz.arvadosapi.com",
70 ServiceSSLFlag: false,
74 UUID: "zzzzz-bi6l4-000000000000003",
75 ServiceHost: "keep3.zzzzz.arvadosapi.com",
77 ServiceSSLFlag: false,
// NOTE(review): this last entry has a different UUID pattern and no
// visible ServiceSSLFlag; presumably it is not a plain keepstore
// server -- confirm against the fields not shown here.
81 UUID: "zzzzz-bi6l4-h0a0xwut9qa6g3a",
82 ServiceHost: "keep.zzzzz.arvadosapi.com",
// stubMounts maps a keepstore address (host:port, looked up by r.Host
// in the /mounts handler) to the KeepMount entries served for that
// host. Every mount shown here belongs to the default storage class
// and has a DeviceID of the form keepN-vol0.
89 var stubMounts = map[string][]arvados.KeepMount{
90 "keep0.zzzzz.arvadosapi.com:25107": {{
91 UUID: "zzzzz-ivpuk-000000000000000",
92 DeviceID: "keep0-vol0",
93 StorageClasses: map[string]bool{"default": true},
95 "keep1.zzzzz.arvadosapi.com:25107": {{
96 UUID: "zzzzz-ivpuk-100000000000000",
97 DeviceID: "keep1-vol0",
98 StorageClasses: map[string]bool{"default": true},
100 "keep2.zzzzz.arvadosapi.com:25107": {{
101 UUID: "zzzzz-ivpuk-200000000000000",
102 DeviceID: "keep2-vol0",
103 StorageClasses: map[string]bool{"default": true},
105 "keep3.zzzzz.arvadosapi.com:25107": {{
106 UUID: "zzzzz-ivpuk-300000000000000",
107 DeviceID: "keep3-vol0",
108 StorageClasses: map[string]bool{"default": true},
112 // stubServer is an HTTP transport that intercepts and processes all
113 // requests using its own handlers.
//
// Handlers are attached to its mux field by the serve* methods, and
// RoundTrip dispatches each request to that mux.
114 type stubServer struct {
// logf has a printf-style signature; its callers are not shown here.
119 logf func(string, ...interface{})
122 // Start initializes the stub server and returns an *http.Client that
123 // uses the stub server to handle all requests.
125 // A stubServer that has been started should eventually be shut down
127 func (s *stubServer) Start() *http.Client {
128 // Set up a config.Client that forwards all requests to s.mux
129 // via s.srv. Test cases will attach handlers to s.mux to get
130 // the desired responses.
131 s.mux = http.NewServeMux()
132 s.srv = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
// Default the response Content-Type to JSON before handing the
// request to whichever handler the test registered on the mux.
136 w.Header().Set("Content-Type", "application/json")
137 s.mux.ServeHTTP(w, r)
// The returned client uses s itself as its Transport, so every
// request sent through it is answered by (*stubServer).RoundTrip.
139 return &http.Client{Transport: s}
// RoundTrip implements http.RoundTripper. It serves req from s.mux
// into an in-memory httptest.ResponseRecorder and converts the
// recorded status code and body into an *http.Response. The error
// result shown here is always nil.
142 func (s *stubServer) RoundTrip(req *http.Request) (*http.Response, error) {
143 w := httptest.NewRecorder()
144 s.mux.ServeHTTP(w, req)
145 return &http.Response{
// Status combines the numeric code with its standard reason phrase.
147 Status: fmt.Sprintf("%d %s", w.Code, http.StatusText(w.Code)),
// The recorded body is an in-memory buffer, wrapped with a no-op Close
// to satisfy io.ReadCloser.
149 Body: ioutil.NopCloser(w.Body)}, nil
152 // Close releases resources used by the server.
// NOTE(review): the body is not shown here; presumably it shuts down
// the httptest server created in Start (s.srv) -- confirm.
153 func (s *stubServer) Close() {
// serveStatic registers a handler for path on s.mux that reads the
// request body and then writes data as the response body. It returns
// a *reqTracker.
// NOTE(review): the tracker presumably records every request the
// handler receives -- confirm against the lines not shown here.
157 func (s *stubServer) serveStatic(path, data string) *reqTracker {
159 s.mux.HandleFunc(path, func(w http.ResponseWriter, r *http.Request) {
// Read the request body to EOF; the bytes and any error are discarded.
162 ioutil.ReadAll(r.Body)
165 io.WriteString(w, data)
// serveCurrentUserAdmin stubs /arvados/v1/users/current with an active
// user whose is_admin flag is true, and returns the tracker obtained
// from serveStatic.
170 func (s *stubServer) serveCurrentUserAdmin() *reqTracker {
171 return s.serveStatic("/arvados/v1/users/current",
172 `{"uuid":"zzzzz-tpzed-000000000000000","is_admin":true,"is_active":true}`)
// serveCurrentUserNotAdmin stubs /arvados/v1/users/current with an
// active user whose is_admin flag is false, and returns the tracker
// obtained from serveStatic.
175 func (s *stubServer) serveCurrentUserNotAdmin() *reqTracker {
176 return s.serveStatic("/arvados/v1/users/current",
177 `{"uuid":"zzzzz-tpzed-000000000000000","is_admin":false,"is_active":true}`)
// serveDiscoveryDoc stubs the API discovery document path with a
// document whose only field is defaultCollectionReplication, set to 2.
180 func (s *stubServer) serveDiscoveryDoc() *reqTracker {
181 return s.serveStatic("/discovery/v1/apis/arvados/v1/rest",
182 `{"defaultCollectionReplication":2}`)
// serveZeroCollections stubs /arvados/v1/collections with an empty
// item list and items_available 0.
185 func (s *stubServer) serveZeroCollections() *reqTracker {
186 return s.serveStatic("/arvados/v1/collections",
187 `{"items":[],"items_available":0}`)
// serveFooBarFileCollections stubs /arvados/v1/collections. A request
// whose filters form value mentions modified_at receives an empty
// list. The other response shown lists three collections: two whose
// manifest references the bar block (37b51d19...) and one whose
// manifest references the foo block (acbd18db...).
190 func (s *stubServer) serveFooBarFileCollections() *reqTracker {
192 s.mux.HandleFunc("/arvados/v1/collections", func(w http.ResponseWriter, r *http.Request) {
// Requests that filter on modified_at get an empty page.
195 if strings.Contains(r.Form.Get("filters"), `modified_at`) {
196 io.WriteString(w, `{"items_available":0,"items":[]}`)
198 io.WriteString(w, `{"items_available":3,"items":[
199 {"uuid":"zzzzz-4zz18-aaaaaaaaaaaaaaa","portable_data_hash":"fa7aeb5140e2848d39b416daeef4ffc5+45","manifest_text":". 37b51d194a7513e45b56f6524f2d51f2+3 0:3:bar\n","modified_at":"2014-02-03T17:22:54Z"},
200 {"uuid":"zzzzz-4zz18-ehbhgtheo8909or","portable_data_hash":"fa7aeb5140e2848d39b416daeef4ffc5+45","manifest_text":". 37b51d194a7513e45b56f6524f2d51f2+3 0:3:bar\n","modified_at":"2014-02-03T17:22:54Z"},
201 {"uuid":"zzzzz-4zz18-znfnqtbbv4spc3w","portable_data_hash":"1f4b0bc7583c2a7f9102c395f4ffc5e3+45","manifest_text":". acbd18db4cc2f85cedef654fccc4a4d8+3 0:3:foo\n","modified_at":"2014-02-03T17:22:54Z"}]}`)
// serveCollectionsButSkipOne stubs /arvados/v1/collections with a
// response chosen by the contents of the filters form value:
//
//   - modified_at <= ...: no items, but items_available 3
//   - modified_at > or >= ...: no items, items_available 0
//   - modified_at = ... together with uuid > ...: no items
//   - modified_at = null: no items
//   - the remaining response shown lists two collections with
//     items_available 2
//
// The sequences \u003c and \u003e in the patterns are the JSON-escaped
// forms of < and >. Compared with serveFooBarFileCollections, the
// listed items omit collection zzzzz-4zz18-aaaaaaaaaaaaaaa.
207 func (s *stubServer) serveCollectionsButSkipOne() *reqTracker {
209 s.mux.HandleFunc("/arvados/v1/collections", func(w http.ResponseWriter, r *http.Request) {
212 if strings.Contains(r.Form.Get("filters"), `"modified_at","\u003c="`) {
213 io.WriteString(w, `{"items_available":3,"items":[]}`)
// This pattern has no closing quote after \u003e, so it matches both
// the > and the >= operator.
214 } else if strings.Contains(r.Form.Get("filters"), `"modified_at","\u003e`) {
215 io.WriteString(w, `{"items_available":0,"items":[]}`)
216 } else if strings.Contains(r.Form.Get("filters"), `"modified_at","="`) && strings.Contains(r.Form.Get("filters"), `"uuid","\u003e"`) {
217 io.WriteString(w, `{"items_available":0,"items":[]}`)
218 } else if strings.Contains(r.Form.Get("filters"), `"modified_at","=",null`) {
219 io.WriteString(w, `{"items_available":0,"items":[]}`)
221 io.WriteString(w, `{"items_available":2,"items":[
222 {"uuid":"zzzzz-4zz18-ehbhgtheo8909or","portable_data_hash":"fa7aeb5140e2848d39b416daeef4ffc5+45","manifest_text":". 37b51d194a7513e45b56f6524f2d51f2+3 0:3:bar\n","modified_at":"2014-02-03T17:22:54Z"},
223 {"uuid":"zzzzz-4zz18-znfnqtbbv4spc3w","portable_data_hash":"1f4b0bc7583c2a7f9102c395f4ffc5e3+45","manifest_text":". acbd18db4cc2f85cedef654fccc4a4d8+3 0:3:foo\n","modified_at":"2014-02-03T17:22:54Z"}]}`)
// serveZeroKeepServices stubs /arvados/v1/keep_services with a
// zero-value KeepServiceList, encoded by serveJSON.
229 func (s *stubServer) serveZeroKeepServices() *reqTracker {
230 return s.serveJSON("/arvados/v1/keep_services", arvados.KeepServiceList{})
// serveKeepServices stubs /arvados/v1/keep_services with a
// KeepServiceList whose ItemsAvailable is len(svcs).
// NOTE(review): the list presumably also carries svcs as its items --
// confirm against the fields not shown here.
233 func (s *stubServer) serveKeepServices(svcs []arvados.KeepService) *reqTracker {
234 return s.serveJSON("/arvados/v1/keep_services", arvados.KeepServiceList{
235 ItemsAvailable: len(svcs),
// serveJSON registers a handler for path on s.mux that writes resp to
// the response as JSON. It returns a *reqTracker.
240 func (s *stubServer) serveJSON(path string, resp interface{}) *reqTracker {
242 s.mux.HandleFunc(path, func(w http.ResponseWriter, r *http.Request) {
// resp is encoded anew on every request; the Encode error is discarded.
244 json.NewEncoder(w).Encode(resp)
// serveKeepstoreMounts stubs the keepstore /mounts endpoint. The
// response is the JSON encoding of the stubMounts entry selected by
// the Host of the request.
249 func (s *stubServer) serveKeepstoreMounts() *reqTracker {
251 s.mux.HandleFunc("/mounts", func(w http.ResponseWriter, r *http.Request) {
// A host with no stubMounts entry yields a nil slice, which encodes as
// JSON null.
253 json.NewEncoder(w).Encode(stubMounts[r.Host])
// serveKeepstoreIndexFoo4Bar1 stubs the keepstore index endpoints,
// both /index/ and /mounts/UUID/blocks for every mount in stubMounts.
// The bar block line is written only for requests addressed to keep0;
// the foo block line carries no host condition. Each line is written
// only when it starts with the requested prefix, and every response
// ends with an extra newline.
// NOTE(review): the name suggests foo exists on four servers and bar
// on one; count is defined on lines not shown here -- confirm.
258 func (s *stubServer) serveKeepstoreIndexFoo4Bar1() *reqTracker {
// fooLine builds the index line for the foo block with a timestamp
// offset by mt.
259 fooLine := func(mt int) string { return fmt.Sprintf("acbd18db4cc2f85cedef654fccc4a4d8+3 %d\n", 12345678+mt) }
260 barLine := "37b51d194a7513e45b56f6524f2d51f2+3 12345678\n"
262 s.mux.HandleFunc("/index/", func(w http.ResponseWriter, r *http.Request) {
// r.URL.Path[7:] is whatever follows the 7-character /index/ path
// prefix, i.e. the requested hash prefix.
264 if r.Host == "keep0.zzzzz.arvadosapi.com:25107" && strings.HasPrefix(barLine, r.URL.Path[7:]) {
265 io.WriteString(w, barLine)
267 if strings.HasPrefix(fooLine(count), r.URL.Path[7:]) {
268 io.WriteString(w, fooLine(count))
270 io.WriteString(w, "\n")
272 for _, mounts := range stubMounts {
273 for i, mnt := range mounts {
275 s.mux.HandleFunc(fmt.Sprintf("/mounts/%s/blocks", mnt.UUID), func(w http.ResponseWriter, r *http.Request) {
// Blocks are reported only for the mount at index 0 of each host, and
// the requested prefix comes from the prefix form value.
278 if i == 0 && r.Host == "keep0.zzzzz.arvadosapi.com:25107" && strings.HasPrefix(barLine, r.Form.Get("prefix")) {
279 io.WriteString(w, barLine)
281 if i == 0 && strings.HasPrefix(fooLine(count), r.Form.Get("prefix")) {
282 io.WriteString(w, fooLine(count))
284 io.WriteString(w, "\n")
// serveKeepstoreIndexFoo1 stubs the keepstore index endpoints with a
// single foo block line and no bar block line. The /index/ handler
// writes the line only for requests addressed to keep0. The line is
// written only when it starts with the requested prefix, and every
// response ends with an extra newline.
291 func (s *stubServer) serveKeepstoreIndexFoo1() *reqTracker {
292 fooLine := "acbd18db4cc2f85cedef654fccc4a4d8+3 12345678\n"
294 s.mux.HandleFunc("/index/", func(w http.ResponseWriter, r *http.Request) {
// r.URL.Path[7:] is the part of the path after /index/, i.e. the
// requested hash prefix.
296 if r.Host == "keep0.zzzzz.arvadosapi.com:25107" && strings.HasPrefix(fooLine, r.URL.Path[7:]) {
297 io.WriteString(w, fooLine)
299 io.WriteString(w, "\n")
301 for _, mounts := range stubMounts {
302 for i, mnt := range mounts {
304 s.mux.HandleFunc(fmt.Sprintf("/mounts/%s/blocks", mnt.UUID), func(w http.ResponseWriter, r *http.Request) {
// NOTE(review): unlike the /index/ handler above, no host check is
// visible in this condition -- confirm that is intended.
306 if i == 0 && strings.HasPrefix(fooLine, r.Form.Get("prefix")) {
307 io.WriteString(w, fooLine)
309 io.WriteString(w, "\n")
// serveKeepstoreIndexIgnoringPrefix stubs the keepstore index
// endpoints, both /index/ and /mounts/UUID/blocks for every mount in
// stubMounts. Each handler writes the foo block line followed by an
// extra newline without looking at the prefix the client asked for.
316 func (s *stubServer) serveKeepstoreIndexIgnoringPrefix() *reqTracker {
317 fooLine := "acbd18db4cc2f85cedef654fccc4a4d8+3 12345678\n"
319 s.mux.HandleFunc("/index/", func(w http.ResponseWriter, r *http.Request) {
321 io.WriteString(w, fooLine)
322 io.WriteString(w, "\n")
324 for _, mounts := range stubMounts {
325 for _, mnt := range mounts {
326 s.mux.HandleFunc(fmt.Sprintf("/mounts/%s/blocks", mnt.UUID), func(w http.ResponseWriter, r *http.Request) {
328 io.WriteString(w, fooLine)
329 io.WriteString(w, "\n")
// serveKeepstoreTrash stubs the keepstore /trash endpoint with an
// empty JSON object and returns the tracker obtained from serveStatic.
336 func (s *stubServer) serveKeepstoreTrash() *reqTracker {
337 return s.serveStatic("/trash", `{}`)
// serveKeepstorePull stubs the keepstore /pull endpoint with an empty
// JSON object and returns the tracker obtained from serveStatic.
340 func (s *stubServer) serveKeepstorePull() *reqTracker {
341 return s.serveStatic("/pull", `{}`)
// runSuite is the gocheck suite holding the fixtures shared by the
// Test* methods. Besides the two fields shown here, the methods also
// use s.stub (the stub server) and s.db (a database handle).
344 type runSuite struct {
// config is the cluster configuration loaded in SetUpTest.
346 config *arvados.Cluster
// client is the API client whose HTTP client comes from the stub server.
348 client *arvados.Client
// newServer builds the *Server under test from options. RunOptions is
// a copy of *options, Metrics is backed by a new Prometheus registry
// created for this call, and Logger and Dumper are taken from options.
351 func (s *runSuite) newServer(options *RunOptions) *Server {
355 RunOptions: *options,
356 Metrics: newMetrics(prometheus.NewRegistry()),
357 Logger: options.Logger,
358 Dumper: options.Dumper,
// SetUpTest prepares the fixtures for one test. It loads the cluster
// config, opens a PostgreSQL handle from that config, sets
// BalancePeriod to one second, sets the Keepbalance service URL,
// builds an API client backed by the stub server, and registers the
// discovery document stub.
364 func (s *runSuite) SetUpTest(c *check.C) {
365 cfg, err := config.NewLoader(nil, ctxlog.TestLogger(c)).Load()
// NOTE(review): this and the next assertion use check.Equals with nil,
// while the database assertion below uses check.IsNil.
366 c.Assert(err, check.Equals, nil)
367 s.config, err = cfg.GetCluster("")
368 c.Assert(err, check.Equals, nil)
369 s.db, err = sqlx.Open("postgres", s.config.PostgreSQL.Connection.String())
370 c.Assert(err, check.IsNil)
372 s.config.Collections.BalancePeriod = arvados.Duration(time.Second)
373 arvadostest.SetServiceURL(&s.config.Services.Keepbalance, "http://localhost:/")
375 s.client = &arvados.Client{
377 APIHost: "zzzzz.arvadosapi.com",
// Starting the stub here routes every request from this client to the
// handlers registered on s.stub.
378 Client: s.stub.Start()}
380 s.stub.serveDiscoveryDoc()
// TearDownTest is the per-test cleanup hook of the suite.
// NOTE(review): the body is not shown here; presumably it closes the
// stub server started in SetUpTest -- confirm.
384 func (s *runSuite) TearDownTest(c *check.C) {
// TestRefuseZeroCollections checks that runOnce fails with the error
// "received zero collections" when the API lists no collections, and
// that no pull requests are sent in that case.
388 func (s *runSuite) TestRefuseZeroCollections(c *check.C) {
// This test deletes every row from collections, so a POST to
// database/reset is deferred until the test returns. The result of
// that call is discarded.
389 defer arvados.NewClientFromEnv().RequestAndDecode(nil, "POST", "database/reset", nil, nil)
390 _, err := s.db.Exec(`delete from collections`)
391 c.Assert(err, check.IsNil)
395 Logger: ctxlog.TestLogger(c),
397 s.stub.serveCurrentUserAdmin()
398 s.stub.serveZeroCollections()
399 s.stub.serveKeepServices(stubServices)
400 s.stub.serveKeepstoreMounts()
401 s.stub.serveKeepstoreIndexFoo4Bar1()
402 trashReqs := s.stub.serveKeepstoreTrash()
403 pullReqs := s.stub.serveKeepstorePull()
404 srv := s.newServer(&opts)
405 _, err = srv.runOnce(context.Background())
406 c.Check(err, check.ErrorMatches, "received zero collections")
// NOTE(review): the 4 trash requests are presumably the empty trash
// lists sent at startup that TestRunForever describes -- confirm.
407 c.Check(trashReqs.Count(), check.Equals, 4)
408 c.Check(pullReqs.Count(), check.Equals, 0)
// TestRefuseBadIndex checks that runOnce fails when a keepstore index
// response contains a block that does not match the requested prefix,
// and that the resulting stats report no trashes and no pulls.
// NOTE(review): the expected error mentions prefix abc, so opts
// presumably sets ChunkPrefix to abc on lines not shown -- confirm.
411 func (s *runSuite) TestRefuseBadIndex(c *check.C) {
416 Logger: ctxlog.TestLogger(c),
418 s.stub.serveCurrentUserAdmin()
419 s.stub.serveFooBarFileCollections()
420 s.stub.serveKeepServices(stubServices)
421 s.stub.serveKeepstoreMounts()
// This index stub returns the foo block whatever prefix was requested.
422 s.stub.serveKeepstoreIndexIgnoringPrefix()
423 trashReqs := s.stub.serveKeepstoreTrash()
424 pullReqs := s.stub.serveKeepstorePull()
425 srv := s.newServer(&opts)
426 bal, err := srv.runOnce(context.Background())
427 c.Check(err, check.ErrorMatches, ".*Index response included block .* despite asking for prefix \"abc\"")
428 c.Check(trashReqs.Count(), check.Equals, 4)
429 c.Check(pullReqs.Count(), check.Equals, 0)
430 c.Check(bal.stats.trashes, check.Equals, 0)
431 c.Check(bal.stats.pulls, check.Equals, 0)
// TestRefuseNonAdmin checks that runOnce fails with a not-admin error
// when the current user is not an admin, and that no trash or pull
// requests are sent to keepstore.
434 func (s *runSuite) TestRefuseNonAdmin(c *check.C) {
438 Logger: ctxlog.TestLogger(c),
440 s.stub.serveCurrentUserNotAdmin()
441 s.stub.serveZeroCollections()
442 s.stub.serveKeepServices(stubServices)
443 s.stub.serveKeepstoreMounts()
444 trashReqs := s.stub.serveKeepstoreTrash()
445 pullReqs := s.stub.serveKeepstorePull()
446 srv := s.newServer(&opts)
447 _, err := srv.runOnce(context.Background())
448 c.Check(err, check.ErrorMatches, "current user .* is not .* admin user")
449 c.Check(trashReqs.Count(), check.Equals, 0)
450 c.Check(pullReqs.Count(), check.Equals, 0)
// TestInvalidChunkPrefix runs runOnce once per trial with an invalid
// ChunkPrefix. Each run must fail with the error pattern of the trial
// and send no trash or pull requests. The trials shown cover an
// uppercase hex letter, a non-hex letter, and a 33-character prefix.
453 func (s *runSuite) TestInvalidChunkPrefix(c *check.C) {
454 for _, trial := range []struct {
458 {"123ABC", "invalid char \"A\" in chunk prefix.*"},
459 {"123xyz", "invalid char \"x\" in chunk prefix.*"},
460 {"123456789012345678901234567890123", "invalid chunk prefix .* longer than a block hash"},
463 c.Logf("trying invalid prefix %q", trial.prefix)
467 ChunkPrefix: trial.prefix,
468 Logger: ctxlog.TestLogger(c),
470 s.stub.serveCurrentUserAdmin()
471 s.stub.serveFooBarFileCollections()
472 s.stub.serveKeepServices(stubServices)
473 s.stub.serveKeepstoreMounts()
474 trashReqs := s.stub.serveKeepstoreTrash()
475 pullReqs := s.stub.serveKeepstorePull()
476 srv := s.newServer(&opts)
477 _, err := srv.runOnce(context.Background())
478 c.Check(err, check.ErrorMatches, trial.errRe)
479 c.Check(trashReqs.Count(), check.Equals, 0)
480 c.Check(pullReqs.Count(), check.Equals, 0)
// TestRefuseSameDeviceDifferentVolumes checks that runOnce fails with
// a config error when the keepstore servers report mounts that have
// different UUIDs but the same DeviceID, and that no trash or pull
// requests are sent.
484 func (s *runSuite) TestRefuseSameDeviceDifferentVolumes(c *check.C) {
488 Logger: ctxlog.TestLogger(c),
490 s.stub.serveCurrentUserAdmin()
491 s.stub.serveZeroCollections()
492 s.stub.serveKeepServices(stubServices)
// Custom /mounts stub: the mount UUID is derived from the first five
// characters of the request host, while DeviceID is always keep0-vol0.
493 s.stub.mux.HandleFunc("/mounts", func(w http.ResponseWriter, r *http.Request) {
494 hostid := r.Host[:5] // "keep0.zzzzz.arvadosapi.com:25107" => "keep0"
495 json.NewEncoder(w).Encode([]arvados.KeepMount{{
496 UUID: "zzzzz-ivpuk-0000000000" + hostid,
497 DeviceID: "keep0-vol0",
498 StorageClasses: map[string]bool{"default": true},
501 trashReqs := s.stub.serveKeepstoreTrash()
502 pullReqs := s.stub.serveKeepstorePull()
503 srv := s.newServer(&opts)
504 _, err := srv.runOnce(context.Background())
505 c.Check(err, check.ErrorMatches, "cannot continue with config errors.*")
506 c.Check(trashReqs.Count(), check.Equals, 0)
507 c.Check(pullReqs.Count(), check.Equals, 0)
// TestWriteLostBlocks checks that a successful runOnce writes the
// missing bar block to the BlobMissingReport file. The collections
// reference both foo and bar, while the index stub serves only foo.
510 func (s *runSuite) TestWriteLostBlocks(c *check.C) {
// Use a temp file as the report path and remove it when the test ends.
// NOTE(review): no Close call on lostf is visible here -- confirm.
511 lostf, err := ioutil.TempFile("", "keep-balance-lost-blocks-test-")
512 c.Assert(err, check.IsNil)
513 s.config.Collections.BlobMissingReport = lostf.Name()
514 defer os.Remove(lostf.Name())
518 Logger: ctxlog.TestLogger(c),
520 s.stub.serveCurrentUserAdmin()
521 s.stub.serveFooBarFileCollections()
522 s.stub.serveKeepServices(stubServices)
523 s.stub.serveKeepstoreMounts()
524 s.stub.serveKeepstoreIndexFoo1()
525 s.stub.serveKeepstoreTrash()
526 s.stub.serveKeepstorePull()
527 srv := s.newServer(&opts)
// NOTE(review): no visible statement assigns err between the TempFile
// assertion above and this one, so this check looks redundant.
528 c.Assert(err, check.IsNil)
529 _, err = srv.runOnce(context.Background())
530 c.Check(err, check.IsNil)
531 lost, err := ioutil.ReadFile(lostf.Name())
532 c.Assert(err, check.IsNil)
// The report must mention the bar block hash followed by the portable
// data hash of the collections that reference it.
533 c.Check(string(lost), check.Matches, `(?ms).*37b51d194a7513e45b56f6524f2d51f2.* fa7aeb5140e2848d39b416daeef4ffc5\+45.*`)
// TestDryRun checks that runOnce succeeds without sending any trash
// or pull requests to keepstore, while the computed stats still report
// pulls and over- and under-replicated blocks.
// NOTE(review): opts presumably disables committing pulls and trash
// on lines not shown here -- confirm.
536 func (s *runSuite) TestDryRun(c *check.C) {
540 Logger: ctxlog.TestLogger(c),
542 s.stub.serveCurrentUserAdmin()
543 collReqs := s.stub.serveFooBarFileCollections()
544 s.stub.serveKeepServices(stubServices)
545 s.stub.serveKeepstoreMounts()
546 s.stub.serveKeepstoreIndexFoo4Bar1()
547 trashReqs := s.stub.serveKeepstoreTrash()
548 pullReqs := s.stub.serveKeepstorePull()
549 srv := s.newServer(&opts)
550 bal, err := srv.runOnce(context.Background())
551 c.Check(err, check.IsNil)
// Every recorded collection list request must have asked for trashed
// collections and old versions to be included.
552 for _, req := range collReqs.reqs {
553 c.Check(req.Form.Get("include_trash"), check.Equals, "true")
554 c.Check(req.Form.Get("include_old_versions"), check.Equals, "true")
556 c.Check(trashReqs.Count(), check.Equals, 0)
557 c.Check(pullReqs.Count(), check.Equals, 0)
558 c.Check(bal.stats.pulls, check.Not(check.Equals), 0)
559 c.Check(bal.stats.underrep.replicas, check.Not(check.Equals), 0)
560 c.Check(bal.stats.overrep.replicas, check.Not(check.Equals), 0)
// TestCommit checks a full runOnce against the foo/bar stubs. It
// verifies the number of trash and pull requests sent, the resulting
// trash and pull stats, the contents of the lost-blocks report, and
// the exported metrics.
563 func (s *runSuite) TestCommit(c *check.C) {
// The report path lives in a directory created by c.MkDir.
564 s.config.Collections.BlobMissingReport = c.MkDir() + "/keep-balance-lost-blocks-test-"
565 s.config.ManagementToken = "xyzzy"
569 Logger: ctxlog.TestLogger(c),
570 Dumper: ctxlog.TestLogger(c),
572 s.stub.serveCurrentUserAdmin()
573 s.stub.serveFooBarFileCollections()
574 s.stub.serveKeepServices(stubServices)
575 s.stub.serveKeepstoreMounts()
576 s.stub.serveKeepstoreIndexFoo4Bar1()
577 trashReqs := s.stub.serveKeepstoreTrash()
578 pullReqs := s.stub.serveKeepstorePull()
579 srv := s.newServer(&opts)
580 bal, err := srv.runOnce(context.Background())
581 c.Check(err, check.IsNil)
582 c.Check(trashReqs.Count(), check.Equals, 8)
583 c.Check(pullReqs.Count(), check.Equals, 4)
584 // "foo" block is overreplicated by 2
585 c.Check(bal.stats.trashes, check.Equals, 2)
586 // "bar" block is underreplicated by 1, and its only copy is
587 // in a poor rendezvous position
588 c.Check(bal.stats.pulls, check.Equals, 2)
// The foo block is present on the servers, so its hash must not appear
// in the lost-blocks report.
590 lost, err := ioutil.ReadFile(s.config.Collections.BlobMissingReport)
591 c.Assert(err, check.IsNil)
592 c.Check(string(lost), check.Not(check.Matches), `(?ms).*acbd18db4cc2f85cedef654fccc4a4d8.*`)
// Inspect the text rendering of the metrics registered by the server.
594 buf, err := s.getMetrics(c, srv)
595 c.Check(err, check.IsNil)
596 bufstr := buf.String()
597 c.Check(bufstr, check.Matches, `(?ms).*\narvados_keep_total_bytes 15\n.*`)
598 c.Check(bufstr, check.Matches, `(?ms).*\narvados_keepbalance_changeset_compute_seconds_sum [0-9\.]+\n.*`)
599 c.Check(bufstr, check.Matches, `(?ms).*\narvados_keepbalance_changeset_compute_seconds_count 1\n.*`)
600 c.Check(bufstr, check.Matches, `(?ms).*\narvados_keep_dedup_byte_ratio [1-9].*`)
601 c.Check(bufstr, check.Matches, `(?ms).*\narvados_keep_dedup_block_ratio [1-9].*`)
// TestChunkPrefix checks runOnce with ChunkPrefix set to ac, which
// matches the foo block hash but not the bar block hash. The foo block
// is still trashed where overreplicated, no pulls are computed for
// bar, and the lost-blocks report stays empty.
604 func (s *runSuite) TestChunkPrefix(c *check.C) {
605 s.config.Collections.BlobMissingReport = c.MkDir() + "/keep-balance-lost-blocks-test-"
609 ChunkPrefix: "ac", // catch "foo" but not "bar"
610 Logger: ctxlog.TestLogger(c),
611 Dumper: ctxlog.TestLogger(c),
613 s.stub.serveCurrentUserAdmin()
614 s.stub.serveFooBarFileCollections()
615 s.stub.serveKeepServices(stubServices)
616 s.stub.serveKeepstoreMounts()
617 s.stub.serveKeepstoreIndexFoo4Bar1()
618 trashReqs := s.stub.serveKeepstoreTrash()
619 pullReqs := s.stub.serveKeepstorePull()
620 srv := s.newServer(&opts)
621 bal, err := srv.runOnce(context.Background())
622 c.Check(err, check.IsNil)
// Pull requests are still sent (4 of them) even though the pulls stat
// checked below is 0.
623 c.Check(trashReqs.Count(), check.Equals, 8)
624 c.Check(pullReqs.Count(), check.Equals, 4)
625 // "foo" block is overreplicated by 2
626 c.Check(bal.stats.trashes, check.Equals, 2)
627 // "bar" block is underreplicated but does not match prefix
628 c.Check(bal.stats.pulls, check.Equals, 0)
630 lost, err := ioutil.ReadFile(s.config.Collections.BlobMissingReport)
631 c.Assert(err, check.IsNil)
632 c.Check(string(lost), check.Equals, "")
// TestRunForever runs the server repeatedly with a one-millisecond
// BalancePeriod. It waits until at least 16 pull requests have been
// seen, then checks that the trash request count and the
// changeset-compute metric agree with the number of runs.
635 func (s *runSuite) TestRunForever(c *check.C) {
636 s.config.ManagementToken = "xyzzy"
640 Logger: ctxlog.TestLogger(c),
641 Dumper: ctxlog.TestLogger(c),
643 s.stub.serveCurrentUserAdmin()
644 s.stub.serveFooBarFileCollections()
645 s.stub.serveKeepServices(stubServices)
646 s.stub.serveKeepstoreMounts()
647 s.stub.serveKeepstoreIndexFoo4Bar1()
648 trashReqs := s.stub.serveKeepstoreTrash()
649 pullReqs := s.stub.serveKeepstorePull()
651 ctx, cancel := context.WithCancel(context.Background())
// Override the one-second period set in SetUpTest so that runs follow
// each other almost immediately.
653 s.config.Collections.BalancePeriod = arvados.Duration(time.Millisecond)
654 srv := s.newServer(&opts)
656 done := make(chan bool)
662 // Each run should send 4 pull lists + 4 trash lists. The
663 // first run should also send 4 empty trash lists at
664 // startup. We should complete all four runs in much less than
// Poll the request counters for at most 10 seconds, sleeping one
// millisecond between checks.
666 for t0 := time.Now(); time.Since(t0) < 10*time.Second; {
667 if pullReqs.Count() >= 16 && trashReqs.Count() == pullReqs.Count()+4 {
670 time.Sleep(time.Millisecond)
674 c.Check(pullReqs.Count() >= 16, check.Equals, true)
675 c.Check(trashReqs.Count(), check.Equals, pullReqs.Count()+4)
677 buf, err := s.getMetrics(c, srv)
678 c.Check(err, check.IsNil)
// With 4 pull lists per run, pullReqs.Count()/4 is the number of runs
// and must equal the changeset-compute counter.
679 c.Check(buf, check.Matches, `(?ms).*\narvados_keepbalance_changeset_compute_seconds_count `+fmt.Sprintf("%d", pullReqs.Count()/4)+`\n.*`)
682 func (s *runSuite) getMetrics(c *check.C, srv *Server) (*bytes.Buffer, error) {
683 mfs, err := srv.Metrics.reg.Gather()
689 for _, mf := range mfs {
690 if _, err := expfmt.MetricFamilyToText(&buf, mf); err != nil {