Merge branch '10816-postgres-permissions'
[arvados.git] / services / keep-balance / collection.go
1 package main
2
3 import (
4         "fmt"
5         "time"
6
7         "git.curoverse.com/arvados.git/sdk/go/arvados"
8 )
9
10 func countCollections(c *arvados.Client, params arvados.ResourceListParams) (int, error) {
11         var page arvados.CollectionList
12         var zero int
13         params.Limit = &zero
14         err := c.RequestAndDecode(&page, "GET", "arvados/v1/collections", nil, params)
15         return page.ItemsAvailable, err
16 }
17
18 // EachCollection calls f once for every readable
19 // collection. EachCollection stops if it encounters an error, such as
20 // f returning a non-nil error.
21 //
22 // The progress function is called periodically with done (number of
23 // times f has been called) and total (number of times f is expected
24 // to be called).
25 //
26 // If pageSize > 0 it is used as the maximum page size in each API
27 // call; otherwise the maximum allowed page size is requested.
28 func EachCollection(c *arvados.Client, pageSize int, f func(arvados.Collection) error, progress func(done, total int)) error {
29         if progress == nil {
30                 progress = func(_, _ int) {}
31         }
32
33         expectCount, err := countCollections(c, arvados.ResourceListParams{
34                 IncludeTrash: true,
35         })
36         if err != nil {
37                 return err
38         }
39
40         limit := pageSize
41         if limit <= 0 {
42                 // Use the maximum page size the server allows
43                 limit = 1<<31 - 1
44         }
45         params := arvados.ResourceListParams{
46                 Limit:        &limit,
47                 Order:        "modified_at, uuid",
48                 Select:       []string{"uuid", "manifest_text", "modified_at", "portable_data_hash", "replication_desired"},
49                 IncludeTrash: true,
50         }
51         var last arvados.Collection
52         var filterTime time.Time
53         callCount := 0
54         for {
55                 progress(callCount, expectCount)
56                 var page arvados.CollectionList
57                 err := c.RequestAndDecode(&page, "GET", "arvados/v1/collections", nil, params)
58                 if err != nil {
59                         return err
60                 }
61                 for _, coll := range page.Items {
62                         if last.ModifiedAt != nil && *last.ModifiedAt == *coll.ModifiedAt && last.UUID >= coll.UUID {
63                                 continue
64                         }
65                         callCount++
66                         err = f(coll)
67                         if err != nil {
68                                 return err
69                         }
70                         last = coll
71                 }
72                 if last.ModifiedAt == nil || *last.ModifiedAt == filterTime {
73                         if page.ItemsAvailable > len(page.Items) {
74                                 // TODO: use "mtime=X && UUID>Y"
75                                 // filters to get all collections with
76                                 // this timestamp, then use "mtime>X"
77                                 // to get the next timestamp.
78                                 return fmt.Errorf("BUG: Received an entire page with the same modified_at timestamp (%v), cannot make progress", filterTime)
79                         }
80                         break
81                 }
82                 filterTime = *last.ModifiedAt
83                 params.Filters = []arvados.Filter{{
84                         Attr:     "modified_at",
85                         Operator: ">=",
86                         Operand:  filterTime,
87                 }, {
88                         Attr:     "uuid",
89                         Operator: "!=",
90                         Operand:  last.UUID,
91                 }}
92         }
93         progress(callCount, expectCount)
94
95         if checkCount, err := countCollections(c, arvados.ResourceListParams{
96                 Filters: []arvados.Filter{{
97                         Attr:     "modified_at",
98                         Operator: "<=",
99                         Operand:  filterTime}},
100                 IncludeTrash: true,
101         }); err != nil {
102                 return err
103         } else if callCount < checkCount {
104                 return fmt.Errorf("Retrieved %d collections with modtime <= T=%q, but server now reports there are %d collections with modtime <= T", callCount, filterTime, checkCount)
105         }
106
107         return nil
108 }