Merge branch 'main' from arvados-workbench2.git
[arvados.git] / services / keep-balance / collection.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package keepbalance
6
7 import (
8         "context"
9         "encoding/json"
10         "fmt"
11         "runtime"
12         "sync"
13         "sync/atomic"
14         "time"
15
16         "git.arvados.org/arvados.git/sdk/go/arvados"
17         "github.com/jmoiron/sqlx"
18 )
19
20 func countCollections(c *arvados.Client, params arvados.ResourceListParams) (int, error) {
21         var page arvados.CollectionList
22         var zero int
23         params.Limit = &zero
24         params.Count = "exact"
25         err := c.RequestAndDecode(&page, "GET", "arvados/v1/collections", nil, params)
26         return page.ItemsAvailable, err
27 }
28
29 // EachCollection calls f once for every readable
30 // collection. EachCollection stops if it encounters an error, such as
31 // f returning a non-nil error.
32 //
33 // The progress function is called periodically with done (number of
34 // times f has been called) and total (number of times f is expected
35 // to be called).
36 func EachCollection(ctx context.Context, db *sqlx.DB, c *arvados.Client, f func(arvados.Collection) error, progress func(done, total int)) error {
37         if progress == nil {
38                 progress = func(_, _ int) {}
39         }
40
41         expectCount, err := countCollections(c, arvados.ResourceListParams{
42                 IncludeTrash:       true,
43                 IncludeOldVersions: true,
44         })
45         if err != nil {
46                 return err
47         }
48         var newestModifiedAt time.Time
49
50         rows, err := db.QueryxContext(ctx, `SELECT
51                 uuid, manifest_text, modified_at, portable_data_hash,
52                 replication_desired, replication_confirmed, replication_confirmed_at,
53                 storage_classes_desired, storage_classes_confirmed, storage_classes_confirmed_at,
54                 is_trashed
55                 FROM collections`)
56         if err != nil {
57                 return err
58         }
59         defer rows.Close()
60         progressTicker := time.NewTicker(10 * time.Second)
61         defer progressTicker.Stop()
62         callCount := 0
63         for rows.Next() {
64                 var coll arvados.Collection
65                 var classesDesired, classesConfirmed []byte
66                 err = rows.Scan(&coll.UUID, &coll.ManifestText, &coll.ModifiedAt, &coll.PortableDataHash,
67                         &coll.ReplicationDesired, &coll.ReplicationConfirmed, &coll.ReplicationConfirmedAt,
68                         &classesDesired, &classesConfirmed, &coll.StorageClassesConfirmedAt,
69                         &coll.IsTrashed)
70                 if err != nil {
71                         return err
72                 }
73
74                 err = json.Unmarshal(classesDesired, &coll.StorageClassesDesired)
75                 if err != nil && len(classesDesired) > 0 {
76                         return err
77                 }
78                 err = json.Unmarshal(classesConfirmed, &coll.StorageClassesConfirmed)
79                 if err != nil && len(classesConfirmed) > 0 {
80                         return err
81                 }
82                 if newestModifiedAt.IsZero() || newestModifiedAt.Before(coll.ModifiedAt) {
83                         newestModifiedAt = coll.ModifiedAt
84                 }
85                 callCount++
86                 err = f(coll)
87                 if err != nil {
88                         return err
89                 }
90                 select {
91                 case <-progressTicker.C:
92                         progress(callCount, expectCount)
93                 default:
94                 }
95         }
96         progress(callCount, expectCount)
97         err = rows.Close()
98         if err != nil {
99                 return err
100         }
101         if checkCount, err := countCollections(c, arvados.ResourceListParams{
102                 Filters: []arvados.Filter{{
103                         Attr:     "modified_at",
104                         Operator: "<=",
105                         Operand:  newestModifiedAt}},
106                 IncludeTrash:       true,
107                 IncludeOldVersions: true,
108         }); err != nil {
109                 return err
110         } else if callCount < checkCount {
111                 return fmt.Errorf("Retrieved %d collections with modtime <= T=%q, but server now reports there are %d collections with modtime <= T", callCount, newestModifiedAt, checkCount)
112         }
113
114         return nil
115 }
116
117 func (bal *Balancer) updateCollections(ctx context.Context, c *arvados.Client, cluster *arvados.Cluster) error {
118         ctx, cancel := context.WithCancel(ctx)
119         defer cancel()
120
121         defer bal.time("update_collections", "wall clock time to update collections")()
122         threshold := time.Now()
123         thresholdStr := threshold.Format(time.RFC3339Nano)
124
125         updated := int64(0)
126
127         errs := make(chan error, 1)
128         collQ := make(chan arvados.Collection, cluster.Collections.BalanceCollectionBuffers)
129         go func() {
130                 defer close(collQ)
131                 err := EachCollection(ctx, bal.DB, c, func(coll arvados.Collection) error {
132                         if atomic.LoadInt64(&updated) >= int64(cluster.Collections.BalanceUpdateLimit) {
133                                 bal.logf("reached BalanceUpdateLimit (%d)", cluster.Collections.BalanceUpdateLimit)
134                                 cancel()
135                                 return context.Canceled
136                         }
137                         collQ <- coll
138                         return nil
139                 }, func(done, total int) {
140                         bal.logf("update collections: %d/%d (%d updated @ %.01f updates/s)", done, total, atomic.LoadInt64(&updated), float64(atomic.LoadInt64(&updated))/time.Since(threshold).Seconds())
141                 })
142                 if err != nil && err != context.Canceled {
143                         select {
144                         case errs <- err:
145                         default:
146                         }
147                 }
148         }()
149
150         var wg sync.WaitGroup
151
152         // Use about 1 goroutine per 2 CPUs. Based on experiments with
153         // a 2-core host, using more concurrent database
154         // calls/transactions makes this process slower, not faster.
155         for i := 0; i < (runtime.NumCPU()+1)/2; i++ {
156                 wg.Add(1)
157                 goSendErr(errs, func() error {
158                         defer wg.Done()
159                         tx, err := bal.DB.Beginx()
160                         if err != nil {
161                                 return err
162                         }
163                         txPending := 0
164                         flush := func(final bool) error {
165                                 err := tx.Commit()
166                                 if err != nil && ctx.Err() == nil {
167                                         tx.Rollback()
168                                         return err
169                                 }
170                                 txPending = 0
171                                 if final {
172                                         return nil
173                                 }
174                                 tx, err = bal.DB.Beginx()
175                                 return err
176                         }
177                         txBatch := 100
178                         for coll := range collQ {
179                                 if ctx.Err() != nil || len(errs) > 0 {
180                                         continue
181                                 }
182                                 blkids, err := coll.SizedDigests()
183                                 if err != nil {
184                                         bal.logf("%s: %s", coll.UUID, err)
185                                         continue
186                                 }
187                                 repl := bal.BlockStateMap.GetConfirmedReplication(blkids, coll.StorageClassesDesired)
188
189                                 desired := bal.DefaultReplication
190                                 if coll.ReplicationDesired != nil {
191                                         desired = *coll.ReplicationDesired
192                                 }
193                                 if repl > desired {
194                                         // If actual>desired, confirm
195                                         // the desired number rather
196                                         // than actual to avoid
197                                         // flapping updates when
198                                         // replication increases
199                                         // temporarily.
200                                         repl = desired
201                                 }
202                                 classes := emptyJSONArray
203                                 if repl > 0 {
204                                         classes, err = json.Marshal(coll.StorageClassesDesired)
205                                         if err != nil {
206                                                 bal.logf("BUG? json.Marshal(%v) failed: %s", classes, err)
207                                                 continue
208                                         }
209                                 }
210                                 needUpdate := coll.ReplicationConfirmed == nil || *coll.ReplicationConfirmed != repl || len(coll.StorageClassesConfirmed) != len(coll.StorageClassesDesired)
211                                 for i := range coll.StorageClassesDesired {
212                                         if !needUpdate && coll.StorageClassesDesired[i] != coll.StorageClassesConfirmed[i] {
213                                                 needUpdate = true
214                                         }
215                                 }
216                                 if !needUpdate {
217                                         continue
218                                 }
219                                 _, err = tx.ExecContext(ctx, `update collections set
220                                         replication_confirmed=$1,
221                                         replication_confirmed_at=$2,
222                                         storage_classes_confirmed=$3,
223                                         storage_classes_confirmed_at=$2
224                                         where uuid=$4`,
225                                         repl, thresholdStr, classes, coll.UUID)
226                                 if err != nil {
227                                         if ctx.Err() == nil {
228                                                 bal.logf("%s: update failed: %s", coll.UUID, err)
229                                         }
230                                         continue
231                                 }
232                                 atomic.AddInt64(&updated, 1)
233                                 if txPending++; txPending >= txBatch {
234                                         err = flush(false)
235                                         if err != nil {
236                                                 return err
237                                         }
238                                 }
239                         }
240                         return flush(true)
241                 })
242         }
243         wg.Wait()
244         bal.logf("updated %d collections", updated)
245         if len(errs) > 0 {
246                 return fmt.Errorf("error updating collections: %s", <-errs)
247         }
248         return nil
249 }
250
251 // Call f in a new goroutine. If it returns a non-nil error, send the
252 // error to the errs channel (unless the channel is already full with
253 // another error).
254 func goSendErr(errs chan<- error, f func() error) {
255         go func() {
256                 err := f()
257                 if err != nil {
258                         select {
259                         case errs <- err:
260                         default:
261                         }
262                 }
263         }()
264 }
265
266 var emptyJSONArray = []byte("[]")