17574: Add BalanceUpdateLimit config, fix tests.
[arvados.git] / services / keep-balance / collection.go
index ba9edf9391cddca34645e8e1f5981b896564a62a..6e0a066e869261a35ee60212d77090f1e9432d3c 100644 (file)
@@ -8,7 +8,6 @@ import (
        "context"
        "encoding/json"
        "fmt"
-       "io"
        "runtime"
        "sync"
        "sync/atomic"
@@ -52,6 +51,8 @@ func EachCollection(ctx context.Context, db *sqlx.DB, c *arvados.Client, f func(
        if err != nil {
                return err
        }
+       progressTicker := time.NewTicker(10 * time.Second)
+       defer progressTicker.Stop()
        callCount := 0
        for rows.Next() {
                var coll arvados.Collection
@@ -74,8 +75,13 @@ func EachCollection(ctx context.Context, db *sqlx.DB, c *arvados.Client, f func(
                if err != nil {
                        return err
                }
-               progress(callCount, expectCount)
+               select {
+               case <-progressTicker.C:
+                       progress(callCount, expectCount)
+               default:
+               }
        }
+       progress(callCount, expectCount)
        rows.Close()
        if err := rows.Err(); err != nil {
                return err
@@ -104,36 +110,37 @@ func (bal *Balancer) updateCollections(ctx context.Context, c *arvados.Client, c
        threshold := time.Now()
        thresholdStr := threshold.Format(time.RFC3339Nano)
 
+       updated := int64(0)
+
        var err error
        collQ := make(chan arvados.Collection, cluster.Collections.BalanceCollectionBuffers)
        go func() {
                defer close(collQ)
-               err = EachCollection(ctx, bal.DB, c, func(coll arvados.Collection) error {
-                       if coll.ModifiedAt.After(threshold) {
-                               return io.EOF
-                       }
-                       if coll.IsTrashed {
-                               return nil
+               err := EachCollection(ctx, bal.DB, c, func(coll arvados.Collection) error {
+                       if atomic.LoadInt64(&updated) >= int64(cluster.Collections.BalanceUpdateLimit) {
+                               bal.logf("reached BalanceUpdateLimit (%d)", cluster.Collections.BalanceUpdateLimit)
+                               cancel()
+                               return context.Canceled
                        }
                        collQ <- coll
                        return nil
                }, func(done, total int) {
                        bal.logf("update collections: %d/%d", done, total)
                })
-               if err == io.EOF {
-                       err = nil
-               } else if err != nil {
+               if err != nil && err != context.Canceled {
                        bal.logf("error updating collections: %s", err)
                }
        }()
 
-       var updated int64
        var wg sync.WaitGroup
        for i := 0; i < runtime.NumCPU(); i++ {
                wg.Add(1)
                go func() {
                        defer wg.Done()
                        for coll := range collQ {
+                               if ctx.Err() != nil {
+                                       continue
+                               }
                                blkids, err := coll.SizedDigests()
                                if err != nil {
                                        bal.logf("%s: %s", coll.UUID, err)
@@ -153,7 +160,9 @@ func (bal *Balancer) updateCollections(ctx context.Context, c *arvados.Client, c
                                        where uuid=$4`,
                                        repl, thresholdStr, classes, coll.UUID)
                                if err != nil {
-                                       bal.logf("%s: update failed: %s", coll.UUID, err)
+                                       if err != context.Canceled {
+                                               bal.logf("%s: update failed: %s", coll.UUID, err)
+                                       }
                                        continue
                                }
                                atomic.AddInt64(&updated, 1)