1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
19 "git.curoverse.com/arvados.git/sdk/go/arvados"
21 check "gopkg.in/check.v1"
24 var _ = check.Suite(&runSuite{})
26 type reqTracker struct {
31 func (rt *reqTracker) Count() int {
37 func (rt *reqTracker) Add(req *http.Request) int {
40 rt.reqs = append(rt.reqs, *req)
44 // stubServer is an HTTP transport that intercepts and processes all
45 // requests using its own handlers.
46 type stubServer struct {
51 logf func(string, ...interface{})
54 // Start initializes the stub server and returns an *http.Client that
55 // uses the stub server to handle all requests.
57 // A stubServer that has been started should eventually be shut down
59 func (s *stubServer) Start() *http.Client {
60 // Set up a config.Client that forwards all requests to s.mux
61 // via s.srv. Test cases will attach handlers to s.mux to get
62 // the desired responses.
63 s.mux = http.NewServeMux()
64 s.srv = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
68 w.Header().Set("Content-Type", "application/json")
71 return &http.Client{Transport: s}
74 func (s *stubServer) RoundTrip(req *http.Request) (*http.Response, error) {
75 w := httptest.NewRecorder()
76 s.mux.ServeHTTP(w, req)
77 return &http.Response{
79 Status: fmt.Sprintf("%d %s", w.Code, http.StatusText(w.Code)),
81 Body: ioutil.NopCloser(w.Body)}, nil
84 // Close releases resources used by the server.
85 func (s *stubServer) Close() {
89 func (s *stubServer) serveStatic(path, data string) *reqTracker {
91 s.mux.HandleFunc(path, func(w http.ResponseWriter, r *http.Request) {
94 ioutil.ReadAll(r.Body)
97 io.WriteString(w, data)
102 func (s *stubServer) serveCurrentUserAdmin() *reqTracker {
103 return s.serveStatic("/arvados/v1/users/current",
104 `{"uuid":"zzzzz-tpzed-000000000000000","is_admin":true,"is_active":true}`)
107 func (s *stubServer) serveCurrentUserNotAdmin() *reqTracker {
108 return s.serveStatic("/arvados/v1/users/current",
109 `{"uuid":"zzzzz-tpzed-000000000000000","is_admin":false,"is_active":true}`)
112 func (s *stubServer) serveDiscoveryDoc() *reqTracker {
113 return s.serveStatic("/discovery/v1/apis/arvados/v1/rest",
114 `{"defaultCollectionReplication":2}`)
117 func (s *stubServer) serveZeroCollections() *reqTracker {
118 return s.serveStatic("/arvados/v1/collections",
119 `{"items":[],"items_available":0}`)
122 func (s *stubServer) serveFooBarFileCollections() *reqTracker {
124 s.mux.HandleFunc("/arvados/v1/collections", func(w http.ResponseWriter, r *http.Request) {
127 if strings.Contains(r.Form.Get("filters"), `modified_at`) {
128 io.WriteString(w, `{"items_available":0,"items":[]}`)
130 io.WriteString(w, `{"items_available":2,"items":[
131 {"uuid":"zzzzz-4zz18-ehbhgtheo8909or","portable_data_hash":"fa7aeb5140e2848d39b416daeef4ffc5+45","manifest_text":". 37b51d194a7513e45b56f6524f2d51f2+3 0:3:bar\n","modified_at":"2014-02-03T17:22:54Z"},
132 {"uuid":"zzzzz-4zz18-znfnqtbbv4spc3w","portable_data_hash":"1f4b0bc7583c2a7f9102c395f4ffc5e3+45","manifest_text":". acbd18db4cc2f85cedef654fccc4a4d8+3 0:3:foo\n","modified_at":"2014-02-03T17:22:54Z"}]}`)
138 func (s *stubServer) serveCollectionsButSkipOne() *reqTracker {
140 s.mux.HandleFunc("/arvados/v1/collections", func(w http.ResponseWriter, r *http.Request) {
143 if strings.Contains(r.Form.Get("filters"), `"modified_at","\u003c="`) {
144 io.WriteString(w, `{"items_available":3,"items":[]}`)
145 } else if strings.Contains(r.Form.Get("filters"), `"modified_at","\u003e`) {
146 io.WriteString(w, `{"items_available":0,"items":[]}`)
147 } else if strings.Contains(r.Form.Get("filters"), `"modified_at","="`) && strings.Contains(r.Form.Get("filters"), `"uuid","\u003e"`) {
148 io.WriteString(w, `{"items_available":0,"items":[]}`)
150 io.WriteString(w, `{"items_available":2,"items":[
151 {"uuid":"zzzzz-4zz18-ehbhgtheo8909or","portable_data_hash":"fa7aeb5140e2848d39b416daeef4ffc5+45","manifest_text":". 37b51d194a7513e45b56f6524f2d51f2+3 0:3:bar\n","modified_at":"2014-02-03T17:22:54Z"},
152 {"uuid":"zzzzz-4zz18-znfnqtbbv4spc3w","portable_data_hash":"1f4b0bc7583c2a7f9102c395f4ffc5e3+45","manifest_text":". acbd18db4cc2f85cedef654fccc4a4d8+3 0:3:foo\n","modified_at":"2014-02-03T17:22:54Z"}]}`)
158 func (s *stubServer) serveZeroKeepServices() *reqTracker {
159 return s.serveStatic("/arvados/v1/keep_services",
160 `{"items":[],"items_available":0}`)
163 func (s *stubServer) serveFourDiskKeepServices() *reqTracker {
164 return s.serveStatic("/arvados/v1/keep_services", `{"items_available":5,"items":[
165 {"uuid":"zzzzz-bi6l4-000000000000000","service_host":"keep0.zzzzz.arvadosapi.com","service_port":25107,"service_ssl_flag":false,"service_type":"disk"},
166 {"uuid":"zzzzz-bi6l4-000000000000001","service_host":"keep1.zzzzz.arvadosapi.com","service_port":25107,"service_ssl_flag":false,"service_type":"disk"},
167 {"uuid":"zzzzz-bi6l4-000000000000002","service_host":"keep2.zzzzz.arvadosapi.com","service_port":25107,"service_ssl_flag":false,"service_type":"disk"},
168 {"uuid":"zzzzz-bi6l4-000000000000003","service_host":"keep3.zzzzz.arvadosapi.com","service_port":25107,"service_ssl_flag":false,"service_type":"disk"},
169 {"uuid":"zzzzz-bi6l4-h0a0xwut9qa6g3a","service_host":"keep.zzzzz.arvadosapi.com","service_port":25333,"service_ssl_flag":true,"service_type":"proxy"}]}`)
172 func (s *stubServer) serveKeepstoreIndexFoo4Bar1() *reqTracker {
174 s.mux.HandleFunc("/index/", func(w http.ResponseWriter, r *http.Request) {
176 if r.Host == "keep0.zzzzz.arvadosapi.com:25107" {
177 io.WriteString(w, "37b51d194a7513e45b56f6524f2d51f2+3 12345678\n")
179 fmt.Fprintf(w, "acbd18db4cc2f85cedef654fccc4a4d8+3 %d\n\n", 12345678+count)
184 func (s *stubServer) serveKeepstoreTrash() *reqTracker {
185 return s.serveStatic("/trash", `{}`)
188 func (s *stubServer) serveKeepstorePull() *reqTracker {
189 return s.serveStatic("/pull", `{}`)
192 type runSuite struct {
197 // make a log.Logger that writes to the current test's c.Log().
198 func (s *runSuite) logger(c *check.C) *log.Logger {
201 buf := make([]byte, 10000)
203 n, err := r.Read(buf)
205 if buf[n-1] == '\n' {
208 c.Log(string(buf[:n]))
215 return log.New(w, "", log.LstdFlags)
218 func (s *runSuite) SetUpTest(c *check.C) {
220 Client: arvados.Client{
222 APIHost: "zzzzz.arvadosapi.com",
223 Client: s.stub.Start()},
224 KeepServiceTypes: []string{"disk"}}
225 s.stub.serveDiscoveryDoc()
229 func (s *runSuite) TearDownTest(c *check.C) {
233 func (s *runSuite) TestRefuseZeroCollections(c *check.C) {
239 s.stub.serveCurrentUserAdmin()
240 s.stub.serveZeroCollections()
241 s.stub.serveFourDiskKeepServices()
242 s.stub.serveKeepstoreIndexFoo4Bar1()
243 trashReqs := s.stub.serveKeepstoreTrash()
244 pullReqs := s.stub.serveKeepstorePull()
245 _, err := (&Balancer{}).Run(s.config, opts)
246 c.Check(err, check.ErrorMatches, "received zero collections")
247 c.Check(trashReqs.Count(), check.Equals, 4)
248 c.Check(pullReqs.Count(), check.Equals, 0)
251 func (s *runSuite) TestServiceTypes(c *check.C) {
257 s.config.KeepServiceTypes = []string{"unlisted-type"}
258 s.stub.serveCurrentUserAdmin()
259 s.stub.serveFooBarFileCollections()
260 s.stub.serveFourDiskKeepServices()
261 indexReqs := s.stub.serveKeepstoreIndexFoo4Bar1()
262 trashReqs := s.stub.serveKeepstoreTrash()
263 _, err := (&Balancer{}).Run(s.config, opts)
264 c.Check(err, check.IsNil)
265 c.Check(indexReqs.Count(), check.Equals, 0)
266 c.Check(trashReqs.Count(), check.Equals, 0)
269 func (s *runSuite) TestRefuseNonAdmin(c *check.C) {
275 s.stub.serveCurrentUserNotAdmin()
276 s.stub.serveZeroCollections()
277 s.stub.serveFourDiskKeepServices()
278 trashReqs := s.stub.serveKeepstoreTrash()
279 pullReqs := s.stub.serveKeepstorePull()
280 _, err := (&Balancer{}).Run(s.config, opts)
281 c.Check(err, check.ErrorMatches, "current user .* is not .* admin user")
282 c.Check(trashReqs.Count(), check.Equals, 0)
283 c.Check(pullReqs.Count(), check.Equals, 0)
286 func (s *runSuite) TestDetectSkippedCollections(c *check.C) {
292 s.stub.serveCurrentUserAdmin()
293 s.stub.serveCollectionsButSkipOne()
294 s.stub.serveFourDiskKeepServices()
295 s.stub.serveKeepstoreIndexFoo4Bar1()
296 trashReqs := s.stub.serveKeepstoreTrash()
297 pullReqs := s.stub.serveKeepstorePull()
298 _, err := (&Balancer{}).Run(s.config, opts)
299 c.Check(err, check.ErrorMatches, `Retrieved 2 collections with modtime <= .* but server now reports there are 3 collections.*`)
300 c.Check(trashReqs.Count(), check.Equals, 4)
301 c.Check(pullReqs.Count(), check.Equals, 0)
304 func (s *runSuite) TestDryRun(c *check.C) {
310 s.stub.serveCurrentUserAdmin()
311 collReqs := s.stub.serveFooBarFileCollections()
312 s.stub.serveFourDiskKeepServices()
313 s.stub.serveKeepstoreIndexFoo4Bar1()
314 trashReqs := s.stub.serveKeepstoreTrash()
315 pullReqs := s.stub.serveKeepstorePull()
317 _, err := bal.Run(s.config, opts)
318 c.Check(err, check.IsNil)
319 for _, req := range collReqs.reqs {
320 c.Check(req.Form.Get("include_trash"), check.Equals, "true")
322 c.Check(trashReqs.Count(), check.Equals, 0)
323 c.Check(pullReqs.Count(), check.Equals, 0)
324 stats := bal.getStatistics()
325 c.Check(stats.pulls, check.Not(check.Equals), 0)
326 c.Check(stats.underrep.replicas, check.Not(check.Equals), 0)
327 c.Check(stats.overrep.replicas, check.Not(check.Equals), 0)
330 func (s *runSuite) TestCommit(c *check.C) {
337 s.stub.serveCurrentUserAdmin()
338 s.stub.serveFooBarFileCollections()
339 s.stub.serveFourDiskKeepServices()
340 s.stub.serveKeepstoreIndexFoo4Bar1()
341 trashReqs := s.stub.serveKeepstoreTrash()
342 pullReqs := s.stub.serveKeepstorePull()
344 _, err := bal.Run(s.config, opts)
345 c.Check(err, check.IsNil)
346 c.Check(trashReqs.Count(), check.Equals, 8)
347 c.Check(pullReqs.Count(), check.Equals, 4)
348 stats := bal.getStatistics()
349 // "foo" block is overreplicated by 2
350 c.Check(stats.trashes, check.Equals, 2)
351 // "bar" block is underreplicated by 1, and its only copy is
352 // in a poor rendezvous position
353 c.Check(stats.pulls, check.Equals, 2)
356 func (s *runSuite) TestRunForever(c *check.C) {
363 s.stub.serveCurrentUserAdmin()
364 s.stub.serveFooBarFileCollections()
365 s.stub.serveFourDiskKeepServices()
366 s.stub.serveKeepstoreIndexFoo4Bar1()
367 trashReqs := s.stub.serveKeepstoreTrash()
368 pullReqs := s.stub.serveKeepstorePull()
370 stop := make(chan interface{})
371 s.config.RunPeriod = arvados.Duration(time.Millisecond)
372 go RunForever(s.config, opts, stop)
374 // Each run should send 4 pull lists + 4 trash lists. The
375 // first run should also send 4 empty trash lists at
376 // startup. We should complete all four runs in much less than
378 for t0 := time.Now(); pullReqs.Count() < 16 && time.Since(t0) < 10*time.Second; {
379 time.Sleep(time.Millisecond)
382 c.Check(pullReqs.Count() >= 16, check.Equals, true)
383 c.Check(trashReqs.Count(), check.Equals, pullReqs.Count()+4)