11 "git.curoverse.com/arvados.git/sdk/go/arvados"
13 check "gopkg.in/check.v1"
17 func Test(t *testing.T) {
21 var _ = check.Suite(&balancerSuite{})
// balancerSuite is the gocheck suite (registered with check.Suite)
// that carries the fixtures shared by the balancer tests.
23 type balancerSuite struct {
26 blks map[string]tester
// knownRendezvous[k] lists indexes into srvs, in probe order, for
// known block k. It is filled in by SetUpSuite.
27 knownRendezvous [][]int
32 // index into knownRendezvous
// SetUpSuite builds the knownRendezvous table and sets signatureTTL.
// Every character of each fixture string is parsed as one hex digit,
// and the digits of one string become one row of knownRendezvous.
47 func (bal *balancerSuite) SetUpSuite(c *check.C) {
48 bal.knownRendezvous = nil
49 for _, str := range []string{
// NOTE(review): this loop variable shadows the *check.C parameter c.
56 for _, c := range []byte(str) {
// Base 16 with bitSize 4 limits each value to 0-15. NOTE(review):
// the parse error is discarded, so a non-hex character in a fixture
// string would go unnoticed.
57 pos, _ := strconv.ParseUint(string(c), 16, 4)
58 slots = append(slots, int(pos))
60 bal.knownRendezvous = append(bal.knownRendezvous, slots)
// Presumably seconds: it is multiplied by 1e9 wherever it is combined
// with a UnixNano timestamp.
63 bal.signatureTTL = 3600
// SetUpTest rebuilds the 16 keep service fixtures, re-indexes them by
// UUID in bal.KeepServices, and recomputes bal.MinMtime.
66 func (bal *balancerSuite) SetUpTest(c *check.C) {
67 bal.srvs = make([]*KeepService, 16)
68 bal.KeepServices = make(map[string]*KeepService)
69 for i := range bal.srvs {
71 KeepService: arvados.KeepService{
// The UUID encodes the slice index as 15 zero-padded hex digits.
72 UUID: fmt.Sprintf("zzzzz-bi6l4-%015x", i),
76 bal.KeepServices[srv.UUID] = srv
// MinMtime is "now" minus signatureTTL (scaled by 1e9), expressed as
// a UnixNano timestamp.
79 bal.MinMtime = time.Now().UnixNano() - bal.signatureTTL*1e9
82 func (bal *balancerSuite) TestPerfect(c *check.C) {
// TestDecreaseRepl: with replicas currently in slots 0, 2 and 1, the
// replica in slot 2 is expected to be trashed.
90 func (bal *balancerSuite) TestDecreaseRepl(c *check.C) {
93 current: slots{0, 2, 1},
94 shouldTrash: slots{2}})
// TestDecreaseReplToZero: every existing replica (slots 0, 1 and 3)
// is expected to be trashed.
97 func (bal *balancerSuite) TestDecreaseReplToZero(c *check.C) {
100 current: slots{0, 1, 3},
101 shouldTrash: slots{0, 1, 3}})
// TestIncreaseRepl: with replicas in slots 0 and 1, pulls are expected
// for slots 2 and 3.
104 func (bal *balancerSuite) TestIncreaseRepl(c *check.C) {
107 current: slots{0, 1},
108 shouldPull: slots{2, 3}})
// TestSkipReadonly marks the slot-3 server for known block 0 as
// read-only; the expected pulls then name slots 2 and 4, not 3.
111 func (bal *balancerSuite) TestSkipReadonly(c *check.C) {
112 bal.srvList(0, slots{3})[0].ReadOnly = true
115 current: slots{0, 1},
116 shouldPull: slots{2, 4}})
// TestFixUnbalanced runs several cases in which replicas sit in later
// slots while earlier slots are empty. Each case lists the slots that
// currently hold a replica and the pulls and/or trashes expected.
119 func (bal *balancerSuite) TestFixUnbalanced(c *check.C) {
122 current: slots{2, 0},
123 shouldPull: slots{1}})
126 current: slots{2, 7},
127 shouldPull: slots{0, 1}})
128 // if only one of the pulls succeeds, we'll see this next:
131 current: slots{2, 1, 7},
132 shouldPull: slots{0},
133 shouldTrash: slots{7}})
134 // if both pulls succeed, we'll see this next:
137 current: slots{2, 0, 1, 7},
138 shouldTrash: slots{2, 7}})
140 // unbalanced + excessive replication => pull + trash
143 current: slots{2, 5, 7},
144 shouldPull: slots{0, 1},
145 shouldTrash: slots{7}})
// TestIncreaseReplTimestampCollision: the two existing replicas carry
// the same timestamp, and pulls to slots 2 and 3 are still expected.
148 func (bal *balancerSuite) TestIncreaseReplTimestampCollision(c *check.C) {
149 // For purposes of increasing replication, we assume identical
150 // replicas are distinct.
153 current: slots{0, 1},
154 timestamps: []int64{12345678, 12345678},
155 shouldPull: slots{2, 3}})
// TestDecreaseReplTimestampCollision: three replicas, some or all of
// which share a timestamp. No shouldTrash appears among the visible
// fields, so presumably no trash is expected in either case.
158 func (bal *balancerSuite) TestDecreaseReplTimestampCollision(c *check.C) {
159 // For purposes of decreasing replication, we assume identical
160 // replicas are NOT distinct.
163 current: slots{0, 1, 2},
164 timestamps: []int64{12345678, 12345678, 12345678}})
167 current: slots{0, 1, 2},
168 timestamps: []int64{12345678, 10000000, 10000000}})
// TestDecreaseReplBlockTooNew exercises excess replicas whose
// timestamps fall on either side of bal.MinMtime.
171 func (bal *balancerSuite) TestDecreaseReplBlockTooNew(c *check.C) {
// NOTE(review): MinMtime is derived from UnixNano in SetUpTest, so
// these offsets look like 3600 nanoseconds rather than seconds;
// confirm that only the ordering relative to MinMtime matters.
172 oldTime := bal.MinMtime - 3600
173 newTime := bal.MinMtime + 3600
174 // The excess replica is too new to delete.
177 current: slots{0, 1, 2},
178 timestamps: []int64{oldTime, newTime, newTime + 1}})
179 // The best replicas are too new to delete, but the excess
180 // replica is old enough.
183 current: slots{0, 1, 2},
184 timestamps: []int64{newTime, newTime + 1, oldTime},
185 shouldTrash: slots{2}})
188 // Clear all servers' changesets, balance a single block, and verify
189 // the appropriate changes for that block have been added to the
191 func (bal *balancerSuite) try(c *check.C, t tester) {
192 bal.setupServiceRoots()
// The block's existing replicas live on the servers selected by
// t.current for known block t.known.
195 Replicas: bal.replList(t.known, t.current)}
// Explicit timestamps, when given, override the Mtime of the replica
// at the same index. NOTE(review): the loop variable t shadows the
// tester parameter t inside this loop.
196 for i, t := range t.timestamps {
197 blk.Replicas[i].Mtime = t
// Give every server a fresh ChangeSet so that only changes produced
// by the balanceBlock call below are observed.
199 for _, srv := range bal.srvs {
200 srv.ChangeSet = &ChangeSet{}
202 bal.balanceBlock(knownBlkid(t.known), blk)
// Gather the slots for which a pull or trash was recorded.
204 var didPull, didTrash slots
205 for i, srv := range bal.srvs {
// Walks this block's probe order; presumably this maps server index i
// to its slot (the assignment to slot is not visible here).
207 for probeOrder, srvNum := range bal.knownRendezvous[t.known] {
212 for _, pull := range srv.Pulls {
213 didPull = append(didPull, slot)
214 c.Check(pull.SizedDigest, check.Equals, knownBlkid(t.known))
216 for _, trash := range srv.Trashes {
217 didTrash = append(didTrash, slot)
218 c.Check(trash.SizedDigest, check.Equals, knownBlkid(t.known))
// Sort observed and expected lists so the comparison ignores order.
// The sort is in place, so the slices behind t.shouldPull and
// t.shouldTrash are reordered as well.
222 for _, list := range []slots{didPull, didTrash, t.shouldPull, t.shouldTrash} {
223 sort.Sort(sort.IntSlice(list))
225 c.Check(didPull, check.DeepEquals, t.shouldPull)
226 c.Check(didTrash, check.DeepEquals, t.shouldTrash)
229 // srvList returns the KeepServices, sorted in rendezvous order and
230 // then selected by order. For example, srvList(3, slots{0, 1, 4})
231 // returns the first-, second-, and fifth-best servers for storing
232 // knownBlkid(3).
233 func (bal *balancerSuite) srvList(knownBlockID int, order slots) (srvs []*KeepService) {
234 for _, i := range order {
// knownRendezvous maps (block, slot) to an index into bal.srvs.
235 srvs = append(srvs, bal.srvs[bal.knownRendezvous[knownBlockID][i]])
240 // replList is like srvList but returns an "existing replicas" slice,
241 // suitable for a BlockState test fixture.
242 func (bal *balancerSuite) replList(knownBlockID int, order slots) (repls []Replica) {
// Every replica gets the same mtime: signatureTTL+86400 (scaled by
// 1e9) before now, as a UnixNano timestamp.
243 mtime := time.Now().UnixNano() - (bal.signatureTTL+86400)*1e9
244 for _, srv := range bal.srvList(knownBlockID, order) {
245 repls = append(repls, Replica{srv, mtime})
// knownBlkid returns the block ID for known block i: the hex MD5 of i
// formatted as 64 zero-padded hex digits, followed by "+64".
//
251 // generate the same data hashes that are tested in
252 // sdk/go/keepclient/root_sorter_test.go
253 func knownBlkid(i int) arvados.SizedDigest {
254 return arvados.SizedDigest(fmt.Sprintf("%x+64", md5.Sum([]byte(fmt.Sprintf("%064x", i)))))