// The progress function is called periodically with done (number of
// times f has been called) and total (number of times f is expected
// to be called).
-//
-// If pageSize > 0 it is used as the maximum page size in each API
-// call; otherwise the maximum allowed page size is requested.
-func EachCollection(ctx context.Context, c *arvados.Client, pageSize int, f func(arvados.Collection) error, progress func(done, total int)) error {
+func EachCollection(ctx context.Context, db *sqlx.DB, c *arvados.Client, f func(arvados.Collection) error, progress func(done, total int)) error {
// This change replaces API-based pagination (pageSize parameter) with a
// direct database query, so the pageSize parameter is dropped and a *sqlx.DB
// handle is added to the signature. Note: all existing callers must be
// updated to the new signature (see the EachCollection call site below).
if progress == nil {
progress = func(_, _ int) {}
}
// NOTE(review): err (and expectCount, used further down) is referenced here
// with no visible assignment — this hunk appears to have lost the context
// lines that call countCollections to compute the expected total. Confirm
// against the full file before applying this patch.
if err != nil {
return err
}
+ var newestModifiedAt time.Time
// The elaborate modified_at/uuid cursor logic below existed only to make
// API-side pagination lossless in the face of concurrent modifications; a
// single database cursor makes it unnecessary, so the whole mechanism and
// its explanatory comment are deleted.
- // Note the obvious way to get all collections (sorting by
- // UUID) would be much easier, but would lose data: If a
- // client were to move files from collection with uuid="zzz"
- // to a collection with uuid="aaa" around the time when we
- // were fetching the "mmm" page, we would never see those
- // files' block IDs at all -- even if the client is careful to
- // save "aaa" before saving "zzz".
- //
- // Instead, we get pages in modified_at order. Collections
- // that are modified during the run will be re-fetched in a
- // subsequent page.
-
- limit := pageSize
- if limit <= 0 {
- // Use the maximum page size the server allows
- limit = 1<<31 - 1
- }
- params := arvados.ResourceListParams{
- Limit: &limit,
- Order: "modified_at, uuid",
- Count: "none",
- Select: []string{"uuid", "unsigned_manifest_text", "modified_at", "portable_data_hash", "replication_desired", "storage_classes_desired", "is_trashed"},
- IncludeTrash: true,
- IncludeOldVersions: true,
// The unfiltered SELECT (no WHERE clause) naturally includes trashed and old
// collection versions, matching the old IncludeTrash/IncludeOldVersions
// options and the countCollections sanity check at the end of this function.
+ rows, err := db.QueryxContext(ctx, `SELECT uuid, manifest_text, modified_at, portable_data_hash, replication_desired, storage_classes_desired, is_trashed FROM collections`)
+ if err != nil {
+ return err
}
- var last arvados.Collection
- var filterTime time.Time
callCount := 0
- gettingExactTimestamp := false
- for {
- progress(callCount, expectCount)
- var page arvados.CollectionList
- err := c.RequestAndDecodeContext(ctx, &page, "GET", "arvados/v1/collections", nil, params)
// Iterate over the result set, building each arvados.Collection by hand from
// the scanned columns.
+ for rows.Next() {
+ var coll arvados.Collection
+ var classesDesired []byte
+ err = rows.Scan(&coll.UUID, &coll.ManifestText, &coll.ModifiedAt, &coll.PortableDataHash, &coll.ReplicationDesired, &classesDesired, &coll.IsTrashed)
if err != nil {
+ rows.Close()
return err
}
- for _, coll := range page.Items {
- if last.ModifiedAt == coll.ModifiedAt && last.UUID >= coll.UUID {
- continue
- }
- callCount++
- err = f(coll)
- if err != nil {
- return err
- }
- last = coll
// storage_classes_desired is stored as a JSON array in the database.
+ err = json.Unmarshal(classesDesired, &coll.StorageClassesDesired)
+ if err != nil {
+ rows.Close()
+ return err
}
- if len(page.Items) == 0 && !gettingExactTimestamp {
- break
- } else if last.ModifiedAt.IsZero() {
- return fmt.Errorf("BUG: Last collection on the page (%s) has no modified_at timestamp; cannot make progress", last.UUID)
- } else if len(page.Items) > 0 && last.ModifiedAt == filterTime {
- // If we requested time>=X and never got a
- // time>X then we might not have received all
- // items with time==X yet. Switch to
- // gettingExactTimestamp mode (if we're not
- // there already), advancing our UUID
- // threshold with each request, until we get
- // an empty page.
- gettingExactTimestamp = true
- params.Filters = []arvados.Filter{{
- Attr: "modified_at",
- Operator: "=",
- Operand: filterTime,
- }, {
- Attr: "uuid",
- Operator: ">",
- Operand: last.UUID,
- }}
- } else if gettingExactTimestamp {
- // This must be an empty page (in this mode,
- // an unequal timestamp is impossible) so we
- // can start getting pages of newer
- // collections.
- gettingExactTimestamp = false
- params.Filters = []arvados.Filter{{
- Attr: "modified_at",
- Operator: ">",
- Operand: filterTime,
- }}
- } else {
- // In the normal case, we know we have seen
- // all collections with modtime<filterTime,
- // but we might not have seen all that have
- // modtime=filterTime. Hence we use >= instead
- // of > and skip the obvious overlapping item,
- // i.e., the last item on the previous
- // page. In some edge cases this can return
- // collections we have already seen, but
- // avoiding that would add overhead in the
- // overwhelmingly common cases, so we don't
- // bother.
- filterTime = last.ModifiedAt
- params.Filters = []arvados.Filter{{
- Attr: "modified_at",
- Operator: ">=",
- Operand: filterTime,
- }, {
- Attr: "uuid",
- Operator: "!=",
- Operand: last.UUID,
- }}
// Track the newest modified_at seen, for the sanity check below.
+ if newestModifiedAt.IsZero() || newestModifiedAt.Before(coll.ModifiedAt) {
+ newestModifiedAt = coll.ModifiedAt
}
+ callCount++
+ err = f(coll)
+ if err != nil {
// FIX: close rows before returning, as the other error paths in this
// loop do — otherwise the database connection backing the cursor is
// leaked until GC when f returns an error (including the deliberate
// io.EOF early-exit used by the caller below).
+ rows.Close()
+ return err
+ }
+ progress(callCount, expectCount)
+ }
// Close explicitly (not deferred) so the connection is released before the
// countCollections API call that follows.
+ rows.Close()
+ if err := rows.Err(); err != nil {
+ return err
}
- progress(callCount, expectCount)
-
// Sanity check: ask the API server how many collections exist with
// modified_at <= the newest timestamp we saw. If we processed fewer than
// that, some rows were missed and the run must not be trusted.
if checkCount, err := countCollections(c, arvados.ResourceListParams{
Filters: []arvados.Filter{{
Attr: "modified_at",
Operator: "<=",
- Operand: filterTime}},
+ Operand: newestModifiedAt}},
IncludeTrash: true,
IncludeOldVersions: true,
}); err != nil {
return err
} else if callCount < checkCount {
- return fmt.Errorf("Retrieved %d collections with modtime <= T=%q, but server now reports there are %d collections with modtime <= T", callCount, filterTime, checkCount)
+ return fmt.Errorf("Retrieved %d collections with modtime <= T=%q, but server now reports there are %d collections with modtime <= T", callCount, newestModifiedAt, checkCount)
}
return nil
// NOTE(review): the following hunk is the interior of a larger function
// (presumably the balancer's collection-update routine) whose opening lines
// are not included in this patch fragment; several braces here are dangling
// without their matching context. Verify against the full file.
collQ := make(chan arvados.Collection, cluster.Collections.BalanceCollectionBuffers)
go func() {
defer close(collQ)
// Call site updated for the new EachCollection signature: the DB handle
// comes from bal.DB and the page-size argument is gone.
- err = EachCollection(ctx, c, cluster.Collections.BalanceCollectionBatch, func(coll arvados.Collection) error {
+ err = EachCollection(ctx, bal.DB, c, func(coll arvados.Collection) error {
if coll.ModifiedAt.After(threshold) {
// io.EOF signals a deliberate early stop, not a failure.
return io.EOF
}
}
}()
// The locally-opened DB connection is gone; bal.DB is used instead.
- db, err := bal.db(cluster)
- if err != nil {
- return err
- }
-
var updated int64
var wg sync.WaitGroup
// One worker goroutine per CPU drains collQ (worker body partially elided
// by this hunk).
for i := 0; i < runtime.NumCPU(); i++ {
continue
}
repl := bal.BlockStateMap.GetConfirmedReplication(blkids, coll.StorageClassesDesired)
- tx, err := db.Beginx()
// Marshal the desired storage classes to JSON for the UPDATE below (error is
// now checked instead of discarded with _, as the old code did).
+ classes, err := json.Marshal(coll.StorageClassesDesired)
if err != nil {
- bal.logf("error opening transaction: %s", coll.UUID, err)
- cancel()
// FIX: log the Marshal INPUT, not `classes` — on failure json.Marshal
// returns a nil []byte, so logging `classes` would print nothing useful.
+ bal.logf("BUG? json.Marshal(%v) failed: %s", coll.StorageClassesDesired, err)
continue
}
- classes, _ := json.Marshal(coll.StorageClassesDesired)
// Single-statement UPDATE needs no explicit transaction, so the
// Beginx/Rollback/Commit scaffolding is removed and bal.DB is used directly.
- _, err = tx.ExecContext(ctx, `update collections set
+ _, err = bal.DB.ExecContext(ctx, `update collections set
replication_confirmed=$1,
replication_confirmed_at=$2,
storage_classes_confirmed=$3,
storage_classes_confirmed_at=$2
where uuid=$4`,
repl, thresholdStr, classes, coll.UUID)
- if err != nil {
- tx.Rollback()
- } else {
- err = tx.Commit()
- }
if err != nil {
bal.logf("%s: update failed: %s", coll.UUID, err)
continue
// NOTE(review): context lines between the `continue` above and the summary
// log below appear elided by this hunk (loop close, wg.Wait, etc.).
bal.logf("updated %d collections", updated)
return err
}
-
// The balancer no longer opens its own PostgreSQL connection; bal.DB is
// injected by the caller, so this helper is deleted in its entirety.
-func (bal *Balancer) db(cluster *arvados.Cluster) (*sqlx.DB, error) {
-	db, err := sqlx.Open("postgres", cluster.PostgreSQL.Connection.String())
-	if err != nil {
-		return nil, err
-	}
-	if p := cluster.PostgreSQL.ConnectionPool; p > 0 {
-		db.SetMaxOpenConns(p)
-	}
-	if err := db.Ping(); err != nil {
-		return nil, fmt.Errorf("postgresql connect succeeded but ping failed: %s", err)
-	}
-	return db, nil
-}
import (
"context"
- "sync"
- "time"
+ "git.arvados.org/arvados.git/lib/config"
"git.arvados.org/arvados.git/sdk/go/arvados"
+ "git.arvados.org/arvados.git/sdk/go/ctxlog"
+ "github.com/jmoiron/sqlx"
check "gopkg.in/check.v1"
)
// This test exercised the page-size/gettingExactTimestamp pagination logic,
// which no longer exists now that EachCollection reads straight from the
// database — so the whole test is deleted (replaced by TestMissedCollections).
-// TestIdenticalTimestamps ensures EachCollection returns the same
-// set of collections for various page sizes -- even page sizes so
-// small that we get entire pages full of collections with identical
-// timestamps and exercise our gettingExactTimestamp cases.
-func (s *integrationSuite) TestIdenticalTimestamps(c *check.C) {
-	// pageSize==0 uses the default (large) page size.
-	pageSizes := []int{0, 2, 3, 4, 5}
-	got := make([][]string, len(pageSizes))
-	var wg sync.WaitGroup
-	for trial, pageSize := range pageSizes {
-		wg.Add(1)
-		go func(trial, pageSize int) {
-			defer wg.Done()
-			streak := 0
-			longestStreak := 0
-			var lastMod time.Time
-			sawUUID := make(map[string]bool)
-			err := EachCollection(context.Background(), s.client, pageSize, func(c arvados.Collection) error {
-				if c.ModifiedAt.IsZero() {
-					return nil
-				}
-				if sawUUID[c.UUID] {
-					// dup
-					return nil
-				}
-				got[trial] = append(got[trial], c.UUID)
-				sawUUID[c.UUID] = true
-				if lastMod == c.ModifiedAt {
-					streak++
-					if streak > longestStreak {
-						longestStreak = streak
-					}
-				} else {
-					streak = 0
-					lastMod = c.ModifiedAt
-				}
-				return nil
-			}, nil)
-			c.Check(err, check.IsNil)
-			c.Check(longestStreak > 25, check.Equals, true)
-		}(trial, pageSize)
-	}
-	wg.Wait()
-	for trial := 1; trial < len(pageSizes); trial++ {
-		c.Check(got[trial], check.DeepEquals, got[0])
-	}
+// TestMissedCollections exercises EachCollection's sanity check:
+// #collections processed >= #old collections that exist in database
+// after processing.
+func (s *integrationSuite) TestMissedCollections(c *check.C) {
+	cfg, err := config.NewLoader(nil, ctxlog.TestLogger(c)).Load()
+	c.Assert(err, check.IsNil)
+	cluster, err := cfg.GetCluster("")
+	c.Assert(err, check.IsNil)
+	db, err := sqlx.Open("postgres", cluster.PostgreSQL.Connection.String())
+	c.Assert(err, check.IsNil)
+	// FIX: close the connection pool when the test finishes. Deferred before
+	// the cleanup delete below, so (LIFO order) the delete still runs on an
+	// open handle.
+	defer db.Close()
+
+	defer db.Exec(`delete from collections where uuid = 'zzzzz-4zz18-404040404040404'`)
+	insertedOld := false
+	err = EachCollection(context.Background(), db, s.client, func(coll arvados.Collection) error {
+		if !insertedOld {
+			insertedOld = true
+			// Insert an old (modified_at in the past) collection mid-scan:
+			// EachCollection has already passed it, so its final count check
+			// against the server must fail.
+			_, err := db.Exec(`insert into collections (uuid, created_at, updated_at, modified_at) values ('zzzzz-4zz18-404040404040404', '2002-02-02T02:02:02Z', '2002-02-02T02:02:02Z', '2002-02-02T02:02:02Z')`)
+			return err
+		}
+		return nil
+	}, nil)
+	c.Check(err, check.ErrorMatches, `Retrieved .* collections .* but server now reports .* collections.*`)
}