Merge branch '11454-wb-federated-search'
[arvados.git] / services / keep-balance / collection.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package main
6
7 import (
8         "fmt"
9         "time"
10
11         "git.curoverse.com/arvados.git/sdk/go/arvados"
12 )
13
14 func countCollections(c *arvados.Client, params arvados.ResourceListParams) (int, error) {
15         var page arvados.CollectionList
16         var zero int
17         params.Limit = &zero
18         params.Count = "exact"
19         err := c.RequestAndDecode(&page, "GET", "arvados/v1/collections", nil, params)
20         return page.ItemsAvailable, err
21 }
22
23 // EachCollection calls f once for every readable
24 // collection. EachCollection stops if it encounters an error, such as
25 // f returning a non-nil error.
26 //
27 // The progress function is called periodically with done (number of
28 // times f has been called) and total (number of times f is expected
29 // to be called).
30 //
31 // If pageSize > 0 it is used as the maximum page size in each API
32 // call; otherwise the maximum allowed page size is requested.
33 func EachCollection(c *arvados.Client, pageSize int, f func(arvados.Collection) error, progress func(done, total int)) error {
34         if progress == nil {
35                 progress = func(_, _ int) {}
36         }
37
38         expectCount, err := countCollections(c, arvados.ResourceListParams{
39                 IncludeTrash: true,
40         })
41         if err != nil {
42                 return err
43         }
44
45         limit := pageSize
46         if limit <= 0 {
47                 // Use the maximum page size the server allows
48                 limit = 1<<31 - 1
49         }
50         params := arvados.ResourceListParams{
51                 Limit:        &limit,
52                 Order:        "modified_at, uuid",
53                 Count:        "none",
54                 Select:       []string{"uuid", "unsigned_manifest_text", "modified_at", "portable_data_hash", "replication_desired"},
55                 IncludeTrash: true,
56         }
57         var last arvados.Collection
58         var filterTime time.Time
59         callCount := 0
60         gettingExactTimestamp := false
61         for {
62                 progress(callCount, expectCount)
63                 var page arvados.CollectionList
64                 err := c.RequestAndDecode(&page, "GET", "arvados/v1/collections", nil, params)
65                 if err != nil {
66                         return err
67                 }
68                 for _, coll := range page.Items {
69                         if last.ModifiedAt != nil && *last.ModifiedAt == *coll.ModifiedAt && last.UUID >= coll.UUID {
70                                 continue
71                         }
72                         callCount++
73                         err = f(coll)
74                         if err != nil {
75                                 return err
76                         }
77                         last = coll
78                 }
79                 if len(page.Items) == 0 && !gettingExactTimestamp {
80                         break
81                 } else if last.ModifiedAt == nil {
82                         return fmt.Errorf("BUG: Last collection on the page (%s) has no modified_at timestamp; cannot make progress", last.UUID)
83                 } else if len(page.Items) > 0 && *last.ModifiedAt == filterTime {
84                         // If we requested time>=X and never got a
85                         // time>X then we might not have received all
86                         // items with time==X yet. Switch to
87                         // gettingExactTimestamp mode (if we're not
88                         // there already), advancing our UUID
89                         // threshold with each request, until we get
90                         // an empty page.
91                         gettingExactTimestamp = true
92                         params.Filters = []arvados.Filter{{
93                                 Attr:     "modified_at",
94                                 Operator: "=",
95                                 Operand:  filterTime,
96                         }, {
97                                 Attr:     "uuid",
98                                 Operator: ">",
99                                 Operand:  last.UUID,
100                         }}
101                 } else if gettingExactTimestamp {
102                         // This must be an empty page (in this mode,
103                         // an unequal timestamp is impossible) so we
104                         // can start getting pages of newer
105                         // collections.
106                         gettingExactTimestamp = false
107                         params.Filters = []arvados.Filter{{
108                                 Attr:     "modified_at",
109                                 Operator: ">",
110                                 Operand:  filterTime,
111                         }}
112                 } else {
113                         // In the normal case, we know we have seen
114                         // all collections with modtime<filterTime,
115                         // but we might not have seen all that have
116                         // modtime=filterTime. Hence we use >= instead
117                         // of > and skip the obvious overlapping item,
118                         // i.e., the last item on the previous
119                         // page. In some edge cases this can return
120                         // collections we have already seen, but
121                         // avoiding that would add overhead in the
122                         // overwhelmingly common cases, so we don't
123                         // bother.
124                         filterTime = *last.ModifiedAt
125                         params.Filters = []arvados.Filter{{
126                                 Attr:     "modified_at",
127                                 Operator: ">=",
128                                 Operand:  filterTime,
129                         }, {
130                                 Attr:     "uuid",
131                                 Operator: "!=",
132                                 Operand:  last.UUID,
133                         }}
134                 }
135         }
136         progress(callCount, expectCount)
137
138         if checkCount, err := countCollections(c, arvados.ResourceListParams{
139                 Filters: []arvados.Filter{{
140                         Attr:     "modified_at",
141                         Operator: "<=",
142                         Operand:  filterTime}},
143                 IncludeTrash: true,
144         }); err != nil {
145                 return err
146         } else if callCount < checkCount {
147                 return fmt.Errorf("Retrieved %d collections with modtime <= T=%q, but server now reports there are %d collections with modtime <= T", callCount, filterTime, checkCount)
148         }
149
150         return nil
151 }