1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
19 "git.curoverse.com/arvados.git/lib/config"
20 "git.curoverse.com/arvados.git/sdk/go/arvados"
21 "git.curoverse.com/arvados.git/sdk/go/arvadostest"
22 "github.com/sirupsen/logrus"
23 check "gopkg.in/check.v1"
// Register runSuite with the gocheck (gopkg.in/check.v1) runner.
26 var _ = check.Suite(&runSuite{})
// reqTracker collects the HTTP requests passed to Add. Its field
// declarations are not visible in this excerpt; Add appends to a
// field named reqs.
28 type reqTracker struct {
// Count returns an int derived from the tracker's state. The body is
// not visible in this excerpt; presumably it is the number of
// requests recorded so far (TODO confirm).
33 func (rt *reqTracker) Count() int {
// Add appends a copy of *req to rt.reqs. The returned int is computed
// on lines not visible in this excerpt.
39 func (rt *reqTracker) Add(req *http.Request) int {
// The request is stored by value (a shallow copy), not by pointer.
42 rt.reqs = append(rt.reqs, *req)
// stubServices is the keep service list the tests pass to
// serveKeepServices. The visible entries are keep0..keep3 (each with
// ServiceSSLFlag false) plus one entry for keep.zzzzz.arvadosapi.com.
// Any other fields of each entry are on lines not visible in this
// excerpt.
46 var stubServices = []arvados.KeepService{
48 UUID: "zzzzz-bi6l4-000000000000000",
49 ServiceHost: "keep0.zzzzz.arvadosapi.com",
51 ServiceSSLFlag: false,
55 UUID: "zzzzz-bi6l4-000000000000001",
56 ServiceHost: "keep1.zzzzz.arvadosapi.com",
58 ServiceSSLFlag: false,
62 UUID: "zzzzz-bi6l4-000000000000002",
63 ServiceHost: "keep2.zzzzz.arvadosapi.com",
65 ServiceSSLFlag: false,
69 UUID: "zzzzz-bi6l4-000000000000003",
70 ServiceHost: "keep3.zzzzz.arvadosapi.com",
72 ServiceSSLFlag: false,
// The fifth entry uses a different UUID pattern and host name; its
// remaining fields are not visible here.
76 UUID: "zzzzz-bi6l4-h0a0xwut9qa6g3a",
77 ServiceHost: "keep.zzzzz.arvadosapi.com",
// stubMounts maps a keepstore "host:port" string to the mounts
// reported for that host. serveKeepstoreMounts indexes this map by
// r.Host, and the index stubs register one handler per mount UUID.
// Each visible entry lists a mount with a UUID and a DeviceID.
84 var stubMounts = map[string][]arvados.KeepMount{
85 "keep0.zzzzz.arvadosapi.com:25107": {{
86 UUID: "zzzzz-ivpuk-000000000000000",
87 DeviceID: "keep0-vol0",
89 "keep1.zzzzz.arvadosapi.com:25107": {{
90 UUID: "zzzzz-ivpuk-100000000000000",
91 DeviceID: "keep1-vol0",
93 "keep2.zzzzz.arvadosapi.com:25107": {{
94 UUID: "zzzzz-ivpuk-200000000000000",
95 DeviceID: "keep2-vol0",
97 "keep3.zzzzz.arvadosapi.com:25107": {{
98 UUID: "zzzzz-ivpuk-300000000000000",
99 DeviceID: "keep3-vol0",
103 // stubServer is an HTTP transport that intercepts and processes all
104 // requests using its own handlers.
//
// It is installed as the Transport of the http.Client returned by
// Start, and its RoundTrip method answers requests from s.mux. Fields
// other than logf are declared on lines not visible in this excerpt;
// the methods use s.mux and s.srv.
105 type stubServer struct {
// logf has a Printf-style signature. Where it is assigned and called
// is not visible in this excerpt.
110 logf func(string, ...interface{})
113 // Start initializes the stub server and returns an *http.Client that
114 // uses the stub server to handle all requests.
116 // A stubServer that has been started should eventually be shut down
// by calling Close.
118 func (s *stubServer) Start() *http.Client {
119 // Set up an http.Client whose Transport is s, so that requests are
120 // answered from s.mux (see RoundTrip). Test cases will attach handlers to s.mux to get
121 // the desired responses.
122 s.mux = http.NewServeMux()
// s.srv is an httptest server whose handler also delegates to s.mux.
// Part of this handler is on lines not visible in this excerpt.
123 s.srv = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
// Responses served through s.srv are labelled as JSON before the
// mux handler runs.
127 w.Header().Set("Content-Type", "application/json")
128 s.mux.ServeHTTP(w, r)
// The returned client routes every request through s.RoundTrip.
130 return &http.Client{Transport: s}
// RoundTrip serves req directly from s.mux into an
// httptest.ResponseRecorder and converts the recorded result into an
// *http.Response. The visible return statement yields a nil error.
// Response fields other than Status and Body are set on lines not
// visible in this excerpt.
133 func (s *stubServer) RoundTrip(req *http.Request) (*http.Response, error) {
134 w := httptest.NewRecorder()
135 s.mux.ServeHTTP(w, req)
136 return &http.Response{
// Status is "<code> <reason phrase>", e.g. "200 OK".
138 Status: fmt.Sprintf("%d %s", w.Code, http.StatusText(w.Code)),
// Wrap the recorder's buffered body so it satisfies io.ReadCloser.
140 Body: ioutil.NopCloser(w.Body)}, nil
143 // Close releases resources used by the server.
// The body is not visible in this excerpt; presumably it shuts down
// s.srv (TODO confirm).
144 func (s *stubServer) Close() {
// serveStatic registers a handler on s.mux for path that writes data
// as the response body. It returns a *reqTracker; the tracker's
// creation and the call that records each request are on lines not
// visible in this excerpt (presumably every handled request is
// recorded -- TODO confirm).
148 func (s *stubServer) serveStatic(path, data string) *reqTracker {
150 s.mux.HandleFunc(path, func(w http.ResponseWriter, r *http.Request) {
// Read the request body to EOF; the bytes and any error are
// discarded. Whether this call is guarded by a condition is not
// visible here.
153 ioutil.ReadAll(r.Body)
156 io.WriteString(w, data)
// serveCurrentUserAdmin stubs /arvados/v1/users/current with an
// active user record whose is_admin flag is true.
161 func (s *stubServer) serveCurrentUserAdmin() *reqTracker {
162 return s.serveStatic("/arvados/v1/users/current",
163 `{"uuid":"zzzzz-tpzed-000000000000000","is_admin":true,"is_active":true}`)
// serveCurrentUserNotAdmin stubs /arvados/v1/users/current with an
// active user record whose is_admin flag is false.
166 func (s *stubServer) serveCurrentUserNotAdmin() *reqTracker {
167 return s.serveStatic("/arvados/v1/users/current",
168 `{"uuid":"zzzzz-tpzed-000000000000000","is_admin":false,"is_active":true}`)
// serveDiscoveryDoc stubs the discovery document endpoint with a
// document that only sets defaultCollectionReplication to 2.
171 func (s *stubServer) serveDiscoveryDoc() *reqTracker {
172 return s.serveStatic("/discovery/v1/apis/arvados/v1/rest",
173 `{"defaultCollectionReplication":2}`)
// serveZeroCollections stubs /arvados/v1/collections with an empty
// item list and items_available set to 0.
176 func (s *stubServer) serveZeroCollections() *reqTracker {
177 return s.serveStatic("/arvados/v1/collections",
178 `{"items":[],"items_available":0}`)
// serveFooBarFileCollections stubs /arvados/v1/collections with a
// fixture of three collections: two that share a portable data hash
// and contain a 3-byte file "bar", and one that contains a 3-byte
// file "foo". Requests whose "filters" form value mentions
// modified_at get an empty list instead. The tracker setup, form
// parsing and the else/closing lines are not visible in this excerpt.
181 func (s *stubServer) serveFooBarFileCollections() *reqTracker {
183 s.mux.HandleFunc("/arvados/v1/collections", func(w http.ResponseWriter, r *http.Request) {
// r.Form is read here, so the form is presumably parsed on a line
// not visible above (TODO confirm).
186 if strings.Contains(r.Form.Get("filters"), `modified_at`) {
187 io.WriteString(w, `{"items_available":0,"items":[]}`)
// Presumably the other branch: the full three-collection listing.
189 io.WriteString(w, `{"items_available":3,"items":[
190 {"uuid":"zzzzz-4zz18-aaaaaaaaaaaaaaa","portable_data_hash":"fa7aeb5140e2848d39b416daeef4ffc5+45","manifest_text":". 37b51d194a7513e45b56f6524f2d51f2+3 0:3:bar\n","modified_at":"2014-02-03T17:22:54Z"},
191 {"uuid":"zzzzz-4zz18-ehbhgtheo8909or","portable_data_hash":"fa7aeb5140e2848d39b416daeef4ffc5+45","manifest_text":". 37b51d194a7513e45b56f6524f2d51f2+3 0:3:bar\n","modified_at":"2014-02-03T17:22:54Z"},
192 {"uuid":"zzzzz-4zz18-znfnqtbbv4spc3w","portable_data_hash":"1f4b0bc7583c2a7f9102c395f4ffc5e3+45","manifest_text":". acbd18db4cc2f85cedef654fccc4a4d8+3 0:3:foo\n","modified_at":"2014-02-03T17:22:54Z"}]}`)
// serveCollectionsButSkipOne stubs /arvados/v1/collections so that
// the responses are inconsistent: a query filtered on
// modified_at <= ... reports items_available 3, while the default
// listing returns only two items. TestDetectSkippedCollections relies
// on this mismatch. The filter strings are matched in their
// JSON-escaped form (\u003c is "<", \u003e is ">"). The tracker setup,
// form parsing and final else/closing lines are not visible in this
// excerpt.
198 func (s *stubServer) serveCollectionsButSkipOne() *reqTracker {
200 s.mux.HandleFunc("/arvados/v1/collections", func(w http.ResponseWriter, r *http.Request) {
// modified_at <= T: report three collections available, return none.
203 if strings.Contains(r.Form.Get("filters"), `"modified_at","\u003c="`) {
204 io.WriteString(w, `{"items_available":3,"items":[]}`)
// modified_at > (or >=) T: nothing.
205 } else if strings.Contains(r.Form.Get("filters"), `"modified_at","\u003e`) {
206 io.WriteString(w, `{"items_available":0,"items":[]}`)
// modified_at = T combined with uuid > U: nothing.
207 } else if strings.Contains(r.Form.Get("filters"), `"modified_at","="`) && strings.Contains(r.Form.Get("filters"), `"uuid","\u003e"`) {
208 io.WriteString(w, `{"items_available":0,"items":[]}`)
// modified_at = null: nothing.
209 } else if strings.Contains(r.Form.Get("filters"), `"modified_at","=",null`) {
210 io.WriteString(w, `{"items_available":0,"items":[]}`)
// Presumably the final else branch: a listing of two collections.
212 io.WriteString(w, `{"items_available":2,"items":[
213 {"uuid":"zzzzz-4zz18-ehbhgtheo8909or","portable_data_hash":"fa7aeb5140e2848d39b416daeef4ffc5+45","manifest_text":". 37b51d194a7513e45b56f6524f2d51f2+3 0:3:bar\n","modified_at":"2014-02-03T17:22:54Z"},
214 {"uuid":"zzzzz-4zz18-znfnqtbbv4spc3w","portable_data_hash":"1f4b0bc7583c2a7f9102c395f4ffc5e3+45","manifest_text":". acbd18db4cc2f85cedef654fccc4a4d8+3 0:3:foo\n","modified_at":"2014-02-03T17:22:54Z"}]}`)
// serveZeroKeepServices stubs /arvados/v1/keep_services with a
// zero-value arvados.KeepServiceList encoded as JSON.
220 func (s *stubServer) serveZeroKeepServices() *reqTracker {
221 return s.serveJSON("/arvados/v1/keep_services", arvados.KeepServiceList{})
// serveKeepServices stubs /arvados/v1/keep_services with a
// KeepServiceList whose ItemsAvailable is len(svcs). The remaining
// list fields are set on lines not visible in this excerpt
// (presumably the items are svcs itself -- TODO confirm).
224 func (s *stubServer) serveKeepServices(svcs []arvados.KeepService) *reqTracker {
225 return s.serveJSON("/arvados/v1/keep_services", arvados.KeepServiceList{
226 ItemsAvailable: len(svcs),
// serveJSON registers a handler on s.mux for path that writes resp to
// the response as JSON. The tracker it returns is created and updated
// on lines not visible in this excerpt.
231 func (s *stubServer) serveJSON(path string, resp interface{}) *reqTracker {
233 s.mux.HandleFunc(path, func(w http.ResponseWriter, r *http.Request) {
// The error returned by Encode is discarded.
235 json.NewEncoder(w).Encode(resp)
// serveKeepstoreMounts stubs the keepstore /mounts endpoint. The
// response is the JSON encoding of the stubMounts entry for the
// request's Host. The returned tracker is handled on lines not
// visible in this excerpt.
240 func (s *stubServer) serveKeepstoreMounts() *reqTracker {
242 s.mux.HandleFunc("/mounts", func(w http.ResponseWriter, r *http.Request) {
// A host missing from stubMounts yields a nil slice, which the
// encoder writes as JSON null. The Encode error is discarded.
244 json.NewEncoder(w).Encode(stubMounts[r.Host])
// serveKeepstoreIndexFoo4Bar1 stubs the keepstore index endpoints,
// both "/index/" and one "/mounts/<uuid>/blocks" path per mount in
// stubMounts. In the visible code the block 37b51d19...+3 (the "bar"
// block of the collection fixtures) is listed only when the request
// Host is keep0, while the block acbd18db...+3 (the "foo" block) is
// written with a second field of 12345678+count. count, the tracker
// and several conditions are defined on lines not visible in this
// excerpt.
249 func (s *stubServer) serveKeepstoreIndexFoo4Bar1() *reqTracker {
251 s.mux.HandleFunc("/index/", func(w http.ResponseWriter, r *http.Request) {
253 if r.Host == "keep0.zzzzz.arvadosapi.com:25107" {
254 io.WriteString(w, "37b51d194a7513e45b56f6524f2d51f2+3 12345678\n")
// The extra trailing newline leaves an empty line at the end of the
// response.
256 fmt.Fprintf(w, "acbd18db4cc2f85cedef654fccc4a4d8+3 %d\n\n", 12345678+count)
258 for _, mounts := range stubMounts {
259 for i, mnt := range mounts {
// NOTE(review): the closure below reads i. Whether i is re-declared
// per iteration is on a line not visible here -- confirm before
// relying on pre-Go-1.22 loop semantics.
261 s.mux.HandleFunc(fmt.Sprintf("/mounts/%s/blocks", mnt.UUID), func(w http.ResponseWriter, r *http.Request) {
263 if i == 0 && r.Host == "keep0.zzzzz.arvadosapi.com:25107" {
264 io.WriteString(w, "37b51d194a7513e45b56f6524f2d51f2+3 12345678\n")
267 fmt.Fprintf(w, "acbd18db4cc2f85cedef654fccc4a4d8+3 %d\n", 12345678+count)
// serveKeepstoreIndexFoo1 stubs the keepstore index endpoints,
// both "/index/" and one "/mounts/<uuid>/blocks" path per mount in
// stubMounts. Only the block acbd18db...+3 (the "foo" block of the
// collection fixtures) appears in the visible responses; the "bar"
// block is never listed. The conditions selecting between the two
// writes in the per-mount handler, and the tracker handling, are on
// lines not visible in this excerpt.
276 func (s *stubServer) serveKeepstoreIndexFoo1() *reqTracker {
278 s.mux.HandleFunc("/index/", func(w http.ResponseWriter, r *http.Request) {
280 io.WriteString(w, "acbd18db4cc2f85cedef654fccc4a4d8+3 12345678\n\n")
282 for _, mounts := range stubMounts {
283 for i, mnt := range mounts {
285 s.mux.HandleFunc(fmt.Sprintf("/mounts/%s/blocks", mnt.UUID), func(w http.ResponseWriter, r *http.Request) {
// One index line followed by an empty line.
288 io.WriteString(w, "acbd18db4cc2f85cedef654fccc4a4d8+3 12345678\n\n")
// Presumably the alternative branch: an index with no entries.
290 io.WriteString(w, "\n")
// serveKeepstoreTrash stubs the keepstore /trash endpoint with an
// empty JSON object and returns the tracker from serveStatic.
298 func (s *stubServer) serveKeepstoreTrash() *reqTracker {
299 return s.serveStatic("/trash", `{}`)
// serveKeepstorePull stubs the keepstore /pull endpoint with an
// empty JSON object and returns the tracker from serveStatic.
302 func (s *stubServer) serveKeepstorePull() *reqTracker {
303 return s.serveStatic("/pull", `{}`)
// runSuite is the gocheck suite for the tests in this file. Besides
// the fields shown, the methods use s.stub, which is declared on a
// line not visible in this excerpt.
306 type runSuite struct {
// config is the cluster configuration loaded in SetUpTest.
308 config *arvados.Cluster
// client is the API client built in SetUpTest on top of the stub
// server's http.Client.
309 client *arvados.Client
// newServer builds a *Server for a test run from options. The visible
// fields copy *options into RunOptions, install fresh metrics from
// newMetrics, and reuse the Logger and Dumper from options. Other
// fields and the return statement are on lines not visible in this
// excerpt.
312 func (s *runSuite) newServer(options *RunOptions) *Server {
316 RunOptions: *options,
317 Metrics: newMetrics(),
318 Logger: options.Logger,
319 Dumper: options.Dumper,
325 // logger returns a *logrus.Logger for the current test; the visible code copies data read from r to c.Log().
// How r is created and how the logger's output is connected to it are
// on lines not visible in this excerpt (presumably a pipe whose write
// side is the logger's output -- TODO confirm).
326 func (s *runSuite) logger(c *check.C) *logrus.Logger {
329 buf := make([]byte, 10000)
331 n, err := r.Read(buf)
// NOTE(review): buf[n-1] is out of range when n is 0. Any guard is on
// a line not visible here -- confirm. The statement inside this if is
// also not visible; presumably it drops the trailing newline.
333 if buf[n-1] == '\n' {
336 c.Log(string(buf[:n]))
343 logger := logrus.New()
// SetUpTest prepares the per-test fixture: it loads the configuration
// and the cluster selected by the empty name, overrides the balance
// period and the Keepbalance service URL, starts the stub server, and
// builds an arvados.Client on top of it. The discovery document stub
// is installed for every test.
348 func (s *runSuite) SetUpTest(c *check.C) {
349 cfg, err := config.NewLoader(nil, nil).Load()
350 c.Assert(err, check.Equals, nil)
351 s.config, err = cfg.GetCluster("")
352 c.Assert(err, check.Equals, nil)
// Use a one-second balance period; TestRunForever lowers it again.
354 s.config.Collections.BalancePeriod = arvados.Duration(time.Second)
355 arvadostest.SetServiceURL(&s.config.Services.Keepbalance, "http://localhost:/")
// Every request made through this client is answered by the stub
// server. Other client fields are on lines not visible here.
357 s.client = &arvados.Client{
359 APIHost: "zzzzz.arvadosapi.com",
360 Client: s.stub.Start()}
362 s.stub.serveDiscoveryDoc()
// TearDownTest runs after each test. Its body is not visible in this
// excerpt; presumably it closes the stub server started in SetUpTest
// (TODO confirm).
366 func (s *runSuite) TearDownTest(c *check.C) {
// TestRefuseZeroCollections checks that runOnce fails with
// "received zero collections" when the collections endpoint returns
// an empty list. It expects exactly 4 trash requests and no pull
// requests. opts is declared on lines not visible in this excerpt.
370 func (s *runSuite) TestRefuseZeroCollections(c *check.C) {
376 s.stub.serveCurrentUserAdmin()
377 s.stub.serveZeroCollections()
378 s.stub.serveKeepServices(stubServices)
379 s.stub.serveKeepstoreMounts()
380 s.stub.serveKeepstoreIndexFoo4Bar1()
381 trashReqs := s.stub.serveKeepstoreTrash()
382 pullReqs := s.stub.serveKeepstorePull()
383 srv := s.newServer(&opts)
384 _, err := srv.runOnce()
385 c.Check(err, check.ErrorMatches, "received zero collections")
386 c.Check(trashReqs.Count(), check.Equals, 4)
387 c.Check(pullReqs.Count(), check.Equals, 0)
// TestRefuseNonAdmin checks that runOnce fails with a "not admin"
// error when the current user record has is_admin false, and that no
// trash or pull requests are sent. opts is declared on lines not
// visible in this excerpt.
390 func (s *runSuite) TestRefuseNonAdmin(c *check.C) {
396 s.stub.serveCurrentUserNotAdmin()
397 s.stub.serveZeroCollections()
398 s.stub.serveKeepServices(stubServices)
399 s.stub.serveKeepstoreMounts()
400 trashReqs := s.stub.serveKeepstoreTrash()
401 pullReqs := s.stub.serveKeepstorePull()
402 srv := s.newServer(&opts)
403 _, err := srv.runOnce()
404 c.Check(err, check.ErrorMatches, "current user .* is not .* admin user")
405 c.Check(trashReqs.Count(), check.Equals, 0)
406 c.Check(pullReqs.Count(), check.Equals, 0)
// TestDetectSkippedCollections checks that runOnce reports an error
// when the collections stub returns two collections but then reports
// three as available. It expects exactly 4 trash requests and no pull
// requests. opts is declared on lines not visible in this excerpt.
409 func (s *runSuite) TestDetectSkippedCollections(c *check.C) {
415 s.stub.serveCurrentUserAdmin()
416 s.stub.serveCollectionsButSkipOne()
417 s.stub.serveKeepServices(stubServices)
418 s.stub.serveKeepstoreMounts()
419 s.stub.serveKeepstoreIndexFoo4Bar1()
420 trashReqs := s.stub.serveKeepstoreTrash()
421 pullReqs := s.stub.serveKeepstorePull()
422 srv := s.newServer(&opts)
423 _, err := srv.runOnce()
424 c.Check(err, check.ErrorMatches, `Retrieved 2 collections with modtime <= .* but server now reports there are 3 collections.*`)
425 c.Check(trashReqs.Count(), check.Equals, 4)
426 c.Check(pullReqs.Count(), check.Equals, 0)
// TestWriteLostBlocks checks the missing-blocks report. The index
// stub used here lists only the "foo" block, so the "bar" block
// referenced by the collection fixtures is on no server. After
// runOnce the report file must contain one line with that block's
// hash and the portable data hash of the collections using it.
// opts is declared on lines not visible in this excerpt.
429 func (s *runSuite) TestWriteLostBlocks(c *check.C) {
// The report is written to a temporary file that is removed when the
// test returns.
430 lostf, err := ioutil.TempFile("", "keep-balance-lost-blocks-test-")
431 c.Assert(err, check.IsNil)
432 s.config.Collections.BlobMissingReport = lostf.Name()
433 defer os.Remove(lostf.Name())
439 s.stub.serveCurrentUserAdmin()
440 s.stub.serveFooBarFileCollections()
441 s.stub.serveKeepServices(stubServices)
442 s.stub.serveKeepstoreMounts()
443 s.stub.serveKeepstoreIndexFoo1()
444 s.stub.serveKeepstoreTrash()
445 s.stub.serveKeepstorePull()
446 srv := s.newServer(&opts)
// NOTE(review): no visible statement assigns err between the
// assertion after TempFile and this one, so this check looks
// redundant -- confirm against the lines not visible here.
447 c.Assert(err, check.IsNil)
448 _, err = srv.runOnce()
449 c.Check(err, check.IsNil)
450 lost, err := ioutil.ReadFile(lostf.Name())
451 c.Assert(err, check.IsNil)
452 c.Check(string(lost), check.Equals, "37b51d194a7513e45b56f6524f2d51f2 fa7aeb5140e2848d39b416daeef4ffc5+45\n")
// TestDryRun checks a run that computes changes without sending
// them: runOnce succeeds, no trash or pull requests reach the stubs,
// and the balancer statistics still report non-zero pulls,
// under-replicated and over-replicated replicas. It also verifies that
// every collections request asked for trashed collections and old
// versions. opts is declared on lines not visible in this excerpt
// (presumably with the commit options disabled -- TODO confirm).
455 func (s *runSuite) TestDryRun(c *check.C) {
461 s.stub.serveCurrentUserAdmin()
462 collReqs := s.stub.serveFooBarFileCollections()
463 s.stub.serveKeepServices(stubServices)
464 s.stub.serveKeepstoreMounts()
465 s.stub.serveKeepstoreIndexFoo4Bar1()
466 trashReqs := s.stub.serveKeepstoreTrash()
467 pullReqs := s.stub.serveKeepstorePull()
468 srv := s.newServer(&opts)
469 bal, err := srv.runOnce()
470 c.Check(err, check.IsNil)
// Inspect the recorded collection requests directly.
471 for _, req := range collReqs.reqs {
472 c.Check(req.Form.Get("include_trash"), check.Equals, "true")
473 c.Check(req.Form.Get("include_old_versions"), check.Equals, "true")
475 c.Check(trashReqs.Count(), check.Equals, 0)
476 c.Check(pullReqs.Count(), check.Equals, 0)
477 c.Check(bal.stats.pulls, check.Not(check.Equals), 0)
478 c.Check(bal.stats.underrep.replicas, check.Not(check.Equals), 0)
479 c.Check(bal.stats.overrep.replicas, check.Not(check.Equals), 0)
// TestCommit checks a run that sends its changes: runOnce succeeds,
// the stubs receive 8 trash requests and 4 pull requests, the
// balancer statistics show 2 trashes and 2 pulls, the missing-blocks
// report is empty, and the metrics endpoint exposes the expected
// values. opts is declared on lines not visible in this excerpt.
482 func (s *runSuite) TestCommit(c *check.C) {
483 lostf, err := ioutil.TempFile("", "keep-balance-lost-blocks-test-")
484 c.Assert(err, check.IsNil)
485 s.config.Collections.BlobMissingReport = lostf.Name()
486 defer os.Remove(lostf.Name())
// getMetrics authenticates with this token.
488 s.config.ManagementToken = "xyzzy"
495 s.stub.serveCurrentUserAdmin()
496 s.stub.serveFooBarFileCollections()
497 s.stub.serveKeepServices(stubServices)
498 s.stub.serveKeepstoreMounts()
499 s.stub.serveKeepstoreIndexFoo4Bar1()
500 trashReqs := s.stub.serveKeepstoreTrash()
501 pullReqs := s.stub.serveKeepstorePull()
502 srv := s.newServer(&opts)
503 bal, err := srv.runOnce()
504 c.Check(err, check.IsNil)
505 c.Check(trashReqs.Count(), check.Equals, 8)
506 c.Check(pullReqs.Count(), check.Equals, 4)
507 // "foo" block is overreplicated by 2
508 c.Check(bal.stats.trashes, check.Equals, 2)
509 // "bar" block is underreplicated by 1, and its only copy is
510 // in a poor rendezvous position
511 c.Check(bal.stats.pulls, check.Equals, 2)
// Every referenced block is present on some server, so the report
// must be empty.
513 lost, err := ioutil.ReadFile(lostf.Name())
514 c.Assert(err, check.IsNil)
515 c.Check(string(lost), check.Equals, "")
// (?ms) lets "." span newlines so each pattern can match one metric
// line anywhere in the output.
517 metrics := s.getMetrics(c, srv)
518 c.Check(metrics, check.Matches, `(?ms).*\narvados_keep_total_bytes 15\n.*`)
519 c.Check(metrics, check.Matches, `(?ms).*\narvados_keepbalance_changeset_compute_seconds_sum [0-9\.]+\n.*`)
520 c.Check(metrics, check.Matches, `(?ms).*\narvados_keepbalance_changeset_compute_seconds_count 1\n.*`)
521 c.Check(metrics, check.Matches, `(?ms).*\narvados_keep_dedup_byte_ratio 1\.5\n.*`)
522 c.Check(metrics, check.Matches, `(?ms).*\narvados_keep_dedup_block_ratio 1\.5\n.*`)
// TestRunForever checks repeated balancing runs with a one-millisecond
// balance period. It waits until at least 16 pull requests have been
// recorded (or 10 seconds have passed), then checks the request
// counts and the number of runs reported by the metrics endpoint.
// opts, the goroutine that starts the server, and the use of the stop
// and done channels are on lines not visible in this excerpt.
525 func (s *runSuite) TestRunForever(c *check.C) {
// getMetrics authenticates with this token.
526 s.config.ManagementToken = "xyzzy"
533 s.stub.serveCurrentUserAdmin()
534 s.stub.serveFooBarFileCollections()
535 s.stub.serveKeepServices(stubServices)
536 s.stub.serveKeepstoreMounts()
537 s.stub.serveKeepstoreIndexFoo4Bar1()
538 trashReqs := s.stub.serveKeepstoreTrash()
539 pullReqs := s.stub.serveKeepstorePull()
541 stop := make(chan interface{})
542 s.config.Collections.BalancePeriod = arvados.Duration(time.Millisecond)
543 srv := s.newServer(&opts)
545 done := make(chan bool)
551 // Each run should send 4 pull lists + 4 trash lists. The
552 // first run should also send 4 empty trash lists at
553 // startup. We should complete all four runs well within
// the 10-second limit enforced by the polling loop below.
555 for t0 := time.Now(); pullReqs.Count() < 16 && time.Since(t0) < 10*time.Second; {
556 time.Sleep(time.Millisecond)
560 c.Check(pullReqs.Count() >= 16, check.Equals, true)
// The 4 extra trash requests are the empty lists sent at startup.
561 c.Check(trashReqs.Count(), check.Equals, pullReqs.Count()+4)
// Each run sends 4 pull lists, so pull requests / 4 is the number of
// runs the metric should report.
562 c.Check(s.getMetrics(c, srv), check.Matches, `(?ms).*\narvados_keepbalance_changeset_compute_seconds_count `+fmt.Sprintf("%d", pullReqs.Count()/4)+`\n.*`)
// getMetrics requests /metrics from srv and returns a string. The
// return statement is not visible in this excerpt; presumably it is
// the response body read into buf (TODO confirm). It first checks
// that a request without a token is rejected with 401, then repeats
// the request with api_token=xyzzy and expects 200. Callers set
// s.config.ManagementToken to "xyzzy" beforehand.
565 func (s *runSuite) getMetrics(c *check.C, srv *Server) string {
// Unauthenticated request: must be refused.
566 req := httptest.NewRequest("GET", "/metrics", nil)
567 resp := httptest.NewRecorder()
568 srv.ServeHTTP(resp, req)
569 c.Check(resp.Code, check.Equals, http.StatusUnauthorized)
// Authenticated request: must succeed.
571 req = httptest.NewRequest("GET", "/metrics?api_token=xyzzy", nil)
572 resp = httptest.NewRecorder()
573 srv.ServeHTTP(resp, req)
574 c.Check(resp.Code, check.Equals, http.StatusOK)
576 buf, err := ioutil.ReadAll(resp.Body)
577 c.Check(err, check.IsNil)