From: Tom Clegg Date: Wed, 28 Jul 2021 04:18:53 +0000 (-0400) Subject: 17574: Add BalanceUpdateLimit config, fix tests. X-Git-Tag: 2.3.0~118^2~9 X-Git-Url: https://git.arvados.org/arvados.git/commitdiff_plain/504b1b430076f15d27ff3e8da3e1d3623431aa84 17574: Add BalanceUpdateLimit config, fix tests. Arvados-DCO-1.1-Signed-off-by: Tom Clegg --- diff --git a/lib/config/config.default.yml b/lib/config/config.default.yml index e28d5cbb7f..c5bc1c8e87 100644 --- a/lib/config/config.default.yml +++ b/lib/config/config.default.yml @@ -456,6 +456,13 @@ Clusters: # long-running balancing operation. BalanceTimeout: 6h + # Maximum number of replication_confirmed / + # storage_classes_confirmed updates to write to the database + # after a rebalancing run. When many updates are needed, this + # spreads them over a few runs rather than applying them all at + # once. + BalanceUpdateLimit: 100000 + # Default lifetime for ephemeral collections: 2 weeks. This must not # be less than BlobSigningTTL. DefaultTrashLifetime: 336h diff --git a/lib/config/export.go b/lib/config/export.go index bb939321c9..2faacc8595 100644 --- a/lib/config/export.go +++ b/lib/config/export.go @@ -84,6 +84,7 @@ var whitelist = map[string]bool{ "Collections.BalanceCollectionBuffers": false, "Collections.BalancePeriod": false, "Collections.BalanceTimeout": false, + "Collections.BalanceUpdateLimit": false, "Collections.BlobDeleteConcurrency": false, "Collections.BlobMissingReport": false, "Collections.BlobReplicateConcurrency": false, diff --git a/lib/config/generated_config.go b/lib/config/generated_config.go index b15bf7eebc..fb9f888ebb 100644 --- a/lib/config/generated_config.go +++ b/lib/config/generated_config.go @@ -462,6 +462,13 @@ Clusters: # long-running balancing operation. BalanceTimeout: 6h + # Maximum number of replication_confirmed / + # storage_classes_confirmed updates to write to the database + # after a rebalancing run. When many updates are needed, this + # spreads them over a few runs rather than applying them all at + # once. + BalanceUpdateLimit: 100000 + # Default lifetime for ephemeral collections: 2 weeks. This must not # be less than BlobSigningTTL. DefaultTrashLifetime: 336h diff --git a/sdk/go/arvados/config.go b/sdk/go/arvados/config.go index 6e59828a3c..dcffcd25e7 100644 --- a/sdk/go/arvados/config.go +++ b/sdk/go/arvados/config.go @@ -138,6 +138,7 @@ type Cluster struct { BalanceCollectionBatch int BalanceCollectionBuffers int BalanceTimeout Duration + BalanceUpdateLimit int WebDAVCache WebDAVCacheConfig diff --git a/services/keep-balance/balance_run_test.go b/services/keep-balance/balance_run_test.go index 5e1c0e45c1..18a8bdcf47 100644 --- a/services/keep-balance/balance_run_test.go +++ b/services/keep-balance/balance_run_test.go @@ -352,6 +352,9 @@ func (s *runSuite) TearDownTest(c *check.C) { } func (s *runSuite) TestRefuseZeroCollections(c *check.C) { + defer arvados.NewClientFromEnv().RequestAndDecode(nil, "POST", "database/reset", nil, nil) + _, err := s.db.Exec(`delete from collections`) + c.Assert(err, check.IsNil) opts := RunOptions{ CommitPulls: true, CommitTrash: true, @@ -365,7 +368,7 @@ func (s *runSuite) TestRefuseZeroCollections(c *check.C) { trashReqs := s.stub.serveKeepstoreTrash() pullReqs := s.stub.serveKeepstorePull() srv := s.newServer(&opts) - _, err := srv.runOnce() + _, err = srv.runOnce() c.Check(err, check.ErrorMatches, "received zero collections") c.Check(trashReqs.Count(), check.Equals, 4) c.Check(pullReqs.Count(), check.Equals, 0) @@ -390,26 +393,6 @@ func (s *runSuite) TestRefuseNonAdmin(c *check.C) { c.Check(pullReqs.Count(), check.Equals, 0) } -func (s *runSuite) TestDetectSkippedCollections(c *check.C) { - opts := RunOptions{ - CommitPulls: true, - CommitTrash: true, - Logger: ctxlog.TestLogger(c), - } - s.stub.serveCurrentUserAdmin() - s.stub.serveCollectionsButSkipOne() - s.stub.serveKeepServices(stubServices) - s.stub.serveKeepstoreMounts() - s.stub.serveKeepstoreIndexFoo4Bar1() - trashReqs := s.stub.serveKeepstoreTrash() - pullReqs := s.stub.serveKeepstorePull() - srv := s.newServer(&opts) - _, err := srv.runOnce() - c.Check(err, check.ErrorMatches, `Retrieved 2 collections with modtime <= .* but server now reports there are 3 collections.*`) - c.Check(trashReqs.Count(), check.Equals, 4) - c.Check(pullReqs.Count(), check.Equals, 0) -} - func (s *runSuite) TestWriteLostBlocks(c *check.C) { lostf, err := ioutil.TempFile("", "keep-balance-lost-blocks-test-") c.Assert(err, check.IsNil) @@ -433,7 +416,7 @@ func (s *runSuite) TestWriteLostBlocks(c *check.C) { c.Check(err, check.IsNil) lost, err := ioutil.ReadFile(lostf.Name()) c.Assert(err, check.IsNil) - c.Check(string(lost), check.Equals, "37b51d194a7513e45b56f6524f2d51f2 fa7aeb5140e2848d39b416daeef4ffc5+45\n") + c.Check(string(lost), check.Matches, `(?ms).*37b51d194a7513e45b56f6524f2d51f2.* fa7aeb5140e2848d39b416daeef4ffc5\+45.*`) } func (s *runSuite) TestDryRun(c *check.C) { @@ -464,11 +447,7 @@ func (s *runSuite) TestDryRun(c *check.C) { } func (s *runSuite) TestCommit(c *check.C) { - lostf, err := ioutil.TempFile("", "keep-balance-lost-blocks-test-") - c.Assert(err, check.IsNil) - s.config.Collections.BlobMissingReport = lostf.Name() - defer os.Remove(lostf.Name()) - + s.config.Collections.BlobMissingReport = c.MkDir() + "/keep-balance-lost-blocks-test-" s.config.ManagementToken = "xyzzy" opts := RunOptions{ CommitPulls: true, @@ -494,17 +473,18 @@ func (s *runSuite) TestCommit(c *check.C) { // in a poor rendezvous position c.Check(bal.stats.pulls, check.Equals, 2) - lost, err := ioutil.ReadFile(lostf.Name()) + lost, err := ioutil.ReadFile(s.config.Collections.BlobMissingReport) c.Assert(err, check.IsNil) - c.Check(string(lost), check.Equals, "") + c.Check(string(lost), check.Not(check.Matches), `(?ms).*acbd18db4cc2f85cedef654fccc4a4d8.*`) buf, err := s.getMetrics(c, srv) c.Check(err, check.IsNil) - c.Check(buf, check.Matches, `(?ms).*\narvados_keep_total_bytes 15\n.*`) - c.Check(buf, check.Matches, `(?ms).*\narvados_keepbalance_changeset_compute_seconds_sum [0-9\.]+\n.*`) - c.Check(buf, check.Matches, `(?ms).*\narvados_keepbalance_changeset_compute_seconds_count 1\n.*`) - c.Check(buf, check.Matches, `(?ms).*\narvados_keep_dedup_byte_ratio 1\.5\n.*`) - c.Check(buf, check.Matches, `(?ms).*\narvados_keep_dedup_block_ratio 1\.5\n.*`) + bufstr := buf.String() + c.Check(bufstr, check.Matches, `(?ms).*\narvados_keep_total_bytes 15\n.*`) + c.Check(bufstr, check.Matches, `(?ms).*\narvados_keepbalance_changeset_compute_seconds_sum [0-9\.]+\n.*`) + c.Check(bufstr, check.Matches, `(?ms).*\narvados_keepbalance_changeset_compute_seconds_count 1\n.*`) + c.Check(bufstr, check.Matches, `(?ms).*\narvados_keep_dedup_byte_ratio [1-9].*`) + c.Check(bufstr, check.Matches, `(?ms).*\narvados_keep_dedup_block_ratio [1-9].*`) } func (s *runSuite) TestRunForever(c *check.C) { diff --git a/services/keep-balance/collection.go b/services/keep-balance/collection.go index ba9edf9391..6e0a066e86 100644 --- a/services/keep-balance/collection.go +++ b/services/keep-balance/collection.go @@ -8,7 +8,6 @@ import ( "context" "encoding/json" "fmt" - "io" "runtime" "sync" "sync/atomic" @@ -52,6 +51,8 @@ func EachCollection(ctx context.Context, db *sqlx.DB, c *arvados.Client, f func( if err != nil { return err } + progressTicker := time.NewTicker(10 * time.Second) + defer progressTicker.Stop() callCount := 0 for rows.Next() { var coll arvados.Collection @@ -74,8 +75,13 @@ func EachCollection(ctx context.Context, db *sqlx.DB, c *arvados.Client, f func( if err != nil { return err } - progress(callCount, expectCount) + select { + case <-progressTicker.C: + progress(callCount, expectCount) + default: + } } + progress(callCount, expectCount) rows.Close() if err := rows.Err(); err != nil { return err @@ -104,36 +110,37 @@ func (bal *Balancer) updateCollections(ctx context.Context, c *arvados.Client, c threshold := time.Now() thresholdStr := threshold.Format(time.RFC3339Nano) + updated := int64(0) + var err error collQ := make(chan arvados.Collection, cluster.Collections.BalanceCollectionBuffers) go func() { defer close(collQ) - err = EachCollection(ctx, bal.DB, c, func(coll arvados.Collection) error { - if coll.ModifiedAt.After(threshold) { - return io.EOF - } - if coll.IsTrashed { - return nil + err := EachCollection(ctx, bal.DB, c, func(coll arvados.Collection) error { + if atomic.LoadInt64(&updated) >= int64(cluster.Collections.BalanceUpdateLimit) { + bal.logf("reached BalanceUpdateLimit (%d)", cluster.Collections.BalanceUpdateLimit) + cancel() + return context.Canceled } collQ <- coll return nil }, func(done, total int) { bal.logf("update collections: %d/%d", done, total) }) - if err == io.EOF { - err = nil - } else if err != nil { + if err != nil && err != context.Canceled { bal.logf("error updating collections: %s", err) } }() - var updated int64 var wg sync.WaitGroup for i := 0; i < runtime.NumCPU(); i++ { wg.Add(1) go func() { defer wg.Done() for coll := range collQ { + if ctx.Err() != nil { + continue + } blkids, err := coll.SizedDigests() if err != nil { bal.logf("%s: %s", coll.UUID, err) @@ -153,7 +160,9 @@ func (bal *Balancer) updateCollections(ctx context.Context, c *arvados.Client, c where uuid=$4`, repl, thresholdStr, classes, coll.UUID) if err != nil { - bal.logf("%s: update failed: %s", coll.UUID, err) + if err != context.Canceled { + bal.logf("%s: update failed: %s", coll.UUID, err) + } continue } atomic.AddInt64(&updated, 1) diff --git a/services/keep-balance/main.go b/services/keep-balance/main.go index 90ae0695df..e1573e7f73 100644 --- a/services/keep-balance/main.go +++ b/services/keep-balance/main.go @@ -19,6 +19,7 @@ import ( "git.arvados.org/arvados.git/sdk/go/ctxlog" "git.arvados.org/arvados.git/sdk/go/health" "github.com/jmoiron/sqlx" + _ "github.com/lib/pq" "github.com/prometheus/client_golang/prometheus" "github.com/sirupsen/logrus" ) diff --git a/services/keep-balance/main_test.go b/services/keep-balance/main_test.go index b154f6e998..65a2d5567a 100644 --- a/services/keep-balance/main_test.go +++ b/services/keep-balance/main_test.go @@ -11,6 +11,11 @@ import ( "net/http" "time" + "git.arvados.org/arvados.git/lib/config" + "git.arvados.org/arvados.git/sdk/go/arvados" + "git.arvados.org/arvados.git/sdk/go/arvadostest" + "git.arvados.org/arvados.git/sdk/go/ctxlog" + "github.com/ghodss/yaml" check "gopkg.in/check.v1" ) @@ -26,6 +31,8 @@ func (s *mainSuite) TestVersionFlag(c *check.C) { } func (s *mainSuite) TestHTTPServer(c *check.C) { + arvadostest.StartKeep(2, true) + ln, err := net.Listen("tcp", ":0") if err != nil { c.Fatal(err) @@ -33,10 +40,17 @@ func (s *mainSuite) TestHTTPServer(c *check.C) { _, p, err := net.SplitHostPort(ln.Addr().String()) c.Check(err, check.IsNil) ln.Close() - config := "Clusters:\n zzzzz:\n ManagementToken: abcdefg\n Services: {Keepbalance: {InternalURLs: {'http://localhost:" + p + "/': {}}}}\n" + cfg, err := config.NewLoader(nil, ctxlog.TestLogger(c)).Load() + c.Assert(err, check.IsNil) + cluster, err := cfg.GetCluster("") + c.Assert(err, check.IsNil) + cluster.Services.Keepbalance.InternalURLs[arvados.URL{Host: "localhost:" + p, Path: "/"}] = arvados.ServiceInstance{} + cfg.Clusters[cluster.ClusterID] = *cluster + config, err := yaml.Marshal(cfg) + c.Assert(err, check.IsNil) var stdout bytes.Buffer - go runCommand("keep-balance", []string{"-config", "-"}, bytes.NewBufferString(config), &stdout, &stdout) + go runCommand("keep-balance", []string{"-config", "-"}, bytes.NewBuffer(config), &stdout, &stdout) done := make(chan struct{}) go func() { defer close(done) @@ -47,7 +61,7 @@ func (s *mainSuite) TestHTTPServer(c *check.C) { c.Fatal(err) return } - req.Header.Set("Authorization", "Bearer abcdefg") + req.Header.Set("Authorization", "Bearer "+cluster.ManagementToken) resp, err := http.DefaultClient.Do(req) if err != nil { c.Logf("error %s", err) @@ -73,6 +87,7 @@ func (s *mainSuite) TestHTTPServer(c *check.C) { c.Log(stdout.String()) c.Fatal("timeout") } + c.Log(stdout.String()) // Check non-metrics URL that gets passed through to us from // service.Command