1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
18 "git.curoverse.com/arvados.git/sdk/go/arvados"
20 check "gopkg.in/check.v1"
// Register runSuite with the gocheck runner so its Test* methods are
// executed.
23 var _ = check.Suite(&runSuite{})
// reqTracker collects HTTP requests in its reqs slice. The serve*
// helpers on stubServer return one so that tests can count the
// requests (Count) or inspect them (reqs).
25 type reqTracker struct {
// Count returns an int that the tests in this file compare against the
// expected number of requests.
// NOTE(review): presumably len(rt.reqs) -- confirm.
30 func (rt *reqTracker) Count() int {
// Add records req in rt.reqs and returns an int.
// NOTE(review): presumably the new number of tracked requests -- confirm.
36 func (rt *reqTracker) Add(req *http.Request) int {
// The request is stored by value: this is a shallow copy of the
// http.Request struct, so pointer, map and interface fields (URL, Form,
// Body, ...) are still shared with the request passed in.
39 rt.reqs = append(rt.reqs, *req)
43 // stubServer is an HTTP transport that intercepts and processes all
44 // requests using its own handlers.
45 type stubServer struct {
// logf has a printf-style signature: a format string followed by
// variadic arguments.
50 logf func(string, ...interface{})
53 // Start initializes the stub server and returns an *http.Client that
54 // uses the stub server to handle all requests.
56 // A stubServer that has been started should eventually be shut down
58 func (s *stubServer) Start() *http.Client {
59 // Set up a config.Client that forwards all requests to s.mux
60 // via s.srv. Test cases will attach handlers to s.mux to get
61 // the desired responses.
// NOTE(review): the client returned below uses s itself as its
// Transport, and RoundTrip hands requests straight to s.mux. Confirm
// whether any request still travels through s.srv as described above.
62 s.mux = http.NewServeMux()
63 s.srv = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
// Responses written by this handler are labelled as JSON.
67 w.Header().Set("Content-Type", "application/json")
// Every request made with the returned client goes through s.RoundTrip.
70 return &http.Client{Transport: s}
// RoundTrip has the http.RoundTripper method signature, which is what
// lets Start install s as the Transport of an http.Client. It serves req
// in-process: the request is handed to s.mux with an
// httptest.ResponseRecorder as the writer, and the recorded result is
// converted into an *http.Response. The returned error is always nil in
// the lines shown.
73 func (s *stubServer) RoundTrip(req *http.Request) (*http.Response, error) {
74 w := httptest.NewRecorder()
75 s.mux.ServeHTTP(w, req)
76 return &http.Response{
// Status line text: numeric code, a space, then the standard reason
// phrase for that code.
78 Status: fmt.Sprintf("%d %s", w.Code, http.StatusText(w.Code)),
// The recorded body buffer is wrapped to satisfy io.ReadCloser; its
// Close method does nothing.
80 Body: ioutil.NopCloser(w.Body)}, nil
83 // Close releases resources used by the server.
// NOTE(review): presumably this shuts down the s.srv created in Start --
// confirm.
84 func (s *stubServer) Close() {
// serveStatic registers a handler on s.mux for path that responds with
// the fixed string data, and returns a *reqTracker.
// NOTE(review): presumably the tracker records each request the handler
// receives -- confirm.
88 func (s *stubServer) serveStatic(path, data string) *reqTracker {
90 s.mux.HandleFunc(path, func(w http.ResponseWriter, r *http.Request) {
// Consume the whole request body before responding. The bytes read and
// any read error are discarded.
93 ioutil.ReadAll(r.Body)
96 io.WriteString(w, data)
// serveCurrentUserAdmin stubs the current-user endpoint with a fixed
// user record that is active and has is_admin set to true.
101 func (s *stubServer) serveCurrentUserAdmin() *reqTracker {
102 return s.serveStatic("/arvados/v1/users/current",
103 `{"uuid":"zzzzz-tpzed-000000000000000","is_admin":true,"is_active":true}`)
// serveCurrentUserNotAdmin stubs the current-user endpoint with the same
// user record as serveCurrentUserAdmin, except that is_admin is false.
106 func (s *stubServer) serveCurrentUserNotAdmin() *reqTracker {
107 return s.serveStatic("/arvados/v1/users/current",
108 `{"uuid":"zzzzz-tpzed-000000000000000","is_admin":false,"is_active":true}`)
// serveDiscoveryDoc stubs the discovery document endpoint with a minimal
// document whose only field is defaultCollectionReplication, set to 2.
111 func (s *stubServer) serveDiscoveryDoc() *reqTracker {
112 return s.serveStatic("/discovery/v1/apis/arvados/v1/rest",
113 `{"defaultCollectionReplication":2}`)
// serveZeroCollections stubs the collections list endpoint with an empty
// result: no items and items_available equal to 0.
116 func (s *stubServer) serveZeroCollections() *reqTracker {
117 return s.serveStatic("/arvados/v1/collections",
118 `{"items":[],"items_available":0}`)
// serveFooBarFileCollections stubs the collections list endpoint with
// two collections. Each manifest holds a single 3-byte block: one
// collection contains a file named bar, the other a file named foo.
121 func (s *stubServer) serveFooBarFileCollections() *reqTracker {
123 s.mux.HandleFunc("/arvados/v1/collections", func(w http.ResponseWriter, r *http.Request) {
// A request whose filters parameter mentions modified_at gets an empty
// page.
126 if strings.Contains(r.Form.Get("filters"), `modified_at`) {
127 io.WriteString(w, `{"items_available":0,"items":[]}`)
// The two-collection response follows.
// NOTE(review): presumably this is the else branch for requests without
// a modified_at filter -- confirm.
129 io.WriteString(w, `{"items_available":2,"items":[
130 {"uuid":"zzzzz-4zz18-ehbhgtheo8909or","portable_data_hash":"fa7aeb5140e2848d39b416daeef4ffc5+45","manifest_text":". 37b51d194a7513e45b56f6524f2d51f2+3 0:3:bar\n","modified_at":"2014-02-03T17:22:54Z"},
131 {"uuid":"zzzzz-4zz18-znfnqtbbv4spc3w","portable_data_hash":"1f4b0bc7583c2a7f9102c395f4ffc5e3+45","manifest_text":". acbd18db4cc2f85cedef654fccc4a4d8+3 0:3:foo\n","modified_at":"2014-02-03T17:22:54Z"}]}`)
// serveCollectionsButSkipOne stubs the collections list endpoint so that
// the item pages contain two collections in total while a count-style
// query claims three exist. TestDetectSkippedCollections relies on this
// mismatch.
//
// The filters value is matched as raw text, so the comparison operators
// are expected in their JSON-escaped spelling (u003c for less-than,
// u003e for greater-than).
137 func (s *stubServer) serveCollectionsButSkipOne() *reqTracker {
139 s.mux.HandleFunc("/arvados/v1/collections", func(w http.ResponseWriter, r *http.Request) {
// modified_at less-than-or-equal filter: report 3 available, no items.
142 if strings.Contains(r.Form.Get("filters"), `"modified_at","\u003c="`) {
143 io.WriteString(w, `{"items_available":3,"items":[]}`)
// modified_at greater-than (or greater-or-equal) filter: nothing more.
144 } else if strings.Contains(r.Form.Get("filters"), `"modified_at","\u003e`) {
145 io.WriteString(w, `{"items_available":0,"items":[]}`)
// Same modified_at combined with a uuid greater-than filter: nothing
// more.
146 } else if strings.Contains(r.Form.Get("filters"), `"modified_at","="`) && strings.Contains(r.Form.Get("filters"), `"uuid","\u003e"`) {
147 io.WriteString(w, `{"items_available":0,"items":[]}`)
// The two-collection page follows.
// NOTE(review): presumably the final else branch -- confirm.
149 io.WriteString(w, `{"items_available":2,"items":[
150 {"uuid":"zzzzz-4zz18-ehbhgtheo8909or","portable_data_hash":"fa7aeb5140e2848d39b416daeef4ffc5+45","manifest_text":". 37b51d194a7513e45b56f6524f2d51f2+3 0:3:bar\n","modified_at":"2014-02-03T17:22:54Z"},
151 {"uuid":"zzzzz-4zz18-znfnqtbbv4spc3w","portable_data_hash":"1f4b0bc7583c2a7f9102c395f4ffc5e3+45","manifest_text":". acbd18db4cc2f85cedef654fccc4a4d8+3 0:3:foo\n","modified_at":"2014-02-03T17:22:54Z"}]}`)
// serveZeroKeepServices stubs the keep_services list endpoint with an
// empty result: no items and items_available equal to 0.
157 func (s *stubServer) serveZeroKeepServices() *reqTracker {
158 return s.serveStatic("/arvados/v1/keep_services",
159 `{"items":[],"items_available":0}`)
// serveFourDiskKeepServices stubs the keep_services list endpoint with
// five entries: four services of type disk (keep0 through keep3, port
// 25107, no SSL) and one service of type proxy (port 25333, SSL).
162 func (s *stubServer) serveFourDiskKeepServices() *reqTracker {
163 return s.serveStatic("/arvados/v1/keep_services", `{"items_available":5,"items":[
164 {"uuid":"zzzzz-bi6l4-000000000000000","service_host":"keep0.zzzzz.arvadosapi.com","service_port":25107,"service_ssl_flag":false,"service_type":"disk"},
165 {"uuid":"zzzzz-bi6l4-000000000000001","service_host":"keep1.zzzzz.arvadosapi.com","service_port":25107,"service_ssl_flag":false,"service_type":"disk"},
166 {"uuid":"zzzzz-bi6l4-000000000000002","service_host":"keep2.zzzzz.arvadosapi.com","service_port":25107,"service_ssl_flag":false,"service_type":"disk"},
167 {"uuid":"zzzzz-bi6l4-000000000000003","service_host":"keep3.zzzzz.arvadosapi.com","service_port":25107,"service_ssl_flag":false,"service_type":"disk"},
168 {"uuid":"zzzzz-bi6l4-h0a0xwut9qa6g3a","service_host":"keep.zzzzz.arvadosapi.com","service_port":25333,"service_ssl_flag":true,"service_type":"proxy"}]}`)
// serveKeepstoreIndexFoo4Bar1 stubs the keepstore index endpoint (paths
// under /index/). The response depends on which host the request was
// addressed to.
171 func (s *stubServer) serveKeepstoreIndexFoo4Bar1() *reqTracker {
173 s.mux.HandleFunc("/index/", func(w http.ResponseWriter, r *http.Request) {
// Only keep0 lists block 37b51d19..., which is the block behind the file
// named bar in the collection stubs.
175 if r.Host == "keep0.zzzzz.arvadosapi.com:25107" {
176 io.WriteString(w, "37b51d194a7513e45b56f6524f2d51f2+3 12345678\n")
// Block acbd18db... is the one behind the file named foo. Its timestamp
// is offset by count, and the extra newline leaves a blank line after
// the entry.
// NOTE(review): presumably this line is written for every host, and
// count is the number of index requests so far -- confirm both.
178 fmt.Fprintf(w, "acbd18db4cc2f85cedef654fccc4a4d8+3 %d\n\n", 12345678+count)
// serveKeepstoreTrash stubs the /trash endpoint with an empty JSON
// object and returns the tracker tests use to count trash requests.
183 func (s *stubServer) serveKeepstoreTrash() *reqTracker {
184 return s.serveStatic("/trash", `{}`)
// serveKeepstorePull stubs the /pull endpoint with an empty JSON object
// and returns the tracker tests use to count pull requests.
187 func (s *stubServer) serveKeepstorePull() *reqTracker {
188 return s.serveStatic("/pull", `{}`)
// runSuite is the gocheck suite for Balancer.Run and RunForever. Its
// methods use two fields: stub (the stubServer answering all HTTP
// requests) and config (the configuration passed to each run).
191 type runSuite struct {
196 // make a log.Logger that writes to the current test's c.Log().
197 func (s *runSuite) logger(c *check.C) *log.Logger {
// Each Read result is forwarded with a single c.Log call, so one call
// carries at most 10000 bytes.
200 buf := make([]byte, 10000)
// NOTE(review): r and the w used below are presumably the two ends of
// an io.Pipe -- confirm.
202 n, err := r.Read(buf)
// NOTE(review): buf[n-1] panics when n is 0 -- confirm that an n > 0
// guard encloses this check.
204 if buf[n-1] == '\n' {
207 c.Log(string(buf[:n]))
// The returned logger has no prefix and uses the standard date and time
// flags.
214 return log.New(w, "", log.LstdFlags)
// SetUpTest builds a fresh s.config whose API client sends everything
// through the stub server, restricts KeepServiceTypes to disk, and
// registers the discovery document stub so every test has it available.
217 func (s *runSuite) SetUpTest(c *check.C) {
219 Client: arvados.Client{
221 APIHost: "zzzzz.arvadosapi.com",
// Starting the stub yields the http.Client that intercepts all requests.
222 Client: s.stub.Start()},
223 KeepServiceTypes: []string{"disk"}}
224 s.stub.serveDiscoveryDoc()
// TearDownTest is the per-test cleanup hook.
// NOTE(review): presumably it calls s.stub.Close() to release the
// server started in SetUpTest -- confirm.
228 func (s *runSuite) TearDownTest(c *check.C) {
// TestRefuseZeroCollections checks that a run fails when the API server
// reports no collections at all, and that no pull requests are sent in
// that case.
232 func (s *runSuite) TestRefuseZeroCollections(c *check.C) {
238 s.stub.serveCurrentUserAdmin()
239 s.stub.serveZeroCollections()
240 s.stub.serveFourDiskKeepServices()
241 s.stub.serveKeepstoreIndexFoo4Bar1()
242 trashReqs := s.stub.serveKeepstoreTrash()
243 pullReqs := s.stub.serveKeepstorePull()
244 _, err := (&Balancer{}).Run(s.config, opts)
245 c.Check(err, check.ErrorMatches, "received zero collections")
// Four trash requests are still expected, one per disk service.
// NOTE(review): presumably these are the empty trash lists sent at
// startup that TestRunForever mentions -- confirm.
246 c.Check(trashReqs.Count(), check.Equals, 4)
247 c.Check(pullReqs.Count(), check.Equals, 0)
// TestServiceTypes checks that when KeepServiceTypes names a type that
// none of the stubbed services has, the run succeeds without sending any
// index or trash request.
250 func (s *runSuite) TestServiceTypes(c *check.C) {
// Overrides the disk setting installed by SetUpTest.
256 s.config.KeepServiceTypes = []string{"unlisted-type"}
257 s.stub.serveCurrentUserAdmin()
258 s.stub.serveFooBarFileCollections()
259 s.stub.serveFourDiskKeepServices()
260 indexReqs := s.stub.serveKeepstoreIndexFoo4Bar1()
261 trashReqs := s.stub.serveKeepstoreTrash()
262 _, err := (&Balancer{}).Run(s.config, opts)
263 c.Check(err, check.IsNil)
264 c.Check(indexReqs.Count(), check.Equals, 0)
265 c.Check(trashReqs.Count(), check.Equals, 0)
// TestRefuseNonAdmin checks that a run fails when the current user is
// not an admin, and that neither trash nor pull requests are sent.
268 func (s *runSuite) TestRefuseNonAdmin(c *check.C) {
274 s.stub.serveCurrentUserNotAdmin()
275 s.stub.serveZeroCollections()
276 s.stub.serveFourDiskKeepServices()
277 trashReqs := s.stub.serveKeepstoreTrash()
278 pullReqs := s.stub.serveKeepstorePull()
279 _, err := (&Balancer{}).Run(s.config, opts)
280 c.Check(err, check.ErrorMatches, "current user .* is not .* admin user")
// Unlike the other failure tests, not even one trash request is
// expected here.
281 c.Check(trashReqs.Count(), check.Equals, 0)
282 c.Check(pullReqs.Count(), check.Equals, 0)
// TestDetectSkippedCollections checks that a run fails when fewer
// collections were retrieved (2) than the server reports as existing
// (3), and that no pull requests are sent.
285 func (s *runSuite) TestDetectSkippedCollections(c *check.C) {
291 s.stub.serveCurrentUserAdmin()
292 s.stub.serveCollectionsButSkipOne()
293 s.stub.serveFourDiskKeepServices()
294 s.stub.serveKeepstoreIndexFoo4Bar1()
295 trashReqs := s.stub.serveKeepstoreTrash()
296 pullReqs := s.stub.serveKeepstorePull()
297 _, err := (&Balancer{}).Run(s.config, opts)
298 c.Check(err, check.ErrorMatches, `Retrieved 2 collections with modtime <= .* but server now reports there are 3 collections.*`)
// Four trash requests are still expected, same as in
// TestRefuseZeroCollections.
299 c.Check(trashReqs.Count(), check.Equals, 4)
300 c.Check(pullReqs.Count(), check.Equals, 0)
// TestDryRun checks that a run which computes changes without applying
// them sends no trash or pull requests, while the statistics still show
// that pulls and replication adjustments were planned.
303 func (s *runSuite) TestDryRun(c *check.C) {
309 s.stub.serveCurrentUserAdmin()
310 collReqs := s.stub.serveFooBarFileCollections()
311 s.stub.serveFourDiskKeepServices()
312 s.stub.serveKeepstoreIndexFoo4Bar1()
313 trashReqs := s.stub.serveKeepstoreTrash()
314 pullReqs := s.stub.serveKeepstorePull()
316 _, err := bal.Run(s.config, opts)
317 c.Check(err, check.IsNil)
// Every collection list request must have asked for trashed collections
// too.
318 for _, req := range collReqs.reqs {
319 c.Check(req.Form.Get("include_trash"), check.Equals, "true")
321 c.Check(trashReqs.Count(), check.Equals, 0)
322 c.Check(pullReqs.Count(), check.Equals, 0)
323 stats := bal.getStatistics()
// NOTE(review): gocheck Equals treats values of different types as
// unequal, so Not(Equals) against the untyped constant 0 passes
// vacuously if a field below is not a plain int -- confirm the field
// types.
324 c.Check(stats.pulls, check.Not(check.Equals), 0)
325 c.Check(stats.underrep.replicas, check.Not(check.Equals), 0)
326 c.Check(stats.overrep.replicas, check.Not(check.Equals), 0)
// TestCommit checks a run that applies its changes: trash and pull
// requests are sent to the keepstore stubs and the statistics report
// the expected number of trashes and pulls.
329 func (s *runSuite) TestCommit(c *check.C) {
336 s.stub.serveCurrentUserAdmin()
337 s.stub.serveFooBarFileCollections()
338 s.stub.serveFourDiskKeepServices()
339 s.stub.serveKeepstoreIndexFoo4Bar1()
340 trashReqs := s.stub.serveKeepstoreTrash()
341 pullReqs := s.stub.serveKeepstorePull()
343 _, err := bal.Run(s.config, opts)
344 c.Check(err, check.IsNil)
// Request counts are per service, not per block: 8 trash requests and 4
// pull requests across the four disk services. This matches the
// breakdown in TestRunForever (4 startup trash lists, then 4 trash and 4
// pull lists per run).
345 c.Check(trashReqs.Count(), check.Equals, 8)
346 c.Check(pullReqs.Count(), check.Equals, 4)
347 stats := bal.getStatistics()
348 // "foo" block is overreplicated by 2
349 c.Check(stats.trashes, check.Equals, 2)
350 // "bar" block is underreplicated by 1, and its only copy is
351 // in a poor rendezvous position
352 c.Check(stats.pulls, check.Equals, 2)
// TestRunForever starts RunForever in a goroutine with a one millisecond
// run period and waits until enough pull requests have arrived to
// account for four complete runs.
355 func (s *runSuite) TestRunForever(c *check.C) {
362 s.stub.serveCurrentUserAdmin()
363 s.stub.serveFooBarFileCollections()
364 s.stub.serveFourDiskKeepServices()
365 s.stub.serveKeepstoreIndexFoo4Bar1()
366 trashReqs := s.stub.serveKeepstoreTrash()
367 pullReqs := s.stub.serveKeepstorePull()
// NOTE(review): stop is handed to RunForever; confirm that this test
// signals it before returning so the goroutine does not outlive the
// test.
369 stop := make(chan interface{})
370 s.config.RunPeriod = arvados.Duration(time.Millisecond)
371 go RunForever(s.config, opts, stop)
373 // Each run should send 4 pull lists + 4 trash lists. The
374 // first run should also send 4 empty trash lists at
375 // startup. We should complete all four runs in much less than
// Poll once per millisecond until 16 pull requests (4 runs x 4
// services) have been seen, giving up after 10 seconds.
377 for t0 := time.Now(); pullReqs.Count() < 16 && time.Since(t0) < 10*time.Second; {
378 time.Sleep(time.Millisecond)
381 c.Check(pullReqs.Count() >= 16, check.Equals, true)
// The 4 extra trash requests are the startup lists.
// NOTE(review): the two Count calls are not taken atomically, so this
// equality only holds if RunForever has been stopped by now -- confirm.
382 c.Check(trashReqs.Count(), check.Equals, pullReqs.Count()+4)