Merge branch 'master' into 7167-keep-rsync
[arvados.git] / services / datamanager / collection / collection.go
index ddc4f953d5cb9ae1865c32688ff6b9066c6c5f68..ca03627405e0742ad419b1d1dd6daf64fba7a341 100644 (file)
@@ -24,44 +24,43 @@ var (
        maxManifestSize   uint64
 )
 
-const (
-       // TODO(misha): Read this value from the SDK once support is added
-       // as suggested in https://arvados.org/issues/3408#note-31
-       DefaultReplicationLevel = 2
-)
-
+// Collection summarizes one collection: its UUID, owner, replication level, block sizes and total size.
 type Collection struct {
-       Uuid              string
-       OwnerUuid         string
+       UUID              string
+       OwnerUUID         string
        ReplicationLevel  int
        BlockDigestToSize map[blockdigest.BlockDigest]int
        TotalSize         int
 }
 
+// ReadCollections holds information about collections from API server
 type ReadCollections struct {
-       ReadAllCollections       bool
-       UuidToCollection         map[string]Collection
-       OwnerToCollectionSize    map[string]int
-       BlockToReplication       map[blockdigest.BlockDigest]int
-       CollectionUuidToIndex    map[string]int
-       CollectionIndexToUuid    []string
-       BlockToCollectionIndices map[blockdigest.BlockDigest][]int
+       ReadAllCollections        bool
+       UUIDToCollection          map[string]Collection
+       OwnerToCollectionSize     map[string]int
+       BlockToDesiredReplication map[blockdigest.DigestWithSize]int
+       CollectionUUIDToIndex     map[string]int
+       CollectionIndexToUUID     []string
+       BlockToCollectionIndices  map[blockdigest.DigestWithSize][]int
 }
 
+// GetCollectionsParams holds the arguments for GetCollections: client, logger and batch size.
 type GetCollectionsParams struct {
        Client    arvadosclient.ArvadosClient
        Logger    *logger.Logger
        BatchSize int
 }
 
+// SdkCollectionInfo holds collection info from api
 type SdkCollectionInfo struct {
-       Uuid         string    `json:"uuid"`
-       OwnerUuid    string    `json:"owner_uuid"`
+       UUID         string    `json:"uuid"`
+       OwnerUUID    string    `json:"owner_uuid"`
        Redundancy   int       `json:"redundancy"`
        ModifiedAt   time.Time `json:"modified_at"`
        ManifestText string    `json:"manifest_text"`
 }
 
+// SdkCollectionList lists collections from api
 type SdkCollectionList struct {
        ItemsAvailable int                 `json:"items_available"`
        Items          []SdkCollectionInfo `json:"items"`
@@ -74,7 +73,7 @@ func init() {
                "File to write the heap profiles to. Leave blank to skip profiling.")
 }
 
-// Write the heap profile to a file for later review.
+// WriteHeapProfile writes the heap profile to a file for later review.
 // Since a file is expected to only contain a single heap profile this
 // function overwrites the previously written profile, so it is safe
 // to call multiple times in a single run.
@@ -83,27 +82,28 @@ func init() {
 func WriteHeapProfile() {
        if heapProfileFilename != "" {
 
-               heap_profile, err := os.Create(heapProfileFilename)
+               heapProfile, err := os.Create(heapProfileFilename)
                if err != nil {
                        log.Fatal(err)
                }
 
-               defer heap_profile.Close()
+               defer heapProfile.Close()
 
-               err = pprof.WriteHeapProfile(heap_profile)
+               err = pprof.WriteHeapProfile(heapProfile)
                if err != nil {
                        log.Fatal(err)
                }
        }
 }
 
+// GetCollectionsAndSummarize gets collections from api and summarizes
 func GetCollectionsAndSummarize(params GetCollectionsParams) (results ReadCollections) {
        results = GetCollections(params)
        results.Summarize(params.Logger)
 
        log.Printf("Uuid to Size used: %v", results.OwnerToCollectionSize)
        log.Printf("Read and processed %d collections",
-               len(results.UuidToCollection))
+               len(results.UUIDToCollection))
 
        // TODO(misha): Add a "readonly" flag. If we're in readonly mode,
        // lots of behaviors can become warnings (and obviously we can't
@@ -115,6 +115,7 @@ func GetCollectionsAndSummarize(params GetCollectionsParams) (results ReadCollec
        return
 }
 
+// GetCollections gets collections from api
 func GetCollections(params GetCollectionsParams) (results ReadCollections) {
        if &params.Client == nil {
                log.Fatalf("params.Client passed to GetCollections() should " +
@@ -136,6 +137,23 @@ func GetCollections(params GetCollectionsParams) (results ReadCollections) {
                sdkParams["limit"] = params.BatchSize
        }
 
+       var defaultReplicationLevel int
+       {
+               value, err := params.Client.Discovery("defaultCollectionReplication")
+               if err != nil {
+                       loggerutil.FatalWithMessage(params.Logger,
+                               fmt.Sprintf("Error querying default collection replication: %v", err))
+               }
+
+               defaultReplicationLevel = int(value.(float64))
+               if defaultReplicationLevel <= 0 {
+                       loggerutil.FatalWithMessage(params.Logger,
+                               fmt.Sprintf("Default collection replication returned by arvados SDK "+
+                                       "should be a positive integer but instead it was %d.",
+                                       defaultReplicationLevel))
+               }
+       }
+
        initialNumberOfCollectionsAvailable, err :=
                util.NumberItemsAvailable(params.Client, "collections")
        if err != nil {
@@ -146,13 +164,14 @@ func GetCollections(params GetCollectionsParams) (results ReadCollections) {
        // that we don't have to grow the map in most cases.
        maxExpectedCollections := int(
                float64(initialNumberOfCollectionsAvailable) * 1.01)
-       results.UuidToCollection = make(map[string]Collection, maxExpectedCollections)
+       results.UUIDToCollection = make(map[string]Collection, maxExpectedCollections)
 
        if params.Logger != nil {
                params.Logger.Update(func(p map[string]interface{}, e map[string]interface{}) {
                        collectionInfo := logger.GetOrCreateMap(p, "collection_info")
                        collectionInfo["num_collections_at_start"] = initialNumberOfCollectionsAvailable
                        collectionInfo["batch_size"] = params.BatchSize
+                       collectionInfo["default_replication_level"] = defaultReplicationLevel
                })
        }
 
@@ -178,11 +197,12 @@ func GetCollections(params GetCollectionsParams) (results ReadCollections) {
                sdkParams["filters"].([][]string)[0][2] =
                        ProcessCollections(params.Logger,
                                collections.Items,
-                               results.UuidToCollection).Format(time.RFC3339)
+                               defaultReplicationLevel,
+                               results.UUIDToCollection).Format(time.RFC3339)
 
                // update counts
                previousTotalCollections = totalCollections
-               totalCollections = len(results.UuidToCollection)
+               totalCollections = len(results.UUIDToCollection)
 
                log.Printf("%d collections read, %d new in last batch, "+
                        "%s latest modified date, %.0f %d %d avg,max,total manifest size",
@@ -216,12 +236,14 @@ func StrCopy(s string) string {
        return string([]byte(s))
 }
 
+// ProcessCollections stores each received collection in UUIDToCollection and returns latestModificationDate.
 func ProcessCollections(arvLogger *logger.Logger,
        receivedCollections []SdkCollectionInfo,
-       uuidToCollection map[string]Collection) (latestModificationDate time.Time) {
+       defaultReplicationLevel int,
+       UUIDToCollection map[string]Collection) (latestModificationDate time.Time) {
        for _, sdkCollection := range receivedCollections {
-               collection := Collection{Uuid: StrCopy(sdkCollection.Uuid),
-                       OwnerUuid:         StrCopy(sdkCollection.OwnerUuid),
+               collection := Collection{UUID: StrCopy(sdkCollection.UUID),
+                       OwnerUUID:         StrCopy(sdkCollection.OwnerUUID),
                        ReplicationLevel:  sdkCollection.Redundancy,
                        BlockDigestToSize: make(map[blockdigest.BlockDigest]int)}
 
@@ -240,13 +262,13 @@ func ProcessCollections(arvLogger *logger.Logger,
                }
 
                if collection.ReplicationLevel == 0 {
-                       collection.ReplicationLevel = DefaultReplicationLevel
+                       collection.ReplicationLevel = defaultReplicationLevel
                }
 
                manifest := manifest.Manifest{sdkCollection.ManifestText}
                manifestSize := uint64(len(sdkCollection.ManifestText))
 
-               if _, alreadySeen := uuidToCollection[collection.Uuid]; !alreadySeen {
+               if _, alreadySeen := UUIDToCollection[collection.UUID]; !alreadySeen {
                        totalManifestSize += manifestSize
                }
                if manifestSize > maxManifestSize {
@@ -255,11 +277,11 @@ func ProcessCollections(arvLogger *logger.Logger,
 
                blockChannel := manifest.BlockIterWithDuplicates()
                for block := range blockChannel {
-                       if stored_size, stored := collection.BlockDigestToSize[block.Digest]; stored && stored_size != block.Size {
+                       if storedSize, stored := collection.BlockDigestToSize[block.Digest]; stored && storedSize != block.Size {
                                message := fmt.Sprintf(
                                        "Collection %s contains multiple sizes (%d and %d) for block %s",
-                                       collection.Uuid,
-                                       stored_size,
+                                       collection.UUID,
+                                       storedSize,
                                        block.Size,
                                        block.Digest)
                                loggerutil.FatalWithMessage(arvLogger, message)
@@ -270,7 +292,7 @@ func ProcessCollections(arvLogger *logger.Logger,
                for _, size := range collection.BlockDigestToSize {
                        collection.TotalSize += size
                }
-               uuidToCollection[collection.Uuid] = collection
+               UUIDToCollection[collection.UUID] = collection
 
                // Clear out all the manifest strings that we don't need anymore.
                // These hopefully form the bulk of our memory usage.
@@ -281,29 +303,33 @@ func ProcessCollections(arvLogger *logger.Logger,
        return
 }
 
+// Summarize rebuilds the per-owner size, per-block desired replication and collection index maps.
 func (readCollections *ReadCollections) Summarize(arvLogger *logger.Logger) {
        readCollections.OwnerToCollectionSize = make(map[string]int)
-       readCollections.BlockToReplication = make(map[blockdigest.BlockDigest]int)
-       numCollections := len(readCollections.UuidToCollection)
-       readCollections.CollectionUuidToIndex = make(map[string]int, numCollections)
-       readCollections.CollectionIndexToUuid = make([]string, 0, numCollections)
-       readCollections.BlockToCollectionIndices = make(map[blockdigest.BlockDigest][]int)
-
-       for _, coll := range readCollections.UuidToCollection {
-               collectionIndex := len(readCollections.CollectionIndexToUuid)
-               readCollections.CollectionIndexToUuid =
-                       append(readCollections.CollectionIndexToUuid, coll.Uuid)
-               readCollections.CollectionUuidToIndex[coll.Uuid] = collectionIndex
-
-               readCollections.OwnerToCollectionSize[coll.OwnerUuid] =
-                       readCollections.OwnerToCollectionSize[coll.OwnerUuid] + coll.TotalSize
-
-               for block, _ := range coll.BlockDigestToSize {
-                       readCollections.BlockToCollectionIndices[block] =
-                               append(readCollections.BlockToCollectionIndices[block], collectionIndex)
-                       storedReplication := readCollections.BlockToReplication[block]
+       readCollections.BlockToDesiredReplication = make(map[blockdigest.DigestWithSize]int)
+       numCollections := len(readCollections.UUIDToCollection)
+       readCollections.CollectionUUIDToIndex = make(map[string]int, numCollections)
+       readCollections.CollectionIndexToUUID = make([]string, 0, numCollections)
+       readCollections.BlockToCollectionIndices = make(map[blockdigest.DigestWithSize][]int)
+
+       for _, coll := range readCollections.UUIDToCollection {
+               collectionIndex := len(readCollections.CollectionIndexToUUID)
+               readCollections.CollectionIndexToUUID =
+                       append(readCollections.CollectionIndexToUUID, coll.UUID)
+               readCollections.CollectionUUIDToIndex[coll.UUID] = collectionIndex
+
+               readCollections.OwnerToCollectionSize[coll.OwnerUUID] =
+                       readCollections.OwnerToCollectionSize[coll.OwnerUUID] + coll.TotalSize
+
+               for block, size := range coll.BlockDigestToSize {
+                       locator := blockdigest.DigestWithSize{Digest: block, Size: uint32(size)}
+                       readCollections.BlockToCollectionIndices[locator] =
+                               append(readCollections.BlockToCollectionIndices[locator],
+                                       collectionIndex)
+                       storedReplication := readCollections.BlockToDesiredReplication[locator]
                        if coll.ReplicationLevel > storedReplication {
-                               readCollections.BlockToReplication[block] = coll.ReplicationLevel
+                               readCollections.BlockToDesiredReplication[locator] =
+                                       coll.ReplicationLevel
                        }
                }
        }
@@ -317,7 +343,7 @@ func (readCollections *ReadCollections) Summarize(arvLogger *logger.Logger) {
                        collectionInfo["owner_to_collection_size"] =
                                readCollections.OwnerToCollectionSize
                        collectionInfo["distinct_blocks_named"] =
-                               len(readCollections.BlockToReplication)
+                               len(readCollections.BlockToDesiredReplication)
                })
        }