Merge branch '20264-trusted-clients-portnumber'
[arvados.git] / services / keep-balance / balance_run_test.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package keepbalance
6
7 import (
8         "bytes"
9         "context"
10         "encoding/json"
11         "fmt"
12         "io"
13         "io/ioutil"
14         "net/http"
15         "net/http/httptest"
16         "os"
17         "strings"
18         "sync"
19         "time"
20
21         "git.arvados.org/arvados.git/lib/config"
22         "git.arvados.org/arvados.git/sdk/go/arvados"
23         "git.arvados.org/arvados.git/sdk/go/arvadostest"
24         "git.arvados.org/arvados.git/sdk/go/ctxlog"
25         "github.com/jmoiron/sqlx"
26         "github.com/prometheus/client_golang/prometheus"
27         "github.com/prometheus/common/expfmt"
28         check "gopkg.in/check.v1"
29 )
30
// Register runSuite with gocheck so its Test* methods are run.
var _ = check.Suite(&runSuite{})
32
// reqTracker records the HTTP requests seen by a stub handler. The
// embedded mutex guards reqs; use Add and Count for locked access.
type reqTracker struct {
	// reqs holds a copy of each request passed to Add, in arrival order.
	reqs []http.Request
	sync.Mutex
}
37
38 func (rt *reqTracker) Count() int {
39         rt.Lock()
40         defer rt.Unlock()
41         return len(rt.reqs)
42 }
43
44 func (rt *reqTracker) Add(req *http.Request) int {
45         rt.Lock()
46         defer rt.Unlock()
47         rt.reqs = append(rt.reqs, *req)
48         return len(rt.reqs)
49 }
50
51 var stubServices = []arvados.KeepService{
52         {
53                 UUID:           "zzzzz-bi6l4-000000000000000",
54                 ServiceHost:    "keep0.zzzzz.arvadosapi.com",
55                 ServicePort:    25107,
56                 ServiceSSLFlag: false,
57                 ServiceType:    "disk",
58         },
59         {
60                 UUID:           "zzzzz-bi6l4-000000000000001",
61                 ServiceHost:    "keep1.zzzzz.arvadosapi.com",
62                 ServicePort:    25107,
63                 ServiceSSLFlag: false,
64                 ServiceType:    "disk",
65         },
66         {
67                 UUID:           "zzzzz-bi6l4-000000000000002",
68                 ServiceHost:    "keep2.zzzzz.arvadosapi.com",
69                 ServicePort:    25107,
70                 ServiceSSLFlag: false,
71                 ServiceType:    "disk",
72         },
73         {
74                 UUID:           "zzzzz-bi6l4-000000000000003",
75                 ServiceHost:    "keep3.zzzzz.arvadosapi.com",
76                 ServicePort:    25107,
77                 ServiceSSLFlag: false,
78                 ServiceType:    "disk",
79         },
80         {
81                 UUID:           "zzzzz-bi6l4-h0a0xwut9qa6g3a",
82                 ServiceHost:    "keep.zzzzz.arvadosapi.com",
83                 ServicePort:    25333,
84                 ServiceSSLFlag: true,
85                 ServiceType:    "proxy",
86         },
87 }
88
89 var stubMounts = map[string][]arvados.KeepMount{
90         "keep0.zzzzz.arvadosapi.com:25107": {{
91                 UUID:           "zzzzz-ivpuk-000000000000000",
92                 DeviceID:       "keep0-vol0",
93                 StorageClasses: map[string]bool{"default": true},
94         }},
95         "keep1.zzzzz.arvadosapi.com:25107": {{
96                 UUID:           "zzzzz-ivpuk-100000000000000",
97                 DeviceID:       "keep1-vol0",
98                 StorageClasses: map[string]bool{"default": true},
99         }},
100         "keep2.zzzzz.arvadosapi.com:25107": {{
101                 UUID:           "zzzzz-ivpuk-200000000000000",
102                 DeviceID:       "keep2-vol0",
103                 StorageClasses: map[string]bool{"default": true},
104         }},
105         "keep3.zzzzz.arvadosapi.com:25107": {{
106                 UUID:           "zzzzz-ivpuk-300000000000000",
107                 DeviceID:       "keep3-vol0",
108                 StorageClasses: map[string]bool{"default": true},
109         }},
110 }
111
// stubServer is an HTTP transport that intercepts and processes all
// requests using its own handlers.
type stubServer struct {
	// mux dispatches requests to the handlers registered by the
	// serve* helpers.
	mux *http.ServeMux
	// srv is created by Start and released by Close.
	srv *httptest.Server
	// mutex is held while srv's handler records into Requests.
	mutex sync.Mutex
	// Requests records requests received through srv.
	Requests reqTracker
	// logf is set by the suite's SetUpTest to the test's Logf.
	logf func(string, ...interface{})
}
121
122 // Start initializes the stub server and returns an *http.Client that
123 // uses the stub server to handle all requests.
124 //
125 // A stubServer that has been started should eventually be shut down
126 // with Close().
127 func (s *stubServer) Start() *http.Client {
128         // Set up a config.Client that forwards all requests to s.mux
129         // via s.srv. Test cases will attach handlers to s.mux to get
130         // the desired responses.
131         s.mux = http.NewServeMux()
132         s.srv = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
133                 s.mutex.Lock()
134                 s.Requests.Add(r)
135                 s.mutex.Unlock()
136                 w.Header().Set("Content-Type", "application/json")
137                 s.mux.ServeHTTP(w, r)
138         }))
139         return &http.Client{Transport: s}
140 }
141
142 func (s *stubServer) RoundTrip(req *http.Request) (*http.Response, error) {
143         w := httptest.NewRecorder()
144         s.mux.ServeHTTP(w, req)
145         return &http.Response{
146                 StatusCode: w.Code,
147                 Status:     fmt.Sprintf("%d %s", w.Code, http.StatusText(w.Code)),
148                 Header:     w.HeaderMap,
149                 Body:       ioutil.NopCloser(w.Body)}, nil
150 }
151
// Close releases resources used by the server by shutting down the
// httptest server created in Start. It must not be called before
// Start, because s.srv would be nil.
func (s *stubServer) Close() {
	s.srv.Close()
}
156
157 func (s *stubServer) serveStatic(path, data string) *reqTracker {
158         rt := &reqTracker{}
159         s.mux.HandleFunc(path, func(w http.ResponseWriter, r *http.Request) {
160                 rt.Add(r)
161                 if r.Body != nil {
162                         ioutil.ReadAll(r.Body)
163                         r.Body.Close()
164                 }
165                 io.WriteString(w, data)
166         })
167         return rt
168 }
169
170 func (s *stubServer) serveCurrentUserAdmin() *reqTracker {
171         return s.serveStatic("/arvados/v1/users/current",
172                 `{"uuid":"zzzzz-tpzed-000000000000000","is_admin":true,"is_active":true}`)
173 }
174
175 func (s *stubServer) serveCurrentUserNotAdmin() *reqTracker {
176         return s.serveStatic("/arvados/v1/users/current",
177                 `{"uuid":"zzzzz-tpzed-000000000000000","is_admin":false,"is_active":true}`)
178 }
179
180 func (s *stubServer) serveDiscoveryDoc() *reqTracker {
181         return s.serveStatic("/discovery/v1/apis/arvados/v1/rest",
182                 `{"defaultCollectionReplication":2}`)
183 }
184
185 func (s *stubServer) serveZeroCollections() *reqTracker {
186         return s.serveStatic("/arvados/v1/collections",
187                 `{"items":[],"items_available":0}`)
188 }
189
190 func (s *stubServer) serveFooBarFileCollections() *reqTracker {
191         rt := &reqTracker{}
192         s.mux.HandleFunc("/arvados/v1/collections", func(w http.ResponseWriter, r *http.Request) {
193                 r.ParseForm()
194                 rt.Add(r)
195                 if strings.Contains(r.Form.Get("filters"), `modified_at`) {
196                         io.WriteString(w, `{"items_available":0,"items":[]}`)
197                 } else {
198                         io.WriteString(w, `{"items_available":3,"items":[
199                                 {"uuid":"zzzzz-4zz18-aaaaaaaaaaaaaaa","portable_data_hash":"fa7aeb5140e2848d39b416daeef4ffc5+45","manifest_text":". 37b51d194a7513e45b56f6524f2d51f2+3 0:3:bar\n","modified_at":"2014-02-03T17:22:54Z"},
200                                 {"uuid":"zzzzz-4zz18-ehbhgtheo8909or","portable_data_hash":"fa7aeb5140e2848d39b416daeef4ffc5+45","manifest_text":". 37b51d194a7513e45b56f6524f2d51f2+3 0:3:bar\n","modified_at":"2014-02-03T17:22:54Z"},
201                                 {"uuid":"zzzzz-4zz18-znfnqtbbv4spc3w","portable_data_hash":"1f4b0bc7583c2a7f9102c395f4ffc5e3+45","manifest_text":". acbd18db4cc2f85cedef654fccc4a4d8+3 0:3:foo\n","modified_at":"2014-02-03T17:22:54Z"}]}`)
202                 }
203         })
204         return rt
205 }
206
207 func (s *stubServer) serveCollectionsButSkipOne() *reqTracker {
208         rt := &reqTracker{}
209         s.mux.HandleFunc("/arvados/v1/collections", func(w http.ResponseWriter, r *http.Request) {
210                 r.ParseForm()
211                 rt.Add(r)
212                 if strings.Contains(r.Form.Get("filters"), `"modified_at","\u003c="`) {
213                         io.WriteString(w, `{"items_available":3,"items":[]}`)
214                 } else if strings.Contains(r.Form.Get("filters"), `"modified_at","\u003e`) {
215                         io.WriteString(w, `{"items_available":0,"items":[]}`)
216                 } else if strings.Contains(r.Form.Get("filters"), `"modified_at","="`) && strings.Contains(r.Form.Get("filters"), `"uuid","\u003e"`) {
217                         io.WriteString(w, `{"items_available":0,"items":[]}`)
218                 } else if strings.Contains(r.Form.Get("filters"), `"modified_at","=",null`) {
219                         io.WriteString(w, `{"items_available":0,"items":[]}`)
220                 } else {
221                         io.WriteString(w, `{"items_available":2,"items":[
222                                 {"uuid":"zzzzz-4zz18-ehbhgtheo8909or","portable_data_hash":"fa7aeb5140e2848d39b416daeef4ffc5+45","manifest_text":". 37b51d194a7513e45b56f6524f2d51f2+3 0:3:bar\n","modified_at":"2014-02-03T17:22:54Z"},
223                                 {"uuid":"zzzzz-4zz18-znfnqtbbv4spc3w","portable_data_hash":"1f4b0bc7583c2a7f9102c395f4ffc5e3+45","manifest_text":". acbd18db4cc2f85cedef654fccc4a4d8+3 0:3:foo\n","modified_at":"2014-02-03T17:22:54Z"}]}`)
224                 }
225         })
226         return rt
227 }
228
229 func (s *stubServer) serveZeroKeepServices() *reqTracker {
230         return s.serveJSON("/arvados/v1/keep_services", arvados.KeepServiceList{})
231 }
232
233 func (s *stubServer) serveKeepServices(svcs []arvados.KeepService) *reqTracker {
234         return s.serveJSON("/arvados/v1/keep_services", arvados.KeepServiceList{
235                 ItemsAvailable: len(svcs),
236                 Items:          svcs,
237         })
238 }
239
240 func (s *stubServer) serveJSON(path string, resp interface{}) *reqTracker {
241         rt := &reqTracker{}
242         s.mux.HandleFunc(path, func(w http.ResponseWriter, r *http.Request) {
243                 rt.Add(r)
244                 json.NewEncoder(w).Encode(resp)
245         })
246         return rt
247 }
248
249 func (s *stubServer) serveKeepstoreMounts() *reqTracker {
250         rt := &reqTracker{}
251         s.mux.HandleFunc("/mounts", func(w http.ResponseWriter, r *http.Request) {
252                 rt.Add(r)
253                 json.NewEncoder(w).Encode(stubMounts[r.Host])
254         })
255         return rt
256 }
257
258 func (s *stubServer) serveKeepstoreIndexFoo4Bar1() *reqTracker {
259         fooLine := func(mt int) string { return fmt.Sprintf("acbd18db4cc2f85cedef654fccc4a4d8+3 %d\n", 12345678+mt) }
260         barLine := "37b51d194a7513e45b56f6524f2d51f2+3 12345678\n"
261         rt := &reqTracker{}
262         s.mux.HandleFunc("/index/", func(w http.ResponseWriter, r *http.Request) {
263                 count := rt.Add(r)
264                 if r.Host == "keep0.zzzzz.arvadosapi.com:25107" && strings.HasPrefix(barLine, r.URL.Path[7:]) {
265                         io.WriteString(w, barLine)
266                 }
267                 if strings.HasPrefix(fooLine(count), r.URL.Path[7:]) {
268                         io.WriteString(w, fooLine(count))
269                 }
270                 io.WriteString(w, "\n")
271         })
272         for _, mounts := range stubMounts {
273                 for i, mnt := range mounts {
274                         i := i
275                         s.mux.HandleFunc(fmt.Sprintf("/mounts/%s/blocks", mnt.UUID), func(w http.ResponseWriter, r *http.Request) {
276                                 count := rt.Add(r)
277                                 r.ParseForm()
278                                 if i == 0 && r.Host == "keep0.zzzzz.arvadosapi.com:25107" && strings.HasPrefix(barLine, r.Form.Get("prefix")) {
279                                         io.WriteString(w, barLine)
280                                 }
281                                 if i == 0 && strings.HasPrefix(fooLine(count), r.Form.Get("prefix")) {
282                                         io.WriteString(w, fooLine(count))
283                                 }
284                                 io.WriteString(w, "\n")
285                         })
286                 }
287         }
288         return rt
289 }
290
291 func (s *stubServer) serveKeepstoreIndexFoo1() *reqTracker {
292         fooLine := "acbd18db4cc2f85cedef654fccc4a4d8+3 12345678\n"
293         rt := &reqTracker{}
294         s.mux.HandleFunc("/index/", func(w http.ResponseWriter, r *http.Request) {
295                 rt.Add(r)
296                 if r.Host == "keep0.zzzzz.arvadosapi.com:25107" && strings.HasPrefix(fooLine, r.URL.Path[7:]) {
297                         io.WriteString(w, fooLine)
298                 }
299                 io.WriteString(w, "\n")
300         })
301         for _, mounts := range stubMounts {
302                 for i, mnt := range mounts {
303                         i := i
304                         s.mux.HandleFunc(fmt.Sprintf("/mounts/%s/blocks", mnt.UUID), func(w http.ResponseWriter, r *http.Request) {
305                                 rt.Add(r)
306                                 if i == 0 && strings.HasPrefix(fooLine, r.Form.Get("prefix")) {
307                                         io.WriteString(w, fooLine)
308                                 }
309                                 io.WriteString(w, "\n")
310                         })
311                 }
312         }
313         return rt
314 }
315
316 func (s *stubServer) serveKeepstoreIndexIgnoringPrefix() *reqTracker {
317         fooLine := "acbd18db4cc2f85cedef654fccc4a4d8+3 12345678\n"
318         rt := &reqTracker{}
319         s.mux.HandleFunc("/index/", func(w http.ResponseWriter, r *http.Request) {
320                 rt.Add(r)
321                 io.WriteString(w, fooLine)
322                 io.WriteString(w, "\n")
323         })
324         for _, mounts := range stubMounts {
325                 for _, mnt := range mounts {
326                         s.mux.HandleFunc(fmt.Sprintf("/mounts/%s/blocks", mnt.UUID), func(w http.ResponseWriter, r *http.Request) {
327                                 rt.Add(r)
328                                 io.WriteString(w, fooLine)
329                                 io.WriteString(w, "\n")
330                         })
331                 }
332         }
333         return rt
334 }
335
// serveKeepstoreTrash accepts requests to /trash, replying with an
// empty JSON object, and returns a tracker for counting them.
func (s *stubServer) serveKeepstoreTrash() *reqTracker {
	return s.serveStatic("/trash", `{}`)
}
339
// serveKeepstorePull accepts requests to /pull, replying with an
// empty JSON object, and returns a tracker for counting them.
func (s *stubServer) serveKeepstorePull() *reqTracker {
	return s.serveStatic("/pull", `{}`)
}
343
// runSuite is the gocheck suite for the balancer run tests. Its
// fields are (re)initialized by SetUpTest.
type runSuite struct {
	// stub answers all HTTP requests made through client.
	stub stubServer
	// config is the cluster configuration loaded in SetUpTest.
	config *arvados.Cluster
	// db is opened in SetUpTest from config's PostgreSQL settings.
	db *sqlx.DB
	// client is an API client whose HTTP client is backed by stub.
	client *arvados.Client
}
350
351 func (s *runSuite) newServer(options *RunOptions) *Server {
352         srv := &Server{
353                 Cluster:    s.config,
354                 ArvClient:  s.client,
355                 RunOptions: *options,
356                 Metrics:    newMetrics(prometheus.NewRegistry()),
357                 Logger:     options.Logger,
358                 Dumper:     options.Dumper,
359                 DB:         s.db,
360         }
361         return srv
362 }
363
364 func (s *runSuite) SetUpTest(c *check.C) {
365         cfg, err := config.NewLoader(nil, ctxlog.TestLogger(c)).Load()
366         c.Assert(err, check.Equals, nil)
367         s.config, err = cfg.GetCluster("")
368         c.Assert(err, check.Equals, nil)
369         s.db, err = sqlx.Open("postgres", s.config.PostgreSQL.Connection.String())
370         c.Assert(err, check.IsNil)
371
372         s.config.Collections.BalancePeriod = arvados.Duration(time.Second)
373         arvadostest.SetServiceURL(&s.config.Services.Keepbalance, "http://localhost:/")
374
375         s.client = &arvados.Client{
376                 AuthToken: "xyzzy",
377                 APIHost:   "zzzzz.arvadosapi.com",
378                 Client:    s.stub.Start()}
379
380         s.stub.serveDiscoveryDoc()
381         s.stub.logf = c.Logf
382 }
383
384 func (s *runSuite) TearDownTest(c *check.C) {
385         s.stub.Close()
386 }
387
388 func (s *runSuite) TestRefuseZeroCollections(c *check.C) {
389         defer arvados.NewClientFromEnv().RequestAndDecode(nil, "POST", "database/reset", nil, nil)
390         _, err := s.db.Exec(`delete from collections`)
391         c.Assert(err, check.IsNil)
392         opts := RunOptions{
393                 CommitPulls: true,
394                 CommitTrash: true,
395                 Logger:      ctxlog.TestLogger(c),
396         }
397         s.stub.serveCurrentUserAdmin()
398         s.stub.serveZeroCollections()
399         s.stub.serveKeepServices(stubServices)
400         s.stub.serveKeepstoreMounts()
401         s.stub.serveKeepstoreIndexFoo4Bar1()
402         trashReqs := s.stub.serveKeepstoreTrash()
403         pullReqs := s.stub.serveKeepstorePull()
404         srv := s.newServer(&opts)
405         _, err = srv.runOnce(context.Background())
406         c.Check(err, check.ErrorMatches, "received zero collections")
407         c.Check(trashReqs.Count(), check.Equals, 4)
408         c.Check(pullReqs.Count(), check.Equals, 0)
409 }
410
411 func (s *runSuite) TestRefuseBadIndex(c *check.C) {
412         opts := RunOptions{
413                 CommitPulls: true,
414                 CommitTrash: true,
415                 ChunkPrefix: "abc",
416                 Logger:      ctxlog.TestLogger(c),
417         }
418         s.stub.serveCurrentUserAdmin()
419         s.stub.serveFooBarFileCollections()
420         s.stub.serveKeepServices(stubServices)
421         s.stub.serveKeepstoreMounts()
422         s.stub.serveKeepstoreIndexIgnoringPrefix()
423         trashReqs := s.stub.serveKeepstoreTrash()
424         pullReqs := s.stub.serveKeepstorePull()
425         srv := s.newServer(&opts)
426         bal, err := srv.runOnce(context.Background())
427         c.Check(err, check.ErrorMatches, ".*Index response included block .* despite asking for prefix \"abc\"")
428         c.Check(trashReqs.Count(), check.Equals, 4)
429         c.Check(pullReqs.Count(), check.Equals, 0)
430         c.Check(bal.stats.trashes, check.Equals, 0)
431         c.Check(bal.stats.pulls, check.Equals, 0)
432 }
433
434 func (s *runSuite) TestRefuseNonAdmin(c *check.C) {
435         opts := RunOptions{
436                 CommitPulls: true,
437                 CommitTrash: true,
438                 Logger:      ctxlog.TestLogger(c),
439         }
440         s.stub.serveCurrentUserNotAdmin()
441         s.stub.serveZeroCollections()
442         s.stub.serveKeepServices(stubServices)
443         s.stub.serveKeepstoreMounts()
444         trashReqs := s.stub.serveKeepstoreTrash()
445         pullReqs := s.stub.serveKeepstorePull()
446         srv := s.newServer(&opts)
447         _, err := srv.runOnce(context.Background())
448         c.Check(err, check.ErrorMatches, "current user .* is not .* admin user")
449         c.Check(trashReqs.Count(), check.Equals, 0)
450         c.Check(pullReqs.Count(), check.Equals, 0)
451 }
452
453 func (s *runSuite) TestInvalidChunkPrefix(c *check.C) {
454         for _, trial := range []struct {
455                 prefix string
456                 errRe  string
457         }{
458                 {"123ABC", "invalid char \"A\" in chunk prefix.*"},
459                 {"123xyz", "invalid char \"x\" in chunk prefix.*"},
460                 {"123456789012345678901234567890123", "invalid chunk prefix .* longer than a block hash"},
461         } {
462                 s.SetUpTest(c)
463                 c.Logf("trying invalid prefix %q", trial.prefix)
464                 opts := RunOptions{
465                         CommitPulls: true,
466                         CommitTrash: true,
467                         ChunkPrefix: trial.prefix,
468                         Logger:      ctxlog.TestLogger(c),
469                 }
470                 s.stub.serveCurrentUserAdmin()
471                 s.stub.serveFooBarFileCollections()
472                 s.stub.serveKeepServices(stubServices)
473                 s.stub.serveKeepstoreMounts()
474                 trashReqs := s.stub.serveKeepstoreTrash()
475                 pullReqs := s.stub.serveKeepstorePull()
476                 srv := s.newServer(&opts)
477                 _, err := srv.runOnce(context.Background())
478                 c.Check(err, check.ErrorMatches, trial.errRe)
479                 c.Check(trashReqs.Count(), check.Equals, 0)
480                 c.Check(pullReqs.Count(), check.Equals, 0)
481         }
482 }
483
484 func (s *runSuite) TestRefuseSameDeviceDifferentVolumes(c *check.C) {
485         opts := RunOptions{
486                 CommitPulls: true,
487                 CommitTrash: true,
488                 Logger:      ctxlog.TestLogger(c),
489         }
490         s.stub.serveCurrentUserAdmin()
491         s.stub.serveZeroCollections()
492         s.stub.serveKeepServices(stubServices)
493         s.stub.mux.HandleFunc("/mounts", func(w http.ResponseWriter, r *http.Request) {
494                 hostid := r.Host[:5] // "keep0.zzzzz.arvadosapi.com:25107" => "keep0"
495                 json.NewEncoder(w).Encode([]arvados.KeepMount{{
496                         UUID:           "zzzzz-ivpuk-0000000000" + hostid,
497                         DeviceID:       "keep0-vol0",
498                         StorageClasses: map[string]bool{"default": true},
499                 }})
500         })
501         trashReqs := s.stub.serveKeepstoreTrash()
502         pullReqs := s.stub.serveKeepstorePull()
503         srv := s.newServer(&opts)
504         _, err := srv.runOnce(context.Background())
505         c.Check(err, check.ErrorMatches, "cannot continue with config errors.*")
506         c.Check(trashReqs.Count(), check.Equals, 0)
507         c.Check(pullReqs.Count(), check.Equals, 0)
508 }
509
510 func (s *runSuite) TestWriteLostBlocks(c *check.C) {
511         lostf, err := ioutil.TempFile("", "keep-balance-lost-blocks-test-")
512         c.Assert(err, check.IsNil)
513         s.config.Collections.BlobMissingReport = lostf.Name()
514         defer os.Remove(lostf.Name())
515         opts := RunOptions{
516                 CommitPulls: true,
517                 CommitTrash: true,
518                 Logger:      ctxlog.TestLogger(c),
519         }
520         s.stub.serveCurrentUserAdmin()
521         s.stub.serveFooBarFileCollections()
522         s.stub.serveKeepServices(stubServices)
523         s.stub.serveKeepstoreMounts()
524         s.stub.serveKeepstoreIndexFoo1()
525         s.stub.serveKeepstoreTrash()
526         s.stub.serveKeepstorePull()
527         srv := s.newServer(&opts)
528         c.Assert(err, check.IsNil)
529         _, err = srv.runOnce(context.Background())
530         c.Check(err, check.IsNil)
531         lost, err := ioutil.ReadFile(lostf.Name())
532         c.Assert(err, check.IsNil)
533         c.Check(string(lost), check.Matches, `(?ms).*37b51d194a7513e45b56f6524f2d51f2.* fa7aeb5140e2848d39b416daeef4ffc5\+45.*`)
534 }
535
536 func (s *runSuite) TestDryRun(c *check.C) {
537         opts := RunOptions{
538                 CommitPulls: false,
539                 CommitTrash: false,
540                 Logger:      ctxlog.TestLogger(c),
541         }
542         s.stub.serveCurrentUserAdmin()
543         collReqs := s.stub.serveFooBarFileCollections()
544         s.stub.serveKeepServices(stubServices)
545         s.stub.serveKeepstoreMounts()
546         s.stub.serveKeepstoreIndexFoo4Bar1()
547         trashReqs := s.stub.serveKeepstoreTrash()
548         pullReqs := s.stub.serveKeepstorePull()
549         srv := s.newServer(&opts)
550         bal, err := srv.runOnce(context.Background())
551         c.Check(err, check.IsNil)
552         for _, req := range collReqs.reqs {
553                 c.Check(req.Form.Get("include_trash"), check.Equals, "true")
554                 c.Check(req.Form.Get("include_old_versions"), check.Equals, "true")
555         }
556         c.Check(trashReqs.Count(), check.Equals, 0)
557         c.Check(pullReqs.Count(), check.Equals, 0)
558         c.Check(bal.stats.pulls, check.Not(check.Equals), 0)
559         c.Check(bal.stats.underrep.replicas, check.Not(check.Equals), 0)
560         c.Check(bal.stats.overrep.replicas, check.Not(check.Equals), 0)
561 }
562
563 func (s *runSuite) TestCommit(c *check.C) {
564         s.config.Collections.BlobMissingReport = c.MkDir() + "/keep-balance-lost-blocks-test-"
565         s.config.ManagementToken = "xyzzy"
566         opts := RunOptions{
567                 CommitPulls: true,
568                 CommitTrash: true,
569                 Logger:      ctxlog.TestLogger(c),
570                 Dumper:      ctxlog.TestLogger(c),
571         }
572         s.stub.serveCurrentUserAdmin()
573         s.stub.serveFooBarFileCollections()
574         s.stub.serveKeepServices(stubServices)
575         s.stub.serveKeepstoreMounts()
576         s.stub.serveKeepstoreIndexFoo4Bar1()
577         trashReqs := s.stub.serveKeepstoreTrash()
578         pullReqs := s.stub.serveKeepstorePull()
579         srv := s.newServer(&opts)
580         bal, err := srv.runOnce(context.Background())
581         c.Check(err, check.IsNil)
582         c.Check(trashReqs.Count(), check.Equals, 8)
583         c.Check(pullReqs.Count(), check.Equals, 4)
584         // "foo" block is overreplicated by 2
585         c.Check(bal.stats.trashes, check.Equals, 2)
586         // "bar" block is underreplicated by 1, and its only copy is
587         // in a poor rendezvous position
588         c.Check(bal.stats.pulls, check.Equals, 2)
589
590         lost, err := ioutil.ReadFile(s.config.Collections.BlobMissingReport)
591         c.Assert(err, check.IsNil)
592         c.Check(string(lost), check.Not(check.Matches), `(?ms).*acbd18db4cc2f85cedef654fccc4a4d8.*`)
593
594         buf, err := s.getMetrics(c, srv)
595         c.Check(err, check.IsNil)
596         bufstr := buf.String()
597         c.Check(bufstr, check.Matches, `(?ms).*\narvados_keep_total_bytes 15\n.*`)
598         c.Check(bufstr, check.Matches, `(?ms).*\narvados_keepbalance_changeset_compute_seconds_sum [0-9\.]+\n.*`)
599         c.Check(bufstr, check.Matches, `(?ms).*\narvados_keepbalance_changeset_compute_seconds_count 1\n.*`)
600         c.Check(bufstr, check.Matches, `(?ms).*\narvados_keep_dedup_byte_ratio [1-9].*`)
601         c.Check(bufstr, check.Matches, `(?ms).*\narvados_keep_dedup_block_ratio [1-9].*`)
602 }
603
604 func (s *runSuite) TestChunkPrefix(c *check.C) {
605         s.config.Collections.BlobMissingReport = c.MkDir() + "/keep-balance-lost-blocks-test-"
606         opts := RunOptions{
607                 CommitPulls: true,
608                 CommitTrash: true,
609                 ChunkPrefix: "ac", // catch "foo" but not "bar"
610                 Logger:      ctxlog.TestLogger(c),
611                 Dumper:      ctxlog.TestLogger(c),
612         }
613         s.stub.serveCurrentUserAdmin()
614         s.stub.serveFooBarFileCollections()
615         s.stub.serveKeepServices(stubServices)
616         s.stub.serveKeepstoreMounts()
617         s.stub.serveKeepstoreIndexFoo4Bar1()
618         trashReqs := s.stub.serveKeepstoreTrash()
619         pullReqs := s.stub.serveKeepstorePull()
620         srv := s.newServer(&opts)
621         bal, err := srv.runOnce(context.Background())
622         c.Check(err, check.IsNil)
623         c.Check(trashReqs.Count(), check.Equals, 8)
624         c.Check(pullReqs.Count(), check.Equals, 4)
625         // "foo" block is overreplicated by 2
626         c.Check(bal.stats.trashes, check.Equals, 2)
627         // "bar" block is underreplicated but does not match prefix
628         c.Check(bal.stats.pulls, check.Equals, 0)
629
630         lost, err := ioutil.ReadFile(s.config.Collections.BlobMissingReport)
631         c.Assert(err, check.IsNil)
632         c.Check(string(lost), check.Equals, "")
633 }
634
635 func (s *runSuite) TestRunForever(c *check.C) {
636         s.config.ManagementToken = "xyzzy"
637         opts := RunOptions{
638                 CommitPulls: true,
639                 CommitTrash: true,
640                 Logger:      ctxlog.TestLogger(c),
641                 Dumper:      ctxlog.TestLogger(c),
642         }
643         s.stub.serveCurrentUserAdmin()
644         s.stub.serveFooBarFileCollections()
645         s.stub.serveKeepServices(stubServices)
646         s.stub.serveKeepstoreMounts()
647         s.stub.serveKeepstoreIndexFoo4Bar1()
648         trashReqs := s.stub.serveKeepstoreTrash()
649         pullReqs := s.stub.serveKeepstorePull()
650
651         ctx, cancel := context.WithCancel(context.Background())
652         defer cancel()
653         s.config.Collections.BalancePeriod = arvados.Duration(time.Millisecond)
654         srv := s.newServer(&opts)
655
656         done := make(chan bool)
657         go func() {
658                 srv.runForever(ctx)
659                 close(done)
660         }()
661
662         // Each run should send 4 pull lists + 4 trash lists. The
663         // first run should also send 4 empty trash lists at
664         // startup. We should complete all four runs in much less than
665         // a second.
666         for t0 := time.Now(); time.Since(t0) < 10*time.Second; {
667                 if pullReqs.Count() >= 16 && trashReqs.Count() == pullReqs.Count()+4 {
668                         break
669                 }
670                 time.Sleep(time.Millisecond)
671         }
672         cancel()
673         <-done
674         c.Check(pullReqs.Count() >= 16, check.Equals, true)
675         c.Check(trashReqs.Count(), check.Equals, pullReqs.Count()+4)
676
677         buf, err := s.getMetrics(c, srv)
678         c.Check(err, check.IsNil)
679         c.Check(buf, check.Matches, `(?ms).*\narvados_keepbalance_changeset_compute_seconds_count `+fmt.Sprintf("%d", pullReqs.Count()/4)+`\n.*`)
680 }
681
682 func (s *runSuite) getMetrics(c *check.C, srv *Server) (*bytes.Buffer, error) {
683         mfs, err := srv.Metrics.reg.Gather()
684         if err != nil {
685                 return nil, err
686         }
687
688         var buf bytes.Buffer
689         for _, mf := range mfs {
690                 if _, err := expfmt.MetricFamilyToText(&buf, mf); err != nil {
691                         return nil, err
692                 }
693         }
694
695         return &buf, nil
696 }