Arvados-DCO-1.1-Signed-off-by: Radhika Chippada <radhika@curoverse.com>
[arvados.git] / services / keep-balance / collection.go
index f4fc72152f1a68ffbfdb9683f134dbcbcb46f2da..8f4ebb6bdfa277b33deeaf6cf2c0e2f2ecade076 100644 (file)
@@ -1,3 +1,7 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
 package main
 
 import (
@@ -11,6 +15,7 @@ func countCollections(c *arvados.Client, params arvados.ResourceListParams) (int
        var page arvados.CollectionList
        var zero int
        params.Limit = &zero
+       params.Count = "exact"
        err := c.RequestAndDecode(&page, "GET", "arvados/v1/collections", nil, params)
        return page.ItemsAvailable, err
 }
@@ -30,7 +35,9 @@ func EachCollection(c *arvados.Client, pageSize int, f func(arvados.Collection)
                progress = func(_, _ int) {}
        }
 
-       expectCount, err := countCollections(c, arvados.ResourceListParams{})
+       expectCount, err := countCollections(c, arvados.ResourceListParams{
+               IncludeTrash: true,
+       })
        if err != nil {
                return err
        }
@@ -41,13 +48,16 @@ func EachCollection(c *arvados.Client, pageSize int, f func(arvados.Collection)
                limit = 1<<31 - 1
        }
        params := arvados.ResourceListParams{
-               Limit:  &limit,
-               Order:  "modified_at, uuid",
-               Select: []string{"uuid", "manifest_text", "modified_at", "portable_data_hash", "replication_desired"},
+               Limit:        &limit,
+               Order:        "modified_at, uuid",
+               Count:        "none",
+               Select:       []string{"uuid", "unsigned_manifest_text", "modified_at", "portable_data_hash", "replication_desired"},
+               IncludeTrash: true,
        }
        var last arvados.Collection
        var filterTime time.Time
        callCount := 0
+       gettingExactTimestamp := false
        for {
                progress(callCount, expectCount)
                var page arvados.CollectionList
@@ -66,33 +76,72 @@ func EachCollection(c *arvados.Client, pageSize int, f func(arvados.Collection)
                        }
                        last = coll
                }
-               if last.ModifiedAt == nil || *last.ModifiedAt == filterTime {
-                       if page.ItemsAvailable > len(page.Items) {
-                               // TODO: use "mtime=X && UUID>Y"
-                               // filters to get all collections with
-                               // this timestamp, then use "mtime>X"
-                               // to get the next timestamp.
-                               return fmt.Errorf("BUG: Received an entire page with the same modified_at timestamp (%v), cannot make progress", filterTime)
-                       }
+               if len(page.Items) == 0 && !gettingExactTimestamp {
                        break
+               } else if last.ModifiedAt == nil {
+                       return fmt.Errorf("BUG: Last collection on the page (%s) has no modified_at timestamp; cannot make progress", last.UUID)
+               } else if len(page.Items) > 0 && *last.ModifiedAt == filterTime {
+                       // If we requested time>=X and never got a
+                       // time>X then we might not have received all
+                       // items with time==X yet. Switch to
+                       // gettingExactTimestamp mode (if we're not
+                       // there already), advancing our UUID
+                       // threshold with each request, until we get
+                       // an empty page.
+                       gettingExactTimestamp = true
+                       params.Filters = []arvados.Filter{{
+                               Attr:     "modified_at",
+                               Operator: "=",
+                               Operand:  filterTime,
+                       }, {
+                               Attr:     "uuid",
+                               Operator: ">",
+                               Operand:  last.UUID,
+                       }}
+               } else if gettingExactTimestamp {
+                       // This must be an empty page (in this mode,
+                       // an unequal timestamp is impossible) so we
+                       // can start getting pages of newer
+                       // collections.
+                       gettingExactTimestamp = false
+                       params.Filters = []arvados.Filter{{
+                               Attr:     "modified_at",
+                               Operator: ">",
+                               Operand:  filterTime,
+                       }}
+               } else {
+                       // In the normal case, we know we have seen
+                       // all collections with modtime<filterTime,
+                       // but we might not have seen all that have
+                       // modtime=filterTime. Hence we use >= instead
+                       // of > and skip the obvious overlapping item,
+                       // i.e., the last item on the previous
+                       // page. In some edge cases this can return
+                       // collections we have already seen, but
+                       // avoiding that would add overhead in the
+                       // overwhelmingly common cases, so we don't
+                       // bother.
+                       filterTime = *last.ModifiedAt
+                       params.Filters = []arvados.Filter{{
+                               Attr:     "modified_at",
+                               Operator: ">=",
+                               Operand:  filterTime,
+                       }, {
+                               Attr:     "uuid",
+                               Operator: "!=",
+                               Operand:  last.UUID,
+                       }}
                }
-               filterTime = *last.ModifiedAt
-               params.Filters = []arvados.Filter{{
-                       Attr:     "modified_at",
-                       Operator: ">=",
-                       Operand:  filterTime,
-               }, {
-                       Attr:     "uuid",
-                       Operator: "!=",
-                       Operand:  last.UUID,
-               }}
        }
        progress(callCount, expectCount)
 
-       if checkCount, err := countCollections(c, arvados.ResourceListParams{Filters: []arvados.Filter{{
-               Attr:     "modified_at",
-               Operator: "<=",
-               Operand:  filterTime}}}); err != nil {
+       if checkCount, err := countCollections(c, arvados.ResourceListParams{
+               Filters: []arvados.Filter{{
+                       Attr:     "modified_at",
+                       Operator: "<=",
+                       Operand:  filterTime}},
+               IncludeTrash: true,
+       }); err != nil {
                return err
        } else if callCount < checkCount {
                return fmt.Errorf("Retrieved %d collections with modtime <= T=%q, but server now reports there are %d collections with modtime <= T", callCount, filterTime, checkCount)