X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/bf4166939c77771642af846cb5372efc8a78659a..ba1937c21efd8d6392c0479579da1fc06443abdd:/services/keep-balance/collection.go

diff --git a/services/keep-balance/collection.go b/services/keep-balance/collection.go
index 3afb1ccc55..d7a3fd981d 100644
--- a/services/keep-balance/collection.go
+++ b/services/keep-balance/collection.go
@@ -2,13 +2,12 @@
 //
 // SPDX-License-Identifier: AGPL-3.0
 
-package main
+package keepbalance
 
 import (
 	"context"
 	"encoding/json"
 	"fmt"
-	"io"
 	"runtime"
 	"sync"
 	"sync/atomic"
@@ -34,10 +33,7 @@ func countCollections(c *arvados.Client, params arvados.ResourceListParams) (int
 // The progress function is called periodically with done (number of
 // times f has been called) and total (number of times f is expected
 // to be called).
-//
-// If pageSize > 0 it is used as the maximum page size in each API
-// call; otherwise the maximum allowed page size is requested.
-func EachCollection(ctx context.Context, c *arvados.Client, pageSize int, f func(arvados.Collection) error, progress func(done, total int)) error {
+func EachCollection(ctx context.Context, db *sqlx.DB, c *arvados.Client, f func(arvados.Collection) error, progress func(done, total int)) error {
 	if progress == nil {
 		progress = func(_, _ int) {}
 	}
@@ -49,124 +45,70 @@ func EachCollection(ctx context.Context, c *arvados.Client, pageSize int, f func
 	if err != nil {
 		return err
 	}
+	var newestModifiedAt time.Time
 
-	// Note the obvious way to get all collections (sorting by
-	// UUID) would be much easier, but would lose data: If a
-	// client were to move files from collection with uuid="zzz"
-	// to a collection with uuid="aaa" around the time when we
-	// were fetching the "mmm" page, we would never see those
-	// files' block IDs at all -- even if the client is careful to
-	// save "aaa" before saving "zzz".
-	//
-	// Instead, we get pages in modified_at order. Collections
-	// that are modified during the run will be re-fetched in a
-	// subsequent page.
-
-	limit := pageSize
-	if limit <= 0 {
-		// Use the maximum page size the server allows
-		limit = 1<<31 - 1
-	}
-	params := arvados.ResourceListParams{
-		Limit:              &limit,
-		Order:              "modified_at, uuid",
-		Count:              "none",
-		Select:             []string{"uuid", "unsigned_manifest_text", "modified_at", "portable_data_hash", "replication_desired", "storage_classes_desired", "is_trashed"},
-		IncludeTrash:       true,
-		IncludeOldVersions: true,
+	rows, err := db.QueryxContext(ctx, `SELECT
+		uuid, manifest_text, modified_at, portable_data_hash,
+		replication_desired, replication_confirmed, replication_confirmed_at,
+		storage_classes_desired, storage_classes_confirmed, storage_classes_confirmed_at,
+		is_trashed
+		FROM collections`)
+	if err != nil {
+		return err
 	}
-	var last arvados.Collection
-	var filterTime time.Time
+	defer rows.Close()
+	progressTicker := time.NewTicker(10 * time.Second)
+	defer progressTicker.Stop()
 	callCount := 0
-	gettingExactTimestamp := false
-	for {
-		progress(callCount, expectCount)
-		var page arvados.CollectionList
-		err := c.RequestAndDecodeContext(ctx, &page, "GET", "arvados/v1/collections", nil, params)
+	for rows.Next() {
+		var coll arvados.Collection
+		var classesDesired, classesConfirmed []byte
+		err = rows.Scan(&coll.UUID, &coll.ManifestText, &coll.ModifiedAt, &coll.PortableDataHash,
+			&coll.ReplicationDesired, &coll.ReplicationConfirmed, &coll.ReplicationConfirmedAt,
+			&classesDesired, &classesConfirmed, &coll.StorageClassesConfirmedAt,
+			&coll.IsTrashed)
 		if err != nil {
 			return err
 		}
-		for _, coll := range page.Items {
-			if last.ModifiedAt == coll.ModifiedAt && last.UUID >= coll.UUID {
-				continue
-			}
-			callCount++
-			err = f(coll)
-			if err != nil {
-				return err
-			}
-			last = coll
+
+		err = json.Unmarshal(classesDesired, &coll.StorageClassesDesired)
+		if err != nil && len(classesDesired) > 0 {
+			return err
+		}
+		err = json.Unmarshal(classesConfirmed, &coll.StorageClassesConfirmed)
+		if err != nil && len(classesConfirmed) > 0 {
+			return err
+		}
+		if newestModifiedAt.IsZero() || newestModifiedAt.Before(coll.ModifiedAt) {
+			newestModifiedAt = coll.ModifiedAt
 		}
-		if len(page.Items) == 0 && !gettingExactTimestamp {
-			break
-		} else if last.ModifiedAt.IsZero() {
-			return fmt.Errorf("BUG: Last collection on the page (%s) has no modified_at timestamp; cannot make progress", last.UUID)
-		} else if len(page.Items) > 0 && last.ModifiedAt == filterTime {
-			// If we requested time>=X and never got a
-			// time>X then we might not have received all
-			// items with time==X yet. Switch to
-			// gettingExactTimestamp mode (if we're not
-			// there already), advancing our UUID
-			// threshold with each request, until we get
-			// an empty page.
-			gettingExactTimestamp = true
-			params.Filters = []arvados.Filter{{
-				Attr:     "modified_at",
-				Operator: "=",
-				Operand:  filterTime,
-			}, {
-				Attr:     "uuid",
-				Operator: ">",
-				Operand:  last.UUID,
-			}}
-		} else if gettingExactTimestamp {
-			// This must be an empty page (in this mode,
-			// an unequal timestamp is impossible) so we
-			// can start getting pages of newer
-			// collections.
-			gettingExactTimestamp = false
-			params.Filters = []arvados.Filter{{
-				Attr:     "modified_at",
-				Operator: ">",
-				Operand:  filterTime,
-			}}
-		} else {
-			// In the normal case, we know we have seen
-			// all collections with modtime<filterTime,
-			// but we might not have seen all with
-			// modtime=filterTime. Hence we use >= instead
-			// of > and skip the obvious overlapping item,
-			// i.e., the last item on the previous
-			// page. In some edge cases this can return
-			// collections we have already seen, but
-			// avoiding that would add overhead in the
-			// overwhelmingly common cases, so we don't
-			// bother.
-			filterTime = last.ModifiedAt
-			params.Filters = []arvados.Filter{{
-				Attr:     "modified_at",
-				Operator: ">=",
-				Operand:  filterTime,
-			}, {
-				Attr:     "uuid",
-				Operator: "!=",
-				Operand:  last.UUID,
-			}}
+		callCount++
+		err = f(coll)
+		if err != nil {
+			return err
+		}
+		select {
+		case <-progressTicker.C:
+			progress(callCount, expectCount)
+		default:
 		}
 	}
 	progress(callCount, expectCount)
-
+	err = rows.Close()
+	if err != nil {
+		return err
+	}
 	if checkCount, err := countCollections(c, arvados.ResourceListParams{
 		Filters: []arvados.Filter{{
 			Attr:     "modified_at",
 			Operator: "<=",
-			Operand:  filterTime}},
+			Operand:  newestModifiedAt}},
 		IncludeTrash:       true,
 		IncludeOldVersions: true,
 	}); err != nil {
 		return err
 	} else if callCount < checkCount {
-		return fmt.Errorf("Retrieved %d collections with modtime <= T=%q, but server now reports there are %d collections with modtime <= T", callCount, filterTime, checkCount)
+		return fmt.Errorf("Retrieved %d collections with modtime <= T=%q, but server now reports there are %d collections with modtime <= T", callCount, newestModifiedAt, checkCount)
 	}
 
 	return nil
@@ -180,54 +122,100 @@ func (bal *Balancer) updateCollections(ctx context.Context, c *arvados.Client, c
 	threshold := time.Now()
 	thresholdStr := threshold.Format(time.RFC3339Nano)
 
-	var err error
+	updated := int64(0)
+
+	errs := make(chan error, 1)
 	collQ := make(chan arvados.Collection, cluster.Collections.BalanceCollectionBuffers)
 	go func() {
 		defer close(collQ)
-		err = EachCollection(ctx, c, cluster.Collections.BalanceCollectionBatch, func(coll arvados.Collection) error {
-			if coll.ModifiedAt.After(threshold) {
-				return io.EOF
-			}
-			if coll.IsTrashed {
-				return nil
+		err := EachCollection(ctx, bal.DB, c, func(coll arvados.Collection) error {
+			if atomic.LoadInt64(&updated) >= int64(cluster.Collections.BalanceUpdateLimit) {
+				bal.logf("reached BalanceUpdateLimit (%d)", cluster.Collections.BalanceUpdateLimit)
+				cancel()
+				return context.Canceled
 			}
 			collQ <- coll
 			return nil
 		}, func(done, total int) {
-			bal.logf("update collections: %d/%d", done, total)
+			bal.logf("update collections: %d/%d (%d updated @ %.01f updates/s)", done, total, atomic.LoadInt64(&updated), float64(atomic.LoadInt64(&updated))/time.Since(threshold).Seconds())
 		})
-		if err == io.EOF {
-			err = nil
-		} else if err != nil {
-			bal.logf("error updating collections: %s", err)
+		if err != nil && err != context.Canceled {
+			select {
+			case errs <- err:
+			default:
+			}
 		}
 	}()
 
-	db, err := bal.db(cluster)
-	if err != nil {
-		return err
-	}
-
-	var updated int64
 	var wg sync.WaitGroup
-	for i := 0; i < runtime.NumCPU(); i++ {
+
+	// Use about 1 goroutine per 2 CPUs. Based on experiments with
+	// a 2-core host, using more concurrent database
+	// calls/transactions makes this process slower, not faster.
+	for i := 0; i < (runtime.NumCPU()+1)/2; i++ {
 		wg.Add(1)
-		go func() {
+		goSendErr(errs, func() error {
 			defer wg.Done()
+			tx, err := bal.DB.Beginx()
+			if err != nil {
+				return err
+			}
+			txPending := 0
+			flush := func(final bool) error {
+				err := tx.Commit()
+				if err != nil && ctx.Err() == nil {
+					tx.Rollback()
+					return err
+				}
+				txPending = 0
+				if final {
+					return nil
+				}
+				tx, err = bal.DB.Beginx()
+				return err
+			}
+			txBatch := 100
 			for coll := range collQ {
+				if ctx.Err() != nil || len(errs) > 0 {
+					continue
+				}
 				blkids, err := coll.SizedDigests()
 				if err != nil {
 					bal.logf("%s: %s", coll.UUID, err)
 					continue
 				}
 				repl := bal.BlockStateMap.GetConfirmedReplication(blkids, coll.StorageClassesDesired)
-				tx, err := db.Beginx()
-				if err != nil {
-					bal.logf("error opening transaction: %s", coll.UUID, err)
-					cancel()
+
+				desired := bal.DefaultReplication
+				if coll.ReplicationDesired != nil {
+					desired = *coll.ReplicationDesired
+				}
+				if repl > desired {
+					// If actual>desired, confirm
+					// the desired number rather
+					// than actual to avoid
+					// flapping updates when
+					// replication increases
+					// temporarily.
+					repl = desired
+				}
+				classes := emptyJSONArray
+				if repl > 0 {
+					classes, err = json.Marshal(coll.StorageClassesDesired)
+					if err != nil {
+						bal.logf("BUG? json.Marshal(%v) failed: %s", classes, err)
+						continue
+					}
+				}
+				needUpdate := coll.ReplicationConfirmed == nil || *coll.ReplicationConfirmed != repl || len(coll.StorageClassesConfirmed) != len(coll.StorageClassesDesired)
+				for i := range coll.StorageClassesDesired {
+					if !needUpdate && coll.StorageClassesDesired[i] != coll.StorageClassesConfirmed[i] {
+						needUpdate = true
+					}
+				}
+				if !needUpdate {
 					continue
 				}
-				classes, _ := json.Marshal(coll.StorageClassesDesired)
 				_, err = tx.ExecContext(ctx, `update collections set
 					replication_confirmed=$1,
 					replication_confirmed_at=$2,
@@ -236,33 +224,43 @@ func (bal *Balancer) updateCollections(ctx context.Context, c *arvados.Client, c
 					where uuid=$4`,
 					repl, thresholdStr, classes, coll.UUID)
 				if err != nil {
-					tx.Rollback()
-				} else {
-					err = tx.Commit()
-				}
-				if err != nil {
-					bal.logf("%s: update failed: %s", coll.UUID, err)
+					if ctx.Err() == nil {
+						bal.logf("%s: update failed: %s", coll.UUID, err)
+					}
 					continue
 				}
 				atomic.AddInt64(&updated, 1)
+				if txPending++; txPending >= txBatch {
+					err = flush(false)
+					if err != nil {
+						return err
+					}
+				}
 			}
-		}()
+			return flush(true)
+		})
 	}
 	wg.Wait()
 	bal.logf("updated %d collections", updated)
-	return err
+	if len(errs) > 0 {
+		return fmt.Errorf("error updating collections: %s", <-errs)
+	}
+	return nil
 }
 
-func (bal *Balancer) db(cluster *arvados.Cluster) (*sqlx.DB, error) {
-	db, err := sqlx.Open("postgres", cluster.PostgreSQL.Connection.String())
-	if err != nil {
-		return nil, err
-	}
-	if p := cluster.PostgreSQL.ConnectionPool; p > 0 {
-		db.SetMaxOpenConns(p)
-	}
-	if err := db.Ping(); err != nil {
-		return nil, fmt.Errorf("postgresql connect succeeded but ping failed: %s", err)
-	}
-	return db, nil
+// Call f in a new goroutine. If it returns a non-nil error, send the
+// error to the errs channel (unless the channel is already full with
+// another error).
+func goSendErr(errs chan<- error, f func() error) {
+	go func() {
+		err := f()
+		if err != nil {
+			select {
+			case errs <- err:
+			default:
+			}
+		}
+	}()
}
+
+var emptyJSONArray = []byte("[]")
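
The rewritten updateCollections above relies on two small concurrency patterns: a one-slot error channel fed by goSendErr, so only the first worker error is kept and reported after wg.Wait(), and a non-blocking select on a ticker so progress reporting never stalls the workers. The standalone Go program below is a minimal sketch of just those two patterns, not Arvados code: the items queue, worker count, and failure condition are invented for illustration, and the database/transaction batching from the diff is omitted.

package main

import (
	"fmt"
	"sync"
	"time"
)

// goSendErr mirrors the helper added in the diff: run f in a goroutine and,
// if it fails, deposit the error in errs unless an earlier error is already queued.
func goSendErr(errs chan<- error, f func() error) {
	go func() {
		if err := f(); err != nil {
			select {
			case errs <- err:
			default: // another error already occupies the 1-slot channel; drop this one
			}
		}
	}()
}

func main() {
	errs := make(chan error, 1) // capacity 1: "first error wins"
	items := make(chan int, 10) // stands in for the collQ channel of collections

	// Producer: stream work items into the queue, then close it.
	go func() {
		defer close(items)
		for i := 0; i < 1000; i++ {
			items <- i
		}
	}()

	progressTicker := time.NewTicker(100 * time.Millisecond)
	defer progressTicker.Stop()

	var wg sync.WaitGroup
	for w := 0; w < 2; w++ {
		wg.Add(1)
		goSendErr(errs, func() error {
			defer wg.Done()
			done := 0
			for i := range items {
				if len(errs) > 0 {
					continue // another worker failed; drain the queue without doing work
				}
				if i == 750 { // hypothetical failure, just to exercise the error path
					return fmt.Errorf("processing item %d failed", i)
				}
				done++
				select {
				case <-progressTicker.C:
					fmt.Printf("progress: %d items done in this worker\n", done)
				default: // never block on progress reporting
				}
			}
			return nil
		})
	}
	wg.Wait()

	// Report the first (and only retained) error, if any.
	select {
	case err := <-errs:
		fmt.Println("error:", err)
	default:
		fmt.Println("all items processed")
	}
}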