closes #5197
[arvados.git] / services / datamanager / collection / collection.go
index 73115f5fc9b4d41131a56bdfc0cde528da6ec348..9a7a838f1b9db71eea07bf88cd98316f5e132fd3 100644 (file)
@@ -9,6 +9,7 @@ import (
        "git.curoverse.com/arvados.git/sdk/go/blockdigest"
        "git.curoverse.com/arvados.git/sdk/go/logger"
        "git.curoverse.com/arvados.git/sdk/go/manifest"
+       "git.curoverse.com/arvados.git/sdk/go/util"
        "git.curoverse.com/arvados.git/services/datamanager/loggerutil"
        "log"
        "os"
@@ -64,15 +65,6 @@ func init() {
                "File to write the heap profiles to. Leave blank to skip profiling.")
 }
 
-// // Methods to implement util.SdkListResponse Interface
-// func (s SdkCollectionList) NumItemsAvailable() (numAvailable int, err error) {
-//     return s.ItemsAvailable, nil
-// }
-
-// func (s SdkCollectionList) NumItemsContained() (numContained int, err error) {
-//     return len(s.Items), nil
-// }
-
 // Write the heap profile to a file for later review.
 // Since a file is expected to only contain a single heap profile this
 // function overwrites the previously written profile, so it is safe
@@ -101,10 +93,13 @@ func GetCollectionsAndSummarize(params GetCollectionsParams) (results ReadCollec
        ComputeSizeOfOwnedCollections(&results)
 
        if params.Logger != nil {
-               properties, _ := params.Logger.Edit()
-               collectionInfo := properties["collection_info"].(map[string]interface{})
-               collectionInfo["owner_to_collection_size"] = results.OwnerToCollectionSize
-               params.Logger.Record()
+               params.Logger.Update(func(p map[string]interface{}, e map[string]interface{}) {
+                       collectionInfo := p["collection_info"].(map[string]interface{})
+                       // Since maps are shallow copied, we run a risk of concurrent
+                       // updates here. By storing results.OwnerToCollectionSize in
+                       // the log, we're assuming that it won't be updated afterwards.
+                       collectionInfo["owner_to_collection_size"] = results.OwnerToCollectionSize
+               })
        }
 
        log.Printf("Uuid to Size used: %v", results.OwnerToCollectionSize)
@@ -143,7 +138,12 @@ func GetCollections(params GetCollectionsParams) (results ReadCollections) {
                sdkParams["limit"] = params.BatchSize
        }
 
-       initialNumberOfCollectionsAvailable := NumberCollectionsAvailable(params.Client)
+       initialNumberOfCollectionsAvailable, err :=
+               util.NumberItemsAvailable(params.Client, "collections")
+       if err != nil {
+               loggerutil.FatalWithMessage(params.Logger,
+                       fmt.Sprintf("Error querying collection count: %v", err))
+       }
        // Include a 1% margin for collections added while we're reading so
        // that we don't have to grow the map in most cases.
        maxExpectedCollections := int(
@@ -151,12 +151,12 @@ func GetCollections(params GetCollectionsParams) (results ReadCollections) {
        results.UuidToCollection = make(map[string]Collection, maxExpectedCollections)
 
        if params.Logger != nil {
-               properties, _ := params.Logger.Edit()
-               collectionInfo := make(map[string]interface{})
-               collectionInfo["num_collections_at_start"] = initialNumberOfCollectionsAvailable
-               collectionInfo["batch_size"] = params.BatchSize
-               properties["collection_info"] = collectionInfo
-               params.Logger.Record()
+               params.Logger.Update(func(p map[string]interface{}, e map[string]interface{}) {
+                       collectionInfo := make(map[string]interface{})
+                       collectionInfo["num_collections_at_start"] = initialNumberOfCollectionsAvailable
+                       collectionInfo["batch_size"] = params.BatchSize
+                       p["collection_info"] = collectionInfo
+               })
        }
 
        // These values are just for getting the loop to run the first time,
@@ -196,13 +196,13 @@ func GetCollections(params GetCollectionsParams) (results ReadCollections) {
                        maxManifestSize, totalManifestSize)
 
                if params.Logger != nil {
-                       properties, _ := params.Logger.Edit()
-                       collectionInfo := properties["collection_info"].(map[string]interface{})
-                       collectionInfo["collections_read"] = totalCollections
-                       collectionInfo["latest_modified_date_seen"] = sdkParams["filters"].([][]string)[0][2]
-                       collectionInfo["total_manifest_size"] = totalManifestSize
-                       collectionInfo["max_manifest_size"] = maxManifestSize
-                       params.Logger.Record()
+                       params.Logger.Update(func(p map[string]interface{}, e map[string]interface{}) {
+                               collectionInfo := p["collection_info"].(map[string]interface{})
+                               collectionInfo["collections_read"] = totalCollections
+                               collectionInfo["latest_modified_date_seen"] = sdkParams["filters"].([][]string)[0][2]
+                               collectionInfo["total_manifest_size"] = totalManifestSize
+                               collectionInfo["max_manifest_size"] = maxManifestSize
+                       })
                }
        }
 
@@ -282,17 +282,6 @@ func ProcessCollections(arvLogger *logger.Logger,
        return
 }
 
-func NumberCollectionsAvailable(client arvadosclient.ArvadosClient) int {
-       var collections SdkCollectionList
-       sdkParams := arvadosclient.Dict{"limit": 0}
-       err := client.List("collections", sdkParams, &collections)
-       if err != nil {
-               log.Fatalf("error querying collections for items available: %v", err)
-       }
-
-       return collections.ItemsAvailable
-}
-
 func ComputeSizeOfOwnedCollections(readCollections *ReadCollections) {
        readCollections.OwnerToCollectionSize = make(map[string]int)
        for _, coll := range readCollections.UuidToCollection {