16476: Merge branch 'master' into 16476-upgrade-arvados-jobs-to-buster
[arvados.git] / services / keep-balance / collection.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package main
6
7 import (
8         "context"
9         "fmt"
10         "time"
11
12         "git.arvados.org/arvados.git/sdk/go/arvados"
13 )
14
15 func countCollections(c *arvados.Client, params arvados.ResourceListParams) (int, error) {
16         var page arvados.CollectionList
17         var zero int
18         params.Limit = &zero
19         params.Count = "exact"
20         err := c.RequestAndDecode(&page, "GET", "arvados/v1/collections", nil, params)
21         return page.ItemsAvailable, err
22 }
23
24 // EachCollection calls f once for every readable
25 // collection. EachCollection stops if it encounters an error, such as
26 // f returning a non-nil error.
27 //
28 // The progress function is called periodically with done (number of
29 // times f has been called) and total (number of times f is expected
30 // to be called).
31 //
32 // If pageSize > 0 it is used as the maximum page size in each API
33 // call; otherwise the maximum allowed page size is requested.
34 func EachCollection(ctx context.Context, c *arvados.Client, pageSize int, f func(arvados.Collection) error, progress func(done, total int)) error {
35         if progress == nil {
36                 progress = func(_, _ int) {}
37         }
38
39         expectCount, err := countCollections(c, arvados.ResourceListParams{
40                 IncludeTrash:       true,
41                 IncludeOldVersions: true,
42         })
43         if err != nil {
44                 return err
45         }
46
47         // Note the obvious way to get all collections (sorting by
48         // UUID) would be much easier, but would lose data: If a
49         // client were to move files from collection with uuid="zzz"
50         // to a collection with uuid="aaa" around the time when we
51         // were fetching the "mmm" page, we would never see those
52         // files' block IDs at all -- even if the client is careful to
53         // save "aaa" before saving "zzz".
54         //
55         // Instead, we get pages in modified_at order. Collections
56         // that are modified during the run will be re-fetched in a
57         // subsequent page.
58
59         limit := pageSize
60         if limit <= 0 {
61                 // Use the maximum page size the server allows
62                 limit = 1<<31 - 1
63         }
64         params := arvados.ResourceListParams{
65                 Limit:              &limit,
66                 Order:              "modified_at, uuid",
67                 Count:              "none",
68                 Select:             []string{"uuid", "unsigned_manifest_text", "modified_at", "portable_data_hash", "replication_desired"},
69                 IncludeTrash:       true,
70                 IncludeOldVersions: true,
71         }
72         var last arvados.Collection
73         var filterTime time.Time
74         callCount := 0
75         gettingExactTimestamp := false
76         for {
77                 progress(callCount, expectCount)
78                 var page arvados.CollectionList
79                 err := c.RequestAndDecodeContext(ctx, &page, "GET", "arvados/v1/collections", nil, params)
80                 if err != nil {
81                         return err
82                 }
83                 for _, coll := range page.Items {
84                         if last.ModifiedAt == coll.ModifiedAt && last.UUID >= coll.UUID {
85                                 continue
86                         }
87                         callCount++
88                         err = f(coll)
89                         if err != nil {
90                                 return err
91                         }
92                         last = coll
93                 }
94                 if len(page.Items) == 0 && !gettingExactTimestamp {
95                         break
96                 } else if last.ModifiedAt.IsZero() {
97                         return fmt.Errorf("BUG: Last collection on the page (%s) has no modified_at timestamp; cannot make progress", last.UUID)
98                 } else if len(page.Items) > 0 && last.ModifiedAt == filterTime {
99                         // If we requested time>=X and never got a
100                         // time>X then we might not have received all
101                         // items with time==X yet. Switch to
102                         // gettingExactTimestamp mode (if we're not
103                         // there already), advancing our UUID
104                         // threshold with each request, until we get
105                         // an empty page.
106                         gettingExactTimestamp = true
107                         params.Filters = []arvados.Filter{{
108                                 Attr:     "modified_at",
109                                 Operator: "=",
110                                 Operand:  filterTime,
111                         }, {
112                                 Attr:     "uuid",
113                                 Operator: ">",
114                                 Operand:  last.UUID,
115                         }}
116                 } else if gettingExactTimestamp {
117                         // This must be an empty page (in this mode,
118                         // an unequal timestamp is impossible) so we
119                         // can start getting pages of newer
120                         // collections.
121                         gettingExactTimestamp = false
122                         params.Filters = []arvados.Filter{{
123                                 Attr:     "modified_at",
124                                 Operator: ">",
125                                 Operand:  filterTime,
126                         }}
127                 } else {
128                         // In the normal case, we know we have seen
129                         // all collections with modtime<filterTime,
130                         // but we might not have seen all that have
131                         // modtime=filterTime. Hence we use >= instead
132                         // of > and skip the obvious overlapping item,
133                         // i.e., the last item on the previous
134                         // page. In some edge cases this can return
135                         // collections we have already seen, but
136                         // avoiding that would add overhead in the
137                         // overwhelmingly common cases, so we don't
138                         // bother.
139                         filterTime = last.ModifiedAt
140                         params.Filters = []arvados.Filter{{
141                                 Attr:     "modified_at",
142                                 Operator: ">=",
143                                 Operand:  filterTime,
144                         }, {
145                                 Attr:     "uuid",
146                                 Operator: "!=",
147                                 Operand:  last.UUID,
148                         }}
149                 }
150         }
151         progress(callCount, expectCount)
152
153         if checkCount, err := countCollections(c, arvados.ResourceListParams{
154                 Filters: []arvados.Filter{{
155                         Attr:     "modified_at",
156                         Operator: "<=",
157                         Operand:  filterTime}},
158                 IncludeTrash:       true,
159                 IncludeOldVersions: true,
160         }); err != nil {
161                 return err
162         } else if callCount < checkCount {
163                 return fmt.Errorf("Retrieved %d collections with modtime <= T=%q, but server now reports there are %d collections with modtime <= T", callCount, filterTime, checkCount)
164         }
165
166         return nil
167 }