X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/3091c9c336ddca217b14745142f9473a489f42de..2cf5dde4a6d56d0fb87a1dc79743753632d640a0:/services/datamanager/collection/collection.go diff --git a/services/datamanager/collection/collection.go b/services/datamanager/collection/collection.go index 1d41a7087a..c392ded35d 100644 --- a/services/datamanager/collection/collection.go +++ b/services/datamanager/collection/collection.go @@ -130,9 +130,9 @@ func GetCollections(params GetCollectionsParams) (results ReadCollections, err e sdkParams := arvadosclient.Dict{ "select": fieldsWanted, - "order": []string{"modified_at ASC"}, + "order": []string{"modified_at ASC", "uuid ASC"}, "filters": [][]string{[]string{"modified_at", ">=", "1900-01-01T00:00:00Z"}}, - "offset": 0} + "offset": 0} if params.BatchSize > 0 { sdkParams["limit"] = params.BatchSize @@ -195,13 +195,17 @@ func GetCollections(params GetCollectionsParams) (results ReadCollections, err e if err != nil { return } + batchCollections := len(collections.Items) - // Update count of remaining collections - remainingCollections = collections.ItemsAvailable - params.BatchSize - sdkParams["offset"].(int) - if remainingCollections < 0 { - remainingCollections = 0 + // We must always have at least one collection in the batch + if batchCollections < 1 { + err = fmt.Errorf("API query returned no collections for %+v", sdkParams) + return } + // Update count of remaining collections + remainingCollections = collections.ItemsAvailable - sdkParams["offset"].(int) - batchCollections + // Process collection and update our date filter. latestModificationDate, maxManifestSize, totalManifestSize, err := ProcessCollections(params.Logger, collections.Items, @@ -214,17 +218,18 @@ func GetCollections(params GetCollectionsParams) (results ReadCollections, err e sdkParams["filters"].([][]string)[0][2] = latestModificationDate.Format(time.RFC3339) sdkParams["offset"] = 0 } else { - sdkParams["offset"] = sdkParams["offset"].(int) + params.BatchSize + sdkParams["offset"] = sdkParams["offset"].(int) + batchCollections } // update counts previousTotalCollections = totalCollections totalCollections = len(results.UUIDToCollection) - log.Printf("%d collections read, %d new in last batch, "+ + log.Printf("%d collections read, %d (%d new) in last batch, "+ "%d remaining, "+ "%s latest modified date, %.0f %d %d avg,max,total manifest size", totalCollections, + batchCollections, totalCollections-previousTotalCollections, remainingCollections, sdkParams["filters"].([][]string)[0][2], @@ -242,6 +247,30 @@ func GetCollections(params GetCollectionsParams) (results ReadCollections, err e } } + // Make one final API request to verify that we have processed all collections available up to the latest modification date + var collections SdkCollectionList + sdkParams["filters"].([][]string)[0][1] = "<=" + sdkParams["limit"] = 0 + err = params.Client.List("collections", sdkParams, &collections) + if err != nil { + return + } + finalNumberOfCollectionsAvailable, err := + util.NumberItemsAvailable(params.Client, "collections") + if err != nil { + return + } + if totalCollections < finalNumberOfCollectionsAvailable { + err = fmt.Errorf("API server indicates a total of %d collections "+ + "available up to %v, but we only retrieved %d. "+ + "Refusing to continue as this could indicate an "+ + "otherwise undetected failure.", + finalNumberOfCollectionsAvailable, + sdkParams["filters"].([][]string)[0][2], + totalCollections) + return + } + // Write the heap profile for examining memory usage err = WriteHeapProfile()