19923: keep-balance option to process a subset of blocks.
[arvados.git] / services / keep-balance / balance_run_test.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package keepbalance
6
7 import (
8         "bytes"
9         "context"
10         "encoding/json"
11         "fmt"
12         "io"
13         "io/ioutil"
14         "net/http"
15         "net/http/httptest"
16         "os"
17         "strings"
18         "sync"
19         "time"
20
21         "git.arvados.org/arvados.git/lib/config"
22         "git.arvados.org/arvados.git/sdk/go/arvados"
23         "git.arvados.org/arvados.git/sdk/go/arvadostest"
24         "git.arvados.org/arvados.git/sdk/go/ctxlog"
25         "github.com/jmoiron/sqlx"
26         "github.com/prometheus/client_golang/prometheus"
27         "github.com/prometheus/common/expfmt"
28         check "gopkg.in/check.v1"
29 )
30
31 var _ = check.Suite(&runSuite{})
32
// reqTracker keeps shallow copies of the HTTP requests seen by a stub
// handler. Count and Add lock the embedded mutex, so they may be called
// from concurrent handlers.
type reqTracker struct {
	reqs []http.Request
	sync.Mutex
}

// Count reports how many requests have been recorded so far.
func (rt *reqTracker) Count() int {
	rt.Lock()
	n := len(rt.reqs)
	rt.Unlock()
	return n
}

// Add stores a shallow copy of *req and returns the number of recorded
// requests, including this one.
func (rt *reqTracker) Add(req *http.Request) int {
	rt.Lock()
	defer rt.Unlock()
	snapshot := *req
	rt.reqs = append(rt.reqs, snapshot)
	return len(rt.reqs)
}
50
51 var stubServices = []arvados.KeepService{
52         {
53                 UUID:           "zzzzz-bi6l4-000000000000000",
54                 ServiceHost:    "keep0.zzzzz.arvadosapi.com",
55                 ServicePort:    25107,
56                 ServiceSSLFlag: false,
57                 ServiceType:    "disk",
58         },
59         {
60                 UUID:           "zzzzz-bi6l4-000000000000001",
61                 ServiceHost:    "keep1.zzzzz.arvadosapi.com",
62                 ServicePort:    25107,
63                 ServiceSSLFlag: false,
64                 ServiceType:    "disk",
65         },
66         {
67                 UUID:           "zzzzz-bi6l4-000000000000002",
68                 ServiceHost:    "keep2.zzzzz.arvadosapi.com",
69                 ServicePort:    25107,
70                 ServiceSSLFlag: false,
71                 ServiceType:    "disk",
72         },
73         {
74                 UUID:           "zzzzz-bi6l4-000000000000003",
75                 ServiceHost:    "keep3.zzzzz.arvadosapi.com",
76                 ServicePort:    25107,
77                 ServiceSSLFlag: false,
78                 ServiceType:    "disk",
79         },
80         {
81                 UUID:           "zzzzz-bi6l4-h0a0xwut9qa6g3a",
82                 ServiceHost:    "keep.zzzzz.arvadosapi.com",
83                 ServicePort:    25333,
84                 ServiceSSLFlag: true,
85                 ServiceType:    "proxy",
86         },
87 }
88
89 var stubMounts = map[string][]arvados.KeepMount{
90         "keep0.zzzzz.arvadosapi.com:25107": {{
91                 UUID:           "zzzzz-ivpuk-000000000000000",
92                 DeviceID:       "keep0-vol0",
93                 StorageClasses: map[string]bool{"default": true},
94         }},
95         "keep1.zzzzz.arvadosapi.com:25107": {{
96                 UUID:           "zzzzz-ivpuk-100000000000000",
97                 DeviceID:       "keep1-vol0",
98                 StorageClasses: map[string]bool{"default": true},
99         }},
100         "keep2.zzzzz.arvadosapi.com:25107": {{
101                 UUID:           "zzzzz-ivpuk-200000000000000",
102                 DeviceID:       "keep2-vol0",
103                 StorageClasses: map[string]bool{"default": true},
104         }},
105         "keep3.zzzzz.arvadosapi.com:25107": {{
106                 UUID:           "zzzzz-ivpuk-300000000000000",
107                 DeviceID:       "keep3-vol0",
108                 StorageClasses: map[string]bool{"default": true},
109         }},
110 }
111
112 // stubServer is an HTTP transport that intercepts and processes all
113 // requests using its own handlers.
114 type stubServer struct {
115         mux      *http.ServeMux
116         srv      *httptest.Server
117         mutex    sync.Mutex
118         Requests reqTracker
119         logf     func(string, ...interface{})
120 }
121
122 // Start initializes the stub server and returns an *http.Client that
123 // uses the stub server to handle all requests.
124 //
125 // A stubServer that has been started should eventually be shut down
126 // with Close().
127 func (s *stubServer) Start() *http.Client {
128         // Set up a config.Client that forwards all requests to s.mux
129         // via s.srv. Test cases will attach handlers to s.mux to get
130         // the desired responses.
131         s.mux = http.NewServeMux()
132         s.srv = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
133                 s.mutex.Lock()
134                 s.Requests.Add(r)
135                 s.mutex.Unlock()
136                 w.Header().Set("Content-Type", "application/json")
137                 s.mux.ServeHTTP(w, r)
138         }))
139         return &http.Client{Transport: s}
140 }
141
142 func (s *stubServer) RoundTrip(req *http.Request) (*http.Response, error) {
143         w := httptest.NewRecorder()
144         s.mux.ServeHTTP(w, req)
145         return &http.Response{
146                 StatusCode: w.Code,
147                 Status:     fmt.Sprintf("%d %s", w.Code, http.StatusText(w.Code)),
148                 Header:     w.HeaderMap,
149                 Body:       ioutil.NopCloser(w.Body)}, nil
150 }
151
152 // Close releases resources used by the server.
153 func (s *stubServer) Close() {
154         s.srv.Close()
155 }
156
157 func (s *stubServer) serveStatic(path, data string) *reqTracker {
158         rt := &reqTracker{}
159         s.mux.HandleFunc(path, func(w http.ResponseWriter, r *http.Request) {
160                 rt.Add(r)
161                 if r.Body != nil {
162                         ioutil.ReadAll(r.Body)
163                         r.Body.Close()
164                 }
165                 io.WriteString(w, data)
166         })
167         return rt
168 }
169
170 func (s *stubServer) serveCurrentUserAdmin() *reqTracker {
171         return s.serveStatic("/arvados/v1/users/current",
172                 `{"uuid":"zzzzz-tpzed-000000000000000","is_admin":true,"is_active":true}`)
173 }
174
175 func (s *stubServer) serveCurrentUserNotAdmin() *reqTracker {
176         return s.serveStatic("/arvados/v1/users/current",
177                 `{"uuid":"zzzzz-tpzed-000000000000000","is_admin":false,"is_active":true}`)
178 }
179
180 func (s *stubServer) serveDiscoveryDoc() *reqTracker {
181         return s.serveStatic("/discovery/v1/apis/arvados/v1/rest",
182                 `{"defaultCollectionReplication":2}`)
183 }
184
185 func (s *stubServer) serveZeroCollections() *reqTracker {
186         return s.serveStatic("/arvados/v1/collections",
187                 `{"items":[],"items_available":0}`)
188 }
189
190 func (s *stubServer) serveFooBarFileCollections() *reqTracker {
191         rt := &reqTracker{}
192         s.mux.HandleFunc("/arvados/v1/collections", func(w http.ResponseWriter, r *http.Request) {
193                 r.ParseForm()
194                 rt.Add(r)
195                 if strings.Contains(r.Form.Get("filters"), `modified_at`) {
196                         io.WriteString(w, `{"items_available":0,"items":[]}`)
197                 } else {
198                         io.WriteString(w, `{"items_available":3,"items":[
199                                 {"uuid":"zzzzz-4zz18-aaaaaaaaaaaaaaa","portable_data_hash":"fa7aeb5140e2848d39b416daeef4ffc5+45","manifest_text":". 37b51d194a7513e45b56f6524f2d51f2+3 0:3:bar\n","modified_at":"2014-02-03T17:22:54Z"},
200                                 {"uuid":"zzzzz-4zz18-ehbhgtheo8909or","portable_data_hash":"fa7aeb5140e2848d39b416daeef4ffc5+45","manifest_text":". 37b51d194a7513e45b56f6524f2d51f2+3 0:3:bar\n","modified_at":"2014-02-03T17:22:54Z"},
201                                 {"uuid":"zzzzz-4zz18-znfnqtbbv4spc3w","portable_data_hash":"1f4b0bc7583c2a7f9102c395f4ffc5e3+45","manifest_text":". acbd18db4cc2f85cedef654fccc4a4d8+3 0:3:foo\n","modified_at":"2014-02-03T17:22:54Z"}]}`)
202                 }
203         })
204         return rt
205 }
206
207 func (s *stubServer) serveCollectionsButSkipOne() *reqTracker {
208         rt := &reqTracker{}
209         s.mux.HandleFunc("/arvados/v1/collections", func(w http.ResponseWriter, r *http.Request) {
210                 r.ParseForm()
211                 rt.Add(r)
212                 if strings.Contains(r.Form.Get("filters"), `"modified_at","\u003c="`) {
213                         io.WriteString(w, `{"items_available":3,"items":[]}`)
214                 } else if strings.Contains(r.Form.Get("filters"), `"modified_at","\u003e`) {
215                         io.WriteString(w, `{"items_available":0,"items":[]}`)
216                 } else if strings.Contains(r.Form.Get("filters"), `"modified_at","="`) && strings.Contains(r.Form.Get("filters"), `"uuid","\u003e"`) {
217                         io.WriteString(w, `{"items_available":0,"items":[]}`)
218                 } else if strings.Contains(r.Form.Get("filters"), `"modified_at","=",null`) {
219                         io.WriteString(w, `{"items_available":0,"items":[]}`)
220                 } else {
221                         io.WriteString(w, `{"items_available":2,"items":[
222                                 {"uuid":"zzzzz-4zz18-ehbhgtheo8909or","portable_data_hash":"fa7aeb5140e2848d39b416daeef4ffc5+45","manifest_text":". 37b51d194a7513e45b56f6524f2d51f2+3 0:3:bar\n","modified_at":"2014-02-03T17:22:54Z"},
223                                 {"uuid":"zzzzz-4zz18-znfnqtbbv4spc3w","portable_data_hash":"1f4b0bc7583c2a7f9102c395f4ffc5e3+45","manifest_text":". acbd18db4cc2f85cedef654fccc4a4d8+3 0:3:foo\n","modified_at":"2014-02-03T17:22:54Z"}]}`)
224                 }
225         })
226         return rt
227 }
228
229 func (s *stubServer) serveZeroKeepServices() *reqTracker {
230         return s.serveJSON("/arvados/v1/keep_services", arvados.KeepServiceList{})
231 }
232
233 func (s *stubServer) serveKeepServices(svcs []arvados.KeepService) *reqTracker {
234         return s.serveJSON("/arvados/v1/keep_services", arvados.KeepServiceList{
235                 ItemsAvailable: len(svcs),
236                 Items:          svcs,
237         })
238 }
239
240 func (s *stubServer) serveJSON(path string, resp interface{}) *reqTracker {
241         rt := &reqTracker{}
242         s.mux.HandleFunc(path, func(w http.ResponseWriter, r *http.Request) {
243                 rt.Add(r)
244                 json.NewEncoder(w).Encode(resp)
245         })
246         return rt
247 }
248
249 func (s *stubServer) serveKeepstoreMounts() *reqTracker {
250         rt := &reqTracker{}
251         s.mux.HandleFunc("/mounts", func(w http.ResponseWriter, r *http.Request) {
252                 rt.Add(r)
253                 json.NewEncoder(w).Encode(stubMounts[r.Host])
254         })
255         return rt
256 }
257
258 func (s *stubServer) serveKeepstoreIndexFoo4Bar1() *reqTracker {
259         fooLine := func(mt int) string { return fmt.Sprintf("acbd18db4cc2f85cedef654fccc4a4d8+3 %d\n", 12345678+mt) }
260         barLine := "37b51d194a7513e45b56f6524f2d51f2+3 12345678\n"
261         rt := &reqTracker{}
262         s.mux.HandleFunc("/index/", func(w http.ResponseWriter, r *http.Request) {
263                 count := rt.Add(r)
264                 if r.Host == "keep0.zzzzz.arvadosapi.com:25107" && strings.HasPrefix(barLine, r.URL.Path[7:]) {
265                         io.WriteString(w, barLine)
266                 }
267                 if strings.HasPrefix(fooLine(count), r.URL.Path[7:]) {
268                         io.WriteString(w, fooLine(count))
269                 }
270                 io.WriteString(w, "\n")
271         })
272         for _, mounts := range stubMounts {
273                 for i, mnt := range mounts {
274                         i := i
275                         s.mux.HandleFunc(fmt.Sprintf("/mounts/%s/blocks", mnt.UUID), func(w http.ResponseWriter, r *http.Request) {
276                                 count := rt.Add(r)
277                                 r.ParseForm()
278                                 if i == 0 && r.Host == "keep0.zzzzz.arvadosapi.com:25107" && strings.HasPrefix(barLine, r.Form.Get("prefix")) {
279                                         io.WriteString(w, barLine)
280                                 }
281                                 if i == 0 && strings.HasPrefix(fooLine(count), r.Form.Get("prefix")) {
282                                         io.WriteString(w, fooLine(count))
283                                 }
284                                 io.WriteString(w, "\n")
285                         })
286                 }
287         }
288         return rt
289 }
290
291 func (s *stubServer) serveKeepstoreIndexFoo1() *reqTracker {
292         fooLine := "acbd18db4cc2f85cedef654fccc4a4d8+3 12345678\n"
293         rt := &reqTracker{}
294         s.mux.HandleFunc("/index/", func(w http.ResponseWriter, r *http.Request) {
295                 rt.Add(r)
296                 if r.Host == "keep0.zzzzz.arvadosapi.com:25107" && strings.HasPrefix(fooLine, r.URL.Path[7:]) {
297                         io.WriteString(w, fooLine)
298                 }
299                 io.WriteString(w, "\n")
300         })
301         for _, mounts := range stubMounts {
302                 for i, mnt := range mounts {
303                         i := i
304                         s.mux.HandleFunc(fmt.Sprintf("/mounts/%s/blocks", mnt.UUID), func(w http.ResponseWriter, r *http.Request) {
305                                 rt.Add(r)
306                                 if i == 0 && strings.HasPrefix(fooLine, r.Form.Get("prefix")) {
307                                         io.WriteString(w, fooLine)
308                                 }
309                                 io.WriteString(w, "\n")
310                         })
311                 }
312         }
313         return rt
314 }
315
316 func (s *stubServer) serveKeepstoreIndexIgnoringPrefix() *reqTracker {
317         fooLine := "acbd18db4cc2f85cedef654fccc4a4d8+3 12345678\n"
318         rt := &reqTracker{}
319         s.mux.HandleFunc("/index/", func(w http.ResponseWriter, r *http.Request) {
320                 rt.Add(r)
321                 io.WriteString(w, fooLine)
322                 io.WriteString(w, "\n")
323         })
324         for _, mounts := range stubMounts {
325                 for _, mnt := range mounts {
326                         s.mux.HandleFunc(fmt.Sprintf("/mounts/%s/blocks", mnt.UUID), func(w http.ResponseWriter, r *http.Request) {
327                                 rt.Add(r)
328                                 io.WriteString(w, fooLine)
329                                 io.WriteString(w, "\n")
330                         })
331                 }
332         }
333         return rt
334 }
335
336 func (s *stubServer) serveKeepstoreTrash() *reqTracker {
337         return s.serveStatic("/trash", `{}`)
338 }
339
340 func (s *stubServer) serveKeepstorePull() *reqTracker {
341         return s.serveStatic("/pull", `{}`)
342 }
343
344 type runSuite struct {
345         stub   stubServer
346         config *arvados.Cluster
347         db     *sqlx.DB
348         client *arvados.Client
349 }
350
351 func (s *runSuite) newServer(options *RunOptions) *Server {
352         srv := &Server{
353                 Cluster:    s.config,
354                 ArvClient:  s.client,
355                 RunOptions: *options,
356                 Metrics:    newMetrics(prometheus.NewRegistry()),
357                 Logger:     options.Logger,
358                 Dumper:     options.Dumper,
359                 DB:         s.db,
360         }
361         return srv
362 }
363
364 func (s *runSuite) SetUpTest(c *check.C) {
365         cfg, err := config.NewLoader(nil, ctxlog.TestLogger(c)).Load()
366         c.Assert(err, check.Equals, nil)
367         s.config, err = cfg.GetCluster("")
368         c.Assert(err, check.Equals, nil)
369         s.db, err = sqlx.Open("postgres", s.config.PostgreSQL.Connection.String())
370         c.Assert(err, check.IsNil)
371
372         s.config.Collections.BalancePeriod = arvados.Duration(time.Second)
373         arvadostest.SetServiceURL(&s.config.Services.Keepbalance, "http://localhost:/")
374
375         s.client = &arvados.Client{
376                 AuthToken: "xyzzy",
377                 APIHost:   "zzzzz.arvadosapi.com",
378                 Client:    s.stub.Start()}
379
380         s.stub.serveDiscoveryDoc()
381         s.stub.logf = c.Logf
382 }
383
384 func (s *runSuite) TearDownTest(c *check.C) {
385         s.stub.Close()
386 }
387
388 func (s *runSuite) TestRefuseZeroCollections(c *check.C) {
389         defer arvados.NewClientFromEnv().RequestAndDecode(nil, "POST", "database/reset", nil, nil)
390         _, err := s.db.Exec(`delete from collections`)
391         c.Assert(err, check.IsNil)
392         opts := RunOptions{
393                 CommitPulls: true,
394                 CommitTrash: true,
395                 Logger:      ctxlog.TestLogger(c),
396         }
397         s.stub.serveCurrentUserAdmin()
398         s.stub.serveZeroCollections()
399         s.stub.serveKeepServices(stubServices)
400         s.stub.serveKeepstoreMounts()
401         s.stub.serveKeepstoreIndexFoo4Bar1()
402         trashReqs := s.stub.serveKeepstoreTrash()
403         pullReqs := s.stub.serveKeepstorePull()
404         srv := s.newServer(&opts)
405         _, err = srv.runOnce(context.Background())
406         c.Check(err, check.ErrorMatches, "received zero collections")
407         c.Check(trashReqs.Count(), check.Equals, 4)
408         c.Check(pullReqs.Count(), check.Equals, 0)
409 }
410
411 func (s *runSuite) TestRefuseBadIndex(c *check.C) {
412         opts := RunOptions{
413                 CommitPulls: true,
414                 CommitTrash: true,
415                 ChunkPrefix: "abc",
416                 Logger:      ctxlog.TestLogger(c),
417         }
418         s.stub.serveCurrentUserAdmin()
419         s.stub.serveFooBarFileCollections()
420         s.stub.serveKeepServices(stubServices)
421         s.stub.serveKeepstoreMounts()
422         s.stub.serveKeepstoreIndexIgnoringPrefix()
423         trashReqs := s.stub.serveKeepstoreTrash()
424         pullReqs := s.stub.serveKeepstorePull()
425         srv := s.newServer(&opts)
426         bal, err := srv.runOnce(context.Background())
427         c.Check(err, check.ErrorMatches, ".*Index response included block .* despite asking for prefix \"abc\"")
428         c.Check(trashReqs.Count(), check.Equals, 4)
429         c.Check(pullReqs.Count(), check.Equals, 0)
430         c.Check(bal.stats.trashes, check.Equals, 0)
431         c.Check(bal.stats.pulls, check.Equals, 0)
432 }
433
434 func (s *runSuite) TestRefuseNonAdmin(c *check.C) {
435         opts := RunOptions{
436                 CommitPulls: true,
437                 CommitTrash: true,
438                 Logger:      ctxlog.TestLogger(c),
439         }
440         s.stub.serveCurrentUserNotAdmin()
441         s.stub.serveZeroCollections()
442         s.stub.serveKeepServices(stubServices)
443         s.stub.serveKeepstoreMounts()
444         trashReqs := s.stub.serveKeepstoreTrash()
445         pullReqs := s.stub.serveKeepstorePull()
446         srv := s.newServer(&opts)
447         _, err := srv.runOnce(context.Background())
448         c.Check(err, check.ErrorMatches, "current user .* is not .* admin user")
449         c.Check(trashReqs.Count(), check.Equals, 0)
450         c.Check(pullReqs.Count(), check.Equals, 0)
451 }
452
453 func (s *runSuite) TestRefuseSameDeviceDifferentVolumes(c *check.C) {
454         opts := RunOptions{
455                 CommitPulls: true,
456                 CommitTrash: true,
457                 Logger:      ctxlog.TestLogger(c),
458         }
459         s.stub.serveCurrentUserAdmin()
460         s.stub.serveZeroCollections()
461         s.stub.serveKeepServices(stubServices)
462         s.stub.mux.HandleFunc("/mounts", func(w http.ResponseWriter, r *http.Request) {
463                 hostid := r.Host[:5] // "keep0.zzzzz.arvadosapi.com:25107" => "keep0"
464                 json.NewEncoder(w).Encode([]arvados.KeepMount{{
465                         UUID:           "zzzzz-ivpuk-0000000000" + hostid,
466                         DeviceID:       "keep0-vol0",
467                         StorageClasses: map[string]bool{"default": true},
468                 }})
469         })
470         trashReqs := s.stub.serveKeepstoreTrash()
471         pullReqs := s.stub.serveKeepstorePull()
472         srv := s.newServer(&opts)
473         _, err := srv.runOnce(context.Background())
474         c.Check(err, check.ErrorMatches, "cannot continue with config errors.*")
475         c.Check(trashReqs.Count(), check.Equals, 0)
476         c.Check(pullReqs.Count(), check.Equals, 0)
477 }
478
479 func (s *runSuite) TestWriteLostBlocks(c *check.C) {
480         lostf, err := ioutil.TempFile("", "keep-balance-lost-blocks-test-")
481         c.Assert(err, check.IsNil)
482         s.config.Collections.BlobMissingReport = lostf.Name()
483         defer os.Remove(lostf.Name())
484         opts := RunOptions{
485                 CommitPulls: true,
486                 CommitTrash: true,
487                 Logger:      ctxlog.TestLogger(c),
488         }
489         s.stub.serveCurrentUserAdmin()
490         s.stub.serveFooBarFileCollections()
491         s.stub.serveKeepServices(stubServices)
492         s.stub.serveKeepstoreMounts()
493         s.stub.serveKeepstoreIndexFoo1()
494         s.stub.serveKeepstoreTrash()
495         s.stub.serveKeepstorePull()
496         srv := s.newServer(&opts)
497         c.Assert(err, check.IsNil)
498         _, err = srv.runOnce(context.Background())
499         c.Check(err, check.IsNil)
500         lost, err := ioutil.ReadFile(lostf.Name())
501         c.Assert(err, check.IsNil)
502         c.Check(string(lost), check.Matches, `(?ms).*37b51d194a7513e45b56f6524f2d51f2.* fa7aeb5140e2848d39b416daeef4ffc5\+45.*`)
503 }
504
505 func (s *runSuite) TestDryRun(c *check.C) {
506         opts := RunOptions{
507                 CommitPulls: false,
508                 CommitTrash: false,
509                 Logger:      ctxlog.TestLogger(c),
510         }
511         s.stub.serveCurrentUserAdmin()
512         collReqs := s.stub.serveFooBarFileCollections()
513         s.stub.serveKeepServices(stubServices)
514         s.stub.serveKeepstoreMounts()
515         s.stub.serveKeepstoreIndexFoo4Bar1()
516         trashReqs := s.stub.serveKeepstoreTrash()
517         pullReqs := s.stub.serveKeepstorePull()
518         srv := s.newServer(&opts)
519         bal, err := srv.runOnce(context.Background())
520         c.Check(err, check.IsNil)
521         for _, req := range collReqs.reqs {
522                 c.Check(req.Form.Get("include_trash"), check.Equals, "true")
523                 c.Check(req.Form.Get("include_old_versions"), check.Equals, "true")
524         }
525         c.Check(trashReqs.Count(), check.Equals, 0)
526         c.Check(pullReqs.Count(), check.Equals, 0)
527         c.Check(bal.stats.pulls, check.Not(check.Equals), 0)
528         c.Check(bal.stats.underrep.replicas, check.Not(check.Equals), 0)
529         c.Check(bal.stats.overrep.replicas, check.Not(check.Equals), 0)
530 }
531
532 func (s *runSuite) TestCommit(c *check.C) {
533         s.config.Collections.BlobMissingReport = c.MkDir() + "/keep-balance-lost-blocks-test-"
534         s.config.ManagementToken = "xyzzy"
535         opts := RunOptions{
536                 CommitPulls: true,
537                 CommitTrash: true,
538                 Logger:      ctxlog.TestLogger(c),
539                 Dumper:      ctxlog.TestLogger(c),
540         }
541         s.stub.serveCurrentUserAdmin()
542         s.stub.serveFooBarFileCollections()
543         s.stub.serveKeepServices(stubServices)
544         s.stub.serveKeepstoreMounts()
545         s.stub.serveKeepstoreIndexFoo4Bar1()
546         trashReqs := s.stub.serveKeepstoreTrash()
547         pullReqs := s.stub.serveKeepstorePull()
548         srv := s.newServer(&opts)
549         bal, err := srv.runOnce(context.Background())
550         c.Check(err, check.IsNil)
551         c.Check(trashReqs.Count(), check.Equals, 8)
552         c.Check(pullReqs.Count(), check.Equals, 4)
553         // "foo" block is overreplicated by 2
554         c.Check(bal.stats.trashes, check.Equals, 2)
555         // "bar" block is underreplicated by 1, and its only copy is
556         // in a poor rendezvous position
557         c.Check(bal.stats.pulls, check.Equals, 2)
558
559         lost, err := ioutil.ReadFile(s.config.Collections.BlobMissingReport)
560         c.Assert(err, check.IsNil)
561         c.Check(string(lost), check.Not(check.Matches), `(?ms).*acbd18db4cc2f85cedef654fccc4a4d8.*`)
562
563         buf, err := s.getMetrics(c, srv)
564         c.Check(err, check.IsNil)
565         bufstr := buf.String()
566         c.Check(bufstr, check.Matches, `(?ms).*\narvados_keep_total_bytes 15\n.*`)
567         c.Check(bufstr, check.Matches, `(?ms).*\narvados_keepbalance_changeset_compute_seconds_sum [0-9\.]+\n.*`)
568         c.Check(bufstr, check.Matches, `(?ms).*\narvados_keepbalance_changeset_compute_seconds_count 1\n.*`)
569         c.Check(bufstr, check.Matches, `(?ms).*\narvados_keep_dedup_byte_ratio [1-9].*`)
570         c.Check(bufstr, check.Matches, `(?ms).*\narvados_keep_dedup_block_ratio [1-9].*`)
571 }
572
573 func (s *runSuite) TestChunkPrefix(c *check.C) {
574         s.config.Collections.BlobMissingReport = c.MkDir() + "/keep-balance-lost-blocks-test-"
575         opts := RunOptions{
576                 CommitPulls: true,
577                 CommitTrash: true,
578                 ChunkPrefix: "ac", // catch "foo" but not "bar"
579                 Logger:      ctxlog.TestLogger(c),
580                 Dumper:      ctxlog.TestLogger(c),
581         }
582         s.stub.serveCurrentUserAdmin()
583         s.stub.serveFooBarFileCollections()
584         s.stub.serveKeepServices(stubServices)
585         s.stub.serveKeepstoreMounts()
586         s.stub.serveKeepstoreIndexFoo4Bar1()
587         trashReqs := s.stub.serveKeepstoreTrash()
588         pullReqs := s.stub.serveKeepstorePull()
589         srv := s.newServer(&opts)
590         bal, err := srv.runOnce(context.Background())
591         c.Check(err, check.IsNil)
592         c.Check(trashReqs.Count(), check.Equals, 8)
593         c.Check(pullReqs.Count(), check.Equals, 4)
594         // "foo" block is overreplicated by 2
595         c.Check(bal.stats.trashes, check.Equals, 2)
596         // "bar" block is underreplicated but does not match prefix
597         c.Check(bal.stats.pulls, check.Equals, 0)
598
599         lost, err := ioutil.ReadFile(s.config.Collections.BlobMissingReport)
600         c.Assert(err, check.IsNil)
601         c.Check(string(lost), check.Equals, "")
602 }
603
604 func (s *runSuite) TestRunForever(c *check.C) {
605         s.config.ManagementToken = "xyzzy"
606         opts := RunOptions{
607                 CommitPulls: true,
608                 CommitTrash: true,
609                 Logger:      ctxlog.TestLogger(c),
610                 Dumper:      ctxlog.TestLogger(c),
611         }
612         s.stub.serveCurrentUserAdmin()
613         s.stub.serveFooBarFileCollections()
614         s.stub.serveKeepServices(stubServices)
615         s.stub.serveKeepstoreMounts()
616         s.stub.serveKeepstoreIndexFoo4Bar1()
617         trashReqs := s.stub.serveKeepstoreTrash()
618         pullReqs := s.stub.serveKeepstorePull()
619
620         ctx, cancel := context.WithCancel(context.Background())
621         defer cancel()
622         s.config.Collections.BalancePeriod = arvados.Duration(time.Millisecond)
623         srv := s.newServer(&opts)
624
625         done := make(chan bool)
626         go func() {
627                 srv.runForever(ctx)
628                 close(done)
629         }()
630
631         // Each run should send 4 pull lists + 4 trash lists. The
632         // first run should also send 4 empty trash lists at
633         // startup. We should complete all four runs in much less than
634         // a second.
635         for t0 := time.Now(); pullReqs.Count() < 16 && time.Since(t0) < 10*time.Second; {
636                 time.Sleep(time.Millisecond)
637         }
638         cancel()
639         <-done
640         c.Check(pullReqs.Count() >= 16, check.Equals, true)
641         c.Check(trashReqs.Count(), check.Equals, pullReqs.Count()+4)
642
643         buf, err := s.getMetrics(c, srv)
644         c.Check(err, check.IsNil)
645         c.Check(buf, check.Matches, `(?ms).*\narvados_keepbalance_changeset_compute_seconds_count `+fmt.Sprintf("%d", pullReqs.Count()/4)+`\n.*`)
646 }
647
648 func (s *runSuite) getMetrics(c *check.C, srv *Server) (*bytes.Buffer, error) {
649         mfs, err := srv.Metrics.reg.Gather()
650         if err != nil {
651                 return nil, err
652         }
653
654         var buf bytes.Buffer
655         for _, mf := range mfs {
656                 if _, err := expfmt.MetricFamilyToText(&buf, mf); err != nil {
657                         return nil, err
658                 }
659         }
660
661         return &buf, nil
662 }