Now reading the default replication level from the API server.
[arvados.git] / services / datamanager / collection / collection.go
index f63b95f0f59a90ff0f68b78aaf2083bcd349b259..5519ad8670ec93611740268d34f845e5c104fffe 100644 (file)
@@ -13,7 +13,6 @@ import (
        "git.curoverse.com/arvados.git/services/datamanager/loggerutil"
        "log"
        "os"
-       "runtime"
        "runtime/pprof"
        "time"
 )
@@ -25,10 +24,6 @@ var (
        maxManifestSize   uint64
 )
 
-const (
-       DefaultReplicationLevel = 2
-)
-
 type Collection struct {
        Uuid              string
        OwnerUuid         string
@@ -38,13 +33,13 @@ type Collection struct {
 }
 
 type ReadCollections struct {
-       ReadAllCollections       bool
-       UuidToCollection         map[string]Collection
-       OwnerToCollectionSize    map[string]int
-       BlockToReplication       map[blockdigest.BlockDigest]int
-       CollectionUuidToIndex    map[string]int
-       CollectionIndexToUuid    []string
-       BlockToCollectionIndices map[blockdigest.BlockDigest][]int
+       ReadAllCollections        bool
+       UuidToCollection          map[string]Collection
+       OwnerToCollectionSize     map[string]int
+       BlockToDesiredReplication map[blockdigest.DigestWithSize]int
+       CollectionUuidToIndex     map[string]int
+       CollectionIndexToUuid     []string
+       BlockToCollectionIndices  map[blockdigest.DigestWithSize][]int
 }
 
 type GetCollectionsParams struct {
@@ -123,7 +118,6 @@ func GetCollections(params GetCollectionsParams) (results ReadCollections) {
        fieldsWanted := []string{"manifest_text",
                "owner_uuid",
                "uuid",
-               // TODO(misha): Start using the redundancy field.
                "redundancy",
                "modified_at"}
 
@@ -136,6 +130,23 @@ func GetCollections(params GetCollectionsParams) (results ReadCollections) {
                sdkParams["limit"] = params.BatchSize
        }
 
+       var defaultReplicationLevel int
+       {
+               value, err := params.Client.Discovery("defaultCollectionReplication")
+               if err != nil {
+                       loggerutil.FatalWithMessage(params.Logger,
+                               fmt.Sprintf("Error querying default collection replication: %v", err))
+               }
+
+               defaultReplicationLevel = int(value.(float64))
+               if defaultReplicationLevel <= 0 {
+                       loggerutil.FatalWithMessage(params.Logger,
+                               fmt.Sprintf("Default collection replication returned by arvados SDK "+
+                                       "should be a positive integer but instead it was %d.",
+                                       defaultReplicationLevel))
+               }
+       }
+
        initialNumberOfCollectionsAvailable, err :=
                util.NumberItemsAvailable(params.Client, "collections")
        if err != nil {
@@ -153,6 +164,7 @@ func GetCollections(params GetCollectionsParams) (results ReadCollections) {
                        collectionInfo := logger.GetOrCreateMap(p, "collection_info")
                        collectionInfo["num_collections_at_start"] = initialNumberOfCollectionsAvailable
                        collectionInfo["batch_size"] = params.BatchSize
+                       collectionInfo["default_replication_level"] = defaultReplicationLevel
                })
        }
 
@@ -178,6 +190,7 @@ func GetCollections(params GetCollectionsParams) (results ReadCollections) {
                sdkParams["filters"].([][]string)[0][2] =
                        ProcessCollections(params.Logger,
                                collections.Items,
+                               defaultReplicationLevel,
                                results.UuidToCollection).Format(time.RFC3339)
 
                // update counts
@@ -203,9 +216,6 @@ func GetCollections(params GetCollectionsParams) (results ReadCollections) {
                }
        }
 
-       // Just in case this lowers the numbers reported in the heap profile.
-       runtime.GC()
-
        // Write the heap profile for examining memory usage
        WriteHeapProfile()
 
@@ -221,6 +231,7 @@ func StrCopy(s string) string {
 
 func ProcessCollections(arvLogger *logger.Logger,
        receivedCollections []SdkCollectionInfo,
+       defaultReplicationLevel int,
        uuidToCollection map[string]Collection) (latestModificationDate time.Time) {
        for _, sdkCollection := range receivedCollections {
                collection := Collection{Uuid: StrCopy(sdkCollection.Uuid),
@@ -232,7 +243,7 @@ func ProcessCollections(arvLogger *logger.Logger,
                        loggerutil.FatalWithMessage(arvLogger,
                                fmt.Sprintf(
                                        "Arvados SDK collection returned with unexpected zero "+
-                                               "modifcation date. This probably means that either we failed to "+
+                                               "modification date. This probably means that either we failed to "+
                                                "parse the modification date or the API server has changed how "+
                                                "it returns modification dates: %+v",
                                        collection))
@@ -243,7 +254,7 @@ func ProcessCollections(arvLogger *logger.Logger,
                }
 
                if collection.ReplicationLevel == 0 {
-                       collection.ReplicationLevel = DefaultReplicationLevel
+                       collection.ReplicationLevel = defaultReplicationLevel
                }
 
                manifest := manifest.Manifest{sdkCollection.ManifestText}
@@ -286,11 +297,11 @@ func ProcessCollections(arvLogger *logger.Logger,
 
 func (readCollections *ReadCollections) Summarize(arvLogger *logger.Logger) {
        readCollections.OwnerToCollectionSize = make(map[string]int)
-       readCollections.BlockToReplication = make(map[blockdigest.BlockDigest]int)
+       readCollections.BlockToDesiredReplication = make(map[blockdigest.DigestWithSize]int)
        numCollections := len(readCollections.UuidToCollection)
        readCollections.CollectionUuidToIndex = make(map[string]int, numCollections)
        readCollections.CollectionIndexToUuid = make([]string, 0, numCollections)
-       readCollections.BlockToCollectionIndices = make(map[blockdigest.BlockDigest][]int)
+       readCollections.BlockToCollectionIndices = make(map[blockdigest.DigestWithSize][]int)
 
        for _, coll := range readCollections.UuidToCollection {
                collectionIndex := len(readCollections.CollectionIndexToUuid)
@@ -301,12 +312,15 @@ func (readCollections *ReadCollections) Summarize(arvLogger *logger.Logger) {
                readCollections.OwnerToCollectionSize[coll.OwnerUuid] =
                        readCollections.OwnerToCollectionSize[coll.OwnerUuid] + coll.TotalSize
 
-               for block, _ := range coll.BlockDigestToSize {
-                       readCollections.BlockToCollectionIndices[block] =
-                               append(readCollections.BlockToCollectionIndices[block], collectionIndex)
-                       storedReplication := readCollections.BlockToReplication[block]
+               for block, size := range coll.BlockDigestToSize {
+                       locator := blockdigest.DigestWithSize{Digest: block, Size: uint32(size)}
+                       readCollections.BlockToCollectionIndices[locator] =
+                               append(readCollections.BlockToCollectionIndices[locator],
+                                       collectionIndex)
+                       storedReplication := readCollections.BlockToDesiredReplication[locator]
                        if coll.ReplicationLevel > storedReplication {
-                               readCollections.BlockToReplication[block] = coll.ReplicationLevel
+                               readCollections.BlockToDesiredReplication[locator] =
+                                       coll.ReplicationLevel
                        }
                }
        }
@@ -320,7 +334,7 @@ func (readCollections *ReadCollections) Summarize(arvLogger *logger.Logger) {
                        collectionInfo["owner_to_collection_size"] =
                                readCollections.OwnerToCollectionSize
                        collectionInfo["distinct_blocks_named"] =
-                               len(readCollections.BlockToReplication)
+                               len(readCollections.BlockToDesiredReplication)
                })
        }