17574: Add BalanceUpdateLimit config, fix tests.
author Tom Clegg <tom@curii.com>
Wed, 28 Jul 2021 04:18:53 +0000 (00:18 -0400)
committer Tom Clegg <tom@curii.com>
Wed, 28 Jul 2021 04:18:53 +0000 (00:18 -0400)
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom@curii.com>

lib/config/config.default.yml
lib/config/export.go
lib/config/generated_config.go
sdk/go/arvados/config.go
services/keep-balance/balance_run_test.go
services/keep-balance/collection.go
services/keep-balance/main.go
services/keep-balance/main_test.go

index e28d5cbb7f0cd09b3ad559f6cab0c9a9967c10d5..c5bc1c8e87f6e8c6598eadc93f2f3766d222e703 100644 (file)
@@ -456,6 +456,13 @@ Clusters:
       # long-running balancing operation.
       BalanceTimeout: 6h
 
+      # Maximum number of replication_confirmed /
+      # storage_classes_confirmed updates to write to the database
+      # after a rebalancing run. When many updates are needed, this
+      # spreads them over a few runs rather than applying them all at
+      # once.
+      BalanceUpdateLimit: 100000
+
       # Default lifetime for ephemeral collections: 2 weeks. This must not
       # be less than BlobSigningTTL.
       DefaultTrashLifetime: 336h
index bb939321c9ce17e220bb031f041efae53c79fa46..2faacc85953ffb724015b427f29796cb4634002e 100644 (file)
@@ -84,6 +84,7 @@ var whitelist = map[string]bool{
        "Collections.BalanceCollectionBuffers":                false,
        "Collections.BalancePeriod":                           false,
        "Collections.BalanceTimeout":                          false,
+       "Collections.BalanceUpdateLimit":                      false,
        "Collections.BlobDeleteConcurrency":                   false,
        "Collections.BlobMissingReport":                       false,
        "Collections.BlobReplicateConcurrency":                false,
index b15bf7eebc29facda6f1a0e2670c5482f83055cf..fb9f888ebb87361b071136ab7226665f0a1e51c5 100644 (file)
@@ -462,6 +462,13 @@ Clusters:
       # long-running balancing operation.
       BalanceTimeout: 6h
 
+      # Maximum number of replication_confirmed /
+      # storage_classes_confirmed updates to write to the database
+      # after a rebalancing run. When many updates are needed, this
+      # spreads them over a few runs rather than applying them all at
+      # once.
+      BalanceUpdateLimit: 100000
+
       # Default lifetime for ephemeral collections: 2 weeks. This must not
       # be less than BlobSigningTTL.
       DefaultTrashLifetime: 336h
index 6e59828a3cbf5656fef1e6c7fc790ca9d3b6268f..dcffcd25e73051985c1242441cce0372ed0eb7dd 100644 (file)
@@ -138,6 +138,7 @@ type Cluster struct {
                BalanceCollectionBatch   int
                BalanceCollectionBuffers int
                BalanceTimeout           Duration
+               BalanceUpdateLimit       int
 
                WebDAVCache WebDAVCacheConfig
 
index 5e1c0e45c17fc6ec13fa47f69c8c48bf4f3041ef..18a8bdcf47b111b0c6ea6469d58b60d2a10ea5f5 100644 (file)
@@ -352,6 +352,9 @@ func (s *runSuite) TearDownTest(c *check.C) {
 }
 
 func (s *runSuite) TestRefuseZeroCollections(c *check.C) {
+       defer arvados.NewClientFromEnv().RequestAndDecode(nil, "POST", "database/reset", nil, nil)
+       _, err := s.db.Exec(`delete from collections`)
+       c.Assert(err, check.IsNil)
        opts := RunOptions{
                CommitPulls: true,
                CommitTrash: true,
@@ -365,7 +368,7 @@ func (s *runSuite) TestRefuseZeroCollections(c *check.C) {
        trashReqs := s.stub.serveKeepstoreTrash()
        pullReqs := s.stub.serveKeepstorePull()
        srv := s.newServer(&opts)
-       _, err := srv.runOnce()
+       _, err = srv.runOnce()
        c.Check(err, check.ErrorMatches, "received zero collections")
        c.Check(trashReqs.Count(), check.Equals, 4)
        c.Check(pullReqs.Count(), check.Equals, 0)
@@ -390,26 +393,6 @@ func (s *runSuite) TestRefuseNonAdmin(c *check.C) {
        c.Check(pullReqs.Count(), check.Equals, 0)
 }
 
-func (s *runSuite) TestDetectSkippedCollections(c *check.C) {
-       opts := RunOptions{
-               CommitPulls: true,
-               CommitTrash: true,
-               Logger:      ctxlog.TestLogger(c),
-       }
-       s.stub.serveCurrentUserAdmin()
-       s.stub.serveCollectionsButSkipOne()
-       s.stub.serveKeepServices(stubServices)
-       s.stub.serveKeepstoreMounts()
-       s.stub.serveKeepstoreIndexFoo4Bar1()
-       trashReqs := s.stub.serveKeepstoreTrash()
-       pullReqs := s.stub.serveKeepstorePull()
-       srv := s.newServer(&opts)
-       _, err := srv.runOnce()
-       c.Check(err, check.ErrorMatches, `Retrieved 2 collections with modtime <= .* but server now reports there are 3 collections.*`)
-       c.Check(trashReqs.Count(), check.Equals, 4)
-       c.Check(pullReqs.Count(), check.Equals, 0)
-}
-
 func (s *runSuite) TestWriteLostBlocks(c *check.C) {
        lostf, err := ioutil.TempFile("", "keep-balance-lost-blocks-test-")
        c.Assert(err, check.IsNil)
@@ -433,7 +416,7 @@ func (s *runSuite) TestWriteLostBlocks(c *check.C) {
        c.Check(err, check.IsNil)
        lost, err := ioutil.ReadFile(lostf.Name())
        c.Assert(err, check.IsNil)
-       c.Check(string(lost), check.Equals, "37b51d194a7513e45b56f6524f2d51f2 fa7aeb5140e2848d39b416daeef4ffc5+45\n")
+       c.Check(string(lost), check.Matches, `(?ms).*37b51d194a7513e45b56f6524f2d51f2.* fa7aeb5140e2848d39b416daeef4ffc5\+45.*`)
 }
 
 func (s *runSuite) TestDryRun(c *check.C) {
@@ -464,11 +447,7 @@ func (s *runSuite) TestDryRun(c *check.C) {
 }
 
 func (s *runSuite) TestCommit(c *check.C) {
-       lostf, err := ioutil.TempFile("", "keep-balance-lost-blocks-test-")
-       c.Assert(err, check.IsNil)
-       s.config.Collections.BlobMissingReport = lostf.Name()
-       defer os.Remove(lostf.Name())
-
+       s.config.Collections.BlobMissingReport = c.MkDir() + "/keep-balance-lost-blocks-test-"
        s.config.ManagementToken = "xyzzy"
        opts := RunOptions{
                CommitPulls: true,
@@ -494,17 +473,18 @@ func (s *runSuite) TestCommit(c *check.C) {
        // in a poor rendezvous position
        c.Check(bal.stats.pulls, check.Equals, 2)
 
-       lost, err := ioutil.ReadFile(lostf.Name())
+       lost, err := ioutil.ReadFile(s.config.Collections.BlobMissingReport)
        c.Assert(err, check.IsNil)
-       c.Check(string(lost), check.Equals, "")
+       c.Check(string(lost), check.Not(check.Matches), `(?ms).*acbd18db4cc2f85cedef654fccc4a4d8.*`)
 
        buf, err := s.getMetrics(c, srv)
        c.Check(err, check.IsNil)
-       c.Check(buf, check.Matches, `(?ms).*\narvados_keep_total_bytes 15\n.*`)
-       c.Check(buf, check.Matches, `(?ms).*\narvados_keepbalance_changeset_compute_seconds_sum [0-9\.]+\n.*`)
-       c.Check(buf, check.Matches, `(?ms).*\narvados_keepbalance_changeset_compute_seconds_count 1\n.*`)
-       c.Check(buf, check.Matches, `(?ms).*\narvados_keep_dedup_byte_ratio 1\.5\n.*`)
-       c.Check(buf, check.Matches, `(?ms).*\narvados_keep_dedup_block_ratio 1\.5\n.*`)
+       bufstr := buf.String()
+       c.Check(bufstr, check.Matches, `(?ms).*\narvados_keep_total_bytes 15\n.*`)
+       c.Check(bufstr, check.Matches, `(?ms).*\narvados_keepbalance_changeset_compute_seconds_sum [0-9\.]+\n.*`)
+       c.Check(bufstr, check.Matches, `(?ms).*\narvados_keepbalance_changeset_compute_seconds_count 1\n.*`)
+       c.Check(bufstr, check.Matches, `(?ms).*\narvados_keep_dedup_byte_ratio [1-9].*`)
+       c.Check(bufstr, check.Matches, `(?ms).*\narvados_keep_dedup_block_ratio [1-9].*`)
 }
 
 func (s *runSuite) TestRunForever(c *check.C) {
index ba9edf9391cddca34645e8e1f5981b896564a62a..6e0a066e869261a35ee60212d77090f1e9432d3c 100644 (file)
@@ -8,7 +8,6 @@ import (
        "context"
        "encoding/json"
        "fmt"
-       "io"
        "runtime"
        "sync"
        "sync/atomic"
@@ -52,6 +51,8 @@ func EachCollection(ctx context.Context, db *sqlx.DB, c *arvados.Client, f func(
        if err != nil {
                return err
        }
+       progressTicker := time.NewTicker(10 * time.Second)
+       defer progressTicker.Stop()
        callCount := 0
        for rows.Next() {
                var coll arvados.Collection
@@ -74,8 +75,13 @@ func EachCollection(ctx context.Context, db *sqlx.DB, c *arvados.Client, f func(
                if err != nil {
                        return err
                }
-               progress(callCount, expectCount)
+               select {
+               case <-progressTicker.C:
+                       progress(callCount, expectCount)
+               default:
+               }
        }
+       progress(callCount, expectCount)
        rows.Close()
        if err := rows.Err(); err != nil {
                return err
@@ -104,36 +110,37 @@ func (bal *Balancer) updateCollections(ctx context.Context, c *arvados.Client, c
        threshold := time.Now()
        thresholdStr := threshold.Format(time.RFC3339Nano)
 
+       updated := int64(0)
+
        var err error
        collQ := make(chan arvados.Collection, cluster.Collections.BalanceCollectionBuffers)
        go func() {
                defer close(collQ)
-               err = EachCollection(ctx, bal.DB, c, func(coll arvados.Collection) error {
-                       if coll.ModifiedAt.After(threshold) {
-                               return io.EOF
-                       }
-                       if coll.IsTrashed {
-                               return nil
+               err := EachCollection(ctx, bal.DB, c, func(coll arvados.Collection) error {
+                       if atomic.LoadInt64(&updated) >= int64(cluster.Collections.BalanceUpdateLimit) {
+                               bal.logf("reached BalanceUpdateLimit (%d)", cluster.Collections.BalanceUpdateLimit)
+                               cancel()
+                               return context.Canceled
                        }
                        collQ <- coll
                        return nil
                }, func(done, total int) {
                        bal.logf("update collections: %d/%d", done, total)
                })
-               if err == io.EOF {
-                       err = nil
-               } else if err != nil {
+               if err != nil && err != context.Canceled {
                        bal.logf("error updating collections: %s", err)
                }
        }()
 
-       var updated int64
        var wg sync.WaitGroup
        for i := 0; i < runtime.NumCPU(); i++ {
                wg.Add(1)
                go func() {
                        defer wg.Done()
                        for coll := range collQ {
+                               if ctx.Err() != nil {
+                                       continue
+                               }
                                blkids, err := coll.SizedDigests()
                                if err != nil {
                                        bal.logf("%s: %s", coll.UUID, err)
@@ -153,7 +160,9 @@ func (bal *Balancer) updateCollections(ctx context.Context, c *arvados.Client, c
                                        where uuid=$4`,
                                        repl, thresholdStr, classes, coll.UUID)
                                if err != nil {
-                                       bal.logf("%s: update failed: %s", coll.UUID, err)
+                                       if err != context.Canceled {
+                                               bal.logf("%s: update failed: %s", coll.UUID, err)
+                                       }
                                        continue
                                }
                                atomic.AddInt64(&updated, 1)
index 90ae0695df8d96d2d603b916e44d26310ce2da3c..e1573e7f733935028d164d6d5dd69d383fdf338f 100644 (file)
@@ -19,6 +19,7 @@ import (
        "git.arvados.org/arvados.git/sdk/go/ctxlog"
        "git.arvados.org/arvados.git/sdk/go/health"
        "github.com/jmoiron/sqlx"
+       _ "github.com/lib/pq"
        "github.com/prometheus/client_golang/prometheus"
        "github.com/sirupsen/logrus"
 )
index b154f6e99848a3623b167726412ce5b48a59c715..65a2d5567a86505e7d6e4866aa8ffa75e3bf2deb 100644 (file)
@@ -11,6 +11,11 @@ import (
        "net/http"
        "time"
 
+       "git.arvados.org/arvados.git/lib/config"
+       "git.arvados.org/arvados.git/sdk/go/arvados"
+       "git.arvados.org/arvados.git/sdk/go/arvadostest"
+       "git.arvados.org/arvados.git/sdk/go/ctxlog"
+       "github.com/ghodss/yaml"
        check "gopkg.in/check.v1"
 )
 
@@ -26,6 +31,8 @@ func (s *mainSuite) TestVersionFlag(c *check.C) {
 }
 
 func (s *mainSuite) TestHTTPServer(c *check.C) {
+       arvadostest.StartKeep(2, true)
+
        ln, err := net.Listen("tcp", ":0")
        if err != nil {
                c.Fatal(err)
@@ -33,10 +40,17 @@ func (s *mainSuite) TestHTTPServer(c *check.C) {
        _, p, err := net.SplitHostPort(ln.Addr().String())
        c.Check(err, check.IsNil)
        ln.Close()
-       config := "Clusters:\n zzzzz:\n  ManagementToken: abcdefg\n  Services: {Keepbalance: {InternalURLs: {'http://localhost:" + p + "/': {}}}}\n"
+       cfg, err := config.NewLoader(nil, ctxlog.TestLogger(c)).Load()
+       c.Assert(err, check.IsNil)
+       cluster, err := cfg.GetCluster("")
+       c.Assert(err, check.IsNil)
+       cluster.Services.Keepbalance.InternalURLs[arvados.URL{Host: "localhost:" + p, Path: "/"}] = arvados.ServiceInstance{}
+       cfg.Clusters[cluster.ClusterID] = *cluster
+       config, err := yaml.Marshal(cfg)
+       c.Assert(err, check.IsNil)
 
        var stdout bytes.Buffer
-       go runCommand("keep-balance", []string{"-config", "-"}, bytes.NewBufferString(config), &stdout, &stdout)
+       go runCommand("keep-balance", []string{"-config", "-"}, bytes.NewBuffer(config), &stdout, &stdout)
        done := make(chan struct{})
        go func() {
                defer close(done)
@@ -47,7 +61,7 @@ func (s *mainSuite) TestHTTPServer(c *check.C) {
                                c.Fatal(err)
                                return
                        }
-                       req.Header.Set("Authorization", "Bearer abcdefg")
+                       req.Header.Set("Authorization", "Bearer "+cluster.ManagementToken)
                        resp, err := http.DefaultClient.Do(req)
                        if err != nil {
                                c.Logf("error %s", err)
@@ -73,6 +87,7 @@ func (s *mainSuite) TestHTTPServer(c *check.C) {
                c.Log(stdout.String())
                c.Fatal("timeout")
        }
+       c.Log(stdout.String())
 
        // Check non-metrics URL that gets passed through to us from
        // service.Command