21189: Replace -commit-X with Balance{Pull,Trash}Limit configs.
author Tom Clegg <tom@curii.com>
Wed, 15 Nov 2023 15:00:07 +0000 (10:00 -0500)
committer Tom Clegg <tom@curii.com>
Wed, 15 Nov 2023 15:48:40 +0000 (10:48 -0500)
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom@curii.com>

cmd/arvados-server/keep-balance.service
lib/config/config.default.yml
lib/config/export.go
sdk/go/arvados/config.go
services/keep-balance/balance.go
services/keep-balance/balance_run_test.go
services/keep-balance/balance_test.go
services/keep-balance/change_set.go
services/keep-balance/integration_test.go
services/keep-balance/main.go
services/keep-balance/server.go

index 6b16b2cfab709d93c028b0f3343a709057dd5b10..c603939bcf62bcb5b897caa94f93de7472a100cf 100644 (file)
@@ -14,7 +14,7 @@ StartLimitIntervalSec=0
 [Service]
 Type=notify
 EnvironmentFile=-/etc/arvados/environment
-ExecStart=/usr/bin/keep-balance -commit-pulls -commit-trash
+ExecStart=/usr/bin/keep-balance
 # Set a reasonable default for the open file limit
 LimitNOFILE=65536
 Restart=always
index 24d626aae50a1a9b46ec3d3dde9b3efcccbbb853..4fe75bd680614a09e0f644e6f14d36aa1715d2a4 100644 (file)
@@ -638,6 +638,15 @@ Clusters:
       # once.
       BalanceUpdateLimit: 100000
 
+      # Maximum number of "pull block from other server" and "trash
+      # block" requests to send to each keepstore server at a
+      # time. Smaller values use less memory in keepstore and
+      # keep-balance. Larger values allow more progress per
+      # keep-balance iteration. Setting both limits to zero amounts
+      # to a "dry run".
+      BalancePullLimit: 100000
+      BalanceTrashLimit: 100000
+
       # Default lifetime for ephemeral collections: 2 weeks. This must not
       # be less than BlobSigningTTL.
       DefaultTrashLifetime: 336h
index cbd9ff6d7f982c2818829b89f7a15fab9b5ca044..674b37473e8b1e99e1ce5f3fd3551e3e8ee22666 100644 (file)
@@ -93,7 +93,9 @@ var whitelist = map[string]bool{
        "Collections.BalanceCollectionBatch":       false,
        "Collections.BalanceCollectionBuffers":     false,
        "Collections.BalancePeriod":                false,
+       "Collections.BalancePullLimit":             false,
        "Collections.BalanceTimeout":               false,
+       "Collections.BalanceTrashLimit":            false,
        "Collections.BalanceUpdateLimit":           false,
        "Collections.BlobDeleteConcurrency":        false,
        "Collections.BlobMissingReport":            false,
index ea01cc3c6872a88adb341d953adbe75760b4f176..e2ad7b089db80a14191b89470985bd2a56759b19 100644 (file)
@@ -150,6 +150,8 @@ type Cluster struct {
                BalanceCollectionBuffers int
                BalanceTimeout           Duration
                BalanceUpdateLimit       int
+               BalancePullLimit         int
+               BalanceTrashLimit        int
 
                WebDAVCache WebDAVCacheConfig
 
index e44dfeda8748aec51e18cd55118c15d509d8fc12..e71eb07efa6979fa005c4a59faf70f7c3187519b 100644 (file)
@@ -137,7 +137,7 @@ func (bal *Balancer) Run(ctx context.Context, client *arvados.Client, cluster *a
        client.Timeout = 0
 
        rs := bal.rendezvousState()
-       if runOptions.CommitTrash && rs != runOptions.SafeRendezvousState {
+       if cluster.Collections.BalanceTrashLimit > 0 && rs != runOptions.SafeRendezvousState {
                if runOptions.SafeRendezvousState != "" {
                        bal.logf("notice: KeepServices list has changed since last run")
                }
@@ -155,6 +155,7 @@ func (bal *Balancer) Run(ctx context.Context, client *arvados.Client, cluster *a
        if err = bal.GetCurrentState(ctx, client, cluster.Collections.BalanceCollectionBatch, cluster.Collections.BalanceCollectionBuffers); err != nil {
                return
        }
+       bal.setupLookupTables(cluster)
        bal.ComputeChangeSets()
        bal.PrintStatistics()
        if err = bal.CheckSanityLate(); err != nil {
@@ -171,14 +172,14 @@ func (bal *Balancer) Run(ctx context.Context, client *arvados.Client, cluster *a
                }
                lbFile = nil
        }
-       if runOptions.CommitPulls {
+       if cluster.Collections.BalancePullLimit > 0 {
                err = bal.CommitPulls(ctx, client)
                if err != nil {
                        // Skip trash if we can't pull. (Too cautious?)
                        return
                }
        }
-       if runOptions.CommitTrash {
+       if cluster.Collections.BalanceTrashLimit > 0 {
                err = bal.CommitTrash(ctx, client)
                if err != nil {
                        return
@@ -542,7 +543,6 @@ func (bal *Balancer) ComputeChangeSets() {
        // This just calls balanceBlock() once for each block, using a
        // pool of worker goroutines.
        defer bal.time("changeset_compute", "wall clock time to compute changesets")()
-       bal.setupLookupTables()
 
        type balanceTask struct {
                blkid arvados.SizedDigest
@@ -577,7 +577,7 @@ func (bal *Balancer) ComputeChangeSets() {
        bal.collectStatistics(results)
 }
 
-func (bal *Balancer) setupLookupTables() {
+func (bal *Balancer) setupLookupTables(cluster *arvados.Cluster) {
        bal.serviceRoots = make(map[string]string)
        bal.classes = defaultClasses
        bal.mountsByClass = map[string]map[*KeepMount]bool{"default": {}}
@@ -609,6 +609,13 @@ func (bal *Balancer) setupLookupTables() {
        // class" case in balanceBlock depends on the order classes
        // are considered.
        sort.Strings(bal.classes)
+
+       for _, srv := range bal.KeepServices {
+               srv.ChangeSet = &ChangeSet{
+                       PullLimit:  cluster.Collections.BalancePullLimit,
+                       TrashLimit: cluster.Collections.BalanceTrashLimit,
+               }
+       }
 }
 
 const (
@@ -957,19 +964,21 @@ type replicationStats struct {
 }
 
 type balancerStats struct {
-       lost          blocksNBytes
-       overrep       blocksNBytes
-       unref         blocksNBytes
-       garbage       blocksNBytes
-       underrep      blocksNBytes
-       unachievable  blocksNBytes
-       justright     blocksNBytes
-       desired       blocksNBytes
-       current       blocksNBytes
-       pulls         int
-       trashes       int
-       replHistogram []int
-       classStats    map[string]replicationStats
+       lost            blocksNBytes
+       overrep         blocksNBytes
+       unref           blocksNBytes
+       garbage         blocksNBytes
+       underrep        blocksNBytes
+       unachievable    blocksNBytes
+       justright       blocksNBytes
+       desired         blocksNBytes
+       current         blocksNBytes
+       pulls           int
+       pullsDeferred   int
+       trashes         int
+       trashesDeferred int
+       replHistogram   []int
+       classStats      map[string]replicationStats
 
        // collectionBytes / collectionBlockBytes = deduplication ratio
        collectionBytes      int64 // sum(bytes in referenced blocks) across all collections
@@ -1092,7 +1101,9 @@ func (bal *Balancer) collectStatistics(results <-chan balanceResult) {
        }
        for _, srv := range bal.KeepServices {
                s.pulls += len(srv.ChangeSet.Pulls)
+               s.pullsDeferred += srv.ChangeSet.PullsDeferred
                s.trashes += len(srv.ChangeSet.Trashes)
+               s.trashesDeferred += srv.ChangeSet.TrashesDeferred
        }
        bal.stats = s
        bal.Metrics.UpdateStats(s)
index 962bd40adefc8e360be0749ee5a700bcb727821a..b7b3fb61233249beb6c6df48e46c8de4ea678e22 100644 (file)
@@ -396,9 +396,7 @@ func (s *runSuite) TestRefuseZeroCollections(c *check.C) {
        _, err := s.db.Exec(`delete from collections`)
        c.Assert(err, check.IsNil)
        opts := RunOptions{
-               CommitPulls: true,
-               CommitTrash: true,
-               Logger:      ctxlog.TestLogger(c),
+               Logger: ctxlog.TestLogger(c),
        }
        s.stub.serveCurrentUserAdmin()
        s.stub.serveZeroCollections()
@@ -416,8 +414,6 @@ func (s *runSuite) TestRefuseZeroCollections(c *check.C) {
 
 func (s *runSuite) TestRefuseBadIndex(c *check.C) {
        opts := RunOptions{
-               CommitPulls: true,
-               CommitTrash: true,
                ChunkPrefix: "abc",
                Logger:      ctxlog.TestLogger(c),
        }
@@ -439,9 +435,7 @@ func (s *runSuite) TestRefuseBadIndex(c *check.C) {
 
 func (s *runSuite) TestRefuseNonAdmin(c *check.C) {
        opts := RunOptions{
-               CommitPulls: true,
-               CommitTrash: true,
-               Logger:      ctxlog.TestLogger(c),
+               Logger: ctxlog.TestLogger(c),
        }
        s.stub.serveCurrentUserNotAdmin()
        s.stub.serveZeroCollections()
@@ -468,8 +462,6 @@ func (s *runSuite) TestInvalidChunkPrefix(c *check.C) {
                s.SetUpTest(c)
                c.Logf("trying invalid prefix %q", trial.prefix)
                opts := RunOptions{
-                       CommitPulls: true,
-                       CommitTrash: true,
                        ChunkPrefix: trial.prefix,
                        Logger:      ctxlog.TestLogger(c),
                }
@@ -489,9 +481,7 @@ func (s *runSuite) TestInvalidChunkPrefix(c *check.C) {
 
 func (s *runSuite) TestRefuseSameDeviceDifferentVolumes(c *check.C) {
        opts := RunOptions{
-               CommitPulls: true,
-               CommitTrash: true,
-               Logger:      ctxlog.TestLogger(c),
+               Logger: ctxlog.TestLogger(c),
        }
        s.stub.serveCurrentUserAdmin()
        s.stub.serveZeroCollections()
@@ -519,9 +509,7 @@ func (s *runSuite) TestWriteLostBlocks(c *check.C) {
        s.config.Collections.BlobMissingReport = lostf.Name()
        defer os.Remove(lostf.Name())
        opts := RunOptions{
-               CommitPulls: true,
-               CommitTrash: true,
-               Logger:      ctxlog.TestLogger(c),
+               Logger: ctxlog.TestLogger(c),
        }
        s.stub.serveCurrentUserAdmin()
        s.stub.serveFooBarFileCollections()
@@ -540,10 +528,10 @@ func (s *runSuite) TestWriteLostBlocks(c *check.C) {
 }
 
 func (s *runSuite) TestDryRun(c *check.C) {
+       s.config.Collections.BalanceTrashLimit = 0
+       s.config.Collections.BalancePullLimit = 0
        opts := RunOptions{
-               CommitPulls: false,
-               CommitTrash: false,
-               Logger:      ctxlog.TestLogger(c),
+               Logger: ctxlog.TestLogger(c),
        }
        s.stub.serveCurrentUserAdmin()
        collReqs := s.stub.serveFooBarFileCollections()
@@ -561,7 +549,10 @@ func (s *runSuite) TestDryRun(c *check.C) {
        }
        c.Check(trashReqs.Count(), check.Equals, 0)
        c.Check(pullReqs.Count(), check.Equals, 0)
-       c.Check(bal.stats.pulls, check.Not(check.Equals), 0)
+       c.Check(bal.stats.pulls, check.Equals, 0)
+       c.Check(bal.stats.pullsDeferred, check.Not(check.Equals), 0)
+       c.Check(bal.stats.trashes, check.Equals, 0)
+       c.Check(bal.stats.trashesDeferred, check.Not(check.Equals), 0)
        c.Check(bal.stats.underrep.replicas, check.Not(check.Equals), 0)
        c.Check(bal.stats.overrep.replicas, check.Not(check.Equals), 0)
 }
@@ -570,10 +561,8 @@ func (s *runSuite) TestCommit(c *check.C) {
        s.config.Collections.BlobMissingReport = c.MkDir() + "/keep-balance-lost-blocks-test-"
        s.config.ManagementToken = "xyzzy"
        opts := RunOptions{
-               CommitPulls: true,
-               CommitTrash: true,
-               Logger:      ctxlog.TestLogger(c),
-               Dumper:      ctxlog.TestLogger(c),
+               Logger: ctxlog.TestLogger(c),
+               Dumper: ctxlog.TestLogger(c),
        }
        s.stub.serveCurrentUserAdmin()
        s.stub.serveFooBarFileCollections()
@@ -608,8 +597,6 @@ func (s *runSuite) TestCommit(c *check.C) {
 func (s *runSuite) TestChunkPrefix(c *check.C) {
        s.config.Collections.BlobMissingReport = c.MkDir() + "/keep-balance-lost-blocks-test-"
        opts := RunOptions{
-               CommitPulls: true,
-               CommitTrash: true,
                ChunkPrefix: "ac", // catch "foo" but not "bar"
                Logger:      ctxlog.TestLogger(c),
                Dumper:      ctxlog.TestLogger(c),
@@ -639,10 +626,8 @@ func (s *runSuite) TestChunkPrefix(c *check.C) {
 func (s *runSuite) TestRunForever(c *check.C) {
        s.config.ManagementToken = "xyzzy"
        opts := RunOptions{
-               CommitPulls: true,
-               CommitTrash: true,
-               Logger:      ctxlog.TestLogger(c),
-               Dumper:      ctxlog.TestLogger(c),
+               Logger: ctxlog.TestLogger(c),
+               Dumper: ctxlog.TestLogger(c),
        }
        s.stub.serveCurrentUserAdmin()
        s.stub.serveFooBarFileCollections()
index e5bdf9c023d26fd5e631820488ea28f98c3ed65f..85d4ff8b5d9f484746463260eb589109cc06660c 100644 (file)
@@ -12,6 +12,7 @@ import (
        "testing"
        "time"
 
+       "git.arvados.org/arvados.git/lib/config"
        "git.arvados.org/arvados.git/sdk/go/arvados"
        "git.arvados.org/arvados.git/sdk/go/ctxlog"
        check "gopkg.in/check.v1"
@@ -26,6 +27,7 @@ var _ = check.Suite(&balancerSuite{})
 
 type balancerSuite struct {
        Balancer
+       config          *arvados.Cluster
        srvs            []*KeepService
        blks            map[string]tester
        knownRendezvous [][]int
@@ -72,6 +74,11 @@ func (bal *balancerSuite) SetUpSuite(c *check.C) {
 
        bal.signatureTTL = 3600
        bal.Logger = ctxlog.TestLogger(c)
+
+       cfg, err := config.NewLoader(nil, ctxlog.TestLogger(c)).Load()
+       c.Assert(err, check.Equals, nil)
+       bal.config, err = cfg.GetCluster("")
+       c.Assert(err, check.Equals, nil)
 }
 
 func (bal *balancerSuite) SetUpTest(c *check.C) {
@@ -743,7 +750,7 @@ func (bal *balancerSuite) TestChangeStorageClasses(c *check.C) {
 // the appropriate changes for that block have been added to the
 // changesets.
 func (bal *balancerSuite) try(c *check.C, t tester) {
-       bal.setupLookupTables()
+       bal.setupLookupTables(bal.config)
        blk := &BlockState{
                Replicas: bal.replList(t.known, t.current),
                Desired:  t.desired,
@@ -751,9 +758,6 @@ func (bal *balancerSuite) try(c *check.C, t tester) {
        for i, t := range t.timestamps {
                blk.Replicas[i].Mtime = t
        }
-       for _, srv := range bal.srvs {
-               srv.ChangeSet = &ChangeSet{}
-       }
        result := bal.balanceBlock(knownBlkid(t.known), blk)
 
        var didPull, didTrash slots
index 8e0ba028acd801e182a9b475f47b658c86e250e1..c3579556bb5f174781753676f0208d794a5ee620 100644 (file)
@@ -60,22 +60,35 @@ func (t Trash) MarshalJSON() ([]byte, error) {
 // ChangeSet is a set of change requests that will be sent to a
 // keepstore server.
 type ChangeSet struct {
-       Pulls   []Pull
-       Trashes []Trash
-       mutex   sync.Mutex
+       PullLimit  int
+       TrashLimit int
+
+       Pulls           []Pull
+       PullsDeferred   int // number that weren't added because of PullLimit
+       Trashes         []Trash
+       TrashesDeferred int // number that weren't added because of TrashLimit
+       mutex           sync.Mutex
 }
 
 // AddPull adds a Pull operation.
 func (cs *ChangeSet) AddPull(p Pull) {
        cs.mutex.Lock()
-       cs.Pulls = append(cs.Pulls, p)
+       if len(cs.Pulls) < cs.PullLimit {
+               cs.Pulls = append(cs.Pulls, p)
+       } else {
+               cs.PullsDeferred++
+       }
        cs.mutex.Unlock()
 }
 
 // AddTrash adds a Trash operation
 func (cs *ChangeSet) AddTrash(t Trash) {
        cs.mutex.Lock()
-       cs.Trashes = append(cs.Trashes, t)
+       if len(cs.Trashes) < cs.TrashLimit {
+               cs.Trashes = append(cs.Trashes, t)
+       } else {
+               cs.TrashesDeferred++
+       }
        cs.mutex.Unlock()
 }
 
@@ -83,5 +96,5 @@ func (cs *ChangeSet) AddTrash(t Trash) {
 func (cs *ChangeSet) String() string {
        cs.mutex.Lock()
        defer cs.mutex.Unlock()
-       return fmt.Sprintf("ChangeSet{Pulls:%d, Trashes:%d}", len(cs.Pulls), len(cs.Trashes))
+       return fmt.Sprintf("ChangeSet{Pulls:%d, Trashes:%d} Deferred{Pulls:%d Trashes:%d}", len(cs.Pulls), len(cs.Trashes), cs.PullsDeferred, cs.TrashesDeferred)
 }
index 42463a002a5ec73652f7f7ef6f00f8a8c4fb44a1..2e353c92be7d5100d2384796fd8a5527b0d46381 100644 (file)
@@ -87,8 +87,6 @@ func (s *integrationSuite) TestBalanceAPIFixtures(c *check.C) {
                logger := logrus.New()
                logger.Out = io.MultiWriter(&logBuf, os.Stderr)
                opts := RunOptions{
-                       CommitPulls:           true,
-                       CommitTrash:           true,
                        CommitConfirmedFields: true,
                        Logger:                logger,
                }
@@ -101,7 +99,6 @@ func (s *integrationSuite) TestBalanceAPIFixtures(c *check.C) {
                nextOpts, err := bal.Run(context.Background(), s.client, s.config, opts)
                c.Check(err, check.IsNil)
                c.Check(nextOpts.SafeRendezvousState, check.Not(check.Equals), "")
-               c.Check(nextOpts.CommitPulls, check.Equals, true)
                if iter == 0 {
                        c.Check(logBuf.String(), check.Matches, `(?ms).*ChangeSet{Pulls:1.*`)
                        c.Check(logBuf.String(), check.Not(check.Matches), `(?ms).*ChangeSet{.*Trashes:[^0]}*`)
index 6bc998958979fc41288cd051bb40eaae4a10de4e..f9a3e1701aa7a2ae40dafc926f99a7f3a27107a0 100644 (file)
@@ -32,10 +32,10 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s
        flags := flag.NewFlagSet(prog, flag.ContinueOnError)
        flags.BoolVar(&options.Once, "once", false,
                "balance once and then exit")
-       flags.BoolVar(&options.CommitPulls, "commit-pulls", false,
-               "send pull requests (make more replicas of blocks that are underreplicated or are not in optimal rendezvous probe order)")
-       flags.BoolVar(&options.CommitTrash, "commit-trash", false,
-               "send trash requests (delete unreferenced old blocks, and excess replicas of overreplicated blocks)")
+       deprCommitPulls := flags.Bool("commit-pulls", true,
+               "send pull requests (must be true -- configure Collections.BalancePullLimit = 0 to disable.)")
+       deprCommitTrash := flags.Bool("commit-trash", true,
+               "send trash requests (must be true -- configure Collections.BalanceTrashLimit = 0 to disable.)")
        flags.BoolVar(&options.CommitConfirmedFields, "commit-confirmed-fields", true,
                "update collection fields (replicas_confirmed, storage_classes_confirmed, etc.)")
        flags.StringVar(&options.ChunkPrefix, "chunk-prefix", "",
@@ -55,6 +55,13 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s
                return code
        }
 
+       if !*deprCommitPulls || !*deprCommitTrash {
+               fmt.Fprint(stderr,
+                       "Usage error: the -commit-pulls or -commit-trash command line flags are no longer supported.\n",
+                       "Use Collections.BalancePullLimit and Collections.BalanceTrashLimit instead.\n")
+               return cmd.EX_USAGE
+       }
+
        // Drop our custom args that would be rejected by the generic
        // service.Command
        args = nil
index 9bcaec43d86aba56190438c02810f19b612d1937..bb7294c4ba1243763a3c774f61cd8953ab49b23b 100644 (file)
@@ -27,8 +27,6 @@ import (
 // RunOptions fields are controlled by command line flags.
 type RunOptions struct {
        Once                  bool
-       CommitPulls           bool
-       CommitTrash           bool
        CommitConfirmedFields bool
        ChunkPrefix           string
        Logger                logrus.FieldLogger
@@ -112,9 +110,9 @@ func (srv *Server) runForever(ctx context.Context) error {
        logger.Printf("starting up: will scan every %v and on SIGUSR1", srv.Cluster.Collections.BalancePeriod)
 
        for {
-               if !srv.RunOptions.CommitPulls && !srv.RunOptions.CommitTrash {
+               if srv.Cluster.Collections.BalancePullLimit < 1 && srv.Cluster.Collections.BalanceTrashLimit < 1 {
                        logger.Print("WARNING: Will scan periodically, but no changes will be committed.")
-                       logger.Print("=======  Consider using -commit-pulls and -commit-trash flags.")
+                       logger.Print("=======  Consider using non-zero BalancePullLimit and BalanceTrashLimit configs.")
                }
 
                if !dblock.KeepBalanceService.Check() {