X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/2d2f3bed79f9504d15503277056feb394c12dd7c..f3250432a47c835f4c594348b0d4904a247c3365:/services/datamanager/collection/collection.go diff --git a/services/datamanager/collection/collection.go b/services/datamanager/collection/collection.go index 73115f5fc9..9a7a838f1b 100644 --- a/services/datamanager/collection/collection.go +++ b/services/datamanager/collection/collection.go @@ -9,6 +9,7 @@ import ( "git.curoverse.com/arvados.git/sdk/go/blockdigest" "git.curoverse.com/arvados.git/sdk/go/logger" "git.curoverse.com/arvados.git/sdk/go/manifest" + "git.curoverse.com/arvados.git/sdk/go/util" "git.curoverse.com/arvados.git/services/datamanager/loggerutil" "log" "os" @@ -64,15 +65,6 @@ func init() { "File to write the heap profiles to. Leave blank to skip profiling.") } -// // Methods to implement util.SdkListResponse Interface -// func (s SdkCollectionList) NumItemsAvailable() (numAvailable int, err error) { -// return s.ItemsAvailable, nil -// } - -// func (s SdkCollectionList) NumItemsContained() (numContained int, err error) { -// return len(s.Items), nil -// } - // Write the heap profile to a file for later review. // Since a file is expected to only contain a single heap profile this // function overwrites the previously written profile, so it is safe @@ -101,10 +93,13 @@ func GetCollectionsAndSummarize(params GetCollectionsParams) (results ReadCollec ComputeSizeOfOwnedCollections(&results) if params.Logger != nil { - properties, _ := params.Logger.Edit() - collectionInfo := properties["collection_info"].(map[string]interface{}) - collectionInfo["owner_to_collection_size"] = results.OwnerToCollectionSize - params.Logger.Record() + params.Logger.Update(func(p map[string]interface{}, e map[string]interface{}) { + collectionInfo := p["collection_info"].(map[string]interface{}) + // Since maps are shallow copied, we run a risk of concurrent + // updates here. By copying results.OwnerToCollectionSize into + // the log, we're assuming that it won't be updated. + collectionInfo["owner_to_collection_size"] = results.OwnerToCollectionSize + }) } log.Printf("Uuid to Size used: %v", results.OwnerToCollectionSize) @@ -143,7 +138,12 @@ func GetCollections(params GetCollectionsParams) (results ReadCollections) { sdkParams["limit"] = params.BatchSize } - initialNumberOfCollectionsAvailable := NumberCollectionsAvailable(params.Client) + initialNumberOfCollectionsAvailable, err := + util.NumberItemsAvailable(params.Client, "collections") + if err != nil { + loggerutil.FatalWithMessage(params.Logger, + fmt.Sprintf("Error querying collection count: %v", err)) + } // Include a 1% margin for collections added while we're reading so // that we don't have to grow the map in most cases. maxExpectedCollections := int( @@ -151,12 +151,12 @@ func GetCollections(params GetCollectionsParams) (results ReadCollections) { results.UuidToCollection = make(map[string]Collection, maxExpectedCollections) if params.Logger != nil { - properties, _ := params.Logger.Edit() - collectionInfo := make(map[string]interface{}) - collectionInfo["num_collections_at_start"] = initialNumberOfCollectionsAvailable - collectionInfo["batch_size"] = params.BatchSize - properties["collection_info"] = collectionInfo - params.Logger.Record() + params.Logger.Update(func(p map[string]interface{}, e map[string]interface{}) { + collectionInfo := make(map[string]interface{}) + collectionInfo["num_collections_at_start"] = initialNumberOfCollectionsAvailable + collectionInfo["batch_size"] = params.BatchSize + p["collection_info"] = collectionInfo + }) } // These values are just for getting the loop to run the first time, @@ -196,13 +196,13 @@ func GetCollections(params GetCollectionsParams) (results ReadCollections) { maxManifestSize, totalManifestSize) if params.Logger != nil { - properties, _ := params.Logger.Edit() - collectionInfo := properties["collection_info"].(map[string]interface{}) - collectionInfo["collections_read"] = totalCollections - collectionInfo["latest_modified_date_seen"] = sdkParams["filters"].([][]string)[0][2] - collectionInfo["total_manifest_size"] = totalManifestSize - collectionInfo["max_manifest_size"] = maxManifestSize - params.Logger.Record() + params.Logger.Update(func(p map[string]interface{}, e map[string]interface{}) { + collectionInfo := p["collection_info"].(map[string]interface{}) + collectionInfo["collections_read"] = totalCollections + collectionInfo["latest_modified_date_seen"] = sdkParams["filters"].([][]string)[0][2] + collectionInfo["total_manifest_size"] = totalManifestSize + collectionInfo["max_manifest_size"] = maxManifestSize + }) } } @@ -282,17 +282,6 @@ func ProcessCollections(arvLogger *logger.Logger, return } -func NumberCollectionsAvailable(client arvadosclient.ArvadosClient) int { - var collections SdkCollectionList - sdkParams := arvadosclient.Dict{"limit": 0} - err := client.List("collections", sdkParams, &collections) - if err != nil { - log.Fatalf("error querying collections for items available: %v", err) - } - - return collections.ItemsAvailable -} - func ComputeSizeOfOwnedCollections(readCollections *ReadCollections) { readCollections.OwnerToCollectionSize = make(map[string]int) for _, coll := range readCollections.UuidToCollection {