summary |
shortlog |
log |
commit | commitdiff |
tree
raw |
patch |
inline | side by side (from parent 1:
46a33e2)
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom@curii.com>
}
var newestModifiedAt time.Time
}
var newestModifiedAt time.Time
- rows, err := db.QueryxContext(ctx, `SELECT uuid, manifest_text, modified_at, portable_data_hash, replication_desired, storage_classes_desired, is_trashed FROM collections`)
+ rows, err := db.QueryxContext(ctx, `SELECT
+ uuid, manifest_text, modified_at, portable_data_hash,
+ replication_desired, replication_confirmed, replication_confirmed_at,
+ storage_classes_desired, storage_classes_confirmed, storage_classes_confirmed_at,
+ is_trashed
+ FROM collections`)
if err != nil {
return err
}
if err != nil {
return err
}
progressTicker := time.NewTicker(10 * time.Second)
defer progressTicker.Stop()
callCount := 0
for rows.Next() {
var coll arvados.Collection
progressTicker := time.NewTicker(10 * time.Second)
defer progressTicker.Stop()
callCount := 0
for rows.Next() {
var coll arvados.Collection
- var classesDesired []byte
- err = rows.Scan(&coll.UUID, &coll.ManifestText, &coll.ModifiedAt, &coll.PortableDataHash, &coll.ReplicationDesired, &classesDesired, &coll.IsTrashed)
+ var classesDesired, classesConfirmed []byte
+ err = rows.Scan(&coll.UUID, &coll.ManifestText, &coll.ModifiedAt, &coll.PortableDataHash,
+ &coll.ReplicationDesired, &coll.ReplicationConfirmed, &coll.ReplicationConfirmedAt,
+ &classesDesired, &classesConfirmed, &coll.StorageClassesConfirmedAt,
+ &coll.IsTrashed)
err = json.Unmarshal(classesDesired, &coll.StorageClassesDesired)
err = json.Unmarshal(classesDesired, &coll.StorageClassesDesired)
- if err != nil {
- rows.Close()
+ if err != nil && len(classesDesired) > 0 {
+ return err
+ }
+ err = json.Unmarshal(classesConfirmed, &coll.StorageClassesConfirmed)
+ if err != nil && len(classesConfirmed) > 0 {
return err
}
if newestModifiedAt.IsZero() || newestModifiedAt.Before(coll.ModifiedAt) {
return err
}
if newestModifiedAt.IsZero() || newestModifiedAt.Before(coll.ModifiedAt) {
}
}
progress(callCount, expectCount)
}
}
progress(callCount, expectCount)
- rows.Close()
- if err := rows.Err(); err != nil {
+ err = rows.Close()
+ if err != nil {
return err
}
if checkCount, err := countCollections(c, arvados.ResourceListParams{
return err
}
if checkCount, err := countCollections(c, arvados.ResourceListParams{
+ errs := make(chan error, 1)
collQ := make(chan arvados.Collection, cluster.Collections.BalanceCollectionBuffers)
go func() {
defer close(collQ)
collQ := make(chan arvados.Collection, cluster.Collections.BalanceCollectionBuffers)
go func() {
defer close(collQ)
collQ <- coll
return nil
}, func(done, total int) {
collQ <- coll
return nil
}, func(done, total int) {
- bal.logf("update collections: %d/%d", done, total)
+ bal.logf("update collections: %d/%d (%d updated @ %.01f updates/s)", done, total, atomic.LoadInt64(&updated), float64(atomic.LoadInt64(&updated))/time.Since(threshold).Seconds())
})
if err != nil && err != context.Canceled {
})
if err != nil && err != context.Canceled {
- bal.logf("error updating collections: %s", err)
+ select {
+ case errs <- err:
+ default:
+ }
}
}()
var wg sync.WaitGroup
}
}()
var wg sync.WaitGroup
- for i := 0; i < runtime.NumCPU(); i++ {
+
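+ // Intended: about 1 goroutine per 2 CPUs. Based on experiments
+ // with a 2-core host, using more concurrent database
+ // calls/transactions makes this process slower, not faster.
+ //
+ // NOTE(review): the loop bound below, runtime.NumCPU()+1/2,
+ // parses as runtime.NumCPU()+(1/2), and 1/2 is integer constant
+ // division (0), so it actually starts one goroutine per CPU.
+ // (runtime.NumCPU()+1)/2 would match the stated intent.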
+ for i := 0; i < runtime.NumCPU()+1/2; i++ {
+ goSendErr(errs, func() error {
+ tx, err := bal.DB.Beginx()
+ if err != nil {
+ return err
+ }
+ txPending := 0
+ flush := func(final bool) error {
+ err := tx.Commit()
+ if err != nil {
+ tx.Rollback()
+ return err
+ }
+ txPending = 0
+ if final {
+ return nil
+ }
+ tx, err = bal.DB.Beginx()
+ return err
+ }
+ txBatch := 100
for coll := range collQ {
for coll := range collQ {
+ if ctx.Err() != nil || len(errs) > 0 {
continue
}
blkids, err := coll.SizedDigests()
continue
}
blkids, err := coll.SizedDigests()
continue
}
repl := bal.BlockStateMap.GetConfirmedReplication(blkids, coll.StorageClassesDesired)
continue
}
repl := bal.BlockStateMap.GetConfirmedReplication(blkids, coll.StorageClassesDesired)
+
+ desired := bal.DefaultReplication
+ if coll.ReplicationDesired != nil {
+ desired = *coll.ReplicationDesired
+ }
+ if repl > desired {
+ // If actual>desired, confirm
+ // the desired number rather
+ // than actual to avoid
+ // flapping updates when
+ // replication increases
+ // temporarily.
+ repl = desired
+ }
classes, err := json.Marshal(coll.StorageClassesDesired)
if err != nil {
bal.logf("BUG? json.Marshal(%v) failed: %s", classes, err)
continue
}
classes, err := json.Marshal(coll.StorageClassesDesired)
if err != nil {
bal.logf("BUG? json.Marshal(%v) failed: %s", classes, err)
continue
}
- _, err = bal.DB.ExecContext(ctx, `update collections set
+ needUpdate := coll.ReplicationConfirmed == nil || *coll.ReplicationConfirmed != repl || len(coll.StorageClassesConfirmed) != len(coll.StorageClassesDesired)
+ for i := range coll.StorageClassesDesired {
+ if !needUpdate && coll.StorageClassesDesired[i] != coll.StorageClassesConfirmed[i] {
+ needUpdate = true
+ }
+ }
+ if !needUpdate {
+ continue
+ }
+ _, err = tx.ExecContext(ctx, `update collections set
replication_confirmed=$1,
replication_confirmed_at=$2,
storage_classes_confirmed=$3,
replication_confirmed=$1,
replication_confirmed_at=$2,
storage_classes_confirmed=$3,
continue
}
atomic.AddInt64(&updated, 1)
continue
}
atomic.AddInt64(&updated, 1)
+ if txPending++; txPending >= txBatch {
+ err = flush(false)
+ if err != nil {
+ return err
+ }
+ }
+ return flush(true)
+ })
}
wg.Wait()
bal.logf("updated %d collections", updated)
}
wg.Wait()
bal.logf("updated %d collections", updated)
+ if err := <-errs; err != nil {
+ return fmt.Errorf("error updating collections: %s", err)
+ }
+ return nil
+}
+
+// Call f in a new goroutine. If it returns a non-nil error, send the
+// error to the errs channel (unless the channel is already full with
+// another error).
+func goSendErr(errs chan<- error, f func() error) {
+ go func() {
+ err := f()
+ if err != nil {
+ select {
+ case errs <- err:
+ default:
+ }
+ }
+ }()