Added unit test for protocols in From field.
[arvados.git] / services / datamanager / summary / summary.go
1 // Summarizes Collection Data and Keep Server Contents.
2 package summary
3
4 // TODO(misha): Check size of blocks as well as their digest.
5
6 import (
7         "fmt"
8         "git.curoverse.com/arvados.git/sdk/go/blockdigest"
9         "git.curoverse.com/arvados.git/services/datamanager/collection"
10         "git.curoverse.com/arvados.git/services/datamanager/keep"
11         "sort"
12 )
13
14 type BlockSet map[blockdigest.DigestWithSize]struct{}
15
16 // Adds a single block to the set.
17 func (bs BlockSet) Insert(digest blockdigest.DigestWithSize) {
18         bs[digest] = struct{}{}
19 }
20
21 // Adds a set of blocks to the set.
22 func (bs BlockSet) Union(obs BlockSet) {
23         for k, v := range obs {
24                 bs[k] = v
25         }
26 }
27
// CollectionIndexSet is a set of collections. Each collection is
// identified by its index rather than its uuid in order to save space.
// To convert to and from the uuid, use collection.ReadCollections'
// fields CollectionIndexToUuid and CollectionUuidToIndex.
type CollectionIndexSet map[int]struct{}

// Insert adds a single collection, specified by its index, to the set.
// Inserting an index that is already present leaves the set unchanged.
func (cis CollectionIndexSet) Insert(collectionIndex int) {
	var member struct{}
	cis[collectionIndex] = member
}
38
39 func (bs BlockSet) ToCollectionIndexSet(
40         readCollections collection.ReadCollections,
41         collectionIndexSet *CollectionIndexSet) {
42         for block := range bs {
43                 for _, collectionIndex := range readCollections.BlockToCollectionIndices[block] {
44                         collectionIndexSet.Insert(collectionIndex)
45                 }
46         }
47 }
48
// ReplicationLevels keeps track of the requested and actual replication
// levels. Currently this is only used for blocks but could easily be
// used for collections as well. The struct is comparable and is used as
// the key type of ReplicationLevelBlockSetMap.
type ReplicationLevels struct {
	// The requested replication level.
	// For Blocks this is the maximum replication level among all the
	// collections this block belongs to.
	Requested int

	// The actual number of keep servers this is on.
	Actual int
}
61
// ReplicationLevelBlockSetMap maps each (requested, actual) replication
// level pair to the set of blocks at those levels.
type ReplicationLevelBlockSetMap map[ReplicationLevels]BlockSet
64
// ReplicationLevelBlockCount is an individual entry from
// ReplicationLevelBlockSetMap which only reports the number of blocks,
// not which blocks.
type ReplicationLevelBlockCount struct {
	Levels ReplicationLevels
	Count  int
}
70
// ReplicationLevelBlockSetSlice is an ordered list of
// ReplicationLevelBlockCount useful for reporting. Its Len, Less and
// Swap methods satisfy sort.Interface, ordering entries by requested
// level and then by actual level.
type ReplicationLevelBlockSetSlice []ReplicationLevelBlockCount
73
// ReplicationSummary groups blocks, and the collections associated with
// them, by how their actual replication compares with the requested
// replication. As filled in by SummarizeBuckets:
//
// CollectionBlocksNotInKeep holds blocks with an actual replication of
// 0. KeepBlocksNotInCollections holds blocks with a non-zero actual
// replication and a requested replication of 0. The remaining blocks go
// to UnderReplicatedBlocks, OverReplicatedBlocks or
// CorrectlyReplicatedBlocks depending on whether their actual
// replication is below, above or equal to the requested one.
//
// CollectionsNotFullyInKeep, UnderReplicatedCollections and
// OverReplicatedCollections hold the indices of the collections that
// readCollections.BlockToCollectionIndices lists for the blocks of the
// corresponding block set. CorrectlyReplicatedCollections holds the
// indices of the collections that are in none of those three sets.
type ReplicationSummary struct {
	CollectionBlocksNotInKeep  BlockSet
	UnderReplicatedBlocks      BlockSet
	OverReplicatedBlocks       BlockSet
	CorrectlyReplicatedBlocks  BlockSet
	KeepBlocksNotInCollections BlockSet

	CollectionsNotFullyInKeep      CollectionIndexSet
	UnderReplicatedCollections     CollectionIndexSet
	OverReplicatedCollections      CollectionIndexSet
	CorrectlyReplicatedCollections CollectionIndexSet
}
86
// ReplicationSummaryCounts counts the elements in each set in
// ReplicationSummary. Each field holds the size of the
// ReplicationSummary field of the same name, as computed by
// ReplicationSummary.ComputeCounts.
type ReplicationSummaryCounts struct {
	CollectionBlocksNotInKeep      int
	UnderReplicatedBlocks          int
	OverReplicatedBlocks           int
	CorrectlyReplicatedBlocks      int
	KeepBlocksNotInCollections     int
	CollectionsNotFullyInKeep      int
	UnderReplicatedCollections     int
	OverReplicatedCollections      int
	CorrectlyReplicatedCollections int
}
99
100 // Gets the BlockSet for a given set of ReplicationLevels, creating it
101 // if it doesn't already exist.
102 func (rlbs ReplicationLevelBlockSetMap) GetOrCreate(
103         repLevels ReplicationLevels) (bs BlockSet) {
104         bs, exists := rlbs[repLevels]
105         if !exists {
106                 bs = make(BlockSet)
107                 rlbs[repLevels] = bs
108         }
109         return
110 }
111
112 // Adds a block to the set for a given replication level.
113 func (rlbs ReplicationLevelBlockSetMap) Insert(
114         repLevels ReplicationLevels,
115         block blockdigest.DigestWithSize) {
116         rlbs.GetOrCreate(repLevels).Insert(block)
117 }
118
119 // Adds a set of blocks to the set for a given replication level.
120 func (rlbs ReplicationLevelBlockSetMap) Union(
121         repLevels ReplicationLevels,
122         bs BlockSet) {
123         rlbs.GetOrCreate(repLevels).Union(bs)
124 }
125
126 // Outputs a sorted list of ReplicationLevelBlockCounts.
127 func (rlbs ReplicationLevelBlockSetMap) Counts() (
128         sorted ReplicationLevelBlockSetSlice) {
129         sorted = make(ReplicationLevelBlockSetSlice, len(rlbs))
130         i := 0
131         for levels, set := range rlbs {
132                 sorted[i] = ReplicationLevelBlockCount{Levels: levels, Count: len(set)}
133                 i++
134         }
135         sort.Sort(sorted)
136         return
137 }
138
139 // Implemented to meet sort.Interface
140 func (rlbss ReplicationLevelBlockSetSlice) Len() int {
141         return len(rlbss)
142 }
143
144 // Implemented to meet sort.Interface
145 func (rlbss ReplicationLevelBlockSetSlice) Less(i, j int) bool {
146         return rlbss[i].Levels.Requested < rlbss[j].Levels.Requested ||
147                 (rlbss[i].Levels.Requested == rlbss[j].Levels.Requested &&
148                         rlbss[i].Levels.Actual < rlbss[j].Levels.Actual)
149 }
150
151 // Implemented to meet sort.Interface
152 func (rlbss ReplicationLevelBlockSetSlice) Swap(i, j int) {
153         rlbss[i], rlbss[j] = rlbss[j], rlbss[i]
154 }
155
156 func (rs ReplicationSummary) ComputeCounts() (rsc ReplicationSummaryCounts) {
157         // TODO(misha): Consider rewriting this method to iterate through
158         // the fields using reflection, instead of explictily listing the
159         // fields as we do now.
160         rsc.CollectionBlocksNotInKeep = len(rs.CollectionBlocksNotInKeep)
161         rsc.UnderReplicatedBlocks = len(rs.UnderReplicatedBlocks)
162         rsc.OverReplicatedBlocks = len(rs.OverReplicatedBlocks)
163         rsc.CorrectlyReplicatedBlocks = len(rs.CorrectlyReplicatedBlocks)
164         rsc.KeepBlocksNotInCollections = len(rs.KeepBlocksNotInCollections)
165         rsc.CollectionsNotFullyInKeep = len(rs.CollectionsNotFullyInKeep)
166         rsc.UnderReplicatedCollections = len(rs.UnderReplicatedCollections)
167         rsc.OverReplicatedCollections = len(rs.OverReplicatedCollections)
168         rsc.CorrectlyReplicatedCollections = len(rs.CorrectlyReplicatedCollections)
169         return rsc
170 }
171
172 func (rsc ReplicationSummaryCounts) PrettyPrint() string {
173         return fmt.Sprintf("Replication Block Counts:"+
174                 "\n Missing From Keep: %d, "+
175                 "\n Under Replicated: %d, "+
176                 "\n Over Replicated: %d, "+
177                 "\n Replicated Just Right: %d, "+
178                 "\n Not In Any Collection: %d. "+
179                 "\nReplication Collection Counts:"+
180                 "\n Missing From Keep: %d, "+
181                 "\n Under Replicated: %d, "+
182                 "\n Over Replicated: %d, "+
183                 "\n Replicated Just Right: %d.",
184                 rsc.CollectionBlocksNotInKeep,
185                 rsc.UnderReplicatedBlocks,
186                 rsc.OverReplicatedBlocks,
187                 rsc.CorrectlyReplicatedBlocks,
188                 rsc.KeepBlocksNotInCollections,
189                 rsc.CollectionsNotFullyInKeep,
190                 rsc.UnderReplicatedCollections,
191                 rsc.OverReplicatedCollections,
192                 rsc.CorrectlyReplicatedCollections)
193 }
194
195 func BucketReplication(readCollections collection.ReadCollections,
196         keepServerInfo keep.ReadServers) (rlbsm ReplicationLevelBlockSetMap) {
197         rlbsm = make(ReplicationLevelBlockSetMap)
198
199         for block, requestedReplication := range readCollections.BlockToDesiredReplication {
200                 rlbsm.Insert(
201                         ReplicationLevels{
202                                 Requested: requestedReplication,
203                                 Actual:    len(keepServerInfo.BlockToServers[block])},
204                         block)
205         }
206
207         for block, servers := range keepServerInfo.BlockToServers {
208                 if 0 == readCollections.BlockToDesiredReplication[block] {
209                         rlbsm.Insert(
210                                 ReplicationLevels{Requested: 0, Actual: len(servers)},
211                                 block)
212                 }
213         }
214         return
215 }
216
217 func (rlbsm ReplicationLevelBlockSetMap) SummarizeBuckets(
218         readCollections collection.ReadCollections) (
219         rs ReplicationSummary) {
220         rs.CollectionBlocksNotInKeep = make(BlockSet)
221         rs.UnderReplicatedBlocks = make(BlockSet)
222         rs.OverReplicatedBlocks = make(BlockSet)
223         rs.CorrectlyReplicatedBlocks = make(BlockSet)
224         rs.KeepBlocksNotInCollections = make(BlockSet)
225
226         rs.CollectionsNotFullyInKeep = make(CollectionIndexSet)
227         rs.UnderReplicatedCollections = make(CollectionIndexSet)
228         rs.OverReplicatedCollections = make(CollectionIndexSet)
229         rs.CorrectlyReplicatedCollections = make(CollectionIndexSet)
230
231         for levels, bs := range rlbsm {
232                 if levels.Actual == 0 {
233                         rs.CollectionBlocksNotInKeep.Union(bs)
234                 } else if levels.Requested == 0 {
235                         rs.KeepBlocksNotInCollections.Union(bs)
236                 } else if levels.Actual < levels.Requested {
237                         rs.UnderReplicatedBlocks.Union(bs)
238                 } else if levels.Actual > levels.Requested {
239                         rs.OverReplicatedBlocks.Union(bs)
240                 } else {
241                         rs.CorrectlyReplicatedBlocks.Union(bs)
242                 }
243         }
244
245         rs.CollectionBlocksNotInKeep.ToCollectionIndexSet(readCollections,
246                 &rs.CollectionsNotFullyInKeep)
247         // Since different collections can specify different replication
248         // levels, the fact that a block is under-replicated does not imply
249         // that all collections that it belongs to are under-replicated, but
250         // we'll ignore that for now.
251         // TODO(misha): Fix this and report the correct set of collections.
252         rs.UnderReplicatedBlocks.ToCollectionIndexSet(readCollections,
253                 &rs.UnderReplicatedCollections)
254         rs.OverReplicatedBlocks.ToCollectionIndexSet(readCollections,
255                 &rs.OverReplicatedCollections)
256
257         for i := range readCollections.CollectionIndexToUuid {
258                 if _, notInKeep := rs.CollectionsNotFullyInKeep[i]; notInKeep {
259                 } else if _, underReplicated := rs.UnderReplicatedCollections[i]; underReplicated {
260                 } else if _, overReplicated := rs.OverReplicatedCollections[i]; overReplicated {
261                 } else {
262                         rs.CorrectlyReplicatedCollections.Insert(i)
263                 }
264         }
265
266         return
267 }