17574: Get collections directly from DB instead of controller.
[arvados.git] / services / keep-balance / collection.go
index a7a484672a0c3c073717ac1e65e77c44af21cdf5..ba9edf9391cddca34645e8e1f5981b896564a62a 100644 (file)
@@ -1,10 +1,21 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
 package main
 
 import (
+       "context"
+       "encoding/json"
        "fmt"
+       "io"
+       "runtime"
+       "sync"
+       "sync/atomic"
        "time"
 
-       "git.curoverse.com/arvados.git/sdk/go/arvados"
+       "git.arvados.org/arvados.git/sdk/go/arvados"
+       "github.com/jmoiron/sqlx"
 )
 
 func countCollections(c *arvados.Client, params arvados.ResourceListParams) (int, error) {
@@ -23,125 +34,133 @@ func countCollections(c *arvados.Client, params arvados.ResourceListParams) (int
 // The progress function is called periodically with done (number of
 // times f has been called) and total (number of times f is expected
 // to be called).
-//
-// If pageSize > 0 it is used as the maximum page size in each API
-// call; otherwise the maximum allowed page size is requested.
-func EachCollection(c *arvados.Client, pageSize int, f func(arvados.Collection) error, progress func(done, total int)) error {
+func EachCollection(ctx context.Context, db *sqlx.DB, c *arvados.Client, f func(arvados.Collection) error, progress func(done, total int)) error {
        if progress == nil {
                progress = func(_, _ int) {}
        }
 
        expectCount, err := countCollections(c, arvados.ResourceListParams{
-               IncludeTrash: true,
+               IncludeTrash:       true,
+               IncludeOldVersions: true,
        })
        if err != nil {
                return err
        }
+       var newestModifiedAt time.Time
 
-       limit := pageSize
-       if limit <= 0 {
-               // Use the maximum page size the server allows
-               limit = 1<<31 - 1
-       }
-       params := arvados.ResourceListParams{
-               Limit:        &limit,
-               Order:        "modified_at, uuid",
-               Count:        "none",
-               Select:       []string{"uuid", "unsigned_manifest_text", "modified_at", "portable_data_hash", "replication_desired"},
-               IncludeTrash: true,
+       rows, err := db.QueryxContext(ctx, `SELECT uuid, manifest_text, modified_at, portable_data_hash, replication_desired, storage_classes_desired, is_trashed FROM collections`)
+       if err != nil {
+               return err
        }
-       var last arvados.Collection
-       var filterTime time.Time
        callCount := 0
-       gettingExactTimestamp := false
-       for {
-               progress(callCount, expectCount)
-               var page arvados.CollectionList
-               err := c.RequestAndDecode(&page, "GET", "arvados/v1/collections", nil, params)
+       for rows.Next() {
+               var coll arvados.Collection
+               var classesDesired []byte
+               err = rows.Scan(&coll.UUID, &coll.ManifestText, &coll.ModifiedAt, &coll.PortableDataHash, &coll.ReplicationDesired, &classesDesired, &coll.IsTrashed)
                if err != nil {
+                       rows.Close()
                        return err
                }
-               for _, coll := range page.Items {
-                       if last.ModifiedAt != nil && *last.ModifiedAt == *coll.ModifiedAt && last.UUID >= coll.UUID {
-                               continue
-                       }
-                       callCount++
-                       err = f(coll)
-                       if err != nil {
-                               return err
-                       }
-                       last = coll
+               err = json.Unmarshal(classesDesired, &coll.StorageClassesDesired)
+               if err != nil {
+                       rows.Close()
+                       return err
                }
-               if len(page.Items) == 0 && !gettingExactTimestamp {
-                       break
-               } else if last.ModifiedAt == nil {
-                       return fmt.Errorf("BUG: Last collection on the page (%s) has no modified_at timestamp; cannot make progress", last.UUID)
-               } else if len(page.Items) > 0 && *last.ModifiedAt == filterTime {
-                       // If we requested time>=X and never got a
-                       // time>X then we might not have received all
-                       // items with time==X yet. Switch to
-                       // gettingExactTimestamp mode (if we're not
-                       // there already), advancing our UUID
-                       // threshold with each request, until we get
-                       // an empty page.
-                       gettingExactTimestamp = true
-                       params.Filters = []arvados.Filter{{
-                               Attr:     "modified_at",
-                               Operator: "=",
-                               Operand:  filterTime,
-                       }, {
-                               Attr:     "uuid",
-                               Operator: ">",
-                               Operand:  last.UUID,
-                       }}
-               } else if gettingExactTimestamp {
-                       // This must be an empty page (in this mode,
-                       // an unequal timestamp is impossible) so we
-                       // can start getting pages of newer
-                       // collections.
-                       gettingExactTimestamp = false
-                       params.Filters = []arvados.Filter{{
-                               Attr:     "modified_at",
-                               Operator: ">",
-                               Operand:  filterTime,
-                       }}
-               } else {
-                       // In the normal case, we know we have seen
-                       // all collections with modtime<filterTime,
-                       // but we might not have seen all that have
-                       // modtime=filterTime. Hence we use >= instead
-                       // of > and skip the obvious overlapping item,
-                       // i.e., the last item on the previous
-                       // page. In some edge cases this can return
-                       // collections we have already seen, but
-                       // avoiding that would add overhead in the
-                       // overwhelmingly common cases, so we don't
-                       // bother.
-                       filterTime = *last.ModifiedAt
-                       params.Filters = []arvados.Filter{{
-                               Attr:     "modified_at",
-                               Operator: ">=",
-                               Operand:  filterTime,
-                       }, {
-                               Attr:     "uuid",
-                               Operator: "!=",
-                               Operand:  last.UUID,
-                       }}
+               if newestModifiedAt.IsZero() || newestModifiedAt.Before(coll.ModifiedAt) {
+                       newestModifiedAt = coll.ModifiedAt
+               }
+               callCount++
+               err = f(coll)
+               if err != nil {
+                       return err
                }
+               progress(callCount, expectCount)
+       }
+       rows.Close()
+       if err := rows.Err(); err != nil {
+               return err
        }
-       progress(callCount, expectCount)
-
        if checkCount, err := countCollections(c, arvados.ResourceListParams{
                Filters: []arvados.Filter{{
                        Attr:     "modified_at",
                        Operator: "<=",
-                       Operand:  filterTime}},
-               IncludeTrash: true,
+                       Operand:  newestModifiedAt}},
+               IncludeTrash:       true,
+               IncludeOldVersions: true,
        }); err != nil {
                return err
        } else if callCount < checkCount {
-               return fmt.Errorf("Retrieved %d collections with modtime <= T=%q, but server now reports there are %d collections with modtime <= T", callCount, filterTime, checkCount)
+               return fmt.Errorf("Retrieved %d collections with modtime <= T=%q, but server now reports there are %d collections with modtime <= T", callCount, newestModifiedAt, checkCount)
        }
 
        return nil
 }
+
+func (bal *Balancer) updateCollections(ctx context.Context, c *arvados.Client, cluster *arvados.Cluster) error {
+       ctx, cancel := context.WithCancel(ctx)
+       defer cancel()
+
+       defer bal.time("update_collections", "wall clock time to update collections")()
+       threshold := time.Now()
+       thresholdStr := threshold.Format(time.RFC3339Nano)
+
+       var err error
+       collQ := make(chan arvados.Collection, cluster.Collections.BalanceCollectionBuffers)
+       go func() {
+               defer close(collQ)
+               err = EachCollection(ctx, bal.DB, c, func(coll arvados.Collection) error {
+                       if coll.ModifiedAt.After(threshold) {
+                               return io.EOF
+                       }
+                       if coll.IsTrashed {
+                               return nil
+                       }
+                       collQ <- coll
+                       return nil
+               }, func(done, total int) {
+                       bal.logf("update collections: %d/%d", done, total)
+               })
+               if err == io.EOF {
+                       err = nil
+               } else if err != nil {
+                       bal.logf("error updating collections: %s", err)
+               }
+       }()
+
+       var updated int64
+       var wg sync.WaitGroup
+       for i := 0; i < runtime.NumCPU(); i++ {
+               wg.Add(1)
+               go func() {
+                       defer wg.Done()
+                       for coll := range collQ {
+                               blkids, err := coll.SizedDigests()
+                               if err != nil {
+                                       bal.logf("%s: %s", coll.UUID, err)
+                                       continue
+                               }
+                               repl := bal.BlockStateMap.GetConfirmedReplication(blkids, coll.StorageClassesDesired)
+                               classes, err := json.Marshal(coll.StorageClassesDesired)
+                               if err != nil {
+                                       bal.logf("BUG? json.Marshal(%v) failed: %s", classes, err)
+                                       continue
+                               }
+                               _, err = bal.DB.ExecContext(ctx, `update collections set
+                                       replication_confirmed=$1,
+                                       replication_confirmed_at=$2,
+                                       storage_classes_confirmed=$3,
+                                       storage_classes_confirmed_at=$2
+                                       where uuid=$4`,
+                                       repl, thresholdStr, classes, coll.UUID)
+                               if err != nil {
+                                       bal.logf("%s: update failed: %s", coll.UUID, err)
+                                       continue
+                               }
+                               atomic.AddInt64(&updated, 1)
+                       }
+               }()
+       }
+       wg.Wait()
+       bal.logf("updated %d collections", updated)
+       return err
+}