1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
20 "git.arvados.org/arvados.git/lib/config"
21 "git.arvados.org/arvados.git/sdk/go/arvados"
22 "git.arvados.org/arvados.git/sdk/go/arvadostest"
23 "git.arvados.org/arvados.git/sdk/go/ctxlog"
24 "github.com/jmoiron/sqlx"
25 "github.com/prometheus/client_golang/prometheus"
26 check "gopkg.in/check.v1"
29 var _ = check.Suite(&runSuite{})
31 type reqTracker struct {
36 func (rt *reqTracker) Count() int {
42 func (rt *reqTracker) Add(req *http.Request) int {
45 rt.reqs = append(rt.reqs, *req)
49 var stubServices = []arvados.KeepService{
51 UUID: "zzzzz-bi6l4-000000000000000",
52 ServiceHost: "keep0.zzzzz.arvadosapi.com",
54 ServiceSSLFlag: false,
58 UUID: "zzzzz-bi6l4-000000000000001",
59 ServiceHost: "keep1.zzzzz.arvadosapi.com",
61 ServiceSSLFlag: false,
65 UUID: "zzzzz-bi6l4-000000000000002",
66 ServiceHost: "keep2.zzzzz.arvadosapi.com",
68 ServiceSSLFlag: false,
72 UUID: "zzzzz-bi6l4-000000000000003",
73 ServiceHost: "keep3.zzzzz.arvadosapi.com",
75 ServiceSSLFlag: false,
79 UUID: "zzzzz-bi6l4-h0a0xwut9qa6g3a",
80 ServiceHost: "keep.zzzzz.arvadosapi.com",
87 var stubMounts = map[string][]arvados.KeepMount{
88 "keep0.zzzzz.arvadosapi.com:25107": {{
89 UUID: "zzzzz-ivpuk-000000000000000",
90 DeviceID: "keep0-vol0",
91 StorageClasses: map[string]bool{"default": true},
95 "keep1.zzzzz.arvadosapi.com:25107": {{
96 UUID: "zzzzz-ivpuk-100000000000000",
97 DeviceID: "keep1-vol0",
98 StorageClasses: map[string]bool{"default": true},
102 "keep2.zzzzz.arvadosapi.com:25107": {{
103 UUID: "zzzzz-ivpuk-200000000000000",
104 DeviceID: "keep2-vol0",
105 StorageClasses: map[string]bool{"default": true},
109 "keep3.zzzzz.arvadosapi.com:25107": {{
110 UUID: "zzzzz-ivpuk-300000000000000",
111 DeviceID: "keep3-vol0",
112 StorageClasses: map[string]bool{"default": true},
118 // stubServer is an HTTP transport that intercepts and processes all
119 // requests using its own handlers.
120 type stubServer struct {
125 logf func(string, ...interface{})
128 // Start initializes the stub server and returns an *http.Client that
129 // uses the stub server to handle all requests.
131 // A stubServer that has been started should eventually be shut down
133 func (s *stubServer) Start() *http.Client {
134 // Set up a config.Client that forwards all requests to s.mux
135 // via s.srv. Test cases will attach handlers to s.mux to get
136 // the desired responses.
137 s.mux = http.NewServeMux()
138 s.srv = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
142 w.Header().Set("Content-Type", "application/json")
143 s.mux.ServeHTTP(w, r)
145 return &http.Client{Transport: s}
148 func (s *stubServer) RoundTrip(req *http.Request) (*http.Response, error) {
149 w := httptest.NewRecorder()
150 s.mux.ServeHTTP(w, req)
151 return &http.Response{
153 Status: fmt.Sprintf("%d %s", w.Code, http.StatusText(w.Code)),
155 Body: ioutil.NopCloser(w.Body)}, nil
158 // Close releases resources used by the server.
159 func (s *stubServer) Close() {
163 func (s *stubServer) serveStatic(path, data string) *reqTracker {
165 s.mux.HandleFunc(path, func(w http.ResponseWriter, r *http.Request) {
168 ioutil.ReadAll(r.Body)
171 io.WriteString(w, data)
176 func (s *stubServer) serveCurrentUserAdmin() *reqTracker {
177 return s.serveStatic("/arvados/v1/users/current",
178 `{"uuid":"zzzzz-tpzed-000000000000000","is_admin":true,"is_active":true}`)
181 func (s *stubServer) serveCurrentUserNotAdmin() *reqTracker {
182 return s.serveStatic("/arvados/v1/users/current",
183 `{"uuid":"zzzzz-tpzed-000000000000000","is_admin":false,"is_active":true}`)
186 func (s *stubServer) serveDiscoveryDoc() *reqTracker {
187 return s.serveStatic("/discovery/v1/apis/arvados/v1/rest",
188 `{"defaultCollectionReplication":2}`)
191 func (s *stubServer) serveZeroCollections() *reqTracker {
192 return s.serveStatic("/arvados/v1/collections",
193 `{"items":[],"items_available":0}`)
196 func (s *stubServer) serveFooBarFileCollections() *reqTracker {
198 s.mux.HandleFunc("/arvados/v1/collections", func(w http.ResponseWriter, r *http.Request) {
201 if strings.Contains(r.Form.Get("filters"), `modified_at`) {
202 io.WriteString(w, `{"items_available":0,"items":[]}`)
204 io.WriteString(w, `{"items_available":3,"items":[
205 {"uuid":"zzzzz-4zz18-aaaaaaaaaaaaaaa","portable_data_hash":"fa7aeb5140e2848d39b416daeef4ffc5+45","manifest_text":". 37b51d194a7513e45b56f6524f2d51f2+3 0:3:bar\n","modified_at":"2014-02-03T17:22:54Z"},
206 {"uuid":"zzzzz-4zz18-ehbhgtheo8909or","portable_data_hash":"fa7aeb5140e2848d39b416daeef4ffc5+45","manifest_text":". 37b51d194a7513e45b56f6524f2d51f2+3 0:3:bar\n","modified_at":"2014-02-03T17:22:54Z"},
207 {"uuid":"zzzzz-4zz18-znfnqtbbv4spc3w","portable_data_hash":"1f4b0bc7583c2a7f9102c395f4ffc5e3+45","manifest_text":". acbd18db4cc2f85cedef654fccc4a4d8+3 0:3:foo\n","modified_at":"2014-02-03T17:22:54Z"}]}`)
213 func (s *stubServer) serveCollectionsButSkipOne() *reqTracker {
215 s.mux.HandleFunc("/arvados/v1/collections", func(w http.ResponseWriter, r *http.Request) {
218 if strings.Contains(r.Form.Get("filters"), `"modified_at","\u003c="`) {
219 io.WriteString(w, `{"items_available":3,"items":[]}`)
220 } else if strings.Contains(r.Form.Get("filters"), `"modified_at","\u003e`) {
221 io.WriteString(w, `{"items_available":0,"items":[]}`)
222 } else if strings.Contains(r.Form.Get("filters"), `"modified_at","="`) && strings.Contains(r.Form.Get("filters"), `"uuid","\u003e"`) {
223 io.WriteString(w, `{"items_available":0,"items":[]}`)
224 } else if strings.Contains(r.Form.Get("filters"), `"modified_at","=",null`) {
225 io.WriteString(w, `{"items_available":0,"items":[]}`)
227 io.WriteString(w, `{"items_available":2,"items":[
228 {"uuid":"zzzzz-4zz18-ehbhgtheo8909or","portable_data_hash":"fa7aeb5140e2848d39b416daeef4ffc5+45","manifest_text":". 37b51d194a7513e45b56f6524f2d51f2+3 0:3:bar\n","modified_at":"2014-02-03T17:22:54Z"},
229 {"uuid":"zzzzz-4zz18-znfnqtbbv4spc3w","portable_data_hash":"1f4b0bc7583c2a7f9102c395f4ffc5e3+45","manifest_text":". acbd18db4cc2f85cedef654fccc4a4d8+3 0:3:foo\n","modified_at":"2014-02-03T17:22:54Z"}]}`)
235 func (s *stubServer) serveZeroKeepServices() *reqTracker {
236 return s.serveJSON("/arvados/v1/keep_services", arvados.KeepServiceList{})
239 func (s *stubServer) serveKeepServices(svcs []arvados.KeepService) *reqTracker {
240 return s.serveJSON("/arvados/v1/keep_services", arvados.KeepServiceList{
241 ItemsAvailable: len(svcs),
246 func (s *stubServer) serveJSON(path string, resp interface{}) *reqTracker {
248 s.mux.HandleFunc(path, func(w http.ResponseWriter, r *http.Request) {
250 json.NewEncoder(w).Encode(resp)
255 func (s *stubServer) serveKeepstoreMounts() *reqTracker {
257 s.mux.HandleFunc("/mounts", func(w http.ResponseWriter, r *http.Request) {
259 json.NewEncoder(w).Encode(stubMounts[r.Host])
264 func (s *stubServer) serveKeepstoreIndexFoo4Bar1() *reqTracker {
265 fooLine := func(mt int) string { return fmt.Sprintf("acbd18db4cc2f85cedef654fccc4a4d8+3 %d\n", 12345678+mt) }
266 barLine := "37b51d194a7513e45b56f6524f2d51f2+3 12345678\n"
268 s.mux.HandleFunc("/index/", func(w http.ResponseWriter, r *http.Request) {
270 if r.Host == "keep0.zzzzz.arvadosapi.com:25107" && strings.HasPrefix(barLine, r.URL.Path[7:]) {
271 io.WriteString(w, barLine)
273 if strings.HasPrefix(fooLine(count), r.URL.Path[7:]) {
274 io.WriteString(w, fooLine(count))
276 io.WriteString(w, "\n")
278 for _, mounts := range stubMounts {
279 for i, mnt := range mounts {
281 s.mux.HandleFunc(fmt.Sprintf("/mounts/%s/blocks", mnt.UUID), func(w http.ResponseWriter, r *http.Request) {
284 if i == 0 && r.Host == "keep0.zzzzz.arvadosapi.com:25107" && strings.HasPrefix(barLine, r.Form.Get("prefix")) {
285 io.WriteString(w, barLine)
287 if i == 0 && strings.HasPrefix(fooLine(count), r.Form.Get("prefix")) {
288 io.WriteString(w, fooLine(count))
290 io.WriteString(w, "\n")
297 func (s *stubServer) serveKeepstoreIndexFoo1() *reqTracker {
298 fooLine := "acbd18db4cc2f85cedef654fccc4a4d8+3 12345678\n"
300 s.mux.HandleFunc("/index/", func(w http.ResponseWriter, r *http.Request) {
302 if r.Host == "keep0.zzzzz.arvadosapi.com:25107" && strings.HasPrefix(fooLine, r.URL.Path[7:]) {
303 io.WriteString(w, fooLine)
305 io.WriteString(w, "\n")
307 for _, mounts := range stubMounts {
308 for i, mnt := range mounts {
310 s.mux.HandleFunc(fmt.Sprintf("/mounts/%s/blocks", mnt.UUID), func(w http.ResponseWriter, r *http.Request) {
312 if i == 0 && strings.HasPrefix(fooLine, r.Form.Get("prefix")) {
313 io.WriteString(w, fooLine)
315 io.WriteString(w, "\n")
322 func (s *stubServer) serveKeepstoreIndexIgnoringPrefix() *reqTracker {
323 fooLine := "acbd18db4cc2f85cedef654fccc4a4d8+3 12345678\n"
325 s.mux.HandleFunc("/index/", func(w http.ResponseWriter, r *http.Request) {
327 io.WriteString(w, fooLine)
328 io.WriteString(w, "\n")
330 for _, mounts := range stubMounts {
331 for _, mnt := range mounts {
332 s.mux.HandleFunc(fmt.Sprintf("/mounts/%s/blocks", mnt.UUID), func(w http.ResponseWriter, r *http.Request) {
334 io.WriteString(w, fooLine)
335 io.WriteString(w, "\n")
342 func (s *stubServer) serveKeepstoreTrash() *reqTracker {
343 return s.serveStatic("/trash", `{}`)
346 func (s *stubServer) serveKeepstorePull() *reqTracker {
347 return s.serveStatic("/pull", `{}`)
350 type runSuite struct {
352 config *arvados.Cluster
354 client *arvados.Client
357 func (s *runSuite) newServer(options *RunOptions) *Server {
361 RunOptions: *options,
362 Metrics: newMetrics(prometheus.NewRegistry()),
363 Logger: options.Logger,
364 Dumper: options.Dumper,
370 func (s *runSuite) SetUpTest(c *check.C) {
371 cfg, err := config.NewLoader(nil, ctxlog.TestLogger(c)).Load()
372 c.Assert(err, check.Equals, nil)
373 s.config, err = cfg.GetCluster("")
374 c.Assert(err, check.Equals, nil)
375 s.db, err = sqlx.Open("postgres", s.config.PostgreSQL.Connection.String())
376 c.Assert(err, check.IsNil)
378 s.config.Collections.BalancePeriod = arvados.Duration(time.Second)
379 arvadostest.SetServiceURL(&s.config.Services.Keepbalance, "http://localhost:/")
381 s.client = &arvados.Client{
383 APIHost: "zzzzz.arvadosapi.com",
384 Client: s.stub.Start()}
386 s.stub.serveDiscoveryDoc()
390 func (s *runSuite) TearDownTest(c *check.C) {
394 func (s *runSuite) TestRefuseZeroCollections(c *check.C) {
395 defer arvados.NewClientFromEnv().RequestAndDecode(nil, "POST", "database/reset", nil, nil)
396 _, err := s.db.Exec(`delete from collections`)
397 c.Assert(err, check.IsNil)
401 Logger: ctxlog.TestLogger(c),
403 s.stub.serveCurrentUserAdmin()
404 s.stub.serveZeroCollections()
405 s.stub.serveKeepServices(stubServices)
406 s.stub.serveKeepstoreMounts()
407 s.stub.serveKeepstoreIndexFoo4Bar1()
408 trashReqs := s.stub.serveKeepstoreTrash()
409 pullReqs := s.stub.serveKeepstorePull()
410 srv := s.newServer(&opts)
411 _, err = srv.runOnce(context.Background())
412 c.Check(err, check.ErrorMatches, "received zero collections")
413 c.Check(trashReqs.Count(), check.Equals, 4)
414 c.Check(pullReqs.Count(), check.Equals, 0)
417 func (s *runSuite) TestRefuseBadIndex(c *check.C) {
422 Logger: ctxlog.TestLogger(c),
424 s.stub.serveCurrentUserAdmin()
425 s.stub.serveFooBarFileCollections()
426 s.stub.serveKeepServices(stubServices)
427 s.stub.serveKeepstoreMounts()
428 s.stub.serveKeepstoreIndexIgnoringPrefix()
429 trashReqs := s.stub.serveKeepstoreTrash()
430 pullReqs := s.stub.serveKeepstorePull()
431 srv := s.newServer(&opts)
432 bal, err := srv.runOnce(context.Background())
433 c.Check(err, check.ErrorMatches, ".*Index response included block .* despite asking for prefix \"abc\"")
434 c.Check(trashReqs.Count(), check.Equals, 4)
435 c.Check(pullReqs.Count(), check.Equals, 0)
436 c.Check(bal.stats.trashes, check.Equals, 0)
437 c.Check(bal.stats.pulls, check.Equals, 0)
440 func (s *runSuite) TestRefuseNonAdmin(c *check.C) {
444 Logger: ctxlog.TestLogger(c),
446 s.stub.serveCurrentUserNotAdmin()
447 s.stub.serveZeroCollections()
448 s.stub.serveKeepServices(stubServices)
449 s.stub.serveKeepstoreMounts()
450 trashReqs := s.stub.serveKeepstoreTrash()
451 pullReqs := s.stub.serveKeepstorePull()
452 srv := s.newServer(&opts)
453 _, err := srv.runOnce(context.Background())
454 c.Check(err, check.ErrorMatches, "current user .* is not .* admin user")
455 c.Check(trashReqs.Count(), check.Equals, 0)
456 c.Check(pullReqs.Count(), check.Equals, 0)
459 func (s *runSuite) TestInvalidChunkPrefix(c *check.C) {
460 for _, trial := range []struct {
464 {"123ABC", "invalid char \"A\" in chunk prefix.*"},
465 {"123xyz", "invalid char \"x\" in chunk prefix.*"},
466 {"123456789012345678901234567890123", "invalid chunk prefix .* longer than a block hash"},
469 c.Logf("trying invalid prefix %q", trial.prefix)
473 ChunkPrefix: trial.prefix,
474 Logger: ctxlog.TestLogger(c),
476 s.stub.serveCurrentUserAdmin()
477 s.stub.serveFooBarFileCollections()
478 s.stub.serveKeepServices(stubServices)
479 s.stub.serveKeepstoreMounts()
480 trashReqs := s.stub.serveKeepstoreTrash()
481 pullReqs := s.stub.serveKeepstorePull()
482 srv := s.newServer(&opts)
483 _, err := srv.runOnce(context.Background())
484 c.Check(err, check.ErrorMatches, trial.errRe)
485 c.Check(trashReqs.Count(), check.Equals, 0)
486 c.Check(pullReqs.Count(), check.Equals, 0)
490 func (s *runSuite) TestRefuseSameDeviceDifferentVolumes(c *check.C) {
494 Logger: ctxlog.TestLogger(c),
496 s.stub.serveCurrentUserAdmin()
497 s.stub.serveZeroCollections()
498 s.stub.serveKeepServices(stubServices)
499 s.stub.mux.HandleFunc("/mounts", func(w http.ResponseWriter, r *http.Request) {
500 hostid := r.Host[:5] // "keep0.zzzzz.arvadosapi.com:25107" => "keep0"
501 json.NewEncoder(w).Encode([]arvados.KeepMount{{
502 UUID: "zzzzz-ivpuk-0000000000" + hostid,
503 DeviceID: "keep0-vol0",
504 StorageClasses: map[string]bool{"default": true},
507 trashReqs := s.stub.serveKeepstoreTrash()
508 pullReqs := s.stub.serveKeepstorePull()
509 srv := s.newServer(&opts)
510 _, err := srv.runOnce(context.Background())
511 c.Check(err, check.ErrorMatches, "cannot continue with config errors.*")
512 c.Check(trashReqs.Count(), check.Equals, 0)
513 c.Check(pullReqs.Count(), check.Equals, 0)
516 func (s *runSuite) TestWriteLostBlocks(c *check.C) {
517 lostf, err := ioutil.TempFile("", "keep-balance-lost-blocks-test-")
518 c.Assert(err, check.IsNil)
519 s.config.Collections.BlobMissingReport = lostf.Name()
520 defer os.Remove(lostf.Name())
524 Logger: ctxlog.TestLogger(c),
526 s.stub.serveCurrentUserAdmin()
527 s.stub.serveFooBarFileCollections()
528 s.stub.serveKeepServices(stubServices)
529 s.stub.serveKeepstoreMounts()
530 s.stub.serveKeepstoreIndexFoo1()
531 s.stub.serveKeepstoreTrash()
532 s.stub.serveKeepstorePull()
533 srv := s.newServer(&opts)
534 c.Assert(err, check.IsNil)
535 _, err = srv.runOnce(context.Background())
536 c.Check(err, check.IsNil)
537 lost, err := ioutil.ReadFile(lostf.Name())
538 c.Assert(err, check.IsNil)
539 c.Check(string(lost), check.Matches, `(?ms).*37b51d194a7513e45b56f6524f2d51f2.* fa7aeb5140e2848d39b416daeef4ffc5\+45.*`)
542 func (s *runSuite) TestDryRun(c *check.C) {
546 Logger: ctxlog.TestLogger(c),
548 s.stub.serveCurrentUserAdmin()
549 collReqs := s.stub.serveFooBarFileCollections()
550 s.stub.serveKeepServices(stubServices)
551 s.stub.serveKeepstoreMounts()
552 s.stub.serveKeepstoreIndexFoo4Bar1()
553 trashReqs := s.stub.serveKeepstoreTrash()
554 pullReqs := s.stub.serveKeepstorePull()
555 srv := s.newServer(&opts)
556 bal, err := srv.runOnce(context.Background())
557 c.Check(err, check.IsNil)
558 for _, req := range collReqs.reqs {
559 c.Check(req.Form.Get("include_trash"), check.Equals, "true")
560 c.Check(req.Form.Get("include_old_versions"), check.Equals, "true")
562 c.Check(trashReqs.Count(), check.Equals, 0)
563 c.Check(pullReqs.Count(), check.Equals, 0)
564 c.Check(bal.stats.pulls, check.Not(check.Equals), 0)
565 c.Check(bal.stats.underrep.replicas, check.Not(check.Equals), 0)
566 c.Check(bal.stats.overrep.replicas, check.Not(check.Equals), 0)
569 func (s *runSuite) TestCommit(c *check.C) {
570 s.config.Collections.BlobMissingReport = c.MkDir() + "/keep-balance-lost-blocks-test-"
571 s.config.ManagementToken = "xyzzy"
575 Logger: ctxlog.TestLogger(c),
576 Dumper: ctxlog.TestLogger(c),
578 s.stub.serveCurrentUserAdmin()
579 s.stub.serveFooBarFileCollections()
580 s.stub.serveKeepServices(stubServices)
581 s.stub.serveKeepstoreMounts()
582 s.stub.serveKeepstoreIndexFoo4Bar1()
583 trashReqs := s.stub.serveKeepstoreTrash()
584 pullReqs := s.stub.serveKeepstorePull()
585 srv := s.newServer(&opts)
586 bal, err := srv.runOnce(context.Background())
587 c.Check(err, check.IsNil)
588 c.Check(trashReqs.Count(), check.Equals, 8)
589 c.Check(pullReqs.Count(), check.Equals, 4)
590 // "foo" block is overreplicated by 2
591 c.Check(bal.stats.trashes, check.Equals, 2)
592 // "bar" block is underreplicated by 1, and its only copy is
593 // in a poor rendezvous position
594 c.Check(bal.stats.pulls, check.Equals, 2)
596 lost, err := ioutil.ReadFile(s.config.Collections.BlobMissingReport)
597 c.Assert(err, check.IsNil)
598 c.Check(string(lost), check.Not(check.Matches), `(?ms).*acbd18db4cc2f85cedef654fccc4a4d8.*`)
600 metrics := arvadostest.GatherMetricsAsString(srv.Metrics.reg)
601 c.Check(metrics, check.Matches, `(?ms).*\narvados_keep_total_bytes 15\n.*`)
602 c.Check(metrics, check.Matches, `(?ms).*\narvados_keepbalance_changeset_compute_seconds_sum [0-9\.]+\n.*`)
603 c.Check(metrics, check.Matches, `(?ms).*\narvados_keepbalance_changeset_compute_seconds_count 1\n.*`)
604 c.Check(metrics, check.Matches, `(?ms).*\narvados_keep_dedup_byte_ratio [1-9].*`)
605 c.Check(metrics, check.Matches, `(?ms).*\narvados_keep_dedup_block_ratio [1-9].*`)
608 func (s *runSuite) TestChunkPrefix(c *check.C) {
609 s.config.Collections.BlobMissingReport = c.MkDir() + "/keep-balance-lost-blocks-test-"
613 ChunkPrefix: "ac", // catch "foo" but not "bar"
614 Logger: ctxlog.TestLogger(c),
615 Dumper: ctxlog.TestLogger(c),
617 s.stub.serveCurrentUserAdmin()
618 s.stub.serveFooBarFileCollections()
619 s.stub.serveKeepServices(stubServices)
620 s.stub.serveKeepstoreMounts()
621 s.stub.serveKeepstoreIndexFoo4Bar1()
622 trashReqs := s.stub.serveKeepstoreTrash()
623 pullReqs := s.stub.serveKeepstorePull()
624 srv := s.newServer(&opts)
625 bal, err := srv.runOnce(context.Background())
626 c.Check(err, check.IsNil)
627 c.Check(trashReqs.Count(), check.Equals, 8)
628 c.Check(pullReqs.Count(), check.Equals, 4)
629 // "foo" block is overreplicated by 2
630 c.Check(bal.stats.trashes, check.Equals, 2)
631 // "bar" block is underreplicated but does not match prefix
632 c.Check(bal.stats.pulls, check.Equals, 0)
634 lost, err := ioutil.ReadFile(s.config.Collections.BlobMissingReport)
635 c.Assert(err, check.IsNil)
636 c.Check(string(lost), check.Equals, "")
639 func (s *runSuite) TestRunForever(c *check.C) {
640 s.config.ManagementToken = "xyzzy"
644 Logger: ctxlog.TestLogger(c),
645 Dumper: ctxlog.TestLogger(c),
647 s.stub.serveCurrentUserAdmin()
648 s.stub.serveFooBarFileCollections()
649 s.stub.serveKeepServices(stubServices)
650 s.stub.serveKeepstoreMounts()
651 s.stub.serveKeepstoreIndexFoo4Bar1()
652 trashReqs := s.stub.serveKeepstoreTrash()
653 pullReqs := s.stub.serveKeepstorePull()
655 ctx, cancel := context.WithCancel(context.Background())
657 s.config.Collections.BalancePeriod = arvados.Duration(time.Millisecond)
658 srv := s.newServer(&opts)
660 done := make(chan bool)
666 // Each run should send 4 pull lists + 4 trash lists. The
667 // first run should also send 4 empty trash lists at
668 // startup. We should complete all four runs in much less than
670 for t0 := time.Now(); time.Since(t0) < 10*time.Second; {
671 if pullReqs.Count() >= 16 && trashReqs.Count() == pullReqs.Count()+4 {
674 time.Sleep(time.Millisecond)
678 c.Check(pullReqs.Count() >= 16, check.Equals, true)
679 c.Check(trashReqs.Count(), check.Equals, pullReqs.Count()+4)
681 metrics := arvadostest.GatherMetricsAsString(srv.Metrics.reg)
682 c.Check(metrics, check.Matches, `(?ms).*\narvados_keepbalance_changeset_compute_seconds_count `+fmt.Sprintf("%d", pullReqs.Count()/4)+`\n.*`)