1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
20 "git.arvados.org/arvados.git/lib/config"
21 "git.arvados.org/arvados.git/sdk/go/arvados"
22 "git.arvados.org/arvados.git/sdk/go/arvadostest"
23 "git.arvados.org/arvados.git/sdk/go/ctxlog"
24 "github.com/jmoiron/sqlx"
25 "github.com/prometheus/client_golang/prometheus"
26 check "gopkg.in/check.v1"
29 var _ = check.Suite(&runSuite{})
31 type reqTracker struct {
36 func (rt *reqTracker) Count() int {
42 func (rt *reqTracker) Add(req *http.Request) int {
45 rt.reqs = append(rt.reqs, *req)
49 var stubServices = []arvados.KeepService{
51 UUID: "zzzzz-bi6l4-000000000000000",
52 ServiceHost: "keep0.zzzzz.arvadosapi.com",
54 ServiceSSLFlag: false,
58 UUID: "zzzzz-bi6l4-000000000000001",
59 ServiceHost: "keep1.zzzzz.arvadosapi.com",
61 ServiceSSLFlag: false,
65 UUID: "zzzzz-bi6l4-000000000000002",
66 ServiceHost: "keep2.zzzzz.arvadosapi.com",
68 ServiceSSLFlag: false,
72 UUID: "zzzzz-bi6l4-000000000000003",
73 ServiceHost: "keep3.zzzzz.arvadosapi.com",
75 ServiceSSLFlag: false,
79 UUID: "zzzzz-bi6l4-h0a0xwut9qa6g3a",
80 ServiceHost: "keep.zzzzz.arvadosapi.com",
87 var stubMounts = map[string][]arvados.KeepMount{
88 "keep0.zzzzz.arvadosapi.com:25107": {{
89 UUID: "zzzzz-ivpuk-000000000000000",
90 DeviceID: "keep0-vol0",
91 StorageClasses: map[string]bool{"default": true},
93 "keep1.zzzzz.arvadosapi.com:25107": {{
94 UUID: "zzzzz-ivpuk-100000000000000",
95 DeviceID: "keep1-vol0",
96 StorageClasses: map[string]bool{"default": true},
98 "keep2.zzzzz.arvadosapi.com:25107": {{
99 UUID: "zzzzz-ivpuk-200000000000000",
100 DeviceID: "keep2-vol0",
101 StorageClasses: map[string]bool{"default": true},
103 "keep3.zzzzz.arvadosapi.com:25107": {{
104 UUID: "zzzzz-ivpuk-300000000000000",
105 DeviceID: "keep3-vol0",
106 StorageClasses: map[string]bool{"default": true},
110 // stubServer is an HTTP transport that intercepts and processes all
111 // requests using its own handlers.
112 type stubServer struct {
117 logf func(string, ...interface{})
120 // Start initializes the stub server and returns an *http.Client that
121 // uses the stub server to handle all requests.
123 // A stubServer that has been started should eventually be shut down
125 func (s *stubServer) Start() *http.Client {
126 // Set up a config.Client that forwards all requests to s.mux
127 // via s.srv. Test cases will attach handlers to s.mux to get
128 // the desired responses.
129 s.mux = http.NewServeMux()
130 s.srv = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
134 w.Header().Set("Content-Type", "application/json")
135 s.mux.ServeHTTP(w, r)
137 return &http.Client{Transport: s}
140 func (s *stubServer) RoundTrip(req *http.Request) (*http.Response, error) {
141 w := httptest.NewRecorder()
142 s.mux.ServeHTTP(w, req)
143 return &http.Response{
145 Status: fmt.Sprintf("%d %s", w.Code, http.StatusText(w.Code)),
147 Body: ioutil.NopCloser(w.Body)}, nil
150 // Close releases resources used by the server.
151 func (s *stubServer) Close() {
155 func (s *stubServer) serveStatic(path, data string) *reqTracker {
157 s.mux.HandleFunc(path, func(w http.ResponseWriter, r *http.Request) {
160 ioutil.ReadAll(r.Body)
163 io.WriteString(w, data)
168 func (s *stubServer) serveCurrentUserAdmin() *reqTracker {
169 return s.serveStatic("/arvados/v1/users/current",
170 `{"uuid":"zzzzz-tpzed-000000000000000","is_admin":true,"is_active":true}`)
173 func (s *stubServer) serveCurrentUserNotAdmin() *reqTracker {
174 return s.serveStatic("/arvados/v1/users/current",
175 `{"uuid":"zzzzz-tpzed-000000000000000","is_admin":false,"is_active":true}`)
178 func (s *stubServer) serveDiscoveryDoc() *reqTracker {
179 return s.serveStatic("/discovery/v1/apis/arvados/v1/rest",
180 `{"defaultCollectionReplication":2}`)
183 func (s *stubServer) serveZeroCollections() *reqTracker {
184 return s.serveStatic("/arvados/v1/collections",
185 `{"items":[],"items_available":0}`)
188 func (s *stubServer) serveFooBarFileCollections() *reqTracker {
190 s.mux.HandleFunc("/arvados/v1/collections", func(w http.ResponseWriter, r *http.Request) {
193 if strings.Contains(r.Form.Get("filters"), `modified_at`) {
194 io.WriteString(w, `{"items_available":0,"items":[]}`)
196 io.WriteString(w, `{"items_available":3,"items":[
197 {"uuid":"zzzzz-4zz18-aaaaaaaaaaaaaaa","portable_data_hash":"fa7aeb5140e2848d39b416daeef4ffc5+45","manifest_text":". 37b51d194a7513e45b56f6524f2d51f2+3 0:3:bar\n","modified_at":"2014-02-03T17:22:54Z"},
198 {"uuid":"zzzzz-4zz18-ehbhgtheo8909or","portable_data_hash":"fa7aeb5140e2848d39b416daeef4ffc5+45","manifest_text":". 37b51d194a7513e45b56f6524f2d51f2+3 0:3:bar\n","modified_at":"2014-02-03T17:22:54Z"},
199 {"uuid":"zzzzz-4zz18-znfnqtbbv4spc3w","portable_data_hash":"1f4b0bc7583c2a7f9102c395f4ffc5e3+45","manifest_text":". acbd18db4cc2f85cedef654fccc4a4d8+3 0:3:foo\n","modified_at":"2014-02-03T17:22:54Z"}]}`)
205 func (s *stubServer) serveCollectionsButSkipOne() *reqTracker {
207 s.mux.HandleFunc("/arvados/v1/collections", func(w http.ResponseWriter, r *http.Request) {
210 if strings.Contains(r.Form.Get("filters"), `"modified_at","\u003c="`) {
211 io.WriteString(w, `{"items_available":3,"items":[]}`)
212 } else if strings.Contains(r.Form.Get("filters"), `"modified_at","\u003e`) {
213 io.WriteString(w, `{"items_available":0,"items":[]}`)
214 } else if strings.Contains(r.Form.Get("filters"), `"modified_at","="`) && strings.Contains(r.Form.Get("filters"), `"uuid","\u003e"`) {
215 io.WriteString(w, `{"items_available":0,"items":[]}`)
216 } else if strings.Contains(r.Form.Get("filters"), `"modified_at","=",null`) {
217 io.WriteString(w, `{"items_available":0,"items":[]}`)
219 io.WriteString(w, `{"items_available":2,"items":[
220 {"uuid":"zzzzz-4zz18-ehbhgtheo8909or","portable_data_hash":"fa7aeb5140e2848d39b416daeef4ffc5+45","manifest_text":". 37b51d194a7513e45b56f6524f2d51f2+3 0:3:bar\n","modified_at":"2014-02-03T17:22:54Z"},
221 {"uuid":"zzzzz-4zz18-znfnqtbbv4spc3w","portable_data_hash":"1f4b0bc7583c2a7f9102c395f4ffc5e3+45","manifest_text":". acbd18db4cc2f85cedef654fccc4a4d8+3 0:3:foo\n","modified_at":"2014-02-03T17:22:54Z"}]}`)
227 func (s *stubServer) serveZeroKeepServices() *reqTracker {
228 return s.serveJSON("/arvados/v1/keep_services", arvados.KeepServiceList{})
231 func (s *stubServer) serveKeepServices(svcs []arvados.KeepService) *reqTracker {
232 return s.serveJSON("/arvados/v1/keep_services", arvados.KeepServiceList{
233 ItemsAvailable: len(svcs),
238 func (s *stubServer) serveJSON(path string, resp interface{}) *reqTracker {
240 s.mux.HandleFunc(path, func(w http.ResponseWriter, r *http.Request) {
242 json.NewEncoder(w).Encode(resp)
247 func (s *stubServer) serveKeepstoreMounts() *reqTracker {
249 s.mux.HandleFunc("/mounts", func(w http.ResponseWriter, r *http.Request) {
251 json.NewEncoder(w).Encode(stubMounts[r.Host])
256 func (s *stubServer) serveKeepstoreIndexFoo4Bar1() *reqTracker {
257 fooLine := func(mt int) string { return fmt.Sprintf("acbd18db4cc2f85cedef654fccc4a4d8+3 %d\n", 12345678+mt) }
258 barLine := "37b51d194a7513e45b56f6524f2d51f2+3 12345678\n"
260 s.mux.HandleFunc("/index/", func(w http.ResponseWriter, r *http.Request) {
262 if r.Host == "keep0.zzzzz.arvadosapi.com:25107" && strings.HasPrefix(barLine, r.URL.Path[7:]) {
263 io.WriteString(w, barLine)
265 if strings.HasPrefix(fooLine(count), r.URL.Path[7:]) {
266 io.WriteString(w, fooLine(count))
268 io.WriteString(w, "\n")
270 for _, mounts := range stubMounts {
271 for i, mnt := range mounts {
273 s.mux.HandleFunc(fmt.Sprintf("/mounts/%s/blocks", mnt.UUID), func(w http.ResponseWriter, r *http.Request) {
276 if i == 0 && r.Host == "keep0.zzzzz.arvadosapi.com:25107" && strings.HasPrefix(barLine, r.Form.Get("prefix")) {
277 io.WriteString(w, barLine)
279 if i == 0 && strings.HasPrefix(fooLine(count), r.Form.Get("prefix")) {
280 io.WriteString(w, fooLine(count))
282 io.WriteString(w, "\n")
289 func (s *stubServer) serveKeepstoreIndexFoo1() *reqTracker {
290 fooLine := "acbd18db4cc2f85cedef654fccc4a4d8+3 12345678\n"
292 s.mux.HandleFunc("/index/", func(w http.ResponseWriter, r *http.Request) {
294 if r.Host == "keep0.zzzzz.arvadosapi.com:25107" && strings.HasPrefix(fooLine, r.URL.Path[7:]) {
295 io.WriteString(w, fooLine)
297 io.WriteString(w, "\n")
299 for _, mounts := range stubMounts {
300 for i, mnt := range mounts {
302 s.mux.HandleFunc(fmt.Sprintf("/mounts/%s/blocks", mnt.UUID), func(w http.ResponseWriter, r *http.Request) {
304 if i == 0 && strings.HasPrefix(fooLine, r.Form.Get("prefix")) {
305 io.WriteString(w, fooLine)
307 io.WriteString(w, "\n")
314 func (s *stubServer) serveKeepstoreIndexIgnoringPrefix() *reqTracker {
315 fooLine := "acbd18db4cc2f85cedef654fccc4a4d8+3 12345678\n"
317 s.mux.HandleFunc("/index/", func(w http.ResponseWriter, r *http.Request) {
319 io.WriteString(w, fooLine)
320 io.WriteString(w, "\n")
322 for _, mounts := range stubMounts {
323 for _, mnt := range mounts {
324 s.mux.HandleFunc(fmt.Sprintf("/mounts/%s/blocks", mnt.UUID), func(w http.ResponseWriter, r *http.Request) {
326 io.WriteString(w, fooLine)
327 io.WriteString(w, "\n")
334 func (s *stubServer) serveKeepstoreTrash() *reqTracker {
335 return s.serveStatic("/trash", `{}`)
338 func (s *stubServer) serveKeepstorePull() *reqTracker {
339 return s.serveStatic("/pull", `{}`)
342 type runSuite struct {
344 config *arvados.Cluster
346 client *arvados.Client
349 func (s *runSuite) newServer(options *RunOptions) *Server {
353 RunOptions: *options,
354 Metrics: newMetrics(prometheus.NewRegistry()),
355 Logger: options.Logger,
356 Dumper: options.Dumper,
362 func (s *runSuite) SetUpTest(c *check.C) {
363 cfg, err := config.NewLoader(nil, ctxlog.TestLogger(c)).Load()
364 c.Assert(err, check.Equals, nil)
365 s.config, err = cfg.GetCluster("")
366 c.Assert(err, check.Equals, nil)
367 s.db, err = sqlx.Open("postgres", s.config.PostgreSQL.Connection.String())
368 c.Assert(err, check.IsNil)
370 s.config.Collections.BalancePeriod = arvados.Duration(time.Second)
371 arvadostest.SetServiceURL(&s.config.Services.Keepbalance, "http://localhost:/")
373 s.client = &arvados.Client{
375 APIHost: "zzzzz.arvadosapi.com",
376 Client: s.stub.Start()}
378 s.stub.serveDiscoveryDoc()
382 func (s *runSuite) TearDownTest(c *check.C) {
386 func (s *runSuite) TestRefuseZeroCollections(c *check.C) {
387 defer arvados.NewClientFromEnv().RequestAndDecode(nil, "POST", "database/reset", nil, nil)
388 _, err := s.db.Exec(`delete from collections`)
389 c.Assert(err, check.IsNil)
393 Logger: ctxlog.TestLogger(c),
395 s.stub.serveCurrentUserAdmin()
396 s.stub.serveZeroCollections()
397 s.stub.serveKeepServices(stubServices)
398 s.stub.serveKeepstoreMounts()
399 s.stub.serveKeepstoreIndexFoo4Bar1()
400 trashReqs := s.stub.serveKeepstoreTrash()
401 pullReqs := s.stub.serveKeepstorePull()
402 srv := s.newServer(&opts)
403 _, err = srv.runOnce(context.Background())
404 c.Check(err, check.ErrorMatches, "received zero collections")
405 c.Check(trashReqs.Count(), check.Equals, 4)
406 c.Check(pullReqs.Count(), check.Equals, 0)
409 func (s *runSuite) TestRefuseBadIndex(c *check.C) {
414 Logger: ctxlog.TestLogger(c),
416 s.stub.serveCurrentUserAdmin()
417 s.stub.serveFooBarFileCollections()
418 s.stub.serveKeepServices(stubServices)
419 s.stub.serveKeepstoreMounts()
420 s.stub.serveKeepstoreIndexIgnoringPrefix()
421 trashReqs := s.stub.serveKeepstoreTrash()
422 pullReqs := s.stub.serveKeepstorePull()
423 srv := s.newServer(&opts)
424 bal, err := srv.runOnce(context.Background())
425 c.Check(err, check.ErrorMatches, ".*Index response included block .* despite asking for prefix \"abc\"")
426 c.Check(trashReqs.Count(), check.Equals, 4)
427 c.Check(pullReqs.Count(), check.Equals, 0)
428 c.Check(bal.stats.trashes, check.Equals, 0)
429 c.Check(bal.stats.pulls, check.Equals, 0)
432 func (s *runSuite) TestRefuseNonAdmin(c *check.C) {
436 Logger: ctxlog.TestLogger(c),
438 s.stub.serveCurrentUserNotAdmin()
439 s.stub.serveZeroCollections()
440 s.stub.serveKeepServices(stubServices)
441 s.stub.serveKeepstoreMounts()
442 trashReqs := s.stub.serveKeepstoreTrash()
443 pullReqs := s.stub.serveKeepstorePull()
444 srv := s.newServer(&opts)
445 _, err := srv.runOnce(context.Background())
446 c.Check(err, check.ErrorMatches, "current user .* is not .* admin user")
447 c.Check(trashReqs.Count(), check.Equals, 0)
448 c.Check(pullReqs.Count(), check.Equals, 0)
451 func (s *runSuite) TestInvalidChunkPrefix(c *check.C) {
452 for _, trial := range []struct {
456 {"123ABC", "invalid char \"A\" in chunk prefix.*"},
457 {"123xyz", "invalid char \"x\" in chunk prefix.*"},
458 {"123456789012345678901234567890123", "invalid chunk prefix .* longer than a block hash"},
461 c.Logf("trying invalid prefix %q", trial.prefix)
465 ChunkPrefix: trial.prefix,
466 Logger: ctxlog.TestLogger(c),
468 s.stub.serveCurrentUserAdmin()
469 s.stub.serveFooBarFileCollections()
470 s.stub.serveKeepServices(stubServices)
471 s.stub.serveKeepstoreMounts()
472 trashReqs := s.stub.serveKeepstoreTrash()
473 pullReqs := s.stub.serveKeepstorePull()
474 srv := s.newServer(&opts)
475 _, err := srv.runOnce(context.Background())
476 c.Check(err, check.ErrorMatches, trial.errRe)
477 c.Check(trashReqs.Count(), check.Equals, 0)
478 c.Check(pullReqs.Count(), check.Equals, 0)
482 func (s *runSuite) TestRefuseSameDeviceDifferentVolumes(c *check.C) {
486 Logger: ctxlog.TestLogger(c),
488 s.stub.serveCurrentUserAdmin()
489 s.stub.serveZeroCollections()
490 s.stub.serveKeepServices(stubServices)
491 s.stub.mux.HandleFunc("/mounts", func(w http.ResponseWriter, r *http.Request) {
492 hostid := r.Host[:5] // "keep0.zzzzz.arvadosapi.com:25107" => "keep0"
493 json.NewEncoder(w).Encode([]arvados.KeepMount{{
494 UUID: "zzzzz-ivpuk-0000000000" + hostid,
495 DeviceID: "keep0-vol0",
496 StorageClasses: map[string]bool{"default": true},
499 trashReqs := s.stub.serveKeepstoreTrash()
500 pullReqs := s.stub.serveKeepstorePull()
501 srv := s.newServer(&opts)
502 _, err := srv.runOnce(context.Background())
503 c.Check(err, check.ErrorMatches, "cannot continue with config errors.*")
504 c.Check(trashReqs.Count(), check.Equals, 0)
505 c.Check(pullReqs.Count(), check.Equals, 0)
508 func (s *runSuite) TestWriteLostBlocks(c *check.C) {
509 lostf, err := ioutil.TempFile("", "keep-balance-lost-blocks-test-")
510 c.Assert(err, check.IsNil)
511 s.config.Collections.BlobMissingReport = lostf.Name()
512 defer os.Remove(lostf.Name())
516 Logger: ctxlog.TestLogger(c),
518 s.stub.serveCurrentUserAdmin()
519 s.stub.serveFooBarFileCollections()
520 s.stub.serveKeepServices(stubServices)
521 s.stub.serveKeepstoreMounts()
522 s.stub.serveKeepstoreIndexFoo1()
523 s.stub.serveKeepstoreTrash()
524 s.stub.serveKeepstorePull()
525 srv := s.newServer(&opts)
526 c.Assert(err, check.IsNil)
527 _, err = srv.runOnce(context.Background())
528 c.Check(err, check.IsNil)
529 lost, err := ioutil.ReadFile(lostf.Name())
530 c.Assert(err, check.IsNil)
531 c.Check(string(lost), check.Matches, `(?ms).*37b51d194a7513e45b56f6524f2d51f2.* fa7aeb5140e2848d39b416daeef4ffc5\+45.*`)
534 func (s *runSuite) TestDryRun(c *check.C) {
538 Logger: ctxlog.TestLogger(c),
540 s.stub.serveCurrentUserAdmin()
541 collReqs := s.stub.serveFooBarFileCollections()
542 s.stub.serveKeepServices(stubServices)
543 s.stub.serveKeepstoreMounts()
544 s.stub.serveKeepstoreIndexFoo4Bar1()
545 trashReqs := s.stub.serveKeepstoreTrash()
546 pullReqs := s.stub.serveKeepstorePull()
547 srv := s.newServer(&opts)
548 bal, err := srv.runOnce(context.Background())
549 c.Check(err, check.IsNil)
550 for _, req := range collReqs.reqs {
551 c.Check(req.Form.Get("include_trash"), check.Equals, "true")
552 c.Check(req.Form.Get("include_old_versions"), check.Equals, "true")
554 c.Check(trashReqs.Count(), check.Equals, 0)
555 c.Check(pullReqs.Count(), check.Equals, 0)
556 c.Check(bal.stats.pulls, check.Not(check.Equals), 0)
557 c.Check(bal.stats.underrep.replicas, check.Not(check.Equals), 0)
558 c.Check(bal.stats.overrep.replicas, check.Not(check.Equals), 0)
561 func (s *runSuite) TestCommit(c *check.C) {
562 s.config.Collections.BlobMissingReport = c.MkDir() + "/keep-balance-lost-blocks-test-"
563 s.config.ManagementToken = "xyzzy"
567 Logger: ctxlog.TestLogger(c),
568 Dumper: ctxlog.TestLogger(c),
570 s.stub.serveCurrentUserAdmin()
571 s.stub.serveFooBarFileCollections()
572 s.stub.serveKeepServices(stubServices)
573 s.stub.serveKeepstoreMounts()
574 s.stub.serveKeepstoreIndexFoo4Bar1()
575 trashReqs := s.stub.serveKeepstoreTrash()
576 pullReqs := s.stub.serveKeepstorePull()
577 srv := s.newServer(&opts)
578 bal, err := srv.runOnce(context.Background())
579 c.Check(err, check.IsNil)
580 c.Check(trashReqs.Count(), check.Equals, 8)
581 c.Check(pullReqs.Count(), check.Equals, 4)
582 // "foo" block is overreplicated by 2
583 c.Check(bal.stats.trashes, check.Equals, 2)
584 // "bar" block is underreplicated by 1, and its only copy is
585 // in a poor rendezvous position
586 c.Check(bal.stats.pulls, check.Equals, 2)
588 lost, err := ioutil.ReadFile(s.config.Collections.BlobMissingReport)
589 c.Assert(err, check.IsNil)
590 c.Check(string(lost), check.Not(check.Matches), `(?ms).*acbd18db4cc2f85cedef654fccc4a4d8.*`)
592 metrics := arvadostest.GatherMetricsAsString(srv.Metrics.reg)
593 c.Check(metrics, check.Matches, `(?ms).*\narvados_keep_total_bytes 15\n.*`)
594 c.Check(metrics, check.Matches, `(?ms).*\narvados_keepbalance_changeset_compute_seconds_sum [0-9\.]+\n.*`)
595 c.Check(metrics, check.Matches, `(?ms).*\narvados_keepbalance_changeset_compute_seconds_count 1\n.*`)
596 c.Check(metrics, check.Matches, `(?ms).*\narvados_keep_dedup_byte_ratio [1-9].*`)
597 c.Check(metrics, check.Matches, `(?ms).*\narvados_keep_dedup_block_ratio [1-9].*`)
600 func (s *runSuite) TestChunkPrefix(c *check.C) {
601 s.config.Collections.BlobMissingReport = c.MkDir() + "/keep-balance-lost-blocks-test-"
605 ChunkPrefix: "ac", // catch "foo" but not "bar"
606 Logger: ctxlog.TestLogger(c),
607 Dumper: ctxlog.TestLogger(c),
609 s.stub.serveCurrentUserAdmin()
610 s.stub.serveFooBarFileCollections()
611 s.stub.serveKeepServices(stubServices)
612 s.stub.serveKeepstoreMounts()
613 s.stub.serveKeepstoreIndexFoo4Bar1()
614 trashReqs := s.stub.serveKeepstoreTrash()
615 pullReqs := s.stub.serveKeepstorePull()
616 srv := s.newServer(&opts)
617 bal, err := srv.runOnce(context.Background())
618 c.Check(err, check.IsNil)
619 c.Check(trashReqs.Count(), check.Equals, 8)
620 c.Check(pullReqs.Count(), check.Equals, 4)
621 // "foo" block is overreplicated by 2
622 c.Check(bal.stats.trashes, check.Equals, 2)
623 // "bar" block is underreplicated but does not match prefix
624 c.Check(bal.stats.pulls, check.Equals, 0)
626 lost, err := ioutil.ReadFile(s.config.Collections.BlobMissingReport)
627 c.Assert(err, check.IsNil)
628 c.Check(string(lost), check.Equals, "")
631 func (s *runSuite) TestRunForever(c *check.C) {
632 s.config.ManagementToken = "xyzzy"
636 Logger: ctxlog.TestLogger(c),
637 Dumper: ctxlog.TestLogger(c),
639 s.stub.serveCurrentUserAdmin()
640 s.stub.serveFooBarFileCollections()
641 s.stub.serveKeepServices(stubServices)
642 s.stub.serveKeepstoreMounts()
643 s.stub.serveKeepstoreIndexFoo4Bar1()
644 trashReqs := s.stub.serveKeepstoreTrash()
645 pullReqs := s.stub.serveKeepstorePull()
647 ctx, cancel := context.WithCancel(context.Background())
649 s.config.Collections.BalancePeriod = arvados.Duration(time.Millisecond)
650 srv := s.newServer(&opts)
652 done := make(chan bool)
658 // Each run should send 4 pull lists + 4 trash lists. The
659 // first run should also send 4 empty trash lists at
660 // startup. We should complete all four runs in much less than
662 for t0 := time.Now(); time.Since(t0) < 10*time.Second; {
663 if pullReqs.Count() >= 16 && trashReqs.Count() == pullReqs.Count()+4 {
666 time.Sleep(time.Millisecond)
670 c.Check(pullReqs.Count() >= 16, check.Equals, true)
671 c.Check(trashReqs.Count(), check.Equals, pullReqs.Count()+4)
673 metrics := arvadostest.GatherMetricsAsString(srv.Metrics.reg)
674 c.Check(metrics, check.Matches, `(?ms).*\narvados_keepbalance_changeset_compute_seconds_count `+fmt.Sprintf("%d", pullReqs.Count()/4)+`\n.*`)