X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/6b5b6890158830b26161b3879a0d1eeaa122659f..982f7db5b67194ddb3b3dd1fae594784f58b35b8:/services/datamanager/summary/summary.go diff --git a/services/datamanager/summary/summary.go b/services/datamanager/summary/summary.go index 8a381eb7a3..edd760b035 100644 --- a/services/datamanager/summary/summary.go +++ b/services/datamanager/summary/summary.go @@ -1,101 +1,267 @@ -/* Computes Summary based on data read from API server. */ - +// Summarizes Collection Data and Keep Server Contents. package summary +// TODO(misha): Check size of blocks as well as their digest. + import ( - "encoding/gob" - "flag" "fmt" - "git.curoverse.com/arvados.git/sdk/go/logger" + "git.curoverse.com/arvados.git/sdk/go/blockdigest" "git.curoverse.com/arvados.git/services/datamanager/collection" "git.curoverse.com/arvados.git/services/datamanager/keep" - "git.curoverse.com/arvados.git/services/datamanager/loggerutil" - "log" - "os" + "sort" ) -var ( - // These are just used for development, to save network i/o - writeDataTo string - readDataFrom string -) +type BlockSet map[blockdigest.DigestWithSize]struct{} + +// Adds a single block to the set. +func (bs BlockSet) Insert(digest blockdigest.DigestWithSize) { + bs[digest] = struct{}{} +} + +// Adds a set of blocks to the set. +func (bs BlockSet) Union(obs BlockSet) { + for k, v := range obs { + bs[k] = v + } +} + +// We use the collection index to save space. To convert to and from +// the uuid, use collection.ReadCollections' fields +// CollectionIndexToUuid and CollectionUuidToIndex. +type CollectionIndexSet map[int]struct{} + +// Adds a single collection to the set. The collection is specified by +// its index. 
+func (cis CollectionIndexSet) Insert(collectionIndex int) { + cis[collectionIndex] = struct{}{} +} -type serializedData struct { - ReadCollections collection.ReadCollections - KeepServerInfo keep.ReadServers -} - -func init() { - flag.StringVar(&writeDataTo, - "write-data-to", - "", - "Write summary of data received to this file. Used for development only.") - flag.StringVar(&readDataFrom, - "read-data-from", - "", - "Avoid network i/o and read summary data from this file instead. Used for development only.") -} - -// Writes data we've read to a file. -// -// This is useful for development, so that we don't need to read all our data from the network every time we tweak something. -// -// This should not be used outside of development, since you'll be -// working with stale data. -func MaybeWriteData(arvLogger *logger.Logger, +func (bs BlockSet) ToCollectionIndexSet( readCollections collection.ReadCollections, - keepServerInfo keep.ReadServers) bool { - if writeDataTo == "" { - return false - } else { - summaryFile, err := os.Create(writeDataTo) - if err != nil { - loggerutil.FatalWithMessage(arvLogger, - fmt.Sprintf("Failed to open %s: %v", writeDataTo, err)) + collectionIndexSet *CollectionIndexSet) { + for block := range bs { + for _, collectionIndex := range readCollections.BlockToCollectionIndices[block] { + collectionIndexSet.Insert(collectionIndex) } - defer summaryFile.Close() - - enc := gob.NewEncoder(summaryFile) - data := serializedData{ - ReadCollections: readCollections, - KeepServerInfo: keepServerInfo} - err = enc.Encode(data) - if err != nil { - loggerutil.FatalWithMessage(arvLogger, - fmt.Sprintf("Failed to write summary data: %v", err)) + } +} + +// Keeps track of the requested and actual replication levels. +// Currently this is only used for blocks but could easily be used for +// collections as well. +type ReplicationLevels struct { + // The requested replication level. 
+ // For Blocks this is the maximum replication level among all the + // collections this block belongs to. + Requested int + + // The actual number of keep servers this is on. + Actual int +} + +// Maps from replication levels to their blocks. +type ReplicationLevelBlockSetMap map[ReplicationLevels]BlockSet + +// An individual entry from ReplicationLevelBlockSetMap which only reports the number of blocks, not which blocks. +type ReplicationLevelBlockCount struct { + Levels ReplicationLevels + Count int +} + +// An ordered list of ReplicationLevelBlockCount useful for reporting. +type ReplicationLevelBlockSetSlice []ReplicationLevelBlockCount + +type ReplicationSummary struct { + CollectionBlocksNotInKeep BlockSet + UnderReplicatedBlocks BlockSet + OverReplicatedBlocks BlockSet + CorrectlyReplicatedBlocks BlockSet + KeepBlocksNotInCollections BlockSet + + CollectionsNotFullyInKeep CollectionIndexSet + UnderReplicatedCollections CollectionIndexSet + OverReplicatedCollections CollectionIndexSet + CorrectlyReplicatedCollections CollectionIndexSet +} + +// This struct counts the elements in each set in ReplicationSummary. +type ReplicationSummaryCounts struct { + CollectionBlocksNotInKeep int + UnderReplicatedBlocks int + OverReplicatedBlocks int + CorrectlyReplicatedBlocks int + KeepBlocksNotInCollections int + CollectionsNotFullyInKeep int + UnderReplicatedCollections int + OverReplicatedCollections int + CorrectlyReplicatedCollections int +} + +// Gets the BlockSet for a given set of ReplicationLevels, creating it +// if it doesn't already exist. +func (rlbs ReplicationLevelBlockSetMap) GetOrCreate( + repLevels ReplicationLevels) (bs BlockSet) { + bs, exists := rlbs[repLevels] + if !exists { + bs = make(BlockSet) + rlbs[repLevels] = bs + } + return +} + +// Adds a block to the set for a given replication level. 
+func (rlbs ReplicationLevelBlockSetMap) Insert( + repLevels ReplicationLevels, + block blockdigest.DigestWithSize) { + rlbs.GetOrCreate(repLevels).Insert(block) +} + +// Adds a set of blocks to the set for a given replication level. +func (rlbs ReplicationLevelBlockSetMap) Union( + repLevels ReplicationLevels, + bs BlockSet) { + rlbs.GetOrCreate(repLevels).Union(bs) +} + +// Outputs a sorted list of ReplicationLevelBlockCounts. +func (rlbs ReplicationLevelBlockSetMap) Counts() ( + sorted ReplicationLevelBlockSetSlice) { + sorted = make(ReplicationLevelBlockSetSlice, len(rlbs)) + i := 0 + for levels, set := range rlbs { + sorted[i] = ReplicationLevelBlockCount{Levels: levels, Count: len(set)} + i++ + } + sort.Sort(sorted) + return +} + +// Implemented to meet sort.Interface +func (rlbss ReplicationLevelBlockSetSlice) Len() int { + return len(rlbss) +} + +// Implemented to meet sort.Interface +func (rlbss ReplicationLevelBlockSetSlice) Less(i, j int) bool { + return rlbss[i].Levels.Requested < rlbss[j].Levels.Requested || + (rlbss[i].Levels.Requested == rlbss[j].Levels.Requested && + rlbss[i].Levels.Actual < rlbss[j].Levels.Actual) +} + +// Implemented to meet sort.Interface +func (rlbss ReplicationLevelBlockSetSlice) Swap(i, j int) { + rlbss[i], rlbss[j] = rlbss[j], rlbss[i] +} + +func (rs ReplicationSummary) ComputeCounts() (rsc ReplicationSummaryCounts) { + // TODO(misha): Consider rewriting this method to iterate through + // the fields using reflection, instead of explicitly listing the + // fields as we do now. 
+ rsc.CollectionBlocksNotInKeep = len(rs.CollectionBlocksNotInKeep) + rsc.UnderReplicatedBlocks = len(rs.UnderReplicatedBlocks) + rsc.OverReplicatedBlocks = len(rs.OverReplicatedBlocks) + rsc.CorrectlyReplicatedBlocks = len(rs.CorrectlyReplicatedBlocks) + rsc.KeepBlocksNotInCollections = len(rs.KeepBlocksNotInCollections) + rsc.CollectionsNotFullyInKeep = len(rs.CollectionsNotFullyInKeep) + rsc.UnderReplicatedCollections = len(rs.UnderReplicatedCollections) + rsc.OverReplicatedCollections = len(rs.OverReplicatedCollections) + rsc.CorrectlyReplicatedCollections = len(rs.CorrectlyReplicatedCollections) + return rsc +} + +func (rsc ReplicationSummaryCounts) PrettyPrint() string { + return fmt.Sprintf("Replication Block Counts:"+ + "\n Missing From Keep: %d, "+ + "\n Under Replicated: %d, "+ + "\n Over Replicated: %d, "+ + "\n Replicated Just Right: %d, "+ + "\n Not In Any Collection: %d. "+ + "\nReplication Collection Counts:"+ + "\n Missing From Keep: %d, "+ + "\n Under Replicated: %d, "+ + "\n Over Replicated: %d, "+ + "\n Replicated Just Right: %d.", + rsc.CollectionBlocksNotInKeep, + rsc.UnderReplicatedBlocks, + rsc.OverReplicatedBlocks, + rsc.CorrectlyReplicatedBlocks, + rsc.KeepBlocksNotInCollections, + rsc.CollectionsNotFullyInKeep, + rsc.UnderReplicatedCollections, + rsc.OverReplicatedCollections, + rsc.CorrectlyReplicatedCollections) +} + +func BucketReplication(readCollections collection.ReadCollections, + keepServerInfo keep.ReadServers) (rlbsm ReplicationLevelBlockSetMap) { + rlbsm = make(ReplicationLevelBlockSetMap) + + for block, requestedReplication := range readCollections.BlockToDesiredReplication { + rlbsm.Insert( + ReplicationLevels{ + Requested: requestedReplication, + Actual: len(keepServerInfo.BlockToServers[block])}, + block) + } + + for block, servers := range keepServerInfo.BlockToServers { + if 0 == readCollections.BlockToDesiredReplication[block] { + rlbsm.Insert( + ReplicationLevels{Requested: 0, Actual: len(servers)}, + block) } - 
log.Printf("Wrote summary data to: %s", writeDataTo) - return true } + return } -// Reads data that we've read to a file. -// -// This is useful for development, so that we don't need to read all our data from the network every time we tweak something. -// -// This should not be used outside of development, since you'll be -// working with stale data. -func MaybeReadData(arvLogger *logger.Logger, - readCollections *collection.ReadCollections, - keepServerInfo *keep.ReadServers) bool { - if readDataFrom == "" { - return false - } else { - summaryFile, err := os.Open(readDataFrom) - if err != nil { - loggerutil.FatalWithMessage(arvLogger, - fmt.Sprintf("Failed to open %s: %v", readDataFrom, err)) +func (rlbsm ReplicationLevelBlockSetMap) SummarizeBuckets( + readCollections collection.ReadCollections) ( + rs ReplicationSummary) { + rs.CollectionBlocksNotInKeep = make(BlockSet) + rs.UnderReplicatedBlocks = make(BlockSet) + rs.OverReplicatedBlocks = make(BlockSet) + rs.CorrectlyReplicatedBlocks = make(BlockSet) + rs.KeepBlocksNotInCollections = make(BlockSet) + + rs.CollectionsNotFullyInKeep = make(CollectionIndexSet) + rs.UnderReplicatedCollections = make(CollectionIndexSet) + rs.OverReplicatedCollections = make(CollectionIndexSet) + rs.CorrectlyReplicatedCollections = make(CollectionIndexSet) + + for levels, bs := range rlbsm { + if levels.Actual == 0 { + rs.CollectionBlocksNotInKeep.Union(bs) + } else if levels.Requested == 0 { + rs.KeepBlocksNotInCollections.Union(bs) + } else if levels.Actual < levels.Requested { + rs.UnderReplicatedBlocks.Union(bs) + } else if levels.Actual > levels.Requested { + rs.OverReplicatedBlocks.Union(bs) + } else { + rs.CorrectlyReplicatedBlocks.Union(bs) } - defer summaryFile.Close() - - dec := gob.NewDecoder(summaryFile) - data := serializedData{} - err = dec.Decode(&data) - if err != nil { - loggerutil.FatalWithMessage(arvLogger, - fmt.Sprintf("Failed to read summary data: %v", err)) + } + + 
rs.CollectionBlocksNotInKeep.ToCollectionIndexSet(readCollections, + &rs.CollectionsNotFullyInKeep) + // Since different collections can specify different replication + // levels, the fact that a block is under-replicated does not imply + // that all collections that it belongs to are under-replicated, but + // we'll ignore that for now. + // TODO(misha): Fix this and report the correct set of collections. + rs.UnderReplicatedBlocks.ToCollectionIndexSet(readCollections, + &rs.UnderReplicatedCollections) + rs.OverReplicatedBlocks.ToCollectionIndexSet(readCollections, + &rs.OverReplicatedCollections) + + for i := range readCollections.CollectionIndexToUuid { + if _, notInKeep := rs.CollectionsNotFullyInKeep[i]; notInKeep { + } else if _, underReplicated := rs.UnderReplicatedCollections[i]; underReplicated { + } else if _, overReplicated := rs.OverReplicatedCollections[i]; overReplicated { + } else { + rs.CorrectlyReplicatedCollections.Insert(i) } - log.Printf("Read summary data from: %s", readDataFrom) - return true } + + return }