X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/d005c38bea17107e7be3c5338ad31ba54085df61..bbd85b230ccd208ef942792109fed1c9161f9429:/services/datamanager/collection/collection.go diff --git a/services/datamanager/collection/collection.go b/services/datamanager/collection/collection.go index 33970d8bc4..f0a0f3f1aa 100644 --- a/services/datamanager/collection/collection.go +++ b/services/datamanager/collection/collection.go @@ -18,9 +18,6 @@ import ( var ( HeapProfileFilename string - // globals for debugging - totalManifestSize uint64 - maxManifestSize uint64 ) // Collection representation @@ -41,7 +38,6 @@ type ReadCollections struct { CollectionUUIDToIndex map[string]int CollectionIndexToUUID []string BlockToCollectionIndices map[blockdigest.DigestWithSize][]int - Err error } // GetCollectionsParams params @@ -96,10 +92,9 @@ func WriteHeapProfile() error { } // GetCollectionsAndSummarize gets collections from api and summarizes -func GetCollectionsAndSummarize(params GetCollectionsParams) (results ReadCollections) { - results, err := GetCollections(params) +func GetCollectionsAndSummarize(params GetCollectionsParams) (results ReadCollections, err error) { + results, err = GetCollections(params) if err != nil { - results.Err = err return } @@ -135,8 +130,9 @@ func GetCollections(params GetCollectionsParams) (results ReadCollections, err e sdkParams := arvadosclient.Dict{ "select": fieldsWanted, - "order": []string{"modified_at ASC"}, - "filters": [][]string{[]string{"modified_at", ">=", "1900-01-01T00:00:00Z"}}} + "order": []string{"modified_at ASC", "uuid ASC"}, + "filters": [][]string{[]string{"modified_at", ">=", "1900-01-01T00:00:00Z"}}, + "offset": 0} if params.BatchSize > 0 { sdkParams["limit"] = params.BatchSize @@ -181,9 +177,10 @@ func GetCollections(params GetCollectionsParams) (results ReadCollections, err e // These values are just for getting the loop to run the first time, // afterwards they'll be set to real values. - previousTotalCollections := -1 - totalCollections := 0 - for totalCollections > previousTotalCollections { + remainingCollections := 1 + var totalCollections int + var previousTotalCollections int + for remainingCollections > 0 { // We're still finding new collections // Write the heap profile for examining memory usage @@ -198,26 +195,43 @@ func GetCollections(params GetCollectionsParams) (results ReadCollections, err e if err != nil { return } + batchCollections := len(collections.Items) + + // We must always have at least one collection in the batch + if batchCollections < 1 { + err = fmt.Errorf("API query returned no collections for %+v", sdkParams) + return + } + + // Update count of remaining collections + remainingCollections = collections.ItemsAvailable - sdkParams["offset"].(int) - batchCollections // Process collection and update our date filter. - var latestModificationDate time.Time - latestModificationDate, err = ProcessCollections(params.Logger, + latestModificationDate, maxManifestSize, totalManifestSize, err := ProcessCollections(params.Logger, collections.Items, defaultReplicationLevel, results.UUIDToCollection) if err != nil { - return + return results, err + } + if sdkParams["filters"].([][]string)[0][2] != latestModificationDate.Format(time.RFC3339) { + sdkParams["filters"].([][]string)[0][2] = latestModificationDate.Format(time.RFC3339) + sdkParams["offset"] = 0 + } else { + sdkParams["offset"] = sdkParams["offset"].(int) + batchCollections } - sdkParams["filters"].([][]string)[0][2] = latestModificationDate.Format(time.RFC3339) // update counts previousTotalCollections = totalCollections totalCollections = len(results.UUIDToCollection) - log.Printf("%d collections read, %d new in last batch, "+ + log.Printf("%d collections read, %d (%d new) in last batch, "+ + "%d remaining, "+ "%s latest modified date, %.0f %d %d avg,max,total manifest size", totalCollections, + batchCollections, totalCollections-previousTotalCollections, + remainingCollections, sdkParams["filters"].([][]string)[0][2], float32(totalManifestSize)/float32(totalCollections), maxManifestSize, totalManifestSize) @@ -250,7 +264,12 @@ func StrCopy(s string) string { func ProcessCollections(arvLogger *logger.Logger, receivedCollections []SdkCollectionInfo, defaultReplicationLevel int, - UUIDToCollection map[string]Collection) (latestModificationDate time.Time, err error) { + UUIDToCollection map[string]Collection, +) ( + latestModificationDate time.Time, + maxManifestSize, totalManifestSize uint64, + err error, +) { for _, sdkCollection := range receivedCollections { collection := Collection{UUID: StrCopy(sdkCollection.UUID), OwnerUUID: StrCopy(sdkCollection.OwnerUUID), @@ -258,12 +277,13 @@ func ProcessCollections(arvLogger *logger.Logger, BlockDigestToSize: make(map[blockdigest.BlockDigest]int)} if sdkCollection.ModifiedAt.IsZero() { - return latestModificationDate, fmt.Errorf( + err = fmt.Errorf( "Arvados SDK collection returned with unexpected zero "+ "modification date. This probably means that either we failed to "+ "parse the modification date or the API server has changed how "+ "it returns modification dates: %+v", collection) + return } if sdkCollection.ModifiedAt.After(latestModificationDate) {