X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/a1adf1ed6f93ce0769f307a86b6389e9e8e630a9..3091c9c336ddca217b14745142f9473a489f42de:/services/datamanager/collection/collection.go diff --git a/services/datamanager/collection/collection.go b/services/datamanager/collection/collection.go index 1229f2917e..1d41a7087a 100644 --- a/services/datamanager/collection/collection.go +++ b/services/datamanager/collection/collection.go @@ -131,7 +131,8 @@ func GetCollections(params GetCollectionsParams) (results ReadCollections, err e sdkParams := arvadosclient.Dict{ "select": fieldsWanted, "order": []string{"modified_at ASC"}, - "filters": [][]string{[]string{"modified_at", ">=", "1900-01-01T00:00:00Z"}}} + "filters": [][]string{[]string{"modified_at", ">=", "1900-01-01T00:00:00Z"}}, + "offset": 0} if params.BatchSize > 0 { sdkParams["limit"] = params.BatchSize @@ -176,9 +177,10 @@ func GetCollections(params GetCollectionsParams) (results ReadCollections, err e // These values are just for getting the loop to run the first time, // afterwards they'll be set to real values. - previousTotalCollections := -1 - totalCollections := 0 - for totalCollections > previousTotalCollections { + remainingCollections := 1 + var totalCollections int + var previousTotalCollections int + for remainingCollections > 0 { // We're still finding new collections // Write the heap profile for examining memory usage @@ -194,6 +196,12 @@ func GetCollections(params GetCollectionsParams) (results ReadCollections, err e return } + // Update count of remaining collections + remainingCollections = collections.ItemsAvailable - params.BatchSize - sdkParams["offset"].(int) + if remainingCollections < 0 { + remainingCollections = 0 + } + // Process collection and update our date filter. latestModificationDate, maxManifestSize, totalManifestSize, err := ProcessCollections(params.Logger, collections.Items, @@ -202,16 +210,23 @@ func GetCollections(params GetCollectionsParams) (results ReadCollections, err e if err != nil { return results, err } - sdkParams["filters"].([][]string)[0][2] = latestModificationDate.Format(time.RFC3339) + if sdkParams["filters"].([][]string)[0][2] != latestModificationDate.Format(time.RFC3339) { + sdkParams["filters"].([][]string)[0][2] = latestModificationDate.Format(time.RFC3339) + sdkParams["offset"] = 0 + } else { + sdkParams["offset"] = sdkParams["offset"].(int) + params.BatchSize + } // update counts previousTotalCollections = totalCollections totalCollections = len(results.UUIDToCollection) log.Printf("%d collections read, %d new in last batch, "+ + "%d remaining, "+ "%s latest modified date, %.0f %d %d avg,max,total manifest size", totalCollections, totalCollections-previousTotalCollections, + remainingCollections, sdkParams["filters"].([][]string)[0][2], float32(totalManifestSize)/float32(totalCollections), maxManifestSize, totalManifestSize)