17574: Update replication_confirmed fields after keep-balance run.
[arvados.git] / services / keep-balance / collection.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package main
6
7 import (
8         "context"
9         "encoding/json"
10         "fmt"
11         "io"
12         "runtime"
13         "sync"
14         "sync/atomic"
15         "time"
16
17         "git.arvados.org/arvados.git/sdk/go/arvados"
18         "github.com/jmoiron/sqlx"
19 )
20
21 func countCollections(c *arvados.Client, params arvados.ResourceListParams) (int, error) {
22         var page arvados.CollectionList
23         var zero int
24         params.Limit = &zero
25         params.Count = "exact"
26         err := c.RequestAndDecode(&page, "GET", "arvados/v1/collections", nil, params)
27         return page.ItemsAvailable, err
28 }
29
// EachCollection calls f once for every readable
// collection. EachCollection stops if it encounters an error, such as
// f returning a non-nil error.
//
// Trashed collections and old collection versions are included:
// every request sets IncludeTrash and IncludeOldVersions.
//
// The progress function is called periodically with done (number of
// times f has been called) and total (number of times f is expected
// to be called). A nil progress function is allowed.
//
// If pageSize > 0 it is used as the maximum page size in each API
// call; otherwise the maximum allowed page size is requested.
//
// Only the fields listed in the Select parameter below are requested
// for the collections passed to f.
//
// After the last page has been processed, the server is asked how
// many collections have modified_at <= the last filter timestamp; if
// that number exceeds the number of calls made to f, an error is
// returned.
//
// NOTE(review): ctx is passed to the page requests only; the two
// countCollections calls do not take a context.
func EachCollection(ctx context.Context, c *arvados.Client, pageSize int, f func(arvados.Collection) error, progress func(done, total int)) error {
        if progress == nil {
                // Substitute a no-op so the loop below can call
                // progress unconditionally.
                progress = func(_, _ int) {}
        }

        // Total used only for progress reporting.
        expectCount, err := countCollections(c, arvados.ResourceListParams{
                IncludeTrash:       true,
                IncludeOldVersions: true,
        })
        if err != nil {
                return err
        }

        // Note the obvious way to get all collections (sorting by
        // UUID) would be much easier, but would lose data: If a
        // client were to move files from collection with uuid="zzz"
        // to a collection with uuid="aaa" around the time when we
        // were fetching the "mmm" page, we would never see those
        // files' block IDs at all -- even if the client is careful to
        // save "aaa" before saving "zzz".
        //
        // Instead, we get pages in modified_at order. Collections
        // that are modified during the run will be re-fetched in a
        // subsequent page.

        limit := pageSize
        if limit <= 0 {
                // Use the maximum page size the server allows
                limit = 1<<31 - 1
        }
        params := arvados.ResourceListParams{
                Limit:              &limit,
                Order:              "modified_at, uuid",
                Count:              "none",
                Select:             []string{"uuid", "unsigned_manifest_text", "modified_at", "portable_data_hash", "replication_desired", "storage_classes_desired", "is_trashed"},
                IncludeTrash:       true,
                IncludeOldVersions: true,
        }
        // last is the most recent collection passed to f.
        var last arvados.Collection
        // filterTime is the modified_at value used in the current
        // request filter (zero until the first page has been seen).
        var filterTime time.Time
        // callCount is the number of times f has been called.
        callCount := 0
        // gettingExactTimestamp is true while we are paging through
        // collections that all share modified_at == filterTime.
        gettingExactTimestamp := false
        for {
                progress(callCount, expectCount)
                var page arvados.CollectionList
                err := c.RequestAndDecodeContext(ctx, &page, "GET", "arvados/v1/collections", nil, params)
                if err != nil {
                        return err
                }
                for _, coll := range page.Items {
                        // Skip items that have the same timestamp as
                        // the last delivered item and do not sort
                        // after it by UUID: they overlap with what
                        // has already been passed to f.
                        //
                        // NOTE(review): == on time.Time also compares
                        // Location; this presumably relies on all
                        // timestamps being decoded the same way --
                        // confirm.
                        if last.ModifiedAt == coll.ModifiedAt && last.UUID >= coll.UUID {
                                continue
                        }
                        callCount++
                        err = f(coll)
                        if err != nil {
                                return err
                        }
                        last = coll
                }
                // Decide what to request next, based on what this
                // page contained.
                if len(page.Items) == 0 && !gettingExactTimestamp {
                        // An empty page in normal mode means there is
                        // nothing left to fetch.
                        break
                } else if last.ModifiedAt.IsZero() {
                        // The next filter is built from
                        // last.ModifiedAt, so a zero value cannot be
                        // used to advance.
                        return fmt.Errorf("BUG: Last collection on the page (%s) has no modified_at timestamp; cannot make progress", last.UUID)
                } else if len(page.Items) > 0 && last.ModifiedAt == filterTime {
                        // If we requested time>=X and never got a
                        // time>X then we might not have received all
                        // items with time==X yet. Switch to
                        // gettingExactTimestamp mode (if we're not
                        // there already), advancing our UUID
                        // threshold with each request, until we get
                        // an empty page.
                        gettingExactTimestamp = true
                        params.Filters = []arvados.Filter{{
                                Attr:     "modified_at",
                                Operator: "=",
                                Operand:  filterTime,
                        }, {
                                Attr:     "uuid",
                                Operator: ">",
                                Operand:  last.UUID,
                        }}
                } else if gettingExactTimestamp {
                        // This must be an empty page (in this mode,
                        // an unequal timestamp is impossible) so we
                        // can start getting pages of newer
                        // collections.
                        gettingExactTimestamp = false
                        params.Filters = []arvados.Filter{{
                                Attr:     "modified_at",
                                Operator: ">",
                                Operand:  filterTime,
                        }}
                } else {
                        // In the normal case, we know we have seen
                        // all collections with modtime<filterTime,
                        // but we might not have seen all that have
                        // modtime=filterTime. Hence we use >= instead
                        // of > and skip the obvious overlapping item,
                        // i.e., the last item on the previous
                        // page. In some edge cases this can return
                        // collections we have already seen, but
                        // avoiding that would add overhead in the
                        // overwhelmingly common cases, so we don't
                        // bother.
                        filterTime = last.ModifiedAt
                        params.Filters = []arvados.Filter{{
                                Attr:     "modified_at",
                                Operator: ">=",
                                Operand:  filterTime,
                        }, {
                                Attr:     "uuid",
                                Operator: "!=",
                                Operand:  last.UUID,
                        }}
                }
        }
        // Final progress report after the last page.
        progress(callCount, expectCount)

        // Sanity check: the server should not report more collections
        // with modtime <= filterTime than the number of times f was
        // called. callCount may legitimately be larger, because
        // collections can be delivered more than once.
        if checkCount, err := countCollections(c, arvados.ResourceListParams{
                Filters: []arvados.Filter{{
                        Attr:     "modified_at",
                        Operator: "<=",
                        Operand:  filterTime}},
                IncludeTrash:       true,
                IncludeOldVersions: true,
        }); err != nil {
                return err
        } else if callCount < checkCount {
                return fmt.Errorf("Retrieved %d collections with modtime <= T=%q, but server now reports there are %d collections with modtime <= T", callCount, filterTime, checkCount)
        }

        return nil
}
174
175 func (bal *Balancer) updateCollections(ctx context.Context, c *arvados.Client, cluster *arvados.Cluster) error {
176         ctx, cancel := context.WithCancel(ctx)
177         defer cancel()
178
179         defer bal.time("update_collections", "wall clock time to update collections")()
180         threshold := time.Now()
181         thresholdStr := threshold.Format(time.RFC3339Nano)
182
183         var err error
184         collQ := make(chan arvados.Collection, cluster.Collections.BalanceCollectionBuffers)
185         go func() {
186                 defer close(collQ)
187                 err = EachCollection(ctx, c, cluster.Collections.BalanceCollectionBatch, func(coll arvados.Collection) error {
188                         if coll.ModifiedAt.After(threshold) {
189                                 return io.EOF
190                         }
191                         if coll.IsTrashed {
192                                 return nil
193                         }
194                         collQ <- coll
195                         return nil
196                 }, func(done, total int) {
197                         bal.logf("update collections: %d/%d", done, total)
198                 })
199                 if err == io.EOF {
200                         err = nil
201                 } else if err != nil {
202                         bal.logf("error updating collections: %s", err)
203                 }
204         }()
205
206         db, err := bal.db(cluster)
207         if err != nil {
208                 return err
209         }
210
211         var updated int64
212         var wg sync.WaitGroup
213         for i := 0; i < runtime.NumCPU(); i++ {
214                 wg.Add(1)
215                 go func() {
216                         defer wg.Done()
217                         for coll := range collQ {
218                                 blkids, err := coll.SizedDigests()
219                                 if err != nil {
220                                         bal.logf("%s: %s", coll.UUID, err)
221                                         continue
222                                 }
223                                 repl := bal.BlockStateMap.GetConfirmedReplication(blkids, coll.StorageClassesDesired)
224                                 tx, err := db.Beginx()
225                                 if err != nil {
226                                         bal.logf("error opening transaction: %s", coll.UUID, err)
227                                         cancel()
228                                         continue
229                                 }
230                                 classes, _ := json.Marshal(coll.StorageClassesDesired)
231                                 _, err = tx.ExecContext(ctx, `update collections set
232                                         replication_confirmed=$1,
233                                         replication_confirmed_at=$2,
234                                         storage_classes_confirmed=$3,
235                                         storage_classes_confirmed_at=$2
236                                         where uuid=$4`,
237                                         repl, thresholdStr, classes, coll.UUID)
238                                 if err != nil {
239                                         tx.Rollback()
240                                 } else {
241                                         err = tx.Commit()
242                                 }
243                                 if err != nil {
244                                         bal.logf("%s: update failed: %s", coll.UUID, err)
245                                         continue
246                                 }
247                                 atomic.AddInt64(&updated, 1)
248                         }
249                 }()
250         }
251         wg.Wait()
252         bal.logf("updated %d collections", updated)
253         return err
254 }
255
256 func (bal *Balancer) db(cluster *arvados.Cluster) (*sqlx.DB, error) {
257         db, err := sqlx.Open("postgres", cluster.PostgreSQL.Connection.String())
258         if err != nil {
259                 return nil, err
260         }
261         if p := cluster.PostgreSQL.ConnectionPool; p > 0 {
262                 db.SetMaxOpenConns(p)
263         }
264         if err := db.Ping(); err != nil {
265                 return nil, fmt.Errorf("postgresql connect succeeded but ping failed: %s", err)
266         }
267         return db, nil
268 }