1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
21 "git.arvados.org/arvados.git/lib/config"
22 "git.arvados.org/arvados.git/sdk/go/arvados"
23 "git.arvados.org/arvados.git/sdk/go/arvadostest"
24 "git.arvados.org/arvados.git/sdk/go/ctxlog"
25 "github.com/jmoiron/sqlx"
26 "github.com/prometheus/client_golang/prometheus"
27 check "gopkg.in/check.v1"
// Register runSuite with the gocheck (gopkg.in/check.v1) runner so
// that its Test* methods are run as part of this package's tests.
30 var _ = check.Suite(&runSuite{})
32 type reqTracker struct {
37 func (rt *reqTracker) Count() int {
43 func (rt *reqTracker) Add(req *http.Request) int {
46 rt.reqs = append(rt.reqs, *req)
50 var stubServices = []arvados.KeepService{
52 UUID: "zzzzz-bi6l4-000000000000000",
53 ServiceHost: "keep0.zzzzz.arvadosapi.com",
55 ServiceSSLFlag: false,
59 UUID: "zzzzz-bi6l4-000000000000001",
60 ServiceHost: "keep1.zzzzz.arvadosapi.com",
62 ServiceSSLFlag: false,
66 UUID: "zzzzz-bi6l4-000000000000002",
67 ServiceHost: "keep2.zzzzz.arvadosapi.com",
69 ServiceSSLFlag: false,
73 UUID: "zzzzz-bi6l4-000000000000003",
74 ServiceHost: "keep3.zzzzz.arvadosapi.com",
76 ServiceSSLFlag: false,
80 UUID: "zzzzz-bi6l4-h0a0xwut9qa6g3a",
81 ServiceHost: "keep.zzzzz.arvadosapi.com",
88 var stubMounts = map[string][]arvados.KeepMount{
89 "keep0.zzzzz.arvadosapi.com:25107": {{
90 UUID: "zzzzz-ivpuk-000000000000000",
91 DeviceID: "keep0-vol0",
92 StorageClasses: map[string]bool{"default": true},
96 "keep1.zzzzz.arvadosapi.com:25107": {{
97 UUID: "zzzzz-ivpuk-100000000000000",
98 DeviceID: "keep1-vol0",
99 StorageClasses: map[string]bool{"default": true},
103 "keep2.zzzzz.arvadosapi.com:25107": {{
104 UUID: "zzzzz-ivpuk-200000000000000",
105 DeviceID: "keep2-vol0",
106 StorageClasses: map[string]bool{"default": true},
110 "keep3.zzzzz.arvadosapi.com:25107": {{
111 UUID: "zzzzz-ivpuk-300000000000000",
112 DeviceID: "keep3-vol0",
113 StorageClasses: map[string]bool{"default": true},
119 // stubServer is an HTTP transport that intercepts and processes all
120 // requests using its own handlers.
121 type stubServer struct {
126 logf func(string, ...interface{})
129 // Start initializes the stub server and returns an *http.Client that
130 // uses the stub server to handle all requests.
132 // A stubServer that has been started should eventually be shut down by calling Close().
134 func (s *stubServer) Start() *http.Client {
135 // Set up an http.Client (using s as its transport) and an httptest
136 // server (s.srv) that both forward all requests to s.mux. Test
137 // cases will attach handlers to s.mux to get the desired responses.
138 s.mux = http.NewServeMux()
139 s.srv = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
143 w.Header().Set("Content-Type", "application/json")
144 s.mux.ServeHTTP(w, r)
146 return &http.Client{Transport: s}
149 func (s *stubServer) RoundTrip(req *http.Request) (*http.Response, error) {
150 w := httptest.NewRecorder()
151 s.mux.ServeHTTP(w, req)
152 return &http.Response{
154 Status: fmt.Sprintf("%d %s", w.Code, http.StatusText(w.Code)),
156 Body: ioutil.NopCloser(w.Body)}, nil
159 // Close releases resources used by the server.
160 func (s *stubServer) Close() {
164 func (s *stubServer) serveStatic(path, data string) *reqTracker {
166 s.mux.HandleFunc(path, func(w http.ResponseWriter, r *http.Request) {
169 ioutil.ReadAll(r.Body)
172 io.WriteString(w, data)
// serveCurrentUserAdmin uses serveStatic to install a canned response
// at /arvados/v1/users/current describing an active user whose
// is_admin flag is true. It returns the *reqTracker produced by
// serveStatic (presumably tracking requests to that path -- confirm
// against serveStatic).
177 func (s *stubServer) serveCurrentUserAdmin() *reqTracker {
178 return s.serveStatic("/arvados/v1/users/current",
179 `{"uuid":"zzzzz-tpzed-000000000000000","is_admin":true,"is_active":true}`)
// serveCurrentUserNotAdmin uses serveStatic to install a canned
// response at /arvados/v1/users/current describing an active user
// whose is_admin flag is false. It returns the *reqTracker produced
// by serveStatic.
182 func (s *stubServer) serveCurrentUserNotAdmin() *reqTracker {
183 return s.serveStatic("/arvados/v1/users/current",
184 `{"uuid":"zzzzz-tpzed-000000000000000","is_admin":false,"is_active":true}`)
// serveDiscoveryDoc uses serveStatic to install a minimal discovery
// document at /discovery/v1/apis/arvados/v1/rest whose only field is
// defaultCollectionReplication, set to 2. It returns the *reqTracker
// produced by serveStatic.
187 func (s *stubServer) serveDiscoveryDoc() *reqTracker {
188 return s.serveStatic("/discovery/v1/apis/arvados/v1/rest",
189 `{"defaultCollectionReplication":2}`)
// serveZeroCollections uses serveStatic to install an empty
// collection list (no items, items_available 0) at
// /arvados/v1/collections. It returns the *reqTracker produced by
// serveStatic.
192 func (s *stubServer) serveZeroCollections() *reqTracker {
193 return s.serveStatic("/arvados/v1/collections",
194 `{"items":[],"items_available":0}`)
197 func (s *stubServer) serveFooBarFileCollections() *reqTracker {
199 s.mux.HandleFunc("/arvados/v1/collections", func(w http.ResponseWriter, r *http.Request) {
202 if strings.Contains(r.Form.Get("filters"), `modified_at`) {
203 io.WriteString(w, `{"items_available":0,"items":[]}`)
205 io.WriteString(w, `{"items_available":3,"items":[
206 {"uuid":"zzzzz-4zz18-aaaaaaaaaaaaaaa","portable_data_hash":"fa7aeb5140e2848d39b416daeef4ffc5+45","manifest_text":". 37b51d194a7513e45b56f6524f2d51f2+3 0:3:bar\n","modified_at":"2014-02-03T17:22:54Z"},
207 {"uuid":"zzzzz-4zz18-ehbhgtheo8909or","portable_data_hash":"fa7aeb5140e2848d39b416daeef4ffc5+45","manifest_text":". 37b51d194a7513e45b56f6524f2d51f2+3 0:3:bar\n","modified_at":"2014-02-03T17:22:54Z"},
208 {"uuid":"zzzzz-4zz18-znfnqtbbv4spc3w","portable_data_hash":"1f4b0bc7583c2a7f9102c395f4ffc5e3+45","manifest_text":". acbd18db4cc2f85cedef654fccc4a4d8+3 0:3:foo\n","modified_at":"2014-02-03T17:22:54Z"}]}`)
214 func (s *stubServer) serveCollectionsButSkipOne() *reqTracker {
216 s.mux.HandleFunc("/arvados/v1/collections", func(w http.ResponseWriter, r *http.Request) {
219 if strings.Contains(r.Form.Get("filters"), `"modified_at","\u003c="`) {
220 io.WriteString(w, `{"items_available":3,"items":[]}`)
221 } else if strings.Contains(r.Form.Get("filters"), `"modified_at","\u003e`) {
222 io.WriteString(w, `{"items_available":0,"items":[]}`)
223 } else if strings.Contains(r.Form.Get("filters"), `"modified_at","="`) && strings.Contains(r.Form.Get("filters"), `"uuid","\u003e"`) {
224 io.WriteString(w, `{"items_available":0,"items":[]}`)
225 } else if strings.Contains(r.Form.Get("filters"), `"modified_at","=",null`) {
226 io.WriteString(w, `{"items_available":0,"items":[]}`)
228 io.WriteString(w, `{"items_available":2,"items":[
229 {"uuid":"zzzzz-4zz18-ehbhgtheo8909or","portable_data_hash":"fa7aeb5140e2848d39b416daeef4ffc5+45","manifest_text":". 37b51d194a7513e45b56f6524f2d51f2+3 0:3:bar\n","modified_at":"2014-02-03T17:22:54Z"},
230 {"uuid":"zzzzz-4zz18-znfnqtbbv4spc3w","portable_data_hash":"1f4b0bc7583c2a7f9102c395f4ffc5e3+45","manifest_text":". acbd18db4cc2f85cedef654fccc4a4d8+3 0:3:foo\n","modified_at":"2014-02-03T17:22:54Z"}]}`)
// serveZeroKeepServices uses serveJSON to install a zero-value
// arvados.KeepServiceList as the response at
// /arvados/v1/keep_services. It returns the *reqTracker produced by
// serveJSON.
236 func (s *stubServer) serveZeroKeepServices() *reqTracker {
237 return s.serveJSON("/arvados/v1/keep_services", arvados.KeepServiceList{})
240 func (s *stubServer) serveKeepServices(svcs []arvados.KeepService) *reqTracker {
241 return s.serveJSON("/arvados/v1/keep_services", arvados.KeepServiceList{
242 ItemsAvailable: len(svcs),
247 func (s *stubServer) serveJSON(path string, resp interface{}) *reqTracker {
249 s.mux.HandleFunc(path, func(w http.ResponseWriter, r *http.Request) {
251 json.NewEncoder(w).Encode(resp)
256 func (s *stubServer) serveKeepstoreMounts() *reqTracker {
258 s.mux.HandleFunc("/mounts", func(w http.ResponseWriter, r *http.Request) {
260 json.NewEncoder(w).Encode(stubMounts[r.Host])
265 func (s *stubServer) serveKeepstoreIndexFoo4Bar1() *reqTracker {
266 fooLine := func(mt int) string { return fmt.Sprintf("acbd18db4cc2f85cedef654fccc4a4d8+3 %d\n", 12345678+mt) }
267 barLine := "37b51d194a7513e45b56f6524f2d51f2+3 12345678\n"
269 s.mux.HandleFunc("/index/", func(w http.ResponseWriter, r *http.Request) {
271 if r.Host == "keep0.zzzzz.arvadosapi.com:25107" && strings.HasPrefix(barLine, r.URL.Path[7:]) {
272 io.WriteString(w, barLine)
274 if strings.HasPrefix(fooLine(count), r.URL.Path[7:]) {
275 io.WriteString(w, fooLine(count))
277 io.WriteString(w, "\n")
279 for _, mounts := range stubMounts {
280 for i, mnt := range mounts {
282 s.mux.HandleFunc(fmt.Sprintf("/mounts/%s/blocks", mnt.UUID), func(w http.ResponseWriter, r *http.Request) {
285 if i == 0 && r.Host == "keep0.zzzzz.arvadosapi.com:25107" && strings.HasPrefix(barLine, r.Form.Get("prefix")) {
286 io.WriteString(w, barLine)
288 if i == 0 && strings.HasPrefix(fooLine(count), r.Form.Get("prefix")) {
289 io.WriteString(w, fooLine(count))
291 io.WriteString(w, "\n")
298 func (s *stubServer) serveKeepstoreIndexFoo1() *reqTracker {
299 fooLine := "acbd18db4cc2f85cedef654fccc4a4d8+3 12345678\n"
301 s.mux.HandleFunc("/index/", func(w http.ResponseWriter, r *http.Request) {
303 if r.Host == "keep0.zzzzz.arvadosapi.com:25107" && strings.HasPrefix(fooLine, r.URL.Path[7:]) {
304 io.WriteString(w, fooLine)
306 io.WriteString(w, "\n")
308 for _, mounts := range stubMounts {
309 for i, mnt := range mounts {
311 s.mux.HandleFunc(fmt.Sprintf("/mounts/%s/blocks", mnt.UUID), func(w http.ResponseWriter, r *http.Request) {
313 if i == 0 && strings.HasPrefix(fooLine, r.Form.Get("prefix")) {
314 io.WriteString(w, fooLine)
316 io.WriteString(w, "\n")
323 func (s *stubServer) serveKeepstoreIndexIgnoringPrefix() *reqTracker {
324 fooLine := "acbd18db4cc2f85cedef654fccc4a4d8+3 12345678\n"
326 s.mux.HandleFunc("/index/", func(w http.ResponseWriter, r *http.Request) {
328 io.WriteString(w, fooLine)
329 io.WriteString(w, "\n")
331 for _, mounts := range stubMounts {
332 for _, mnt := range mounts {
333 s.mux.HandleFunc(fmt.Sprintf("/mounts/%s/blocks", mnt.UUID), func(w http.ResponseWriter, r *http.Request) {
335 io.WriteString(w, fooLine)
336 io.WriteString(w, "\n")
// serveKeepstoreTrash uses serveStatic to install an empty JSON
// object ("{}") as the response at /trash. It returns the
// *reqTracker produced by serveStatic, which the tests in this file
// query with Count() to check how many trash requests were sent.
343 func (s *stubServer) serveKeepstoreTrash() *reqTracker {
344 return s.serveStatic("/trash", `{}`)
// serveKeepstorePull uses serveStatic to install an empty JSON
// object ("{}") as the response at /pull. It returns the
// *reqTracker produced by serveStatic, which the tests in this file
// query with Count() to check how many pull requests were sent.
347 func (s *stubServer) serveKeepstorePull() *reqTracker {
348 return s.serveStatic("/pull", `{}`)
351 type runSuite struct {
353 config *arvados.Cluster
355 client *arvados.Client
358 func (s *runSuite) newServer(options *RunOptions) *Server {
362 RunOptions: *options,
363 Metrics: newMetrics(prometheus.NewRegistry()),
364 Logger: options.Logger,
365 Dumper: options.Dumper,
371 func (s *runSuite) SetUpTest(c *check.C) {
372 cfg, err := config.NewLoader(nil, ctxlog.TestLogger(c)).Load()
373 c.Assert(err, check.Equals, nil)
374 s.config, err = cfg.GetCluster("")
375 c.Assert(err, check.Equals, nil)
376 s.db, err = sqlx.Open("postgres", s.config.PostgreSQL.Connection.String())
377 c.Assert(err, check.IsNil)
379 s.config.Collections.BalancePeriod = arvados.Duration(time.Second)
380 arvadostest.SetServiceURL(&s.config.Services.Keepbalance, "http://localhost:/")
382 s.client = &arvados.Client{
384 APIHost: "zzzzz.arvadosapi.com",
385 Client: s.stub.Start()}
387 s.stub.serveDiscoveryDoc()
391 func (s *runSuite) TearDownTest(c *check.C) {
395 func (s *runSuite) TestRefuseZeroCollections(c *check.C) {
396 defer arvados.NewClientFromEnv().RequestAndDecode(nil, "POST", "database/reset", nil, nil)
397 _, err := s.db.Exec(`delete from collections`)
398 c.Assert(err, check.IsNil)
400 Logger: ctxlog.TestLogger(c),
402 s.stub.serveCurrentUserAdmin()
403 s.stub.serveZeroCollections()
404 s.stub.serveKeepServices(stubServices)
405 s.stub.serveKeepstoreMounts()
406 s.stub.serveKeepstoreIndexFoo4Bar1()
407 trashReqs := s.stub.serveKeepstoreTrash()
408 pullReqs := s.stub.serveKeepstorePull()
409 srv := s.newServer(&opts)
410 _, err = srv.runOnce(context.Background())
411 c.Check(err, check.ErrorMatches, "received zero collections")
412 c.Check(trashReqs.Count(), check.Equals, 4)
413 c.Check(pullReqs.Count(), check.Equals, 0)
416 func (s *runSuite) TestRefuseBadIndex(c *check.C) {
419 Logger: ctxlog.TestLogger(c),
421 s.stub.serveCurrentUserAdmin()
422 s.stub.serveFooBarFileCollections()
423 s.stub.serveKeepServices(stubServices)
424 s.stub.serveKeepstoreMounts()
425 s.stub.serveKeepstoreIndexIgnoringPrefix()
426 trashReqs := s.stub.serveKeepstoreTrash()
427 pullReqs := s.stub.serveKeepstorePull()
428 srv := s.newServer(&opts)
429 bal, err := srv.runOnce(context.Background())
430 c.Check(err, check.ErrorMatches, ".*Index response included block .* despite asking for prefix \"abc\"")
431 c.Check(trashReqs.Count(), check.Equals, 4)
432 c.Check(pullReqs.Count(), check.Equals, 0)
433 c.Check(bal.stats.trashes, check.Equals, 0)
434 c.Check(bal.stats.pulls, check.Equals, 0)
437 func (s *runSuite) TestRefuseNonAdmin(c *check.C) {
439 Logger: ctxlog.TestLogger(c),
441 s.stub.serveCurrentUserNotAdmin()
442 s.stub.serveZeroCollections()
443 s.stub.serveKeepServices(stubServices)
444 s.stub.serveKeepstoreMounts()
445 trashReqs := s.stub.serveKeepstoreTrash()
446 pullReqs := s.stub.serveKeepstorePull()
447 srv := s.newServer(&opts)
448 _, err := srv.runOnce(context.Background())
449 c.Check(err, check.ErrorMatches, "current user .* is not .* admin user")
450 c.Check(trashReqs.Count(), check.Equals, 0)
451 c.Check(pullReqs.Count(), check.Equals, 0)
454 func (s *runSuite) TestInvalidChunkPrefix(c *check.C) {
455 for _, trial := range []struct {
459 {"123ABC", "invalid char \"A\" in chunk prefix.*"},
460 {"123xyz", "invalid char \"x\" in chunk prefix.*"},
461 {"123456789012345678901234567890123", "invalid chunk prefix .* longer than a block hash"},
464 c.Logf("trying invalid prefix %q", trial.prefix)
466 ChunkPrefix: trial.prefix,
467 Logger: ctxlog.TestLogger(c),
469 s.stub.serveCurrentUserAdmin()
470 s.stub.serveFooBarFileCollections()
471 s.stub.serveKeepServices(stubServices)
472 s.stub.serveKeepstoreMounts()
473 trashReqs := s.stub.serveKeepstoreTrash()
474 pullReqs := s.stub.serveKeepstorePull()
475 srv := s.newServer(&opts)
476 _, err := srv.runOnce(context.Background())
477 c.Check(err, check.ErrorMatches, trial.errRe)
478 c.Check(trashReqs.Count(), check.Equals, 0)
479 c.Check(pullReqs.Count(), check.Equals, 0)
483 func (s *runSuite) TestRefuseSameDeviceDifferentVolumes(c *check.C) {
485 Logger: ctxlog.TestLogger(c),
487 s.stub.serveCurrentUserAdmin()
488 s.stub.serveZeroCollections()
489 s.stub.serveKeepServices(stubServices)
490 s.stub.mux.HandleFunc("/mounts", func(w http.ResponseWriter, r *http.Request) {
491 hostid := r.Host[:5] // "keep0.zzzzz.arvadosapi.com:25107" => "keep0"
492 json.NewEncoder(w).Encode([]arvados.KeepMount{{
493 UUID: "zzzzz-ivpuk-0000000000" + hostid,
494 DeviceID: "keep0-vol0",
495 StorageClasses: map[string]bool{"default": true},
498 trashReqs := s.stub.serveKeepstoreTrash()
499 pullReqs := s.stub.serveKeepstorePull()
500 srv := s.newServer(&opts)
501 _, err := srv.runOnce(context.Background())
502 c.Check(err, check.ErrorMatches, "cannot continue with config errors.*")
503 c.Check(trashReqs.Count(), check.Equals, 0)
504 c.Check(pullReqs.Count(), check.Equals, 0)
507 func (s *runSuite) TestWriteLostBlocks(c *check.C) {
508 lostf, err := ioutil.TempFile("", "keep-balance-lost-blocks-test-")
509 c.Assert(err, check.IsNil)
510 s.config.Collections.BlobMissingReport = lostf.Name()
511 defer os.Remove(lostf.Name())
513 Logger: ctxlog.TestLogger(c),
515 s.stub.serveCurrentUserAdmin()
516 s.stub.serveFooBarFileCollections()
517 s.stub.serveKeepServices(stubServices)
518 s.stub.serveKeepstoreMounts()
519 s.stub.serveKeepstoreIndexFoo1()
520 s.stub.serveKeepstoreTrash()
521 s.stub.serveKeepstorePull()
522 srv := s.newServer(&opts)
523 c.Assert(err, check.IsNil)
524 _, err = srv.runOnce(context.Background())
525 c.Check(err, check.IsNil)
526 lost, err := ioutil.ReadFile(lostf.Name())
527 c.Assert(err, check.IsNil)
528 c.Check(string(lost), check.Matches, `(?ms).*37b51d194a7513e45b56f6524f2d51f2.* fa7aeb5140e2848d39b416daeef4ffc5\+45.*`)
531 func (s *runSuite) TestDryRun(c *check.C) {
532 s.config.Collections.BalanceTrashLimit = 0
533 s.config.Collections.BalancePullLimit = 0
535 Logger: ctxlog.TestLogger(c),
537 s.stub.serveCurrentUserAdmin()
538 collReqs := s.stub.serveFooBarFileCollections()
539 s.stub.serveKeepServices(stubServices)
540 s.stub.serveKeepstoreMounts()
541 s.stub.serveKeepstoreIndexFoo4Bar1()
542 trashReqs := s.stub.serveKeepstoreTrash()
543 pullReqs := s.stub.serveKeepstorePull()
544 srv := s.newServer(&opts)
545 bal, err := srv.runOnce(context.Background())
546 c.Check(err, check.IsNil)
547 for _, req := range collReqs.reqs {
548 c.Check(req.Form.Get("include_trash"), check.Equals, "true")
549 c.Check(req.Form.Get("include_old_versions"), check.Equals, "true")
551 c.Check(trashReqs.Count(), check.Equals, 0)
552 c.Check(pullReqs.Count(), check.Equals, 0)
553 c.Check(bal.stats.pulls, check.Equals, 0)
554 c.Check(bal.stats.pullsDeferred, check.Not(check.Equals), 0)
555 c.Check(bal.stats.trashes, check.Equals, 0)
556 c.Check(bal.stats.trashesDeferred, check.Not(check.Equals), 0)
557 c.Check(bal.stats.underrep.replicas, check.Not(check.Equals), 0)
558 c.Check(bal.stats.overrep.replicas, check.Not(check.Equals), 0)
561 func (s *runSuite) TestCommit(c *check.C) {
562 s.config.Collections.BlobMissingReport = c.MkDir() + "/keep-balance-lost-blocks-test-"
563 s.config.ManagementToken = "xyzzy"
565 Logger: ctxlog.TestLogger(c),
566 Dumper: ctxlog.TestLogger(c),
568 s.stub.serveCurrentUserAdmin()
569 s.stub.serveFooBarFileCollections()
570 s.stub.serveKeepServices(stubServices)
571 s.stub.serveKeepstoreMounts()
572 s.stub.serveKeepstoreIndexFoo4Bar1()
573 trashReqs := s.stub.serveKeepstoreTrash()
574 pullReqs := s.stub.serveKeepstorePull()
575 srv := s.newServer(&opts)
576 bal, err := srv.runOnce(context.Background())
577 c.Check(err, check.IsNil)
578 c.Check(trashReqs.Count(), check.Equals, 8)
579 c.Check(pullReqs.Count(), check.Equals, 4)
580 // "foo" block is overreplicated by 2
581 c.Check(bal.stats.trashes, check.Equals, 2)
582 // "bar" block is underreplicated by 1, and its only copy is
583 // in a poor rendezvous position
584 c.Check(bal.stats.pulls, check.Equals, 2)
586 lost, err := ioutil.ReadFile(s.config.Collections.BlobMissingReport)
587 c.Assert(err, check.IsNil)
588 c.Check(string(lost), check.Not(check.Matches), `(?ms).*acbd18db4cc2f85cedef654fccc4a4d8.*`)
590 metrics := arvadostest.GatherMetricsAsString(srv.Metrics.reg)
591 c.Check(metrics, check.Matches, `(?ms).*\narvados_keep_total_bytes 15\n.*`)
592 c.Check(metrics, check.Matches, `(?ms).*\narvados_keepbalance_changeset_compute_seconds_sum [0-9\.]+\n.*`)
593 c.Check(metrics, check.Matches, `(?ms).*\narvados_keepbalance_changeset_compute_seconds_count 1\n.*`)
594 c.Check(metrics, check.Matches, `(?ms).*\narvados_keep_dedup_byte_ratio [1-9].*`)
595 c.Check(metrics, check.Matches, `(?ms).*\narvados_keep_dedup_block_ratio [1-9].*`)
598 func (s *runSuite) TestChunkPrefix(c *check.C) {
599 s.config.Collections.BlobMissingReport = c.MkDir() + "/keep-balance-lost-blocks-test-"
601 ChunkPrefix: "ac", // catch "foo" but not "bar"
602 Logger: ctxlog.TestLogger(c),
603 Dumper: ctxlog.TestLogger(c),
605 s.stub.serveCurrentUserAdmin()
606 s.stub.serveFooBarFileCollections()
607 s.stub.serveKeepServices(stubServices)
608 s.stub.serveKeepstoreMounts()
609 s.stub.serveKeepstoreIndexFoo4Bar1()
610 trashReqs := s.stub.serveKeepstoreTrash()
611 pullReqs := s.stub.serveKeepstorePull()
612 srv := s.newServer(&opts)
613 bal, err := srv.runOnce(context.Background())
614 c.Check(err, check.IsNil)
615 c.Check(trashReqs.Count(), check.Equals, 8)
616 c.Check(pullReqs.Count(), check.Equals, 4)
617 // "foo" block is overreplicated by 2
618 c.Check(bal.stats.trashes, check.Equals, 2)
619 // "bar" block is underreplicated but does not match prefix
620 c.Check(bal.stats.pulls, check.Equals, 0)
622 lost, err := ioutil.ReadFile(s.config.Collections.BlobMissingReport)
623 c.Assert(err, check.IsNil)
624 c.Check(string(lost), check.Equals, "")
627 func (s *runSuite) TestRunForever(c *check.C) {
628 s.config.ManagementToken = "xyzzy"
630 Logger: ctxlog.TestLogger(c),
631 Dumper: ctxlog.TestLogger(c),
633 s.stub.serveCurrentUserAdmin()
634 s.stub.serveFooBarFileCollections()
635 s.stub.serveKeepServices(stubServices)
636 s.stub.serveKeepstoreMounts()
637 s.stub.serveKeepstoreIndexFoo4Bar1()
638 trashReqs := s.stub.serveKeepstoreTrash()
639 pullReqs := s.stub.serveKeepstorePull()
641 ctx, cancel := context.WithCancel(context.Background())
643 s.config.Collections.BalancePeriod = arvados.Duration(100 * time.Millisecond)
644 srv := s.newServer(&opts)
646 done := make(chan bool)
652 procself, err := os.FindProcess(os.Getpid())
653 c.Assert(err, check.IsNil)
655 // Each run should send 4 pull lists + 4 trash lists. The
656 // first run should also send 4 empty trash lists at
657 // startup. We should complete all four runs in much less than the 10 seconds allowed by the polling loop below.
660 for t0 := time.Now(); time.Since(t0) < 10*time.Second; {
661 pulls := pullReqs.Count()
662 if pulls >= 16 && trashReqs.Count() == pulls+4 {
666 // Once the 2nd run has started automatically
667 // (indicating that our BalancePeriod is
668 // working) we switch to a long wait time to
669 // effectively stop the timed runs, and
670 // instead start sending a single SIGUSR1 at
671 // the end of each (2nd or 3rd) run, to ensure
672 // we get exactly 4 runs in total.
673 srv.Cluster.Collections.BalancePeriod = arvados.Duration(time.Minute)
674 if pulls%4 == 0 && pulls <= 12 && pulls/4 > completedRuns {
675 completedRuns = pulls / 4
676 c.Logf("completed run %d, sending SIGUSR1 to trigger next run", completedRuns)
677 procself.Signal(syscall.SIGUSR1)
680 time.Sleep(time.Millisecond)
684 c.Check(pullReqs.Count() >= 16, check.Equals, true)
685 c.Check(trashReqs.Count(), check.Equals, pullReqs.Count()+4)
687 metrics := arvadostest.GatherMetricsAsString(srv.Metrics.reg)
688 c.Check(metrics, check.Matches, `(?ms).*\narvados_keepbalance_changeset_compute_seconds_count `+fmt.Sprintf("%d", pullReqs.Count()/4)+`\n.*`)