//
// SPDX-License-Identifier: AGPL-3.0
-package main
+package keepbalance
import (
"context"
}
var newestModifiedAt time.Time
- rows, err := db.QueryxContext(ctx, `SELECT uuid, manifest_text, modified_at, portable_data_hash, replication_desired, storage_classes_desired, is_trashed FROM collections`)
+ rows, err := db.QueryxContext(ctx, `SELECT
+ uuid, manifest_text, modified_at, portable_data_hash,
+ replication_desired, replication_confirmed, replication_confirmed_at,
+ storage_classes_desired, storage_classes_confirmed, storage_classes_confirmed_at,
+ is_trashed
+ FROM collections`)
if err != nil {
return err
}
+ defer rows.Close()
progressTicker := time.NewTicker(10 * time.Second)
defer progressTicker.Stop()
callCount := 0
for rows.Next() {
var coll arvados.Collection
- var classesDesired []byte
- err = rows.Scan(&coll.UUID, &coll.ManifestText, &coll.ModifiedAt, &coll.PortableDataHash, &coll.ReplicationDesired, &classesDesired, &coll.IsTrashed)
+ var classesDesired, classesConfirmed []byte
+ err = rows.Scan(&coll.UUID, &coll.ManifestText, &coll.ModifiedAt, &coll.PortableDataHash,
+ &coll.ReplicationDesired, &coll.ReplicationConfirmed, &coll.ReplicationConfirmedAt,
+ &classesDesired, &classesConfirmed, &coll.StorageClassesConfirmedAt,
+ &coll.IsTrashed)
if err != nil {
- rows.Close()
return err
}
+
err = json.Unmarshal(classesDesired, &coll.StorageClassesDesired)
- if err != nil {
- rows.Close()
+ if err != nil && len(classesDesired) > 0 {
+ return err
+ }
+ err = json.Unmarshal(classesConfirmed, &coll.StorageClassesConfirmed)
+ if err != nil && len(classesConfirmed) > 0 {
return err
}
if newestModifiedAt.IsZero() || newestModifiedAt.Before(coll.ModifiedAt) {
}
}
progress(callCount, expectCount)
- rows.Close()
- if err := rows.Err(); err != nil {
+ err = rows.Close()
+ if err != nil {
return err
}
if checkCount, err := countCollections(c, arvados.ResourceListParams{
updated := int64(0)
- var err error
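+ // errs holds the first error reported by the producer goroutine or
+ // any worker; once it is full, later errors are dropped.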
+ errs := make(chan error, 1)
collQ := make(chan arvados.Collection, cluster.Collections.BalanceCollectionBuffers)
go func() {
defer close(collQ)
collQ <- coll
return nil
}, func(done, total int) {
- bal.logf("update collections: %d/%d", done, total)
+ bal.logf("update collections: %d/%d (%d updated @ %.01f updates/s)", done, total, atomic.LoadInt64(&updated), float64(atomic.LoadInt64(&updated))/time.Since(threshold).Seconds())
})
if err != nil && err != context.Canceled {
- bal.logf("error updating collections: %s", err)
+ select {
+ case errs <- err:
+ default:
+ }
}
}()
var wg sync.WaitGroup
- for i := 0; i < runtime.NumCPU(); i++ {
+
+ // Use about 1 goroutine per 2 CPUs. Based on experiments with
+ // a 2-core host, using more concurrent database
+ // calls/transactions makes this process slower, not faster.
+ for i := 0; i < (runtime.NumCPU()+1)/2; i++ {
wg.Add(1)
- go func() {
+ goSendErr(errs, func() error {
defer wg.Done()
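+ // Each worker accumulates its updates in its own transaction,
+ // committing in batches via flush().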
+ tx, err := bal.DB.Beginx()
+ if err != nil {
+ return err
+ }
+ txPending := 0
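+ // flush commits the pending updates. Unless this is the final
+ // call, it also begins a new transaction for the next batch.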
+ flush := func(final bool) error {
+ err := tx.Commit()
+ if err != nil && ctx.Err() == nil {
+ tx.Rollback()
+ return err
+ }
+ txPending = 0
+ if final {
+ return nil
+ }
+ tx, err = bal.DB.Beginx()
+ return err
+ }
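+ // Commit a transaction after this many updates have accumulated.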
+ txBatch := 100
for coll := range collQ {
- if ctx.Err() != nil {
+ if ctx.Err() != nil || len(errs) > 0 {
continue
}
blkids, err := coll.SizedDigests()
continue
}
repl := bal.BlockStateMap.GetConfirmedReplication(blkids, coll.StorageClassesDesired)
- classes, err := json.Marshal(coll.StorageClassesDesired)
- if err != nil {
- bal.logf("BUG? json.Marshal(%v) failed: %s", classes, err)
+
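+ // Fall back to the cluster-wide default if the collection does
+ // not specify a desired replication level.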
+ desired := bal.DefaultReplication
+ if coll.ReplicationDesired != nil {
+ desired = *coll.ReplicationDesired
+ }
+ if repl > desired {
+ // If actual>desired, confirm
+ // the desired number rather
+ // than actual to avoid
+ // flapping updates when
+ // replication increases
+ // temporarily.
+ repl = desired
+ }
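+ // With no confirmed replicas, record an empty list of
+ // confirmed storage classes.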
+ classes := emptyJSONArray
+ if repl > 0 {
+ classes, err = json.Marshal(coll.StorageClassesDesired)
+ if err != nil {
+ bal.logf("BUG? json.Marshal(%v) failed: %s", classes, err)
+ continue
+ }
+ }
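+ // Skip the database update if the confirmed values already
+ // match what we would write.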
+ needUpdate := coll.ReplicationConfirmed == nil || *coll.ReplicationConfirmed != repl || len(coll.StorageClassesConfirmed) != len(coll.StorageClassesDesired)
+ for i := range coll.StorageClassesDesired {
+ if !needUpdate && coll.StorageClassesDesired[i] != coll.StorageClassesConfirmed[i] {
+ needUpdate = true
+ }
+ }
+ if !needUpdate {
continue
}
- _, err = bal.DB.ExecContext(ctx, `update collections set
+ _, err = tx.ExecContext(ctx, `update collections set
replication_confirmed=$1,
replication_confirmed_at=$2,
storage_classes_confirmed=$3,
storage_classes_confirmed_at=$2
where uuid=$4`,
repl, thresholdStr, classes, coll.UUID)
if err != nil {
- if err != context.Canceled {
+ if ctx.Err() == nil {
bal.logf("%s: update failed: %s", coll.UUID, err)
}
continue
}
atomic.AddInt64(&updated, 1)
+ if txPending++; txPending >= txBatch {
+ err = flush(false)
+ if err != nil {
+ return err
+ }
+ }
}
- }()
+ return flush(true)
+ })
}
wg.Wait()
bal.logf("updated %d collections", updated)
- return err
+ if len(errs) > 0 {
+ return fmt.Errorf("error updating collections: %s", <-errs)
+ }
+ return nil
}
+
+// Call f in a new goroutine. If it returns a non-nil error, send the
+// error to the errs channel (unless the channel is already full with
+// another error).
+func goSendErr(errs chan<- error, f func() error) {
+ go func() {
+ err := f()
+ if err != nil {
+ select {
+ case errs <- err:
+ default:
+ }
+ }
+ }()
+}
+
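+// emptyJSONArray is written to storage_classes_confirmed when a
+// collection has no confirmed replicas.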
+var emptyJSONArray = []byte("[]")