+
+func (bal *Balancer) updateCollections(ctx context.Context, c *arvados.Client, cluster *arvados.Cluster) error {
+ ctx, cancel := context.WithCancel(ctx)
+ defer cancel()
+
+ defer bal.time("update_collections", "wall clock time to update collections")()
+ threshold := time.Now()
+ thresholdStr := threshold.Format(time.RFC3339Nano)
+
+ updated := int64(0)
+
+ errs := make(chan error, 1)
+ collQ := make(chan arvados.Collection, cluster.Collections.BalanceCollectionBuffers)
+ go func() {
+ defer close(collQ)
+ err := EachCollection(ctx, bal.DB, c, func(coll arvados.Collection) error {
+ if atomic.LoadInt64(&updated) >= int64(cluster.Collections.BalanceUpdateLimit) {
+ bal.logf("reached BalanceUpdateLimit (%d)", cluster.Collections.BalanceUpdateLimit)
+ cancel()
+ return context.Canceled
+ }
+ collQ <- coll
+ return nil
+ }, func(done, total int) {
+ bal.logf("update collections: %d/%d (%d updated @ %.01f updates/s)", done, total, atomic.LoadInt64(&updated), float64(atomic.LoadInt64(&updated))/time.Since(threshold).Seconds())
+ })
+ if err != nil && err != context.Canceled {
+ select {
+ case errs <- err:
+ default:
+ }
+ }
+ }()
+
+ var wg sync.WaitGroup
+
+ // Use about 1 goroutine per 2 CPUs. Based on experiments with
+ // a 2-core host, using more concurrent database
+ // calls/transactions makes this process slower, not faster.
+ for i := 0; i < (runtime.NumCPU()+1)/2; i++ {
+ wg.Add(1)
+ goSendErr(errs, func() error {
+ defer wg.Done()
+ tx, err := bal.DB.Beginx()
+ if err != nil {
+ return err
+ }
+ txPending := 0
+ flush := func(final bool) error {
+ err := tx.Commit()
+ if err != nil && ctx.Err() == nil {
+ tx.Rollback()
+ return err
+ }
+ txPending = 0
+ if final {
+ return nil
+ }
+ tx, err = bal.DB.Beginx()
+ return err
+ }
+ txBatch := 100
+ for coll := range collQ {
+ if ctx.Err() != nil || len(errs) > 0 {
+ continue
+ }
+ blkids, err := coll.SizedDigests()
+ if err != nil {
+ bal.logf("%s: %s", coll.UUID, err)
+ continue
+ }
+ repl := bal.BlockStateMap.GetConfirmedReplication(blkids, coll.StorageClassesDesired)
+
+ desired := bal.DefaultReplication
+ if coll.ReplicationDesired != nil {
+ desired = *coll.ReplicationDesired
+ }
+ if repl > desired {
+ // If actual>desired, confirm
+ // the desired number rather
+ // than actual to avoid
+ // flapping updates when
+ // replication increases
+ // temporarily.
+ repl = desired
+ }
+ classes := emptyJSONArray
+ if repl > 0 {
+ classes, err = json.Marshal(coll.StorageClassesDesired)
+ if err != nil {
+ bal.logf("BUG? json.Marshal(%v) failed: %s", classes, err)
+ continue
+ }
+ }
+ needUpdate := coll.ReplicationConfirmed == nil || *coll.ReplicationConfirmed != repl || len(coll.StorageClassesConfirmed) != len(coll.StorageClassesDesired)
+ for i := range coll.StorageClassesDesired {
+ if !needUpdate && coll.StorageClassesDesired[i] != coll.StorageClassesConfirmed[i] {
+ needUpdate = true
+ }
+ }
+ if !needUpdate {
+ continue
+ }
+ _, err = tx.ExecContext(ctx, `update collections set
+ replication_confirmed=$1,
+ replication_confirmed_at=$2,
+ storage_classes_confirmed=$3,
+ storage_classes_confirmed_at=$2
+ where uuid=$4`,
+ repl, thresholdStr, classes, coll.UUID)
+ if err != nil {
+ if ctx.Err() == nil {
+ bal.logf("%s: update failed: %s", coll.UUID, err)
+ }
+ continue
+ }
+ atomic.AddInt64(&updated, 1)
+ if txPending++; txPending >= txBatch {
+ err = flush(false)
+ if err != nil {
+ return err
+ }
+ }
+ }
+ return flush(true)
+ })
+ }
+ wg.Wait()
+ bal.logf("updated %d collections", updated)
+ if len(errs) > 0 {
+ return fmt.Errorf("error updating collections: %s", <-errs)
+ }
+ return nil
+}
+
// goSendErr runs f in a new goroutine and returns immediately. A
// non-nil error returned by f is offered to errs without blocking: if
// errs cannot accept it right away (for example because it is already
// full with another error), the error is dropped.
func goSendErr(errs chan<- error, f func() error) {
	go func() {
		result := f()
		if result == nil {
			return
		}
		select {
		case errs <- result:
		default:
			// errs is not ready to receive; discard result.
		}
	}()
}
+
// emptyJSONArray is the JSON encoding of an empty array ("[]").
var emptyJSONArray = []byte{'[', ']'}