X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/2d2f3bed79f9504d15503277056feb394c12dd7c..248d2ebb4ab7ea3d9060838bcfbfe3b9330da5ee:/services/datamanager/collection/collection.go diff --git a/services/datamanager/collection/collection.go b/services/datamanager/collection/collection.go index 73115f5fc9..0eca61cfe2 100644 --- a/services/datamanager/collection/collection.go +++ b/services/datamanager/collection/collection.go @@ -9,6 +9,7 @@ import ( "git.curoverse.com/arvados.git/sdk/go/blockdigest" "git.curoverse.com/arvados.git/sdk/go/logger" "git.curoverse.com/arvados.git/sdk/go/manifest" + "git.curoverse.com/arvados.git/sdk/go/util" "git.curoverse.com/arvados.git/services/datamanager/loggerutil" "log" "os" @@ -18,12 +19,16 @@ import ( ) var ( - heap_profile_filename string + heapProfileFilename string // globals for debugging totalManifestSize uint64 maxManifestSize uint64 ) +const ( + DefaultReplicationLevel = 2 +) + type Collection struct { Uuid string OwnerUuid string @@ -36,6 +41,7 @@ type ReadCollections struct { ReadAllCollections bool UuidToCollection map[string]Collection OwnerToCollectionSize map[string]int + BlockToReplication map[blockdigest.BlockDigest]int } type GetCollectionsParams struct { @@ -58,21 +64,12 @@ type SdkCollectionList struct { } func init() { - flag.StringVar(&heap_profile_filename, + flag.StringVar(&heapProfileFilename, "heap-profile", "", "File to write the heap profiles to. Leave blank to skip profiling.") } -// // Methods to implement util.SdkListResponse Interface -// func (s SdkCollectionList) NumItemsAvailable() (numAvailable int, err error) { -// return s.ItemsAvailable, nil -// } - -// func (s SdkCollectionList) NumItemsContained() (numContained int, err error) { -// return len(s.Items), nil -// } - // Write the heap profile to a file for later review. // Since a file is expected to only contain a single heap profile this // function overwrites the previously written profile, so it is safe @@ -80,9 +77,9 @@ func init() { // Otherwise we would see cumulative numbers as explained here: // https://groups.google.com/d/msg/golang-nuts/ZyHciRglQYc/2nh4Ndu2fZcJ func WriteHeapProfile() { - if heap_profile_filename != "" { + if heapProfileFilename != "" { - heap_profile, err := os.Create(heap_profile_filename) + heap_profile, err := os.Create(heapProfileFilename) if err != nil { log.Fatal(err) } @@ -98,13 +95,16 @@ func WriteHeapProfile() { func GetCollectionsAndSummarize(params GetCollectionsParams) (results ReadCollections) { results = GetCollections(params) - ComputeSizeOfOwnedCollections(&results) + Summarize(&results) if params.Logger != nil { - properties, _ := params.Logger.Edit() - collectionInfo := properties["collection_info"].(map[string]interface{}) - collectionInfo["owner_to_collection_size"] = results.OwnerToCollectionSize - params.Logger.Record() + params.Logger.Update(func(p map[string]interface{}, e map[string]interface{}) { + collectionInfo := p["collection_info"].(map[string]interface{}) + // Since maps are shallow copied, we run a risk of concurrent + // updates here. By copying results.OwnerToCollectionSize into + // the log, we're assuming that it won't be updated. + collectionInfo["owner_to_collection_size"] = results.OwnerToCollectionSize + }) } log.Printf("Uuid to Size used: %v", results.OwnerToCollectionSize) @@ -143,7 +143,12 @@ func GetCollections(params GetCollectionsParams) (results ReadCollections) { sdkParams["limit"] = params.BatchSize } - initialNumberOfCollectionsAvailable := NumberCollectionsAvailable(params.Client) + initialNumberOfCollectionsAvailable, err := + util.NumberItemsAvailable(params.Client, "collections") + if err != nil { + loggerutil.FatalWithMessage(params.Logger, + fmt.Sprintf("Error querying collection count: %v", err)) + } // Include a 1% margin for collections added while we're reading so // that we don't have to grow the map in most cases. maxExpectedCollections := int( @@ -151,12 +156,12 @@ func GetCollections(params GetCollectionsParams) (results ReadCollections) { results.UuidToCollection = make(map[string]Collection, maxExpectedCollections) if params.Logger != nil { - properties, _ := params.Logger.Edit() - collectionInfo := make(map[string]interface{}) - collectionInfo["num_collections_at_start"] = initialNumberOfCollectionsAvailable - collectionInfo["batch_size"] = params.BatchSize - properties["collection_info"] = collectionInfo - params.Logger.Record() + params.Logger.Update(func(p map[string]interface{}, e map[string]interface{}) { + collectionInfo := make(map[string]interface{}) + collectionInfo["num_collections_at_start"] = initialNumberOfCollectionsAvailable + collectionInfo["batch_size"] = params.BatchSize + p["collection_info"] = collectionInfo + }) } // These values are just for getting the loop to run the first time, @@ -196,13 +201,13 @@ func GetCollections(params GetCollectionsParams) (results ReadCollections) { maxManifestSize, totalManifestSize) if params.Logger != nil { - properties, _ := params.Logger.Edit() - collectionInfo := properties["collection_info"].(map[string]interface{}) - collectionInfo["collections_read"] = totalCollections - collectionInfo["latest_modified_date_seen"] = sdkParams["filters"].([][]string)[0][2] - collectionInfo["total_manifest_size"] = totalManifestSize - collectionInfo["max_manifest_size"] = maxManifestSize - params.Logger.Record() + params.Logger.Update(func(p map[string]interface{}, e map[string]interface{}) { + collectionInfo := p["collection_info"].(map[string]interface{}) + collectionInfo["collections_read"] = totalCollections + collectionInfo["latest_modified_date_seen"] = sdkParams["filters"].([][]string)[0][2] + collectionInfo["total_manifest_size"] = totalManifestSize + collectionInfo["max_manifest_size"] = maxManifestSize + }) } } @@ -237,13 +242,18 @@ func ProcessCollections(arvLogger *logger.Logger, "Arvados SDK collection returned with unexpected zero "+ "modifcation date. This probably means that either we failed to "+ "parse the modification date or the API server has changed how "+ - "it returns modification dates: %v", + "it returns modification dates: %+v", collection)) } if sdkCollection.ModifiedAt.After(latestModificationDate) { latestModificationDate = sdkCollection.ModifiedAt } + + if collection.ReplicationLevel == 0 { + collection.ReplicationLevel = DefaultReplicationLevel + } + manifest := manifest.Manifest{sdkCollection.ManifestText} manifestSize := uint64(len(sdkCollection.ManifestText)) @@ -282,22 +292,20 @@ func ProcessCollections(arvLogger *logger.Logger, return } -func NumberCollectionsAvailable(client arvadosclient.ArvadosClient) int { - var collections SdkCollectionList - sdkParams := arvadosclient.Dict{"limit": 0} - err := client.List("collections", sdkParams, &collections) - if err != nil { - log.Fatalf("error querying collections for items available: %v", err) - } - - return collections.ItemsAvailable -} - -func ComputeSizeOfOwnedCollections(readCollections *ReadCollections) { +func Summarize(readCollections *ReadCollections) { readCollections.OwnerToCollectionSize = make(map[string]int) + readCollections.BlockToReplication = make(map[blockdigest.BlockDigest]int) + for _, coll := range readCollections.UuidToCollection { readCollections.OwnerToCollectionSize[coll.OwnerUuid] = readCollections.OwnerToCollectionSize[coll.OwnerUuid] + coll.TotalSize + + for block, _ := range coll.BlockDigestToSize { + storedReplication := readCollections.BlockToReplication[block] + if coll.ReplicationLevel > storedReplication { + readCollections.BlockToReplication[block] = coll.ReplicationLevel + } + } } return