1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
17 "git.arvados.org/arvados.git/sdk/go/arvados"
18 "github.com/jmoiron/sqlx"
21 func countCollections(c *arvados.Client, params arvados.ResourceListParams) (int, error) {
22 var page arvados.CollectionList
25 params.Count = "exact"
26 err := c.RequestAndDecode(&page, "GET", "arvados/v1/collections", nil, params)
27 return page.ItemsAvailable, err
30 // EachCollection calls f once for every readable
31 // collection. EachCollection stops if it encounters an error, such as
32 // f returning a non-nil error.
34 // The progress function is called periodically with done (number of
35 // times f has been called) and total (number of times f is expected
38 // If pageSize > 0 it is used as the maximum page size in each API
39 // call; otherwise the maximum allowed page size is requested.
40 func EachCollection(ctx context.Context, c *arvados.Client, pageSize int, f func(arvados.Collection) error, progress func(done, total int)) error {
42 progress = func(_, _ int) {}
45 expectCount, err := countCollections(c, arvados.ResourceListParams{
47 IncludeOldVersions: true,
53 // Note the obvious way to get all collections (sorting by
54 // UUID) would be much easier, but would lose data: If a
55 // client were to move files from collection with uuid="zzz"
56 // to a collection with uuid="aaa" around the time when we
57 // were fetching the "mmm" page, we would never see those
58 // files' block IDs at all -- even if the client is careful to
59 // save "aaa" before saving "zzz".
61 // Instead, we get pages in modified_at order. Collections
62 // that are modified during the run will be re-fetched in a
67 // Use the maximum page size the server allows
70 params := arvados.ResourceListParams{
72 Order: "modified_at, uuid",
74 Select: []string{"uuid", "unsigned_manifest_text", "modified_at", "portable_data_hash", "replication_desired", "storage_classes_desired", "is_trashed"},
76 IncludeOldVersions: true,
78 var last arvados.Collection
79 var filterTime time.Time
81 gettingExactTimestamp := false
83 progress(callCount, expectCount)
84 var page arvados.CollectionList
85 err := c.RequestAndDecodeContext(ctx, &page, "GET", "arvados/v1/collections", nil, params)
89 for _, coll := range page.Items {
90 if last.ModifiedAt == coll.ModifiedAt && last.UUID >= coll.UUID {
100 if len(page.Items) == 0 && !gettingExactTimestamp {
102 } else if last.ModifiedAt.IsZero() {
103 return fmt.Errorf("BUG: Last collection on the page (%s) has no modified_at timestamp; cannot make progress", last.UUID)
104 } else if len(page.Items) > 0 && last.ModifiedAt == filterTime {
105 // If we requested time>=X and never got a
106 // time>X then we might not have received all
107 // items with time==X yet. Switch to
108 // gettingExactTimestamp mode (if we're not
109 // there already), advancing our UUID
110 // threshold with each request, until we get
112 gettingExactTimestamp = true
113 params.Filters = []arvados.Filter{{
122 } else if gettingExactTimestamp {
123 // This must be an empty page (in this mode,
124 // an unequal timestamp is impossible) so we
125 // can start getting pages of newer
127 gettingExactTimestamp = false
128 params.Filters = []arvados.Filter{{
134 // In the normal case, we know we have seen
135 // all collections with modtime<filterTime,
136 // but we might not have seen all that have
137 // modtime=filterTime. Hence we use >= instead
138 // of > and skip the obvious overlapping item,
139 // i.e., the last item on the previous
140 // page. In some edge cases this can return
141 // collections we have already seen, but
142 // avoiding that would add overhead in the
143 // overwhelmingly common cases, so we don't
145 filterTime = last.ModifiedAt
146 params.Filters = []arvados.Filter{{
157 progress(callCount, expectCount)
159 if checkCount, err := countCollections(c, arvados.ResourceListParams{
160 Filters: []arvados.Filter{{
163 Operand: filterTime}},
165 IncludeOldVersions: true,
168 } else if callCount < checkCount {
169 return fmt.Errorf("Retrieved %d collections with modtime <= T=%q, but server now reports there are %d collections with modtime <= T", callCount, filterTime, checkCount)
175 func (bal *Balancer) updateCollections(ctx context.Context, c *arvados.Client, cluster *arvados.Cluster) error {
176 ctx, cancel := context.WithCancel(ctx)
179 defer bal.time("update_collections", "wall clock time to update collections")()
180 threshold := time.Now()
181 thresholdStr := threshold.Format(time.RFC3339Nano)
184 collQ := make(chan arvados.Collection, cluster.Collections.BalanceCollectionBuffers)
187 err = EachCollection(ctx, c, cluster.Collections.BalanceCollectionBatch, func(coll arvados.Collection) error {
188 if coll.ModifiedAt.After(threshold) {
196 }, func(done, total int) {
197 bal.logf("update collections: %d/%d", done, total)
201 } else if err != nil {
202 bal.logf("error updating collections: %s", err)
206 db, err := bal.db(cluster)
212 var wg sync.WaitGroup
213 for i := 0; i < runtime.NumCPU(); i++ {
217 for coll := range collQ {
218 blkids, err := coll.SizedDigests()
220 bal.logf("%s: %s", coll.UUID, err)
223 repl := bal.BlockStateMap.GetConfirmedReplication(blkids, coll.StorageClassesDesired)
224 tx, err := db.Beginx()
226 bal.logf("%s: error opening transaction: %s", coll.UUID, err)
230 classes, _ := json.Marshal(coll.StorageClassesDesired)
231 _, err = tx.ExecContext(ctx, `update collections set
232 replication_confirmed=$1,
233 replication_confirmed_at=$2,
234 storage_classes_confirmed=$3,
235 storage_classes_confirmed_at=$2
237 repl, thresholdStr, classes, coll.UUID)
244 bal.logf("%s: update failed: %s", coll.UUID, err)
247 atomic.AddInt64(&updated, 1)
252 bal.logf("updated %d collections", updated)
256 func (bal *Balancer) db(cluster *arvados.Cluster) (*sqlx.DB, error) {
257 db, err := sqlx.Open("postgres", cluster.PostgreSQL.Connection.String())
261 if p := cluster.PostgreSQL.ConnectionPool; p > 0 {
262 db.SetMaxOpenConns(p)
264 if err := db.Ping(); err != nil {
265 return nil, fmt.Errorf("postgresql connect succeeded but ping failed: %s", err)