X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/3a1face2e3bc02e1fb9c53a2268095811b2e069d..61c3f86eb779ab8e723e43354eddafe219bc27d9:/services/datamanager/collection/collection.go diff --git a/services/datamanager/collection/collection.go b/services/datamanager/collection/collection.go index 9a7a838f1b..5519ad8670 100644 --- a/services/datamanager/collection/collection.go +++ b/services/datamanager/collection/collection.go @@ -1,4 +1,4 @@ -/* Deals with parsing Collection responses from API Server. */ +// Deals with parsing Collection responses from API Server. package collection @@ -13,13 +13,12 @@ import ( "git.curoverse.com/arvados.git/services/datamanager/loggerutil" "log" "os" - "runtime" "runtime/pprof" "time" ) var ( - heap_profile_filename string + heapProfileFilename string // globals for debugging totalManifestSize uint64 maxManifestSize uint64 @@ -34,9 +33,13 @@ type Collection struct { } type ReadCollections struct { - ReadAllCollections bool - UuidToCollection map[string]Collection - OwnerToCollectionSize map[string]int + ReadAllCollections bool + UuidToCollection map[string]Collection + OwnerToCollectionSize map[string]int + BlockToDesiredReplication map[blockdigest.DigestWithSize]int + CollectionUuidToIndex map[string]int + CollectionIndexToUuid []string + BlockToCollectionIndices map[blockdigest.DigestWithSize][]int } type GetCollectionsParams struct { @@ -59,7 +62,7 @@ type SdkCollectionList struct { } func init() { - flag.StringVar(&heap_profile_filename, + flag.StringVar(&heapProfileFilename, "heap-profile", "", "File to write the heap profiles to. Leave blank to skip profiling.") @@ -72,9 +75,9 @@ func init() { // Otherwise we would see cumulative numbers as explained here: // https://groups.google.com/d/msg/golang-nuts/ZyHciRglQYc/2nh4Ndu2fZcJ func WriteHeapProfile() { - if heap_profile_filename != "" { + if heapProfileFilename != "" { - heap_profile, err := os.Create(heap_profile_filename) + heap_profile, err := os.Create(heapProfileFilename) if err != nil { log.Fatal(err) } @@ -90,17 +93,7 @@ func WriteHeapProfile() { func GetCollectionsAndSummarize(params GetCollectionsParams) (results ReadCollections) { results = GetCollections(params) - ComputeSizeOfOwnedCollections(&results) - - if params.Logger != nil { - params.Logger.Update(func(p map[string]interface{}, e map[string]interface{}) { - collectionInfo := p["collection_info"].(map[string]interface{}) - // Since maps are shallow copied, we run a risk of concurrent - // updates here. By copying results.OwnerToCollectionSize into - // the log, we're assuming that it won't be updated. - collectionInfo["owner_to_collection_size"] = results.OwnerToCollectionSize - }) - } + results.Summarize(params.Logger) log.Printf("Uuid to Size used: %v", results.OwnerToCollectionSize) log.Printf("Read and processed %d collections", @@ -125,7 +118,6 @@ func GetCollections(params GetCollectionsParams) (results ReadCollections) { fieldsWanted := []string{"manifest_text", "owner_uuid", "uuid", - // TODO(misha): Start using the redundancy field. "redundancy", "modified_at"} @@ -138,6 +130,23 @@ func GetCollections(params GetCollectionsParams) (results ReadCollections) { sdkParams["limit"] = params.BatchSize } + var defaultReplicationLevel int + { + value, err := params.Client.Discovery("defaultCollectionReplication") + if err != nil { + loggerutil.FatalWithMessage(params.Logger, + fmt.Sprintf("Error querying default collection replication: %v", err)) + } + + defaultReplicationLevel = int(value.(float64)) + if defaultReplicationLevel <= 0 { + loggerutil.FatalWithMessage(params.Logger, + fmt.Sprintf("Default collection replication returned by arvados SDK "+ + "should be a positive integer but instead it was %d.", + defaultReplicationLevel)) + } + } + initialNumberOfCollectionsAvailable, err := util.NumberItemsAvailable(params.Client, "collections") if err != nil { @@ -152,10 +161,10 @@ func GetCollections(params GetCollectionsParams) (results ReadCollections) { if params.Logger != nil { params.Logger.Update(func(p map[string]interface{}, e map[string]interface{}) { - collectionInfo := make(map[string]interface{}) + collectionInfo := logger.GetOrCreateMap(p, "collection_info") collectionInfo["num_collections_at_start"] = initialNumberOfCollectionsAvailable collectionInfo["batch_size"] = params.BatchSize - p["collection_info"] = collectionInfo + collectionInfo["default_replication_level"] = defaultReplicationLevel }) } @@ -181,6 +190,7 @@ func GetCollections(params GetCollectionsParams) (results ReadCollections) { sdkParams["filters"].([][]string)[0][2] = ProcessCollections(params.Logger, collections.Items, + defaultReplicationLevel, results.UuidToCollection).Format(time.RFC3339) // update counts @@ -197,7 +207,7 @@ func GetCollections(params GetCollectionsParams) (results ReadCollections) { if params.Logger != nil { params.Logger.Update(func(p map[string]interface{}, e map[string]interface{}) { - collectionInfo := p["collection_info"].(map[string]interface{}) + collectionInfo := logger.GetOrCreateMap(p, "collection_info") collectionInfo["collections_read"] = totalCollections collectionInfo["latest_modified_date_seen"] = sdkParams["filters"].([][]string)[0][2] collectionInfo["total_manifest_size"] = totalManifestSize @@ -206,9 +216,6 @@ func GetCollections(params GetCollectionsParams) (results ReadCollections) { } } - // Just in case this lowers the numbers reported in the heap profile. - runtime.GC() - // Write the heap profile for examining memory usage WriteHeapProfile() @@ -224,6 +231,7 @@ func StrCopy(s string) string { func ProcessCollections(arvLogger *logger.Logger, receivedCollections []SdkCollectionInfo, + defaultReplicationLevel int, uuidToCollection map[string]Collection) (latestModificationDate time.Time) { for _, sdkCollection := range receivedCollections { collection := Collection{Uuid: StrCopy(sdkCollection.Uuid), @@ -235,15 +243,20 @@ func ProcessCollections(arvLogger *logger.Logger, loggerutil.FatalWithMessage(arvLogger, fmt.Sprintf( "Arvados SDK collection returned with unexpected zero "+ - "modifcation date. This probably means that either we failed to "+ + "modification date. This probably means that either we failed to "+ "parse the modification date or the API server has changed how "+ - "it returns modification dates: %v", + "it returns modification dates: %+v", collection)) } if sdkCollection.ModifiedAt.After(latestModificationDate) { latestModificationDate = sdkCollection.ModifiedAt } + + if collection.ReplicationLevel == 0 { + collection.ReplicationLevel = defaultReplicationLevel + } + manifest := manifest.Manifest{sdkCollection.ManifestText} manifestSize := uint64(len(sdkCollection.ManifestText)) @@ -282,11 +295,47 @@ func ProcessCollections(arvLogger *logger.Logger, return } -func ComputeSizeOfOwnedCollections(readCollections *ReadCollections) { +func (readCollections *ReadCollections) Summarize(arvLogger *logger.Logger) { readCollections.OwnerToCollectionSize = make(map[string]int) + readCollections.BlockToDesiredReplication = make(map[blockdigest.DigestWithSize]int) + numCollections := len(readCollections.UuidToCollection) + readCollections.CollectionUuidToIndex = make(map[string]int, numCollections) + readCollections.CollectionIndexToUuid = make([]string, 0, numCollections) + readCollections.BlockToCollectionIndices = make(map[blockdigest.DigestWithSize][]int) + for _, coll := range readCollections.UuidToCollection { + collectionIndex := len(readCollections.CollectionIndexToUuid) + readCollections.CollectionIndexToUuid = + append(readCollections.CollectionIndexToUuid, coll.Uuid) + readCollections.CollectionUuidToIndex[coll.Uuid] = collectionIndex + readCollections.OwnerToCollectionSize[coll.OwnerUuid] = readCollections.OwnerToCollectionSize[coll.OwnerUuid] + coll.TotalSize + + for block, size := range coll.BlockDigestToSize { + locator := blockdigest.DigestWithSize{Digest: block, Size: uint32(size)} + readCollections.BlockToCollectionIndices[locator] = + append(readCollections.BlockToCollectionIndices[locator], + collectionIndex) + storedReplication := readCollections.BlockToDesiredReplication[locator] + if coll.ReplicationLevel > storedReplication { + readCollections.BlockToDesiredReplication[locator] = + coll.ReplicationLevel + } + } + } + + if arvLogger != nil { + arvLogger.Update(func(p map[string]interface{}, e map[string]interface{}) { + collectionInfo := logger.GetOrCreateMap(p, "collection_info") + // Since maps are shallow copied, we run a risk of concurrent + // updates here. By copying results.OwnerToCollectionSize into + // the log, we're assuming that it won't be updated. + collectionInfo["owner_to_collection_size"] = + readCollections.OwnerToCollectionSize + collectionInfo["distinct_blocks_named"] = + len(readCollections.BlockToDesiredReplication) + }) } return