From 325ba452bdb2b8ebe4ef2a85d495291429df8082 Mon Sep 17 00:00:00 2001 From: Tom Clegg Date: Tue, 27 Jul 2021 11:16:49 -0400 Subject: [PATCH] 17574: Get collections directly from DB instead of controller. Arvados-DCO-1.1-Signed-off-by: Tom Clegg --- services/keep-balance/balance.go | 4 +- services/keep-balance/collection.go | 166 +++++----------------- services/keep-balance/collection_test.go | 73 ++++------ services/keep-balance/integration_test.go | 5 + services/keep-balance/main.go | 14 ++ services/keep-balance/server.go | 6 +- 6 files changed, 84 insertions(+), 184 deletions(-) diff --git a/services/keep-balance/balance.go b/services/keep-balance/balance.go index a7dcf61902..6a71cf99f7 100644 --- a/services/keep-balance/balance.go +++ b/services/keep-balance/balance.go @@ -23,6 +23,7 @@ import ( "git.arvados.org/arvados.git/sdk/go/arvados" "git.arvados.org/arvados.git/sdk/go/keepclient" + "github.com/jmoiron/sqlx" "github.com/sirupsen/logrus" ) @@ -36,6 +37,7 @@ import ( // BlobSignatureTTL; and all N existing replicas of a given data block // are in the N best positions in rendezvous probe order. type Balancer struct { + DB *sqlx.DB Logger logrus.FieldLogger Dumper logrus.FieldLogger Metrics *metrics @@ -424,7 +426,7 @@ func (bal *Balancer) GetCurrentState(ctx context.Context, c *arvados.Client, pag wg.Add(1) go func() { defer wg.Done() - err = EachCollection(ctx, c, pageSize, + err = EachCollection(ctx, bal.DB, c, func(coll arvados.Collection) error { collQ <- coll if len(errs) > 0 { diff --git a/services/keep-balance/collection.go b/services/keep-balance/collection.go index 3afb1ccc55..ba9edf9391 100644 --- a/services/keep-balance/collection.go +++ b/services/keep-balance/collection.go @@ -34,10 +34,7 @@ func countCollections(c *arvados.Client, params arvados.ResourceListParams) (int // The progress function is called periodically with done (number of // times f has been called) and total (number of times f is expected // to be called). 
-// -// If pageSize > 0 it is used as the maximum page size in each API -// call; otherwise the maximum allowed page size is requested. -func EachCollection(ctx context.Context, c *arvados.Client, pageSize int, f func(arvados.Collection) error, progress func(done, total int)) error { +func EachCollection(ctx context.Context, db *sqlx.DB, c *arvados.Client, f func(arvados.Collection) error, progress func(done, total int)) error { if progress == nil { progress = func(_, _ int) {} } @@ -49,124 +46,51 @@ func EachCollection(ctx context.Context, c *arvados.Client, pageSize int, f func if err != nil { return err } + var newestModifiedAt time.Time - // Note the obvious way to get all collections (sorting by - // UUID) would be much easier, but would lose data: If a - // client were to move files from collection with uuid="zzz" - // to a collection with uuid="aaa" around the time when we - // were fetching the "mmm" page, we would never see those - // files' block IDs at all -- even if the client is careful to - // save "aaa" before saving "zzz". - // - // Instead, we get pages in modified_at order. Collections - // that are modified during the run will be re-fetched in a - // subsequent page. 
- - limit := pageSize - if limit <= 0 { - // Use the maximum page size the server allows - limit = 1<<31 - 1 - } - params := arvados.ResourceListParams{ - Limit: &limit, - Order: "modified_at, uuid", - Count: "none", - Select: []string{"uuid", "unsigned_manifest_text", "modified_at", "portable_data_hash", "replication_desired", "storage_classes_desired", "is_trashed"}, - IncludeTrash: true, - IncludeOldVersions: true, + rows, err := db.QueryxContext(ctx, `SELECT uuid, manifest_text, modified_at, portable_data_hash, replication_desired, storage_classes_desired, is_trashed FROM collections`) + if err != nil { + return err } - var last arvados.Collection - var filterTime time.Time callCount := 0 - gettingExactTimestamp := false - for { - progress(callCount, expectCount) - var page arvados.CollectionList - err := c.RequestAndDecodeContext(ctx, &page, "GET", "arvados/v1/collections", nil, params) + for rows.Next() { + var coll arvados.Collection + var classesDesired []byte + err = rows.Scan(&coll.UUID, &coll.ManifestText, &coll.ModifiedAt, &coll.PortableDataHash, &coll.ReplicationDesired, &classesDesired, &coll.IsTrashed) if err != nil { + rows.Close() return err } - for _, coll := range page.Items { - if last.ModifiedAt == coll.ModifiedAt && last.UUID >= coll.UUID { - continue - } - callCount++ - err = f(coll) - if err != nil { - return err - } - last = coll + err = json.Unmarshal(classesDesired, &coll.StorageClassesDesired) + if err != nil { + rows.Close() + return err } - if len(page.Items) == 0 && !gettingExactTimestamp { - break - } else if last.ModifiedAt.IsZero() { - return fmt.Errorf("BUG: Last collection on the page (%s) has no modified_at timestamp; cannot make progress", last.UUID) - } else if len(page.Items) > 0 && last.ModifiedAt == filterTime { - // If we requested time>=X and never got a - // time>X then we might not have received all - // items with time==X yet. 
Switch to - // gettingExactTimestamp mode (if we're not - // there already), advancing our UUID - // threshold with each request, until we get - // an empty page. - gettingExactTimestamp = true - params.Filters = []arvados.Filter{{ - Attr: "modified_at", - Operator: "=", - Operand: filterTime, - }, { - Attr: "uuid", - Operator: ">", - Operand: last.UUID, - }} - } else if gettingExactTimestamp { - // This must be an empty page (in this mode, - // an unequal timestamp is impossible) so we - // can start getting pages of newer - // collections. - gettingExactTimestamp = false - params.Filters = []arvados.Filter{{ - Attr: "modified_at", - Operator: ">", - Operand: filterTime, - }} - } else { - // In the normal case, we know we have seen - // all collections with modtime= instead - // of > and skip the obvious overlapping item, - // i.e., the last item on the previous - // page. In some edge cases this can return - // collections we have already seen, but - // avoiding that would add overhead in the - // overwhelmingly common cases, so we don't - // bother. 
- filterTime = last.ModifiedAt - params.Filters = []arvados.Filter{{ - Attr: "modified_at", - Operator: ">=", - Operand: filterTime, - }, { - Attr: "uuid", - Operator: "!=", - Operand: last.UUID, - }} + if newestModifiedAt.IsZero() || newestModifiedAt.Before(coll.ModifiedAt) { + newestModifiedAt = coll.ModifiedAt } + callCount++ + err = f(coll) + if err != nil { + return err + } + progress(callCount, expectCount) + } + rows.Close() + if err := rows.Err(); err != nil { + return err } - progress(callCount, expectCount) - if checkCount, err := countCollections(c, arvados.ResourceListParams{ Filters: []arvados.Filter{{ Attr: "modified_at", Operator: "<=", - Operand: filterTime}}, + Operand: newestModifiedAt}}, IncludeTrash: true, IncludeOldVersions: true, }); err != nil { return err } else if callCount < checkCount { - return fmt.Errorf("Retrieved %d collections with modtime <= T=%q, but server now reports there are %d collections with modtime <= T", callCount, filterTime, checkCount) + return fmt.Errorf("Retrieved %d collections with modtime <= T=%q, but server now reports there are %d collections with modtime <= T", callCount, newestModifiedAt, checkCount) } return nil @@ -184,7 +108,7 @@ func (bal *Balancer) updateCollections(ctx context.Context, c *arvados.Client, c collQ := make(chan arvados.Collection, cluster.Collections.BalanceCollectionBuffers) go func() { defer close(collQ) - err = EachCollection(ctx, c, cluster.Collections.BalanceCollectionBatch, func(coll arvados.Collection) error { + err = EachCollection(ctx, bal.DB, c, func(coll arvados.Collection) error { if coll.ModifiedAt.After(threshold) { return io.EOF } @@ -203,11 +127,6 @@ func (bal *Balancer) updateCollections(ctx context.Context, c *arvados.Client, c } }() - db, err := bal.db(cluster) - if err != nil { - return err - } - var updated int64 var wg sync.WaitGroup for i := 0; i < runtime.NumCPU(); i++ { @@ -221,25 +140,18 @@ func (bal *Balancer) updateCollections(ctx context.Context, c 
*arvados.Client, c continue } repl := bal.BlockStateMap.GetConfirmedReplication(blkids, coll.StorageClassesDesired) - tx, err := db.Beginx() + classes, err := json.Marshal(coll.StorageClassesDesired) if err != nil { - bal.logf("error opening transaction: %s", coll.UUID, err) - cancel() + bal.logf("BUG? json.Marshal(%v) failed: %s", coll.StorageClassesDesired, err) continue } - classes, _ := json.Marshal(coll.StorageClassesDesired) - _, err = tx.ExecContext(ctx, `update collections set + _, err = bal.DB.ExecContext(ctx, `update collections set replication_confirmed=$1, replication_confirmed_at=$2, storage_classes_confirmed=$3, storage_classes_confirmed_at=$2 where uuid=$4`, repl, thresholdStr, classes, coll.UUID) - if err != nil { - tx.Rollback() - } else { - err = tx.Commit() - } if err != nil { bal.logf("%s: update failed: %s", coll.UUID, err) continue @@ -252,17 +164,3 @@ func (bal *Balancer) updateCollections(ctx context.Context, c *arvados.Client, c bal.logf("updated %d collections", updated) return err } - -func (bal *Balancer) db(cluster *arvados.Cluster) (*sqlx.DB, error) { - db, err := sqlx.Open("postgres", cluster.PostgreSQL.Connection.String()) - if err != nil { - return nil, err - } - if p := cluster.PostgreSQL.ConnectionPool; p > 0 { - db.SetMaxOpenConns(p) - } - if err := db.Ping(); err != nil { - return nil, fmt.Errorf("postgresql connect succeeded but ping failed: %s", err) - } - return db, nil -} diff --git a/services/keep-balance/collection_test.go b/services/keep-balance/collection_test.go index 3ab9d07b2e..f749bad6ad 100644 --- a/services/keep-balance/collection_test.go +++ b/services/keep-balance/collection_test.go @@ -6,57 +6,34 @@ package main import ( "context" - "sync" - "time" + "git.arvados.org/arvados.git/lib/config" "git.arvados.org/arvados.git/sdk/go/arvados" + "git.arvados.org/arvados.git/sdk/go/ctxlog" + "github.com/jmoiron/sqlx" check "gopkg.in/check.v1" ) -// TestIdenticalTimestamps ensures EachCollection returns the same -// set of collections for 
various page sizes -- even page sizes so -// small that we get entire pages full of collections with identical -// timestamps and exercise our gettingExactTimestamp cases. -func (s *integrationSuite) TestIdenticalTimestamps(c *check.C) { - // pageSize==0 uses the default (large) page size. - pageSizes := []int{0, 2, 3, 4, 5} - got := make([][]string, len(pageSizes)) - var wg sync.WaitGroup - for trial, pageSize := range pageSizes { - wg.Add(1) - go func(trial, pageSize int) { - defer wg.Done() - streak := 0 - longestStreak := 0 - var lastMod time.Time - sawUUID := make(map[string]bool) - err := EachCollection(context.Background(), s.client, pageSize, func(c arvados.Collection) error { - if c.ModifiedAt.IsZero() { - return nil - } - if sawUUID[c.UUID] { - // dup - return nil - } - got[trial] = append(got[trial], c.UUID) - sawUUID[c.UUID] = true - if lastMod == c.ModifiedAt { - streak++ - if streak > longestStreak { - longestStreak = streak - } - } else { - streak = 0 - lastMod = c.ModifiedAt - } - return nil - }, nil) - c.Check(err, check.IsNil) - c.Check(longestStreak > 25, check.Equals, true) - }(trial, pageSize) - } - wg.Wait() - for trial := 1; trial < len(pageSizes); trial++ { - c.Check(got[trial], check.DeepEquals, got[0]) - } +// TestMissedCollections exercises EachCollection's sanity check: +// #collections processed >= #old collections that exist in database +// after processing. 
+func (s *integrationSuite) TestMissedCollections(c *check.C) { + cfg, err := config.NewLoader(nil, ctxlog.TestLogger(c)).Load() + c.Assert(err, check.IsNil) + cluster, err := cfg.GetCluster("") + c.Assert(err, check.IsNil) + db, err := sqlx.Open("postgres", cluster.PostgreSQL.Connection.String()) + c.Assert(err, check.IsNil) + + defer db.Exec(`delete from collections where uuid = 'zzzzz-4zz18-404040404040404'`) + insertedOld := false + err = EachCollection(context.Background(), db, s.client, func(coll arvados.Collection) error { + if !insertedOld { + insertedOld = true + _, err := db.Exec(`insert into collections (uuid, created_at, updated_at, modified_at) values ('zzzzz-4zz18-404040404040404', '2002-02-02T02:02:02Z', '2002-02-02T02:02:02Z', '2002-02-02T02:02:02Z')`) + return err + } + return nil + }, nil) + c.Check(err, check.ErrorMatches, `Retrieved .* collections .* but server now reports .* collections.*`) } diff --git a/services/keep-balance/integration_test.go b/services/keep-balance/integration_test.go index e4297bfe6d..564e36a43e 100644 --- a/services/keep-balance/integration_test.go +++ b/services/keep-balance/integration_test.go @@ -18,6 +18,7 @@ import ( "git.arvados.org/arvados.git/sdk/go/arvadostest" "git.arvados.org/arvados.git/sdk/go/ctxlog" "git.arvados.org/arvados.git/sdk/go/keepclient" + "github.com/jmoiron/sqlx" "github.com/prometheus/client_golang/prometheus" "github.com/sirupsen/logrus" check "gopkg.in/check.v1" @@ -27,6 +28,7 @@ var _ = check.Suite(&integrationSuite{}) type integrationSuite struct { config *arvados.Cluster + db *sqlx.DB client *arvados.Client keepClient *keepclient.KeepClient } @@ -68,6 +70,8 @@ func (s *integrationSuite) SetUpTest(c *check.C) { c.Assert(err, check.Equals, nil) s.config, err = cfg.GetCluster("") c.Assert(err, check.Equals, nil) + s.db, err = sqlx.Open("postgres", s.config.PostgreSQL.Connection.String()) + c.Assert(err, check.IsNil) s.config.Collections.BalancePeriod = arvados.Duration(time.Second) s.client = 
&arvados.Client{ @@ -90,6 +94,7 @@ func (s *integrationSuite) TestBalanceAPIFixtures(c *check.C) { } bal := &Balancer{ + DB: s.db, Logger: logger, Metrics: newMetrics(prometheus.NewRegistry()), } diff --git a/services/keep-balance/main.go b/services/keep-balance/main.go index 8b4ee84c71..80b1ed301f 100644 --- a/services/keep-balance/main.go +++ b/services/keep-balance/main.go @@ -16,6 +16,7 @@ import ( "git.arvados.org/arvados.git/sdk/go/arvados" "git.arvados.org/arvados.git/sdk/go/ctxlog" "git.arvados.org/arvados.git/sdk/go/health" + "github.com/jmoiron/sqlx" "github.com/prometheus/client_golang/prometheus" "github.com/sirupsen/logrus" ) @@ -78,6 +79,18 @@ func runCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.W return service.ErrorHandler(ctx, cluster, fmt.Errorf("error initializing client from cluster config: %s", err)) } + db, err := sqlx.Open("postgres", cluster.PostgreSQL.Connection.String()) + if err != nil { + return service.ErrorHandler(ctx, cluster, fmt.Errorf("postgresql connection failed: %s", err)) + } + if p := cluster.PostgreSQL.ConnectionPool; p > 0 { + db.SetMaxOpenConns(p) + } + err = db.Ping() + if err != nil { + return service.ErrorHandler(ctx, cluster, fmt.Errorf("postgresql connection succeeded but ping failed: %s", err)) + } + if options.Logger == nil { options.Logger = ctxlog.FromContext(ctx) } @@ -89,6 +102,7 @@ func runCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.W Metrics: newMetrics(registry), Logger: options.Logger, Dumper: options.Dumper, + DB: db, } srv.Handler = &health.Handler{ Token: cluster.ManagementToken, diff --git a/services/keep-balance/server.go b/services/keep-balance/server.go index 9801a3fd45..b42fa23a3d 100644 --- a/services/keep-balance/server.go +++ b/services/keep-balance/server.go @@ -12,6 +12,7 @@ import ( "time" "git.arvados.org/arvados.git/sdk/go/arvados" + "github.com/jmoiron/sqlx" "github.com/sirupsen/logrus" ) @@ -46,11 +47,13 @@ type Server struct { Logger 
logrus.FieldLogger Dumper logrus.FieldLogger + + DB *sqlx.DB } // CheckHealth implements service.Handler. func (srv *Server) CheckHealth() error { - return nil + return srv.DB.Ping() } // Done implements service.Handler. @@ -75,6 +78,7 @@ func (srv *Server) run() { func (srv *Server) runOnce() (*Balancer, error) { bal := &Balancer{ + DB: srv.DB, Logger: srv.Logger, Dumper: srv.Dumper, Metrics: srv.Metrics, -- 2.30.2