1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
15 "git.curoverse.com/arvados.git/sdk/go/arvados"
17 check "gopkg.in/check.v1"
21 func Test(t *testing.T) {
25 var _ = check.Suite(&balancerSuite{})
27 type balancerSuite struct {
30 blks map[string]tester
31 knownRendezvous [][]int
36 // index into knownRendezvous
51 func (bal *balancerSuite) SetUpSuite(c *check.C) {
52 bal.knownRendezvous = nil
53 for _, str := range []string{
60 for _, c := range []byte(str) {
61 pos, _ := strconv.ParseUint(string(c), 16, 4)
62 slots = append(slots, int(pos))
64 bal.knownRendezvous = append(bal.knownRendezvous, slots)
67 bal.signatureTTL = 3600
70 func (bal *balancerSuite) SetUpTest(c *check.C) {
71 bal.srvs = make([]*KeepService, 16)
72 bal.KeepServices = make(map[string]*KeepService)
73 for i := range bal.srvs {
75 KeepService: arvados.KeepService{
76 UUID: fmt.Sprintf("zzzzz-bi6l4-%015x", i),
80 bal.KeepServices[srv.UUID] = srv
83 bal.MinMtime = time.Now().UnixNano() - bal.signatureTTL*1e9
86 func (bal *balancerSuite) TestPerfect(c *check.C) {
94 func (bal *balancerSuite) TestDecreaseRepl(c *check.C) {
97 current: slots{0, 2, 1},
98 shouldTrash: slots{2}})
101 func (bal *balancerSuite) TestDecreaseReplToZero(c *check.C) {
104 current: slots{0, 1, 3},
105 shouldTrash: slots{0, 1, 3}})
108 func (bal *balancerSuite) TestIncreaseRepl(c *check.C) {
111 current: slots{0, 1},
112 shouldPull: slots{2, 3}})
115 func (bal *balancerSuite) TestSkipReadonly(c *check.C) {
116 bal.srvList(0, slots{3})[0].ReadOnly = true
119 current: slots{0, 1},
120 shouldPull: slots{2, 4}})
123 func (bal *balancerSuite) TestFixUnbalanced(c *check.C) {
126 current: slots{2, 0},
127 shouldPull: slots{1}})
130 current: slots{2, 7},
131 shouldPull: slots{0, 1}})
132 // if only one of the pulls succeeds, we'll see this next:
135 current: slots{2, 1, 7},
136 shouldPull: slots{0},
137 shouldTrash: slots{7}})
138 // if both pulls succeed, we'll see this next:
141 current: slots{2, 0, 1, 7},
142 shouldTrash: slots{2, 7}})
144 // unbalanced + excessive replication => pull + trash
147 current: slots{2, 5, 7},
148 shouldPull: slots{0, 1},
149 shouldTrash: slots{7}})
152 func (bal *balancerSuite) TestIncreaseReplTimestampCollision(c *check.C) {
153 // For purposes of increasing replication, we assume replicas with
154 // identical timestamps are distinct.
157 current: slots{0, 1},
158 timestamps: []int64{12345678, 12345678},
159 shouldPull: slots{2, 3}})
162 func (bal *balancerSuite) TestDecreaseReplTimestampCollision(c *check.C) {
163 // For purposes of decreasing replication, we assume replicas with
164 // identical timestamps are NOT distinct.
167 current: slots{0, 1, 2},
168 timestamps: []int64{12345678, 12345678, 12345678}})
171 current: slots{0, 1, 2},
172 timestamps: []int64{12345678, 10000000, 10000000}})
175 func (bal *balancerSuite) TestDecreaseReplBlockTooNew(c *check.C) {
176 oldTime := bal.MinMtime - 3600
177 newTime := bal.MinMtime + 3600
178 // The excess replica is too new to delete.
181 current: slots{0, 1, 2},
182 timestamps: []int64{oldTime, newTime, newTime + 1}})
183 // The best replicas are too new to delete, but the excess
184 // replica is old enough.
187 current: slots{0, 1, 2},
188 timestamps: []int64{newTime, newTime + 1, oldTime},
189 shouldTrash: slots{2}})
192 // Clear all servers' changesets, balance a single block, and verify
193 // the appropriate changes for that block have been added to the
195 func (bal *balancerSuite) try(c *check.C, t tester) {
196 bal.setupServiceRoots()
199 Replicas: bal.replList(t.known, t.current)}
200 for i, t := range t.timestamps {
201 blk.Replicas[i].Mtime = t
203 for _, srv := range bal.srvs {
204 srv.ChangeSet = &ChangeSet{}
206 bal.balanceBlock(knownBlkid(t.known), blk)
208 var didPull, didTrash slots
209 for i, srv := range bal.srvs {
211 for probeOrder, srvNum := range bal.knownRendezvous[t.known] {
216 for _, pull := range srv.Pulls {
217 didPull = append(didPull, slot)
218 c.Check(pull.SizedDigest, check.Equals, knownBlkid(t.known))
220 for _, trash := range srv.Trashes {
221 didTrash = append(didTrash, slot)
222 c.Check(trash.SizedDigest, check.Equals, knownBlkid(t.known))
226 for _, list := range []slots{didPull, didTrash, t.shouldPull, t.shouldTrash} {
227 sort.Sort(sort.IntSlice(list))
229 c.Check(didPull, check.DeepEquals, t.shouldPull)
230 c.Check(didTrash, check.DeepEquals, t.shouldTrash)
233 // srvList returns the KeepServices, sorted in rendezvous order and
234 // then selected by the positions in order. For example,
235 // srvList(3, slots{0, 1, 4}) returns the first-, second-, and
236 // fifth-best servers for storing knownBlkid(3).
237 func (bal *balancerSuite) srvList(knownBlockID int, order slots) (srvs []*KeepService) {
238 for _, i := range order {
239 srvs = append(srvs, bal.srvs[bal.knownRendezvous[knownBlockID][i]])
244 // replList is like srvList but returns an "existing replicas" slice,
245 // suitable for a BlockState test fixture.
246 func (bal *balancerSuite) replList(knownBlockID int, order slots) (repls []Replica) {
247 mtime := time.Now().UnixNano() - (bal.signatureTTL+86400)*1e9
248 for _, srv := range bal.srvList(knownBlockID, order) {
249 repls = append(repls, Replica{srv, mtime})
255 // knownBlkid generates the same data hashes that are tested in
256 // sdk/go/keepclient/root_sorter_test.go
257 func knownBlkid(i int) arvados.SizedDigest {
258 return arvados.SizedDigest(fmt.Sprintf("%x+64", md5.Sum([]byte(fmt.Sprintf("%064x", i)))))