//
// SPDX-License-Identifier: AGPL-3.0
-package main
+package keepbalance
import (
"context"
"encoding/json"
"fmt"
- "io"
"runtime"
"sync"
"sync/atomic"
// The progress function is called periodically with done (number of
// times f has been called) and total (number of times f is expected
// to be called).
-//
-// If pageSize > 0 it is used as the maximum page size in each API
-// call; otherwise the maximum allowed page size is requested.
-func EachCollection(ctx context.Context, c *arvados.Client, pageSize int, f func(arvados.Collection) error, progress func(done, total int)) error {
+func EachCollection(ctx context.Context, db *sqlx.DB, c *arvados.Client, f func(arvados.Collection) error, progress func(done, total int)) error {
if progress == nil {
progress = func(_, _ int) {}
}
if err != nil {
return err
}
+ var newestModifiedAt time.Time
- // Note the obvious way to get all collections (sorting by
- // UUID) would be much easier, but would lose data: If a
- // client were to move files from collection with uuid="zzz"
- // to a collection with uuid="aaa" around the time when we
- // were fetching the "mmm" page, we would never see those
- // files' block IDs at all -- even if the client is careful to
- // save "aaa" before saving "zzz".
- //
- // Instead, we get pages in modified_at order. Collections
- // that are modified during the run will be re-fetched in a
- // subsequent page.
-
- limit := pageSize
- if limit <= 0 {
- // Use the maximum page size the server allows
- limit = 1<<31 - 1
- }
- params := arvados.ResourceListParams{
- Limit: &limit,
- Order: "modified_at, uuid",
- Count: "none",
- Select: []string{"uuid", "unsigned_manifest_text", "modified_at", "portable_data_hash", "replication_desired", "storage_classes_desired", "is_trashed"},
- IncludeTrash: true,
- IncludeOldVersions: true,
+ rows, err := db.QueryxContext(ctx, `SELECT
+ uuid, manifest_text, modified_at, portable_data_hash,
+ replication_desired, replication_confirmed, replication_confirmed_at,
+ storage_classes_desired, storage_classes_confirmed, storage_classes_confirmed_at,
+ is_trashed
+ FROM collections`)
+ if err != nil {
+ return err
}
- var last arvados.Collection
- var filterTime time.Time
+ defer rows.Close()
+ progressTicker := time.NewTicker(10 * time.Second)
+ defer progressTicker.Stop()
callCount := 0
- gettingExactTimestamp := false
- for {
- progress(callCount, expectCount)
- var page arvados.CollectionList
- err := c.RequestAndDecodeContext(ctx, &page, "GET", "arvados/v1/collections", nil, params)
+ for rows.Next() {
+ var coll arvados.Collection
+ var classesDesired, classesConfirmed []byte
+ err = rows.Scan(&coll.UUID, &coll.ManifestText, &coll.ModifiedAt, &coll.PortableDataHash,
+ &coll.ReplicationDesired, &coll.ReplicationConfirmed, &coll.ReplicationConfirmedAt,
+ &classesDesired, &classesConfirmed, &coll.StorageClassesConfirmedAt,
+ &coll.IsTrashed)
if err != nil {
return err
}
- for _, coll := range page.Items {
- if last.ModifiedAt == coll.ModifiedAt && last.UUID >= coll.UUID {
- continue
- }
- callCount++
- err = f(coll)
- if err != nil {
- return err
- }
- last = coll
+
+ err = json.Unmarshal(classesDesired, &coll.StorageClassesDesired)
+ if err != nil && len(classesDesired) > 0 {
+ return err
+ }
+ err = json.Unmarshal(classesConfirmed, &coll.StorageClassesConfirmed)
+ if err != nil && len(classesConfirmed) > 0 {
+ return err
+ }
+ if newestModifiedAt.IsZero() || newestModifiedAt.Before(coll.ModifiedAt) {
+ newestModifiedAt = coll.ModifiedAt
}
- if len(page.Items) == 0 && !gettingExactTimestamp {
- break
- } else if last.ModifiedAt.IsZero() {
- return fmt.Errorf("BUG: Last collection on the page (%s) has no modified_at timestamp; cannot make progress", last.UUID)
- } else if len(page.Items) > 0 && last.ModifiedAt == filterTime {
- // If we requested time>=X and never got a
- // time>X then we might not have received all
- // items with time==X yet. Switch to
- // gettingExactTimestamp mode (if we're not
- // there already), advancing our UUID
- // threshold with each request, until we get
- // an empty page.
- gettingExactTimestamp = true
- params.Filters = []arvados.Filter{{
- Attr: "modified_at",
- Operator: "=",
- Operand: filterTime,
- }, {
- Attr: "uuid",
- Operator: ">",
- Operand: last.UUID,
- }}
- } else if gettingExactTimestamp {
- // This must be an empty page (in this mode,
- // an unequal timestamp is impossible) so we
- // can start getting pages of newer
- // collections.
- gettingExactTimestamp = false
- params.Filters = []arvados.Filter{{
- Attr: "modified_at",
- Operator: ">",
- Operand: filterTime,
- }}
- } else {
- // In the normal case, we know we have seen
- // all collections with modtime<filterTime,
- // but we might not have seen all that have
- // modtime=filterTime. Hence we use >= instead
- // of > and skip the obvious overlapping item,
- // i.e., the last item on the previous
- // page. In some edge cases this can return
- // collections we have already seen, but
- // avoiding that would add overhead in the
- // overwhelmingly common cases, so we don't
- // bother.
- filterTime = last.ModifiedAt
- params.Filters = []arvados.Filter{{
- Attr: "modified_at",
- Operator: ">=",
- Operand: filterTime,
- }, {
- Attr: "uuid",
- Operator: "!=",
- Operand: last.UUID,
- }}
+ callCount++
+ err = f(coll)
+ if err != nil {
+ return err
+ }
+ select {
+ case <-progressTicker.C:
+ progress(callCount, expectCount)
+ default:
}
}
progress(callCount, expectCount)
-
+ err = rows.Close()
+ if err != nil {
+ return err
+ }
if checkCount, err := countCollections(c, arvados.ResourceListParams{
Filters: []arvados.Filter{{
Attr: "modified_at",
Operator: "<=",
- Operand: filterTime}},
+ Operand: newestModifiedAt}},
IncludeTrash: true,
IncludeOldVersions: true,
}); err != nil {
return err
} else if callCount < checkCount {
- return fmt.Errorf("Retrieved %d collections with modtime <= T=%q, but server now reports there are %d collections with modtime <= T", callCount, filterTime, checkCount)
+ return fmt.Errorf("Retrieved %d collections with modtime <= T=%q, but server now reports there are %d collections with modtime <= T", callCount, newestModifiedAt, checkCount)
}
return nil
threshold := time.Now()
thresholdStr := threshold.Format(time.RFC3339Nano)
- var err error
+ updated := int64(0)
+
+ errs := make(chan error, 1)
collQ := make(chan arvados.Collection, cluster.Collections.BalanceCollectionBuffers)
go func() {
defer close(collQ)
- err = EachCollection(ctx, c, cluster.Collections.BalanceCollectionBatch, func(coll arvados.Collection) error {
- if coll.ModifiedAt.After(threshold) {
- return io.EOF
- }
- if coll.IsTrashed {
- return nil
+ err := EachCollection(ctx, bal.DB, c, func(coll arvados.Collection) error {
+ if atomic.LoadInt64(&updated) >= int64(cluster.Collections.BalanceUpdateLimit) {
+ bal.logf("reached BalanceUpdateLimit (%d)", cluster.Collections.BalanceUpdateLimit)
+ cancel()
+ return context.Canceled
}
collQ <- coll
return nil
}, func(done, total int) {
- bal.logf("update collections: %d/%d", done, total)
+ bal.logf("update collections: %d/%d (%d updated @ %.01f updates/s)", done, total, atomic.LoadInt64(&updated), float64(atomic.LoadInt64(&updated))/time.Since(threshold).Seconds())
})
- if err == io.EOF {
- err = nil
- } else if err != nil {
- bal.logf("error updating collections: %s", err)
+ if err != nil && err != context.Canceled {
+ select {
+ case errs <- err:
+ default:
+ }
}
}()
- db, err := bal.db(cluster)
- if err != nil {
- return err
- }
-
- var updated int64
var wg sync.WaitGroup
- for i := 0; i < runtime.NumCPU(); i++ {
+
+ // Use about 1 goroutine per 2 CPUs. Based on experiments with
+ // a 2-core host, using more concurrent database
+ // calls/transactions makes this process slower, not faster.
+ for i := 0; i < (runtime.NumCPU()+1)/2; i++ {
wg.Add(1)
- go func() {
+ goSendErr(errs, func() error {
defer wg.Done()
+ tx, err := bal.DB.Beginx()
+ if err != nil {
+ return err
+ }
+ txPending := 0
+ flush := func(final bool) error {
+ err := tx.Commit()
+ if err != nil && ctx.Err() == nil {
+ tx.Rollback()
+ return err
+ }
+ txPending = 0
+ if final {
+ return nil
+ }
+ tx, err = bal.DB.Beginx()
+ return err
+ }
+ txBatch := 100
for coll := range collQ {
+ if ctx.Err() != nil || len(errs) > 0 {
+ continue
+ }
blkids, err := coll.SizedDigests()
if err != nil {
bal.logf("%s: %s", coll.UUID, err)
continue
}
repl := bal.BlockStateMap.GetConfirmedReplication(blkids, coll.StorageClassesDesired)
- tx, err := db.Beginx()
- if err != nil {
- bal.logf("error opening transaction: %s", coll.UUID, err)
- cancel()
+
+ desired := bal.DefaultReplication
+ if coll.ReplicationDesired != nil {
+ desired = *coll.ReplicationDesired
+ }
+ if repl > desired {
+ // If actual>desired, confirm
+ // the desired number rather
+ // than actual to avoid
+ // flapping updates when
+ // replication increases
+ // temporarily.
+ repl = desired
+ }
+ classes := emptyJSONArray
+ if repl > 0 {
+ classes, err = json.Marshal(coll.StorageClassesDesired)
+ if err != nil {
+ bal.logf("BUG? json.Marshal(%v) failed: %s", classes, err)
+ continue
+ }
+ }
+ needUpdate := coll.ReplicationConfirmed == nil || *coll.ReplicationConfirmed != repl || len(coll.StorageClassesConfirmed) != len(coll.StorageClassesDesired)
+ for i := range coll.StorageClassesDesired {
+ if !needUpdate && coll.StorageClassesDesired[i] != coll.StorageClassesConfirmed[i] {
+ needUpdate = true
+ }
+ }
+ if !needUpdate {
continue
}
- classes, _ := json.Marshal(coll.StorageClassesDesired)
_, err = tx.ExecContext(ctx, `update collections set
replication_confirmed=$1,
replication_confirmed_at=$2,
where uuid=$4`,
repl, thresholdStr, classes, coll.UUID)
if err != nil {
- tx.Rollback()
- } else {
- err = tx.Commit()
- }
- if err != nil {
- bal.logf("%s: update failed: %s", coll.UUID, err)
+ if ctx.Err() == nil {
+ bal.logf("%s: update failed: %s", coll.UUID, err)
+ }
continue
}
atomic.AddInt64(&updated, 1)
+ if txPending++; txPending >= txBatch {
+ err = flush(false)
+ if err != nil {
+ return err
+ }
+ }
}
- }()
+ return flush(true)
+ })
}
wg.Wait()
bal.logf("updated %d collections", updated)
- return err
+ if len(errs) > 0 {
+ return fmt.Errorf("error updating collections: %s", <-errs)
+ }
+ return nil
}
-func (bal *Balancer) db(cluster *arvados.Cluster) (*sqlx.DB, error) {
- db, err := sqlx.Open("postgres", cluster.PostgreSQL.Connection.String())
- if err != nil {
- return nil, err
- }
- if p := cluster.PostgreSQL.ConnectionPool; p > 0 {
- db.SetMaxOpenConns(p)
- }
- if err := db.Ping(); err != nil {
- return nil, fmt.Errorf("postgresql connect succeeded but ping failed: %s", err)
- }
- return db, nil
+// goSendErr calls f in a new goroutine and returns without waiting for it. If f
+// returns a non-nil error, the error is offered to errs with a non-blocking send,
+// so it is dropped when errs cannot accept it (e.g. it already holds an error).
+func goSendErr(errs chan<- error, f func() error) {
+ go func() {
+ err := f()
+ if err != nil {
+ select {
+ case errs <- err:
+ default: // send would block: drop this error rather than stall the goroutine
+ }
+ }
+ }()
}
+
+var emptyJSONArray = []byte("[]") // JSON encoding of an empty array