17574: Get collections directly from DB instead of controller.
author Tom Clegg <tom@curii.com>
Tue, 27 Jul 2021 15:16:49 +0000 (11:16 -0400)
committer Tom Clegg <tom@curii.com>
Tue, 27 Jul 2021 15:16:49 +0000 (11:16 -0400)
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom@curii.com>

services/keep-balance/balance.go
services/keep-balance/collection.go
services/keep-balance/collection_test.go
services/keep-balance/integration_test.go
services/keep-balance/main.go
services/keep-balance/server.go

index a7dcf61902d830d7b389d8bc36eee7f90a178c21..6a71cf99f71e6431e9803e9d82e270da511949a7 100644 (file)
@@ -23,6 +23,7 @@ import (
 
        "git.arvados.org/arvados.git/sdk/go/arvados"
        "git.arvados.org/arvados.git/sdk/go/keepclient"
+       "github.com/jmoiron/sqlx"
        "github.com/sirupsen/logrus"
 )
 
@@ -36,6 +37,7 @@ import (
 // BlobSignatureTTL; and all N existing replicas of a given data block
 // are in the N best positions in rendezvous probe order.
 type Balancer struct {
+       DB      *sqlx.DB // used to read collections and record their confirmed replication
        Logger  logrus.FieldLogger
        Dumper  logrus.FieldLogger
        Metrics *metrics
@@ -424,7 +426,7 @@ func (bal *Balancer) GetCurrentState(ctx context.Context, c *arvados.Client, pag
        wg.Add(1)
        go func() {
                defer wg.Done()
-               err = EachCollection(ctx, c, pageSize,
+               err = EachCollection(ctx, bal.DB, c,
                        func(coll arvados.Collection) error {
                                collQ <- coll
                                if len(errs) > 0 {
index 3afb1ccc5500ccba647940b8f31254a1cd117861..ba9edf9391cddca34645e8e1f5981b896564a62a 100644 (file)
@@ -34,10 +34,15 @@ func countCollections(c *arvados.Client, params arvados.ResourceListParams) (int
 // The progress function is called periodically with done (number of
 // times f has been called) and total (number of times f is expected
 // to be called).
 //
-// If pageSize > 0 it is used as the maximum page size in each API
-// call; otherwise the maximum allowed page size is requested.
-func EachCollection(ctx context.Context, c *arvados.Client, pageSize int, f func(arvados.Collection) error, progress func(done, total int)) error {
+// Collections are read directly from the database (db) in no
+// particular order, so f must not assume they arrive sorted by
+// modified_at. If f returns a non-nil error, the scan stops and that
+// error is returned.
+//
+// The API client c is used to count collections, e.g., for a final
+// sanity check that no collections were missed.
+func EachCollection(ctx context.Context, db *sqlx.DB, c *arvados.Client, f func(arvados.Collection) error, progress func(done, total int)) error {
        if progress == nil {
                progress = func(_, _ int) {}
        }
@@ -49,124 +54,63 @@ func EachCollection(ctx context.Context, c *arvados.Client, pageSize int, f func
        if err != nil {
                return err
        }
+       var newestModifiedAt time.Time
 
-       // Note the obvious way to get all collections (sorting by
-       // UUID) would be much easier, but would lose data: If a
-       // client were to move files from collection with uuid="zzz"
-       // to a collection with uuid="aaa" around the time when we
-       // were fetching the "mmm" page, we would never see those
-       // files' block IDs at all -- even if the client is careful to
-       // save "aaa" before saving "zzz".
-       //
-       // Instead, we get pages in modified_at order. Collections
-       // that are modified during the run will be re-fetched in a
-       // subsequent page.
-
-       limit := pageSize
-       if limit <= 0 {
-               // Use the maximum page size the server allows
-               limit = 1<<31 - 1
-       }
-       params := arvados.ResourceListParams{
-               Limit:              &limit,
-               Order:              "modified_at, uuid",
-               Count:              "none",
-               Select:             []string{"uuid", "unsigned_manifest_text", "modified_at", "portable_data_hash", "replication_desired", "storage_classes_desired", "is_trashed"},
-               IncludeTrash:       true,
-               IncludeOldVersions: true,
+       rows, err := db.QueryxContext(ctx, `SELECT uuid, manifest_text, modified_at, portable_data_hash, replication_desired, storage_classes_desired, is_trashed FROM collections`)
+       if err != nil {
+               return err
        }
-       var last arvados.Collection
-       var filterTime time.Time
+       // Release rows (and the database connection they hold) on
+       // every return path, including when f returns an error.
+       defer rows.Close()
+       // Report progress periodically rather than after every
+       // collection; there may be millions of them.
+       progressTicker := time.NewTicker(10 * time.Second)
+       defer progressTicker.Stop()
        callCount := 0
-       gettingExactTimestamp := false
-       for {
-               progress(callCount, expectCount)
-               var page arvados.CollectionList
-               err := c.RequestAndDecodeContext(ctx, &page, "GET", "arvados/v1/collections", nil, params)
+       for rows.Next() {
+               var coll arvados.Collection
+               var classesDesired []byte
+               err = rows.Scan(&coll.UUID, &coll.ManifestText, &coll.ModifiedAt, &coll.PortableDataHash, &coll.ReplicationDesired, &classesDesired, &coll.IsTrashed)
                if err != nil {
                        return err
                }
-               for _, coll := range page.Items {
-                       if last.ModifiedAt == coll.ModifiedAt && last.UUID >= coll.UUID {
-                               continue
-                       }
-                       callCount++
-                       err = f(coll)
-                       if err != nil {
-                               return err
-                       }
-                       last = coll
+               err = json.Unmarshal(classesDesired, &coll.StorageClassesDesired)
+               if err != nil {
+                       return err
                }
-               if len(page.Items) == 0 && !gettingExactTimestamp {
-                       break
-               } else if last.ModifiedAt.IsZero() {
-                       return fmt.Errorf("BUG: Last collection on the page (%s) has no modified_at timestamp; cannot make progress", last.UUID)
-               } else if len(page.Items) > 0 && last.ModifiedAt == filterTime {
-                       // If we requested time>=X and never got a
-                       // time>X then we might not have received all
-                       // items with time==X yet. Switch to
-                       // gettingExactTimestamp mode (if we're not
-                       // there already), advancing our UUID
-                       // threshold with each request, until we get
-                       // an empty page.
-                       gettingExactTimestamp = true
-                       params.Filters = []arvados.Filter{{
-                               Attr:     "modified_at",
-                               Operator: "=",
-                               Operand:  filterTime,
-                       }, {
-                               Attr:     "uuid",
-                               Operator: ">",
-                               Operand:  last.UUID,
-                       }}
-               } else if gettingExactTimestamp {
-                       // This must be an empty page (in this mode,
-                       // an unequal timestamp is impossible) so we
-                       // can start getting pages of newer
-                       // collections.
-                       gettingExactTimestamp = false
-                       params.Filters = []arvados.Filter{{
-                               Attr:     "modified_at",
-                               Operator: ">",
-                               Operand:  filterTime,
-                       }}
-               } else {
-                       // In the normal case, we know we have seen
-                       // all collections with modtime<filterTime,
-                       // but we might not have seen all that have
-                       // modtime=filterTime. Hence we use >= instead
-                       // of > and skip the obvious overlapping item,
-                       // i.e., the last item on the previous
-                       // page. In some edge cases this can return
-                       // collections we have already seen, but
-                       // avoiding that would add overhead in the
-                       // overwhelmingly common cases, so we don't
-                       // bother.
-                       filterTime = last.ModifiedAt
-                       params.Filters = []arvados.Filter{{
-                               Attr:     "modified_at",
-                               Operator: ">=",
-                               Operand:  filterTime,
-                       }, {
-                               Attr:     "uuid",
-                               Operator: "!=",
-                               Operand:  last.UUID,
-                       }}
+               if newestModifiedAt.IsZero() || newestModifiedAt.Before(coll.ModifiedAt) {
+                       newestModifiedAt = coll.ModifiedAt
                }
+               callCount++
+               err = f(coll)
+               if err != nil {
+                       return err
+               }
+               select {
+               case <-progressTicker.C:
+                       progress(callCount, expectCount)
+               default:
+               }
+       }
+       progress(callCount, expectCount)
+       // Close explicitly (the deferred Close is then a no-op) so the
+       // database connection is released before the API call below.
+       rows.Close()
+       if err := rows.Err(); err != nil {
+               return err
        }
-       progress(callCount, expectCount)
-
        if checkCount, err := countCollections(c, arvados.ResourceListParams{
                Filters: []arvados.Filter{{
                        Attr:     "modified_at",
                        Operator: "<=",
-                       Operand:  filterTime}},
+                       Operand:  newestModifiedAt}},
                IncludeTrash:       true,
                IncludeOldVersions: true,
        }); err != nil {
                return err
        } else if callCount < checkCount {
-               return fmt.Errorf("Retrieved %d collections with modtime <= T=%q, but server now reports there are %d collections with modtime <= T", callCount, filterTime, checkCount)
+               return fmt.Errorf("Retrieved %d collections with modtime <= T=%q, but server now reports there are %d collections with modtime <= T", callCount, newestModifiedAt, checkCount)
        }
 
        return nil
@@ -184,7 +128,11 @@ func (bal *Balancer) updateCollections(ctx context.Context, c *arvados.Client, c
        collQ := make(chan arvados.Collection, cluster.Collections.BalanceCollectionBuffers)
        go func() {
                defer close(collQ)
-               err = EachCollection(ctx, c, cluster.Collections.BalanceCollectionBatch, func(coll arvados.Collection) error {
+               err = EachCollection(ctx, bal.DB, c, func(coll arvados.Collection) error {
                        if coll.ModifiedAt.After(threshold) {
-                               return io.EOF
+                               // EachCollection does not return
+                               // collections in modified_at order, so
+                               // skip this one instead of ending the
+                               // scan early.
+                               return nil
                        }
@@ -203,11 +151,6 @@ func (bal *Balancer) updateCollections(ctx context.Context, c *arvados.Client, c
                }
        }()
 
-       db, err := bal.db(cluster)
-       if err != nil {
-               return err
-       }
-
        var updated int64
        var wg sync.WaitGroup
        for i := 0; i < runtime.NumCPU(); i++ {
@@ -221,25 +164,18 @@ func (bal *Balancer) updateCollections(ctx context.Context, c *arvados.Client, c
                                        continue
                                }
                                repl := bal.BlockStateMap.GetConfirmedReplication(blkids, coll.StorageClassesDesired)
-                               tx, err := db.Beginx()
+                               classes, err := json.Marshal(coll.StorageClassesDesired)
                                if err != nil {
-                                       bal.logf("error opening transaction: %s", coll.UUID, err)
-                                       cancel()
+                                       bal.logf("BUG? json.Marshal(%v) failed: %s", coll.StorageClassesDesired, err)
                                        continue
                                }
-                               classes, _ := json.Marshal(coll.StorageClassesDesired)
-                               _, err = tx.ExecContext(ctx, `update collections set
+                               _, err = bal.DB.ExecContext(ctx, `update collections set
                                        replication_confirmed=$1,
                                        replication_confirmed_at=$2,
                                        storage_classes_confirmed=$3,
                                        storage_classes_confirmed_at=$2
                                        where uuid=$4`,
                                        repl, thresholdStr, classes, coll.UUID)
-                               if err != nil {
-                                       tx.Rollback()
-                               } else {
-                                       err = tx.Commit()
-                               }
                                if err != nil {
                                        bal.logf("%s: update failed: %s", coll.UUID, err)
                                        continue
@@ -252,17 +188,3 @@ func (bal *Balancer) updateCollections(ctx context.Context, c *arvados.Client, c
        bal.logf("updated %d collections", updated)
        return err
 }
-
-func (bal *Balancer) db(cluster *arvados.Cluster) (*sqlx.DB, error) {
-       db, err := sqlx.Open("postgres", cluster.PostgreSQL.Connection.String())
-       if err != nil {
-               return nil, err
-       }
-       if p := cluster.PostgreSQL.ConnectionPool; p > 0 {
-               db.SetMaxOpenConns(p)
-       }
-       if err := db.Ping(); err != nil {
-               return nil, fmt.Errorf("postgresql connect succeeded but ping failed: %s", err)
-       }
-       return db, nil
-}
index 3ab9d07b2e2ed6bcc7220ae17aad4e6e7a665855..f749bad6ad1865a30670d0fe2978dfe8ebd2764c 100644 (file)
@@ -6,57 +6,41 @@ package main
 
 import (
        "context"
-       "sync"
-       "time"
 
+       "git.arvados.org/arvados.git/lib/config"
        "git.arvados.org/arvados.git/sdk/go/arvados"
+       "git.arvados.org/arvados.git/sdk/go/ctxlog"
+       "github.com/jmoiron/sqlx"
        check "gopkg.in/check.v1"
 )
 
-//  TestIdenticalTimestamps ensures EachCollection returns the same
-//  set of collections for various page sizes -- even page sizes so
-//  small that we get entire pages full of collections with identical
-//  timestamps and exercise our gettingExactTimestamp cases.
-func (s *integrationSuite) TestIdenticalTimestamps(c *check.C) {
-       // pageSize==0 uses the default (large) page size.
-       pageSizes := []int{0, 2, 3, 4, 5}
-       got := make([][]string, len(pageSizes))
-       var wg sync.WaitGroup
-       for trial, pageSize := range pageSizes {
-               wg.Add(1)
-               go func(trial, pageSize int) {
-                       defer wg.Done()
-                       streak := 0
-                       longestStreak := 0
-                       var lastMod time.Time
-                       sawUUID := make(map[string]bool)
-                       err := EachCollection(context.Background(), s.client, pageSize, func(c arvados.Collection) error {
-                               if c.ModifiedAt.IsZero() {
-                                       return nil
-                               }
-                               if sawUUID[c.UUID] {
-                                       // dup
-                                       return nil
-                               }
-                               got[trial] = append(got[trial], c.UUID)
-                               sawUUID[c.UUID] = true
-                               if lastMod == c.ModifiedAt {
-                                       streak++
-                                       if streak > longestStreak {
-                                               longestStreak = streak
-                                       }
-                               } else {
-                                       streak = 0
-                                       lastMod = c.ModifiedAt
-                               }
-                               return nil
-                       }, nil)
-                       c.Check(err, check.IsNil)
-                       c.Check(longestStreak > 25, check.Equals, true)
-               }(trial, pageSize)
-       }
-       wg.Wait()
-       for trial := 1; trial < len(pageSizes); trial++ {
-               c.Check(got[trial], check.DeepEquals, got[0])
-       }
+// TestMissedCollections exercises EachCollection's sanity check:
+// #collections processed >= #old collections that exist in database
+// after processing.
+//
+// It inserts a collection with an old modified_at timestamp while the
+// scan is in progress, and expects EachCollection to fail because the
+// final API count includes a collection that the scan did not report.
+func (s *integrationSuite) TestMissedCollections(c *check.C) {
+       cfg, err := config.NewLoader(nil, ctxlog.TestLogger(c)).Load()
+       c.Assert(err, check.IsNil)
+       cluster, err := cfg.GetCluster("")
+       c.Assert(err, check.IsNil)
+       db, err := sqlx.Open("postgres", cluster.PostgreSQL.Connection.String())
+       c.Assert(err, check.IsNil)
+       // Deferred calls run last-in-first-out, so the cleanup
+       // query below runs before the connection pool is closed.
+       defer db.Close()
+
+       defer db.Exec(`delete from collections where uuid = 'zzzzz-4zz18-404040404040404'`)
+       insertedOld := false
+       err = EachCollection(context.Background(), db, s.client, func(coll arvados.Collection) error {
+               if !insertedOld {
+                       insertedOld = true
+                       _, err := db.Exec(`insert into collections (uuid, created_at, updated_at, modified_at) values ('zzzzz-4zz18-404040404040404', '2002-02-02T02:02:02Z', '2002-02-02T02:02:02Z', '2002-02-02T02:02:02Z')`)
+                       return err
+               }
+               return nil
+       }, nil)
+       c.Check(err, check.ErrorMatches, `Retrieved .* collections .* but server now reports .* collections.*`)
 }
index e4297bfe6d353f00c741545747f1a4b520d5ba66..564e36a43e0c5a160f8a20b8ef687f77885f528f 100644 (file)
@@ -18,6 +18,7 @@ import (
        "git.arvados.org/arvados.git/sdk/go/arvadostest"
        "git.arvados.org/arvados.git/sdk/go/ctxlog"
        "git.arvados.org/arvados.git/sdk/go/keepclient"
+       "github.com/jmoiron/sqlx"
        "github.com/prometheus/client_golang/prometheus"
        "github.com/sirupsen/logrus"
        check "gopkg.in/check.v1"
@@ -27,6 +28,7 @@ var _ = check.Suite(&integrationSuite{})
 
 type integrationSuite struct {
        config     *arvados.Cluster
+       db         *sqlx.DB // opened in SetUpTest; passed to Balancer as DB
        client     *arvados.Client
        keepClient *keepclient.KeepClient
 }
@@ -68,6 +70,8 @@ func (s *integrationSuite) SetUpTest(c *check.C) {
        c.Assert(err, check.Equals, nil)
        s.config, err = cfg.GetCluster("")
        c.Assert(err, check.Equals, nil)
+       s.db, err = sqlx.Open("postgres", s.config.PostgreSQL.Connection.String())
+       c.Assert(err, check.IsNil)
        s.config.Collections.BalancePeriod = arvados.Duration(time.Second)
 
        s.client = &arvados.Client{
@@ -90,6 +94,7 @@ func (s *integrationSuite) TestBalanceAPIFixtures(c *check.C) {
                }
 
                bal := &Balancer{
+                       DB:      s.db,
                        Logger:  logger,
                        Metrics: newMetrics(prometheus.NewRegistry()),
                }
index 8b4ee84c716e4596987b1371b38035610f9ffa2f..80b1ed301f7c7b632734d34de57fc0daeea9b0e8 100644 (file)
@@ -16,6 +16,7 @@ import (
        "git.arvados.org/arvados.git/sdk/go/arvados"
        "git.arvados.org/arvados.git/sdk/go/ctxlog"
        "git.arvados.org/arvados.git/sdk/go/health"
+       "github.com/jmoiron/sqlx"
        "github.com/prometheus/client_golang/prometheus"
        "github.com/sirupsen/logrus"
 )
@@ -78,6 +79,18 @@ func runCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.W
                                return service.ErrorHandler(ctx, cluster, fmt.Errorf("error initializing client from cluster config: %s", err))
                        }
 
+                       db, err := sqlx.Open("postgres", cluster.PostgreSQL.Connection.String())
+                       if err != nil {
+                               return service.ErrorHandler(ctx, cluster, fmt.Errorf("postgresql connection failed: %s", err))
+                       }
+                       if p := cluster.PostgreSQL.ConnectionPool; p > 0 { // otherwise keep the database/sql default (no limit)
+                               db.SetMaxOpenConns(p)
+                       }
+                       err = db.Ping() // sql.Open may not connect; verify the database is reachable at startup
+                       if err != nil {
+                               return service.ErrorHandler(ctx, cluster, fmt.Errorf("postgresql connection succeeded but ping failed: %s", err))
+                       }
+
                        if options.Logger == nil {
                                options.Logger = ctxlog.FromContext(ctx)
                        }
@@ -89,6 +102,7 @@ func runCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.W
                                Metrics:    newMetrics(registry),
                                Logger:     options.Logger,
                                Dumper:     options.Dumper,
+                               DB:         db,
                        }
                        srv.Handler = &health.Handler{
                                Token:  cluster.ManagementToken,
index 9801a3fd45d5d13ec40bf661c59b4de5156cfeed..b42fa23a3d8980e3d277621e15901e444a2c02ca 100644 (file)
@@ -12,6 +12,7 @@ import (
        "time"
 
        "git.arvados.org/arvados.git/sdk/go/arvados"
+       "github.com/jmoiron/sqlx"
        "github.com/sirupsen/logrus"
 )
 
@@ -46,11 +47,13 @@ type Server struct {
 
        Logger logrus.FieldLogger
        Dumper logrus.FieldLogger
+
+       DB *sqlx.DB // passed to each Balancer; pinged by CheckHealth
 }
 
 // CheckHealth implements service.Handler.
 func (srv *Server) CheckHealth() error {
-       return nil
+       return srv.DB.Ping() // non-nil error if the database is unreachable
 }
 
 // Done implements service.Handler.
@@ -75,6 +78,7 @@ func (srv *Server) run() {
 
 func (srv *Server) runOnce() (*Balancer, error) {
        bal := &Balancer{
+               DB:             srv.DB,
                Logger:         srv.Logger,
                Dumper:         srv.Dumper,
                Metrics:        srv.Metrics,