Now reading default replication level from api server.
[arvados.git] / services / datamanager / collection / collection.go
index 73115f5fc9b4d41131a56bdfc0cde528da6ec348..5519ad8670ec93611740268d34f845e5c104fffe 100644 (file)
@@ -1,4 +1,4 @@
-/* Deals with parsing Collection responses from API Server. */
+// Deals with parsing Collection responses from API Server.
 
 package collection
 
@@ -9,16 +9,16 @@ import (
        "git.curoverse.com/arvados.git/sdk/go/blockdigest"
        "git.curoverse.com/arvados.git/sdk/go/logger"
        "git.curoverse.com/arvados.git/sdk/go/manifest"
+       "git.curoverse.com/arvados.git/sdk/go/util"
        "git.curoverse.com/arvados.git/services/datamanager/loggerutil"
        "log"
        "os"
-       "runtime"
        "runtime/pprof"
        "time"
 )
 
 var (
-       heap_profile_filename string
+       heapProfileFilename string
        // globals for debugging
        totalManifestSize uint64
        maxManifestSize   uint64
@@ -33,9 +33,13 @@ type Collection struct {
 }
 
 type ReadCollections struct {
-       ReadAllCollections    bool
-       UuidToCollection      map[string]Collection
-       OwnerToCollectionSize map[string]int
+       ReadAllCollections        bool
+       UuidToCollection          map[string]Collection
+       OwnerToCollectionSize     map[string]int
+       BlockToDesiredReplication map[blockdigest.DigestWithSize]int
+       CollectionUuidToIndex     map[string]int
+       CollectionIndexToUuid     []string
+       BlockToCollectionIndices  map[blockdigest.DigestWithSize][]int
 }
 
 type GetCollectionsParams struct {
@@ -58,21 +62,12 @@ type SdkCollectionList struct {
 }
 
 func init() {
-       flag.StringVar(&heap_profile_filename,
+       flag.StringVar(&heapProfileFilename,
                "heap-profile",
                "",
                "File to write the heap profiles to. Leave blank to skip profiling.")
 }
 
-// // Methods to implement util.SdkListResponse Interface
-// func (s SdkCollectionList) NumItemsAvailable() (numAvailable int, err error) {
-//     return s.ItemsAvailable, nil
-// }
-
-// func (s SdkCollectionList) NumItemsContained() (numContained int, err error) {
-//     return len(s.Items), nil
-// }
-
 // Write the heap profile to a file for later review.
 // Since a file is expected to only contain a single heap profile this
 // function overwrites the previously written profile, so it is safe
@@ -80,9 +75,9 @@ func init() {
 // Otherwise we would see cumulative numbers as explained here:
 // https://groups.google.com/d/msg/golang-nuts/ZyHciRglQYc/2nh4Ndu2fZcJ
 func WriteHeapProfile() {
-       if heap_profile_filename != "" {
+       if heapProfileFilename != "" {
 
-               heap_profile, err := os.Create(heap_profile_filename)
+               heap_profile, err := os.Create(heapProfileFilename)
                if err != nil {
                        log.Fatal(err)
                }
@@ -98,14 +93,7 @@ func WriteHeapProfile() {
 
 func GetCollectionsAndSummarize(params GetCollectionsParams) (results ReadCollections) {
        results = GetCollections(params)
-       ComputeSizeOfOwnedCollections(&results)
-
-       if params.Logger != nil {
-               properties, _ := params.Logger.Edit()
-               collectionInfo := properties["collection_info"].(map[string]interface{})
-               collectionInfo["owner_to_collection_size"] = results.OwnerToCollectionSize
-               params.Logger.Record()
-       }
+       results.Summarize(params.Logger)
 
        log.Printf("Uuid to Size used: %v", results.OwnerToCollectionSize)
        log.Printf("Read and processed %d collections",
@@ -130,7 +118,6 @@ func GetCollections(params GetCollectionsParams) (results ReadCollections) {
        fieldsWanted := []string{"manifest_text",
                "owner_uuid",
                "uuid",
-               // TODO(misha): Start using the redundancy field.
                "redundancy",
                "modified_at"}
 
@@ -143,7 +130,29 @@ func GetCollections(params GetCollectionsParams) (results ReadCollections) {
                sdkParams["limit"] = params.BatchSize
        }
 
-       initialNumberOfCollectionsAvailable := NumberCollectionsAvailable(params.Client)
+       var defaultReplicationLevel int
+       {
+               value, err := params.Client.Discovery("defaultCollectionReplication")
+               if err != nil {
+                       loggerutil.FatalWithMessage(params.Logger,
+                               fmt.Sprintf("Error querying default collection replication: %v", err))
+               }
+
+               defaultReplicationLevel = int(value.(float64))
+               if defaultReplicationLevel <= 0 {
+                       loggerutil.FatalWithMessage(params.Logger,
+                               fmt.Sprintf("Default collection replication returned by arvados SDK "+
+                                       "should be a positive integer but instead it was %d.",
+                                       defaultReplicationLevel))
+               }
+       }
+
+       initialNumberOfCollectionsAvailable, err :=
+               util.NumberItemsAvailable(params.Client, "collections")
+       if err != nil {
+               loggerutil.FatalWithMessage(params.Logger,
+                       fmt.Sprintf("Error querying collection count: %v", err))
+       }
        // Include a 1% margin for collections added while we're reading so
        // that we don't have to grow the map in most cases.
        maxExpectedCollections := int(
@@ -151,12 +160,12 @@ func GetCollections(params GetCollectionsParams) (results ReadCollections) {
        results.UuidToCollection = make(map[string]Collection, maxExpectedCollections)
 
        if params.Logger != nil {
-               properties, _ := params.Logger.Edit()
-               collectionInfo := make(map[string]interface{})
-               collectionInfo["num_collections_at_start"] = initialNumberOfCollectionsAvailable
-               collectionInfo["batch_size"] = params.BatchSize
-               properties["collection_info"] = collectionInfo
-               params.Logger.Record()
+               params.Logger.Update(func(p map[string]interface{}, e map[string]interface{}) {
+                       collectionInfo := logger.GetOrCreateMap(p, "collection_info")
+                       collectionInfo["num_collections_at_start"] = initialNumberOfCollectionsAvailable
+                       collectionInfo["batch_size"] = params.BatchSize
+                       collectionInfo["default_replication_level"] = defaultReplicationLevel
+               })
        }
 
        // These values are just for getting the loop to run the first time,
@@ -181,6 +190,7 @@ func GetCollections(params GetCollectionsParams) (results ReadCollections) {
                sdkParams["filters"].([][]string)[0][2] =
                        ProcessCollections(params.Logger,
                                collections.Items,
+                               defaultReplicationLevel,
                                results.UuidToCollection).Format(time.RFC3339)
 
                // update counts
@@ -196,19 +206,16 @@ func GetCollections(params GetCollectionsParams) (results ReadCollections) {
                        maxManifestSize, totalManifestSize)
 
                if params.Logger != nil {
-                       properties, _ := params.Logger.Edit()
-                       collectionInfo := properties["collection_info"].(map[string]interface{})
-                       collectionInfo["collections_read"] = totalCollections
-                       collectionInfo["latest_modified_date_seen"] = sdkParams["filters"].([][]string)[0][2]
-                       collectionInfo["total_manifest_size"] = totalManifestSize
-                       collectionInfo["max_manifest_size"] = maxManifestSize
-                       params.Logger.Record()
+                       params.Logger.Update(func(p map[string]interface{}, e map[string]interface{}) {
+                               collectionInfo := logger.GetOrCreateMap(p, "collection_info")
+                               collectionInfo["collections_read"] = totalCollections
+                               collectionInfo["latest_modified_date_seen"] = sdkParams["filters"].([][]string)[0][2]
+                               collectionInfo["total_manifest_size"] = totalManifestSize
+                               collectionInfo["max_manifest_size"] = maxManifestSize
+                       })
                }
        }
 
-       // Just in case this lowers the numbers reported in the heap profile.
-       runtime.GC()
-
        // Write the heap profile for examining memory usage
        WriteHeapProfile()
 
@@ -224,6 +231,7 @@ func StrCopy(s string) string {
 
 func ProcessCollections(arvLogger *logger.Logger,
        receivedCollections []SdkCollectionInfo,
+       defaultReplicationLevel int,
        uuidToCollection map[string]Collection) (latestModificationDate time.Time) {
        for _, sdkCollection := range receivedCollections {
                collection := Collection{Uuid: StrCopy(sdkCollection.Uuid),
@@ -235,15 +243,20 @@ func ProcessCollections(arvLogger *logger.Logger,
                        loggerutil.FatalWithMessage(arvLogger,
                                fmt.Sprintf(
                                        "Arvados SDK collection returned with unexpected zero "+
-                                               "modifcation date. This probably means that either we failed to "+
+                                               "modification date. This probably means that either we failed to "+
                                                "parse the modification date or the API server has changed how "+
-                                               "it returns modification dates: %v",
+                                               "it returns modification dates: %+v",
                                        collection))
                }
 
                if sdkCollection.ModifiedAt.After(latestModificationDate) {
                        latestModificationDate = sdkCollection.ModifiedAt
                }
+
+               if collection.ReplicationLevel == 0 {
+                       collection.ReplicationLevel = defaultReplicationLevel
+               }
+
                manifest := manifest.Manifest{sdkCollection.ManifestText}
                manifestSize := uint64(len(sdkCollection.ManifestText))
 
@@ -282,22 +295,47 @@ func ProcessCollections(arvLogger *logger.Logger,
        return
 }
 
-func NumberCollectionsAvailable(client arvadosclient.ArvadosClient) int {
-       var collections SdkCollectionList
-       sdkParams := arvadosclient.Dict{"limit": 0}
-       err := client.List("collections", sdkParams, &collections)
-       if err != nil {
-               log.Fatalf("error querying collections for items available: %v", err)
-       }
-
-       return collections.ItemsAvailable
-}
-
-func ComputeSizeOfOwnedCollections(readCollections *ReadCollections) {
+func (readCollections *ReadCollections) Summarize(arvLogger *logger.Logger) {
        readCollections.OwnerToCollectionSize = make(map[string]int)
+       readCollections.BlockToDesiredReplication = make(map[blockdigest.DigestWithSize]int)
+       numCollections := len(readCollections.UuidToCollection)
+       readCollections.CollectionUuidToIndex = make(map[string]int, numCollections)
+       readCollections.CollectionIndexToUuid = make([]string, 0, numCollections)
+       readCollections.BlockToCollectionIndices = make(map[blockdigest.DigestWithSize][]int)
+
        for _, coll := range readCollections.UuidToCollection {
+               collectionIndex := len(readCollections.CollectionIndexToUuid)
+               readCollections.CollectionIndexToUuid =
+                       append(readCollections.CollectionIndexToUuid, coll.Uuid)
+               readCollections.CollectionUuidToIndex[coll.Uuid] = collectionIndex
+
                readCollections.OwnerToCollectionSize[coll.OwnerUuid] =
                        readCollections.OwnerToCollectionSize[coll.OwnerUuid] + coll.TotalSize
+
+               for block, size := range coll.BlockDigestToSize {
+                       locator := blockdigest.DigestWithSize{Digest: block, Size: uint32(size)}
+                       readCollections.BlockToCollectionIndices[locator] =
+                               append(readCollections.BlockToCollectionIndices[locator],
+                                       collectionIndex)
+                       storedReplication := readCollections.BlockToDesiredReplication[locator]
+                       if coll.ReplicationLevel > storedReplication {
+                               readCollections.BlockToDesiredReplication[locator] =
+                                       coll.ReplicationLevel
+                       }
+               }
+       }
+
+       if arvLogger != nil {
+               arvLogger.Update(func(p map[string]interface{}, e map[string]interface{}) {
+                       collectionInfo := logger.GetOrCreateMap(p, "collection_info")
+                       // Since maps are shallow copied, we run a risk of concurrent
+                       // updates here. By copying results.OwnerToCollectionSize into
+                       // the log, we're assuming that it won't be updated.
+                       collectionInfo["owner_to_collection_size"] =
+                               readCollections.OwnerToCollectionSize
+                       collectionInfo["distinct_blocks_named"] =
+                               len(readCollections.BlockToDesiredReplication)
+               })
        }
 
        return