Switched collections.ReadCollections.BlockToCollectionIndex to collections.ReadCollections.BlockToCollectionIndices since a block can belong to more than one collection.
Made collection.Summarize a method of ReadCollections.
Made a couple testing libraries in blockdigest and collection.
Moved collection.MakeBlockDigest() to blockdigest.MakeTestBlockDigest() (in the testing library).
Created collection.MakeTestReadCollections to simplify writing and reading tests (in the testing library).
Created the BlockSet and CollectionIndexSet types to hide some of the awkwardness of using maps as sets and added FromSlice methods.
Moved functions for reading and writing data locally to separate file.
Created separate ReplicationSummaryCounts struct and PrettyPrint method.
Added stats for Collections in addition to Blocks.
-/* Stores a Block Locator Digest compactly. Can be used as a map key. */
-
+// Stores a Block Locator Digest compactly. Can be used as a map key.
package blockdigest
import (
--- /dev/null
+// Code used for testing only.
+
+package blockdigest
+
+import (
+ "fmt"
+)
+
+// Just used for testing when we need some distinct BlockDigests
+func MakeTestBlockDigest(i int) BlockDigest {
+ return AssertFromString(fmt.Sprintf("%032x", i))
+}
+++ /dev/null
-// This binary tests the logger package.
-// It's not a standard unit test. Instead it writes to the actual log
-// and you have to clean up after it.
-
-package main
-
-import (
- "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
- "git.curoverse.com/arvados.git/sdk/go/logger"
- "log"
-)
-
-func main() {
- arv, err := arvadosclient.MakeArvadosClient()
- if err != nil {
- log.Fatalf("Error setting up arvados client %v", err)
- }
-
- l := logger.NewLogger(logger.LoggerParams{Client: arv,
- EventType: "experimental-logger-testing",
- // No minimum write interval
- })
-
- {
- properties, _ := l.Edit()
- properties["Ninja"] = "Misha"
- }
- l.Record()
-}
-/* Deals with parsing Collection responses from API Server. */
+// Deals with parsing Collection responses from API Server.
package collection
}
type ReadCollections struct {
- ReadAllCollections bool
- UuidToCollection map[string]Collection
- OwnerToCollectionSize map[string]int
- BlockToReplication map[blockdigest.BlockDigest]int
- CollectionUuidToIndex map[string]int
- CollectionIndexToUuid []string
- BlockToCollectionIndex map[blockdigest.BlockDigest]int
+ ReadAllCollections bool
+ UuidToCollection map[string]Collection
+ OwnerToCollectionSize map[string]int
+ BlockToReplication map[blockdigest.BlockDigest]int
+ CollectionUuidToIndex map[string]int
+ CollectionIndexToUuid []string
+ BlockToCollectionIndices map[blockdigest.BlockDigest][]int
}
type GetCollectionsParams struct {
func GetCollectionsAndSummarize(params GetCollectionsParams) (results ReadCollections) {
results = GetCollections(params)
- Summarize(&results)
+ results.Summarize()
if params.Logger != nil {
params.Logger.Update(func(p map[string]interface{}, e map[string]interface{}) {
return
}
-func Summarize(readCollections *ReadCollections) {
+func (readCollections *ReadCollections) Summarize() {
readCollections.OwnerToCollectionSize = make(map[string]int)
readCollections.BlockToReplication = make(map[blockdigest.BlockDigest]int)
numCollections := len(readCollections.UuidToCollection)
readCollections.CollectionUuidToIndex = make(map[string]int, numCollections)
readCollections.CollectionIndexToUuid = make([]string, 0, numCollections)
- readCollections.BlockToCollectionIndex = make(map[blockdigest.BlockDigest]int)
+ readCollections.BlockToCollectionIndices = make(map[blockdigest.BlockDigest][]int)
for _, coll := range readCollections.UuidToCollection {
collectionIndex := len(readCollections.CollectionIndexToUuid)
readCollections.OwnerToCollectionSize[coll.OwnerUuid] += coll.TotalSize
for block, _ := range coll.BlockDigestToSize {
- readCollections.BlockToCollectionIndex[block] = collectionIndex
+ readCollections.BlockToCollectionIndices[block] =
+ append(readCollections.BlockToCollectionIndices[block], collectionIndex)
storedReplication := readCollections.BlockToReplication[block]
if coll.ReplicationLevel > storedReplication {
readCollections.BlockToReplication[block] = coll.ReplicationLevel
--- /dev/null
+package collection
+
+import (
+ "git.curoverse.com/arvados.git/sdk/go/blockdigest"
+ "reflect"
+ "testing"
+)
+
+// This captures the result we expect from
+// ReadCollections.Summarize(). Because CollectionUuidToIndex is
+// indeterminate, we replace BlockToCollectionIndices with
+// BlockToCollectionUuids.
+type ExpectedSummary struct {
+ OwnerToCollectionSize map[string]int
+ BlockToReplication map[blockdigest.BlockDigest]int
+ BlockToCollectionUuids map[blockdigest.BlockDigest][]string
+}
+
+func CompareSummarizedReadCollections(t *testing.T,
+ summarized ReadCollections,
+ expected ExpectedSummary) {
+
+ if !reflect.DeepEqual(summarized.OwnerToCollectionSize,
+ expected.OwnerToCollectionSize) {
+ t.Fatalf("Expected summarized OwnerToCollectionSize to look like %+v but instead it is %+v",
+ expected.OwnerToCollectionSize,
+ summarized.OwnerToCollectionSize)
+ }
+
+ if !reflect.DeepEqual(summarized.BlockToReplication,
+ expected.BlockToReplication) {
+ t.Fatalf("Expected summarized BlockToReplication to look like %+v but instead it is %+v",
+ expected.BlockToReplication,
+ summarized.BlockToReplication)
+ }
+
+ summarizedBlockToCollectionUuids :=
+ make(map[blockdigest.BlockDigest]map[string]struct{})
+ for digest, indices := range summarized.BlockToCollectionIndices {
+ uuidSet := make(map[string]struct{})
+ summarizedBlockToCollectionUuids[digest] = uuidSet
+ for _, index := range indices {
+ uuidSet[summarized.CollectionIndexToUuid[index]] = struct{}{}
+ }
+ }
+
+ expectedBlockToCollectionUuids :=
+ make(map[blockdigest.BlockDigest]map[string]struct{})
+ for digest, uuidSlice := range expected.BlockToCollectionUuids {
+ uuidSet := make(map[string]struct{})
+ expectedBlockToCollectionUuids[digest] = uuidSet
+ for _, uuid := range uuidSlice {
+ uuidSet[uuid] = struct{}{}
+ }
+ }
+
+ if !reflect.DeepEqual(summarizedBlockToCollectionUuids,
+ expectedBlockToCollectionUuids) {
+ t.Fatalf("Expected summarized BlockToCollectionUuids to look like %+v but instead it is %+v", expectedBlockToCollectionUuids, summarizedBlockToCollectionUuids)
+ }
+}
+
+func TestSummarizeSimple(t *testing.T) {
+ rc := MakeTestReadCollections([]TestCollectionSpec{TestCollectionSpec{
+ ReplicationLevel: 5,
+ Blocks: []int{1, 2},
+ }})
+
+ rc.Summarize()
+
+ c := rc.UuidToCollection["col0"]
+
+ blockDigest1 := blockdigest.MakeTestBlockDigest(1)
+ blockDigest2 := blockdigest.MakeTestBlockDigest(2)
+
+ expected := ExpectedSummary{
+ OwnerToCollectionSize: map[string]int{c.OwnerUuid: c.TotalSize},
+ BlockToReplication: map[blockdigest.BlockDigest]int{blockDigest1: 5, blockDigest2: 5},
+ BlockToCollectionUuids: map[blockdigest.BlockDigest][]string{blockDigest1: []string{c.Uuid}, blockDigest2: []string{c.Uuid}},
+ }
+
+ CompareSummarizedReadCollections(t, rc, expected)
+}
+
+func TestSummarizeOverlapping(t *testing.T) {
+ rc := MakeTestReadCollections([]TestCollectionSpec{
+ TestCollectionSpec{
+ ReplicationLevel: 5,
+ Blocks: []int{1, 2},
+ },
+ TestCollectionSpec{
+ ReplicationLevel: 8,
+ Blocks: []int{2, 3},
+ },
+ })
+
+ rc.Summarize()
+
+ c0 := rc.UuidToCollection["col0"]
+ c1 := rc.UuidToCollection["col1"]
+
+ blockDigest1 := blockdigest.MakeTestBlockDigest(1)
+ blockDigest2 := blockdigest.MakeTestBlockDigest(2)
+ blockDigest3 := blockdigest.MakeTestBlockDigest(3)
+
+ expected := ExpectedSummary{
+ OwnerToCollectionSize: map[string]int{
+ c0.OwnerUuid: c0.TotalSize,
+ c1.OwnerUuid: c1.TotalSize,
+ },
+ BlockToReplication: map[blockdigest.BlockDigest]int{
+ blockDigest1: 5,
+ blockDigest2: 8,
+ blockDigest3: 8,
+ },
+ BlockToCollectionUuids: map[blockdigest.BlockDigest][]string{
+ blockDigest1: []string{c0.Uuid},
+ blockDigest2: []string{c0.Uuid, c1.Uuid},
+ blockDigest3: []string{c1.Uuid},
+ },
+ }
+
+ CompareSummarizedReadCollections(t, rc, expected)
+}
--- /dev/null
+// Code used for testing only.
+
+package collection
+
+import (
+ "fmt"
+ "git.curoverse.com/arvados.git/sdk/go/blockdigest"
+)
+
+type TestCollectionSpec struct {
+ // The desired replication level
+ ReplicationLevel int
+ // Blocks this contains, represented by ints. Ints repeated will
+ // still only represent one block
+ Blocks []int
+}
+
+// Creates a ReadCollections object for testing based on the given
+// specs. Only the ReadAllCollections and UuidToCollection fields are
+// populated. To populate other fields call rc.Summarize().
+func MakeTestReadCollections(specs []TestCollectionSpec) (rc ReadCollections) {
+ rc = ReadCollections{
+ ReadAllCollections: true,
+ UuidToCollection: map[string]Collection{},
+ }
+
+ for i, spec := range specs {
+ c := Collection{
+ Uuid: fmt.Sprintf("col%d", i),
+ OwnerUuid: fmt.Sprintf("owner%d", i),
+ ReplicationLevel: spec.ReplicationLevel,
+ BlockDigestToSize: map[blockdigest.BlockDigest]int{},
+ }
+ rc.UuidToCollection[c.Uuid] = c
+ for _, j := range spec.Blocks {
+ c.BlockDigestToSize[blockdigest.MakeTestBlockDigest(j)] = j
+ }
+ // We compute the size in a separate loop because the value
+ // computed in the above loop would be invalid if c.Blocks
+ // contained duplicates.
+ for _, size := range c.BlockDigestToSize {
+ c.TotalSize += size
+ }
+ }
+ return
+}
summary.MaybeWriteData(arvLogger, readCollections, keepServerInfo)
replicationSummary :=
- summary.SummarizeReplication(arvLogger, readCollections, keepServerInfo)
-
- log.Printf("Replication Counts:" +
- "\nBlocks In Collections: %d, " +
- "\nBlocks In Keep: %d, " +
- "\nMissing From Keep: %d, " +
- "\nUnder Replicated: %d, " +
- "\nOver Replicated: %d, " +
- "\nReplicated Just Right: %d, " +
- "\nNot In Any Collection: %d.",
+ summary.SummarizeReplication(readCollections, keepServerInfo)
+
+ log.Printf("Blocks In Collections: %d, " +
+ "\nBlocks In Keep: %d.",
len(readCollections.BlockToReplication),
- len(keepServerInfo.BlockToServers),
- len(replicationSummary.CollectionBlocksNotInKeep),
- len(replicationSummary.UnderReplicatedBlocks),
- len(replicationSummary.OverReplicatedBlocks),
- len(replicationSummary.CorrectlyReplicatedBlocks),
- len(replicationSummary.KeepBlocksNotInCollections))
+ len(keepServerInfo.BlockToServers))
+ log.Println(replicationSummary.ComputeCounts().PrettyPrint())
// Log that we're finished. We force the recording, since go will
// not wait for the timer before exiting.
--- /dev/null
+// Handles writing data to and reading data from disk to speed up development.
+
+package summary
+
+import (
+ "encoding/gob"
+ "flag"
+ "fmt"
+ "git.curoverse.com/arvados.git/sdk/go/logger"
+ "git.curoverse.com/arvados.git/services/datamanager/collection"
+ "git.curoverse.com/arvados.git/services/datamanager/keep"
+ "git.curoverse.com/arvados.git/services/datamanager/loggerutil"
+ "log"
+ "os"
+)
+
+var (
+ writeDataTo string
+ readDataFrom string
+)
+
+func init() {
+ flag.StringVar(&writeDataTo,
+ "write-data-to",
+ "",
+ "Write summary of data received to this file. Used for development only.")
+ flag.StringVar(&readDataFrom,
+ "read-data-from",
+ "",
+ "Avoid network i/o and read summary data from this file instead. Used for development only.")
+}
+
+// Writes data we've read to a file.
+//
+// This is useful for development, so that we don't need to read all
+// our data from the network every time we tweak something.
+//
+// This should not be used outside of development, since you'll be
+// working with stale data.
+func MaybeWriteData(arvLogger *logger.Logger,
+ readCollections collection.ReadCollections,
+ keepServerInfo keep.ReadServers) bool {
+ if writeDataTo == "" {
+ return false
+ } else {
+ summaryFile, err := os.Create(writeDataTo)
+ if err != nil {
+ loggerutil.FatalWithMessage(arvLogger,
+ fmt.Sprintf("Failed to open %s: %v", writeDataTo, err))
+ }
+ defer summaryFile.Close()
+
+ enc := gob.NewEncoder(summaryFile)
+ data := serializedData{
+ ReadCollections: readCollections,
+ KeepServerInfo: keepServerInfo}
+ err = enc.Encode(data)
+ if err != nil {
+ loggerutil.FatalWithMessage(arvLogger,
+ fmt.Sprintf("Failed to write summary data: %v", err))
+ }
+ log.Printf("Wrote summary data to: %s", writeDataTo)
+ return true
+ }
+}
+
+// Reads data that we've written to a file.
+//
+// This is useful for development, so that we don't need to read all
+// our data from the network every time we tweak something.
+//
+// This should not be used outside of development, since you'll be
+// working with stale data.
+func MaybeReadData(arvLogger *logger.Logger,
+ readCollections *collection.ReadCollections,
+ keepServerInfo *keep.ReadServers) bool {
+ if readDataFrom == "" {
+ return false
+ } else {
+ summaryFile, err := os.Open(readDataFrom)
+ if err != nil {
+ loggerutil.FatalWithMessage(arvLogger,
+ fmt.Sprintf("Failed to open %s: %v", readDataFrom, err))
+ }
+ defer summaryFile.Close()
+
+ dec := gob.NewDecoder(summaryFile)
+ data := serializedData{}
+ err = dec.Decode(&data)
+ if err != nil {
+ loggerutil.FatalWithMessage(arvLogger,
+ fmt.Sprintf("Failed to read summary data: %v", err))
+ }
+
+ // re-summarize data, so that we can update our summarizing
+ // functions without needing to do all our network i/o
+ data.ReadCollections.Summarize()
+ keep.ComputeBlockReplicationCounts(&data.KeepServerInfo)
+
+ *readCollections = data.ReadCollections
+ *keepServerInfo = data.KeepServerInfo
+ log.Printf("Read summary data from: %s", readDataFrom)
+ return true
+ }
+}
-/* Computes Summary based on data read from API server. */
-
+// Summarizes Collection Data and Keep Server Contents.
package summary
+// TODO(misha): Check size of blocks as well as their digest.
+
import (
- "encoding/gob"
- "flag"
"fmt"
"git.curoverse.com/arvados.git/sdk/go/blockdigest"
- "git.curoverse.com/arvados.git/sdk/go/logger"
"git.curoverse.com/arvados.git/services/datamanager/collection"
"git.curoverse.com/arvados.git/services/datamanager/keep"
- "git.curoverse.com/arvados.git/services/datamanager/loggerutil"
- "log"
- "os"
)
-type ReplicationSummary struct {
- CollectionBlocksNotInKeep map[blockdigest.BlockDigest]struct{}
- UnderReplicatedBlocks map[blockdigest.BlockDigest]struct{}
- OverReplicatedBlocks map[blockdigest.BlockDigest]struct{}
- CorrectlyReplicatedBlocks map[blockdigest.BlockDigest]struct{}
- KeepBlocksNotInCollections map[blockdigest.BlockDigest]struct{}
+type BlockSet map[blockdigest.BlockDigest]struct{}
+
+func (bs BlockSet) Insert(digest blockdigest.BlockDigest) {
+ bs[digest] = struct{}{}
}
-type serializedData struct {
- ReadCollections collection.ReadCollections
- KeepServerInfo keep.ReadServers
+func BlockSetFromSlice(digests []blockdigest.BlockDigest) (bs BlockSet) {
+ bs = make(BlockSet)
+ for _, digest := range digests {
+ bs.Insert(digest)
+ }
+ return
}
-var (
- writeDataTo string
- readDataFrom string
-)
+// We use the collection index to save space. To convert to & from the
+// uuid, use collection.ReadCollections' fields CollectionIndexToUuid
+// and CollectionUuidToIndex.
+type CollectionIndexSet map[int]struct{}
+
+func (cis CollectionIndexSet) Insert(collectionIndex int) {
+ cis[collectionIndex] = struct{}{}
+}
-func init() {
- flag.StringVar(&writeDataTo,
- "write-data-to",
- "",
- "Write summary of data received to this file. Used for development only.")
- flag.StringVar(&readDataFrom,
- "read-data-from",
- "",
- "Avoid network i/o and read summary data from this file instead. Used for development only.")
+func CollectionIndexSetFromSlice(indices []int) (cis CollectionIndexSet) {
+ cis = make(CollectionIndexSet)
+ for _, index := range indices {
+ cis.Insert(index)
+ }
+ return
}
-// Writes data we've read to a file.
-//
-// This is useful for development, so that we don't need to read all our data from the network every time we tweak something.
-//
-// This should not be used outside of development, since you'll be
-// working with stale data.
-func MaybeWriteData(arvLogger *logger.Logger,
+
+func (bs BlockSet) ToCollectionIndexSet(
readCollections collection.ReadCollections,
- keepServerInfo keep.ReadServers) bool {
- if writeDataTo == "" {
- return false
- } else {
- summaryFile, err := os.Create(writeDataTo)
- if err != nil {
- loggerutil.FatalWithMessage(arvLogger,
- fmt.Sprintf("Failed to open %s: %v", writeDataTo, err))
- }
- defer summaryFile.Close()
-
- enc := gob.NewEncoder(summaryFile)
- data := serializedData{
- ReadCollections: readCollections,
- KeepServerInfo: keepServerInfo}
- err = enc.Encode(data)
- if err != nil {
- loggerutil.FatalWithMessage(arvLogger,
- fmt.Sprintf("Failed to write summary data: %v", err))
+ collectionIndexSet *CollectionIndexSet) {
+ for block := range bs {
+ for _, collectionIndex := range readCollections.BlockToCollectionIndices[block] {
+ collectionIndexSet.Insert(collectionIndex)
}
- log.Printf("Wrote summary data to: %s", writeDataTo)
- return true
}
}
-// Reads data that we've read to a file.
-//
-// This is useful for development, so that we don't need to read all our data from the network every time we tweak something.
-//
-// This should not be used outside of development, since you'll be
-// working with stale data.
-func MaybeReadData(arvLogger *logger.Logger,
- readCollections *collection.ReadCollections,
- keepServerInfo *keep.ReadServers) bool {
- if readDataFrom == "" {
- return false
- } else {
- summaryFile, err := os.Open(readDataFrom)
- if err != nil {
- loggerutil.FatalWithMessage(arvLogger,
- fmt.Sprintf("Failed to open %s: %v", readDataFrom, err))
- }
- defer summaryFile.Close()
-
- dec := gob.NewDecoder(summaryFile)
- data := serializedData{}
- err = dec.Decode(&data)
- if err != nil {
- loggerutil.FatalWithMessage(arvLogger,
- fmt.Sprintf("Failed to read summary data: %v", err))
- }
+type ReplicationSummary struct {
+ CollectionBlocksNotInKeep BlockSet
+ UnderReplicatedBlocks BlockSet
+ OverReplicatedBlocks BlockSet
+ CorrectlyReplicatedBlocks BlockSet
+ KeepBlocksNotInCollections BlockSet
+
+ CollectionsNotFullyInKeep CollectionIndexSet
+ UnderReplicatedCollections CollectionIndexSet
+ OverReplicatedCollections CollectionIndexSet
+ CorrectlyReplicatedCollections CollectionIndexSet
+}
- // re-summarize data, so that we can update our summarizing
- // functions without needing to do all our network i/o
- collection.Summarize(&data.ReadCollections)
- keep.ComputeBlockReplicationCounts(&data.KeepServerInfo)
+type ReplicationSummaryCounts struct {
+ CollectionBlocksNotInKeep int
+ UnderReplicatedBlocks int
+ OverReplicatedBlocks int
+ CorrectlyReplicatedBlocks int
+ KeepBlocksNotInCollections int
+ CollectionsNotFullyInKeep int
+ UnderReplicatedCollections int
+ OverReplicatedCollections int
+ CorrectlyReplicatedCollections int
+}
- *readCollections = data.ReadCollections
- *keepServerInfo = data.KeepServerInfo
- log.Printf("Read summary data from: %s", readDataFrom)
- return true
- }
+type serializedData struct {
+ ReadCollections collection.ReadCollections
+ KeepServerInfo keep.ReadServers
}
-func SummarizeReplication(arvLogger *logger.Logger,
- readCollections collection.ReadCollections,
+func (rs ReplicationSummary) ComputeCounts() (rsc ReplicationSummaryCounts) {
+ rsc.CollectionBlocksNotInKeep = len(rs.CollectionBlocksNotInKeep)
+ rsc.UnderReplicatedBlocks = len(rs.UnderReplicatedBlocks)
+ rsc.OverReplicatedBlocks = len(rs.OverReplicatedBlocks)
+ rsc.CorrectlyReplicatedBlocks = len(rs.CorrectlyReplicatedBlocks)
+ rsc.KeepBlocksNotInCollections = len(rs.KeepBlocksNotInCollections)
+ rsc.CollectionsNotFullyInKeep = len(rs.CollectionsNotFullyInKeep)
+ rsc.UnderReplicatedCollections = len(rs.UnderReplicatedCollections)
+ rsc.OverReplicatedCollections = len(rs.OverReplicatedCollections)
+ rsc.CorrectlyReplicatedCollections = len(rs.CorrectlyReplicatedCollections)
+ return rsc
+}
+
+func (rsc ReplicationSummaryCounts) PrettyPrint() string {
+ return fmt.Sprintf("Replication Block Counts:"+
+ "\n Missing From Keep: %d, "+
+ "\n Under Replicated: %d, "+
+ "\n Over Replicated: %d, "+
+ "\n Replicated Just Right: %d, "+
+ "\n Not In Any Collection: %d. "+
+ "\nReplication Collection Counts:"+
+ "\n Missing From Keep: %d, "+
+ "\n Under Replicated: %d, "+
+ "\n Over Replicated: %d, "+
+ "\n Replicated Just Right: %d.",
+ rsc.CollectionBlocksNotInKeep,
+ rsc.UnderReplicatedBlocks,
+ rsc.OverReplicatedBlocks,
+ rsc.CorrectlyReplicatedBlocks,
+ rsc.KeepBlocksNotInCollections,
+ rsc.CollectionsNotFullyInKeep,
+ rsc.UnderReplicatedCollections,
+ rsc.OverReplicatedCollections,
+ rsc.CorrectlyReplicatedCollections)
+}
+
+func SummarizeReplication(readCollections collection.ReadCollections,
keepServerInfo keep.ReadServers) (rs ReplicationSummary) {
- rs.CollectionBlocksNotInKeep = make(map[blockdigest.BlockDigest]struct{})
- rs.UnderReplicatedBlocks = make(map[blockdigest.BlockDigest]struct{})
- rs.OverReplicatedBlocks = make(map[blockdigest.BlockDigest]struct{})
- rs.CorrectlyReplicatedBlocks = make(map[blockdigest.BlockDigest]struct{})
- rs.KeepBlocksNotInCollections = make(map[blockdigest.BlockDigest]struct{})
+ rs.CollectionBlocksNotInKeep = make(BlockSet)
+ rs.UnderReplicatedBlocks = make(BlockSet)
+ rs.OverReplicatedBlocks = make(BlockSet)
+ rs.CorrectlyReplicatedBlocks = make(BlockSet)
+ rs.KeepBlocksNotInCollections = make(BlockSet)
+ rs.CollectionsNotFullyInKeep = make(CollectionIndexSet)
+ rs.UnderReplicatedCollections = make(CollectionIndexSet)
+ rs.OverReplicatedCollections = make(CollectionIndexSet)
+ rs.CorrectlyReplicatedCollections = make(CollectionIndexSet)
for block, requestedReplication := range readCollections.BlockToReplication {
actualReplication := len(keepServerInfo.BlockToServers[block])
if actualReplication == 0 {
- rs.CollectionBlocksNotInKeep[block] = struct{}{}
+ rs.CollectionBlocksNotInKeep.Insert(block)
} else if actualReplication < requestedReplication {
- rs.UnderReplicatedBlocks[block] = struct{}{}
+ rs.UnderReplicatedBlocks.Insert(block)
} else if actualReplication > requestedReplication {
- rs.OverReplicatedBlocks[block] = struct{}{}
+ rs.OverReplicatedBlocks.Insert(block)
} else {
- rs.CorrectlyReplicatedBlocks[block] = struct{}{}
+ rs.CorrectlyReplicatedBlocks.Insert(block)
}
}
for block, _ := range keepServerInfo.BlockToServers {
if 0 == readCollections.BlockToReplication[block] {
- rs.KeepBlocksNotInCollections[block] = struct{}{}
+ rs.KeepBlocksNotInCollections.Insert(block)
+ }
+ }
+
+ rs.CollectionBlocksNotInKeep.ToCollectionIndexSet(readCollections,
+ &rs.CollectionsNotFullyInKeep)
+ // Since different collections can specify different replication
+ // levels, the fact that a block is under-replicated does not imply
+ // that all collections that it belongs to are under-replicated, but
+ // we'll ignore that for now.
+ // TODO(misha): Fix this and report the correct set of collections.
+ rs.UnderReplicatedBlocks.ToCollectionIndexSet(readCollections,
+ &rs.UnderReplicatedCollections)
+ rs.OverReplicatedBlocks.ToCollectionIndexSet(readCollections,
+ &rs.OverReplicatedCollections)
+
+ for i := range readCollections.CollectionIndexToUuid {
+ if _, notInKeep := rs.CollectionsNotFullyInKeep[i]; notInKeep {
+ } else if _, underReplicated := rs.UnderReplicatedCollections[i]; underReplicated {
+ } else if _, overReplicated := rs.OverReplicatedCollections[i]; overReplicated {
+ } else {
+ rs.CorrectlyReplicatedCollections.Insert(i)
}
}