// Summarizes Collection Data and Keep Server Contents.
+
package summary
// TODO(misha): Check size of blocks as well as their digest.
"git.curoverse.com/arvados.git/sdk/go/blockdigest"
"git.curoverse.com/arvados.git/services/datamanager/collection"
"git.curoverse.com/arvados.git/services/datamanager/keep"
+ "sort"
)
-type BlockSet map[blockdigest.BlockDigest]struct{}
+// BlockSet is a map of blocks
+type BlockSet map[blockdigest.DigestWithSize]struct{}
-func (bs BlockSet) Insert(digest blockdigest.BlockDigest) {
+// Insert adds a single block to the set.
+func (bs BlockSet) Insert(digest blockdigest.DigestWithSize) {
bs[digest] = struct{}{}
}
-func BlockSetFromSlice(digests []blockdigest.BlockDigest) (bs BlockSet) {
- bs = make(BlockSet)
- for _, digest := range digests {
- bs.Insert(digest)
+// Union adds a set of blocks to the set.
+func (bs BlockSet) Union(obs BlockSet) {
+ for k, v := range obs {
+ bs[k] = v
}
- return
}
// CollectionIndexSet is a set of collections, each identified by its
// integer index rather than its uuid in order to save space. To convert
// to and from the uuid, use collection.ReadCollections' fields
// CollectionIndexToUUID and CollectionUUIDToIndex.
type CollectionIndexSet map[int]struct{}

// Insert adds a single collection to the set. The collection is
// specified by its index; inserting the same index again has no further
// effect.
func (cis CollectionIndexSet) Insert(collectionIndex int) {
	cis[collectionIndex] = struct{}{}
}
+// ToCollectionIndexSet gets block to collection indices
func (bs BlockSet) ToCollectionIndexSet(
readCollections collection.ReadCollections,
collectionIndexSet *CollectionIndexSet) {
for block := range bs {
- for collectionIndex := range readCollections.BlockToCollectionIndices[block] {
+ for _, collectionIndex := range readCollections.BlockToCollectionIndices[block] {
collectionIndexSet.Insert(collectionIndex)
}
}
}
// ReplicationLevels keeps track of the requested and actual replication
// levels. Currently this is only used for blocks but could easily be
// used for collections as well. The type is comparable, so it can be
// used directly as a map key.
type ReplicationLevels struct {
	// Requested is the requested replication level. For blocks this is
	// the maximum replication level among all the collections the block
	// belongs to.
	Requested int

	// Actual is the number of keep servers this is on.
	Actual int
}
+
+// ReplicationLevelBlockSetMap maps from replication levels to their blocks.
+type ReplicationLevelBlockSetMap map[ReplicationLevels]BlockSet
+
+// ReplicationLevelBlockCount is an individual entry from ReplicationLevelBlockSetMap
+// which only reports the number of blocks, not which blocks.
+type ReplicationLevelBlockCount struct {
+ Levels ReplicationLevels
+ Count int
+}
+
+// ReplicationLevelBlockSetSlice is an ordered list of ReplicationLevelBlockCount useful for reporting.
+type ReplicationLevelBlockSetSlice []ReplicationLevelBlockCount
+
+// ReplicationSummary sturct
type ReplicationSummary struct {
CollectionBlocksNotInKeep BlockSet
UnderReplicatedBlocks BlockSet
CorrectlyReplicatedCollections CollectionIndexSet
}
// ReplicationSummaryCounts counts the elements in each set in
// ReplicationSummary. Every field holds the size of the
// ReplicationSummary set with the same name.
type ReplicationSummaryCounts struct {
	CollectionBlocksNotInKeep      int
	UnderReplicatedBlocks          int
	OverReplicatedBlocks           int
	CorrectlyReplicatedBlocks      int
	KeepBlocksNotInCollections     int
	CollectionsNotFullyInKeep      int
	UnderReplicatedCollections     int
	OverReplicatedCollections      int
	CorrectlyReplicatedCollections int
}
-type serializedData struct {
- ReadCollections collection.ReadCollections
- KeepServerInfo keep.ReadServers
+// GetOrCreate gets the BlockSet for a given set of ReplicationLevels,
+// creating it if it doesn't already exist.
+func (rlbs ReplicationLevelBlockSetMap) GetOrCreate(
+ repLevels ReplicationLevels) (bs BlockSet) {
+ bs, exists := rlbs[repLevels]
+ if !exists {
+ bs = make(BlockSet)
+ rlbs[repLevels] = bs
+ }
+ return
}
+// Insert adds a block to the set for a given replication level.
+func (rlbs ReplicationLevelBlockSetMap) Insert(
+ repLevels ReplicationLevels,
+ block blockdigest.DigestWithSize) {
+ rlbs.GetOrCreate(repLevels).Insert(block)
+}
+
+// Union adds a set of blocks to the set for a given replication level.
+func (rlbs ReplicationLevelBlockSetMap) Union(
+ repLevels ReplicationLevels,
+ bs BlockSet) {
+ rlbs.GetOrCreate(repLevels).Union(bs)
+}
+
+// Counts outputs a sorted list of ReplicationLevelBlockCounts.
+func (rlbs ReplicationLevelBlockSetMap) Counts() (
+ sorted ReplicationLevelBlockSetSlice) {
+ sorted = make(ReplicationLevelBlockSetSlice, len(rlbs))
+ i := 0
+ for levels, set := range rlbs {
+ sorted[i] = ReplicationLevelBlockCount{Levels: levels, Count: len(set)}
+ i++
+ }
+ sort.Sort(sorted)
+ return
+}
+
+// Implemented to meet sort.Interface
+func (rlbss ReplicationLevelBlockSetSlice) Len() int {
+ return len(rlbss)
+}
+
+// Implemented to meet sort.Interface
+func (rlbss ReplicationLevelBlockSetSlice) Less(i, j int) bool {
+ return rlbss[i].Levels.Requested < rlbss[j].Levels.Requested ||
+ (rlbss[i].Levels.Requested == rlbss[j].Levels.Requested &&
+ rlbss[i].Levels.Actual < rlbss[j].Levels.Actual)
+}
+
+// Implemented to meet sort.Interface
+func (rlbss ReplicationLevelBlockSetSlice) Swap(i, j int) {
+ rlbss[i], rlbss[j] = rlbss[j], rlbss[i]
+}
+
+// ComputeCounts returns ReplicationSummaryCounts
func (rs ReplicationSummary) ComputeCounts() (rsc ReplicationSummaryCounts) {
+ // TODO(misha): Consider rewriting this method to iterate through
+ // the fields using reflection, instead of explictily listing the
+ // fields as we do now.
rsc.CollectionBlocksNotInKeep = len(rs.CollectionBlocksNotInKeep)
rsc.UnderReplicatedBlocks = len(rs.UnderReplicatedBlocks)
rsc.OverReplicatedBlocks = len(rs.OverReplicatedBlocks)
return rsc
}
+// PrettyPrint returns the counts rendered as a string built with fmt.Sprintf.
func (rsc ReplicationSummaryCounts) PrettyPrint() string {
// NOTE(review): only the beginning of the format string and the final
// operand are visible here; the lines in between appear to be missing
// from this view. As shown, rsc.CorrectlyReplicatedCollections is joined
// to the format string with "+" instead of being passed as an argument
// for the %d verb -- confirm against the complete file before relying on
// this function.
return fmt.Sprintf("Replication Block Counts:"+
"\n Missing From Keep: %d, "+
rsc.CorrectlyReplicatedCollections)
}
-func SummarizeReplication(readCollections collection.ReadCollections,
- keepServerInfo keep.ReadServers) (rs ReplicationSummary) {
+// BucketReplication returns ReplicationLevelBlockSetMap
+func BucketReplication(readCollections collection.ReadCollections,
+ keepServerInfo keep.ReadServers) (rlbs ReplicationLevelBlockSetMap) {
+ rlbs = make(ReplicationLevelBlockSetMap)
+
+ for block, requestedReplication := range readCollections.BlockToDesiredReplication {
+ rlbs.Insert(
+ ReplicationLevels{
+ Requested: requestedReplication,
+ Actual: len(keepServerInfo.BlockToServers[block])},
+ block)
+ }
+
+ for block, servers := range keepServerInfo.BlockToServers {
+ if 0 == readCollections.BlockToDesiredReplication[block] {
+ rlbs.Insert(
+ ReplicationLevels{Requested: 0, Actual: len(servers)},
+ block)
+ }
+ }
+ return
+}
+
+// SummarizeBuckets reads collections and summarizes
+func (rlbs ReplicationLevelBlockSetMap) SummarizeBuckets(
+ readCollections collection.ReadCollections) (
+ rs ReplicationSummary) {
rs.CollectionBlocksNotInKeep = make(BlockSet)
rs.UnderReplicatedBlocks = make(BlockSet)
rs.OverReplicatedBlocks = make(BlockSet)
rs.CorrectlyReplicatedBlocks = make(BlockSet)
rs.KeepBlocksNotInCollections = make(BlockSet)
+
rs.CollectionsNotFullyInKeep = make(CollectionIndexSet)
rs.UnderReplicatedCollections = make(CollectionIndexSet)
rs.OverReplicatedCollections = make(CollectionIndexSet)
rs.CorrectlyReplicatedCollections = make(CollectionIndexSet)
- for block, requestedReplication := range readCollections.BlockToReplication {
- actualReplication := len(keepServerInfo.BlockToServers[block])
- if actualReplication == 0 {
- rs.CollectionBlocksNotInKeep.Insert(block)
- } else if actualReplication < requestedReplication {
- rs.UnderReplicatedBlocks.Insert(block)
- } else if actualReplication > requestedReplication {
- rs.OverReplicatedBlocks.Insert(block)
+ for levels, bs := range rlbs {
+ if levels.Actual == 0 {
+ rs.CollectionBlocksNotInKeep.Union(bs)
+ } else if levels.Requested == 0 {
+ rs.KeepBlocksNotInCollections.Union(bs)
+ } else if levels.Actual < levels.Requested {
+ rs.UnderReplicatedBlocks.Union(bs)
+ } else if levels.Actual > levels.Requested {
+ rs.OverReplicatedBlocks.Union(bs)
} else {
- rs.CorrectlyReplicatedBlocks.Insert(block)
- }
- }
-
- for block, _ := range keepServerInfo.BlockToServers {
- if 0 == readCollections.BlockToReplication[block] {
- rs.KeepBlocksNotInCollections.Insert(block)
+ rs.CorrectlyReplicatedBlocks.Union(bs)
}
}
rs.OverReplicatedBlocks.ToCollectionIndexSet(readCollections,
&rs.OverReplicatedCollections)
- for i := range readCollections.CollectionIndexToUuid {
+ for i := range readCollections.CollectionIndexToUUID {
if _, notInKeep := rs.CollectionsNotFullyInKeep[i]; notInKeep {
} else if _, underReplicated := rs.UnderReplicatedCollections[i]; underReplicated {
} else if _, overReplicated := rs.OverReplicatedCollections[i]; overReplicated {
}
}
- return rs
+ return
}