1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
12 "git.arvados.org/arvados.git/sdk/go/arvados"
15 func countCollections(c *arvados.Client, params arvados.ResourceListParams) (int, error) {
16 var page arvados.CollectionList
19 params.Count = "exact"
20 err := c.RequestAndDecode(&page, "GET", "arvados/v1/collections", nil, params)
21 return page.ItemsAvailable, err
24 // EachCollection calls f once for every readable
25 // collection. EachCollection stops if it encounters an error, such as
26 // f returning a non-nil error.
28 // The progress function is called periodically with done (number of
29 // times f has been called) and total (number of times f is expected
32 // If pageSize > 0 it is used as the maximum page size in each API
33 // call; otherwise the maximum allowed page size is requested.
34 func EachCollection(ctx context.Context, c *arvados.Client, pageSize int, f func(arvados.Collection) error, progress func(done, total int)) error {
36 progress = func(_, _ int) {}
39 expectCount, err := countCollections(c, arvados.ResourceListParams{
41 IncludeOldVersions: true,
47 // Note the obvious way to get all collections (sorting by
48 // UUID) would be much easier, but would lose data: If a
49 // client were to move files from a collection with uuid="zzz"
50 // to a collection with uuid="aaa" around the time when we
51 // were fetching the "mmm" page, we would never see those
52 // files' block IDs at all -- even if the client is careful to
53 // save "aaa" before saving "zzz".
55 // Instead, we get pages in modified_at order. Collections
56 // that are modified during the run will be re-fetched in a
61 // Use the maximum page size the server allows
64 params := arvados.ResourceListParams{
66 Order: "modified_at, uuid",
68 Select: []string{"uuid", "unsigned_manifest_text", "modified_at", "portable_data_hash", "replication_desired"},
70 IncludeOldVersions: true,
72 var last arvados.Collection
73 var filterTime time.Time
75 gettingExactTimestamp := false
77 progress(callCount, expectCount)
78 var page arvados.CollectionList
79 err := c.RequestAndDecodeContext(ctx, &page, "GET", "arvados/v1/collections", nil, params)
83 for _, coll := range page.Items {
84 if last.ModifiedAt == coll.ModifiedAt && last.UUID >= coll.UUID {
94 if len(page.Items) == 0 && !gettingExactTimestamp {
96 } else if last.ModifiedAt.IsZero() {
97 return fmt.Errorf("BUG: Last collection on the page (%s) has no modified_at timestamp; cannot make progress", last.UUID)
98 } else if len(page.Items) > 0 && last.ModifiedAt == filterTime {
99 // If we requested time>=X and never got a
100 // time>X then we might not have received all
101 // items with time==X yet. Switch to
102 // gettingExactTimestamp mode (if we're not
103 // there already), advancing our UUID
104 // threshold with each request, until we get
106 gettingExactTimestamp = true
107 params.Filters = []arvados.Filter{{
116 } else if gettingExactTimestamp {
117 // This must be an empty page (in this mode,
118 // an unequal timestamp is impossible) so we
119 // can start getting pages of newer
121 gettingExactTimestamp = false
122 params.Filters = []arvados.Filter{{
128 // In the normal case, we know we have seen
129 // all collections with modtime<filterTime,
130 // but we might not have seen all that have
131 // modtime=filterTime. Hence we use >= instead
132 // of > and skip the obvious overlapping item,
133 // i.e., the last item on the previous
134 // page. In some edge cases this can return
135 // collections we have already seen, but
136 // avoiding that would add overhead in the
137 // overwhelmingly common cases, so we don't
139 filterTime = last.ModifiedAt
140 params.Filters = []arvados.Filter{{
151 progress(callCount, expectCount)
153 if checkCount, err := countCollections(c, arvados.ResourceListParams{
154 Filters: []arvados.Filter{{
157 Operand: filterTime}},
159 IncludeOldVersions: true,
162 } else if callCount < checkCount {
163 return fmt.Errorf("Retrieved %d collections with modtime <= T=%q, but server now reports there are %d collections with modtime <= T", callCount, filterTime, checkCount)