Renamed BlockToReplication to BlockToDesiredReplication.
[arvados.git] / services / datamanager / summary / summary.go
index 8a381eb7a39a3a7312f27c33748c024c0b89ecaf..edd760b035d066627b51f913799bb2606f4c0141 100644 (file)
-/* Computes Summary based on data read from API server. */
-
+// Package summary summarizes collection data and Keep server contents.
 package summary
 
+// TODO(misha): Check size of blocks as well as their digest.
+
 import (
-       "encoding/gob"
-       "flag"
        "fmt"
-       "git.curoverse.com/arvados.git/sdk/go/logger"
+       "git.curoverse.com/arvados.git/sdk/go/blockdigest"
        "git.curoverse.com/arvados.git/services/datamanager/collection"
        "git.curoverse.com/arvados.git/services/datamanager/keep"
-       "git.curoverse.com/arvados.git/services/datamanager/loggerutil"
-       "log"
-       "os"
+       "sort"
 )
 
-var (
-       // These are just used for development, to save network i/o
-       writeDataTo  string
-       readDataFrom string
-)
+// BlockSet is a set of blocks keyed by blockdigest.DigestWithSize.
+type BlockSet map[blockdigest.DigestWithSize]struct{}
+
+// Insert adds digest to the set; inserting an existing member is a no-op.
+func (bs BlockSet) Insert(digest blockdigest.DigestWithSize) {
+       bs[digest] = struct{}{}
+}
+
+// Union adds every member of obs to bs, modifying bs in place.
+func (bs BlockSet) Union(obs BlockSet) {
+       for member := range obs {
+               bs.Insert(member)
+       }
+}
+
+// CollectionIndexSet is a set of collections, each identified by its
+// integer index rather than its uuid in order to save space. To
+// convert between the two, use the CollectionIndexToUuid and
+// CollectionUuidToIndex fields of collection.ReadCollections.
+type CollectionIndexSet map[int]struct{}
+
+// Insert adds the collection with index collectionIndex to the set.
+func (cis CollectionIndexSet) Insert(collectionIndex int) {
+       var present struct{}
+       cis[collectionIndex] = present
+}
 
-type serializedData struct {
-       ReadCollections collection.ReadCollections
-       KeepServerInfo  keep.ReadServers
-}
-
-func init() {
-       flag.StringVar(&writeDataTo,
-               "write-data-to",
-               "",
-               "Write summary of data received to this file. Used for development only.")
-       flag.StringVar(&readDataFrom,
-               "read-data-from",
-               "",
-               "Avoid network i/o and read summary data from this file instead. Used for development only.")
-}
-
-// Writes data we've read to a file.
-//
-// This is useful for development, so that we don't need to read all our data from the network every time we tweak something.
-//
-// This should not be used outside of development, since you'll be
-// working with stale data.
-func MaybeWriteData(arvLogger *logger.Logger,
+// ToCollectionIndexSet adds to *collectionIndexSet the index of every
+// collection that readCollections.BlockToCollectionIndices lists for
+// any block in bs. Existing members of the set are kept.
+//
+// collectionIndexSet must point to an initialized (non-nil) set,
+// because Insert writes directly into the map.
+func (bs BlockSet) ToCollectionIndexSet(
        readCollections collection.ReadCollections,
-       keepServerInfo keep.ReadServers) bool {
-       if writeDataTo == "" {
-               return false
-       } else {
-               summaryFile, err := os.Create(writeDataTo)
-               if err != nil {
-                       loggerutil.FatalWithMessage(arvLogger,
-                               fmt.Sprintf("Failed to open %s: %v", writeDataTo, err))
+       collectionIndexSet *CollectionIndexSet) {
+       // A block can belong to several collections; record every one of them.
+       for block := range bs {
+               for _, collectionIndex := range readCollections.BlockToCollectionIndices[block] {
+                       collectionIndexSet.Insert(collectionIndex)
                }
-               defer summaryFile.Close()
-
-               enc := gob.NewEncoder(summaryFile)
-               data := serializedData{
-                       ReadCollections: readCollections,
-                       KeepServerInfo:  keepServerInfo}
-               err = enc.Encode(data)
-               if err != nil {
-                       loggerutil.FatalWithMessage(arvLogger,
-                               fmt.Sprintf("Failed to write summary data: %v", err))
+       }
+}
+
+// ReplicationLevels pairs the requested and actual replication levels
+// of an item. Currently it is only used for blocks, but it could easily
+// be used for collections as well.
+type ReplicationLevels struct {
+       // Requested is the requested replication level. For blocks it is
+       // taken from collection.ReadCollections.BlockToDesiredReplication,
+       // and is 0 for blocks that no collection requests. The original
+       // author describes it as the maximum replication level among all
+       // the collections this block belongs to.
+       Requested int
+
+       // Actual is the number of Keep servers the item was found on.
+       Actual int
+}
+
+// ReplicationLevelBlockSetMap maps each pair of replication levels to
+// the set of blocks having exactly those levels.
+type ReplicationLevelBlockSetMap map[ReplicationLevels]BlockSet
+
+// ReplicationLevelBlockCount is a single entry of a
+// ReplicationLevelBlockSetMap reduced to the number of blocks it holds,
+// without recording which blocks they are.
+type ReplicationLevelBlockCount struct {
+       Levels ReplicationLevels
+       Count  int
+}
+
+// ReplicationLevelBlockSetSlice is a list of ReplicationLevelBlockCount
+// values. It implements sort.Interface so it can be ordered for reporting.
+type ReplicationLevelBlockSetSlice []ReplicationLevelBlockCount
+
+// ReplicationSummary groups blocks and collections by replication
+// status. SummarizeBuckets populates every field.
+type ReplicationSummary struct {
+       // Blocks grouped by how their actual replication compares with
+       // the requested replication.
+       CollectionBlocksNotInKeep  BlockSet
+       UnderReplicatedBlocks      BlockSet
+       OverReplicatedBlocks       BlockSet
+       CorrectlyReplicatedBlocks  BlockSet
+       KeepBlocksNotInCollections BlockSet
+
+       // Collections containing at least one block of the corresponding
+       // kind. CorrectlyReplicatedCollections holds the collections that
+       // are in none of the other three sets.
+       CollectionsNotFullyInKeep      CollectionIndexSet
+       UnderReplicatedCollections     CollectionIndexSet
+       OverReplicatedCollections      CollectionIndexSet
+       CorrectlyReplicatedCollections CollectionIndexSet
+}
+
+// ReplicationSummaryCounts holds the number of elements in each set of
+// a ReplicationSummary, field for field.
+type ReplicationSummaryCounts struct {
+       CollectionBlocksNotInKeep      int
+       UnderReplicatedBlocks          int
+       OverReplicatedBlocks           int
+       CorrectlyReplicatedBlocks      int
+       KeepBlocksNotInCollections     int
+       CollectionsNotFullyInKeep      int
+       UnderReplicatedCollections     int
+       OverReplicatedCollections      int
+       CorrectlyReplicatedCollections int
+}
+
+// GetOrCreate returns the BlockSet stored under repLevels. If there is
+// none yet, it first stores a new empty BlockSet there.
+func (rlbs ReplicationLevelBlockSetMap) GetOrCreate(
+       repLevels ReplicationLevels) (bs BlockSet) {
+       if existing, found := rlbs[repLevels]; found {
+               return existing
+       }
+       bs = make(BlockSet)
+       rlbs[repLevels] = bs
+       return bs
+}
+
+// Insert records block under repLevels, creating that level's BlockSet
+// on first use.
+func (rlbs ReplicationLevelBlockSetMap) Insert(
+       repLevels ReplicationLevels,
+       block blockdigest.DigestWithSize) {
+       levelSet := rlbs.GetOrCreate(repLevels)
+       levelSet.Insert(block)
+}
+
+// Union adds every block in bs to the BlockSet for repLevels, creating
+// that set on first use.
+func (rlbs ReplicationLevelBlockSetMap) Union(
+       repLevels ReplicationLevels,
+       bs BlockSet) {
+       levelSet := rlbs.GetOrCreate(repLevels)
+       levelSet.Union(bs)
+}
+
+// Counts returns one ReplicationLevelBlockCount per entry of rlbs,
+// giving the number of blocks at each pair of replication levels. The
+// result is sorted by Requested, then by Actual, both ascending.
+func (rlbs ReplicationLevelBlockSetMap) Counts() (
+       sorted ReplicationLevelBlockSetSlice) {
+       sorted = make(ReplicationLevelBlockSetSlice, 0, len(rlbs))
+       for levels, blocks := range rlbs {
+               sorted = append(sorted, ReplicationLevelBlockCount{
+                       Levels: levels,
+                       Count:  len(blocks),
+               })
+       }
+       sort.Sort(sorted)
+       return sorted
+}
+
+// Len returns the number of entries. It is part of sort.Interface.
+func (rlbss ReplicationLevelBlockSetSlice) Len() int {
+       return len(rlbss)
+}
+
+// Less orders entries by Requested replication, breaking ties by Actual
+// replication. It is part of sort.Interface.
+func (rlbss ReplicationLevelBlockSetSlice) Less(i, j int) bool {
+       left, right := rlbss[i].Levels, rlbss[j].Levels
+       if left.Requested != right.Requested {
+               return left.Requested < right.Requested
+       }
+       return left.Actual < right.Actual
+}
+
+// Swap exchanges entries i and j. It is part of sort.Interface.
+func (rlbss ReplicationLevelBlockSetSlice) Swap(i, j int) {
+       held := rlbss[i]
+       rlbss[i] = rlbss[j]
+       rlbss[j] = held
+}
+
+// ComputeCounts returns the number of elements in each set of rs.
+func (rs ReplicationSummary) ComputeCounts() (rsc ReplicationSummaryCounts) {
+       // TODO(misha): Consider filling these in by iterating over the
+       // struct fields with reflection instead of naming each one.
+       return ReplicationSummaryCounts{
+               CollectionBlocksNotInKeep:      len(rs.CollectionBlocksNotInKeep),
+               UnderReplicatedBlocks:          len(rs.UnderReplicatedBlocks),
+               OverReplicatedBlocks:           len(rs.OverReplicatedBlocks),
+               CorrectlyReplicatedBlocks:      len(rs.CorrectlyReplicatedBlocks),
+               KeepBlocksNotInCollections:     len(rs.KeepBlocksNotInCollections),
+               CollectionsNotFullyInKeep:      len(rs.CollectionsNotFullyInKeep),
+               UnderReplicatedCollections:     len(rs.UnderReplicatedCollections),
+               OverReplicatedCollections:      len(rs.OverReplicatedCollections),
+               CorrectlyReplicatedCollections: len(rs.CorrectlyReplicatedCollections),
+       }
+}
+
+// PrettyPrint renders the counts as a multi-line, human-readable report:
+// the block counts first, then the collection counts.
+func (rsc ReplicationSummaryCounts) PrettyPrint() string {
+       const layout = "Replication Block Counts:" +
+               "\n Missing From Keep: %d, " +
+               "\n Under Replicated: %d, " +
+               "\n Over Replicated: %d, " +
+               "\n Replicated Just Right: %d, " +
+               "\n Not In Any Collection: %d. " +
+               "\nReplication Collection Counts:" +
+               "\n Missing From Keep: %d, " +
+               "\n Under Replicated: %d, " +
+               "\n Over Replicated: %d, " +
+               "\n Replicated Just Right: %d."
+       return fmt.Sprintf(layout,
+               rsc.CollectionBlocksNotInKeep,
+               rsc.UnderReplicatedBlocks,
+               rsc.OverReplicatedBlocks,
+               rsc.CorrectlyReplicatedBlocks,
+               rsc.KeepBlocksNotInCollections,
+               rsc.CollectionsNotFullyInKeep,
+               rsc.UnderReplicatedCollections,
+               rsc.OverReplicatedCollections,
+               rsc.CorrectlyReplicatedCollections)
+}
+
+// BucketReplication groups every known block into a BlockSet keyed by
+// its requested and actual replication levels.
+//
+// Requested comes from readCollections.BlockToDesiredReplication. It is
+// 0 for blocks found on Keep servers that have no entry there, or an
+// entry of 0. Actual is the number of servers listed for the block in
+// keepServerInfo.BlockToServers, which is 0 when the block is on no server.
+func BucketReplication(readCollections collection.ReadCollections,
+       keepServerInfo keep.ReadServers) (rlbsm ReplicationLevelBlockSetMap) {
+       rlbsm = make(ReplicationLevelBlockSetMap)
+
+       // Every block referenced by a collection, whether or not any server holds it.
+       for block, requestedReplication := range readCollections.BlockToDesiredReplication {
+               rlbsm.Insert(
+                       ReplicationLevels{
+                               Requested: requestedReplication,
+                               Actual:    len(keepServerInfo.BlockToServers[block])},
+                       block)
+       }
+
+       // Blocks on Keep servers with no desired replication. A block that was
+       // already inserted above with a desired replication of 0 lands in the
+       // same bucket, so the repeated Insert is harmless.
+       for block, servers := range keepServerInfo.BlockToServers {
+               if 0 == readCollections.BlockToDesiredReplication[block] {
+                       rlbsm.Insert(
+                               ReplicationLevels{Requested: 0, Actual: len(servers)},
+                               block)
                }
-               log.Printf("Wrote summary data to: %s", writeDataTo)
-               return true
        }
+       return
 }
 
-// Reads data that we've read to a file.
-//
-// This is useful for development, so that we don't need to read all our data from the network every time we tweak something.
-//
-// This should not be used outside of development, since you'll be
-// working with stale data.
-func MaybeReadData(arvLogger *logger.Logger,
-       readCollections *collection.ReadCollections,
-       keepServerInfo *keep.ReadServers) bool {
-       if readDataFrom == "" {
-               return false
-       } else {
-               summaryFile, err := os.Open(readDataFrom)
-               if err != nil {
-                       loggerutil.FatalWithMessage(arvLogger,
-                               fmt.Sprintf("Failed to open %s: %v", readDataFrom, err))
+// SummarizeBuckets classifies the blocks in rlbsm by replication status
+// and derives the matching collection sets.
+//
+// Each bucket's levels are tested in this order:
+//   - Actual == 0 means the blocks are missing from Keep.
+//   - Requested == 0 means the blocks are on Keep but in no collection.
+//   - Otherwise Actual is compared with Requested: under, over, or correct.
+//
+// A collection is marked not fully in Keep, under-replicated, or
+// over-replicated if it contains any block of that kind, so one
+// collection can appear in several of those sets. Every remaining
+// collection index is counted as correctly replicated.
+func (rlbsm ReplicationLevelBlockSetMap) SummarizeBuckets(
+       readCollections collection.ReadCollections) (
+       rs ReplicationSummary) {
+       rs.CollectionBlocksNotInKeep = make(BlockSet)
+       rs.UnderReplicatedBlocks = make(BlockSet)
+       rs.OverReplicatedBlocks = make(BlockSet)
+       rs.CorrectlyReplicatedBlocks = make(BlockSet)
+       rs.KeepBlocksNotInCollections = make(BlockSet)
+
+       rs.CollectionsNotFullyInKeep = make(CollectionIndexSet)
+       rs.UnderReplicatedCollections = make(CollectionIndexSet)
+       rs.OverReplicatedCollections = make(CollectionIndexSet)
+       rs.CorrectlyReplicatedCollections = make(CollectionIndexSet)
+
+       for levels, bs := range rlbsm {
+               if levels.Actual == 0 {
+                       rs.CollectionBlocksNotInKeep.Union(bs)
+               } else if levels.Requested == 0 {
+                       rs.KeepBlocksNotInCollections.Union(bs)
+               } else if levels.Actual < levels.Requested {
+                       rs.UnderReplicatedBlocks.Union(bs)
+               } else if levels.Actual > levels.Requested {
+                       rs.OverReplicatedBlocks.Union(bs)
+               } else {
+                       rs.CorrectlyReplicatedBlocks.Union(bs)
                }
-               defer summaryFile.Close()
-
-               dec := gob.NewDecoder(summaryFile)
-               data := serializedData{}
-               err = dec.Decode(&data)
-               if err != nil {
-                       loggerutil.FatalWithMessage(arvLogger,
-                               fmt.Sprintf("Failed to read summary data: %v", err))
+       }
+
+       rs.CollectionBlocksNotInKeep.ToCollectionIndexSet(readCollections,
+               &rs.CollectionsNotFullyInKeep)
+       // Since different collections can specify different replication
+       // levels, the fact that a block is under-replicated does not imply
+       // that all collections that it belongs to are under-replicated, but
+       // we'll ignore that for now.
+       // TODO(misha): Fix this and report the correct set of collections.
+       rs.UnderReplicatedBlocks.ToCollectionIndexSet(readCollections,
+               &rs.UnderReplicatedCollections)
+       rs.OverReplicatedBlocks.ToCollectionIndexSet(readCollections,
+               &rs.OverReplicatedCollections)
+
+       // A collection counts as correctly replicated only when it is in none
+       // of the three sets above. The empty branches simply skip it.
+       for i := range readCollections.CollectionIndexToUuid {
+               if _, notInKeep := rs.CollectionsNotFullyInKeep[i]; notInKeep {
+               } else if _, underReplicated := rs.UnderReplicatedCollections[i]; underReplicated {
+               } else if _, overReplicated := rs.OverReplicatedCollections[i]; overReplicated {
+               } else {
+                       rs.CorrectlyReplicatedCollections.Insert(i)
                }
-               log.Printf("Read summary data from: %s", readDataFrom)
-               return true
        }
+
+       return
 }