"context"
"encoding/json"
"fmt"
- "io"
"runtime"
"sync"
"sync/atomic"
if err != nil {
return err
}
+ progressTicker := time.NewTicker(10 * time.Second)
+ defer progressTicker.Stop()
callCount := 0
for rows.Next() {
var coll arvados.Collection
if err != nil {
return err
}
- progress(callCount, expectCount)
+ select {
+ case <-progressTicker.C:
+ progress(callCount, expectCount)
+ default:
+ }
}
+ progress(callCount, expectCount)
rows.Close()
if err := rows.Err(); err != nil {
return err
threshold := time.Now()
thresholdStr := threshold.Format(time.RFC3339Nano)
+ updated := int64(0)
+
var err error
collQ := make(chan arvados.Collection, cluster.Collections.BalanceCollectionBuffers)
go func() {
defer close(collQ)
- err = EachCollection(ctx, bal.DB, c, func(coll arvados.Collection) error {
- if coll.ModifiedAt.After(threshold) {
- return io.EOF
- }
- if coll.IsTrashed {
- return nil
+ err := EachCollection(ctx, bal.DB, c, func(coll arvados.Collection) error {
+ if atomic.LoadInt64(&updated) >= int64(cluster.Collections.BalanceUpdateLimit) {
+ bal.logf("reached BalanceUpdateLimit (%d)", cluster.Collections.BalanceUpdateLimit)
+ cancel()
+ return context.Canceled
}
collQ <- coll
return nil
}, func(done, total int) {
bal.logf("update collections: %d/%d", done, total)
})
- if err == io.EOF {
- err = nil
- } else if err != nil {
+ if err != nil && err != context.Canceled {
bal.logf("error updating collections: %s", err)
}
}()
- var updated int64
var wg sync.WaitGroup
for i := 0; i < runtime.NumCPU(); i++ {
wg.Add(1)
go func() {
defer wg.Done()
for coll := range collQ {
+ if ctx.Err() != nil {
+ continue
+ }
blkids, err := coll.SizedDigests()
if err != nil {
bal.logf("%s: %s", coll.UUID, err)
where uuid=$4`,
repl, thresholdStr, classes, coll.UUID)
if err != nil {
- bal.logf("%s: update failed: %s", coll.UUID, err)
+ if err != context.Canceled {
+ bal.logf("%s: update failed: %s", coll.UUID, err)
+ }
continue
}
atomic.AddInt64(&updated, 1)