X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/a56406d730f2a07dd442b9e99ef9dab7b7d81895..5f93e6f5823e4ee2a25616037ace6ab6d416e581:/services/datamanager/summary/summary.go diff --git a/services/datamanager/summary/summary.go b/services/datamanager/summary/summary.go index 71d1affec0..9fb0316b73 100644 --- a/services/datamanager/summary/summary.go +++ b/services/datamanager/summary/summary.go @@ -1,4 +1,5 @@ // Summarizes Collection Data and Keep Server Contents. + package summary // TODO(misha): Check size of blocks as well as their digest. @@ -8,50 +9,74 @@ import ( "git.curoverse.com/arvados.git/sdk/go/blockdigest" "git.curoverse.com/arvados.git/services/datamanager/collection" "git.curoverse.com/arvados.git/services/datamanager/keep" + "sort" ) -type BlockSet map[blockdigest.BlockDigest]struct{} +// BlockSet is a map of blocks +type BlockSet map[blockdigest.DigestWithSize]struct{} -func (bs BlockSet) Insert(digest blockdigest.BlockDigest) { +// Insert adds a single block to the set. +func (bs BlockSet) Insert(digest blockdigest.DigestWithSize) { bs[digest] = struct{}{} } -func BlockSetFromSlice(digests []blockdigest.BlockDigest) (bs BlockSet) { - bs = make(BlockSet) - for _, digest := range digests { - bs.Insert(digest) +// Union adds a set of blocks to the set. +func (bs BlockSet) Union(obs BlockSet) { + for k, v := range obs { + bs[k] = v } - return } -// We use the collection index to save space. To convert to & from the -// uuid, use collection.ReadCollections' fields CollectionIndexToUuid -// and CollectionUuidToIndex. +// CollectionIndexSet is used to save space. To convert to and from +// the uuid, use collection.ReadCollections' fields +// CollectionIndexToUUID and CollectionUUIDToIndex. type CollectionIndexSet map[int]struct{} +// Insert adds a single collection to the set. The collection is specified by +// its index. func (cis CollectionIndexSet) Insert(collectionIndex int) { cis[collectionIndex] = struct{}{} } -func CollectionIndexSetFromSlice(indices []int) (cis CollectionIndexSet) { - cis = make(CollectionIndexSet) - for _, index := range indices { - cis.Insert(index) - } - return -} - - +// ToCollectionIndexSet gets block to collection indices func (bs BlockSet) ToCollectionIndexSet( readCollections collection.ReadCollections, collectionIndexSet *CollectionIndexSet) { for block := range bs { - for collectionIndex := range readCollections.BlockToCollectionIndices[block] { + for _, collectionIndex := range readCollections.BlockToCollectionIndices[block] { collectionIndexSet.Insert(collectionIndex) } } } +// ReplicationLevels struct +// Keeps track of the requested and actual replication levels. +// Currently this is only used for blocks but could easily be used for +// collections as well. +type ReplicationLevels struct { + // The requested replication level. + // For Blocks this is the maximum replication level among all the + // collections this block belongs to. + Requested int + + // The actual number of keep servers this is on. + Actual int +} + +// ReplicationLevelBlockSetMap maps from replication levels to their blocks. +type ReplicationLevelBlockSetMap map[ReplicationLevels]BlockSet + +// ReplicationLevelBlockCount is an individual entry from ReplicationLevelBlockSetMap +// which only reports the number of blocks, not which blocks. +type ReplicationLevelBlockCount struct { + Levels ReplicationLevels + Count int +} + +// ReplicationLevelBlockSetSlice is an ordered list of ReplicationLevelBlockCount useful for reporting. +type ReplicationLevelBlockSetSlice []ReplicationLevelBlockCount + +// ReplicationSummary sturct type ReplicationSummary struct { CollectionBlocksNotInKeep BlockSet UnderReplicatedBlocks BlockSet @@ -65,6 +90,7 @@ type ReplicationSummary struct { CorrectlyReplicatedCollections CollectionIndexSet } +// ReplicationSummaryCounts struct counts the elements in each set in ReplicationSummary. type ReplicationSummaryCounts struct { CollectionBlocksNotInKeep int UnderReplicatedBlocks int @@ -77,12 +103,67 @@ type ReplicationSummaryCounts struct { CorrectlyReplicatedCollections int } -type serializedData struct { - ReadCollections collection.ReadCollections - KeepServerInfo keep.ReadServers +// GetOrCreate gets the BlockSet for a given set of ReplicationLevels, +// creating it if it doesn't already exist. +func (rlbs ReplicationLevelBlockSetMap) GetOrCreate( + repLevels ReplicationLevels) (bs BlockSet) { + bs, exists := rlbs[repLevels] + if !exists { + bs = make(BlockSet) + rlbs[repLevels] = bs + } + return } +// Insert adds a block to the set for a given replication level. +func (rlbs ReplicationLevelBlockSetMap) Insert( + repLevels ReplicationLevels, + block blockdigest.DigestWithSize) { + rlbs.GetOrCreate(repLevels).Insert(block) +} + +// Union adds a set of blocks to the set for a given replication level. +func (rlbs ReplicationLevelBlockSetMap) Union( + repLevels ReplicationLevels, + bs BlockSet) { + rlbs.GetOrCreate(repLevels).Union(bs) +} + +// Counts outputs a sorted list of ReplicationLevelBlockCounts. +func (rlbs ReplicationLevelBlockSetMap) Counts() ( + sorted ReplicationLevelBlockSetSlice) { + sorted = make(ReplicationLevelBlockSetSlice, len(rlbs)) + i := 0 + for levels, set := range rlbs { + sorted[i] = ReplicationLevelBlockCount{Levels: levels, Count: len(set)} + i++ + } + sort.Sort(sorted) + return +} + +// Implemented to meet sort.Interface +func (rlbss ReplicationLevelBlockSetSlice) Len() int { + return len(rlbss) +} + +// Implemented to meet sort.Interface +func (rlbss ReplicationLevelBlockSetSlice) Less(i, j int) bool { + return rlbss[i].Levels.Requested < rlbss[j].Levels.Requested || + (rlbss[i].Levels.Requested == rlbss[j].Levels.Requested && + rlbss[i].Levels.Actual < rlbss[j].Levels.Actual) +} + +// Implemented to meet sort.Interface +func (rlbss ReplicationLevelBlockSetSlice) Swap(i, j int) { + rlbss[i], rlbss[j] = rlbss[j], rlbss[i] +} + +// ComputeCounts returns ReplicationSummaryCounts func (rs ReplicationSummary) ComputeCounts() (rsc ReplicationSummaryCounts) { + // TODO(misha): Consider rewriting this method to iterate through + // the fields using reflection, instead of explictily listing the + // fields as we do now. rsc.CollectionBlocksNotInKeep = len(rs.CollectionBlocksNotInKeep) rsc.UnderReplicatedBlocks = len(rs.UnderReplicatedBlocks) rsc.OverReplicatedBlocks = len(rs.OverReplicatedBlocks) @@ -95,6 +176,7 @@ func (rs ReplicationSummary) ComputeCounts() (rsc ReplicationSummaryCounts) { return rsc } +// PrettyPrint ReplicationSummaryCounts func (rsc ReplicationSummaryCounts) PrettyPrint() string { return fmt.Sprintf("Replication Block Counts:"+ "\n Missing From Keep: %d, "+ @@ -118,34 +200,55 @@ func (rsc ReplicationSummaryCounts) PrettyPrint() string { rsc.CorrectlyReplicatedCollections) } -func SummarizeReplication(readCollections collection.ReadCollections, - keepServerInfo keep.ReadServers) (rs ReplicationSummary) { +// BucketReplication returns ReplicationLevelBlockSetMap +func BucketReplication(readCollections collection.ReadCollections, + keepServerInfo keep.ReadServers) (rlbs ReplicationLevelBlockSetMap) { + rlbs = make(ReplicationLevelBlockSetMap) + + for block, requestedReplication := range readCollections.BlockToDesiredReplication { + rlbs.Insert( + ReplicationLevels{ + Requested: requestedReplication, + Actual: len(keepServerInfo.BlockToServers[block])}, + block) + } + + for block, servers := range keepServerInfo.BlockToServers { + if 0 == readCollections.BlockToDesiredReplication[block] { + rlbs.Insert( + ReplicationLevels{Requested: 0, Actual: len(servers)}, + block) + } + } + return +} + +// SummarizeBuckets reads collections and summarizes +func (rlbs ReplicationLevelBlockSetMap) SummarizeBuckets( + readCollections collection.ReadCollections) ( + rs ReplicationSummary) { rs.CollectionBlocksNotInKeep = make(BlockSet) rs.UnderReplicatedBlocks = make(BlockSet) rs.OverReplicatedBlocks = make(BlockSet) rs.CorrectlyReplicatedBlocks = make(BlockSet) rs.KeepBlocksNotInCollections = make(BlockSet) + rs.CollectionsNotFullyInKeep = make(CollectionIndexSet) rs.UnderReplicatedCollections = make(CollectionIndexSet) rs.OverReplicatedCollections = make(CollectionIndexSet) rs.CorrectlyReplicatedCollections = make(CollectionIndexSet) - for block, requestedReplication := range readCollections.BlockToReplication { - actualReplication := len(keepServerInfo.BlockToServers[block]) - if actualReplication == 0 { - rs.CollectionBlocksNotInKeep.Insert(block) - } else if actualReplication < requestedReplication { - rs.UnderReplicatedBlocks.Insert(block) - } else if actualReplication > requestedReplication { - rs.OverReplicatedBlocks.Insert(block) + for levels, bs := range rlbs { + if levels.Actual == 0 { + rs.CollectionBlocksNotInKeep.Union(bs) + } else if levels.Requested == 0 { + rs.KeepBlocksNotInCollections.Union(bs) + } else if levels.Actual < levels.Requested { + rs.UnderReplicatedBlocks.Union(bs) + } else if levels.Actual > levels.Requested { + rs.OverReplicatedBlocks.Union(bs) } else { - rs.CorrectlyReplicatedBlocks.Insert(block) - } - } - - for block, _ := range keepServerInfo.BlockToServers { - if 0 == readCollections.BlockToReplication[block] { - rs.KeepBlocksNotInCollections.Insert(block) + rs.CorrectlyReplicatedBlocks.Union(bs) } } @@ -161,7 +264,7 @@ func SummarizeReplication(readCollections collection.ReadCollections, rs.OverReplicatedBlocks.ToCollectionIndexSet(readCollections, &rs.OverReplicatedCollections) - for i := range readCollections.CollectionIndexToUuid { + for i := range readCollections.CollectionIndexToUUID { if _, notInKeep := rs.CollectionsNotFullyInKeep[i]; notInKeep { } else if _, underReplicated := rs.UnderReplicatedCollections[i]; underReplicated { } else if _, overReplicated := rs.OverReplicatedCollections[i]; overReplicated { @@ -170,5 +273,5 @@ func SummarizeReplication(readCollections collection.ReadCollections, } } - return rs + return }