1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
21 "git.arvados.org/arvados.git/lib/config"
22 "git.arvados.org/arvados.git/sdk/go/arvados"
23 "git.arvados.org/arvados.git/sdk/go/arvadostest"
24 "git.arvados.org/arvados.git/sdk/go/ctxlog"
25 "github.com/jmoiron/sqlx"
26 "github.com/prometheus/client_golang/prometheus"
27 check "gopkg.in/check.v1"
// Register runSuite with the gocheck runner so that its Test* methods
// are executed.
30 var _ = check.Suite(&runSuite{})
// reqTracker collects copies of the HTTP requests passed to Add in its
// reqs slice, so a test can later count or inspect what a stub handler
// received.
32 type reqTracker struct {
// Count returns a request count as an int; tests compare it with the
// number of requests they expect a stub endpoint to have received.
// NOTE(review): presumably this is len(rt.reqs) -- confirm against the body.
37 func (rt *reqTracker) Count() int {
// Add records req by appending a copy of *req to rt.reqs.
// NOTE(review): the int result is presumably the updated number of
// tracked requests -- confirm against the return statement.
43 func (rt *reqTracker) Add(req *http.Request) int {
// Store the request by value rather than keeping the caller's pointer.
46 rt.reqs = append(rt.reqs, *req)
// stubServices is the keep service list the tests publish through
// serveKeepServices: four entries for hosts keep0..keep3 (SSL flag off)
// followed by one entry for host keep.zzzzz.arvadosapi.com.
50 var stubServices = []arvados.KeepService{
52 UUID: "zzzzz-bi6l4-000000000000000",
53 ServiceHost: "keep0.zzzzz.arvadosapi.com",
55 ServiceSSLFlag: false,
59 UUID: "zzzzz-bi6l4-000000000000001",
60 ServiceHost: "keep1.zzzzz.arvadosapi.com",
62 ServiceSSLFlag: false,
66 UUID: "zzzzz-bi6l4-000000000000002",
67 ServiceHost: "keep2.zzzzz.arvadosapi.com",
69 ServiceSSLFlag: false,
73 UUID: "zzzzz-bi6l4-000000000000003",
74 ServiceHost: "keep3.zzzzz.arvadosapi.com",
76 ServiceSSLFlag: false,
80 UUID: "zzzzz-bi6l4-h0a0xwut9qa6g3a",
81 ServiceHost: "keep.zzzzz.arvadosapi.com",
// stubMounts maps a keepstore "host:port" string to the mounts reported
// for that server. Each of keep0..keep3 has a single mount with its own
// UUID and device ID ("keepN-vol0"), offering the "default" storage
// class. serveKeepstoreMounts looks entries up by the request's Host.
88 var stubMounts = map[string][]arvados.KeepMount{
89 "keep0.zzzzz.arvadosapi.com:25107": {{
90 UUID: "zzzzz-ivpuk-000000000000000",
91 DeviceID: "keep0-vol0",
92 StorageClasses: map[string]bool{"default": true},
96 "keep1.zzzzz.arvadosapi.com:25107": {{
97 UUID: "zzzzz-ivpuk-100000000000000",
98 DeviceID: "keep1-vol0",
99 StorageClasses: map[string]bool{"default": true},
103 "keep2.zzzzz.arvadosapi.com:25107": {{
104 UUID: "zzzzz-ivpuk-200000000000000",
105 DeviceID: "keep2-vol0",
106 StorageClasses: map[string]bool{"default": true},
110 "keep3.zzzzz.arvadosapi.com:25107": {{
111 UUID: "zzzzz-ivpuk-300000000000000",
112 DeviceID: "keep3-vol0",
113 StorageClasses: map[string]bool{"default": true},
119 // stubServer is an HTTP transport that intercepts and processes all
120 // requests using its own handlers.
121 type stubServer struct {
// logf is a printf-style (format, args...) callback.
// NOTE(review): presumably used for test log output -- confirm usage.
126 logf func(string, ...interface{})
129 // Start initializes the stub server and returns an *http.Client that
130 // uses the stub server to handle all requests.
132 // A stubServer that has been started should eventually be shut down
// by calling Close.
134 func (s *stubServer) Start() *http.Client {
135 // Set up a config.Client that forwards all requests to s.mux
136 // via s.srv. Test cases will attach handlers to s.mux to get
137 // the desired responses.
138 s.mux = http.NewServeMux()
// Requests that reach s.srv are labelled as JSON responses before
// being dispatched to whatever handlers are registered on s.mux.
139 s.srv = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
143 w.Header().Set("Content-Type", "application/json")
144 s.mux.ServeHTTP(w, r)
// The returned client uses s itself as its Transport, so its requests
// are handled by RoundTrip.
146 return &http.Client{Transport: s}
// RoundTrip lets stubServer act as an http.RoundTripper: it serves req
// directly from s.mux into an in-memory httptest.ResponseRecorder and
// converts what was recorded into an *http.Response. The error result
// is always nil.
149 func (s *stubServer) RoundTrip(req *http.Request) (*http.Response, error) {
150 w := httptest.NewRecorder()
151 s.mux.ServeHTTP(w, req)
152 return &http.Response{
// Status line text is built from the recorded code, e.g. "200 OK".
154 Status: fmt.Sprintf("%d %s", w.Code, http.StatusText(w.Code)),
// The recorder's buffer needs no real Close, hence NopCloser.
156 Body: ioutil.NopCloser(w.Body)}, nil
159 // Close releases resources used by the server.
// NOTE(review): presumably this shuts down s.srv, the httptest server
// created by Start -- confirm against the body.
160 func (s *stubServer) Close() {
// serveStatic registers a handler on s.mux for path that reads the
// request body and then writes data as the response body.
// NOTE(review): the returned *reqTracker presumably records every
// request the handler receives -- confirm against the full body.
164 func (s *stubServer) serveStatic(path, data string) *reqTracker {
166 s.mux.HandleFunc(path, func(w http.ResponseWriter, r *http.Request) {
// Read the request body; the bytes and any error are discarded.
169 ioutil.ReadAll(r.Body)
172 io.WriteString(w, data)
// serveCurrentUserAdmin makes /arvados/v1/users/current return an
// active user whose is_admin flag is true.
177 func (s *stubServer) serveCurrentUserAdmin() *reqTracker {
178 return s.serveStatic("/arvados/v1/users/current",
179 `{"uuid":"zzzzz-tpzed-000000000000000","is_admin":true,"is_active":true}`)
// serveCurrentUserNotAdmin makes /arvados/v1/users/current return an
// active user whose is_admin flag is false.
182 func (s *stubServer) serveCurrentUserNotAdmin() *reqTracker {
183 return s.serveStatic("/arvados/v1/users/current",
184 `{"uuid":"zzzzz-tpzed-000000000000000","is_admin":false,"is_active":true}`)
// serveDiscoveryDoc serves a minimal discovery document at
// /discovery/v1/apis/arvados/v1/rest containing only
// defaultCollectionReplication, set to 2.
187 func (s *stubServer) serveDiscoveryDoc() *reqTracker {
188 return s.serveStatic("/discovery/v1/apis/arvados/v1/rest",
189 `{"defaultCollectionReplication":2}`)
// serveZeroCollections makes /arvados/v1/collections return an empty
// list with items_available 0.
192 func (s *stubServer) serveZeroCollections() *reqTracker {
193 return s.serveStatic("/arvados/v1/collections",
194 `{"items":[],"items_available":0}`)
// serveFooBarFileCollections registers a /arvados/v1/collections
// handler with two possible responses. A request whose "filters" form
// value mentions modified_at gets an empty list. The other response
// lists three collections: two whose manifest holds the file "bar" and
// one whose manifest holds the file "foo".
// NOTE(review): the three-item response is presumably the else branch
// of the modified_at check -- confirm.
197 func (s *stubServer) serveFooBarFileCollections() *reqTracker {
199 s.mux.HandleFunc("/arvados/v1/collections", func(w http.ResponseWriter, r *http.Request) {
202 if strings.Contains(r.Form.Get("filters"), `modified_at`) {
203 io.WriteString(w, `{"items_available":0,"items":[]}`)
205 io.WriteString(w, `{"items_available":3,"items":[
206 {"uuid":"zzzzz-4zz18-aaaaaaaaaaaaaaa","portable_data_hash":"fa7aeb5140e2848d39b416daeef4ffc5+45","manifest_text":". 37b51d194a7513e45b56f6524f2d51f2+3 0:3:bar\n","modified_at":"2014-02-03T17:22:54Z"},
207 {"uuid":"zzzzz-4zz18-ehbhgtheo8909or","portable_data_hash":"fa7aeb5140e2848d39b416daeef4ffc5+45","manifest_text":". 37b51d194a7513e45b56f6524f2d51f2+3 0:3:bar\n","modified_at":"2014-02-03T17:22:54Z"},
208 {"uuid":"zzzzz-4zz18-znfnqtbbv4spc3w","portable_data_hash":"1f4b0bc7583c2a7f9102c395f4ffc5e3+45","manifest_text":". acbd18db4cc2f85cedef654fccc4a4d8+3 0:3:foo\n","modified_at":"2014-02-03T17:22:54Z"}]}`)
// serveCollectionsButSkipOne registers a /arvados/v1/collections
// handler whose reply depends on the "filters" form value:
//
//   - modified_at "<=" filter: no items, but items_available is 3
//   - modified_at ">" filter: empty list
//   - modified_at "=" combined with uuid ">": empty list
//   - modified_at "=" null: empty list
//   - remaining response: two collections with items_available 2
//
// The zzzzz-4zz18-aaaaaaaaaaaaaaa collection that
// serveFooBarFileCollections lists does not appear in the two-item
// response.
// NOTE(review): the two-item response is presumably the final else
// branch -- confirm.
214 func (s *stubServer) serveCollectionsButSkipOne() *reqTracker {
216 s.mux.HandleFunc("/arvados/v1/collections", func(w http.ResponseWriter, r *http.Request) {
// \u003c and \u003e are the JSON-escaped forms of "<" and ">".
219 if strings.Contains(r.Form.Get("filters"), `"modified_at","\u003c="`) {
220 io.WriteString(w, `{"items_available":3,"items":[]}`)
221 } else if strings.Contains(r.Form.Get("filters"), `"modified_at","\u003e`) {
222 io.WriteString(w, `{"items_available":0,"items":[]}`)
223 } else if strings.Contains(r.Form.Get("filters"), `"modified_at","="`) && strings.Contains(r.Form.Get("filters"), `"uuid","\u003e"`) {
224 io.WriteString(w, `{"items_available":0,"items":[]}`)
225 } else if strings.Contains(r.Form.Get("filters"), `"modified_at","=",null`) {
226 io.WriteString(w, `{"items_available":0,"items":[]}`)
228 io.WriteString(w, `{"items_available":2,"items":[
229 {"uuid":"zzzzz-4zz18-ehbhgtheo8909or","portable_data_hash":"fa7aeb5140e2848d39b416daeef4ffc5+45","manifest_text":". 37b51d194a7513e45b56f6524f2d51f2+3 0:3:bar\n","modified_at":"2014-02-03T17:22:54Z"},
230 {"uuid":"zzzzz-4zz18-znfnqtbbv4spc3w","portable_data_hash":"1f4b0bc7583c2a7f9102c395f4ffc5e3+45","manifest_text":". acbd18db4cc2f85cedef654fccc4a4d8+3 0:3:foo\n","modified_at":"2014-02-03T17:22:54Z"}]}`)
// serveZeroKeepServices makes /arvados/v1/keep_services return a
// zero-value arvados.KeepServiceList encoded as JSON.
236 func (s *stubServer) serveZeroKeepServices() *reqTracker {
237 return s.serveJSON("/arvados/v1/keep_services", arvados.KeepServiceList{})
// serveKeepServices makes /arvados/v1/keep_services return a
// KeepServiceList whose ItemsAvailable is len(svcs).
// NOTE(review): presumably svcs is also supplied as the list's items
// -- confirm against the full literal.
240 func (s *stubServer) serveKeepServices(svcs []arvados.KeepService) *reqTracker {
241 return s.serveJSON("/arvados/v1/keep_services", arvados.KeepServiceList{
242 ItemsAvailable: len(svcs),
// serveJSON registers a handler on s.mux for path that writes resp to
// the response as JSON.
// NOTE(review): the returned *reqTracker presumably records each
// request handled -- confirm against the full body.
247 func (s *stubServer) serveJSON(path string, resp interface{}) *reqTracker {
249 s.mux.HandleFunc(path, func(w http.ResponseWriter, r *http.Request) {
// The error returned by Encode is not checked.
251 json.NewEncoder(w).Encode(resp)
// serveKeepstoreMounts registers a /mounts handler that answers with
// the stubMounts entry for the request's Host, encoded as JSON. A host
// with no entry yields the map's zero value (a nil slice).
256 func (s *stubServer) serveKeepstoreMounts() *reqTracker {
258 s.mux.HandleFunc("/mounts", func(w http.ResponseWriter, r *http.Request) {
260 json.NewEncoder(w).Encode(stubMounts[r.Host])
// serveKeepstoreIndexFoo4Bar1 registers keepstore index handlers, both
// the legacy "/index/<prefix>" form and one "/mounts/<uuid>/blocks"
// handler per mount in stubMounts. The "bar" block line is written only
// when the request's Host is keep0 and the requested prefix matches;
// the "foo" block line is written for any host when the prefix matches.
// Every response ends with a blank line.
265 func (s *stubServer) serveKeepstoreIndexFoo4Bar1() *reqTracker {
// fooLine builds the index line for the "foo" block with its timestamp
// field offset by mt; barLine is the fixed index line for "bar".
266 fooLine := func(mt int) string { return fmt.Sprintf("acbd18db4cc2f85cedef654fccc4a4d8+3 %d\n", 12345678+mt) }
267 barLine := "37b51d194a7513e45b56f6524f2d51f2+3 12345678\n"
269 s.mux.HandleFunc("/index/", func(w http.ResponseWriter, r *http.Request) {
// r.URL.Path[7:] is the part of the path after "/index/", i.e. the
// requested hash prefix.
271 if r.Host == "keep0.zzzzz.arvadosapi.com:25107" && strings.HasPrefix(barLine, r.URL.Path[7:]) {
272 io.WriteString(w, barLine)
274 if strings.HasPrefix(fooLine(count), r.URL.Path[7:]) {
275 io.WriteString(w, fooLine(count))
277 io.WriteString(w, "\n")
279 for _, mounts := range stubMounts {
280 for i, mnt := range mounts {
282 s.mux.HandleFunc(fmt.Sprintf("/mounts/%s/blocks", mnt.UUID), func(w http.ResponseWriter, r *http.Request) {
// Here the prefix arrives as a form value, and blocks are reported
// only when i == 0, i.e. for the first mount of a server.
285 if i == 0 && r.Host == "keep0.zzzzz.arvadosapi.com:25107" && strings.HasPrefix(barLine, r.Form.Get("prefix")) {
286 io.WriteString(w, barLine)
288 if i == 0 && strings.HasPrefix(fooLine(count), r.Form.Get("prefix")) {
289 io.WriteString(w, fooLine(count))
291 io.WriteString(w, "\n")
// serveKeepstoreIndexFoo1 registers keepstore index handlers that only
// ever report the "foo" block. The "/index/<prefix>" handler writes it
// when the request's Host is keep0 and the prefix matches; each
// "/mounts/<uuid>/blocks" handler writes it for a server's first mount
// when the "prefix" form value matches. Every response ends with a
// blank line.
298 func (s *stubServer) serveKeepstoreIndexFoo1() *reqTracker {
299 fooLine := "acbd18db4cc2f85cedef654fccc4a4d8+3 12345678\n"
301 s.mux.HandleFunc("/index/", func(w http.ResponseWriter, r *http.Request) {
// r.URL.Path[7:] is the hash prefix following "/index/".
303 if r.Host == "keep0.zzzzz.arvadosapi.com:25107" && strings.HasPrefix(fooLine, r.URL.Path[7:]) {
304 io.WriteString(w, fooLine)
306 io.WriteString(w, "\n")
308 for _, mounts := range stubMounts {
309 for i, mnt := range mounts {
311 s.mux.HandleFunc(fmt.Sprintf("/mounts/%s/blocks", mnt.UUID), func(w http.ResponseWriter, r *http.Request) {
313 if i == 0 && strings.HasPrefix(fooLine, r.Form.Get("prefix")) {
314 io.WriteString(w, fooLine)
316 io.WriteString(w, "\n")
// serveKeepstoreIndexIgnoringPrefix registers index handlers (both
// "/index/" and per-mount "/mounts/<uuid>/blocks") that write the "foo"
// block line followed by a blank line. The visible handler bodies
// perform no check of the requested prefix, which is what
// TestRefuseBadIndex relies on to provoke a bad-index error.
323 func (s *stubServer) serveKeepstoreIndexIgnoringPrefix() *reqTracker {
324 fooLine := "acbd18db4cc2f85cedef654fccc4a4d8+3 12345678\n"
326 s.mux.HandleFunc("/index/", func(w http.ResponseWriter, r *http.Request) {
328 io.WriteString(w, fooLine)
329 io.WriteString(w, "\n")
331 for _, mounts := range stubMounts {
332 for _, mnt := range mounts {
333 s.mux.HandleFunc(fmt.Sprintf("/mounts/%s/blocks", mnt.UUID), func(w http.ResponseWriter, r *http.Request) {
335 io.WriteString(w, fooLine)
336 io.WriteString(w, "\n")
// serveKeepstoreTrash makes /trash answer every request with an empty
// JSON object and returns the tracker from serveStatic, which tests use
// to count trash requests.
343 func (s *stubServer) serveKeepstoreTrash() *reqTracker {
344 return s.serveStatic("/trash", `{}`)
// serveKeepstorePull makes /pull answer every request with an empty
// JSON object and returns the tracker from serveStatic, which tests use
// to count pull requests.
347 func (s *stubServer) serveKeepstorePull() *reqTracker {
348 return s.serveStatic("/pull", `{}`)
// runSuite is the gocheck suite for the tests below. SetUpTest fills in
// its fields before each test.
351 type runSuite struct {
// config is the cluster configuration loaded in SetUpTest; individual
// tests adjust it before building a Server.
353 config *arvados.Cluster
// client is the API client whose HTTP traffic is routed to the stub
// server (see SetUpTest).
355 client *arvados.Client
// newServer builds a *Server for a test from options: it stores a copy
// of *options as RunOptions, creates metrics backed by a fresh
// Prometheus registry, and takes Logger and Dumper from options.
358 func (s *runSuite) newServer(options *RunOptions) *Server {
362 RunOptions: *options,
// A new registry per server keeps each test's metrics separate.
363 Metrics: newMetrics(prometheus.NewRegistry()),
364 Logger: options.Logger,
365 Dumper: options.Dumper,
// SetUpTest prepares the suite before each test: it loads the cluster
// configuration, opens a PostgreSQL connection from that configuration,
// sets a one-second BalancePeriod and a Keepbalance service URL, and
// creates an API client whose HTTP client comes from the stub server.
// It also registers the discovery document on the stub.
371 func (s *runSuite) SetUpTest(c *check.C) {
372 cfg, err := config.NewLoader(nil, ctxlog.TestLogger(c)).Load()
373 c.Assert(err, check.Equals, nil)
// An empty cluster ID is passed to GetCluster.
// NOTE(review): presumably that selects the only/default cluster -- confirm.
374 s.config, err = cfg.GetCluster("")
375 c.Assert(err, check.Equals, nil)
376 s.db, err = sqlx.Open("postgres", s.config.PostgreSQL.Connection.String())
377 c.Assert(err, check.IsNil)
379 s.config.Collections.BalancePeriod = arvados.Duration(time.Second)
380 arvadostest.SetServiceURL(&s.config.Services.Keepbalance, "http://localhost:/")
382 s.client = &arvados.Client{
384 APIHost: "zzzzz.arvadosapi.com",
// Start returns an *http.Client backed by the stub server.
385 Client: s.stub.Start()}
387 s.stub.serveDiscoveryDoc()
// TearDownTest is the gocheck per-test teardown hook.
// NOTE(review): presumably it closes the stub server started in
// SetUpTest -- confirm against the body.
391 func (s *runSuite) TearDownTest(c *check.C) {
// TestRefuseZeroCollections checks that runOnce fails with "received
// zero collections" when no collections exist. It expects four trash
// requests and no pull requests.
395 func (s *runSuite) TestRefuseZeroCollections(c *check.C) {
// Issue a database/reset API request when the test returns, since the
// test deletes every row from the collections table.
396 defer arvados.NewClientFromEnv().RequestAndDecode(nil, "POST", "database/reset", nil, nil)
397 _, err := s.db.Exec(`delete from collections`)
398 c.Assert(err, check.IsNil)
400 Logger: ctxlog.TestLogger(c),
402 s.stub.serveCurrentUserAdmin()
403 s.stub.serveZeroCollections()
404 s.stub.serveKeepServices(stubServices)
405 s.stub.serveKeepstoreMounts()
406 s.stub.serveKeepstoreIndexFoo4Bar1()
407 trashReqs := s.stub.serveKeepstoreTrash()
408 pullReqs := s.stub.serveKeepstorePull()
409 srv := s.newServer(&opts)
410 _, err = srv.runOnce(context.Background())
411 c.Check(err, check.ErrorMatches, "received zero collections")
412 c.Check(trashReqs.Count(), check.Equals, 4)
413 c.Check(pullReqs.Count(), check.Equals, 0)
// TestRefuseBadIndex uses index stubs that return a block regardless of
// the requested prefix. It checks that runOnce reports the mismatch as
// an error, that no pull requests are sent, and that the trash and pull
// statistics stay at zero.
416 func (s *runSuite) TestRefuseBadIndex(c *check.C) {
419 Logger: ctxlog.TestLogger(c),
421 s.stub.serveCurrentUserAdmin()
422 s.stub.serveFooBarFileCollections()
423 s.stub.serveKeepServices(stubServices)
424 s.stub.serveKeepstoreMounts()
425 s.stub.serveKeepstoreIndexIgnoringPrefix()
426 trashReqs := s.stub.serveKeepstoreTrash()
427 pullReqs := s.stub.serveKeepstorePull()
428 srv := s.newServer(&opts)
429 bal, err := srv.runOnce(context.Background())
430 c.Check(err, check.ErrorMatches, ".*Index response included block .* despite asking for prefix \"abc\"")
431 c.Check(trashReqs.Count(), check.Equals, 4)
432 c.Check(pullReqs.Count(), check.Equals, 0)
433 c.Check(bal.stats.trashes, check.Equals, 0)
434 c.Check(bal.stats.pulls, check.Equals, 0)
// TestRefuseNonAdmin checks that runOnce fails when the current user is
// not an admin, and that no trash or pull requests are sent.
437 func (s *runSuite) TestRefuseNonAdmin(c *check.C) {
439 Logger: ctxlog.TestLogger(c),
441 s.stub.serveCurrentUserNotAdmin()
442 s.stub.serveZeroCollections()
443 s.stub.serveKeepServices(stubServices)
444 s.stub.serveKeepstoreMounts()
445 trashReqs := s.stub.serveKeepstoreTrash()
446 pullReqs := s.stub.serveKeepstorePull()
447 srv := s.newServer(&opts)
448 _, err := srv.runOnce(context.Background())
449 c.Check(err, check.ErrorMatches, "current user .* is not .* admin user")
450 c.Check(trashReqs.Count(), check.Equals, 0)
451 c.Check(pullReqs.Count(), check.Equals, 0)
// TestInvalidChunkPrefix runs runOnce with a series of invalid
// ChunkPrefix values. For each one it checks the error against that
// trial's expected pattern and checks that no trash or pull requests
// are sent.
454 func (s *runSuite) TestInvalidChunkPrefix(c *check.C) {
455 for _, trial := range []struct {
// Visible trials: an upper-case letter, a non-hex letter, and a
// 33-character prefix rejected as longer than a block hash.
459 {"123ABC", "invalid char \"A\" in chunk prefix.*"},
460 {"123xyz", "invalid char \"x\" in chunk prefix.*"},
461 {"123456789012345678901234567890123", "invalid chunk prefix .* longer than a block hash"},
464 c.Logf("trying invalid prefix %q", trial.prefix)
466 ChunkPrefix: trial.prefix,
467 Logger: ctxlog.TestLogger(c),
469 s.stub.serveCurrentUserAdmin()
470 s.stub.serveFooBarFileCollections()
471 s.stub.serveKeepServices(stubServices)
472 s.stub.serveKeepstoreMounts()
473 trashReqs := s.stub.serveKeepstoreTrash()
474 pullReqs := s.stub.serveKeepstorePull()
475 srv := s.newServer(&opts)
476 _, err := srv.runOnce(context.Background())
477 c.Check(err, check.ErrorMatches, trial.errRe)
478 c.Check(trashReqs.Count(), check.Equals, 0)
479 c.Check(pullReqs.Count(), check.Equals, 0)
// TestRefuseSameDeviceDifferentVolumes serves mounts whose UUID differs
// per host but whose DeviceID is always "keep0-vol0". It checks that
// runOnce stops with a config error and that no trash or pull requests
// are sent.
483 func (s *runSuite) TestRefuseSameDeviceDifferentVolumes(c *check.C) {
485 Logger: ctxlog.TestLogger(c),
487 s.stub.serveCurrentUserAdmin()
488 s.stub.serveZeroCollections()
489 s.stub.serveKeepServices(stubServices)
// Custom /mounts handler, used here in place of serveKeepstoreMounts.
490 s.stub.mux.HandleFunc("/mounts", func(w http.ResponseWriter, r *http.Request) {
491 hostid := r.Host[:5] // "keep0.zzzzz.arvadosapi.com:25107" => "keep0"
492 json.NewEncoder(w).Encode([]arvados.KeepMount{{
// The UUID ends in the host id, so it is distinct per server...
493 UUID: "zzzzz-ivpuk-0000000000" + hostid,
// ...while the device ID is the same for every server.
494 DeviceID: "keep0-vol0",
495 StorageClasses: map[string]bool{"default": true},
498 trashReqs := s.stub.serveKeepstoreTrash()
499 pullReqs := s.stub.serveKeepstorePull()
500 srv := s.newServer(&opts)
501 _, err := srv.runOnce(context.Background())
502 c.Check(err, check.ErrorMatches, "cannot continue with config errors.*")
503 c.Check(trashReqs.Count(), check.Equals, 0)
504 c.Check(pullReqs.Count(), check.Equals, 0)
// TestWriteLostBlocks points BlobMissingReport at a temporary file and
// runs a balance pass in which the collections reference both "foo" and
// "bar" but the index stubs only report "foo". It checks that the
// report file then mentions the "bar" block hash together with the
// portable data hash of the collections that reference it.
507 func (s *runSuite) TestWriteLostBlocks(c *check.C) {
508 lostf, err := ioutil.TempFile("", "keep-balance-lost-blocks-test-")
509 c.Assert(err, check.IsNil)
510 s.config.Collections.BlobMissingReport = lostf.Name()
// Remove the temporary report file when the test returns.
511 defer os.Remove(lostf.Name())
513 Logger: ctxlog.TestLogger(c),
515 s.stub.serveCurrentUserAdmin()
516 s.stub.serveFooBarFileCollections()
517 s.stub.serveKeepServices(stubServices)
518 s.stub.serveKeepstoreMounts()
519 s.stub.serveKeepstoreIndexFoo1()
520 s.stub.serveKeepstoreTrash()
521 s.stub.serveKeepstorePull()
522 srv := s.newServer(&opts)
// NOTE(review): err appears not to have been reassigned since the
// TempFile check above, so this assertion looks redundant -- confirm.
523 c.Assert(err, check.IsNil)
524 _, err = srv.runOnce(context.Background())
525 c.Check(err, check.IsNil)
526 lost, err := ioutil.ReadFile(lostf.Name())
527 c.Assert(err, check.IsNil)
528 c.Check(string(lost), check.Matches, `(?ms).*37b51d194a7513e45b56f6524f2d51f2.* fa7aeb5140e2848d39b416daeef4ffc5\+45.*`)
// TestDryRun sets the trash and pull limits to zero and checks that a
// balance pass sends no trash or pull requests. The planned work must
// instead show up as deferred trashes and pulls, both in the stats and
// in the exported metrics. It also checks that every collection
// request asked for trashed collections and old versions.
531 func (s *runSuite) TestDryRun(c *check.C) {
532 s.config.Collections.BalanceTrashLimit = 0
533 s.config.Collections.BalancePullLimit = 0
535 Logger: ctxlog.TestLogger(c),
537 s.stub.serveCurrentUserAdmin()
538 collReqs := s.stub.serveFooBarFileCollections()
539 s.stub.serveKeepServices(stubServices)
540 s.stub.serveKeepstoreMounts()
541 s.stub.serveKeepstoreIndexFoo4Bar1()
542 trashReqs := s.stub.serveKeepstoreTrash()
543 pullReqs := s.stub.serveKeepstorePull()
544 srv := s.newServer(&opts)
545 bal, err := srv.runOnce(context.Background())
546 c.Check(err, check.IsNil)
// Inspect the form values of each recorded collection request.
547 for _, req := range collReqs.reqs {
548 c.Check(req.Form.Get("include_trash"), check.Equals, "true")
549 c.Check(req.Form.Get("include_old_versions"), check.Equals, "true")
551 c.Check(trashReqs.Count(), check.Equals, 0)
552 c.Check(pullReqs.Count(), check.Equals, 0)
553 c.Check(bal.stats.pulls, check.Equals, 0)
554 c.Check(bal.stats.pullsDeferred, check.Not(check.Equals), 0)
555 c.Check(bal.stats.trashes, check.Equals, 0)
556 c.Check(bal.stats.trashesDeferred, check.Not(check.Equals), 0)
557 c.Check(bal.stats.underrep.replicas, check.Not(check.Equals), 0)
558 c.Check(bal.stats.overrep.replicas, check.Not(check.Equals), 0)
// The deferred-entry metrics must be exported with a value whose first
// digit is non-zero.
560 metrics := arvadostest.GatherMetricsAsString(srv.Metrics.reg)
561 c.Check(metrics, check.Matches, `(?ms).*\narvados_keep_trash_entries_deferred_count [1-9].*`)
562 c.Check(metrics, check.Matches, `(?ms).*\narvados_keep_pull_entries_deferred_count [1-9].*`)
// TestCommit runs one balance pass against the foo/bar stubs and checks
// the number of trash and pull requests sent, the resulting trash and
// pull statistics, that the "foo" block is not in the missing-blocks
// report, and the values of the exported Prometheus metrics.
565 func (s *runSuite) TestCommit(c *check.C) {
// Write the missing-blocks report into a per-test temporary directory.
566 s.config.Collections.BlobMissingReport = c.MkDir() + "/keep-balance-lost-blocks-test-"
567 s.config.ManagementToken = "xyzzy"
569 Logger: ctxlog.TestLogger(c),
570 Dumper: ctxlog.TestLogger(c),
572 s.stub.serveCurrentUserAdmin()
573 s.stub.serveFooBarFileCollections()
574 s.stub.serveKeepServices(stubServices)
575 s.stub.serveKeepstoreMounts()
576 s.stub.serveKeepstoreIndexFoo4Bar1()
577 trashReqs := s.stub.serveKeepstoreTrash()
578 pullReqs := s.stub.serveKeepstorePull()
579 srv := s.newServer(&opts)
580 bal, err := srv.runOnce(context.Background())
581 c.Check(err, check.IsNil)
582 c.Check(trashReqs.Count(), check.Equals, 8)
583 c.Check(pullReqs.Count(), check.Equals, 4)
584 // "foo" block is overreplicated by 2
585 c.Check(bal.stats.trashes, check.Equals, 2)
586 // "bar" block is underreplicated by 1, and its only copy is
587 // in a poor rendezvous position
588 c.Check(bal.stats.pulls, check.Equals, 2)
// The "foo" block has copies, so it must not be reported as lost.
590 lost, err := ioutil.ReadFile(s.config.Collections.BlobMissingReport)
591 c.Assert(err, check.IsNil)
592 c.Check(string(lost), check.Not(check.Matches), `(?ms).*acbd18db4cc2f85cedef654fccc4a4d8.*`)
594 metrics := arvadostest.GatherMetricsAsString(srv.Metrics.reg)
595 c.Check(metrics, check.Matches, `(?ms).*\narvados_keep_total_bytes 15\n.*`)
596 c.Check(metrics, check.Matches, `(?ms).*\narvados_keepbalance_changeset_compute_seconds_sum [0-9\.]+\n.*`)
597 c.Check(metrics, check.Matches, `(?ms).*\narvados_keepbalance_changeset_compute_seconds_count 1\n.*`)
598 c.Check(metrics, check.Matches, `(?ms).*\narvados_keep_dedup_byte_ratio [1-9].*`)
599 c.Check(metrics, check.Matches, `(?ms).*\narvados_keep_dedup_block_ratio [1-9].*`)
// These metrics must be exported with a value starting with a non-zero
// digit.
601 for _, cat := range []string{
602 "dedup_byte_ratio", "dedup_block_ratio", "collection_bytes",
603 "referenced_bytes", "referenced_blocks", "reference_count",
604 "pull_entries_sent_count",
605 "trash_entries_sent_count",
607 c.Check(metrics, check.Matches, `(?ms).*\narvados_keep_`+cat+` [1-9].*`)
// Nothing should have been deferred in this run.
610 for _, cat := range []string{
611 "pull_entries_deferred_count",
612 "trash_entries_deferred_count",
614 c.Check(metrics, check.Matches, `(?ms).*\narvados_keep_`+cat+` 0\n.*`)
617 c.Check(metrics, check.Matches, `(?ms).*\narvados_keep_replicated_block_count{replicas="0"} [1-9].*`)
618 c.Check(metrics, check.Matches, `(?ms).*\narvados_keep_replicated_block_count{replicas="1"} [1-9].*`)
619 c.Check(metrics, check.Matches, `(?ms).*\narvados_keep_replicated_block_count{replicas="9"} 0\n.*`)
// Per-unit checks: usage metrics by status for the "default" storage
// class, then the per-category totals.
621 for _, sub := range []string{"replicas", "blocks", "bytes"} {
622 for _, cat := range []string{"needed", "unneeded", "unachievable", "pulling"} {
623 c.Check(metrics, check.Matches, `(?ms).*\narvados_keep_usage_`+sub+`{status="`+cat+`",storage_class="default"} [1-9].*`)
625 for _, cat := range []string{"total", "garbage", "transient", "overreplicated", "underreplicated", "unachievable", "balanced", "desired", "lost"} {
626 c.Check(metrics, check.Matches, `(?ms).*\narvados_keep_`+cat+`_`+sub+` [0-9].*`)
// Log the full metrics text to the test log.
629 c.Logf("%s", metrics)
// TestChunkPrefix runs one balance pass with ChunkPrefix "ac", which
// matches the "foo" block hash but not the "bar" block hash. It checks
// that "foo" is still trashed where overreplicated, that no pulls are
// planned for "bar", and that the missing-blocks report is empty.
632 func (s *runSuite) TestChunkPrefix(c *check.C) {
633 s.config.Collections.BlobMissingReport = c.MkDir() + "/keep-balance-lost-blocks-test-"
635 ChunkPrefix: "ac", // catch "foo" but not "bar"
636 Logger: ctxlog.TestLogger(c),
637 Dumper: ctxlog.TestLogger(c),
639 s.stub.serveCurrentUserAdmin()
640 s.stub.serveFooBarFileCollections()
641 s.stub.serveKeepServices(stubServices)
642 s.stub.serveKeepstoreMounts()
643 s.stub.serveKeepstoreIndexFoo4Bar1()
644 trashReqs := s.stub.serveKeepstoreTrash()
645 pullReqs := s.stub.serveKeepstorePull()
646 srv := s.newServer(&opts)
647 bal, err := srv.runOnce(context.Background())
648 c.Check(err, check.IsNil)
649 c.Check(trashReqs.Count(), check.Equals, 8)
650 c.Check(pullReqs.Count(), check.Equals, 4)
651 // "foo" block is overreplicated by 2
652 c.Check(bal.stats.trashes, check.Equals, 2)
653 // "bar" block is underreplicated but does not match prefix
654 c.Check(bal.stats.pulls, check.Equals, 0)
656 lost, err := ioutil.ReadFile(s.config.Collections.BlobMissingReport)
657 c.Assert(err, check.IsNil)
658 c.Check(string(lost), check.Equals, "")
// TestRunForever_TriggeredByTimer sets BalancePeriod to 10ms and polls
// the pull and trash request counters for up to 10 seconds until at
// least four runs' worth of requests have been seen. It then checks the
// counters and the changeset_compute_seconds_count metric.
661 func (s *runSuite) TestRunForever_TriggeredByTimer(c *check.C) {
662 s.config.ManagementToken = "xyzzy"
664 Logger: ctxlog.TestLogger(c),
665 Dumper: ctxlog.TestLogger(c),
667 s.stub.serveCurrentUserAdmin()
668 s.stub.serveFooBarFileCollections()
669 s.stub.serveKeepServices(stubServices)
670 s.stub.serveKeepstoreMounts()
671 s.stub.serveKeepstoreIndexFoo4Bar1()
672 trashReqs := s.stub.serveKeepstoreTrash()
673 pullReqs := s.stub.serveKeepstorePull()
675 ctx, cancel := context.WithCancel(context.Background())
677 s.config.Collections.BalancePeriod = arvados.Duration(10 * time.Millisecond)
678 srv := s.newServer(&opts)
680 done := make(chan bool)
686 // Each run should send 4 pull lists + 4 trash lists. The
687 // first run should also send 4 empty trash lists at
688 // startup. We should complete at least four runs in much less
// than the 10 second limit of the polling loop below.
690 for t0 := time.Now(); time.Since(t0) < 10*time.Second; {
691 pulls := pullReqs.Count()
// Condition: at least 16 pulls (4 runs x 4) and exactly 4 more trash
// requests than pulls (the startup trash lists).
692 if pulls >= 16 && trashReqs.Count() == pulls+4 {
695 time.Sleep(time.Millisecond)
699 c.Check(pullReqs.Count() >= 16, check.Equals, true)
700 c.Check(trashReqs.Count() >= 20, check.Equals, true)
702 // We should have completed 4 runs before calling cancel().
703 // But the next run might also have started before we called
704 // cancel(), in which case the extra run will be included in
705 // the changeset_compute_seconds_count metric.
706 completed := pullReqs.Count() / 4
707 metrics := arvadostest.GatherMetricsAsString(srv.Metrics.reg)
708 c.Check(metrics, check.Matches, fmt.Sprintf(`(?ms).*\narvados_keepbalance_changeset_compute_seconds_count (%d|%d)\n.*`, completed, completed+1))
// TestRunForever_TriggeredBySignal sets BalancePeriod to one minute,
// longer than the 10 second polling window, and sends SIGUSR1 to its
// own process after each of the first three runs completes. It expects
// exactly four runs in total: 16 pull requests, 20 trash requests and a
// changeset_compute_seconds_count of 4.
711 func (s *runSuite) TestRunForever_TriggeredBySignal(c *check.C) {
712 s.config.ManagementToken = "xyzzy"
714 Logger: ctxlog.TestLogger(c),
715 Dumper: ctxlog.TestLogger(c),
717 s.stub.serveCurrentUserAdmin()
718 s.stub.serveFooBarFileCollections()
719 s.stub.serveKeepServices(stubServices)
720 s.stub.serveKeepstoreMounts()
721 s.stub.serveKeepstoreIndexFoo4Bar1()
722 trashReqs := s.stub.serveKeepstoreTrash()
723 pullReqs := s.stub.serveKeepstorePull()
725 ctx, cancel := context.WithCancel(context.Background())
727 s.config.Collections.BalancePeriod = arvados.Duration(time.Minute)
728 srv := s.newServer(&opts)
730 done := make(chan bool)
// Handle on the current process, used below to send it SIGUSR1.
736 procself, err := os.FindProcess(os.Getpid())
737 c.Assert(err, check.IsNil)
739 // Each run should send 4 pull lists + 4 trash lists. The
740 // first run should also send 4 empty trash lists at
741 // startup. We should be able to complete four runs in much
// less than the 10 second limit of the polling loop below.
744 for t0 := time.Now(); time.Since(t0) < 10*time.Second; {
745 pulls := pullReqs.Count()
746 if pulls >= 16 && trashReqs.Count() == pulls+4 {
749 // Once the 1st run has started automatically, we
750 // start sending a single SIGUSR1 at the end of each
751 // run, to ensure we get exactly 4 runs in total.
// pulls%4 == 0 marks a whole number of finished runs, pulls <= 12 stops
// signalling after the third run, and the completedRuns comparison
// ensures one signal per finished run.
752 if pulls > 0 && pulls%4 == 0 && pulls <= 12 && pulls/4 > completedRuns {
753 completedRuns = pulls / 4
754 c.Logf("completed run %d, sending SIGUSR1 to trigger next run", completedRuns)
755 procself.Signal(syscall.SIGUSR1)
757 time.Sleep(time.Millisecond)
761 c.Check(pullReqs.Count(), check.Equals, 16)
762 c.Check(trashReqs.Count(), check.Equals, 20)
764 metrics := arvadostest.GatherMetricsAsString(srv.Metrics.reg)
765 c.Check(metrics, check.Matches, `(?ms).*\narvados_keepbalance_changeset_compute_seconds_count 4\n.*`)