17574: Batch updates into transactions, skip when unchanged.
author    Tom Clegg <tom@curii.com>
          Wed, 28 Jul 2021 20:11:52 +0000 (16:11 -0400)
committer Tom Clegg <tom@curii.com>
          Wed, 28 Jul 2021 20:11:52 +0000 (16:11 -0400)
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom@curii.com>
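
Before this change, updateCollections issued each UPDATE on its own
implicit transaction and rewrote rows whose confirmed values were
already correct. Now each worker batches up to 100 updates per
transaction, skips collections whose replication_confirmed and
storage_classes_confirmed already match the desired values, and caps
the confirmed replication it records at the desired level so that
temporarily high replication doesn't cause flapping updates.

A minimal standalone sketch of the batching pattern, using sqlx and a
hypothetical "items" table rather than the keep-balance schema:

	package example

	import "github.com/jmoiron/sqlx"

	func updateAll(db *sqlx.DB, values map[string]int) error {
		tx, err := db.Beginx()
		if err != nil {
			return err
		}
		// Roll back whatever transaction is still open on an
		// early return; after the final commit this is a
		// harmless no-op (sql.ErrTxDone).
		defer func() {
			if tx != nil {
				tx.Rollback()
			}
		}()
		const txBatch = 100 // commit every 100 updates
		txPending := 0
		flush := func(final bool) error {
			if err := tx.Commit(); err != nil {
				return err
			}
			txPending = 0
			if final {
				return nil
			}
			tx, err = db.Beginx()
			return err
		}
		for id, v := range values {
			_, err := tx.Exec(`update items set value=$1 where id=$2`, v, id)
			if err != nil {
				return err
			}
			if txPending++; txPending >= txBatch {
				if err := flush(false); err != nil {
					return err
				}
			}
		}
		return flush(true)
	}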

diff --git a/services/keep-balance/collection.go b/services/keep-balance/collection.go
index 6e0a066e869261a35ee60212d77090f1e9432d3c..daedeb8bfcb82b94fbf3007fd1f1a13fe19df432 100644
--- a/services/keep-balance/collection.go
+++ b/services/keep-balance/collection.go
@@ -47,24 +47,36 @@ func EachCollection(ctx context.Context, db *sqlx.DB, c *arvados.Client, f func(
        }
        var newestModifiedAt time.Time
 
-       rows, err := db.QueryxContext(ctx, `SELECT uuid, manifest_text, modified_at, portable_data_hash, replication_desired, storage_classes_desired, is_trashed FROM collections`)
+       rows, err := db.QueryxContext(ctx, `SELECT
+               uuid, manifest_text, modified_at, portable_data_hash,
+               replication_desired, replication_confirmed, replication_confirmed_at,
+               storage_classes_desired, storage_classes_confirmed, storage_classes_confirmed_at,
+               is_trashed
+               FROM collections`)
        if err != nil {
                return err
        }
+       defer rows.Close()
        progressTicker := time.NewTicker(10 * time.Second)
        defer progressTicker.Stop()
        callCount := 0
        for rows.Next() {
                var coll arvados.Collection
-               var classesDesired []byte
-               err = rows.Scan(&coll.UUID, &coll.ManifestText, &coll.ModifiedAt, &coll.PortableDataHash, &coll.ReplicationDesired, &classesDesired, &coll.IsTrashed)
+               var classesDesired, classesConfirmed []byte
+               err = rows.Scan(&coll.UUID, &coll.ManifestText, &coll.ModifiedAt, &coll.PortableDataHash,
+                       &coll.ReplicationDesired, &coll.ReplicationConfirmed, &coll.ReplicationConfirmedAt,
+                       &classesDesired, &classesConfirmed, &coll.StorageClassesConfirmedAt,
+                       &coll.IsTrashed)
                if err != nil {
-                       rows.Close()
                        return err
                }
+
                err = json.Unmarshal(classesDesired, &coll.StorageClassesDesired)
-               if err != nil {
-                       rows.Close()
+               if err != nil && len(classesDesired) > 0 {
+                       return err
+               }
+               err = json.Unmarshal(classesConfirmed, &coll.StorageClassesConfirmed)
+               if err != nil && len(classesConfirmed) > 0 {
                        return err
                }
                if newestModifiedAt.IsZero() || newestModifiedAt.Before(coll.ModifiedAt) {
@@ -82,8 +94,8 @@ func EachCollection(ctx context.Context, db *sqlx.DB, c *arvados.Client, f func(
                }
        }
        progress(callCount, expectCount)
-       rows.Close()
-       if err := rows.Err(); err != nil {
+       err = rows.Err()
+       if err != nil {
                return err
        }
        if checkCount, err := countCollections(c, arvados.ResourceListParams{
@@ -112,7 +124,7 @@ func (bal *Balancer) updateCollections(ctx context.Context, c *arvados.Client, c
 
        updated := int64(0)
 
-       var err error
+       errs := make(chan error, 1)
        collQ := make(chan arvados.Collection, cluster.Collections.BalanceCollectionBuffers)
        go func() {
                defer close(collQ)
@@ -125,20 +137,46 @@ func (bal *Balancer) updateCollections(ctx context.Context, c *arvados.Client, c
                        collQ <- coll
                        return nil
                }, func(done, total int) {
-                       bal.logf("update collections: %d/%d", done, total)
+                       bal.logf("update collections: %d/%d (%d updated @ %.01f updates/s)", done, total, atomic.LoadInt64(&updated), float64(atomic.LoadInt64(&updated))/time.Since(threshold).Seconds())
                })
                if err != nil && err != context.Canceled {
-                       bal.logf("error updating collections: %s", err)
+                       select {
+                       case errs <- err:
+                       default:
+                       }
                }
        }()
 
        var wg sync.WaitGroup
-       for i := 0; i < runtime.NumCPU(); i++ {
+
+       // Use about 1 goroutine per 2 CPUs. Based on experiments with
+       // a 2-core host, using more concurrent database
+       // calls/transactions makes this process slower, not faster.
+       for i := 0; i < (runtime.NumCPU()+1)/2; i++ {
                wg.Add(1)
-               go func() {
+               goSendErr(errs, func() error {
                        defer wg.Done()
+                       tx, err := bal.DB.Beginx()
+                       if err != nil {
+                               return err
+                       }
+                       txPending := 0
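+                       // flush commits the current batch and, unless
+                       // this is the final batch, begins a new
+                       // transaction for subsequent updates.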
+                       flush := func(final bool) error {
+                               err := tx.Commit()
+                               if err != nil {
+                                       tx.Rollback()
+                                       return err
+                               }
+                               txPending = 0
+                               if final {
+                                       return nil
+                               }
+                               tx, err = bal.DB.Beginx()
+                               return err
+                       }
+                       txBatch := 100
                        for coll := range collQ {
-                               if ctx.Err() != nil {
+                               if ctx.Err() != nil || len(errs) > 0 {
                                        continue
                                }
                                blkids, err := coll.SizedDigests()
@@ -147,12 +185,35 @@ func (bal *Balancer) updateCollections(ctx context.Context, c *arvados.Client, c
                                        continue
                                }
                                repl := bal.BlockStateMap.GetConfirmedReplication(blkids, coll.StorageClassesDesired)
+
+                               desired := bal.DefaultReplication
+                               if coll.ReplicationDesired != nil {
+                                       desired = *coll.ReplicationDesired
+                               }
+                               if repl > desired {
+                                       // If actual>desired, confirm
+                                       // the desired number rather
+                                       // than actual to avoid
+                                       // flapping updates when
+                                       // replication increases
+                                       // temporarily.
+                                       repl = desired
+                               }
                                classes, err := json.Marshal(coll.StorageClassesDesired)
                                if err != nil {
                                        bal.logf("BUG? json.Marshal(%v) failed: %s", classes, err)
                                        continue
                                }
-                               _, err = bal.DB.ExecContext(ctx, `update collections set
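+                               // Skip the update entirely when the
+                               // confirmed values in the database
+                               // already match ("skip when
+                               // unchanged").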
+                               needUpdate := coll.ReplicationConfirmed == nil || *coll.ReplicationConfirmed != repl || len(coll.StorageClassesConfirmed) != len(coll.StorageClassesDesired)
+                               for i := range coll.StorageClassesDesired {
+                                       if !needUpdate && coll.StorageClassesDesired[i] != coll.StorageClassesConfirmed[i] {
+                                               needUpdate = true
+                                       }
+                               }
+                               if !needUpdate {
+                                       continue
+                               }
+                               _, err = tx.ExecContext(ctx, `update collections set
                                        replication_confirmed=$1,
                                        replication_confirmed_at=$2,
                                        storage_classes_confirmed=$3,
@@ -166,10 +227,35 @@ func (bal *Balancer) updateCollections(ctx context.Context, c *arvados.Client, c
                                        continue
                                }
                                atomic.AddInt64(&updated, 1)
+                               if txPending++; txPending >= txBatch {
+                                       err = flush(false)
+                                       if err != nil {
+                                               return err
+                                       }
+                               }
                        }
-               }()
+                       return flush(true)
+               })
        }
        wg.Wait()
        bal.logf("updated %d collections", updated)
-       return err
+       if len(errs) > 0 {
+               return fmt.Errorf("error updating collections: %s", <-errs)
+       }
+       return nil
+}
+
+// Call f in a new goroutine. If it returns a non-nil error, send the
+// error to the errs channel (unless the channel is already full with
+// another error).
+func goSendErr(errs chan<- error, f func() error) {
+       go func() {
+               err := f()
+               if err != nil {
+                       select {
+                       case errs <- err:
+                       default:
+                       }
+               }
+       }()
+}
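
The error plumbing above is a "first error wins" pattern: a buffered
channel of capacity 1 records the first failure from any goroutine,
workers poll len(errs) to stop early, and the caller checks the
channel without blocking after wg.Wait(). A small runnable sketch of
the same pattern, with illustrative names:

	package main

	import (
		"fmt"
		"sync"
	)

	// goSendErr runs f in a goroutine and records its error,
	// unless another error has already been recorded.
	func goSendErr(errs chan<- error, f func() error) {
		go func() {
			if err := f(); err != nil {
				select {
				case errs <- err:
				default: // an earlier error is already recorded
				}
			}
		}()
	}

	func main() {
		errs := make(chan error, 1)
		var wg sync.WaitGroup
		for i := 0; i < 4; i++ {
			wg.Add(1)
			i := i
			goSendErr(errs, func() error {
				defer wg.Done()
				if i == 2 {
					return fmt.Errorf("worker %d failed", i)
				}
				return nil
			})
		}
		wg.Wait()
		// Check without blocking: an unconditional receive
		// would deadlock when no worker sent an error.
		select {
		case err := <-errs:
			fmt.Println("first error:", err)
		default:
			fmt.Println("all workers succeeded")
		}
	}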