1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
21 "git.arvados.org/arvados.git/lib/config"
22 "git.arvados.org/arvados.git/sdk/go/arvados"
23 "git.arvados.org/arvados.git/sdk/go/arvadostest"
24 "git.arvados.org/arvados.git/sdk/go/ctxlog"
25 "github.com/jmoiron/sqlx"
26 "github.com/prometheus/client_golang/prometheus"
27 check "gopkg.in/check.v1"
30 var _ = check.Suite(&runSuite{})
32 type reqTracker struct {
37 func (rt *reqTracker) Count() int {
43 func (rt *reqTracker) Add(req *http.Request) int {
46 rt.reqs = append(rt.reqs, *req)
50 var stubServices = []arvados.KeepService{
52 UUID: "zzzzz-bi6l4-000000000000000",
53 ServiceHost: "keep0.zzzzz.arvadosapi.com",
55 ServiceSSLFlag: false,
59 UUID: "zzzzz-bi6l4-000000000000001",
60 ServiceHost: "keep1.zzzzz.arvadosapi.com",
62 ServiceSSLFlag: false,
66 UUID: "zzzzz-bi6l4-000000000000002",
67 ServiceHost: "keep2.zzzzz.arvadosapi.com",
69 ServiceSSLFlag: false,
73 UUID: "zzzzz-bi6l4-000000000000003",
74 ServiceHost: "keep3.zzzzz.arvadosapi.com",
76 ServiceSSLFlag: false,
80 UUID: "zzzzz-bi6l4-h0a0xwut9qa6g3a",
81 ServiceHost: "keep.zzzzz.arvadosapi.com",
88 var stubMounts = map[string][]arvados.KeepMount{
89 "keep0.zzzzz.arvadosapi.com:25107": {{
90 UUID: "zzzzz-ivpuk-000000000000000",
91 DeviceID: "keep0-vol0",
92 StorageClasses: map[string]bool{"default": true},
96 "keep1.zzzzz.arvadosapi.com:25107": {{
97 UUID: "zzzzz-ivpuk-100000000000000",
98 DeviceID: "keep1-vol0",
99 StorageClasses: map[string]bool{"default": true},
103 "keep2.zzzzz.arvadosapi.com:25107": {{
104 UUID: "zzzzz-ivpuk-200000000000000",
105 DeviceID: "keep2-vol0",
106 StorageClasses: map[string]bool{"default": true},
110 "keep3.zzzzz.arvadosapi.com:25107": {{
111 UUID: "zzzzz-ivpuk-300000000000000",
112 DeviceID: "keep3-vol0",
113 StorageClasses: map[string]bool{"default": true},
119 // stubServer is an HTTP transport that intercepts and processes all
120 // requests using its own handlers.
121 type stubServer struct {
126 logf func(string, ...interface{})
129 // Start initializes the stub server and returns an *http.Client that
130 // uses the stub server to handle all requests.
132 // A stubServer that has been started should eventually be shut down
134 func (s *stubServer) Start() *http.Client {
135 // Set up a config.Client that forwards all requests to s.mux
136 // via s.srv. Test cases will attach handlers to s.mux to get
137 // the desired responses.
138 s.mux = http.NewServeMux()
139 s.srv = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
143 w.Header().Set("Content-Type", "application/json")
144 s.mux.ServeHTTP(w, r)
146 return &http.Client{Transport: s}
149 func (s *stubServer) RoundTrip(req *http.Request) (*http.Response, error) {
150 w := httptest.NewRecorder()
151 s.mux.ServeHTTP(w, req)
152 return &http.Response{
154 Status: fmt.Sprintf("%d %s", w.Code, http.StatusText(w.Code)),
156 Body: ioutil.NopCloser(w.Body)}, nil
159 // Close releases resources used by the server.
160 func (s *stubServer) Close() {
164 func (s *stubServer) serveStatic(path, data string) *reqTracker {
166 s.mux.HandleFunc(path, func(w http.ResponseWriter, r *http.Request) {
169 ioutil.ReadAll(r.Body)
172 io.WriteString(w, data)
177 func (s *stubServer) serveCurrentUserAdmin() *reqTracker {
178 return s.serveStatic("/arvados/v1/users/current",
179 `{"uuid":"zzzzz-tpzed-000000000000000","is_admin":true,"is_active":true}`)
182 func (s *stubServer) serveCurrentUserNotAdmin() *reqTracker {
183 return s.serveStatic("/arvados/v1/users/current",
184 `{"uuid":"zzzzz-tpzed-000000000000000","is_admin":false,"is_active":true}`)
187 func (s *stubServer) serveDiscoveryDoc() *reqTracker {
188 return s.serveStatic("/discovery/v1/apis/arvados/v1/rest",
189 `{"defaultCollectionReplication":2}`)
192 func (s *stubServer) serveZeroCollections() *reqTracker {
193 return s.serveStatic("/arvados/v1/collections",
194 `{"items":[],"items_available":0}`)
197 func (s *stubServer) serveFooBarFileCollections() *reqTracker {
199 s.mux.HandleFunc("/arvados/v1/collections", func(w http.ResponseWriter, r *http.Request) {
202 if strings.Contains(r.Form.Get("filters"), `modified_at`) {
203 io.WriteString(w, `{"items_available":0,"items":[]}`)
205 io.WriteString(w, `{"items_available":3,"items":[
206 {"uuid":"zzzzz-4zz18-aaaaaaaaaaaaaaa","portable_data_hash":"fa7aeb5140e2848d39b416daeef4ffc5+45","manifest_text":". 37b51d194a7513e45b56f6524f2d51f2+3 0:3:bar\n","modified_at":"2014-02-03T17:22:54Z"},
207 {"uuid":"zzzzz-4zz18-ehbhgtheo8909or","portable_data_hash":"fa7aeb5140e2848d39b416daeef4ffc5+45","manifest_text":". 37b51d194a7513e45b56f6524f2d51f2+3 0:3:bar\n","modified_at":"2014-02-03T17:22:54Z"},
208 {"uuid":"zzzzz-4zz18-znfnqtbbv4spc3w","portable_data_hash":"1f4b0bc7583c2a7f9102c395f4ffc5e3+45","manifest_text":". acbd18db4cc2f85cedef654fccc4a4d8+3 0:3:foo\n","modified_at":"2014-02-03T17:22:54Z"}]}`)
214 func (s *stubServer) serveCollectionsButSkipOne() *reqTracker {
216 s.mux.HandleFunc("/arvados/v1/collections", func(w http.ResponseWriter, r *http.Request) {
219 if strings.Contains(r.Form.Get("filters"), `"modified_at","\u003c="`) {
220 io.WriteString(w, `{"items_available":3,"items":[]}`)
221 } else if strings.Contains(r.Form.Get("filters"), `"modified_at","\u003e`) {
222 io.WriteString(w, `{"items_available":0,"items":[]}`)
223 } else if strings.Contains(r.Form.Get("filters"), `"modified_at","="`) && strings.Contains(r.Form.Get("filters"), `"uuid","\u003e"`) {
224 io.WriteString(w, `{"items_available":0,"items":[]}`)
225 } else if strings.Contains(r.Form.Get("filters"), `"modified_at","=",null`) {
226 io.WriteString(w, `{"items_available":0,"items":[]}`)
228 io.WriteString(w, `{"items_available":2,"items":[
229 {"uuid":"zzzzz-4zz18-ehbhgtheo8909or","portable_data_hash":"fa7aeb5140e2848d39b416daeef4ffc5+45","manifest_text":". 37b51d194a7513e45b56f6524f2d51f2+3 0:3:bar\n","modified_at":"2014-02-03T17:22:54Z"},
230 {"uuid":"zzzzz-4zz18-znfnqtbbv4spc3w","portable_data_hash":"1f4b0bc7583c2a7f9102c395f4ffc5e3+45","manifest_text":". acbd18db4cc2f85cedef654fccc4a4d8+3 0:3:foo\n","modified_at":"2014-02-03T17:22:54Z"}]}`)
236 func (s *stubServer) serveZeroKeepServices() *reqTracker {
237 return s.serveJSON("/arvados/v1/keep_services", arvados.KeepServiceList{})
240 func (s *stubServer) serveKeepServices(svcs []arvados.KeepService) *reqTracker {
241 return s.serveJSON("/arvados/v1/keep_services", arvados.KeepServiceList{
242 ItemsAvailable: len(svcs),
247 func (s *stubServer) serveJSON(path string, resp interface{}) *reqTracker {
249 s.mux.HandleFunc(path, func(w http.ResponseWriter, r *http.Request) {
251 json.NewEncoder(w).Encode(resp)
256 func (s *stubServer) serveKeepstoreMounts() *reqTracker {
258 s.mux.HandleFunc("/mounts", func(w http.ResponseWriter, r *http.Request) {
260 json.NewEncoder(w).Encode(stubMounts[r.Host])
265 func (s *stubServer) serveKeepstoreIndexFoo4Bar1() *reqTracker {
266 fooLine := func(mt int) string { return fmt.Sprintf("acbd18db4cc2f85cedef654fccc4a4d8+3 %d\n", 12345678+mt) }
267 barLine := "37b51d194a7513e45b56f6524f2d51f2+3 12345678\n"
269 s.mux.HandleFunc("/index/", func(w http.ResponseWriter, r *http.Request) {
271 if r.Host == "keep0.zzzzz.arvadosapi.com:25107" && strings.HasPrefix(barLine, r.URL.Path[7:]) {
272 io.WriteString(w, barLine)
274 if strings.HasPrefix(fooLine(count), r.URL.Path[7:]) {
275 io.WriteString(w, fooLine(count))
277 io.WriteString(w, "\n")
279 for _, mounts := range stubMounts {
280 for i, mnt := range mounts {
282 s.mux.HandleFunc(fmt.Sprintf("/mounts/%s/blocks", mnt.UUID), func(w http.ResponseWriter, r *http.Request) {
285 if i == 0 && r.Host == "keep0.zzzzz.arvadosapi.com:25107" && strings.HasPrefix(barLine, r.Form.Get("prefix")) {
286 io.WriteString(w, barLine)
288 if i == 0 && strings.HasPrefix(fooLine(count), r.Form.Get("prefix")) {
289 io.WriteString(w, fooLine(count))
291 io.WriteString(w, "\n")
298 func (s *stubServer) serveKeepstoreIndexFoo1() *reqTracker {
299 fooLine := "acbd18db4cc2f85cedef654fccc4a4d8+3 12345678\n"
301 s.mux.HandleFunc("/index/", func(w http.ResponseWriter, r *http.Request) {
303 if r.Host == "keep0.zzzzz.arvadosapi.com:25107" && strings.HasPrefix(fooLine, r.URL.Path[7:]) {
304 io.WriteString(w, fooLine)
306 io.WriteString(w, "\n")
308 for _, mounts := range stubMounts {
309 for i, mnt := range mounts {
311 s.mux.HandleFunc(fmt.Sprintf("/mounts/%s/blocks", mnt.UUID), func(w http.ResponseWriter, r *http.Request) {
313 if i == 0 && strings.HasPrefix(fooLine, r.Form.Get("prefix")) {
314 io.WriteString(w, fooLine)
316 io.WriteString(w, "\n")
323 func (s *stubServer) serveKeepstoreIndexIgnoringPrefix() *reqTracker {
324 fooLine := "acbd18db4cc2f85cedef654fccc4a4d8+3 12345678\n"
326 s.mux.HandleFunc("/index/", func(w http.ResponseWriter, r *http.Request) {
328 io.WriteString(w, fooLine)
329 io.WriteString(w, "\n")
331 for _, mounts := range stubMounts {
332 for _, mnt := range mounts {
333 s.mux.HandleFunc(fmt.Sprintf("/mounts/%s/blocks", mnt.UUID), func(w http.ResponseWriter, r *http.Request) {
335 io.WriteString(w, fooLine)
336 io.WriteString(w, "\n")
343 func (s *stubServer) serveKeepstoreTrash() *reqTracker {
344 return s.serveStatic("/trash", `{}`)
347 func (s *stubServer) serveKeepstorePull() *reqTracker {
348 return s.serveStatic("/pull", `{}`)
351 type runSuite struct {
353 config *arvados.Cluster
355 client *arvados.Client
358 func (s *runSuite) newServer(options *RunOptions) *Server {
362 RunOptions: *options,
363 Metrics: newMetrics(prometheus.NewRegistry()),
364 Logger: options.Logger,
365 Dumper: options.Dumper,
371 func (s *runSuite) SetUpTest(c *check.C) {
372 cfg, err := config.NewLoader(nil, ctxlog.TestLogger(c)).Load()
373 c.Assert(err, check.Equals, nil)
374 s.config, err = cfg.GetCluster("")
375 c.Assert(err, check.Equals, nil)
376 s.db, err = sqlx.Open("postgres", s.config.PostgreSQL.Connection.String())
377 c.Assert(err, check.IsNil)
379 s.config.Collections.BalancePeriod = arvados.Duration(time.Second)
380 arvadostest.SetServiceURL(&s.config.Services.Keepbalance, "http://localhost:/")
382 s.client = &arvados.Client{
384 APIHost: "zzzzz.arvadosapi.com",
385 Client: s.stub.Start()}
387 s.stub.serveDiscoveryDoc()
391 func (s *runSuite) TearDownTest(c *check.C) {
395 func (s *runSuite) TestRefuseZeroCollections(c *check.C) {
396 defer arvados.NewClientFromEnv().RequestAndDecode(nil, "POST", "database/reset", nil, nil)
397 _, err := s.db.Exec(`delete from collections`)
398 c.Assert(err, check.IsNil)
400 Logger: ctxlog.TestLogger(c),
402 s.stub.serveCurrentUserAdmin()
403 s.stub.serveZeroCollections()
404 s.stub.serveKeepServices(stubServices)
405 s.stub.serveKeepstoreMounts()
406 s.stub.serveKeepstoreIndexFoo4Bar1()
407 trashReqs := s.stub.serveKeepstoreTrash()
408 pullReqs := s.stub.serveKeepstorePull()
409 srv := s.newServer(&opts)
410 _, err = srv.runOnce(context.Background())
411 c.Check(err, check.ErrorMatches, "received zero collections")
412 c.Check(trashReqs.Count(), check.Equals, 4)
413 c.Check(pullReqs.Count(), check.Equals, 0)
416 func (s *runSuite) TestRefuseBadIndex(c *check.C) {
419 Logger: ctxlog.TestLogger(c),
421 s.stub.serveCurrentUserAdmin()
422 s.stub.serveFooBarFileCollections()
423 s.stub.serveKeepServices(stubServices)
424 s.stub.serveKeepstoreMounts()
425 s.stub.serveKeepstoreIndexIgnoringPrefix()
426 trashReqs := s.stub.serveKeepstoreTrash()
427 pullReqs := s.stub.serveKeepstorePull()
428 srv := s.newServer(&opts)
429 bal, err := srv.runOnce(context.Background())
430 c.Check(err, check.ErrorMatches, ".*Index response included block .* despite asking for prefix \"abc\"")
431 c.Check(trashReqs.Count(), check.Equals, 4)
432 c.Check(pullReqs.Count(), check.Equals, 0)
433 c.Check(bal.stats.trashes, check.Equals, 0)
434 c.Check(bal.stats.pulls, check.Equals, 0)
437 func (s *runSuite) TestRefuseNonAdmin(c *check.C) {
439 Logger: ctxlog.TestLogger(c),
441 s.stub.serveCurrentUserNotAdmin()
442 s.stub.serveZeroCollections()
443 s.stub.serveKeepServices(stubServices)
444 s.stub.serveKeepstoreMounts()
445 trashReqs := s.stub.serveKeepstoreTrash()
446 pullReqs := s.stub.serveKeepstorePull()
447 srv := s.newServer(&opts)
448 _, err := srv.runOnce(context.Background())
449 c.Check(err, check.ErrorMatches, "current user .* is not .* admin user")
450 c.Check(trashReqs.Count(), check.Equals, 0)
451 c.Check(pullReqs.Count(), check.Equals, 0)
454 func (s *runSuite) TestInvalidChunkPrefix(c *check.C) {
455 for _, trial := range []struct {
459 {"123ABC", "invalid char \"A\" in chunk prefix.*"},
460 {"123xyz", "invalid char \"x\" in chunk prefix.*"},
461 {"123456789012345678901234567890123", "invalid chunk prefix .* longer than a block hash"},
464 c.Logf("trying invalid prefix %q", trial.prefix)
466 ChunkPrefix: trial.prefix,
467 Logger: ctxlog.TestLogger(c),
469 s.stub.serveCurrentUserAdmin()
470 s.stub.serveFooBarFileCollections()
471 s.stub.serveKeepServices(stubServices)
472 s.stub.serveKeepstoreMounts()
473 trashReqs := s.stub.serveKeepstoreTrash()
474 pullReqs := s.stub.serveKeepstorePull()
475 srv := s.newServer(&opts)
476 _, err := srv.runOnce(context.Background())
477 c.Check(err, check.ErrorMatches, trial.errRe)
478 c.Check(trashReqs.Count(), check.Equals, 0)
479 c.Check(pullReqs.Count(), check.Equals, 0)
483 func (s *runSuite) TestRefuseSameDeviceDifferentVolumes(c *check.C) {
485 Logger: ctxlog.TestLogger(c),
487 s.stub.serveCurrentUserAdmin()
488 s.stub.serveZeroCollections()
489 s.stub.serveKeepServices(stubServices)
490 s.stub.mux.HandleFunc("/mounts", func(w http.ResponseWriter, r *http.Request) {
491 hostid := r.Host[:5] // "keep0.zzzzz.arvadosapi.com:25107" => "keep0"
492 json.NewEncoder(w).Encode([]arvados.KeepMount{{
493 UUID: "zzzzz-ivpuk-0000000000" + hostid,
494 DeviceID: "keep0-vol0",
495 StorageClasses: map[string]bool{"default": true},
498 trashReqs := s.stub.serveKeepstoreTrash()
499 pullReqs := s.stub.serveKeepstorePull()
500 srv := s.newServer(&opts)
501 _, err := srv.runOnce(context.Background())
502 c.Check(err, check.ErrorMatches, "cannot continue with config errors.*")
503 c.Check(trashReqs.Count(), check.Equals, 0)
504 c.Check(pullReqs.Count(), check.Equals, 0)
507 func (s *runSuite) TestWriteLostBlocks(c *check.C) {
508 lostf, err := ioutil.TempFile("", "keep-balance-lost-blocks-test-")
509 c.Assert(err, check.IsNil)
510 s.config.Collections.BlobMissingReport = lostf.Name()
511 defer os.Remove(lostf.Name())
513 Logger: ctxlog.TestLogger(c),
515 s.stub.serveCurrentUserAdmin()
516 s.stub.serveFooBarFileCollections()
517 s.stub.serveKeepServices(stubServices)
518 s.stub.serveKeepstoreMounts()
519 s.stub.serveKeepstoreIndexFoo1()
520 s.stub.serveKeepstoreTrash()
521 s.stub.serveKeepstorePull()
522 srv := s.newServer(&opts)
523 c.Assert(err, check.IsNil)
524 _, err = srv.runOnce(context.Background())
525 c.Check(err, check.IsNil)
526 lost, err := ioutil.ReadFile(lostf.Name())
527 c.Assert(err, check.IsNil)
528 c.Check(string(lost), check.Matches, `(?ms).*37b51d194a7513e45b56f6524f2d51f2.* fa7aeb5140e2848d39b416daeef4ffc5\+45.*`)
531 func (s *runSuite) TestDryRun(c *check.C) {
532 s.config.Collections.BalanceTrashLimit = 0
533 s.config.Collections.BalancePullLimit = 0
535 Logger: ctxlog.TestLogger(c),
537 s.stub.serveCurrentUserAdmin()
538 collReqs := s.stub.serveFooBarFileCollections()
539 s.stub.serveKeepServices(stubServices)
540 s.stub.serveKeepstoreMounts()
541 s.stub.serveKeepstoreIndexFoo4Bar1()
542 trashReqs := s.stub.serveKeepstoreTrash()
543 pullReqs := s.stub.serveKeepstorePull()
544 srv := s.newServer(&opts)
545 bal, err := srv.runOnce(context.Background())
546 c.Check(err, check.IsNil)
547 for _, req := range collReqs.reqs {
548 c.Check(req.Form.Get("include_trash"), check.Equals, "true")
549 c.Check(req.Form.Get("include_old_versions"), check.Equals, "true")
551 c.Check(trashReqs.Count(), check.Equals, 0)
552 c.Check(pullReqs.Count(), check.Equals, 0)
553 c.Check(bal.stats.pulls, check.Equals, 0)
554 c.Check(bal.stats.pullsDeferred, check.Not(check.Equals), 0)
555 c.Check(bal.stats.trashes, check.Equals, 0)
556 c.Check(bal.stats.trashesDeferred, check.Not(check.Equals), 0)
557 c.Check(bal.stats.underrep.replicas, check.Not(check.Equals), 0)
558 c.Check(bal.stats.overrep.replicas, check.Not(check.Equals), 0)
561 func (s *runSuite) TestCommit(c *check.C) {
562 s.config.Collections.BlobMissingReport = c.MkDir() + "/keep-balance-lost-blocks-test-"
563 s.config.ManagementToken = "xyzzy"
565 Logger: ctxlog.TestLogger(c),
566 Dumper: ctxlog.TestLogger(c),
568 s.stub.serveCurrentUserAdmin()
569 s.stub.serveFooBarFileCollections()
570 s.stub.serveKeepServices(stubServices)
571 s.stub.serveKeepstoreMounts()
572 s.stub.serveKeepstoreIndexFoo4Bar1()
573 trashReqs := s.stub.serveKeepstoreTrash()
574 pullReqs := s.stub.serveKeepstorePull()
575 srv := s.newServer(&opts)
576 bal, err := srv.runOnce(context.Background())
577 c.Check(err, check.IsNil)
578 c.Check(trashReqs.Count(), check.Equals, 8)
579 c.Check(pullReqs.Count(), check.Equals, 4)
580 // "foo" block is overreplicated by 2
581 c.Check(bal.stats.trashes, check.Equals, 2)
582 // "bar" block is underreplicated by 1, and its only copy is
583 // in a poor rendezvous position
584 c.Check(bal.stats.pulls, check.Equals, 2)
586 lost, err := ioutil.ReadFile(s.config.Collections.BlobMissingReport)
587 c.Assert(err, check.IsNil)
588 c.Check(string(lost), check.Not(check.Matches), `(?ms).*acbd18db4cc2f85cedef654fccc4a4d8.*`)
590 metrics := arvadostest.GatherMetricsAsString(srv.Metrics.reg)
591 c.Check(metrics, check.Matches, `(?ms).*\narvados_keep_total_bytes 15\n.*`)
592 c.Check(metrics, check.Matches, `(?ms).*\narvados_keepbalance_changeset_compute_seconds_sum [0-9\.]+\n.*`)
593 c.Check(metrics, check.Matches, `(?ms).*\narvados_keepbalance_changeset_compute_seconds_count 1\n.*`)
594 c.Check(metrics, check.Matches, `(?ms).*\narvados_keep_dedup_byte_ratio [1-9].*`)
595 c.Check(metrics, check.Matches, `(?ms).*\narvados_keep_dedup_block_ratio [1-9].*`)
598 func (s *runSuite) TestChunkPrefix(c *check.C) {
599 s.config.Collections.BlobMissingReport = c.MkDir() + "/keep-balance-lost-blocks-test-"
601 ChunkPrefix: "ac", // catch "foo" but not "bar"
602 Logger: ctxlog.TestLogger(c),
603 Dumper: ctxlog.TestLogger(c),
605 s.stub.serveCurrentUserAdmin()
606 s.stub.serveFooBarFileCollections()
607 s.stub.serveKeepServices(stubServices)
608 s.stub.serveKeepstoreMounts()
609 s.stub.serveKeepstoreIndexFoo4Bar1()
610 trashReqs := s.stub.serveKeepstoreTrash()
611 pullReqs := s.stub.serveKeepstorePull()
612 srv := s.newServer(&opts)
613 bal, err := srv.runOnce(context.Background())
614 c.Check(err, check.IsNil)
615 c.Check(trashReqs.Count(), check.Equals, 8)
616 c.Check(pullReqs.Count(), check.Equals, 4)
617 // "foo" block is overreplicated by 2
618 c.Check(bal.stats.trashes, check.Equals, 2)
619 // "bar" block is underreplicated but does not match prefix
620 c.Check(bal.stats.pulls, check.Equals, 0)
622 lost, err := ioutil.ReadFile(s.config.Collections.BlobMissingReport)
623 c.Assert(err, check.IsNil)
624 c.Check(string(lost), check.Equals, "")
627 func (s *runSuite) TestRunForever_TriggeredByTimer(c *check.C) {
628 s.config.ManagementToken = "xyzzy"
630 Logger: ctxlog.TestLogger(c),
631 Dumper: ctxlog.TestLogger(c),
633 s.stub.serveCurrentUserAdmin()
634 s.stub.serveFooBarFileCollections()
635 s.stub.serveKeepServices(stubServices)
636 s.stub.serveKeepstoreMounts()
637 s.stub.serveKeepstoreIndexFoo4Bar1()
638 trashReqs := s.stub.serveKeepstoreTrash()
639 pullReqs := s.stub.serveKeepstorePull()
641 ctx, cancel := context.WithCancel(context.Background())
643 s.config.Collections.BalancePeriod = arvados.Duration(10 * time.Millisecond)
644 srv := s.newServer(&opts)
646 done := make(chan bool)
652 // Each run should send 4 pull lists + 4 trash lists. The
653 // first run should also send 4 empty trash lists at
654 // startup. We should complete at least four runs in much less
656 for t0 := time.Now(); time.Since(t0) < 10*time.Second; {
657 pulls := pullReqs.Count()
658 if pulls >= 16 && trashReqs.Count() == pulls+4 {
661 time.Sleep(time.Millisecond)
665 c.Check(pullReqs.Count() >= 16, check.Equals, true)
666 c.Check(trashReqs.Count() >= 20, check.Equals, true)
668 // We should have completed 4 runs before calling cancel().
669 // But the next run might also have started before we called
670 // cancel(), in which case the extra run will be included in
671 // the changeset_compute_seconds_count metric.
672 completed := pullReqs.Count() / 4
673 metrics := arvadostest.GatherMetricsAsString(srv.Metrics.reg)
674 c.Check(metrics, check.Matches, fmt.Sprintf(`(?ms).*\narvados_keepbalance_changeset_compute_seconds_count (%d|%d)\n.*`, completed, completed+1))
677 func (s *runSuite) TestRunForever_TriggeredBySignal(c *check.C) {
678 s.config.ManagementToken = "xyzzy"
680 Logger: ctxlog.TestLogger(c),
681 Dumper: ctxlog.TestLogger(c),
683 s.stub.serveCurrentUserAdmin()
684 s.stub.serveFooBarFileCollections()
685 s.stub.serveKeepServices(stubServices)
686 s.stub.serveKeepstoreMounts()
687 s.stub.serveKeepstoreIndexFoo4Bar1()
688 trashReqs := s.stub.serveKeepstoreTrash()
689 pullReqs := s.stub.serveKeepstorePull()
691 ctx, cancel := context.WithCancel(context.Background())
693 s.config.Collections.BalancePeriod = arvados.Duration(time.Minute)
694 srv := s.newServer(&opts)
696 done := make(chan bool)
702 procself, err := os.FindProcess(os.Getpid())
703 c.Assert(err, check.IsNil)
705 // Each run should send 4 pull lists + 4 trash lists. The
706 // first run should also send 4 empty trash lists at
707 // startup. We should be able to complete four runs in much
710 for t0 := time.Now(); time.Since(t0) < 10*time.Second; {
711 pulls := pullReqs.Count()
712 if pulls >= 16 && trashReqs.Count() == pulls+4 {
715 // Once the 1st run has started automatically, we
716 // start sending a single SIGUSR1 at the end of each
717 // run, to ensure we get exactly 4 runs in total.
718 if pulls > 0 && pulls%4 == 0 && pulls <= 12 && pulls/4 > completedRuns {
719 completedRuns = pulls / 4
720 c.Logf("completed run %d, sending SIGUSR1 to trigger next run", completedRuns)
721 procself.Signal(syscall.SIGUSR1)
723 time.Sleep(time.Millisecond)
727 c.Check(pullReqs.Count(), check.Equals, 16)
728 c.Check(trashReqs.Count(), check.Equals, 20)
730 metrics := arvadostest.GatherMetricsAsString(srv.Metrics.reg)
731 c.Check(metrics, check.Matches, `(?ms).*\narvados_keepbalance_changeset_compute_seconds_count 4\n.*`)