+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
package main
import (
+ "context"
"fmt"
"time"
- "git.curoverse.com/arvados.git/sdk/go/arvados"
+ "git.arvados.org/arvados.git/sdk/go/arvados"
)
func countCollections(c *arvados.Client, params arvados.ResourceListParams) (int, error) {
//
// If pageSize > 0 it is used as the maximum page size in each API
// call; otherwise the maximum allowed page size is requested.
-func EachCollection(c *arvados.Client, pageSize int, f func(arvados.Collection) error, progress func(done, total int)) error {
+func EachCollection(ctx context.Context, c *arvados.Client, pageSize int, f func(arvados.Collection) error, progress func(done, total int)) error {
if progress == nil {
progress = func(_, _ int) {}
}
expectCount, err := countCollections(c, arvados.ResourceListParams{
- IncludeTrash: true,
+ IncludeTrash: true,
+ IncludeOldVersions: true,
})
if err != nil {
return err
}
+ // Note that the obvious way to get all collections (sorting
+ // by UUID) would be much easier, but would lose data: if a
+ // client were to move files from a collection with uuid="zzz"
+ // to a collection with uuid="aaa" around the time when we
+ // were fetching the "mmm" page, we would never see those
+ // files' block IDs at all -- even if the client is careful to
+ // save "aaa" before saving "zzz".
+ //
+ // Instead, we get pages in modified_at order. Collections
+ // that are modified during the run will be re-fetched in a
+ // subsequent page.
+
limit := pageSize
if limit <= 0 {
// Use the maximum page size the server allows
limit = 1<<31 - 1
}
params := arvados.ResourceListParams{
- Limit: &limit,
- Order: "modified_at, uuid",
- Count: "none",
- Select: []string{"uuid", "unsigned_manifest_text", "modified_at", "portable_data_hash", "replication_desired"},
- IncludeTrash: true,
+ Limit: &limit,
+ Order: "modified_at, uuid",
+ Count: "none",
+ Select: []string{"uuid", "unsigned_manifest_text", "modified_at", "portable_data_hash", "replication_desired"},
+ IncludeTrash: true,
+ IncludeOldVersions: true,
}
var last arvados.Collection
var filterTime time.Time
for {
progress(callCount, expectCount)
var page arvados.CollectionList
- err := c.RequestAndDecode(&page, "GET", "arvados/v1/collections", nil, params)
+ err := c.RequestAndDecodeContext(ctx, &page, "GET", "arvados/v1/collections", nil, params)
if err != nil {
return err
}
for _, coll := range page.Items {
- if last.ModifiedAt != nil && *last.ModifiedAt == *coll.ModifiedAt && last.UUID >= coll.UUID {
+ if last.ModifiedAt == coll.ModifiedAt && last.UUID >= coll.UUID {
continue
}
callCount++
}
if len(page.Items) == 0 && !gettingExactTimestamp {
break
- } else if last.ModifiedAt == nil {
+ } else if last.ModifiedAt.IsZero() {
return fmt.Errorf("BUG: Last collection on the page (%s) has no modified_at timestamp; cannot make progress", last.UUID)
- } else if len(page.Items) > 0 && *last.ModifiedAt == filterTime {
+ } else if len(page.Items) > 0 && last.ModifiedAt == filterTime {
// If we requested time>=X and never got a
// time>X then we might not have received all
// items with time==X yet. Switch to
// avoiding that would add overhead in the
// overwhelmingly common cases, so we don't
// bother.
- filterTime = *last.ModifiedAt
+ filterTime = last.ModifiedAt
params.Filters = []arvados.Filter{{
Attr: "modified_at",
Operator: ">=",
Attr: "modified_at",
Operator: "<=",
Operand: filterTime}},
- IncludeTrash: true,
+ IncludeTrash: true,
+ IncludeOldVersions: true,
}); err != nil {
return err
} else if callCount < checkCount {