1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
17 "git.arvados.org/arvados.git/sdk/go/arvados"
18 "github.com/jmoiron/sqlx"
// countCollections asks the API server (GET arvados/v1/collections) for
// the number of collections matching params, requesting an exact count,
// and returns the ItemsAvailable value from the response together with
// any request/decode error.
21 func countCollections(c *arvados.Client, params arvados.ResourceListParams) (int, error) {
22 var page arvados.CollectionList
// Force an exact count; only ItemsAvailable is used from the response.
25 params.Count = "exact"
26 err := c.RequestAndDecode(&page, "GET", "arvados/v1/collections", nil, params)
27 return page.ItemsAvailable, err
30 // EachCollection calls f once for every readable
31 // collection. EachCollection stops if it encounters an error, such as
32 // f returning a non-nil error.
34 // The progress function is called periodically with done (number of
35 // times f has been called) and total (number of times f is expected
37 func EachCollection(ctx context.Context, db *sqlx.DB, c *arvados.Client, f func(arvados.Collection) error, progress func(done, total int)) error {
// progress is replaced with a no-op here. The guarding condition
// (presumably progress == nil) is not visible in this view; confirm.
39 progress = func(_, _ int) {}
// Ask the API server for the expected total, including old collection
// versions; this is the "total" later passed to progress.
42 expectCount, err := countCollections(c, arvados.ResourceListParams{
44 IncludeOldVersions: true,
// Newest modified_at seen while scanning. It is used afterwards to check
// that no collections were missed.
49 var newestModifiedAt time.Time
// Rows are read directly from the collections table via db, not paged
// through the API.
51 rows, err := db.QueryxContext(ctx, `SELECT uuid, manifest_text, modified_at, portable_data_hash, replication_desired, storage_classes_desired, is_trashed FROM collections`)
57 var coll arvados.Collection
// storage_classes_desired is scanned as raw bytes and decoded as JSON below.
58 var classesDesired []byte
59 err = rows.Scan(&coll.UUID, &coll.ManifestText, &coll.ModifiedAt, &coll.PortableDataHash, &coll.ReplicationDesired, &classesDesired, &coll.IsTrashed)
64 err = json.Unmarshal(classesDesired, &coll.StorageClassesDesired)
69 if newestModifiedAt.IsZero() || newestModifiedAt.Before(coll.ModifiedAt) {
70 newestModifiedAt = coll.ModifiedAt
77 progress(callCount, expectCount)
// Report any error that ended row iteration early, rather than treating
// a truncated scan as complete.
80 if err := rows.Err(); err != nil {
// Consistency check: re-count collections (including old versions)
// filtered against newestModifiedAt. The error message describes the
// filter as "modtime <= T"; the filter attribute/operator lines are not
// visible here. If the server reports more such collections than f was
// called for, some rows were missed and an error is returned.
83 if checkCount, err := countCollections(c, arvados.ResourceListParams{
84 Filters: []arvados.Filter{{
87 Operand: newestModifiedAt}},
89 IncludeOldVersions: true,
92 } else if callCount < checkCount {
93 return fmt.Errorf("Retrieved %d collections with modtime <= T=%q, but server now reports there are %d collections with modtime <= T", callCount, newestModifiedAt, checkCount)
// updateCollections visits every collection via EachCollection and
// queues each one on collQ. Worker goroutines (one per CPU) then write
// replication_confirmed, storage_classes_confirmed and the matching
// *_confirmed_at timestamps back to the collections table. The values
// come from bal.BlockStateMap.
99 func (bal *Balancer) updateCollections(ctx context.Context, c *arvados.Client, cluster *arvados.Cluster) error {
100 ctx, cancel := context.WithCancel(ctx)
103 defer bal.time("update_collections", "wall clock time to update collections")()
// threshold is written as both *_confirmed_at values. Collections
// modified after it take a separate path in the callback below (body not
// visible here), presumably because their contents may postdate the
// block scan; confirm.
104 threshold := time.Now()
105 thresholdStr := threshold.Format(time.RFC3339Nano)
// Hand-off queue between the EachCollection callback and the DB-update
// workers, sized by cluster.Collections.BalanceCollectionBuffers.
108 collQ := make(chan arvados.Collection, cluster.Collections.BalanceCollectionBuffers)
111 err = EachCollection(ctx, bal.DB, c, func(coll arvados.Collection) error {
112 if coll.ModifiedAt.After(threshold) {
120 }, func(done, total int) {
121 bal.logf("update collections: %d/%d", done, total)
125 } else if err != nil {
126 bal.logf("error updating collections: %s", err)
// Start one worker per CPU to drain collQ concurrently.
131 var wg sync.WaitGroup
132 for i := 0; i < runtime.NumCPU(); i++ {
136 for coll := range collQ {
137 blkids, err := coll.SizedDigests()
139 bal.logf("%s: %s", coll.UUID, err)
// Confirmed replication is looked up for this collection's blocks
// and its desired storage classes.
142 repl := bal.BlockStateMap.GetConfirmedReplication(blkids, coll.StorageClassesDesired)
143 classes, err := json.Marshal(coll.StorageClassesDesired)
// NOTE(review): on failure this logs `classes` (the Marshal result,
// not meaningful on error) rather than coll.StorageClassesDesired,
// the value that failed to marshal. Logging the input is probably
// what was intended.
145 bal.logf("BUG? json.Marshal(%v) failed: %s", classes, err)
// $2 (thresholdStr) is used for both confirmed_at columns.
148 _, err = bal.DB.ExecContext(ctx, `update collections set
149 replication_confirmed=$1,
150 replication_confirmed_at=$2,
151 storage_classes_confirmed=$3,
152 storage_classes_confirmed_at=$2
154 repl, thresholdStr, classes, coll.UUID)
156 bal.logf("%s: update failed: %s", coll.UUID, err)
// updated is shared by all workers, hence the atomic increment.
159 atomic.AddInt64(&updated, 1)
164 bal.logf("updated %d collections", updated)