Merge branch '16492-websocket-extensions-gem-upgrade'
[arvados.git] / services / keep-balance / collection.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package main
6
7 import (
8         "fmt"
9         "time"
10
11         "git.arvados.org/arvados.git/sdk/go/arvados"
12 )
13
14 func countCollections(c *arvados.Client, params arvados.ResourceListParams) (int, error) {
15         var page arvados.CollectionList
16         var zero int
17         params.Limit = &zero
18         params.Count = "exact"
19         err := c.RequestAndDecode(&page, "GET", "arvados/v1/collections", nil, params)
20         return page.ItemsAvailable, err
21 }
22
23 // EachCollection calls f once for every readable
24 // collection. EachCollection stops if it encounters an error, such as
25 // f returning a non-nil error.
26 //
27 // The progress function is called periodically with done (number of
28 // times f has been called) and total (number of times f is expected
29 // to be called).
30 //
31 // If pageSize > 0 it is used as the maximum page size in each API
32 // call; otherwise the maximum allowed page size is requested.
33 func EachCollection(c *arvados.Client, pageSize int, f func(arvados.Collection) error, progress func(done, total int)) error {
34         if progress == nil {
35                 progress = func(_, _ int) {}
36         }
37
38         expectCount, err := countCollections(c, arvados.ResourceListParams{
39                 IncludeTrash:       true,
40                 IncludeOldVersions: true,
41         })
42         if err != nil {
43                 return err
44         }
45
46         // Note the obvious way to get all collections (sorting by
47         // UUID) would be much easier, but would lose data: If a
48         // client were to move files from collection with uuid="zzz"
49         // to a collection with uuid="aaa" around the time when we
50         // were fetching the "mmm" page, we would never see those
51         // files' block IDs at all -- even if the client is careful to
52         // save "aaa" before saving "zzz".
53         //
54         // Instead, we get pages in modified_at order. Collections
55         // that are modified during the run will be re-fetched in a
56         // subsequent page.
57
58         limit := pageSize
59         if limit <= 0 {
60                 // Use the maximum page size the server allows
61                 limit = 1<<31 - 1
62         }
63         params := arvados.ResourceListParams{
64                 Limit:              &limit,
65                 Order:              "modified_at, uuid",
66                 Count:              "none",
67                 Select:             []string{"uuid", "unsigned_manifest_text", "modified_at", "portable_data_hash", "replication_desired"},
68                 IncludeTrash:       true,
69                 IncludeOldVersions: true,
70         }
71         var last arvados.Collection
72         var filterTime time.Time
73         callCount := 0
74         gettingExactTimestamp := false
75         for {
76                 progress(callCount, expectCount)
77                 var page arvados.CollectionList
78                 err := c.RequestAndDecode(&page, "GET", "arvados/v1/collections", nil, params)
79                 if err != nil {
80                         return err
81                 }
82                 for _, coll := range page.Items {
83                         if last.ModifiedAt == coll.ModifiedAt && last.UUID >= coll.UUID {
84                                 continue
85                         }
86                         callCount++
87                         err = f(coll)
88                         if err != nil {
89                                 return err
90                         }
91                         last = coll
92                 }
93                 if len(page.Items) == 0 && !gettingExactTimestamp {
94                         break
95                 } else if last.ModifiedAt.IsZero() {
96                         return fmt.Errorf("BUG: Last collection on the page (%s) has no modified_at timestamp; cannot make progress", last.UUID)
97                 } else if len(page.Items) > 0 && last.ModifiedAt == filterTime {
98                         // If we requested time>=X and never got a
99                         // time>X then we might not have received all
100                         // items with time==X yet. Switch to
101                         // gettingExactTimestamp mode (if we're not
102                         // there already), advancing our UUID
103                         // threshold with each request, until we get
104                         // an empty page.
105                         gettingExactTimestamp = true
106                         params.Filters = []arvados.Filter{{
107                                 Attr:     "modified_at",
108                                 Operator: "=",
109                                 Operand:  filterTime,
110                         }, {
111                                 Attr:     "uuid",
112                                 Operator: ">",
113                                 Operand:  last.UUID,
114                         }}
115                 } else if gettingExactTimestamp {
116                         // This must be an empty page (in this mode,
117                         // an unequal timestamp is impossible) so we
118                         // can start getting pages of newer
119                         // collections.
120                         gettingExactTimestamp = false
121                         params.Filters = []arvados.Filter{{
122                                 Attr:     "modified_at",
123                                 Operator: ">",
124                                 Operand:  filterTime,
125                         }}
126                 } else {
127                         // In the normal case, we know we have seen
128                         // all collections with modtime<filterTime,
129                         // but we might not have seen all that have
130                         // modtime=filterTime. Hence we use >= instead
131                         // of > and skip the obvious overlapping item,
132                         // i.e., the last item on the previous
133                         // page. In some edge cases this can return
134                         // collections we have already seen, but
135                         // avoiding that would add overhead in the
136                         // overwhelmingly common cases, so we don't
137                         // bother.
138                         filterTime = last.ModifiedAt
139                         params.Filters = []arvados.Filter{{
140                                 Attr:     "modified_at",
141                                 Operator: ">=",
142                                 Operand:  filterTime,
143                         }, {
144                                 Attr:     "uuid",
145                                 Operator: "!=",
146                                 Operand:  last.UUID,
147                         }}
148                 }
149         }
150         progress(callCount, expectCount)
151
152         if checkCount, err := countCollections(c, arvados.ResourceListParams{
153                 Filters: []arvados.Filter{{
154                         Attr:     "modified_at",
155                         Operator: "<=",
156                         Operand:  filterTime}},
157                 IncludeTrash:       true,
158                 IncludeOldVersions: true,
159         }); err != nil {
160                 return err
161         } else if callCount < checkCount {
162                 return fmt.Errorf("Retrieved %d collections with modtime <= T=%q, but server now reports there are %d collections with modtime <= T", callCount, filterTime, checkCount)
163         }
164
165         return nil
166 }