1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
15 "git.curoverse.com/arvados.git/sdk/go/arvados"
17 check "gopkg.in/check.v1"
21 func Test(t *testing.T) {
25 var _ = check.Suite(&balancerSuite{})
27 type balancerSuite struct {
30 blks map[string]tester
31 knownRendezvous [][]int
36 // index into knownRendezvous
51 func (bal *balancerSuite) SetUpSuite(c *check.C) {
52 bal.knownRendezvous = nil
53 for _, str := range []string{
60 for _, c := range []byte(str) {
61 pos, _ := strconv.ParseUint(string(c), 16, 4)
62 slots = append(slots, int(pos))
64 bal.knownRendezvous = append(bal.knownRendezvous, slots)
67 bal.signatureTTL = 3600
70 func (bal *balancerSuite) SetUpTest(c *check.C) {
71 bal.srvs = make([]*KeepService, 16)
72 bal.KeepServices = make(map[string]*KeepService)
73 for i := range bal.srvs {
75 KeepService: arvados.KeepService{
76 UUID: fmt.Sprintf("zzzzz-bi6l4-%015x", i),
79 srv.mounts = []*KeepMount{{KeepMount: arvados.KeepMount{UUID: fmt.Sprintf("mount-%015x", i)}, KeepService: srv}}
81 bal.KeepServices[srv.UUID] = srv
84 bal.MinMtime = time.Now().UnixNano() - bal.signatureTTL*1e9
87 func (bal *balancerSuite) TestPerfect(c *check.C) {
95 func (bal *balancerSuite) TestDecreaseRepl(c *check.C) {
98 current: slots{0, 2, 1},
99 shouldTrash: slots{2}})
102 func (bal *balancerSuite) TestDecreaseReplToZero(c *check.C) {
105 current: slots{0, 1, 3},
106 shouldTrash: slots{0, 1, 3}})
109 func (bal *balancerSuite) TestIncreaseRepl(c *check.C) {
112 current: slots{0, 1},
113 shouldPull: slots{2, 3}})
116 func (bal *balancerSuite) TestSkipReadonly(c *check.C) {
117 bal.srvList(0, slots{3})[0].ReadOnly = true
120 current: slots{0, 1},
121 shouldPull: slots{2, 4}})
124 func (bal *balancerSuite) TestFixUnbalanced(c *check.C) {
127 current: slots{2, 0},
128 shouldPull: slots{1}})
131 current: slots{2, 7},
132 shouldPull: slots{0, 1}})
133 // if only one of the pulls succeeds, we'll see this next:
136 current: slots{2, 1, 7},
137 shouldPull: slots{0},
138 shouldTrash: slots{7}})
139 // if both pulls succeed, we'll see this next:
142 current: slots{2, 0, 1, 7},
143 shouldTrash: slots{2, 7}})
145 // unbalanced + excessive replication => pull + trash
148 current: slots{2, 5, 7},
149 shouldPull: slots{0, 1},
150 shouldTrash: slots{7}})
153 func (bal *balancerSuite) TestMultipleReplicasPerService(c *check.C) {
156 current: slots{0, 0},
157 shouldPull: slots{1}})
160 current: slots{2, 2},
161 shouldPull: slots{0, 1}})
164 current: slots{0, 0, 1},
165 shouldTrash: slots{0}})
168 current: slots{1, 1, 0},
169 shouldTrash: slots{1}})
172 current: slots{1, 0, 1, 0, 2},
173 shouldTrash: slots{0, 1, 2}})
176 current: slots{1, 1, 1, 0, 2},
177 shouldTrash: slots{1, 1, 2}})
180 current: slots{1, 1, 2},
181 shouldPull: slots{0},
182 shouldTrash: slots{1}})
185 current: slots{1, 1, 0},
186 timestamps: []int64{12345678, 12345678, 12345679},
190 current: slots{1, 1},
191 shouldPull: slots{0}})
194 func (bal *balancerSuite) TestIncreaseReplTimestampCollision(c *check.C) {
195 // For purposes of increasing replication, we assume identical
196 // replicas are distinct.
199 current: slots{0, 1},
200 timestamps: []int64{12345678, 12345678},
201 shouldPull: slots{2, 3}})
204 func (bal *balancerSuite) TestDecreaseReplTimestampCollision(c *check.C) {
205 // For purposes of decreasing replication, we assume identical
206 // replicas are NOT distinct.
209 current: slots{0, 1, 2},
210 timestamps: []int64{12345678, 12345678, 12345678}})
213 current: slots{0, 1, 2},
214 timestamps: []int64{12345678, 10000000, 10000000}})
217 func (bal *balancerSuite) TestDecreaseReplBlockTooNew(c *check.C) {
218 oldTime := bal.MinMtime - 3600
219 newTime := bal.MinMtime + 3600
220 // The excess replica is too new to delete.
223 current: slots{0, 1, 2},
224 timestamps: []int64{oldTime, newTime, newTime + 1}})
225 // The best replicas are too new to delete, but the excess
226 // replica is old enough.
229 current: slots{0, 1, 2},
230 timestamps: []int64{newTime, newTime + 1, oldTime},
231 shouldTrash: slots{2}})
234 // Clear all servers' changesets, balance a single block, and verify
235 // the appropriate changes for that block have been added to the
237 func (bal *balancerSuite) try(c *check.C, t tester) {
238 bal.setupServiceRoots()
241 Replicas: bal.replList(t.known, t.current)}
242 for i, t := range t.timestamps {
243 blk.Replicas[i].Mtime = t
245 for _, srv := range bal.srvs {
246 srv.ChangeSet = &ChangeSet{}
248 bal.balanceBlock(knownBlkid(t.known), blk)
250 var didPull, didTrash slots
251 for i, srv := range bal.srvs {
253 for probeOrder, srvNum := range bal.knownRendezvous[t.known] {
258 for _, pull := range srv.Pulls {
259 didPull = append(didPull, slot)
260 c.Check(pull.SizedDigest, check.Equals, knownBlkid(t.known))
262 for _, trash := range srv.Trashes {
263 didTrash = append(didTrash, slot)
264 c.Check(trash.SizedDigest, check.Equals, knownBlkid(t.known))
268 for _, list := range []slots{didPull, didTrash, t.shouldPull, t.shouldTrash} {
269 sort.Sort(sort.IntSlice(list))
271 c.Check(didPull, check.DeepEquals, t.shouldPull)
272 c.Check(didTrash, check.DeepEquals, t.shouldTrash)
275 // srvList returns the KeepServices, sorted in rendezvous order and
276 // then selected by idx. For example, srvList(3, slots{0, 1, 4})
277 // returns the the first-, second-, and fifth-best servers for storing
278 // bal.knownBlkid(3).
279 func (bal *balancerSuite) srvList(knownBlockID int, order slots) (srvs []*KeepService) {
280 for _, i := range order {
281 srvs = append(srvs, bal.srvs[bal.knownRendezvous[knownBlockID][i]])
286 // replList is like srvList but returns an "existing replicas" slice,
287 // suitable for a BlockState test fixture.
288 func (bal *balancerSuite) replList(knownBlockID int, order slots) (repls []Replica) {
289 mtime := time.Now().UnixNano() - (bal.signatureTTL+86400)*1e9
290 for _, srv := range bal.srvList(knownBlockID, order) {
291 repls = append(repls, Replica{srv.mounts[0], mtime})
297 // generate the same data hashes that are tested in
298 // sdk/go/keepclient/root_sorter_test.go
299 func knownBlkid(i int) arvados.SizedDigest {
300 return arvados.SizedDigest(fmt.Sprintf("%x+64", md5.Sum([]byte(fmt.Sprintf("%064x", i)))))