Merge branch '8016-crunchrun-crunchstat'
[arvados.git] / services / datamanager / summary / summary.go
index 37e9373969efb826ba5c9f1efb895937f892df3f..9fb0316b736c74e789f6e0edea96042c05fd0a91 100644 (file)
@@ -1,4 +1,5 @@
 // Summarizes Collection Data and Keep Server Contents.
+
 package summary
 
 // TODO(misha): Check size of blocks as well as their digest.
@@ -8,33 +9,74 @@ import (
        "git.curoverse.com/arvados.git/sdk/go/blockdigest"
        "git.curoverse.com/arvados.git/services/datamanager/collection"
        "git.curoverse.com/arvados.git/services/datamanager/keep"
+       "sort"
 )
 
-type BlockSet map[blockdigest.BlockDigest]struct{}
+// BlockSet is a set of blocks, keyed by blockdigest.DigestWithSize.
+type BlockSet map[blockdigest.DigestWithSize]struct{}
 
-func (bs BlockSet) Insert(digest blockdigest.BlockDigest) {
+// Insert adds a single block to the set; re-inserting a block is a no-op.
+func (bs BlockSet) Insert(digest blockdigest.DigestWithSize) {
        bs[digest] = struct{}{}
 }
 
-// We use the collection index to save space. To convert to and from
+// Union adds every block in obs to bs, modifying bs in place.
+func (bs BlockSet) Union(obs BlockSet) {
+       for k, v := range obs {
+               bs[k] = v
+       }
+}
+
+// CollectionIndexSet stores collection indices rather than UUIDs to save space. To convert to and from
 // the uuid, use collection.ReadCollections' fields
-// CollectionIndexToUuid and CollectionUuidToIndex.
+// CollectionIndexToUUID and CollectionUUIDToIndex.
 type CollectionIndexSet map[int]struct{}
 
+// Insert adds a single collection, identified by its index, to the set.
+// Inserting an index that is already present has no effect.
 func (cis CollectionIndexSet) Insert(collectionIndex int) {
        cis[collectionIndex] = struct{}{}
 }
 
+// ToCollectionIndexSet adds the index of every collection containing a block in bs to collectionIndexSet.
 func (bs BlockSet) ToCollectionIndexSet(
        readCollections collection.ReadCollections,
        collectionIndexSet *CollectionIndexSet) {
        for block := range bs {
-               for _,collectionIndex := range readCollections.BlockToCollectionIndices[block] {
+               for _, collectionIndex := range readCollections.BlockToCollectionIndices[block] {
                        collectionIndexSet.Insert(collectionIndex)
                }
        }
 }
 
+// ReplicationLevels records the requested and actual replication levels
+// of an item. It is comparable, so it serves as the map key of
+// ReplicationLevelBlockSetMap. Currently this is only used for blocks but
+// could easily be used for collections as well.
+type ReplicationLevels struct {
+       // Requested is the requested replication level.
+       // For blocks this is the maximum replication level among all the
+       // collections this block belongs to.
+       Requested int
+
+       // Actual is the number of Keep servers this is stored on.
+       Actual int
+}
+
+// ReplicationLevelBlockSetMap buckets blocks by their (requested, actual) replication levels.
+type ReplicationLevelBlockSetMap map[ReplicationLevels]BlockSet
+
+// ReplicationLevelBlockCount summarizes one entry of a ReplicationLevelBlockSetMap:
+// it reports how many blocks have the given Levels, not which blocks.
+type ReplicationLevelBlockCount struct {
+       Levels ReplicationLevels
+       Count  int
+}
+
+// ReplicationLevelBlockSetSlice is a list of ReplicationLevelBlockCount (counts, not sets), sortable for reporting.
+type ReplicationLevelBlockSetSlice []ReplicationLevelBlockCount
+
+// ReplicationSummary classifies blocks and collections by replication status.
 type ReplicationSummary struct {
        CollectionBlocksNotInKeep  BlockSet
        UnderReplicatedBlocks      BlockSet
@@ -48,7 +90,7 @@ type ReplicationSummary struct {
        CorrectlyReplicatedCollections CollectionIndexSet
 }
 
-// This struct counts the elements in each set in ReplicationSummary.
+// ReplicationSummaryCounts holds the number of elements in each set of a ReplicationSummary.
 type ReplicationSummaryCounts struct {
        CollectionBlocksNotInKeep      int
        UnderReplicatedBlocks          int
@@ -61,9 +103,67 @@ type ReplicationSummaryCounts struct {
        CorrectlyReplicatedCollections int
 }
 
+// GetOrCreate returns the BlockSet stored for repLevels, first creating
+// and storing an empty one if none exists yet.
+func (rlbs ReplicationLevelBlockSetMap) GetOrCreate(
+       repLevels ReplicationLevels) (bs BlockSet) {
+       bs, exists := rlbs[repLevels]
+       if !exists {
+               bs = make(BlockSet)
+               rlbs[repLevels] = bs
+       }
+       return
+}
+
+// Insert adds block to the bucket for repLevels, creating the bucket if needed.
+func (rlbs ReplicationLevelBlockSetMap) Insert(
+       repLevels ReplicationLevels,
+       block blockdigest.DigestWithSize) {
+       rlbs.GetOrCreate(repLevels).Insert(block)
+}
+
+// Union adds every block in bs to the bucket for repLevels, creating the bucket if needed.
+func (rlbs ReplicationLevelBlockSetMap) Union(
+       repLevels ReplicationLevels,
+       bs BlockSet) {
+       rlbs.GetOrCreate(repLevels).Union(bs)
+}
+
+// Counts returns the number of blocks in each bucket, sorted by Requested, then Actual.
+func (rlbs ReplicationLevelBlockSetMap) Counts() (
+       sorted ReplicationLevelBlockSetSlice) {
+       sorted = make(ReplicationLevelBlockSetSlice, len(rlbs))
+       i := 0
+       for levels, set := range rlbs {
+               sorted[i] = ReplicationLevelBlockCount{Levels: levels, Count: len(set)}
+               i++
+       }
+       sort.Sort(sorted)
+       return
+}
+
+// Len implements sort.Interface.
+func (rlbss ReplicationLevelBlockSetSlice) Len() int {
+       return len(rlbss)
+}
+
+// Less implements sort.Interface, ordering by Requested, then by Actual.
+func (rlbss ReplicationLevelBlockSetSlice) Less(i, j int) bool {
+       return rlbss[i].Levels.Requested < rlbss[j].Levels.Requested ||
+               (rlbss[i].Levels.Requested == rlbss[j].Levels.Requested &&
+                       rlbss[i].Levels.Actual < rlbss[j].Levels.Actual)
+}
+
+// Swap implements sort.Interface.
+func (rlbss ReplicationLevelBlockSetSlice) Swap(i, j int) {
+       rlbss[i], rlbss[j] = rlbss[j], rlbss[i]
+}
+
+// ComputeCounts returns the number of elements in each set of rs.
 func (rs ReplicationSummary) ComputeCounts() (rsc ReplicationSummaryCounts) {
-       // TODO(misha): Consider replacing this brute-force approach by
-       // iterating through the fields using reflection.
+       // TODO(misha): Consider rewriting this method to iterate through
+       // the fields using reflection, instead of explicitly listing the
+       // fields as we do now.
        rsc.CollectionBlocksNotInKeep = len(rs.CollectionBlocksNotInKeep)
        rsc.UnderReplicatedBlocks = len(rs.UnderReplicatedBlocks)
        rsc.OverReplicatedBlocks = len(rs.OverReplicatedBlocks)
@@ -76,6 +176,7 @@ func (rs ReplicationSummary) ComputeCounts() (rsc ReplicationSummaryCounts) {
        return rsc
 }
 
+// PrettyPrint formats the counts as a multi-line, human-readable string.
 func (rsc ReplicationSummaryCounts) PrettyPrint() string {
        return fmt.Sprintf("Replication Block Counts:"+
                "\n Missing From Keep: %d, "+
@@ -99,34 +200,55 @@ func (rsc ReplicationSummaryCounts) PrettyPrint() string {
                rsc.CorrectlyReplicatedCollections)
 }
 
-func SummarizeReplication(readCollections collection.ReadCollections,
-       keepServerInfo keep.ReadServers) (rs ReplicationSummary) {
+// BucketReplication buckets blocks by (requested, actual) replication; blocks only in Keep get Requested 0.
+func BucketReplication(readCollections collection.ReadCollections,
+       keepServerInfo keep.ReadServers) (rlbs ReplicationLevelBlockSetMap) {
+       rlbs = make(ReplicationLevelBlockSetMap)
+
+       for block, requestedReplication := range readCollections.BlockToDesiredReplication {
+               rlbs.Insert(
+                       ReplicationLevels{
+                               Requested: requestedReplication,
+                               Actual:    len(keepServerInfo.BlockToServers[block])},
+                       block)
+       }
+
+       for block, servers := range keepServerInfo.BlockToServers {
+               if 0 == readCollections.BlockToDesiredReplication[block] {
+                       rlbs.Insert(
+                               ReplicationLevels{Requested: 0, Actual: len(servers)},
+                               block)
+               }
+       }
+       return
+}
+
+// SummarizeBuckets classifies each bucket of rlbs into the block and collection sets of a ReplicationSummary.
+func (rlbs ReplicationLevelBlockSetMap) SummarizeBuckets(
+       readCollections collection.ReadCollections) (
+       rs ReplicationSummary) {
        rs.CollectionBlocksNotInKeep = make(BlockSet)
        rs.UnderReplicatedBlocks = make(BlockSet)
        rs.OverReplicatedBlocks = make(BlockSet)
        rs.CorrectlyReplicatedBlocks = make(BlockSet)
        rs.KeepBlocksNotInCollections = make(BlockSet)
+
        rs.CollectionsNotFullyInKeep = make(CollectionIndexSet)
        rs.UnderReplicatedCollections = make(CollectionIndexSet)
        rs.OverReplicatedCollections = make(CollectionIndexSet)
        rs.CorrectlyReplicatedCollections = make(CollectionIndexSet)
 
-       for block, requestedReplication := range readCollections.BlockToReplication {
-               actualReplication := len(keepServerInfo.BlockToServers[block])
-               if actualReplication == 0 {
-                       rs.CollectionBlocksNotInKeep.Insert(block)
-               } else if actualReplication < requestedReplication {
-                       rs.UnderReplicatedBlocks.Insert(block)
-               } else if actualReplication > requestedReplication {
-                       rs.OverReplicatedBlocks.Insert(block)
+       for levels, bs := range rlbs {
+               if levels.Actual == 0 {
+                       rs.CollectionBlocksNotInKeep.Union(bs)
+               } else if levels.Requested == 0 {
+                       rs.KeepBlocksNotInCollections.Union(bs)
+               } else if levels.Actual < levels.Requested {
+                       rs.UnderReplicatedBlocks.Union(bs)
+               } else if levels.Actual > levels.Requested {
+                       rs.OverReplicatedBlocks.Union(bs)
                } else {
-                       rs.CorrectlyReplicatedBlocks.Insert(block)
-               }
-       }
-
-       for block, _ := range keepServerInfo.BlockToServers {
-               if 0 == readCollections.BlockToReplication[block] {
-                       rs.KeepBlocksNotInCollections.Insert(block)
+                       rs.CorrectlyReplicatedBlocks.Union(bs)
                }
        }
 
@@ -142,7 +264,7 @@ func SummarizeReplication(readCollections collection.ReadCollections,
        rs.OverReplicatedBlocks.ToCollectionIndexSet(readCollections,
                &rs.OverReplicatedCollections)
 
-       for i := range readCollections.CollectionIndexToUuid {
+       for i := range readCollections.CollectionIndexToUUID {
                if _, notInKeep := rs.CollectionsNotFullyInKeep[i]; notInKeep {
                } else if _, underReplicated := rs.UnderReplicatedCollections[i]; underReplicated {
                } else if _, overReplicated := rs.OverReplicatedCollections[i]; overReplicated {
@@ -151,5 +273,5 @@ func SummarizeReplication(readCollections collection.ReadCollections,
                }
        }
 
-       return rs
+       return
 }