From: Tom Clegg
Date: Wed, 28 Jul 2021 20:11:52 +0000 (-0400)
Subject: 17574: Batch updates into transactions, skip when unchanged.
X-Git-Tag: 2.3.0~118^2~7
X-Git-Url: https://git.arvados.org/arvados.git/commitdiff_plain/1657c7fbb89e790655f830630f60c68ceaf8569f

17574: Batch updates into transactions, skip when unchanged.

Arvados-DCO-1.1-Signed-off-by: Tom Clegg
---

diff --git a/services/keep-balance/collection.go b/services/keep-balance/collection.go
index 6e0a066e86..daedeb8bfc 100644
--- a/services/keep-balance/collection.go
+++ b/services/keep-balance/collection.go
@@ -47,24 +47,36 @@ func EachCollection(ctx context.Context, db *sqlx.DB, c *arvados.Client, f func(
 	}
 
 	var newestModifiedAt time.Time
-	rows, err := db.QueryxContext(ctx, `SELECT uuid, manifest_text, modified_at, portable_data_hash, replication_desired, storage_classes_desired, is_trashed FROM collections`)
+	rows, err := db.QueryxContext(ctx, `SELECT
+		uuid, manifest_text, modified_at, portable_data_hash,
+		replication_desired, replication_confirmed, replication_confirmed_at,
+		storage_classes_desired, storage_classes_confirmed, storage_classes_confirmed_at,
+		is_trashed
+		FROM collections`)
 	if err != nil {
 		return err
 	}
+	defer rows.Close()
 	progressTicker := time.NewTicker(10 * time.Second)
 	defer progressTicker.Stop()
 	callCount := 0
 	for rows.Next() {
 		var coll arvados.Collection
-		var classesDesired []byte
-		err = rows.Scan(&coll.UUID, &coll.ManifestText, &coll.ModifiedAt, &coll.PortableDataHash, &coll.ReplicationDesired, &classesDesired, &coll.IsTrashed)
+		var classesDesired, classesConfirmed []byte
+		err = rows.Scan(&coll.UUID, &coll.ManifestText, &coll.ModifiedAt, &coll.PortableDataHash,
+			&coll.ReplicationDesired, &coll.ReplicationConfirmed, &coll.ReplicationConfirmedAt,
+			&classesDesired, &classesConfirmed, &coll.StorageClassesConfirmedAt,
+			&coll.IsTrashed)
 		if err != nil {
-			rows.Close()
 			return err
 		}
+
 		err = json.Unmarshal(classesDesired, &coll.StorageClassesDesired)
-		if err != nil {
-			rows.Close()
+		if err != nil && len(classesDesired) > 0 {
+			return err
+		}
+		err = json.Unmarshal(classesConfirmed, &coll.StorageClassesConfirmed)
+		if err != nil && len(classesConfirmed) > 0 {
 			return err
 		}
 		if newestModifiedAt.IsZero() || newestModifiedAt.Before(coll.ModifiedAt) {
@@ -82,8 +94,10 @@ func EachCollection(ctx context.Context, db *sqlx.DB, c *arvados.Client, f func(
 		}
 	}
 	progress(callCount, expectCount)
-	rows.Close()
-	if err := rows.Err(); err != nil {
+	// The deferred rows.Close() releases the result set, but errors
+	// that ended iteration are reported only by rows.Err().
+	err = rows.Err()
+	if err != nil {
 		return err
 	}
 	if checkCount, err := countCollections(c, arvados.ResourceListParams{
@@ -112,7 +126,7 @@ func (bal *Balancer) updateCollections(ctx context.Context, c *arvados.Client, c
 
 	updated := int64(0)
 
-	var err error
+	errs := make(chan error, 1)
 	collQ := make(chan arvados.Collection, cluster.Collections.BalanceCollectionBuffers)
 	go func() {
 		defer close(collQ)
@@ -125,20 +139,45 @@ func (bal *Balancer) updateCollections(ctx context.Context, c *arvados.Client, c
 			collQ <- coll
 			return nil
 		}, func(done, total int) {
-			bal.logf("update collections: %d/%d", done, total)
+			bal.logf("update collections: %d/%d (%d updated @ %.01f updates/s)", done, total, atomic.LoadInt64(&updated), float64(atomic.LoadInt64(&updated))/time.Since(threshold).Seconds())
 		})
 		if err != nil && err != context.Canceled {
-			bal.logf("error updating collections: %s", err)
+			select {
+			case errs <- err:
+			default:
+			}
 		}
 	}()
 
 	var wg sync.WaitGroup
-	for i := 0; i < runtime.NumCPU(); i++ {
+
+	// Use about 1 goroutine per 2 CPUs. Based on experiments with
+	// a 2-core host, using more concurrent database
+	// calls/transactions makes this process slower, not faster.
+	for i := 0; i < (runtime.NumCPU()+1)/2; i++ {
 		wg.Add(1)
-		go func() {
-			defer wg.Done()
+		goSendErr(&wg, errs, func() error {
+			tx, err := bal.DB.Beginx()
+			if err != nil {
+				return err
+			}
+			txPending := 0
+			flush := func(final bool) error {
+				err := tx.Commit()
+				if err != nil {
+					tx.Rollback()
+					return err
+				}
+				txPending = 0
+				if final {
+					return nil
+				}
+				tx, err = bal.DB.Beginx()
+				return err
+			}
+			txBatch := 100
 			for coll := range collQ {
-				if ctx.Err() != nil {
+				if ctx.Err() != nil || len(errs) > 0 {
 					continue
 				}
 				blkids, err := coll.SizedDigests()
@@ -147,12 +186,35 @@ func (bal *Balancer) updateCollections(ctx context.Context, c *arvados.Client, c
 					continue
 				}
 				repl := bal.BlockStateMap.GetConfirmedReplication(blkids, coll.StorageClassesDesired)
+
+				desired := bal.DefaultReplication
+				if coll.ReplicationDesired != nil {
+					desired = *coll.ReplicationDesired
+				}
+				if repl > desired {
+					// If actual>desired, confirm
+					// the desired number rather
+					// than actual to avoid
+					// flapping updates when
+					// replication increases
+					// temporarily.
+					repl = desired
+				}
 				classes, err := json.Marshal(coll.StorageClassesDesired)
 				if err != nil {
 					bal.logf("BUG? json.Marshal(%v) failed: %s", classes, err)
 					continue
 				}
-				_, err = bal.DB.ExecContext(ctx, `update collections set
+				needUpdate := coll.ReplicationConfirmed == nil || *coll.ReplicationConfirmed != repl || len(coll.StorageClassesConfirmed) != len(coll.StorageClassesDesired)
+				for i := range coll.StorageClassesDesired {
+					if !needUpdate && coll.StorageClassesDesired[i] != coll.StorageClassesConfirmed[i] {
+						needUpdate = true
+					}
+				}
+				if !needUpdate {
+					continue
+				}
+				_, err = tx.ExecContext(ctx, `update collections set
 					replication_confirmed=$1,
 					replication_confirmed_at=$2,
 					storage_classes_confirmed=$3,
@@ -166,10 +228,40 @@ func (bal *Balancer) updateCollections(ctx context.Context, c *arvados.Client, c
 					continue
 				}
 				atomic.AddInt64(&updated, 1)
+				if txPending++; txPending >= txBatch {
+					err = flush(false)
+					if err != nil {
+						return err
+					}
+				}
 			}
-		}()
+			return flush(true)
+		})
 	}
 	wg.Wait()
 	bal.logf("updated %d collections", updated)
-	return err
+	// errs is never closed, so a bare receive would block forever
+	// when nothing failed. goSendErr sends before wg.Done, so any
+	// worker error is already buffered once wg.Wait returns.
+	if len(errs) > 0 {
+		return fmt.Errorf("error updating collections: %w", <-errs)
+	}
+	return nil
+}
+
+// goSendErr calls f in a new goroutine. If f returns a non-nil
+// error, it is sent to errs (unless errs is already full with
+// another error). wg.Done is called only after that send, so a
+// caller returning from wg.Wait sees every error that was sent.
+func goSendErr(wg *sync.WaitGroup, errs chan<- error, f func() error) {
+	go func() {
+		defer wg.Done()
+		err := f()
+		if err != nil {
+			select {
+			case errs <- err:
+			default:
+			}
+		}
+	}()
 }