X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/53f785c298338645b6880f22f26b0c36a7cfab4d..61c3f86eb779ab8e723e43354eddafe219bc27d9:/services/datamanager/collection/collection.go diff --git a/services/datamanager/collection/collection.go b/services/datamanager/collection/collection.go index f63b95f0f5..5519ad8670 100644 --- a/services/datamanager/collection/collection.go +++ b/services/datamanager/collection/collection.go @@ -13,7 +13,6 @@ import ( "git.curoverse.com/arvados.git/services/datamanager/loggerutil" "log" "os" - "runtime" "runtime/pprof" "time" ) @@ -25,10 +24,6 @@ var ( maxManifestSize uint64 ) -const ( - DefaultReplicationLevel = 2 -) - type Collection struct { Uuid string OwnerUuid string @@ -38,13 +33,13 @@ type Collection struct { } type ReadCollections struct { - ReadAllCollections bool - UuidToCollection map[string]Collection - OwnerToCollectionSize map[string]int - BlockToReplication map[blockdigest.BlockDigest]int - CollectionUuidToIndex map[string]int - CollectionIndexToUuid []string - BlockToCollectionIndices map[blockdigest.BlockDigest][]int + ReadAllCollections bool + UuidToCollection map[string]Collection + OwnerToCollectionSize map[string]int + BlockToDesiredReplication map[blockdigest.DigestWithSize]int + CollectionUuidToIndex map[string]int + CollectionIndexToUuid []string + BlockToCollectionIndices map[blockdigest.DigestWithSize][]int } type GetCollectionsParams struct { @@ -123,7 +118,6 @@ func GetCollections(params GetCollectionsParams) (results ReadCollections) { fieldsWanted := []string{"manifest_text", "owner_uuid", "uuid", - // TODO(misha): Start using the redundancy field. "redundancy", "modified_at"} @@ -136,6 +130,23 @@ func GetCollections(params GetCollectionsParams) (results ReadCollections) { sdkParams["limit"] = params.BatchSize } + var defaultReplicationLevel int + { + value, err := params.Client.Discovery("defaultCollectionReplication") + if err != nil { + loggerutil.FatalWithMessage(params.Logger, + fmt.Sprintf("Error querying default collection replication: %v", err)) + } + + defaultReplicationLevel = int(value.(float64)) + if defaultReplicationLevel <= 0 { + loggerutil.FatalWithMessage(params.Logger, + fmt.Sprintf("Default collection replication returned by arvados SDK "+ + "should be a positive integer but instead it was %d.", + defaultReplicationLevel)) + } + } + initialNumberOfCollectionsAvailable, err := util.NumberItemsAvailable(params.Client, "collections") if err != nil { @@ -153,6 +164,7 @@ func GetCollections(params GetCollectionsParams) (results ReadCollections) { collectionInfo := logger.GetOrCreateMap(p, "collection_info") collectionInfo["num_collections_at_start"] = initialNumberOfCollectionsAvailable collectionInfo["batch_size"] = params.BatchSize + collectionInfo["default_replication_level"] = defaultReplicationLevel }) } @@ -178,6 +190,7 @@ func GetCollections(params GetCollectionsParams) (results ReadCollections) { sdkParams["filters"].([][]string)[0][2] = ProcessCollections(params.Logger, collections.Items, + defaultReplicationLevel, results.UuidToCollection).Format(time.RFC3339) // update counts @@ -203,9 +216,6 @@ func GetCollections(params GetCollectionsParams) (results ReadCollections) { } } - // Just in case this lowers the numbers reported in the heap profile. - runtime.GC() - // Write the heap profile for examining memory usage WriteHeapProfile() @@ -221,6 +231,7 @@ func StrCopy(s string) string { func ProcessCollections(arvLogger *logger.Logger, receivedCollections []SdkCollectionInfo, + defaultReplicationLevel int, uuidToCollection map[string]Collection) (latestModificationDate time.Time) { for _, sdkCollection := range receivedCollections { collection := Collection{Uuid: StrCopy(sdkCollection.Uuid), @@ -232,7 +243,7 @@ func ProcessCollections(arvLogger *logger.Logger, loggerutil.FatalWithMessage(arvLogger, fmt.Sprintf( "Arvados SDK collection returned with unexpected zero "+ - "modifcation date. This probably means that either we failed to "+ + "modification date. This probably means that either we failed to "+ "parse the modification date or the API server has changed how "+ "it returns modification dates: %+v", collection)) @@ -243,7 +254,7 @@ func ProcessCollections(arvLogger *logger.Logger, } if collection.ReplicationLevel == 0 { - collection.ReplicationLevel = DefaultReplicationLevel + collection.ReplicationLevel = defaultReplicationLevel } manifest := manifest.Manifest{sdkCollection.ManifestText} @@ -286,11 +297,11 @@ func ProcessCollections(arvLogger *logger.Logger, func (readCollections *ReadCollections) Summarize(arvLogger *logger.Logger) { readCollections.OwnerToCollectionSize = make(map[string]int) - readCollections.BlockToReplication = make(map[blockdigest.BlockDigest]int) + readCollections.BlockToDesiredReplication = make(map[blockdigest.DigestWithSize]int) numCollections := len(readCollections.UuidToCollection) readCollections.CollectionUuidToIndex = make(map[string]int, numCollections) readCollections.CollectionIndexToUuid = make([]string, 0, numCollections) - readCollections.BlockToCollectionIndices = make(map[blockdigest.BlockDigest][]int) + readCollections.BlockToCollectionIndices = make(map[blockdigest.DigestWithSize][]int) for _, coll := range readCollections.UuidToCollection { collectionIndex := len(readCollections.CollectionIndexToUuid) @@ -301,12 +312,15 @@ func (readCollections *ReadCollections) Summarize(arvLogger *logger.Logger) { readCollections.OwnerToCollectionSize[coll.OwnerUuid] = readCollections.OwnerToCollectionSize[coll.OwnerUuid] + coll.TotalSize - for block, _ := range coll.BlockDigestToSize { - readCollections.BlockToCollectionIndices[block] = - append(readCollections.BlockToCollectionIndices[block], collectionIndex) - storedReplication := readCollections.BlockToReplication[block] + for block, size := range coll.BlockDigestToSize { + locator := blockdigest.DigestWithSize{Digest: block, Size: uint32(size)} + readCollections.BlockToCollectionIndices[locator] = + append(readCollections.BlockToCollectionIndices[locator], + collectionIndex) + storedReplication := readCollections.BlockToDesiredReplication[locator] if coll.ReplicationLevel > storedReplication { - readCollections.BlockToReplication[block] = coll.ReplicationLevel + readCollections.BlockToDesiredReplication[locator] = + coll.ReplicationLevel } } } @@ -320,7 +334,7 @@ func (readCollections *ReadCollections) Summarize(arvLogger *logger.Logger) { collectionInfo["owner_to_collection_size"] = readCollections.OwnerToCollectionSize collectionInfo["distinct_blocks_named"] = - len(readCollections.BlockToReplication) + len(readCollections.BlockToDesiredReplication) }) }