1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
16 "git.arvados.org/arvados.git/sdk/go/arvados"
17 "github.com/jmoiron/sqlx"
// countCollections issues a GET request to arvados/v1/collections with the
// given params and returns the ItemsAvailable value from the response.
//
// params is received by value, so forcing Count to "exact" below does not
// modify the caller's copy. When err is non-nil the returned count should
// not be relied on.
20 func countCollections(c *arvados.Client, params arvados.ResourceListParams) (int, error) {
21 var page arvados.CollectionList
// Ask the server for an exact ItemsAvailable count.
24 params.Count = "exact"
25 err := c.RequestAndDecode(&page, "GET", "arvados/v1/collections", nil, params)
26 return page.ItemsAvailable, err
29 // EachCollection calls f once for every row of the collections
30 // table, read directly through db. EachCollection stops if it encounters an error, such as
31 // f returning a non-nil error.
33 // The progress function is called periodically with done (number of
34 // times f has been called) and total (number of times f is expected to be called).
//
// total is obtained up front from the API via countCollections (with
// IncludeOldVersions set), so it is an expectation, not a guarantee.
// After the scan, the API is asked again how many collections have a
// modification time <= the newest modified_at seen during the scan; if
// the server reports more than the number of f calls, an error is
// returned because the scan evidently missed some collections.
36 func EachCollection(ctx context.Context, db *sqlx.DB, c *arvados.Client, f func(arvados.Collection) error, progress func(done, total int)) error {
// NOTE(review): presumably guarded by a nil check on progress — substitutes a
// no-op so progress can be called unconditionally below; confirm.
38 progress = func(_, _ int) {}
// Expected total, used only for progress reporting.
41 expectCount, err := countCollections(c, arvados.ResourceListParams{
43 IncludeOldVersions: true,
// Newest modified_at among scanned rows; drives the post-scan count check.
48 var newestModifiedAt time.Time
// Query the table directly via db rather than through the API client c.
50 rows, err := db.QueryxContext(ctx, `SELECT uuid, manifest_text, modified_at, portable_data_hash, replication_desired, storage_classes_desired, is_trashed FROM collections`)
// Report progress every 10 seconds while rows are being scanned.
54 progressTicker := time.NewTicker(10 * time.Second)
55 defer progressTicker.Stop()
58 var coll arvados.Collection
// storage_classes_desired is scanned as raw bytes and JSON-decoded below.
59 var classesDesired []byte
60 err = rows.Scan(&coll.UUID, &coll.ManifestText, &coll.ModifiedAt, &coll.PortableDataHash, &coll.ReplicationDesired, &classesDesired, &coll.IsTrashed)
65 err = json.Unmarshal(classesDesired, &coll.StorageClassesDesired)
// Track the newest modification time seen so far.
70 if newestModifiedAt.IsZero() || newestModifiedAt.Before(coll.ModifiedAt) {
71 newestModifiedAt = coll.ModifiedAt
79 case <-progressTicker.C:
80 progress(callCount, expectCount)
84 progress(callCount, expectCount)
86 if err := rows.Err(); err != nil {
// Consistency check: re-count (via the API) collections whose modtime is
// <= newestModifiedAt. If the server reports more than f was called for,
// the scan missed some collections, so return an error.
89 if checkCount, err := countCollections(c, arvados.ResourceListParams{
90 Filters: []arvados.Filter{{
93 Operand: newestModifiedAt}},
95 IncludeOldVersions: true,
98 } else if callCount < checkCount {
99 return fmt.Errorf("Retrieved %d collections with modtime <= T=%q, but server now reports there are %d collections with modtime <= T", callCount, newestModifiedAt, checkCount)
// updateCollections writes confirmed replication and storage-class
// information back to the collections table.
//
// Collections are enumerated with EachCollection and passed through collQ
// (buffer size Collections.BalanceCollectionBuffers) to runtime.NumCPU()
// worker goroutines. For each collection, a worker computes the confirmed
// replication of its blocks from bal.BlockStateMap and stores it, together
// with the JSON-encoded StorageClassesDesired, in replication_confirmed and
// storage_classes_confirmed. Both *_confirmed_at columns are set to the time
// captured when this function started (threshold), not to the time of each
// individual UPDATE.
//
// Enumeration stops early once BalanceUpdateLimit collections have been
// updated. context.Canceled is treated as a normal stop and is not logged.
105 func (bal *Balancer) updateCollections(ctx context.Context, c *arvados.Client, cluster *arvados.Cluster) error {
106 ctx, cancel := context.WithCancel(ctx)
109 defer bal.time("update_collections", "wall clock time to update collections")()
110 threshold := time.Now()
111 thresholdStr := threshold.Format(time.RFC3339Nano)
116 collQ := make(chan arvados.Collection, cluster.Collections.BalanceCollectionBuffers)
119 err := EachCollection(ctx, bal.DB, c, func(coll arvados.Collection) error {
// Stop enumerating once enough collections have been updated. Returning
// context.Canceled ends EachCollection without the error being logged below.
120 if atomic.LoadInt64(&updated) >= int64(cluster.Collections.BalanceUpdateLimit) {
121 bal.logf("reached BalanceUpdateLimit (%d)", cluster.Collections.BalanceUpdateLimit)
123 return context.Canceled
127 }, func(done, total int) {
128 bal.logf("update collections: %d/%d", done, total)
130 if err != nil && err != context.Canceled {
131 bal.logf("error updating collections: %s", err)
135 var wg sync.WaitGroup
136 for i := 0; i < runtime.NumCPU(); i++ {
140 for coll := range collQ {
// NOTE(review): after cancellation this presumably keeps draining collQ
// without doing further work; confirm.
141 if ctx.Err() != nil {
144 blkids, err := coll.SizedDigests()
146 bal.logf("%s: %s", coll.UUID, err)
149 repl := bal.BlockStateMap.GetConfirmedReplication(blkids, coll.StorageClassesDesired)
150 classes, err := json.Marshal(coll.StorageClassesDesired)
// Log the input that failed to marshal; classes holds no useful value when
// err != nil.
152 bal.logf("BUG? json.Marshal(%v) failed: %s", coll.StorageClassesDesired, err)
// $2 (threshold) is used for both *_confirmed_at columns.
155 _, err = bal.DB.ExecContext(ctx, `update collections set
156 replication_confirmed=$1,
157 replication_confirmed_at=$2,
158 storage_classes_confirmed=$3,
159 storage_classes_confirmed_at=$2
161 repl, thresholdStr, classes, coll.UUID)
// Cancellation (e.g. after reaching the update limit) is not reported as a failure.
163 if err != context.Canceled {
164 bal.logf("%s: update failed: %s", coll.UUID, err)
168 atomic.AddInt64(&updated, 1)
173 bal.logf("updated %d collections", updated)