Merge branch 'master' into 9998-no-count-items-available
[arvados.git] / services / keep-balance / collection.go
1 package main
2
3 import (
4         "fmt"
5         "time"
6
7         "git.curoverse.com/arvados.git/sdk/go/arvados"
8 )
9
10 func countCollections(c *arvados.Client, params arvados.ResourceListParams) (int, error) {
11         var page arvados.CollectionList
12         var zero int
13         params.Limit = &zero
14         params.Count = true
15         err := c.RequestAndDecode(&page, "GET", "arvados/v1/collections", nil, params)
16         return page.ItemsAvailable, err
17 }
18
19 // EachCollection calls f once for every readable
20 // collection. EachCollection stops if it encounters an error, such as
21 // f returning a non-nil error.
22 //
23 // The progress function is called periodically with done (number of
24 // times f has been called) and total (number of times f is expected
25 // to be called).
26 //
27 // If pageSize > 0 it is used as the maximum page size in each API
28 // call; otherwise the maximum allowed page size is requested.
29 func EachCollection(c *arvados.Client, pageSize int, f func(arvados.Collection) error, progress func(done, total int)) error {
30         if progress == nil {
31                 progress = func(_, _ int) {}
32         }
33
34         expectCount, err := countCollections(c, arvados.ResourceListParams{
35                 IncludeTrash: true,
36         })
37         if err != nil {
38                 return err
39         }
40
41         limit := pageSize
42         if limit <= 0 {
43                 // Use the maximum page size the server allows
44                 limit = 1<<31 - 1
45         }
46         params := arvados.ResourceListParams{
47                 Limit:        &limit,
48                 Order:        "modified_at, uuid",
49                 Count:        false,
50                 Select:       []string{"uuid", "unsigned_manifest_text", "modified_at", "portable_data_hash", "replication_desired"},
51                 IncludeTrash: true,
52         }
53         var last arvados.Collection
54         var filterTime time.Time
55         callCount := 0
56         for {
57                 progress(callCount, expectCount)
58                 var page arvados.CollectionList
59                 err := c.RequestAndDecode(&page, "GET", "arvados/v1/collections", nil, params)
60                 if err != nil {
61                         return err
62                 }
63                 for _, coll := range page.Items {
64                         if last.ModifiedAt != nil && *last.ModifiedAt == *coll.ModifiedAt && last.UUID >= coll.UUID {
65                                 continue
66                         }
67                         callCount++
68                         err = f(coll)
69                         if err != nil {
70                                 return err
71                         }
72                         last = coll
73                 }
74                 if last.ModifiedAt == nil || *last.ModifiedAt == filterTime {
75                         if page.ItemsAvailable > len(page.Items) {
76                                 // TODO: use "mtime=X && UUID>Y"
77                                 // filters to get all collections with
78                                 // this timestamp, then use "mtime>X"
79                                 // to get the next timestamp.
80                                 return fmt.Errorf("BUG: Received an entire page with the same modified_at timestamp (%v), cannot make progress", filterTime)
81                         }
82                         break
83                 }
84                 filterTime = *last.ModifiedAt
85                 params.Filters = []arvados.Filter{{
86                         Attr:     "modified_at",
87                         Operator: ">=",
88                         Operand:  filterTime,
89                 }, {
90                         Attr:     "uuid",
91                         Operator: "!=",
92                         Operand:  last.UUID,
93                 }}
94         }
95         progress(callCount, expectCount)
96
97         if checkCount, err := countCollections(c, arvados.ResourceListParams{
98                 Filters: []arvados.Filter{{
99                         Attr:     "modified_at",
100                         Operator: "<=",
101                         Operand:  filterTime}},
102                 IncludeTrash: true,
103         }); err != nil {
104                 return err
105         } else if callCount < checkCount {
106                 return fmt.Errorf("Retrieved %d collections with modtime <= T=%q, but server now reports there are %d collections with modtime <= T", callCount, filterTime, checkCount)
107         }
108
109         return nil
110 }