21189: Replace -commit-X with Balance{Pull,Trash}Limit configs.
[arvados.git] / services / keep-balance / balance.go
index e44dfeda8748aec51e18cd55118c15d509d8fc12..e71eb07efa6979fa005c4a59faf70f7c3187519b 100644 (file)
@@ -137,7 +137,7 @@ func (bal *Balancer) Run(ctx context.Context, client *arvados.Client, cluster *a
        client.Timeout = 0
 
        rs := bal.rendezvousState()
-       if runOptions.CommitTrash && rs != runOptions.SafeRendezvousState {
+       if cluster.Collections.BalanceTrashLimit > 0 && rs != runOptions.SafeRendezvousState {
                if runOptions.SafeRendezvousState != "" {
                        bal.logf("notice: KeepServices list has changed since last run")
                }
@@ -155,6 +155,7 @@ func (bal *Balancer) Run(ctx context.Context, client *arvados.Client, cluster *a
        if err = bal.GetCurrentState(ctx, client, cluster.Collections.BalanceCollectionBatch, cluster.Collections.BalanceCollectionBuffers); err != nil {
                return
        }
+       bal.setupLookupTables(cluster)
        bal.ComputeChangeSets()
        bal.PrintStatistics()
        if err = bal.CheckSanityLate(); err != nil {
@@ -171,14 +172,14 @@ func (bal *Balancer) Run(ctx context.Context, client *arvados.Client, cluster *a
                }
                lbFile = nil
        }
-       if runOptions.CommitPulls {
+       if cluster.Collections.BalancePullLimit > 0 {
                err = bal.CommitPulls(ctx, client)
                if err != nil {
                        // Skip trash if we can't pull. (Too cautious?)
                        return
                }
        }
-       if runOptions.CommitTrash {
+       if cluster.Collections.BalanceTrashLimit > 0 {
                err = bal.CommitTrash(ctx, client)
                if err != nil {
                        return
@@ -542,7 +543,6 @@ func (bal *Balancer) ComputeChangeSets() {
        // This just calls balanceBlock() once for each block, using a
        // pool of worker goroutines.
        defer bal.time("changeset_compute", "wall clock time to compute changesets")()
-       bal.setupLookupTables()
 
        type balanceTask struct {
                blkid arvados.SizedDigest
@@ -577,7 +577,7 @@ func (bal *Balancer) ComputeChangeSets() {
        bal.collectStatistics(results)
 }
 
-func (bal *Balancer) setupLookupTables() {
+func (bal *Balancer) setupLookupTables(cluster *arvados.Cluster) {
        bal.serviceRoots = make(map[string]string)
        bal.classes = defaultClasses
        bal.mountsByClass = map[string]map[*KeepMount]bool{"default": {}}
@@ -609,6 +609,13 @@ func (bal *Balancer) setupLookupTables() {
        // class" case in balanceBlock depends on the order classes
        // are considered.
        sort.Strings(bal.classes)
+
+       for _, srv := range bal.KeepServices {
+               srv.ChangeSet = &ChangeSet{
+                       PullLimit:  cluster.Collections.BalancePullLimit,
+                       TrashLimit: cluster.Collections.BalanceTrashLimit,
+               }
+       }
 }
 
 const (
@@ -957,19 +964,21 @@ type replicationStats struct {
 }
 
 type balancerStats struct {
-       lost          blocksNBytes
-       overrep       blocksNBytes
-       unref         blocksNBytes
-       garbage       blocksNBytes
-       underrep      blocksNBytes
-       unachievable  blocksNBytes
-       justright     blocksNBytes
-       desired       blocksNBytes
-       current       blocksNBytes
-       pulls         int
-       trashes       int
-       replHistogram []int
-       classStats    map[string]replicationStats
+       lost            blocksNBytes
+       overrep         blocksNBytes
+       unref           blocksNBytes
+       garbage         blocksNBytes
+       underrep        blocksNBytes
+       unachievable    blocksNBytes
+       justright       blocksNBytes
+       desired         blocksNBytes
+       current         blocksNBytes
+       pulls           int
+       pullsDeferred   int
+       trashes         int
+       trashesDeferred int
+       replHistogram   []int
+       classStats      map[string]replicationStats
 
        // collectionBytes / collectionBlockBytes = deduplication ratio
        collectionBytes      int64 // sum(bytes in referenced blocks) across all collections
@@ -1092,7 +1101,9 @@ func (bal *Balancer) collectStatistics(results <-chan balanceResult) {
        }
        for _, srv := range bal.KeepServices {
                s.pulls += len(srv.ChangeSet.Pulls)
+               s.pullsDeferred += srv.ChangeSet.PullsDeferred
                s.trashes += len(srv.ChangeSet.Trashes)
+               s.trashesDeferred += srv.ChangeSet.TrashesDeferred
        }
        bal.stats = s
        bal.Metrics.UpdateStats(s)