Merge branch '20690-remove-wb1'
author Tom Clegg <tom@curii.com>
Tue, 21 Nov 2023 16:42:06 +0000 (11:42 -0500)
committer Tom Clegg <tom@curii.com>
Tue, 21 Nov 2023 16:42:06 +0000 (11:42 -0500)
refs #20690

Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom@curii.com>

27 files changed:
cmd/arvados-client/cmd_test.go
cmd/arvados-server/arvados-controller.service
cmd/arvados-server/arvados-dispatch-cloud.service
cmd/arvados-server/arvados-dispatch-lsf.service
cmd/arvados-server/arvados-git-httpd.service
cmd/arvados-server/arvados-health.service
cmd/arvados-server/arvados-ws.service
cmd/arvados-server/crunch-dispatch-slurm.service
cmd/arvados-server/keep-balance.service
cmd/arvados-server/keep-web.service
cmd/arvados-server/keepproxy.service
cmd/arvados-server/keepstore.service
doc/Gemfile.lock
lib/cli/get.go
lib/cmd/cmd.go
lib/cmd/parseflags.go
lib/config/cmd_test.go
lib/config/config.default.yml
lib/config/export.go
sdk/go/arvados/config.go
services/keep-balance/balance.go
services/keep-balance/balance_run_test.go
services/keep-balance/balance_test.go
services/keep-balance/change_set.go
services/keep-balance/integration_test.go
services/keep-balance/main.go
services/keep-balance/server.go

index cbbc7b1f9505cf58515bfdade093298cb8a182da..911375c655e0d295957903793a9b98f4943d4d8c 100644 (file)
@@ -9,6 +9,7 @@ import (
        "io/ioutil"
        "testing"
 
+       "git.arvados.org/arvados.git/lib/cmd"
        check "gopkg.in/check.v1"
 )
 
@@ -23,12 +24,12 @@ type ClientSuite struct{}
 
 func (s *ClientSuite) TestBadCommand(c *check.C) {
        exited := handler.RunCommand("arvados-client", []string{"no such command"}, bytes.NewReader(nil), ioutil.Discard, ioutil.Discard)
-       c.Check(exited, check.Equals, 2)
+       c.Check(exited, check.Equals, cmd.EXIT_INVALIDARGUMENT)
 }
 
 func (s *ClientSuite) TestBadSubcommandArgs(c *check.C) {
        exited := handler.RunCommand("arvados-client", []string{"get"}, bytes.NewReader(nil), ioutil.Discard, ioutil.Discard)
-       c.Check(exited, check.Equals, 2)
+       c.Check(exited, check.Equals, cmd.EXIT_INVALIDARGUMENT)
 }
 
 func (s *ClientSuite) TestVersion(c *check.C) {
index 420cbb035a7e7177f84ef7a9ca07117d70e37e5f..f96532de5ef30d167944dfc23b958a16e26bcce9 100644 (file)
@@ -19,6 +19,7 @@ ExecStart=/usr/bin/arvados-controller
 LimitNOFILE=65536
 Restart=always
 RestartSec=1
+RestartPreventExitStatus=2
 
 # systemd<=219 (centos:7, debian:8, ubuntu:trusty) obeys StartLimitInterval in the [Service] section
 StartLimitInterval=0
index 8d57e8a1612ca49ecc9d98b44a716eb1485e2640..11887b8f8c5ca99324cd4c82075d83a11325042c 100644 (file)
@@ -19,6 +19,7 @@ ExecStart=/usr/bin/arvados-dispatch-cloud
 LimitNOFILE=65536
 Restart=always
 RestartSec=1
+RestartPreventExitStatus=2
 
 # systemd<=219 (centos:7, debian:8, ubuntu:trusty) obeys StartLimitInterval in the [Service] section
 StartLimitInterval=0
index 65d8786670af43a4e1a8ce610fdd3babf5af1270..f90cd9033d4b444932519cb8c230f242eaea5c1c 100644 (file)
@@ -19,6 +19,7 @@ ExecStart=/usr/bin/arvados-dispatch-lsf
 LimitNOFILE=65536
 Restart=always
 RestartSec=1
+RestartPreventExitStatus=2
 
 # systemd<=219 (centos:7, debian:8, ubuntu:trusty) obeys StartLimitInterval in the [Service] section
 StartLimitInterval=0
index b45587ffc06bd683467b8a5f3de086a08e5598b9..6e5b0dc8e284d64ff5bf2b50bfceded013dfbcd0 100644 (file)
@@ -19,6 +19,7 @@ ExecStart=/usr/bin/arvados-git-httpd
 LimitNOFILE=65536
 Restart=always
 RestartSec=1
+RestartPreventExitStatus=2
 
 # systemd<=219 (centos:7, debian:8, ubuntu:trusty) obeys StartLimitInterval in the [Service] section
 StartLimitInterval=0
index cf246b0ee2a13a0fbd830a47314e1203067af822..ef145e26ebcb1989dc3c3f60d6e7a7e69f8cb0b5 100644 (file)
@@ -19,6 +19,7 @@ ExecStart=/usr/bin/arvados-health
 LimitNOFILE=65536
 Restart=always
 RestartSec=1
+RestartPreventExitStatus=2
 
 # systemd<=219 (centos:7, debian:8, ubuntu:trusty) obeys StartLimitInterval in the [Service] section
 StartLimitInterval=0
index f73db5d08032369c619e42429ddf7a68550b8551..2e884495998280936a4dc4375bcac6d9d26006ae 100644 (file)
@@ -18,6 +18,7 @@ ExecStart=/usr/bin/arvados-ws
 LimitNOFILE=65536
 Restart=always
 RestartSec=1
+RestartPreventExitStatus=2
 
 # systemd<=219 (centos:7, debian:8, ubuntu:trusty) obeys StartLimitInterval in the [Service] section
 StartLimitInterval=0
index 51b4e58c35b77ce1f391be6cea43f46d4961cd07..d2a2fb39d9dca39a3df99f57989ae9d7db1c4fbf 100644 (file)
@@ -19,6 +19,7 @@ ExecStart=/usr/bin/crunch-dispatch-slurm
 LimitNOFILE=65536
 Restart=always
 RestartSec=1
+RestartPreventExitStatus=2
 
 # systemd<=219 (centos:7, debian:8, ubuntu:trusty) obeys StartLimitInterval in the [Service] section
 StartLimitInterval=0
index 1c5808288b38a4e0a1b5b4706cfa446f6a224841..f282f0a65021ad78e7f628ee3d94fef976231836 100644 (file)
@@ -14,12 +14,13 @@ StartLimitIntervalSec=0
 [Service]
 Type=notify
 EnvironmentFile=-/etc/arvados/environment
-ExecStart=/usr/bin/keep-balance -commit-pulls -commit-trash
+ExecStart=/usr/bin/keep-balance
 # Set a reasonable default for the open file limit
 LimitNOFILE=65536
 Restart=always
 RestartSec=10s
 Nice=19
+RestartPreventExitStatus=2
 
 # systemd<=219 (centos:7, debian:8, ubuntu:trusty) obeys StartLimitInterval in the [Service] section
 StartLimitInterval=0
index c0e193d6d812d1c367160aa5b25bd29dced9185e..4ecd0b49782561da91c49f0a3dea38e9306a2b7b 100644 (file)
@@ -19,6 +19,7 @@ ExecStart=/usr/bin/keep-web
 LimitNOFILE=65536
 Restart=always
 RestartSec=1
+RestartPreventExitStatus=2
 
 # systemd<=219 (centos:7, debian:8, ubuntu:trusty) obeys StartLimitInterval in the [Service] section
 StartLimitInterval=0
index 7d4d0926775286411b1176047913c452431f1cf6..139df1c3fade823cf8950e8b8e7d3c4678a80c19 100644 (file)
@@ -19,6 +19,7 @@ ExecStart=/usr/bin/keepproxy
 LimitNOFILE=65536
 Restart=always
 RestartSec=1
+RestartPreventExitStatus=2
 
 # systemd<=219 (centos:7, debian:8, ubuntu:trusty) obeys StartLimitInterval in the [Service] section
 StartLimitInterval=0
index bcfde3a7881f0c9d7a3217236d773e7845b2458c..de0fd1dbd7e989a9ad7e33e93ee5149c034b4217 100644 (file)
@@ -23,6 +23,7 @@ ExecStart=/usr/bin/keepstore
 LimitNOFILE=65536
 Restart=always
 RestartSec=1
+RestartPreventExitStatus=2
 
 # systemd<=219 (centos:7, debian:8, ubuntu:trusty) obeys StartLimitInterval in the [Service] section
 StartLimitInterval=0
index 420e13146f5b3228ccbe2fc6cf608fd062cd3a6f..561547d6f7b46ad8199c1090d9b89c15dc6e6167 100644 (file)
@@ -1,20 +1,29 @@
 GEM
   remote: https://rubygems.org/
   specs:
-    RedCloth (4.3.2)
+    RedCloth (4.3.3)
     coderay (1.1.3)
-    colorize (1.0.5)
+    colorize (1.1.0)
     commonjs (0.2.7)
-    kramdown (1.17.0)
+    kramdown (2.4.0)
+      rexml
+    kramdown-parser-gfm (1.1.0)
+      kramdown (~> 2.0)
+    kramdown-syntax-coderay (1.0.1)
+      coderay (~> 1.1)
+      kramdown (~> 2.0)
     less (2.6.0)
       commonjs (~> 0.2.7)
     liquid (4.0.4)
     makerakeworkwell (1.0.4)
       rake (>= 0.9.2, < 15)
-    rake (13.0.6)
-    zenweb (3.10.6)
+    rake (13.1.0)
+    rexml (3.2.6)
+    zenweb (3.11.0)
       coderay (~> 1.0)
-      kramdown (~> 1.4)
+      kramdown (~> 2.0)
+      kramdown-parser-gfm (~> 1.0)
+      kramdown-syntax-coderay (~> 1.0)
       less (~> 2.0)
       makerakeworkwell (~> 1.0)
       rake (>= 0.9, < 15)
index 9625214e22ebd1c805fe0ae21b04c47e2304aece..352e7b9af61ea2a51a12656ae26141335b1e64c2 100644 (file)
@@ -30,12 +30,12 @@ func (getCmd) RunCommand(prog string, args []string, stdin io.Reader, stdout, st
        flags.SetOutput(stderr)
        err = flags.Parse(args)
        if err != nil {
-               return 2
+               return cmd.EXIT_INVALIDARGUMENT
        }
        if len(flags.Args()) != 1 {
                fmt.Fprintf(stderr, "usage of %s:\n", prog)
                flags.PrintDefaults()
-               return 2
+               return cmd.EXIT_INVALIDARGUMENT
        }
        if opts.Short {
                opts.Format = "uuid"
index 2b08ab4822b1d0e7fbd08055db415db56642c685..40e80f5eaab74df0f5370f50c12d1d2868e0e6f9 100644 (file)
@@ -21,6 +21,8 @@ import (
        "github.com/sirupsen/logrus"
 )
 
+const EXIT_INVALIDARGUMENT = 2
+
 type Handler interface {
        RunCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) int
 }
@@ -104,13 +106,13 @@ func (m Multi) RunCommand(prog string, args []string, stdin io.Reader, stdout, s
        } else if len(args) < 1 {
                fmt.Fprintf(stderr, "usage: %s command [args]\n", prog)
                m.Usage(stderr)
-               return 2
+               return EXIT_INVALIDARGUMENT
        } else if cmd, ok = m[args[0]]; ok {
                return cmd.RunCommand(prog+" "+args[0], args[1:], stdin, stdout, stderr)
        } else {
                fmt.Fprintf(stderr, "%s: unrecognized command %q\n", prog, args[0])
                m.Usage(stderr)
-               return 2
+               return EXIT_INVALIDARGUMENT
        }
 }
 
index 707cacbf524613e007ab5a6b9bd40abb1eaf5248..275e063f3118b17290ee64582b9bf976f1555166 100644 (file)
@@ -35,7 +35,7 @@ func ParseFlags(f FlagSet, prog string, args []string, positional string, stderr
        case nil:
                if f.NArg() > 0 && positional == "" {
                        fmt.Fprintf(stderr, "unrecognized command line arguments: %v (try -help)\n", f.Args())
-                       return false, 2
+                       return false, EXIT_INVALIDARGUMENT
                }
                return true, 0
        case flag.ErrHelp:
@@ -55,6 +55,6 @@ func ParseFlags(f FlagSet, prog string, args []string, positional string, stderr
                return false, 0
        default:
                fmt.Fprintf(stderr, "error parsing command line arguments: %s (try -help)\n", err)
-               return false, 2
+               return false, EXIT_INVALIDARGUMENT
        }
 }
index d94eea89dfbc7ee6562ebd3452b2d879e031c1ae..c2854895cadf0b689b545103937d545c570db426 100644 (file)
@@ -33,7 +33,7 @@ func (s *CommandSuite) SetUpSuite(c *check.C) {
 func (s *CommandSuite) TestDump_BadArg(c *check.C) {
        var stderr bytes.Buffer
        code := DumpCommand.RunCommand("arvados config-dump", []string{"-badarg"}, bytes.NewBuffer(nil), bytes.NewBuffer(nil), &stderr)
-       c.Check(code, check.Equals, 2)
+       c.Check(code, check.Equals, cmd.EXIT_INVALIDARGUMENT)
        c.Check(stderr.String(), check.Equals, "error parsing command line arguments: flag provided but not defined: -badarg (try -help)\n")
 }
 
index 79889565a06f7273b20d69406fcf3194077481fe..05bc1309cdde1ef269306fb634dbe4e6595914ee 100644 (file)
@@ -638,6 +638,15 @@ Clusters:
       # once.
       BalanceUpdateLimit: 100000
 
+      # Maximum number of "pull block from other server" and "trash
+      # block" requests to send to each keepstore server at a
+      # time. Smaller values use less memory in keepstore and
+      # keep-balance. Larger values allow more progress per
+      # keep-balance iteration. A zero value computes all of the
+      # needed changes but does not apply any.
+      BalancePullLimit: 100000
+      BalanceTrashLimit: 100000
+
       # Default lifetime for ephemeral collections: 2 weeks. This must not
       # be less than BlobSigningTTL.
       DefaultTrashLifetime: 336h
index 20dd9643b02f5afc91932eb5458e7277a75ed10a..e51e6fc32cdeb03909f84eb3f24cbfcb8351b31a 100644 (file)
@@ -93,7 +93,9 @@ var whitelist = map[string]bool{
        "Collections.BalanceCollectionBatch":       false,
        "Collections.BalanceCollectionBuffers":     false,
        "Collections.BalancePeriod":                false,
+       "Collections.BalancePullLimit":             false,
        "Collections.BalanceTimeout":               false,
+       "Collections.BalanceTrashLimit":            false,
        "Collections.BalanceUpdateLimit":           false,
        "Collections.BlobDeleteConcurrency":        false,
        "Collections.BlobMissingReport":            false,
index 7b9bd847d1f50e9a930f496c108f13e8a12b07e3..6301ed047a1dbfca82b3c717926a2f05415aa291 100644 (file)
@@ -150,6 +150,8 @@ type Cluster struct {
                BalanceCollectionBuffers int
                BalanceTimeout           Duration
                BalanceUpdateLimit       int
+               BalancePullLimit         int
+               BalanceTrashLimit        int
 
                WebDAVCache WebDAVCacheConfig
 
index e44dfeda8748aec51e18cd55118c15d509d8fc12..e71eb07efa6979fa005c4a59faf70f7c3187519b 100644 (file)
@@ -137,7 +137,7 @@ func (bal *Balancer) Run(ctx context.Context, client *arvados.Client, cluster *a
        client.Timeout = 0
 
        rs := bal.rendezvousState()
-       if runOptions.CommitTrash && rs != runOptions.SafeRendezvousState {
+       if cluster.Collections.BalanceTrashLimit > 0 && rs != runOptions.SafeRendezvousState {
                if runOptions.SafeRendezvousState != "" {
                        bal.logf("notice: KeepServices list has changed since last run")
                }
@@ -155,6 +155,7 @@ func (bal *Balancer) Run(ctx context.Context, client *arvados.Client, cluster *a
        if err = bal.GetCurrentState(ctx, client, cluster.Collections.BalanceCollectionBatch, cluster.Collections.BalanceCollectionBuffers); err != nil {
                return
        }
+       bal.setupLookupTables(cluster)
        bal.ComputeChangeSets()
        bal.PrintStatistics()
        if err = bal.CheckSanityLate(); err != nil {
@@ -171,14 +172,14 @@ func (bal *Balancer) Run(ctx context.Context, client *arvados.Client, cluster *a
                }
                lbFile = nil
        }
-       if runOptions.CommitPulls {
+       if cluster.Collections.BalancePullLimit > 0 {
                err = bal.CommitPulls(ctx, client)
                if err != nil {
                        // Skip trash if we can't pull. (Too cautious?)
                        return
                }
        }
-       if runOptions.CommitTrash {
+       if cluster.Collections.BalanceTrashLimit > 0 {
                err = bal.CommitTrash(ctx, client)
                if err != nil {
                        return
@@ -542,7 +543,6 @@ func (bal *Balancer) ComputeChangeSets() {
        // This just calls balanceBlock() once for each block, using a
        // pool of worker goroutines.
        defer bal.time("changeset_compute", "wall clock time to compute changesets")()
-       bal.setupLookupTables()
 
        type balanceTask struct {
                blkid arvados.SizedDigest
@@ -577,7 +577,7 @@ func (bal *Balancer) ComputeChangeSets() {
        bal.collectStatistics(results)
 }
 
-func (bal *Balancer) setupLookupTables() {
+func (bal *Balancer) setupLookupTables(cluster *arvados.Cluster) {
        bal.serviceRoots = make(map[string]string)
        bal.classes = defaultClasses
        bal.mountsByClass = map[string]map[*KeepMount]bool{"default": {}}
@@ -609,6 +609,13 @@ func (bal *Balancer) setupLookupTables() {
        // class" case in balanceBlock depends on the order classes
        // are considered.
        sort.Strings(bal.classes)
+
+       for _, srv := range bal.KeepServices {
+               srv.ChangeSet = &ChangeSet{
+                       PullLimit:  cluster.Collections.BalancePullLimit,
+                       TrashLimit: cluster.Collections.BalanceTrashLimit,
+               }
+       }
 }
 
 const (
@@ -957,19 +964,21 @@ type replicationStats struct {
 }
 
 type balancerStats struct {
-       lost          blocksNBytes
-       overrep       blocksNBytes
-       unref         blocksNBytes
-       garbage       blocksNBytes
-       underrep      blocksNBytes
-       unachievable  blocksNBytes
-       justright     blocksNBytes
-       desired       blocksNBytes
-       current       blocksNBytes
-       pulls         int
-       trashes       int
-       replHistogram []int
-       classStats    map[string]replicationStats
+       lost            blocksNBytes
+       overrep         blocksNBytes
+       unref           blocksNBytes
+       garbage         blocksNBytes
+       underrep        blocksNBytes
+       unachievable    blocksNBytes
+       justright       blocksNBytes
+       desired         blocksNBytes
+       current         blocksNBytes
+       pulls           int
+       pullsDeferred   int
+       trashes         int
+       trashesDeferred int
+       replHistogram   []int
+       classStats      map[string]replicationStats
 
        // collectionBytes / collectionBlockBytes = deduplication ratio
        collectionBytes      int64 // sum(bytes in referenced blocks) across all collections
@@ -1092,7 +1101,9 @@ func (bal *Balancer) collectStatistics(results <-chan balanceResult) {
        }
        for _, srv := range bal.KeepServices {
                s.pulls += len(srv.ChangeSet.Pulls)
+               s.pullsDeferred += srv.ChangeSet.PullsDeferred
                s.trashes += len(srv.ChangeSet.Trashes)
+               s.trashesDeferred += srv.ChangeSet.TrashesDeferred
        }
        bal.stats = s
        bal.Metrics.UpdateStats(s)
index 962bd40adefc8e360be0749ee5a700bcb727821a..b7b3fb61233249beb6c6df48e46c8de4ea678e22 100644 (file)
@@ -396,9 +396,7 @@ func (s *runSuite) TestRefuseZeroCollections(c *check.C) {
        _, err := s.db.Exec(`delete from collections`)
        c.Assert(err, check.IsNil)
        opts := RunOptions{
-               CommitPulls: true,
-               CommitTrash: true,
-               Logger:      ctxlog.TestLogger(c),
+               Logger: ctxlog.TestLogger(c),
        }
        s.stub.serveCurrentUserAdmin()
        s.stub.serveZeroCollections()
@@ -416,8 +414,6 @@ func (s *runSuite) TestRefuseZeroCollections(c *check.C) {
 
 func (s *runSuite) TestRefuseBadIndex(c *check.C) {
        opts := RunOptions{
-               CommitPulls: true,
-               CommitTrash: true,
                ChunkPrefix: "abc",
                Logger:      ctxlog.TestLogger(c),
        }
@@ -439,9 +435,7 @@ func (s *runSuite) TestRefuseBadIndex(c *check.C) {
 
 func (s *runSuite) TestRefuseNonAdmin(c *check.C) {
        opts := RunOptions{
-               CommitPulls: true,
-               CommitTrash: true,
-               Logger:      ctxlog.TestLogger(c),
+               Logger: ctxlog.TestLogger(c),
        }
        s.stub.serveCurrentUserNotAdmin()
        s.stub.serveZeroCollections()
@@ -468,8 +462,6 @@ func (s *runSuite) TestInvalidChunkPrefix(c *check.C) {
                s.SetUpTest(c)
                c.Logf("trying invalid prefix %q", trial.prefix)
                opts := RunOptions{
-                       CommitPulls: true,
-                       CommitTrash: true,
                        ChunkPrefix: trial.prefix,
                        Logger:      ctxlog.TestLogger(c),
                }
@@ -489,9 +481,7 @@ func (s *runSuite) TestInvalidChunkPrefix(c *check.C) {
 
 func (s *runSuite) TestRefuseSameDeviceDifferentVolumes(c *check.C) {
        opts := RunOptions{
-               CommitPulls: true,
-               CommitTrash: true,
-               Logger:      ctxlog.TestLogger(c),
+               Logger: ctxlog.TestLogger(c),
        }
        s.stub.serveCurrentUserAdmin()
        s.stub.serveZeroCollections()
@@ -519,9 +509,7 @@ func (s *runSuite) TestWriteLostBlocks(c *check.C) {
        s.config.Collections.BlobMissingReport = lostf.Name()
        defer os.Remove(lostf.Name())
        opts := RunOptions{
-               CommitPulls: true,
-               CommitTrash: true,
-               Logger:      ctxlog.TestLogger(c),
+               Logger: ctxlog.TestLogger(c),
        }
        s.stub.serveCurrentUserAdmin()
        s.stub.serveFooBarFileCollections()
@@ -540,10 +528,10 @@ func (s *runSuite) TestWriteLostBlocks(c *check.C) {
 }
 
 func (s *runSuite) TestDryRun(c *check.C) {
+       s.config.Collections.BalanceTrashLimit = 0
+       s.config.Collections.BalancePullLimit = 0
        opts := RunOptions{
-               CommitPulls: false,
-               CommitTrash: false,
-               Logger:      ctxlog.TestLogger(c),
+               Logger: ctxlog.TestLogger(c),
        }
        s.stub.serveCurrentUserAdmin()
        collReqs := s.stub.serveFooBarFileCollections()
@@ -561,7 +549,10 @@ func (s *runSuite) TestDryRun(c *check.C) {
        }
        c.Check(trashReqs.Count(), check.Equals, 0)
        c.Check(pullReqs.Count(), check.Equals, 0)
-       c.Check(bal.stats.pulls, check.Not(check.Equals), 0)
+       c.Check(bal.stats.pulls, check.Equals, 0)
+       c.Check(bal.stats.pullsDeferred, check.Not(check.Equals), 0)
+       c.Check(bal.stats.trashes, check.Equals, 0)
+       c.Check(bal.stats.trashesDeferred, check.Not(check.Equals), 0)
        c.Check(bal.stats.underrep.replicas, check.Not(check.Equals), 0)
        c.Check(bal.stats.overrep.replicas, check.Not(check.Equals), 0)
 }
@@ -570,10 +561,8 @@ func (s *runSuite) TestCommit(c *check.C) {
        s.config.Collections.BlobMissingReport = c.MkDir() + "/keep-balance-lost-blocks-test-"
        s.config.ManagementToken = "xyzzy"
        opts := RunOptions{
-               CommitPulls: true,
-               CommitTrash: true,
-               Logger:      ctxlog.TestLogger(c),
-               Dumper:      ctxlog.TestLogger(c),
+               Logger: ctxlog.TestLogger(c),
+               Dumper: ctxlog.TestLogger(c),
        }
        s.stub.serveCurrentUserAdmin()
        s.stub.serveFooBarFileCollections()
@@ -608,8 +597,6 @@ func (s *runSuite) TestCommit(c *check.C) {
 func (s *runSuite) TestChunkPrefix(c *check.C) {
        s.config.Collections.BlobMissingReport = c.MkDir() + "/keep-balance-lost-blocks-test-"
        opts := RunOptions{
-               CommitPulls: true,
-               CommitTrash: true,
                ChunkPrefix: "ac", // catch "foo" but not "bar"
                Logger:      ctxlog.TestLogger(c),
                Dumper:      ctxlog.TestLogger(c),
@@ -639,10 +626,8 @@ func (s *runSuite) TestChunkPrefix(c *check.C) {
 func (s *runSuite) TestRunForever(c *check.C) {
        s.config.ManagementToken = "xyzzy"
        opts := RunOptions{
-               CommitPulls: true,
-               CommitTrash: true,
-               Logger:      ctxlog.TestLogger(c),
-               Dumper:      ctxlog.TestLogger(c),
+               Logger: ctxlog.TestLogger(c),
+               Dumper: ctxlog.TestLogger(c),
        }
        s.stub.serveCurrentUserAdmin()
        s.stub.serveFooBarFileCollections()
index e5bdf9c023d26fd5e631820488ea28f98c3ed65f..85d4ff8b5d9f484746463260eb589109cc06660c 100644 (file)
@@ -12,6 +12,7 @@ import (
        "testing"
        "time"
 
+       "git.arvados.org/arvados.git/lib/config"
        "git.arvados.org/arvados.git/sdk/go/arvados"
        "git.arvados.org/arvados.git/sdk/go/ctxlog"
        check "gopkg.in/check.v1"
@@ -26,6 +27,7 @@ var _ = check.Suite(&balancerSuite{})
 
 type balancerSuite struct {
        Balancer
+       config          *arvados.Cluster
        srvs            []*KeepService
        blks            map[string]tester
        knownRendezvous [][]int
@@ -72,6 +74,11 @@ func (bal *balancerSuite) SetUpSuite(c *check.C) {
 
        bal.signatureTTL = 3600
        bal.Logger = ctxlog.TestLogger(c)
+
+       cfg, err := config.NewLoader(nil, ctxlog.TestLogger(c)).Load()
+       c.Assert(err, check.Equals, nil)
+       bal.config, err = cfg.GetCluster("")
+       c.Assert(err, check.Equals, nil)
 }
 
 func (bal *balancerSuite) SetUpTest(c *check.C) {
@@ -743,7 +750,7 @@ func (bal *balancerSuite) TestChangeStorageClasses(c *check.C) {
 // the appropriate changes for that block have been added to the
 // changesets.
 func (bal *balancerSuite) try(c *check.C, t tester) {
-       bal.setupLookupTables()
+       bal.setupLookupTables(bal.config)
        blk := &BlockState{
                Replicas: bal.replList(t.known, t.current),
                Desired:  t.desired,
@@ -751,9 +758,6 @@ func (bal *balancerSuite) try(c *check.C, t tester) {
        for i, t := range t.timestamps {
                blk.Replicas[i].Mtime = t
        }
-       for _, srv := range bal.srvs {
-               srv.ChangeSet = &ChangeSet{}
-       }
        result := bal.balanceBlock(knownBlkid(t.known), blk)
 
        var didPull, didTrash slots
index 8e0ba028acd801e182a9b475f47b658c86e250e1..c3579556bb5f174781753676f0208d794a5ee620 100644 (file)
@@ -60,22 +60,35 @@ func (t Trash) MarshalJSON() ([]byte, error) {
 // ChangeSet is a set of change requests that will be sent to a
 // keepstore server.
 type ChangeSet struct {
-       Pulls   []Pull
-       Trashes []Trash
-       mutex   sync.Mutex
+       PullLimit  int
+       TrashLimit int
+
+       Pulls           []Pull
+       PullsDeferred   int // number that weren't added because of PullLimit
+       Trashes         []Trash
+       TrashesDeferred int // number that weren't added because of TrashLimit
+       mutex           sync.Mutex
 }
 
 // AddPull adds a Pull operation.
 func (cs *ChangeSet) AddPull(p Pull) {
        cs.mutex.Lock()
-       cs.Pulls = append(cs.Pulls, p)
+       if len(cs.Pulls) < cs.PullLimit {
+               cs.Pulls = append(cs.Pulls, p)
+       } else {
+               cs.PullsDeferred++
+       }
        cs.mutex.Unlock()
 }
 
 // AddTrash adds a Trash operation
 func (cs *ChangeSet) AddTrash(t Trash) {
        cs.mutex.Lock()
-       cs.Trashes = append(cs.Trashes, t)
+       if len(cs.Trashes) < cs.TrashLimit {
+               cs.Trashes = append(cs.Trashes, t)
+       } else {
+               cs.TrashesDeferred++
+       }
        cs.mutex.Unlock()
 }
 
@@ -83,5 +96,5 @@ func (cs *ChangeSet) AddTrash(t Trash) {
 func (cs *ChangeSet) String() string {
        cs.mutex.Lock()
        defer cs.mutex.Unlock()
-       return fmt.Sprintf("ChangeSet{Pulls:%d, Trashes:%d}", len(cs.Pulls), len(cs.Trashes))
+       return fmt.Sprintf("ChangeSet{Pulls:%d, Trashes:%d} Deferred{Pulls:%d Trashes:%d}", len(cs.Pulls), len(cs.Trashes), cs.PullsDeferred, cs.TrashesDeferred)
 }
index 42463a002a5ec73652f7f7ef6f00f8a8c4fb44a1..2e353c92be7d5100d2384796fd8a5527b0d46381 100644 (file)
@@ -87,8 +87,6 @@ func (s *integrationSuite) TestBalanceAPIFixtures(c *check.C) {
                logger := logrus.New()
                logger.Out = io.MultiWriter(&logBuf, os.Stderr)
                opts := RunOptions{
-                       CommitPulls:           true,
-                       CommitTrash:           true,
                        CommitConfirmedFields: true,
                        Logger:                logger,
                }
@@ -101,7 +99,6 @@ func (s *integrationSuite) TestBalanceAPIFixtures(c *check.C) {
                nextOpts, err := bal.Run(context.Background(), s.client, s.config, opts)
                c.Check(err, check.IsNil)
                c.Check(nextOpts.SafeRendezvousState, check.Not(check.Equals), "")
-               c.Check(nextOpts.CommitPulls, check.Equals, true)
                if iter == 0 {
                        c.Check(logBuf.String(), check.Matches, `(?ms).*ChangeSet{Pulls:1.*`)
                        c.Check(logBuf.String(), check.Not(check.Matches), `(?ms).*ChangeSet{.*Trashes:[^0]}*`)
index 6bc998958979fc41288cd051bb40eaae4a10de4e..ec1cb18ee1087063a57d63f73c2c00b5a17eab33 100644 (file)
@@ -32,10 +32,10 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s
        flags := flag.NewFlagSet(prog, flag.ContinueOnError)
        flags.BoolVar(&options.Once, "once", false,
                "balance once and then exit")
-       flags.BoolVar(&options.CommitPulls, "commit-pulls", false,
-               "send pull requests (make more replicas of blocks that are underreplicated or are not in optimal rendezvous probe order)")
-       flags.BoolVar(&options.CommitTrash, "commit-trash", false,
-               "send trash requests (delete unreferenced old blocks, and excess replicas of overreplicated blocks)")
+       deprCommitPulls := flags.Bool("commit-pulls", true,
+               "send pull requests (must be true -- configure Collections.BalancePullLimit = 0 to disable.)")
+       deprCommitTrash := flags.Bool("commit-trash", true,
+               "send trash requests (must be true -- configure Collections.BalanceTrashLimit = 0 to disable.)")
        flags.BoolVar(&options.CommitConfirmedFields, "commit-confirmed-fields", true,
                "update collection fields (replicas_confirmed, storage_classes_confirmed, etc.)")
        flags.StringVar(&options.ChunkPrefix, "chunk-prefix", "",
@@ -55,6 +55,13 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s
                return code
        }
 
+       if !*deprCommitPulls || !*deprCommitTrash {
+               fmt.Fprint(stderr,
+                       "Usage error: the -commit-pulls or -commit-trash command line flags are no longer supported.\n",
+                       "Use Collections.BalancePullLimit and Collections.BalanceTrashLimit instead.\n")
+               return cmd.EXIT_INVALIDARGUMENT
+       }
+
        // Drop our custom args that would be rejected by the generic
        // service.Command
        args = nil
index 9bcaec43d86aba56190438c02810f19b612d1937..b20144e3af9cae063f50f3ac5246e6f038ccbd2f 100644 (file)
@@ -27,8 +27,6 @@ import (
 // RunOptions fields are controlled by command line flags.
 type RunOptions struct {
        Once                  bool
-       CommitPulls           bool
-       CommitTrash           bool
        CommitConfirmedFields bool
        ChunkPrefix           string
        Logger                logrus.FieldLogger
@@ -112,9 +110,9 @@ func (srv *Server) runForever(ctx context.Context) error {
        logger.Printf("starting up: will scan every %v and on SIGUSR1", srv.Cluster.Collections.BalancePeriod)
 
        for {
-               if !srv.RunOptions.CommitPulls && !srv.RunOptions.CommitTrash {
+               if srv.Cluster.Collections.BalancePullLimit < 1 && srv.Cluster.Collections.BalanceTrashLimit < 1 {
                        logger.Print("WARNING: Will scan periodically, but no changes will be committed.")
-                       logger.Print("=======  Consider using -commit-pulls and -commit-trash flags.")
+                       logger.Print("=======  To commit changes, set BalancePullLimit and BalanceTrashLimit values greater than zero.")
                }
 
                if !dblock.KeepBalanceService.Check() {