X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/a5a4f79e91aa8bba1794394646808f6d4c444661..5824ee2e5198dd46a7813fe2adbd380a114f9ac4:/services/datamanager/collection/collection.go diff --git a/services/datamanager/collection/collection.go b/services/datamanager/collection/collection.go index a3f1e311f2..e0929b7757 100644 --- a/services/datamanager/collection/collection.go +++ b/services/datamanager/collection/collection.go @@ -1,4 +1,4 @@ -/* Deals with parsing Collection responses from API Server. */ +// Deals with parsing Collection responses from API Server. package collection @@ -13,7 +13,6 @@ import ( "git.curoverse.com/arvados.git/services/datamanager/loggerutil" "log" "os" - "runtime" "runtime/pprof" "time" ) @@ -38,13 +37,13 @@ type Collection struct { } type ReadCollections struct { - ReadAllCollections bool - UuidToCollection map[string]Collection - OwnerToCollectionSize map[string]int - BlockToReplication map[blockdigest.BlockDigest]int - CollectionUuidToIndex map[string]int - CollectionIndexToUuid []string - BlockToCollectionIndex map[blockdigest.BlockDigest]int + ReadAllCollections bool + UuidToCollection map[string]Collection + OwnerToCollectionSize map[string]int + BlockToReplication map[blockdigest.BlockDigest]int + CollectionUuidToIndex map[string]int + CollectionIndexToUuid []string + BlockToCollectionIndices map[blockdigest.BlockDigest][]int } type GetCollectionsParams struct { @@ -98,17 +97,7 @@ func WriteHeapProfile() { func GetCollectionsAndSummarize(params GetCollectionsParams) (results ReadCollections) { results = GetCollections(params) - Summarize(&results) - - if params.Logger != nil { - params.Logger.Update(func(p map[string]interface{}, e map[string]interface{}) { - collectionInfo := p["collection_info"].(map[string]interface{}) - // Since maps are shallow copied, we run a risk of concurrent - // updates here. By copying results.OwnerToCollectionSize into - // the log, we're assuming that it won't be updated. - collectionInfo["owner_to_collection_size"] = results.OwnerToCollectionSize - }) - } + results.Summarize(params.Logger) log.Printf("Uuid to Size used: %v", results.OwnerToCollectionSize) log.Printf("Read and processed %d collections", @@ -133,7 +122,6 @@ func GetCollections(params GetCollectionsParams) (results ReadCollections) { fieldsWanted := []string{"manifest_text", "owner_uuid", "uuid", - // TODO(misha): Start using the redundancy field. "redundancy", "modified_at"} @@ -160,10 +148,9 @@ func GetCollections(params GetCollectionsParams) (results ReadCollections) { if params.Logger != nil { params.Logger.Update(func(p map[string]interface{}, e map[string]interface{}) { - collectionInfo := make(map[string]interface{}) + collectionInfo := logger.GetOrCreateMap(p, "collection_info") collectionInfo["num_collections_at_start"] = initialNumberOfCollectionsAvailable collectionInfo["batch_size"] = params.BatchSize - p["collection_info"] = collectionInfo }) } @@ -205,7 +192,7 @@ func GetCollections(params GetCollectionsParams) (results ReadCollections) { if params.Logger != nil { params.Logger.Update(func(p map[string]interface{}, e map[string]interface{}) { - collectionInfo := p["collection_info"].(map[string]interface{}) + collectionInfo := logger.GetOrCreateMap(p, "collection_info") collectionInfo["collections_read"] = totalCollections collectionInfo["latest_modified_date_seen"] = sdkParams["filters"].([][]string)[0][2] collectionInfo["total_manifest_size"] = totalManifestSize @@ -214,9 +201,6 @@ func GetCollections(params GetCollectionsParams) (results ReadCollections) { } } - // Just in case this lowers the numbers reported in the heap profile. - runtime.GC() - // Write the heap profile for examining memory usage WriteHeapProfile() @@ -243,7 +227,7 @@ func ProcessCollections(arvLogger *logger.Logger, loggerutil.FatalWithMessage(arvLogger, fmt.Sprintf( "Arvados SDK collection returned with unexpected zero "+ - "modifcation date. This probably means that either we failed to "+ + "modification date. This probably means that either we failed to "+ "parse the modification date or the API server has changed how "+ "it returns modification dates: %+v", collection)) @@ -295,13 +279,13 @@ func ProcessCollections(arvLogger *logger.Logger, return } -func Summarize(readCollections *ReadCollections) { +func (readCollections *ReadCollections) Summarize(arvLogger *logger.Logger) { readCollections.OwnerToCollectionSize = make(map[string]int) readCollections.BlockToReplication = make(map[blockdigest.BlockDigest]int) numCollections := len(readCollections.UuidToCollection) readCollections.CollectionUuidToIndex = make(map[string]int, numCollections) readCollections.CollectionIndexToUuid = make([]string, 0, numCollections) - readCollections.BlockToCollectionIndex = make(map[blockdigest.BlockDigest]int) + readCollections.BlockToCollectionIndices = make(map[blockdigest.BlockDigest][]int) for _, coll := range readCollections.UuidToCollection { collectionIndex := len(readCollections.CollectionIndexToUuid) @@ -313,7 +297,8 @@ func Summarize(readCollections *ReadCollections) { readCollections.OwnerToCollectionSize[coll.OwnerUuid] + coll.TotalSize for block, _ := range coll.BlockDigestToSize { - readCollections.BlockToCollectionIndex[block] = collectionIndex + readCollections.BlockToCollectionIndices[block] = + append(readCollections.BlockToCollectionIndices[block], collectionIndex) storedReplication := readCollections.BlockToReplication[block] if coll.ReplicationLevel > storedReplication { readCollections.BlockToReplication[block] = coll.ReplicationLevel @@ -321,5 +306,18 @@ func Summarize(readCollections *ReadCollections) { } } + if arvLogger != nil { + arvLogger.Update(func(p map[string]interface{}, e map[string]interface{}) { + collectionInfo := logger.GetOrCreateMap(p, "collection_info") + // Since maps are shallow copied, we run a risk of concurrent + // updates here. By copying results.OwnerToCollectionSize into + // the log, we're assuming that it won't be updated. + collectionInfo["owner_to_collection_size"] = + readCollections.OwnerToCollectionSize + collectionInfo["distinct_blocks_named"] = + len(readCollections.BlockToReplication) + }) + } + return }