closes #5197
[arvados.git] / services / datamanager / collection / collection.go
index f9e2a3dc82110a35c254e6ea6475bbebdd941dfa..9a7a838f1b9db71eea07bf88cd98316f5e132fd3 100644 (file)
@@ -9,6 +9,8 @@ import (
        "git.curoverse.com/arvados.git/sdk/go/blockdigest"
        "git.curoverse.com/arvados.git/sdk/go/logger"
        "git.curoverse.com/arvados.git/sdk/go/manifest"
+       "git.curoverse.com/arvados.git/sdk/go/util"
+       "git.curoverse.com/arvados.git/services/datamanager/loggerutil"
        "log"
        "os"
        "runtime"
@@ -20,58 +22,49 @@ var (
        heap_profile_filename string
        // globals for debugging
        totalManifestSize uint64
-       maxManifestSize uint64
+       maxManifestSize   uint64
 )
 
 type Collection struct {
-       Uuid string
-       OwnerUuid string
-       ReplicationLevel int
+       Uuid              string
+       OwnerUuid         string
+       ReplicationLevel  int
        BlockDigestToSize map[blockdigest.BlockDigest]int
-       TotalSize int
+       TotalSize         int
 }
 
 type ReadCollections struct {
-       ReadAllCollections bool
-       UuidToCollection map[string]Collection
+       ReadAllCollections    bool
+       UuidToCollection      map[string]Collection
        OwnerToCollectionSize map[string]int
 }
 
 type GetCollectionsParams struct {
-       Client arvadosclient.ArvadosClient
-       Logger *logger.Logger
+       Client    arvadosclient.ArvadosClient
+       Logger    *logger.Logger
        BatchSize int
 }
 
 type SdkCollectionInfo struct {
-       Uuid           string     `json:"uuid"`
-       OwnerUuid      string     `json:"owner_uuid"`
-       Redundancy     int        `json:"redundancy"`
-       ModifiedAt     time.Time  `json:"modified_at"`
-       ManifestText   string     `json:"manifest_text"`
+       Uuid         string    `json:"uuid"`
+       OwnerUuid    string    `json:"owner_uuid"`
+       Redundancy   int       `json:"redundancy"`
+       ModifiedAt   time.Time `json:"modified_at"`
+       ManifestText string    `json:"manifest_text"`
 }
 
 type SdkCollectionList struct {
-       ItemsAvailable   int                   `json:"items_available"`
-       Items            []SdkCollectionInfo   `json:"items"`
+       ItemsAvailable int                 `json:"items_available"`
+       Items          []SdkCollectionInfo `json:"items"`
 }
 
 func init() {
-       flag.StringVar(&heap_profile_filename, 
+       flag.StringVar(&heap_profile_filename,
                "heap-profile",
                "",
                "File to write the heap profiles to. Leave blank to skip profiling.")
 }
 
-// // Methods to implement util.SdkListResponse Interface
-// func (s SdkCollectionList) NumItemsAvailable() (numAvailable int, err error) {
-//     return s.ItemsAvailable, nil
-// }
-
-// func (s SdkCollectionList) NumItemsContained() (numContained int, err error) {
-//     return len(s.Items), nil
-// }
-
 // Write the heap profile to a file for later review.
 // Since a file is expected to only contain a single heap profile this
 // function overwrites the previously written profile, so it is safe
@@ -95,16 +88,18 @@ func WriteHeapProfile() {
        }
 }
 
-
 func GetCollectionsAndSummarize(params GetCollectionsParams) (results ReadCollections) {
        results = GetCollections(params)
        ComputeSizeOfOwnedCollections(&results)
 
        if params.Logger != nil {
-               properties,_ := params.Logger.Edit()
-               collectionInfo := properties["collection_info"].(map[string]interface{})
-               collectionInfo["owner_to_collection_size"] = results.OwnerToCollectionSize
-               params.Logger.Record()
+               params.Logger.Update(func(p map[string]interface{}, e map[string]interface{}) {
+                       collectionInfo := p["collection_info"].(map[string]interface{})
+                       // Since maps are shallow copied, we run a risk of concurrent
+                       // updates here. By copying results.OwnerToCollectionSize into
+                       // the log, we're assuming that it won't be updated.
+                       collectionInfo["owner_to_collection_size"] = results.OwnerToCollectionSize
+               })
        }
 
        log.Printf("Uuid to Size used: %v", results.OwnerToCollectionSize)
@@ -135,15 +130,20 @@ func GetCollections(params GetCollectionsParams) (results ReadCollections) {
                "modified_at"}
 
        sdkParams := arvadosclient.Dict{
-               "select": fieldsWanted,
-               "order": []string{"modified_at ASC"},
+               "select":  fieldsWanted,
+               "order":   []string{"modified_at ASC"},
                "filters": [][]string{[]string{"modified_at", ">=", "1900-01-01T00:00:00Z"}}}
 
        if params.BatchSize > 0 {
                sdkParams["limit"] = params.BatchSize
        }
 
-       initialNumberOfCollectionsAvailable := NumberCollectionsAvailable(params.Client)
+       initialNumberOfCollectionsAvailable, err :=
+               util.NumberItemsAvailable(params.Client, "collections")
+       if err != nil {
+               loggerutil.FatalWithMessage(params.Logger,
+                       fmt.Sprintf("Error querying collection count: %v", err))
+       }
        // Include a 1% margin for collections added while we're reading so
        // that we don't have to grow the map in most cases.
        maxExpectedCollections := int(
@@ -151,12 +151,12 @@ func GetCollections(params GetCollectionsParams) (results ReadCollections) {
        results.UuidToCollection = make(map[string]Collection, maxExpectedCollections)
 
        if params.Logger != nil {
-               properties,_ := params.Logger.Edit()
-               collectionInfo := make(map[string]interface{})
-               collectionInfo["num_collections_at_start"] = initialNumberOfCollectionsAvailable
-               collectionInfo["batch_size"] = params.BatchSize
-               properties["collection_info"] = collectionInfo
-               params.Logger.Record()
+               params.Logger.Update(func(p map[string]interface{}, e map[string]interface{}) {
+                       collectionInfo := make(map[string]interface{})
+                       collectionInfo["num_collections_at_start"] = initialNumberOfCollectionsAvailable
+                       collectionInfo["batch_size"] = params.BatchSize
+                       p["collection_info"] = collectionInfo
+               })
        }
 
        // These values are just for getting the loop to run the first time,
@@ -173,36 +173,36 @@ func GetCollections(params GetCollectionsParams) (results ReadCollections) {
                var collections SdkCollectionList
                err := params.Client.List("collections", sdkParams, &collections)
                if err != nil {
-                       fatalWithMessage(params.Logger,
+                       loggerutil.FatalWithMessage(params.Logger,
                                fmt.Sprintf("Error querying collections: %v", err))
                }
 
                // Process collection and update our date filter.
                sdkParams["filters"].([][]string)[0][2] =
                        ProcessCollections(params.Logger,
-                       collections.Items,
-                       results.UuidToCollection).Format(time.RFC3339)
+                               collections.Items,
+                               results.UuidToCollection).Format(time.RFC3339)
 
                // update counts
                previousTotalCollections = totalCollections
                totalCollections = len(results.UuidToCollection)
 
-               log.Printf("%d collections read, %d new in last batch, " +
+               log.Printf("%d collections read, %d new in last batch, "+
                        "%s latest modified date, %.0f %d %d avg,max,total manifest size",
                        totalCollections,
-                       totalCollections - previousTotalCollections,
+                       totalCollections-previousTotalCollections,
                        sdkParams["filters"].([][]string)[0][2],
                        float32(totalManifestSize)/float32(totalCollections),
                        maxManifestSize, totalManifestSize)
 
                if params.Logger != nil {
-                       properties,_ := params.Logger.Edit()
-                       collectionInfo := properties["collection_info"].(map[string]interface{})
-                       collectionInfo["collections_read"] = totalCollections
-                       collectionInfo["latest_modified_date_seen"] = sdkParams["filters"].([][]string)[0][2]
-                       collectionInfo["total_manifest_size"] = totalManifestSize
-                       collectionInfo["max_manifest_size"] = maxManifestSize
-                       params.Logger.Record()
+                       params.Logger.Update(func(p map[string]interface{}, e map[string]interface{}) {
+                               collectionInfo := p["collection_info"].(map[string]interface{})
+                               collectionInfo["collections_read"] = totalCollections
+                               collectionInfo["latest_modified_date_seen"] = sdkParams["filters"].([][]string)[0][2]
+                               collectionInfo["total_manifest_size"] = totalManifestSize
+                               collectionInfo["max_manifest_size"] = maxManifestSize
+                       })
                }
        }
 
@@ -215,7 +215,6 @@ func GetCollections(params GetCollectionsParams) (results ReadCollections) {
        return
 }
 
-
 // StrCopy returns a newly allocated string.
 // It is useful to copy slices so that the garbage collector can reuse
 // the memory of the longer strings they came from.
@@ -223,22 +222,21 @@ func StrCopy(s string) string {
        return string([]byte(s))
 }
 
-
 func ProcessCollections(arvLogger *logger.Logger,
        receivedCollections []SdkCollectionInfo,
        uuidToCollection map[string]Collection) (latestModificationDate time.Time) {
        for _, sdkCollection := range receivedCollections {
                collection := Collection{Uuid: StrCopy(sdkCollection.Uuid),
-                       OwnerUuid: StrCopy(sdkCollection.OwnerUuid),
-                       ReplicationLevel: sdkCollection.Redundancy,
+                       OwnerUuid:         StrCopy(sdkCollection.OwnerUuid),
+                       ReplicationLevel:  sdkCollection.Redundancy,
                        BlockDigestToSize: make(map[blockdigest.BlockDigest]int)}
 
                if sdkCollection.ModifiedAt.IsZero() {
-                       fatalWithMessage(arvLogger,
+                       loggerutil.FatalWithMessage(arvLogger,
                                fmt.Sprintf(
-                                       "Arvados SDK collection returned with unexpected zero " +
-                                               "modifcation date. This probably means that either we failed to " +
-                                               "parse the modification date or the API server has changed how " +
+                                       "Arvados SDK collection returned with unexpected zero "+
+                                               "modifcation date. This probably means that either we failed to "+
+                                               "parse the modification date or the API server has changed how "+
                                                "it returns modification dates: %v",
                                        collection))
                }
@@ -255,18 +253,17 @@ func ProcessCollections(arvLogger *logger.Logger,
                if manifestSize > maxManifestSize {
                        maxManifestSize = manifestSize
                }
-               
+
                blockChannel := manifest.BlockIterWithDuplicates()
                for block := range blockChannel {
-                       if stored_size, stored := collection.BlockDigestToSize[block.Digest];
-                       stored && stored_size != block.Size {
+                       if stored_size, stored := collection.BlockDigestToSize[block.Digest]; stored && stored_size != block.Size {
                                message := fmt.Sprintf(
                                        "Collection %s contains multiple sizes (%d and %d) for block %s",
                                        collection.Uuid,
                                        stored_size,
                                        block.Size,
                                        block.Digest)
-                               fatalWithMessage(arvLogger, message)
+                               loggerutil.FatalWithMessage(arvLogger, message)
                        }
                        collection.BlockDigestToSize[block.Digest] = block.Size
                }
@@ -285,19 +282,6 @@ func ProcessCollections(arvLogger *logger.Logger,
        return
 }
 
-
-func NumberCollectionsAvailable(client arvadosclient.ArvadosClient) (int) {
-       var collections SdkCollectionList
-       sdkParams := arvadosclient.Dict{"limit": 0}
-       err := client.List("collections", sdkParams, &collections)
-       if err != nil {
-               log.Fatalf("error querying collections for items available: %v", err)
-       }
-
-       return collections.ItemsAvailable
-}
-
-
 func ComputeSizeOfOwnedCollections(readCollections *ReadCollections) {
        readCollections.OwnerToCollectionSize = make(map[string]int)
        for _, coll := range readCollections.UuidToCollection {
@@ -307,18 +291,3 @@ func ComputeSizeOfOwnedCollections(readCollections *ReadCollections) {
 
        return
 }
-
-
-// Assumes you haven't already called arvLogger.Edit()!
-// If you have called arvLogger.Edit() this method will hang waiting
-// for the lock you're already holding.
-func fatalWithMessage(arvLogger *logger.Logger, message string) {
-       if arvLogger != nil {
-               properties,_ := arvLogger.Edit()
-               properties["FATAL"] = message
-               properties["run_info"].(map[string]interface{})["end_time"] = time.Now()
-               arvLogger.ForceRecord()
-       }
-
-       log.Fatalf(message)
-}