19249: Mime-encode metadata headers that have control chars.
[arvados.git] / services / keep-balance / collection.go
index 3afb1ccc5500ccba647940b8f31254a1cd117861..ccb01bdd10c5fb7f777d868a2aeefa175d494ace 100644 (file)
@@ -2,13 +2,12 @@
 //
 // SPDX-License-Identifier: AGPL-3.0
 
-package main
+package keepbalance
 
 import (
        "context"
        "encoding/json"
        "fmt"
-       "io"
        "runtime"
        "sync"
        "sync/atomic"
@@ -34,10 +33,7 @@ func countCollections(c *arvados.Client, params arvados.ResourceListParams) (int
 // The progress function is called periodically with done (number of
 // times f has been called) and total (number of times f is expected
 // to be called).
-//
-// If pageSize > 0 it is used as the maximum page size in each API
-// call; otherwise the maximum allowed page size is requested.
-func EachCollection(ctx context.Context, c *arvados.Client, pageSize int, f func(arvados.Collection) error, progress func(done, total int)) error {
+func EachCollection(ctx context.Context, db *sqlx.DB, c *arvados.Client, f func(arvados.Collection) error, progress func(done, total int)) error {
        if progress == nil {
                progress = func(_, _ int) {}
        }
@@ -49,124 +45,70 @@ func EachCollection(ctx context.Context, c *arvados.Client, pageSize int, f func
        if err != nil {
                return err
        }
+       var newestModifiedAt time.Time
 
-       // Note the obvious way to get all collections (sorting by
-       // UUID) would be much easier, but would lose data: If a
-       // client were to move files from collection with uuid="zzz"
-       // to a collection with uuid="aaa" around the time when we
-       // were fetching the "mmm" page, we would never see those
-       // files' block IDs at all -- even if the client is careful to
-       // save "aaa" before saving "zzz".
-       //
-       // Instead, we get pages in modified_at order. Collections
-       // that are modified during the run will be re-fetched in a
-       // subsequent page.
-
-       limit := pageSize
-       if limit <= 0 {
-               // Use the maximum page size the server allows
-               limit = 1<<31 - 1
-       }
-       params := arvados.ResourceListParams{
-               Limit:              &limit,
-               Order:              "modified_at, uuid",
-               Count:              "none",
-               Select:             []string{"uuid", "unsigned_manifest_text", "modified_at", "portable_data_hash", "replication_desired", "storage_classes_desired", "is_trashed"},
-               IncludeTrash:       true,
-               IncludeOldVersions: true,
+       rows, err := db.QueryxContext(ctx, `SELECT
+               uuid, manifest_text, modified_at, portable_data_hash,
+               replication_desired, replication_confirmed, replication_confirmed_at,
+               storage_classes_desired, storage_classes_confirmed, storage_classes_confirmed_at,
+               is_trashed
+               FROM collections`)
+       if err != nil {
+               return err
        }
-       var last arvados.Collection
-       var filterTime time.Time
+       defer rows.Close()
+       progressTicker := time.NewTicker(10 * time.Second)
+       defer progressTicker.Stop()
        callCount := 0
-       gettingExactTimestamp := false
-       for {
-               progress(callCount, expectCount)
-               var page arvados.CollectionList
-               err := c.RequestAndDecodeContext(ctx, &page, "GET", "arvados/v1/collections", nil, params)
+       for rows.Next() {
+               var coll arvados.Collection
+               var classesDesired, classesConfirmed []byte
+               err = rows.Scan(&coll.UUID, &coll.ManifestText, &coll.ModifiedAt, &coll.PortableDataHash,
+                       &coll.ReplicationDesired, &coll.ReplicationConfirmed, &coll.ReplicationConfirmedAt,
+                       &classesDesired, &classesConfirmed, &coll.StorageClassesConfirmedAt,
+                       &coll.IsTrashed)
                if err != nil {
                        return err
                }
-               for _, coll := range page.Items {
-                       if last.ModifiedAt == coll.ModifiedAt && last.UUID >= coll.UUID {
-                               continue
-                       }
-                       callCount++
-                       err = f(coll)
-                       if err != nil {
-                               return err
-                       }
-                       last = coll
+
+               err = json.Unmarshal(classesDesired, &coll.StorageClassesDesired)
+               if err != nil && len(classesDesired) > 0 {
+                       return err
+               }
+               err = json.Unmarshal(classesConfirmed, &coll.StorageClassesConfirmed)
+               if err != nil && len(classesConfirmed) > 0 {
+                       return err
+               }
+               if newestModifiedAt.IsZero() || newestModifiedAt.Before(coll.ModifiedAt) {
+                       newestModifiedAt = coll.ModifiedAt
                }
-               if len(page.Items) == 0 && !gettingExactTimestamp {
-                       break
-               } else if last.ModifiedAt.IsZero() {
-                       return fmt.Errorf("BUG: Last collection on the page (%s) has no modified_at timestamp; cannot make progress", last.UUID)
-               } else if len(page.Items) > 0 && last.ModifiedAt == filterTime {
-                       // If we requested time>=X and never got a
-                       // time>X then we might not have received all
-                       // items with time==X yet. Switch to
-                       // gettingExactTimestamp mode (if we're not
-                       // there already), advancing our UUID
-                       // threshold with each request, until we get
-                       // an empty page.
-                       gettingExactTimestamp = true
-                       params.Filters = []arvados.Filter{{
-                               Attr:     "modified_at",
-                               Operator: "=",
-                               Operand:  filterTime,
-                       }, {
-                               Attr:     "uuid",
-                               Operator: ">",
-                               Operand:  last.UUID,
-                       }}
-               } else if gettingExactTimestamp {
-                       // This must be an empty page (in this mode,
-                       // an unequal timestamp is impossible) so we
-                       // can start getting pages of newer
-                       // collections.
-                       gettingExactTimestamp = false
-                       params.Filters = []arvados.Filter{{
-                               Attr:     "modified_at",
-                               Operator: ">",
-                               Operand:  filterTime,
-                       }}
-               } else {
-                       // In the normal case, we know we have seen
-                       // all collections with modtime<filterTime,
-                       // but we might not have seen all that have
-                       // modtime=filterTime. Hence we use >= instead
-                       // of > and skip the obvious overlapping item,
-                       // i.e., the last item on the previous
-                       // page. In some edge cases this can return
-                       // collections we have already seen, but
-                       // avoiding that would add overhead in the
-                       // overwhelmingly common cases, so we don't
-                       // bother.
-                       filterTime = last.ModifiedAt
-                       params.Filters = []arvados.Filter{{
-                               Attr:     "modified_at",
-                               Operator: ">=",
-                               Operand:  filterTime,
-                       }, {
-                               Attr:     "uuid",
-                               Operator: "!=",
-                               Operand:  last.UUID,
-                       }}
+               callCount++
+               err = f(coll)
+               if err != nil {
+                       return err
+               }
+               select {
+               case <-progressTicker.C:
+                       progress(callCount, expectCount)
+               default:
                }
        }
        progress(callCount, expectCount)
-
+       err = rows.Close()
+       if err != nil {
+               return err
+       }
        if checkCount, err := countCollections(c, arvados.ResourceListParams{
                Filters: []arvados.Filter{{
                        Attr:     "modified_at",
                        Operator: "<=",
-                       Operand:  filterTime}},
+                       Operand:  newestModifiedAt}},
                IncludeTrash:       true,
                IncludeOldVersions: true,
        }); err != nil {
                return err
        } else if callCount < checkCount {
-               return fmt.Errorf("Retrieved %d collections with modtime <= T=%q, but server now reports there are %d collections with modtime <= T", callCount, filterTime, checkCount)
+               return fmt.Errorf("Retrieved %d collections with modtime <= T=%q, but server now reports there are %d collections with modtime <= T", callCount, newestModifiedAt, checkCount)
        }
 
        return nil
@@ -180,54 +122,100 @@ func (bal *Balancer) updateCollections(ctx context.Context, c *arvados.Client, c
        threshold := time.Now()
        thresholdStr := threshold.Format(time.RFC3339Nano)
 
-       var err error
+       updated := int64(0)
+
+       errs := make(chan error, 1)
        collQ := make(chan arvados.Collection, cluster.Collections.BalanceCollectionBuffers)
        go func() {
                defer close(collQ)
-               err = EachCollection(ctx, c, cluster.Collections.BalanceCollectionBatch, func(coll arvados.Collection) error {
-                       if coll.ModifiedAt.After(threshold) {
-                               return io.EOF
-                       }
-                       if coll.IsTrashed {
-                               return nil
+               err := EachCollection(ctx, bal.DB, c, func(coll arvados.Collection) error {
+                       if atomic.LoadInt64(&updated) >= int64(cluster.Collections.BalanceUpdateLimit) {
+                               bal.logf("reached BalanceUpdateLimit (%d)", cluster.Collections.BalanceUpdateLimit)
+                               cancel()
+                               return context.Canceled
                        }
                        collQ <- coll
                        return nil
                }, func(done, total int) {
-                       bal.logf("update collections: %d/%d", done, total)
+                       bal.logf("update collections: %d/%d (%d updated @ %.01f updates/s)", done, total, atomic.LoadInt64(&updated), float64(atomic.LoadInt64(&updated))/time.Since(threshold).Seconds())
                })
-               if err == io.EOF {
-                       err = nil
-               } else if err != nil {
-                       bal.logf("error updating collections: %s", err)
+               if err != nil && err != context.Canceled {
+                       select {
+                       case errs <- err:
+                       default:
+                       }
                }
        }()
 
-       db, err := bal.db(cluster)
-       if err != nil {
-               return err
-       }
-
-       var updated int64
        var wg sync.WaitGroup
-       for i := 0; i < runtime.NumCPU(); i++ {
+
+       // Use about 1 goroutine per 2 CPUs. Based on experiments with
+       // a 2-core host, using more concurrent database
+       // calls/transactions makes this process slower, not faster.
+       for i := 0; i < runtime.NumCPU()+1/2; i++ {
                wg.Add(1)
-               go func() {
+               goSendErr(errs, func() error {
                        defer wg.Done()
+                       tx, err := bal.DB.Beginx()
+                       if err != nil {
+                               return err
+                       }
+                       txPending := 0
+                       flush := func(final bool) error {
+                               err := tx.Commit()
+                               if err != nil && ctx.Err() == nil {
+                                       tx.Rollback()
+                                       return err
+                               }
+                               txPending = 0
+                               if final {
+                                       return nil
+                               }
+                               tx, err = bal.DB.Beginx()
+                               return err
+                       }
+                       txBatch := 100
                        for coll := range collQ {
+                               if ctx.Err() != nil || len(errs) > 0 {
+                                       continue
+                               }
                                blkids, err := coll.SizedDigests()
                                if err != nil {
                                        bal.logf("%s: %s", coll.UUID, err)
                                        continue
                                }
                                repl := bal.BlockStateMap.GetConfirmedReplication(blkids, coll.StorageClassesDesired)
-                               tx, err := db.Beginx()
-                               if err != nil {
-                                       bal.logf("error opening transaction: %s", coll.UUID, err)
-                                       cancel()
+
+                               desired := bal.DefaultReplication
+                               if coll.ReplicationDesired != nil {
+                                       desired = *coll.ReplicationDesired
+                               }
+                               if repl > desired {
+                                       // If actual>desired, confirm
+                                       // the desired number rather
+                                       // than actual to avoid
+                                       // flapping updates when
+                                       // replication increases
+                                       // temporarily.
+                                       repl = desired
+                               }
+                               classes := emptyJSONArray
+                               if repl > 0 {
+                                       classes, err = json.Marshal(coll.StorageClassesDesired)
+                                       if err != nil {
+                                               bal.logf("BUG? json.Marshal(%v) failed: %s", classes, err)
+                                               continue
+                                       }
+                               }
+                               needUpdate := coll.ReplicationConfirmed == nil || *coll.ReplicationConfirmed != repl || len(coll.StorageClassesConfirmed) != len(coll.StorageClassesDesired)
+                               for i := range coll.StorageClassesDesired {
+                                       if !needUpdate && coll.StorageClassesDesired[i] != coll.StorageClassesConfirmed[i] {
+                                               needUpdate = true
+                                       }
+                               }
+                               if !needUpdate {
                                        continue
                                }
-                               classes, _ := json.Marshal(coll.StorageClassesDesired)
                                _, err = tx.ExecContext(ctx, `update collections set
                                        replication_confirmed=$1,
                                        replication_confirmed_at=$2,
@@ -236,33 +224,43 @@ func (bal *Balancer) updateCollections(ctx context.Context, c *arvados.Client, c
                                        where uuid=$4`,
                                        repl, thresholdStr, classes, coll.UUID)
                                if err != nil {
-                                       tx.Rollback()
-                               } else {
-                                       err = tx.Commit()
-                               }
-                               if err != nil {
-                                       bal.logf("%s: update failed: %s", coll.UUID, err)
+                                       if ctx.Err() == nil {
+                                               bal.logf("%s: update failed: %s", coll.UUID, err)
+                                       }
                                        continue
                                }
                                atomic.AddInt64(&updated, 1)
+                               if txPending++; txPending >= txBatch {
+                                       err = flush(false)
+                                       if err != nil {
+                                               return err
+                                       }
+                               }
                        }
-               }()
+                       return flush(true)
+               })
        }
        wg.Wait()
        bal.logf("updated %d collections", updated)
-       return err
+       if len(errs) > 0 {
+               return fmt.Errorf("error updating collections: %s", <-errs)
+       }
+       return nil
 }
 
-func (bal *Balancer) db(cluster *arvados.Cluster) (*sqlx.DB, error) {
-       db, err := sqlx.Open("postgres", cluster.PostgreSQL.Connection.String())
-       if err != nil {
-               return nil, err
-       }
-       if p := cluster.PostgreSQL.ConnectionPool; p > 0 {
-               db.SetMaxOpenConns(p)
-       }
-       if err := db.Ping(); err != nil {
-               return nil, fmt.Errorf("postgresql connect succeeded but ping failed: %s", err)
-       }
-       return db, nil
+// Call f in a new goroutine. If it returns a non-nil error, send the
+// error to the errs channel (unless the channel is already full with
+// another error).
+func goSendErr(errs chan<- error, f func() error) {
+       go func() {
+               err := f()
+               if err != nil {
+                       select {
+                       case errs <- err:
+                       default:
+                       }
+               }
+       }()
 }
+
+var emptyJSONArray = []byte("[]")