11283: Fix "available slot number" query.
[arvados.git] / services / keep-balance / collection.go
1 package main
2
3 import (
4         "fmt"
5         "time"
6
7         "git.curoverse.com/arvados.git/sdk/go/arvados"
8 )
9
10 func countCollections(c *arvados.Client, params arvados.ResourceListParams) (int, error) {
11         var page arvados.CollectionList
12         var zero int
13         params.Limit = &zero
14         params.Count = "exact"
15         err := c.RequestAndDecode(&page, "GET", "arvados/v1/collections", nil, params)
16         return page.ItemsAvailable, err
17 }
18
19 // EachCollection calls f once for every readable
20 // collection. EachCollection stops if it encounters an error, such as
21 // f returning a non-nil error.
22 //
23 // The progress function is called periodically with done (number of
24 // times f has been called) and total (number of times f is expected
25 // to be called).
26 //
27 // If pageSize > 0 it is used as the maximum page size in each API
28 // call; otherwise the maximum allowed page size is requested.
29 func EachCollection(c *arvados.Client, pageSize int, f func(arvados.Collection) error, progress func(done, total int)) error {
30         if progress == nil {
31                 progress = func(_, _ int) {}
32         }
33
34         expectCount, err := countCollections(c, arvados.ResourceListParams{
35                 IncludeTrash: true,
36         })
37         if err != nil {
38                 return err
39         }
40
41         limit := pageSize
42         if limit <= 0 {
43                 // Use the maximum page size the server allows
44                 limit = 1<<31 - 1
45         }
46         params := arvados.ResourceListParams{
47                 Limit:        &limit,
48                 Order:        "modified_at, uuid",
49                 Count:        "none",
50                 Select:       []string{"uuid", "unsigned_manifest_text", "modified_at", "portable_data_hash", "replication_desired"},
51                 IncludeTrash: true,
52         }
53         var last arvados.Collection
54         var filterTime time.Time
55         callCount := 0
56         gettingExactTimestamp := false
57         for {
58                 progress(callCount, expectCount)
59                 var page arvados.CollectionList
60                 err := c.RequestAndDecode(&page, "GET", "arvados/v1/collections", nil, params)
61                 if err != nil {
62                         return err
63                 }
64                 for _, coll := range page.Items {
65                         if last.ModifiedAt != nil && *last.ModifiedAt == *coll.ModifiedAt && last.UUID >= coll.UUID {
66                                 continue
67                         }
68                         callCount++
69                         err = f(coll)
70                         if err != nil {
71                                 return err
72                         }
73                         last = coll
74                 }
75                 if len(page.Items) == 0 && !gettingExactTimestamp {
76                         break
77                 } else if last.ModifiedAt == nil {
78                         return fmt.Errorf("BUG: Last collection on the page (%s) has no modified_at timestamp; cannot make progress", last.UUID)
79                 } else if len(page.Items) > 0 && *last.ModifiedAt == filterTime {
80                         // If we requested time>=X and never got a
81                         // time>X then we might not have received all
82                         // items with time==X yet. Switch to
83                         // gettingExactTimestamp mode (if we're not
84                         // there already), advancing our UUID
85                         // threshold with each request, until we get
86                         // an empty page.
87                         gettingExactTimestamp = true
88                         params.Filters = []arvados.Filter{{
89                                 Attr:     "modified_at",
90                                 Operator: "=",
91                                 Operand:  filterTime,
92                         }, {
93                                 Attr:     "uuid",
94                                 Operator: ">",
95                                 Operand:  last.UUID,
96                         }}
97                 } else if gettingExactTimestamp {
98                         // This must be an empty page (in this mode,
99                         // an unequal timestamp is impossible) so we
100                         // can start getting pages of newer
101                         // collections.
102                         gettingExactTimestamp = false
103                         params.Filters = []arvados.Filter{{
104                                 Attr:     "modified_at",
105                                 Operator: ">",
106                                 Operand:  filterTime,
107                         }}
108                 } else {
109                         // In the normal case, we know we have seen
110                         // all collections with modtime<filterTime,
111                         // but we might not have seen all that have
112                         // modtime=filterTime. Hence we use >= instead
113                         // of > and skip the obvious overlapping item,
114                         // i.e., the last item on the previous
115                         // page. In some edge cases this can return
116                         // collections we have already seen, but
117                         // avoiding that would add overhead in the
118                         // overwhelmingly common cases, so we don't
119                         // bother.
120                         filterTime = *last.ModifiedAt
121                         params.Filters = []arvados.Filter{{
122                                 Attr:     "modified_at",
123                                 Operator: ">=",
124                                 Operand:  filterTime,
125                         }, {
126                                 Attr:     "uuid",
127                                 Operator: "!=",
128                                 Operand:  last.UUID,
129                         }}
130                 }
131         }
132         progress(callCount, expectCount)
133
134         if checkCount, err := countCollections(c, arvados.ResourceListParams{
135                 Filters: []arvados.Filter{{
136                         Attr:     "modified_at",
137                         Operator: "<=",
138                         Operand:  filterTime}},
139                 IncludeTrash: true,
140         }); err != nil {
141                 return err
142         } else if callCount < checkCount {
143                 return fmt.Errorf("Retrieved %d collections with modtime <= T=%q, but server now reports there are %d collections with modtime <= T", callCount, filterTime, checkCount)
144         }
145
146         return nil
147 }