1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
16 "git.arvados.org/arvados.git/sdk/go/arvados"
17 "github.com/jmoiron/sqlx"
20 func countCollections(c *arvados.Client, params arvados.ResourceListParams) (int, error) {
21 var page arvados.CollectionList
24 params.Count = "exact"
25 err := c.RequestAndDecode(&page, "GET", "arvados/v1/collections", nil, params)
26 return page.ItemsAvailable, err
29 // EachCollection calls f once for every readable
30 // collection. EachCollection stops if it encounters an error, such as
31 // f returning a non-nil error.
33 // The progress function is called periodically with done (number of
34 // times f has been called) and total (number of times f is expected
36 func EachCollection(ctx context.Context, db *sqlx.DB, c *arvados.Client, f func(arvados.Collection) error, progress func(done, total int)) error {
38 progress = func(_, _ int) {}
41 expectCount, err := countCollections(c, arvados.ResourceListParams{
43 IncludeOldVersions: true,
48 var newestModifiedAt time.Time
50 rows, err := db.QueryxContext(ctx, `SELECT
51 uuid, manifest_text, modified_at, portable_data_hash,
52 replication_desired, replication_confirmed, replication_confirmed_at,
53 storage_classes_desired, storage_classes_confirmed, storage_classes_confirmed_at,
60 progressTicker := time.NewTicker(10 * time.Second)
61 defer progressTicker.Stop()
64 var coll arvados.Collection
65 var classesDesired, classesConfirmed []byte
66 err = rows.Scan(&coll.UUID, &coll.ManifestText, &coll.ModifiedAt, &coll.PortableDataHash,
67 &coll.ReplicationDesired, &coll.ReplicationConfirmed, &coll.ReplicationConfirmedAt,
68 &classesDesired, &classesConfirmed, &coll.StorageClassesConfirmedAt,
74 err = json.Unmarshal(classesDesired, &coll.StorageClassesDesired)
75 if err != nil && len(classesDesired) > 0 {
78 err = json.Unmarshal(classesConfirmed, &coll.StorageClassesConfirmed)
79 if err != nil && len(classesConfirmed) > 0 {
82 if newestModifiedAt.IsZero() || newestModifiedAt.Before(coll.ModifiedAt) {
83 newestModifiedAt = coll.ModifiedAt
91 case <-progressTicker.C:
92 progress(callCount, expectCount)
96 progress(callCount, expectCount)
101 if checkCount, err := countCollections(c, arvados.ResourceListParams{
102 Filters: []arvados.Filter{{
105 Operand: newestModifiedAt}},
107 IncludeOldVersions: true,
110 } else if callCount < checkCount {
111 return fmt.Errorf("Retrieved %d collections with modtime <= T=%q, but server now reports there are %d collections with modtime <= T", callCount, newestModifiedAt, checkCount)
117 func (bal *Balancer) updateCollections(ctx context.Context, c *arvados.Client, cluster *arvados.Cluster) error {
118 ctx, cancel := context.WithCancel(ctx)
121 defer bal.time("update_collections", "wall clock time to update collections")()
122 threshold := time.Now()
123 thresholdStr := threshold.Format(time.RFC3339Nano)
127 errs := make(chan error, 1)
128 collQ := make(chan arvados.Collection, cluster.Collections.BalanceCollectionBuffers)
131 err := EachCollection(ctx, bal.DB, c, func(coll arvados.Collection) error {
132 if atomic.LoadInt64(&updated) >= int64(cluster.Collections.BalanceUpdateLimit) {
133 bal.logf("reached BalanceUpdateLimit (%d)", cluster.Collections.BalanceUpdateLimit)
135 return context.Canceled
139 }, func(done, total int) {
140 bal.logf("update collections: %d/%d (%d updated @ %.01f updates/s)", done, total, atomic.LoadInt64(&updated), float64(atomic.LoadInt64(&updated))/time.Since(threshold).Seconds())
142 if err != nil && err != context.Canceled {
150 var wg sync.WaitGroup
152 // Use about 1 goroutine per 2 CPUs. Based on experiments with
153 // a 2-core host, using more concurrent database
154 // calls/transactions makes this process slower, not faster.
155 for i := 0; i < runtime.NumCPU()+1/2; i++ {
157 goSendErr(errs, func() error {
159 tx, err := bal.DB.Beginx()
164 flush := func(final bool) error {
166 if err != nil && ctx.Err() == nil {
174 tx, err = bal.DB.Beginx()
178 for coll := range collQ {
179 if ctx.Err() != nil || len(errs) > 0 {
182 blkids, err := coll.SizedDigests()
184 bal.logf("%s: %s", coll.UUID, err)
187 repl := bal.BlockStateMap.GetConfirmedReplication(blkids, coll.StorageClassesDesired)
189 desired := bal.DefaultReplication
190 if coll.ReplicationDesired != nil {
191 desired = *coll.ReplicationDesired
194 // If actual>desired, confirm
195 // the desired number rather
196 // than actual to avoid
197 // flapping updates when
198 // replication increases
202 classes := emptyJSONArray
204 classes, err = json.Marshal(coll.StorageClassesDesired)
206 bal.logf("BUG? json.Marshal(%v) failed: %s", classes, err)
210 needUpdate := coll.ReplicationConfirmed == nil || *coll.ReplicationConfirmed != repl || len(coll.StorageClassesConfirmed) != len(coll.StorageClassesDesired)
211 for i := range coll.StorageClassesDesired {
212 if !needUpdate && coll.StorageClassesDesired[i] != coll.StorageClassesConfirmed[i] {
219 _, err = tx.ExecContext(ctx, `update collections set
220 replication_confirmed=$1,
221 replication_confirmed_at=$2,
222 storage_classes_confirmed=$3,
223 storage_classes_confirmed_at=$2
225 repl, thresholdStr, classes, coll.UUID)
227 if ctx.Err() == nil {
228 bal.logf("%s: update failed: %s", coll.UUID, err)
232 atomic.AddInt64(&updated, 1)
233 if txPending++; txPending >= txBatch {
244 bal.logf("updated %d collections", updated)
246 return fmt.Errorf("error updating collections: %s", <-errs)
251 // Call f in a new goroutine. If it returns a non-nil error, send the
252 // error to the errs channel (unless the channel is already full with
254 func goSendErr(errs chan<- error, f func() error) {
266 var emptyJSONArray = []byte("[]")