Now reading default replication level from API server.
author mishaz <misha@curoverse.com>
Tue, 16 Jun 2015 18:04:49 +0000 (18:04 +0000)
committer mishaz <misha@curoverse.com>
Tue, 16 Jun 2015 18:04:49 +0000 (18:04 +0000)
services/datamanager/collection/collection.go

index 93fad0e4270c1bdd61780e0bd0fbfb4ef56ca7ab..5519ad8670ec93611740268d34f845e5c104fffe 100644 (file)
@@ -24,12 +24,6 @@ var (
        maxManifestSize   uint64
 )
 
-const (
-       // TODO(misha): Read this value from the SDK once support is added
-       // as suggested in https://arvados.org/issues/3408#note-31
-       DefaultReplicationLevel = 2
-)
-
 type Collection struct {
        Uuid              string
        OwnerUuid         string
@@ -136,6 +130,23 @@ func GetCollections(params GetCollectionsParams) (results ReadCollections) {
                sdkParams["limit"] = params.BatchSize
        }
 
+       var defaultReplicationLevel int
+       {
+               value, err := params.Client.Discovery("defaultCollectionReplication")
+               if err != nil {
+                       loggerutil.FatalWithMessage(params.Logger,
+                               fmt.Sprintf("Error querying default collection replication: %v", err))
+               }
+
+               defaultReplicationLevel = int(value.(float64))
+               if defaultReplicationLevel <= 0 {
+                       loggerutil.FatalWithMessage(params.Logger,
+                               fmt.Sprintf("Default collection replication returned by arvados SDK "+
+                                       "should be a positive integer but instead it was %d.",
+                                       defaultReplicationLevel))
+               }
+       }
+
        initialNumberOfCollectionsAvailable, err :=
                util.NumberItemsAvailable(params.Client, "collections")
        if err != nil {
@@ -153,6 +164,7 @@ func GetCollections(params GetCollectionsParams) (results ReadCollections) {
                        collectionInfo := logger.GetOrCreateMap(p, "collection_info")
                        collectionInfo["num_collections_at_start"] = initialNumberOfCollectionsAvailable
                        collectionInfo["batch_size"] = params.BatchSize
+                       collectionInfo["default_replication_level"] = defaultReplicationLevel
                })
        }
 
@@ -178,6 +190,7 @@ func GetCollections(params GetCollectionsParams) (results ReadCollections) {
                sdkParams["filters"].([][]string)[0][2] =
                        ProcessCollections(params.Logger,
                                collections.Items,
+                               defaultReplicationLevel,
                                results.UuidToCollection).Format(time.RFC3339)
 
                // update counts
@@ -218,6 +231,7 @@ func StrCopy(s string) string {
 
 func ProcessCollections(arvLogger *logger.Logger,
        receivedCollections []SdkCollectionInfo,
+       defaultReplicationLevel int,
        uuidToCollection map[string]Collection) (latestModificationDate time.Time) {
        for _, sdkCollection := range receivedCollections {
                collection := Collection{Uuid: StrCopy(sdkCollection.Uuid),
@@ -240,7 +254,7 @@ func ProcessCollections(arvLogger *logger.Logger,
                }
 
                if collection.ReplicationLevel == 0 {
-                       collection.ReplicationLevel = DefaultReplicationLevel
+                       collection.ReplicationLevel = defaultReplicationLevel
                }
 
                manifest := manifest.Manifest{sdkCollection.ManifestText}