Added code for generating pull lists.
[arvados.git] / services / datamanager / collection / collection.go
index a3f1e311f2467cbe392c074df27be2f62863f2e3..e0929b77575fc7e8eb66e016d4d68ecef08aa131 100644 (file)
@@ -1,4 +1,4 @@
-/* Deals with parsing Collection responses from API Server. */
+// Deals with parsing Collection responses from API Server.
 
 package collection
 
@@ -13,7 +13,6 @@ import (
        "git.curoverse.com/arvados.git/services/datamanager/loggerutil"
        "log"
        "os"
-       "runtime"
        "runtime/pprof"
        "time"
 )
@@ -38,13 +37,13 @@ type Collection struct {
 }
 
 type ReadCollections struct {
-       ReadAllCollections     bool
-       UuidToCollection       map[string]Collection
-       OwnerToCollectionSize  map[string]int
-       BlockToReplication     map[blockdigest.BlockDigest]int
-       CollectionUuidToIndex  map[string]int
-       CollectionIndexToUuid  []string
-       BlockToCollectionIndex map[blockdigest.BlockDigest]int
+       ReadAllCollections       bool
+       UuidToCollection         map[string]Collection
+       OwnerToCollectionSize    map[string]int
+       BlockToReplication       map[blockdigest.BlockDigest]int
+       CollectionUuidToIndex    map[string]int
+       CollectionIndexToUuid    []string
+       BlockToCollectionIndices map[blockdigest.BlockDigest][]int
 }
 
 type GetCollectionsParams struct {
@@ -98,17 +97,7 @@ func WriteHeapProfile() {
 
 func GetCollectionsAndSummarize(params GetCollectionsParams) (results ReadCollections) {
        results = GetCollections(params)
-       Summarize(&results)
-
-       if params.Logger != nil {
-               params.Logger.Update(func(p map[string]interface{}, e map[string]interface{}) {
-                       collectionInfo := p["collection_info"].(map[string]interface{})
-                       // Since maps are shallow copied, we run a risk of concurrent
-                       // updates here. By copying results.OwnerToCollectionSize into
-                       // the log, we're assuming that it won't be updated.
-                       collectionInfo["owner_to_collection_size"] = results.OwnerToCollectionSize
-               })
-       }
+       results.Summarize(params.Logger)
 
        log.Printf("Uuid to Size used: %v", results.OwnerToCollectionSize)
        log.Printf("Read and processed %d collections",
@@ -133,7 +122,6 @@ func GetCollections(params GetCollectionsParams) (results ReadCollections) {
        fieldsWanted := []string{"manifest_text",
                "owner_uuid",
                "uuid",
-               // TODO(misha): Start using the redundancy field.
                "redundancy",
                "modified_at"}
 
@@ -160,10 +148,9 @@ func GetCollections(params GetCollectionsParams) (results ReadCollections) {
 
        if params.Logger != nil {
                params.Logger.Update(func(p map[string]interface{}, e map[string]interface{}) {
-                       collectionInfo := make(map[string]interface{})
+                       collectionInfo := logger.GetOrCreateMap(p, "collection_info")
                        collectionInfo["num_collections_at_start"] = initialNumberOfCollectionsAvailable
                        collectionInfo["batch_size"] = params.BatchSize
-                       p["collection_info"] = collectionInfo
                })
        }
 
@@ -205,7 +192,7 @@ func GetCollections(params GetCollectionsParams) (results ReadCollections) {
 
                if params.Logger != nil {
                        params.Logger.Update(func(p map[string]interface{}, e map[string]interface{}) {
-                               collectionInfo := p["collection_info"].(map[string]interface{})
+                               collectionInfo := logger.GetOrCreateMap(p, "collection_info")
                                collectionInfo["collections_read"] = totalCollections
                                collectionInfo["latest_modified_date_seen"] = sdkParams["filters"].([][]string)[0][2]
                                collectionInfo["total_manifest_size"] = totalManifestSize
@@ -214,9 +201,6 @@ func GetCollections(params GetCollectionsParams) (results ReadCollections) {
                }
        }
 
-       // Just in case this lowers the numbers reported in the heap profile.
-       runtime.GC()
-
        // Write the heap profile for examining memory usage
        WriteHeapProfile()
 
@@ -243,7 +227,7 @@ func ProcessCollections(arvLogger *logger.Logger,
                        loggerutil.FatalWithMessage(arvLogger,
                                fmt.Sprintf(
                                        "Arvados SDK collection returned with unexpected zero "+
-                                               "modifcation date. This probably means that either we failed to "+
+                                               "modification date. This probably means that either we failed to "+
                                                "parse the modification date or the API server has changed how "+
                                                "it returns modification dates: %+v",
                                        collection))
@@ -295,13 +279,13 @@ func ProcessCollections(arvLogger *logger.Logger,
        return
 }
 
-func Summarize(readCollections *ReadCollections) {
+func (readCollections *ReadCollections) Summarize(arvLogger *logger.Logger) {
        readCollections.OwnerToCollectionSize = make(map[string]int)
        readCollections.BlockToReplication = make(map[blockdigest.BlockDigest]int)
        numCollections := len(readCollections.UuidToCollection)
        readCollections.CollectionUuidToIndex = make(map[string]int, numCollections)
        readCollections.CollectionIndexToUuid = make([]string, 0, numCollections)
-       readCollections.BlockToCollectionIndex = make(map[blockdigest.BlockDigest]int)
+       readCollections.BlockToCollectionIndices = make(map[blockdigest.BlockDigest][]int)
 
        for _, coll := range readCollections.UuidToCollection {
                collectionIndex := len(readCollections.CollectionIndexToUuid)
@@ -313,7 +297,8 @@ func Summarize(readCollections *ReadCollections) {
                        readCollections.OwnerToCollectionSize[coll.OwnerUuid] + coll.TotalSize
 
                for block, _ := range coll.BlockDigestToSize {
-                       readCollections.BlockToCollectionIndex[block] = collectionIndex
+                       readCollections.BlockToCollectionIndices[block] =
+                               append(readCollections.BlockToCollectionIndices[block], collectionIndex)
                        storedReplication := readCollections.BlockToReplication[block]
                        if coll.ReplicationLevel > storedReplication {
                                readCollections.BlockToReplication[block] = coll.ReplicationLevel
@@ -321,5 +306,18 @@ func Summarize(readCollections *ReadCollections) {
                }
        }
 
+       if arvLogger != nil {
+               arvLogger.Update(func(p map[string]interface{}, e map[string]interface{}) {
+                       collectionInfo := logger.GetOrCreateMap(p, "collection_info")
+                       // Maps are reference types, so the log entry shares this map
+                       // with readCollections rather than copying it. We're assuming
+                       // OwnerToCollectionSize won't be updated after this point.
+                       collectionInfo["owner_to_collection_size"] =
+                               readCollections.OwnerToCollectionSize
+                       collectionInfo["distinct_blocks_named"] =
+                               len(readCollections.BlockToReplication)
+               })
+       }
+
        return
 }