1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
20 "git.arvados.org/arvados.git/lib/config"
21 "git.arvados.org/arvados.git/sdk/go/arvados"
22 "git.arvados.org/arvados.git/sdk/go/arvadostest"
23 "git.arvados.org/arvados.git/sdk/go/ctxlog"
24 "github.com/jmoiron/sqlx"
25 "github.com/prometheus/client_golang/prometheus"
26 check "gopkg.in/check.v1"
// Register runSuite with gocheck so that its fixture and Test* methods
// are run by the test runner.
29 var _ = check.Suite(&runSuite{})
// reqTracker collects the HTTP requests seen by a stub handler; Add
// appends each request to its reqs field.
// NOTE(review): the field declarations are not visible here — confirm
// whether access to reqs is synchronized.
31 type reqTracker struct {
// Count returns an int that the tests compare against the expected
// number of requests.
// NOTE(review): body not visible here — presumably the number of
// requests recorded via Add; confirm.
36 func (rt *reqTracker) Count() int {
// Add records req in rt.reqs. The request struct is dereferenced and
// appended by value, so the tracker keeps its own (shallow) copy.
// NOTE(review): the int result is produced on lines not visible here —
// presumably the updated request count; confirm.
42 func (rt *reqTracker) Add(req *http.Request) int {
45 rt.reqs = append(rt.reqs, *req)
// stubServices is the keep service list that tests pass to
// serveKeepServices: four services on hosts keep0..keep3 with SSL
// disabled, followed by a fifth entry on keep.zzzzz.arvadosapi.com.
// NOTE(review): ports and service types are set on lines not visible
// here; stubMounts keys suggest the keepN services listen on port 25107.
49 var stubServices = []arvados.KeepService{
51 UUID: "zzzzz-bi6l4-000000000000000",
52 ServiceHost: "keep0.zzzzz.arvadosapi.com",
54 ServiceSSLFlag: false,
58 UUID: "zzzzz-bi6l4-000000000000001",
59 ServiceHost: "keep1.zzzzz.arvadosapi.com",
61 ServiceSSLFlag: false,
65 UUID: "zzzzz-bi6l4-000000000000002",
66 ServiceHost: "keep2.zzzzz.arvadosapi.com",
68 ServiceSSLFlag: false,
72 UUID: "zzzzz-bi6l4-000000000000003",
73 ServiceHost: "keep3.zzzzz.arvadosapi.com",
75 ServiceSSLFlag: false,
79 UUID: "zzzzz-bi6l4-h0a0xwut9qa6g3a",
80 ServiceHost: "keep.zzzzz.arvadosapi.com",
// stubMounts maps a keepstore "host:port" address to the mounts that
// server reports. Each of keep0..keep3 has one mount, with its own UUID
// and DeviceID, in the "default" storage class. serveKeepstoreMounts
// looks entries up by the request's Host.
87 var stubMounts = map[string][]arvados.KeepMount{
88 "keep0.zzzzz.arvadosapi.com:25107": {{
89 UUID: "zzzzz-ivpuk-000000000000000",
90 DeviceID: "keep0-vol0",
91 StorageClasses: map[string]bool{"default": true},
95 "keep1.zzzzz.arvadosapi.com:25107": {{
96 UUID: "zzzzz-ivpuk-100000000000000",
97 DeviceID: "keep1-vol0",
98 StorageClasses: map[string]bool{"default": true},
102 "keep2.zzzzz.arvadosapi.com:25107": {{
103 UUID: "zzzzz-ivpuk-200000000000000",
104 DeviceID: "keep2-vol0",
105 StorageClasses: map[string]bool{"default": true},
109 "keep3.zzzzz.arvadosapi.com:25107": {{
110 UUID: "zzzzz-ivpuk-300000000000000",
111 DeviceID: "keep3-vol0",
112 StorageClasses: map[string]bool{"default": true},
118 // stubServer is an HTTP transport that intercepts and processes all
119 // requests using its own handlers.
120 type stubServer struct {
// logf is a printf-style logging callback.
// NOTE(review): the mux and srv fields used by Start and RoundTrip are
// declared on lines not visible here.
125 logf func(string, ...interface{})
128 // Start initializes the stub server and returns an *http.Client that
129 // uses the stub server to handle all requests.
131 // A stubServer that has been started should eventually be shut down
// with Close().
133 func (s *stubServer) Start() *http.Client {
134 // Set up a config.Client that forwards all requests to s.mux
135 // via s.srv. Test cases will attach handlers to s.mux to get
136 // the desired responses.
137 s.mux = http.NewServeMux()
// s.srv wraps the mux in an httptest server; the wrapper labels each
// response as JSON before handing the request to the mux.
138 s.srv = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
142 w.Header().Set("Content-Type", "application/json")
143 s.mux.ServeHTTP(w, r)
// The returned client uses s itself as its Transport, so its requests
// are handled by RoundTrip.
145 return &http.Client{Transport: s}
// RoundTrip has the http.RoundTripper method signature, which lets a
// stubServer be used as an http.Client Transport (see Start). It
// dispatches req directly to s.mux, capturing the handler's output in an
// httptest.ResponseRecorder, and converts the recorded result into an
// *http.Response. The returned error is always nil.
148 func (s *stubServer) RoundTrip(req *http.Request) (*http.Response, error) {
149 w := httptest.NewRecorder()
150 s.mux.ServeHTTP(w, req)
151 return &http.Response{
// Status has the form "<code> <reason phrase>", e.g. "200 OK".
153 Status: fmt.Sprintf("%d %s", w.Code, http.StatusText(w.Code)),
// The recorded body is an in-memory buffer, so closing it is a no-op.
155 Body: ioutil.NopCloser(w.Body)}, nil
158 // Close releases resources used by the server.
// NOTE(review): body not visible here — presumably it closes the
// httptest server created by Start (s.srv); confirm.
159 func (s *stubServer) Close() {
// serveStatic registers a handler on s.mux for path that reads the
// request body to the end (discarding it) and writes data as the
// response body.
// NOTE(review): the returned *reqTracker is created on lines not visible
// here — presumably the handler records each request in it; confirm.
163 func (s *stubServer) serveStatic(path, data string) *reqTracker {
165 s.mux.HandleFunc(path, func(w http.ResponseWriter, r *http.Request) {
// Drain the request body; its content and any read error are ignored.
168 ioutil.ReadAll(r.Body)
171 io.WriteString(w, data)
// serveCurrentUserAdmin makes /arvados/v1/users/current return an
// active user with is_admin set to true.
176 func (s *stubServer) serveCurrentUserAdmin() *reqTracker {
177 return s.serveStatic("/arvados/v1/users/current",
178 `{"uuid":"zzzzz-tpzed-000000000000000","is_admin":true,"is_active":true}`)
// serveCurrentUserNotAdmin makes /arvados/v1/users/current return an
// active user with is_admin set to false.
181 func (s *stubServer) serveCurrentUserNotAdmin() *reqTracker {
182 return s.serveStatic("/arvados/v1/users/current",
183 `{"uuid":"zzzzz-tpzed-000000000000000","is_admin":false,"is_active":true}`)
// serveDiscoveryDoc serves a minimal discovery document whose only
// field is defaultCollectionReplication, set to 2.
186 func (s *stubServer) serveDiscoveryDoc() *reqTracker {
187 return s.serveStatic("/discovery/v1/apis/arvados/v1/rest",
188 `{"defaultCollectionReplication":2}`)
// serveZeroCollections makes the collections list endpoint return an
// empty list with items_available 0.
191 func (s *stubServer) serveZeroCollections() *reqTracker {
192 return s.serveStatic("/arvados/v1/collections",
193 `{"items":[],"items_available":0}`)
// serveFooBarFileCollections registers a /arvados/v1/collections
// handler. A request whose "filters" parameter mentions modified_at gets
// an empty list. The other response lists three collections: two that
// share a portable data hash and reference the "bar" block
// (37b51d19...), and one that references the "foo" block (acbd18db...).
// NOTE(review): the branch separating the two responses and the
// returned tracker are on lines not visible here.
196 func (s *stubServer) serveFooBarFileCollections() *reqTracker {
198 s.mux.HandleFunc("/arvados/v1/collections", func(w http.ResponseWriter, r *http.Request) {
201 if strings.Contains(r.Form.Get("filters"), `modified_at`) {
202 io.WriteString(w, `{"items_available":0,"items":[]}`)
204 io.WriteString(w, `{"items_available":3,"items":[
205 {"uuid":"zzzzz-4zz18-aaaaaaaaaaaaaaa","portable_data_hash":"fa7aeb5140e2848d39b416daeef4ffc5+45","manifest_text":". 37b51d194a7513e45b56f6524f2d51f2+3 0:3:bar\n","modified_at":"2014-02-03T17:22:54Z"},
206 {"uuid":"zzzzz-4zz18-ehbhgtheo8909or","portable_data_hash":"fa7aeb5140e2848d39b416daeef4ffc5+45","manifest_text":". 37b51d194a7513e45b56f6524f2d51f2+3 0:3:bar\n","modified_at":"2014-02-03T17:22:54Z"},
207 {"uuid":"zzzzz-4zz18-znfnqtbbv4spc3w","portable_data_hash":"1f4b0bc7583c2a7f9102c395f4ffc5e3+45","manifest_text":". acbd18db4cc2f85cedef654fccc4a4d8+3 0:3:foo\n","modified_at":"2014-02-03T17:22:54Z"}]}`)
// serveCollectionsButSkipOne registers a /arvados/v1/collections
// handler that picks its response from the "filters" parameter. The
// filter strings are matched in their JSON-escaped form (\u003c is "<",
// \u003e is ">"):
//
//   - modified_at <= ...            : items_available 3 but no items
//   - modified_at > ... (or >= ...) : empty list
//   - modified_at = ... with uuid > : empty list
//   - modified_at = null            : empty list
//
// The remaining response reports items_available 2 and lists only the
// ehbhgtheo8909or and znfnqtbbv4spc3w collections; the aaaaaaaaaaaaaaa
// collection returned by serveFooBarFileCollections is left out.
// NOTE(review): the final else and the returned tracker are on lines
// not visible here.
213 func (s *stubServer) serveCollectionsButSkipOne() *reqTracker {
215 s.mux.HandleFunc("/arvados/v1/collections", func(w http.ResponseWriter, r *http.Request) {
218 if strings.Contains(r.Form.Get("filters"), `"modified_at","\u003c="`) {
219 io.WriteString(w, `{"items_available":3,"items":[]}`)
220 } else if strings.Contains(r.Form.Get("filters"), `"modified_at","\u003e`) {
221 io.WriteString(w, `{"items_available":0,"items":[]}`)
222 } else if strings.Contains(r.Form.Get("filters"), `"modified_at","="`) && strings.Contains(r.Form.Get("filters"), `"uuid","\u003e"`) {
223 io.WriteString(w, `{"items_available":0,"items":[]}`)
224 } else if strings.Contains(r.Form.Get("filters"), `"modified_at","=",null`) {
225 io.WriteString(w, `{"items_available":0,"items":[]}`)
227 io.WriteString(w, `{"items_available":2,"items":[
228 {"uuid":"zzzzz-4zz18-ehbhgtheo8909or","portable_data_hash":"fa7aeb5140e2848d39b416daeef4ffc5+45","manifest_text":". 37b51d194a7513e45b56f6524f2d51f2+3 0:3:bar\n","modified_at":"2014-02-03T17:22:54Z"},
229 {"uuid":"zzzzz-4zz18-znfnqtbbv4spc3w","portable_data_hash":"1f4b0bc7583c2a7f9102c395f4ffc5e3+45","manifest_text":". acbd18db4cc2f85cedef654fccc4a4d8+3 0:3:foo\n","modified_at":"2014-02-03T17:22:54Z"}]}`)
// serveZeroKeepServices serves a zero-value (empty) KeepServiceList at
// /arvados/v1/keep_services.
235 func (s *stubServer) serveZeroKeepServices() *reqTracker {
236 return s.serveJSON("/arvados/v1/keep_services", arvados.KeepServiceList{})
// serveKeepServices serves a KeepServiceList at
// /arvados/v1/keep_services with ItemsAvailable set to len(svcs).
// NOTE(review): the remaining list fields are set on lines not visible
// here — presumably Items is svcs; confirm.
239 func (s *stubServer) serveKeepServices(svcs []arvados.KeepService) *reqTracker {
240 return s.serveJSON("/arvados/v1/keep_services", arvados.KeepServiceList{
241 ItemsAvailable: len(svcs),
// serveJSON registers a handler on s.mux for path that writes resp,
// JSON-encoded, as the response body. The encoder's error is not
// checked.
// NOTE(review): the returned *reqTracker is created on lines not visible
// here.
246 func (s *stubServer) serveJSON(path string, resp interface{}) *reqTracker {
248 s.mux.HandleFunc(path, func(w http.ResponseWriter, r *http.Request) {
250 json.NewEncoder(w).Encode(resp)
// serveKeepstoreMounts registers a /mounts handler that responds with
// the JSON-encoded stubMounts entry for the request's Host. A host with
// no entry yields a nil slice, which encodes as JSON null.
255 func (s *stubServer) serveKeepstoreMounts() *reqTracker {
257 s.mux.HandleFunc("/mounts", func(w http.ResponseWriter, r *http.Request) {
259 json.NewEncoder(w).Encode(stubMounts[r.Host])
// serveKeepstoreIndexFoo4Bar1 registers block index handlers, both the
// "/index/<prefix>" form and the per-mount "/mounts/<uuid>/blocks"
// form. The "bar" block is listed only by keep0, while the "foo" block
// is listed without any host condition. A line is emitted only when the
// requested prefix is a prefix of that line, and every response ends
// with a blank line.
// NOTE(review): count and the returned tracker are defined on lines not
// visible here — presumably count is the running request count, giving
// each foo line a different timestamp; confirm.
264 func (s *stubServer) serveKeepstoreIndexFoo4Bar1() *reqTracker {
// fooLine builds the index line for the "foo" block with timestamp
// 12345678+mt; barLine is the fixed index line for the "bar" block.
265 fooLine := func(mt int) string { return fmt.Sprintf("acbd18db4cc2f85cedef654fccc4a4d8+3 %d\n", 12345678+mt) }
266 barLine := "37b51d194a7513e45b56f6524f2d51f2+3 12345678\n"
268 s.mux.HandleFunc("/index/", func(w http.ResponseWriter, r *http.Request) {
// r.URL.Path[7:] is the requested prefix: the part after "/index/".
270 if r.Host == "keep0.zzzzz.arvadosapi.com:25107" && strings.HasPrefix(barLine, r.URL.Path[7:]) {
271 io.WriteString(w, barLine)
273 if strings.HasPrefix(fooLine(count), r.URL.Path[7:]) {
274 io.WriteString(w, fooLine(count))
276 io.WriteString(w, "\n")
// Register the per-mount variant for every mount of every server; only
// the first mount of a server (i == 0) lists any blocks.
278 for _, mounts := range stubMounts {
279 for i, mnt := range mounts {
281 s.mux.HandleFunc(fmt.Sprintf("/mounts/%s/blocks", mnt.UUID), func(w http.ResponseWriter, r *http.Request) {
284 if i == 0 && r.Host == "keep0.zzzzz.arvadosapi.com:25107" && strings.HasPrefix(barLine, r.Form.Get("prefix")) {
285 io.WriteString(w, barLine)
287 if i == 0 && strings.HasPrefix(fooLine(count), r.Form.Get("prefix")) {
288 io.WriteString(w, fooLine(count))
290 io.WriteString(w, "\n")
// serveKeepstoreIndexFoo1 registers block index handlers that list only
// the "foo" block; the "bar" block is never listed. The "/index/" handler
// lists foo only for keep0. The "/mounts/<uuid>/blocks" handlers list it
// for the first mount of a server (i == 0), with no host condition
// visible. A line is emitted only when the requested prefix is a prefix
// of it, and every response ends with a blank line.
// NOTE(review): the returned tracker is created on lines not visible
// here.
297 func (s *stubServer) serveKeepstoreIndexFoo1() *reqTracker {
298 fooLine := "acbd18db4cc2f85cedef654fccc4a4d8+3 12345678\n"
300 s.mux.HandleFunc("/index/", func(w http.ResponseWriter, r *http.Request) {
// r.URL.Path[7:] is the requested prefix: the part after "/index/".
302 if r.Host == "keep0.zzzzz.arvadosapi.com:25107" && strings.HasPrefix(fooLine, r.URL.Path[7:]) {
303 io.WriteString(w, fooLine)
305 io.WriteString(w, "\n")
307 for _, mounts := range stubMounts {
308 for i, mnt := range mounts {
310 s.mux.HandleFunc(fmt.Sprintf("/mounts/%s/blocks", mnt.UUID), func(w http.ResponseWriter, r *http.Request) {
312 if i == 0 && strings.HasPrefix(fooLine, r.Form.Get("prefix")) {
313 io.WriteString(w, fooLine)
315 io.WriteString(w, "\n")
// serveKeepstoreIndexIgnoringPrefix registers deliberately misbehaving
// index handlers: both the "/index/" and the "/mounts/<uuid>/blocks"
// forms always return the "foo" index line followed by a blank line,
// without looking at the requested prefix.
// NOTE(review): the returned tracker is created on lines not visible
// here.
322 func (s *stubServer) serveKeepstoreIndexIgnoringPrefix() *reqTracker {
323 fooLine := "acbd18db4cc2f85cedef654fccc4a4d8+3 12345678\n"
325 s.mux.HandleFunc("/index/", func(w http.ResponseWriter, r *http.Request) {
327 io.WriteString(w, fooLine)
328 io.WriteString(w, "\n")
330 for _, mounts := range stubMounts {
331 for _, mnt := range mounts {
332 s.mux.HandleFunc(fmt.Sprintf("/mounts/%s/blocks", mnt.UUID), func(w http.ResponseWriter, r *http.Request) {
334 io.WriteString(w, fooLine)
335 io.WriteString(w, "\n")
// serveKeepstoreTrash answers every /trash request with an empty JSON
// object and returns the tracker from serveStatic, which tests use to
// count trash requests.
342 func (s *stubServer) serveKeepstoreTrash() *reqTracker {
343 return s.serveStatic("/trash", `{}`)
// serveKeepstorePull answers every /pull request with an empty JSON
// object and returns the tracker from serveStatic, which tests use to
// count pull requests.
346 func (s *stubServer) serveKeepstorePull() *reqTracker {
347 return s.serveStatic("/pull", `{}`)
// runSuite is the gocheck suite for the run tests. SetUpTest populates
// its fields before each test.
// NOTE(review): the stub and db fields used by the methods are declared
// on lines not visible here.
350 type runSuite struct {
// config is the cluster configuration loaded in SetUpTest.
352 config *arvados.Cluster
// client is the API client whose HTTP traffic goes to the stub server.
354 client *arvados.Client
// newServer builds a Server for a test. RunOptions is a copy of
// *options, Logger and Dumper are taken from options, and Metrics is
// backed by a newly created Prometheus registry.
// NOTE(review): other Server fields are set on lines not visible here.
357 func (s *runSuite) newServer(options *RunOptions) *Server {
361 RunOptions: *options,
362 Metrics: newMetrics(prometheus.NewRegistry()),
363 Logger: options.Logger,
364 Dumper: options.Dumper,
// SetUpTest is the gocheck per-test fixture. It loads the default
// cluster configuration, opens a PostgreSQL handle from that
// configuration, and creates an API client backed by a freshly started
// stub server.
// NOTE(review): the first two assertions use check.Equals with nil;
// check.IsNil, as used for the sqlx.Open error, is the conventional
// checker.
370 func (s *runSuite) SetUpTest(c *check.C) {
371 cfg, err := config.NewLoader(nil, ctxlog.TestLogger(c)).Load()
372 c.Assert(err, check.Equals, nil)
373 s.config, err = cfg.GetCluster("")
374 c.Assert(err, check.Equals, nil)
375 s.db, err = sqlx.Open("postgres", s.config.PostgreSQL.Connection.String())
376 c.Assert(err, check.IsNil)
// Tests start from a one-second balance period and a localhost
// keep-balance service URL.
378 s.config.Collections.BalancePeriod = arvados.Duration(time.Second)
379 arvadostest.SetServiceURL(&s.config.Services.Keepbalance, "http://localhost:/")
// The client's HTTP client comes from the stub, so its requests are
// answered by handlers registered on s.stub.
381 s.client = &arvados.Client{
383 APIHost: "zzzzz.arvadosapi.com",
384 Client: s.stub.Start()}
// The discovery document handler is registered for every test.
386 s.stub.serveDiscoveryDoc()
// TearDownTest is the gocheck per-test teardown fixture.
// NOTE(review): body not visible here — presumably it shuts down the
// stub server started in SetUpTest; confirm.
390 func (s *runSuite) TearDownTest(c *check.C) {
// TestRefuseZeroCollections checks that runOnce fails with "received
// zero collections" when the collections table is empty, and that no
// pull requests are sent. Four trash requests are still expected.
// NOTE(review): those four are presumably the empty trash lists sent to
// each keepstore at startup — confirm.
394 func (s *runSuite) TestRefuseZeroCollections(c *check.C) {
// Restore the test database when the test finishes, since the next
// statement deletes every collection row.
395 defer arvados.NewClientFromEnv().RequestAndDecode(nil, "POST", "database/reset", nil, nil)
396 _, err := s.db.Exec(`delete from collections`)
397 c.Assert(err, check.IsNil)
399 Logger: ctxlog.TestLogger(c),
401 s.stub.serveCurrentUserAdmin()
402 s.stub.serveZeroCollections()
403 s.stub.serveKeepServices(stubServices)
404 s.stub.serveKeepstoreMounts()
405 s.stub.serveKeepstoreIndexFoo4Bar1()
406 trashReqs := s.stub.serveKeepstoreTrash()
407 pullReqs := s.stub.serveKeepstorePull()
408 srv := s.newServer(&opts)
409 _, err = srv.runOnce(context.Background())
410 c.Check(err, check.ErrorMatches, "received zero collections")
411 c.Check(trashReqs.Count(), check.Equals, 4)
412 c.Check(pullReqs.Count(), check.Equals, 0)
// TestRefuseBadIndex checks that runOnce fails when a keepstore index
// response contains a block outside the requested prefix. The stub index
// handlers ignore the prefix entirely. The run must report no pulls or
// trashes in its stats and send no pull requests; four trash requests
// are still expected.
// NOTE(review): the options that request prefix "abc" are set on lines
// not visible here.
415 func (s *runSuite) TestRefuseBadIndex(c *check.C) {
418 Logger: ctxlog.TestLogger(c),
420 s.stub.serveCurrentUserAdmin()
421 s.stub.serveFooBarFileCollections()
422 s.stub.serveKeepServices(stubServices)
423 s.stub.serveKeepstoreMounts()
424 s.stub.serveKeepstoreIndexIgnoringPrefix()
425 trashReqs := s.stub.serveKeepstoreTrash()
426 pullReqs := s.stub.serveKeepstorePull()
427 srv := s.newServer(&opts)
428 bal, err := srv.runOnce(context.Background())
429 c.Check(err, check.ErrorMatches, ".*Index response included block .* despite asking for prefix \"abc\"")
430 c.Check(trashReqs.Count(), check.Equals, 4)
431 c.Check(pullReqs.Count(), check.Equals, 0)
432 c.Check(bal.stats.trashes, check.Equals, 0)
433 c.Check(bal.stats.pulls, check.Equals, 0)
// TestRefuseNonAdmin checks that runOnce fails when the current user is
// not an admin, and that no trash or pull requests at all are sent in
// that case.
436 func (s *runSuite) TestRefuseNonAdmin(c *check.C) {
438 Logger: ctxlog.TestLogger(c),
440 s.stub.serveCurrentUserNotAdmin()
441 s.stub.serveZeroCollections()
442 s.stub.serveKeepServices(stubServices)
443 s.stub.serveKeepstoreMounts()
444 trashReqs := s.stub.serveKeepstoreTrash()
445 pullReqs := s.stub.serveKeepstorePull()
446 srv := s.newServer(&opts)
447 _, err := srv.runOnce(context.Background())
448 c.Check(err, check.ErrorMatches, "current user .* is not .* admin user")
449 c.Check(trashReqs.Count(), check.Equals, 0)
450 c.Check(pullReqs.Count(), check.Equals, 0)
// TestInvalidChunkPrefix is a table-driven test. Each trial runs
// runOnce with an invalid ChunkPrefix and expects the error to match
// the trial's regexp, with no trash or pull requests sent.
// NOTE(review): further table entries and any per-trial stub reset may
// be on lines not visible here.
453 func (s *runSuite) TestInvalidChunkPrefix(c *check.C) {
454 for _, trial := range []struct {
// The first two prefixes contain characters the expected errors reject
// (an upper-case letter, then a letter outside a-f); the third is 33
// characters, longer than a block hash.
458 {"123ABC", "invalid char \"A\" in chunk prefix.*"},
459 {"123xyz", "invalid char \"x\" in chunk prefix.*"},
460 {"123456789012345678901234567890123", "invalid chunk prefix .* longer than a block hash"},
463 c.Logf("trying invalid prefix %q", trial.prefix)
465 ChunkPrefix: trial.prefix,
466 Logger: ctxlog.TestLogger(c),
468 s.stub.serveCurrentUserAdmin()
469 s.stub.serveFooBarFileCollections()
470 s.stub.serveKeepServices(stubServices)
471 s.stub.serveKeepstoreMounts()
472 trashReqs := s.stub.serveKeepstoreTrash()
473 pullReqs := s.stub.serveKeepstorePull()
474 srv := s.newServer(&opts)
475 _, err := srv.runOnce(context.Background())
476 c.Check(err, check.ErrorMatches, trial.errRe)
477 c.Check(trashReqs.Count(), check.Equals, 0)
478 c.Check(pullReqs.Count(), check.Equals, 0)
// TestRefuseSameDeviceDifferentVolumes checks that runOnce fails with
// a config error, before sending any trash or pull requests, when
// different keepstore servers report mounts that have different UUIDs
// but the same DeviceID.
482 func (s *runSuite) TestRefuseSameDeviceDifferentVolumes(c *check.C) {
484 Logger: ctxlog.TestLogger(c),
486 s.stub.serveCurrentUserAdmin()
487 s.stub.serveZeroCollections()
488 s.stub.serveKeepServices(stubServices)
// Custom /mounts handler used instead of serveKeepstoreMounts: the mount
// UUID is derived from the host name, while the DeviceID is the same
// "keep0-vol0" for every host.
489 s.stub.mux.HandleFunc("/mounts", func(w http.ResponseWriter, r *http.Request) {
490 hostid := r.Host[:5] // "keep0.zzzzz.arvadosapi.com:25107" => "keep0"
491 json.NewEncoder(w).Encode([]arvados.KeepMount{{
492 UUID: "zzzzz-ivpuk-0000000000" + hostid,
493 DeviceID: "keep0-vol0",
494 StorageClasses: map[string]bool{"default": true},
497 trashReqs := s.stub.serveKeepstoreTrash()
498 pullReqs := s.stub.serveKeepstorePull()
499 srv := s.newServer(&opts)
500 _, err := srv.runOnce(context.Background())
501 c.Check(err, check.ErrorMatches, "cannot continue with config errors.*")
502 c.Check(trashReqs.Count(), check.Equals, 0)
503 c.Check(pullReqs.Count(), check.Equals, 0)
506 func (s *runSuite) TestWriteLostBlocks(c *check.C) {
507 lostf, err := ioutil.TempFile("", "keep-balance-lost-blocks-test-")
508 c.Assert(err, check.IsNil)
509 s.config.Collections.BlobMissingReport = lostf.Name()
510 defer os.Remove(lostf.Name())
512 Logger: ctxlog.TestLogger(c),
514 s.stub.serveCurrentUserAdmin()
515 s.stub.serveFooBarFileCollections()
516 s.stub.serveKeepServices(stubServices)
517 s.stub.serveKeepstoreMounts()
518 s.stub.serveKeepstoreIndexFoo1()
519 s.stub.serveKeepstoreTrash()
520 s.stub.serveKeepstorePull()
521 srv := s.newServer(&opts)
522 c.Assert(err, check.IsNil)
523 _, err = srv.runOnce(context.Background())
524 c.Check(err, check.IsNil)
525 lost, err := ioutil.ReadFile(lostf.Name())
526 c.Assert(err, check.IsNil)
527 c.Check(string(lost), check.Matches, `(?ms).*37b51d194a7513e45b56f6524f2d51f2.* fa7aeb5140e2848d39b416daeef4ffc5\+45.*`)
// TestDryRun checks that with both the trash and pull limits set to
// zero, a run computes changes but sends no trash or pull requests. The
// changes show up only in the "deferred" stats counters.
530 func (s *runSuite) TestDryRun(c *check.C) {
531 s.config.Collections.BalanceTrashLimit = 0
532 s.config.Collections.BalancePullLimit = 0
534 Logger: ctxlog.TestLogger(c),
536 s.stub.serveCurrentUserAdmin()
537 collReqs := s.stub.serveFooBarFileCollections()
538 s.stub.serveKeepServices(stubServices)
539 s.stub.serveKeepstoreMounts()
540 s.stub.serveKeepstoreIndexFoo4Bar1()
541 trashReqs := s.stub.serveKeepstoreTrash()
542 pullReqs := s.stub.serveKeepstorePull()
543 srv := s.newServer(&opts)
544 bal, err := srv.runOnce(context.Background())
545 c.Check(err, check.IsNil)
// Every collections request recorded by the stub must ask for trashed
// collections and old versions as well.
546 for _, req := range collReqs.reqs {
547 c.Check(req.Form.Get("include_trash"), check.Equals, "true")
548 c.Check(req.Form.Get("include_old_versions"), check.Equals, "true")
// Nothing is sent to the keepstores...
550 c.Check(trashReqs.Count(), check.Equals, 0)
551 c.Check(pullReqs.Count(), check.Equals, 0)
// ...and the stats count the work as deferred rather than done, while
// still reporting under- and over-replicated blocks.
552 c.Check(bal.stats.pulls, check.Equals, 0)
553 c.Check(bal.stats.pullsDeferred, check.Not(check.Equals), 0)
554 c.Check(bal.stats.trashes, check.Equals, 0)
555 c.Check(bal.stats.trashesDeferred, check.Not(check.Equals), 0)
556 c.Check(bal.stats.underrep.replicas, check.Not(check.Equals), 0)
557 c.Check(bal.stats.overrep.replicas, check.Not(check.Equals), 0)
// TestCommit runs one full balancing pass against the Foo4Bar1 stub
// index and checks the number of trash and pull requests sent, the
// trash/pull stats, the lost-blocks report, and the exported metrics.
560 func (s *runSuite) TestCommit(c *check.C) {
// The report goes to a file inside a directory created by c.MkDir.
561 s.config.Collections.BlobMissingReport = c.MkDir() + "/keep-balance-lost-blocks-test-"
562 s.config.ManagementToken = "xyzzy"
564 Logger: ctxlog.TestLogger(c),
565 Dumper: ctxlog.TestLogger(c),
567 s.stub.serveCurrentUserAdmin()
568 s.stub.serveFooBarFileCollections()
569 s.stub.serveKeepServices(stubServices)
570 s.stub.serveKeepstoreMounts()
571 s.stub.serveKeepstoreIndexFoo4Bar1()
572 trashReqs := s.stub.serveKeepstoreTrash()
573 pullReqs := s.stub.serveKeepstorePull()
574 srv := s.newServer(&opts)
575 bal, err := srv.runOnce(context.Background())
576 c.Check(err, check.IsNil)
// NOTE(review): 8 trash requests presumably means 4 empty lists at
// startup plus 4 computed lists, one per keepstore; 4 pull requests
// presumably means one list per keepstore — confirm.
577 c.Check(trashReqs.Count(), check.Equals, 8)
578 c.Check(pullReqs.Count(), check.Equals, 4)
579 // "foo" block is overreplicated by 2
580 c.Check(bal.stats.trashes, check.Equals, 2)
581 // "bar" block is underreplicated by 1, and its only copy is
582 // in a poor rendezvous position
583 c.Check(bal.stats.pulls, check.Equals, 2)
// The "foo" block has copies in the index, so it must not appear in the
// lost-blocks report.
585 lost, err := ioutil.ReadFile(s.config.Collections.BlobMissingReport)
586 c.Assert(err, check.IsNil)
587 c.Check(string(lost), check.Not(check.Matches), `(?ms).*acbd18db4cc2f85cedef654fccc4a4d8.*`)
// Metrics are gathered from the server's own registry. The changeset
// computation must have been observed exactly once.
589 metrics := arvadostest.GatherMetricsAsString(srv.Metrics.reg)
590 c.Check(metrics, check.Matches, `(?ms).*\narvados_keep_total_bytes 15\n.*`)
591 c.Check(metrics, check.Matches, `(?ms).*\narvados_keepbalance_changeset_compute_seconds_sum [0-9\.]+\n.*`)
592 c.Check(metrics, check.Matches, `(?ms).*\narvados_keepbalance_changeset_compute_seconds_count 1\n.*`)
593 c.Check(metrics, check.Matches, `(?ms).*\narvados_keep_dedup_byte_ratio [1-9].*`)
594 c.Check(metrics, check.Matches, `(?ms).*\narvados_keep_dedup_block_ratio [1-9].*`)
// TestChunkPrefix runs one balancing pass restricted to ChunkPrefix
// "ac". The "foo" block (acbd18db...) matches the prefix and the "bar"
// block (37b51d19...) does not, so only foo's over-replication is acted
// on. The lost-blocks report is expected to be empty.
597 func (s *runSuite) TestChunkPrefix(c *check.C) {
598 s.config.Collections.BlobMissingReport = c.MkDir() + "/keep-balance-lost-blocks-test-"
600 ChunkPrefix: "ac", // catch "foo" but not "bar"
601 Logger: ctxlog.TestLogger(c),
602 Dumper: ctxlog.TestLogger(c),
604 s.stub.serveCurrentUserAdmin()
605 s.stub.serveFooBarFileCollections()
606 s.stub.serveKeepServices(stubServices)
607 s.stub.serveKeepstoreMounts()
608 s.stub.serveKeepstoreIndexFoo4Bar1()
609 trashReqs := s.stub.serveKeepstoreTrash()
610 pullReqs := s.stub.serveKeepstorePull()
611 srv := s.newServer(&opts)
612 bal, err := srv.runOnce(context.Background())
613 c.Check(err, check.IsNil)
// Request counts are the same as in TestCommit: pull requests are still
// sent even though stats.pulls is expected to be 0.
614 c.Check(trashReqs.Count(), check.Equals, 8)
615 c.Check(pullReqs.Count(), check.Equals, 4)
616 // "foo" block is overreplicated by 2
617 c.Check(bal.stats.trashes, check.Equals, 2)
618 // "bar" block is underreplicated but does not match prefix
619 c.Check(bal.stats.pulls, check.Equals, 0)
621 lost, err := ioutil.ReadFile(s.config.Collections.BlobMissingReport)
622 c.Assert(err, check.IsNil)
623 c.Check(string(lost), check.Equals, "")
626 func (s *runSuite) TestRunForever(c *check.C) {
627 s.config.ManagementToken = "xyzzy"
629 Logger: ctxlog.TestLogger(c),
630 Dumper: ctxlog.TestLogger(c),
632 s.stub.serveCurrentUserAdmin()
633 s.stub.serveFooBarFileCollections()
634 s.stub.serveKeepServices(stubServices)
635 s.stub.serveKeepstoreMounts()
636 s.stub.serveKeepstoreIndexFoo4Bar1()
637 trashReqs := s.stub.serveKeepstoreTrash()
638 pullReqs := s.stub.serveKeepstorePull()
640 ctx, cancel := context.WithCancel(context.Background())
642 s.config.Collections.BalancePeriod = arvados.Duration(time.Millisecond)
643 srv := s.newServer(&opts)
645 done := make(chan bool)
651 // Each run should send 4 pull lists + 4 trash lists. The
652 // first run should also send 4 empty trash lists at
653 // startup. We should complete all four runs in much less than the 10-second limit of the polling loop below.
655 for t0 := time.Now(); time.Since(t0) < 10*time.Second; {
656 if pullReqs.Count() >= 16 && trashReqs.Count() == pullReqs.Count()+4 {
659 time.Sleep(time.Millisecond)
663 c.Check(pullReqs.Count() >= 16, check.Equals, true)
664 c.Check(trashReqs.Count(), check.Equals, pullReqs.Count()+4)
666 metrics := arvadostest.GatherMetricsAsString(srv.Metrics.reg)
667 c.Check(metrics, check.Matches, `(?ms).*\narvados_keepbalance_changeset_compute_seconds_count `+fmt.Sprintf("%d", pullReqs.Count()/4)+`\n.*`)