7 "git.curoverse.com/arvados.git/sdk/go/arvados"
10 func countCollections(c *arvados.Client, params arvados.ResourceListParams) (int, error) {
11 var page arvados.CollectionList
14 params.Count = "exact"
15 err := c.RequestAndDecode(&page, "GET", "arvados/v1/collections", nil, params)
16 return page.ItemsAvailable, err
19 // EachCollection calls f once for every readable
20 // collection. EachCollection stops if it encounters an error, such as
21 // f returning a non-nil error.
23 // The progress function is called periodically with done (number of
24 // times f has been called) and total (number of times f is expected
27 // If pageSize > 0 it is used as the maximum page size in each API
28 // call; otherwise the maximum allowed page size is requested.
29 func EachCollection(c *arvados.Client, pageSize int, f func(arvados.Collection) error, progress func(done, total int)) error {
31 progress = func(_, _ int) {}
34 expectCount, err := countCollections(c, arvados.ResourceListParams{
43 // Use the maximum page size the server allows
46 params := arvados.ResourceListParams{
48 Order: "modified_at, uuid",
50 Select: []string{"uuid", "unsigned_manifest_text", "modified_at", "portable_data_hash", "replication_desired"},
53 var last arvados.Collection
54 var filterTime time.Time
56 gettingExactTimestamp := false
58 progress(callCount, expectCount)
59 var page arvados.CollectionList
60 err := c.RequestAndDecode(&page, "GET", "arvados/v1/collections", nil, params)
64 for _, coll := range page.Items {
65 if last.ModifiedAt != nil && *last.ModifiedAt == *coll.ModifiedAt && last.UUID >= coll.UUID {
75 if len(page.Items) == 0 && !gettingExactTimestamp {
77 } else if last.ModifiedAt == nil {
78 return fmt.Errorf("BUG: Last collection on the page (%s) has no modified_at timestamp; cannot make progress", last.UUID)
79 } else if len(page.Items) > 0 && *last.ModifiedAt == filterTime {
80 // If we requested time>=X and never got a
81 // time>X then we might not have received all
82 // items with time==X yet. Switch to
83 // gettingExactTimestamp mode (if we're not
84 // there already), advancing our UUID
85 // threshold with each request, until we get
87 gettingExactTimestamp = true
88 params.Filters = []arvados.Filter{{
97 } else if gettingExactTimestamp {
98 // This must be an empty page (in this mode,
99 // an unequal timestamp is impossible) so we
100 // can start getting pages of newer
102 gettingExactTimestamp = false
103 params.Filters = []arvados.Filter{{
109 // In the normal case, we know we have seen
110 // all collections with modtime<filterTime,
111 // but we might not have seen all that have
112 // modtime=filterTime. Hence we use >= instead
113 // of > and skip the obvious overlapping item,
114 // i.e., the last item on the previous
115 // page. In some edge cases this can return
116 // collections we have already seen, but
117 // avoiding that would add overhead in the
118 // overwhelmingly common cases, so we don't
120 filterTime = *last.ModifiedAt
121 params.Filters = []arvados.Filter{{
132 progress(callCount, expectCount)
134 if checkCount, err := countCollections(c, arvados.ResourceListParams{
135 Filters: []arvados.Filter{{
138 Operand: filterTime}},
142 } else if callCount < checkCount {
143 return fmt.Errorf("Retrieved %d collections with modtime <= T=%q, but server now reports there are %d collections with modtime <= T", callCount, filterTime, checkCount)