Merge branch '20755-ec2-multiple-subnets'
[arvados.git] / services / keep-balance / balance_run_test.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package keepbalance
6
7 import (
8         "context"
9         "encoding/json"
10         "fmt"
11         "io"
12         "io/ioutil"
13         "net/http"
14         "net/http/httptest"
15         "os"
16         "strings"
17         "sync"
18         "time"
19
20         "git.arvados.org/arvados.git/lib/config"
21         "git.arvados.org/arvados.git/sdk/go/arvados"
22         "git.arvados.org/arvados.git/sdk/go/arvadostest"
23         "git.arvados.org/arvados.git/sdk/go/ctxlog"
24         "github.com/jmoiron/sqlx"
25         "github.com/prometheus/client_golang/prometheus"
26         check "gopkg.in/check.v1"
27 )
28
29 var _ = check.Suite(&runSuite{})
30
31 type reqTracker struct {
32         reqs []http.Request
33         sync.Mutex
34 }
35
36 func (rt *reqTracker) Count() int {
37         rt.Lock()
38         defer rt.Unlock()
39         return len(rt.reqs)
40 }
41
42 func (rt *reqTracker) Add(req *http.Request) int {
43         rt.Lock()
44         defer rt.Unlock()
45         rt.reqs = append(rt.reqs, *req)
46         return len(rt.reqs)
47 }
48
49 var stubServices = []arvados.KeepService{
50         {
51                 UUID:           "zzzzz-bi6l4-000000000000000",
52                 ServiceHost:    "keep0.zzzzz.arvadosapi.com",
53                 ServicePort:    25107,
54                 ServiceSSLFlag: false,
55                 ServiceType:    "disk",
56         },
57         {
58                 UUID:           "zzzzz-bi6l4-000000000000001",
59                 ServiceHost:    "keep1.zzzzz.arvadosapi.com",
60                 ServicePort:    25107,
61                 ServiceSSLFlag: false,
62                 ServiceType:    "disk",
63         },
64         {
65                 UUID:           "zzzzz-bi6l4-000000000000002",
66                 ServiceHost:    "keep2.zzzzz.arvadosapi.com",
67                 ServicePort:    25107,
68                 ServiceSSLFlag: false,
69                 ServiceType:    "disk",
70         },
71         {
72                 UUID:           "zzzzz-bi6l4-000000000000003",
73                 ServiceHost:    "keep3.zzzzz.arvadosapi.com",
74                 ServicePort:    25107,
75                 ServiceSSLFlag: false,
76                 ServiceType:    "disk",
77         },
78         {
79                 UUID:           "zzzzz-bi6l4-h0a0xwut9qa6g3a",
80                 ServiceHost:    "keep.zzzzz.arvadosapi.com",
81                 ServicePort:    25333,
82                 ServiceSSLFlag: true,
83                 ServiceType:    "proxy",
84         },
85 }
86
87 var stubMounts = map[string][]arvados.KeepMount{
88         "keep0.zzzzz.arvadosapi.com:25107": {{
89                 UUID:           "zzzzz-ivpuk-000000000000000",
90                 DeviceID:       "keep0-vol0",
91                 StorageClasses: map[string]bool{"default": true},
92         }},
93         "keep1.zzzzz.arvadosapi.com:25107": {{
94                 UUID:           "zzzzz-ivpuk-100000000000000",
95                 DeviceID:       "keep1-vol0",
96                 StorageClasses: map[string]bool{"default": true},
97         }},
98         "keep2.zzzzz.arvadosapi.com:25107": {{
99                 UUID:           "zzzzz-ivpuk-200000000000000",
100                 DeviceID:       "keep2-vol0",
101                 StorageClasses: map[string]bool{"default": true},
102         }},
103         "keep3.zzzzz.arvadosapi.com:25107": {{
104                 UUID:           "zzzzz-ivpuk-300000000000000",
105                 DeviceID:       "keep3-vol0",
106                 StorageClasses: map[string]bool{"default": true},
107         }},
108 }
109
110 // stubServer is an HTTP transport that intercepts and processes all
111 // requests using its own handlers.
112 type stubServer struct {
113         mux      *http.ServeMux
114         srv      *httptest.Server
115         mutex    sync.Mutex
116         Requests reqTracker
117         logf     func(string, ...interface{})
118 }
119
120 // Start initializes the stub server and returns an *http.Client that
121 // uses the stub server to handle all requests.
122 //
123 // A stubServer that has been started should eventually be shut down
124 // with Close().
125 func (s *stubServer) Start() *http.Client {
126         // Set up a config.Client that forwards all requests to s.mux
127         // via s.srv. Test cases will attach handlers to s.mux to get
128         // the desired responses.
129         s.mux = http.NewServeMux()
130         s.srv = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
131                 s.mutex.Lock()
132                 s.Requests.Add(r)
133                 s.mutex.Unlock()
134                 w.Header().Set("Content-Type", "application/json")
135                 s.mux.ServeHTTP(w, r)
136         }))
137         return &http.Client{Transport: s}
138 }
139
140 func (s *stubServer) RoundTrip(req *http.Request) (*http.Response, error) {
141         w := httptest.NewRecorder()
142         s.mux.ServeHTTP(w, req)
143         return &http.Response{
144                 StatusCode: w.Code,
145                 Status:     fmt.Sprintf("%d %s", w.Code, http.StatusText(w.Code)),
146                 Header:     w.HeaderMap,
147                 Body:       ioutil.NopCloser(w.Body)}, nil
148 }
149
150 // Close releases resources used by the server.
151 func (s *stubServer) Close() {
152         s.srv.Close()
153 }
154
155 func (s *stubServer) serveStatic(path, data string) *reqTracker {
156         rt := &reqTracker{}
157         s.mux.HandleFunc(path, func(w http.ResponseWriter, r *http.Request) {
158                 rt.Add(r)
159                 if r.Body != nil {
160                         ioutil.ReadAll(r.Body)
161                         r.Body.Close()
162                 }
163                 io.WriteString(w, data)
164         })
165         return rt
166 }
167
168 func (s *stubServer) serveCurrentUserAdmin() *reqTracker {
169         return s.serveStatic("/arvados/v1/users/current",
170                 `{"uuid":"zzzzz-tpzed-000000000000000","is_admin":true,"is_active":true}`)
171 }
172
173 func (s *stubServer) serveCurrentUserNotAdmin() *reqTracker {
174         return s.serveStatic("/arvados/v1/users/current",
175                 `{"uuid":"zzzzz-tpzed-000000000000000","is_admin":false,"is_active":true}`)
176 }
177
178 func (s *stubServer) serveDiscoveryDoc() *reqTracker {
179         return s.serveStatic("/discovery/v1/apis/arvados/v1/rest",
180                 `{"defaultCollectionReplication":2}`)
181 }
182
183 func (s *stubServer) serveZeroCollections() *reqTracker {
184         return s.serveStatic("/arvados/v1/collections",
185                 `{"items":[],"items_available":0}`)
186 }
187
188 func (s *stubServer) serveFooBarFileCollections() *reqTracker {
189         rt := &reqTracker{}
190         s.mux.HandleFunc("/arvados/v1/collections", func(w http.ResponseWriter, r *http.Request) {
191                 r.ParseForm()
192                 rt.Add(r)
193                 if strings.Contains(r.Form.Get("filters"), `modified_at`) {
194                         io.WriteString(w, `{"items_available":0,"items":[]}`)
195                 } else {
196                         io.WriteString(w, `{"items_available":3,"items":[
197                                 {"uuid":"zzzzz-4zz18-aaaaaaaaaaaaaaa","portable_data_hash":"fa7aeb5140e2848d39b416daeef4ffc5+45","manifest_text":". 37b51d194a7513e45b56f6524f2d51f2+3 0:3:bar\n","modified_at":"2014-02-03T17:22:54Z"},
198                                 {"uuid":"zzzzz-4zz18-ehbhgtheo8909or","portable_data_hash":"fa7aeb5140e2848d39b416daeef4ffc5+45","manifest_text":". 37b51d194a7513e45b56f6524f2d51f2+3 0:3:bar\n","modified_at":"2014-02-03T17:22:54Z"},
199                                 {"uuid":"zzzzz-4zz18-znfnqtbbv4spc3w","portable_data_hash":"1f4b0bc7583c2a7f9102c395f4ffc5e3+45","manifest_text":". acbd18db4cc2f85cedef654fccc4a4d8+3 0:3:foo\n","modified_at":"2014-02-03T17:22:54Z"}]}`)
200                 }
201         })
202         return rt
203 }
204
205 func (s *stubServer) serveCollectionsButSkipOne() *reqTracker {
206         rt := &reqTracker{}
207         s.mux.HandleFunc("/arvados/v1/collections", func(w http.ResponseWriter, r *http.Request) {
208                 r.ParseForm()
209                 rt.Add(r)
210                 if strings.Contains(r.Form.Get("filters"), `"modified_at","\u003c="`) {
211                         io.WriteString(w, `{"items_available":3,"items":[]}`)
212                 } else if strings.Contains(r.Form.Get("filters"), `"modified_at","\u003e`) {
213                         io.WriteString(w, `{"items_available":0,"items":[]}`)
214                 } else if strings.Contains(r.Form.Get("filters"), `"modified_at","="`) && strings.Contains(r.Form.Get("filters"), `"uuid","\u003e"`) {
215                         io.WriteString(w, `{"items_available":0,"items":[]}`)
216                 } else if strings.Contains(r.Form.Get("filters"), `"modified_at","=",null`) {
217                         io.WriteString(w, `{"items_available":0,"items":[]}`)
218                 } else {
219                         io.WriteString(w, `{"items_available":2,"items":[
220                                 {"uuid":"zzzzz-4zz18-ehbhgtheo8909or","portable_data_hash":"fa7aeb5140e2848d39b416daeef4ffc5+45","manifest_text":". 37b51d194a7513e45b56f6524f2d51f2+3 0:3:bar\n","modified_at":"2014-02-03T17:22:54Z"},
221                                 {"uuid":"zzzzz-4zz18-znfnqtbbv4spc3w","portable_data_hash":"1f4b0bc7583c2a7f9102c395f4ffc5e3+45","manifest_text":". acbd18db4cc2f85cedef654fccc4a4d8+3 0:3:foo\n","modified_at":"2014-02-03T17:22:54Z"}]}`)
222                 }
223         })
224         return rt
225 }
226
227 func (s *stubServer) serveZeroKeepServices() *reqTracker {
228         return s.serveJSON("/arvados/v1/keep_services", arvados.KeepServiceList{})
229 }
230
231 func (s *stubServer) serveKeepServices(svcs []arvados.KeepService) *reqTracker {
232         return s.serveJSON("/arvados/v1/keep_services", arvados.KeepServiceList{
233                 ItemsAvailable: len(svcs),
234                 Items:          svcs,
235         })
236 }
237
238 func (s *stubServer) serveJSON(path string, resp interface{}) *reqTracker {
239         rt := &reqTracker{}
240         s.mux.HandleFunc(path, func(w http.ResponseWriter, r *http.Request) {
241                 rt.Add(r)
242                 json.NewEncoder(w).Encode(resp)
243         })
244         return rt
245 }
246
247 func (s *stubServer) serveKeepstoreMounts() *reqTracker {
248         rt := &reqTracker{}
249         s.mux.HandleFunc("/mounts", func(w http.ResponseWriter, r *http.Request) {
250                 rt.Add(r)
251                 json.NewEncoder(w).Encode(stubMounts[r.Host])
252         })
253         return rt
254 }
255
256 func (s *stubServer) serveKeepstoreIndexFoo4Bar1() *reqTracker {
257         fooLine := func(mt int) string { return fmt.Sprintf("acbd18db4cc2f85cedef654fccc4a4d8+3 %d\n", 12345678+mt) }
258         barLine := "37b51d194a7513e45b56f6524f2d51f2+3 12345678\n"
259         rt := &reqTracker{}
260         s.mux.HandleFunc("/index/", func(w http.ResponseWriter, r *http.Request) {
261                 count := rt.Add(r)
262                 if r.Host == "keep0.zzzzz.arvadosapi.com:25107" && strings.HasPrefix(barLine, r.URL.Path[7:]) {
263                         io.WriteString(w, barLine)
264                 }
265                 if strings.HasPrefix(fooLine(count), r.URL.Path[7:]) {
266                         io.WriteString(w, fooLine(count))
267                 }
268                 io.WriteString(w, "\n")
269         })
270         for _, mounts := range stubMounts {
271                 for i, mnt := range mounts {
272                         i := i
273                         s.mux.HandleFunc(fmt.Sprintf("/mounts/%s/blocks", mnt.UUID), func(w http.ResponseWriter, r *http.Request) {
274                                 count := rt.Add(r)
275                                 r.ParseForm()
276                                 if i == 0 && r.Host == "keep0.zzzzz.arvadosapi.com:25107" && strings.HasPrefix(barLine, r.Form.Get("prefix")) {
277                                         io.WriteString(w, barLine)
278                                 }
279                                 if i == 0 && strings.HasPrefix(fooLine(count), r.Form.Get("prefix")) {
280                                         io.WriteString(w, fooLine(count))
281                                 }
282                                 io.WriteString(w, "\n")
283                         })
284                 }
285         }
286         return rt
287 }
288
289 func (s *stubServer) serveKeepstoreIndexFoo1() *reqTracker {
290         fooLine := "acbd18db4cc2f85cedef654fccc4a4d8+3 12345678\n"
291         rt := &reqTracker{}
292         s.mux.HandleFunc("/index/", func(w http.ResponseWriter, r *http.Request) {
293                 rt.Add(r)
294                 if r.Host == "keep0.zzzzz.arvadosapi.com:25107" && strings.HasPrefix(fooLine, r.URL.Path[7:]) {
295                         io.WriteString(w, fooLine)
296                 }
297                 io.WriteString(w, "\n")
298         })
299         for _, mounts := range stubMounts {
300                 for i, mnt := range mounts {
301                         i := i
302                         s.mux.HandleFunc(fmt.Sprintf("/mounts/%s/blocks", mnt.UUID), func(w http.ResponseWriter, r *http.Request) {
303                                 rt.Add(r)
304                                 if i == 0 && strings.HasPrefix(fooLine, r.Form.Get("prefix")) {
305                                         io.WriteString(w, fooLine)
306                                 }
307                                 io.WriteString(w, "\n")
308                         })
309                 }
310         }
311         return rt
312 }
313
314 func (s *stubServer) serveKeepstoreIndexIgnoringPrefix() *reqTracker {
315         fooLine := "acbd18db4cc2f85cedef654fccc4a4d8+3 12345678\n"
316         rt := &reqTracker{}
317         s.mux.HandleFunc("/index/", func(w http.ResponseWriter, r *http.Request) {
318                 rt.Add(r)
319                 io.WriteString(w, fooLine)
320                 io.WriteString(w, "\n")
321         })
322         for _, mounts := range stubMounts {
323                 for _, mnt := range mounts {
324                         s.mux.HandleFunc(fmt.Sprintf("/mounts/%s/blocks", mnt.UUID), func(w http.ResponseWriter, r *http.Request) {
325                                 rt.Add(r)
326                                 io.WriteString(w, fooLine)
327                                 io.WriteString(w, "\n")
328                         })
329                 }
330         }
331         return rt
332 }
333
334 func (s *stubServer) serveKeepstoreTrash() *reqTracker {
335         return s.serveStatic("/trash", `{}`)
336 }
337
338 func (s *stubServer) serveKeepstorePull() *reqTracker {
339         return s.serveStatic("/pull", `{}`)
340 }
341
342 type runSuite struct {
343         stub   stubServer
344         config *arvados.Cluster
345         db     *sqlx.DB
346         client *arvados.Client
347 }
348
349 func (s *runSuite) newServer(options *RunOptions) *Server {
350         srv := &Server{
351                 Cluster:    s.config,
352                 ArvClient:  s.client,
353                 RunOptions: *options,
354                 Metrics:    newMetrics(prometheus.NewRegistry()),
355                 Logger:     options.Logger,
356                 Dumper:     options.Dumper,
357                 DB:         s.db,
358         }
359         return srv
360 }
361
362 func (s *runSuite) SetUpTest(c *check.C) {
363         cfg, err := config.NewLoader(nil, ctxlog.TestLogger(c)).Load()
364         c.Assert(err, check.Equals, nil)
365         s.config, err = cfg.GetCluster("")
366         c.Assert(err, check.Equals, nil)
367         s.db, err = sqlx.Open("postgres", s.config.PostgreSQL.Connection.String())
368         c.Assert(err, check.IsNil)
369
370         s.config.Collections.BalancePeriod = arvados.Duration(time.Second)
371         arvadostest.SetServiceURL(&s.config.Services.Keepbalance, "http://localhost:/")
372
373         s.client = &arvados.Client{
374                 AuthToken: "xyzzy",
375                 APIHost:   "zzzzz.arvadosapi.com",
376                 Client:    s.stub.Start()}
377
378         s.stub.serveDiscoveryDoc()
379         s.stub.logf = c.Logf
380 }
381
382 func (s *runSuite) TearDownTest(c *check.C) {
383         s.stub.Close()
384 }
385
386 func (s *runSuite) TestRefuseZeroCollections(c *check.C) {
387         defer arvados.NewClientFromEnv().RequestAndDecode(nil, "POST", "database/reset", nil, nil)
388         _, err := s.db.Exec(`delete from collections`)
389         c.Assert(err, check.IsNil)
390         opts := RunOptions{
391                 CommitPulls: true,
392                 CommitTrash: true,
393                 Logger:      ctxlog.TestLogger(c),
394         }
395         s.stub.serveCurrentUserAdmin()
396         s.stub.serveZeroCollections()
397         s.stub.serveKeepServices(stubServices)
398         s.stub.serveKeepstoreMounts()
399         s.stub.serveKeepstoreIndexFoo4Bar1()
400         trashReqs := s.stub.serveKeepstoreTrash()
401         pullReqs := s.stub.serveKeepstorePull()
402         srv := s.newServer(&opts)
403         _, err = srv.runOnce(context.Background())
404         c.Check(err, check.ErrorMatches, "received zero collections")
405         c.Check(trashReqs.Count(), check.Equals, 4)
406         c.Check(pullReqs.Count(), check.Equals, 0)
407 }
408
409 func (s *runSuite) TestRefuseBadIndex(c *check.C) {
410         opts := RunOptions{
411                 CommitPulls: true,
412                 CommitTrash: true,
413                 ChunkPrefix: "abc",
414                 Logger:      ctxlog.TestLogger(c),
415         }
416         s.stub.serveCurrentUserAdmin()
417         s.stub.serveFooBarFileCollections()
418         s.stub.serveKeepServices(stubServices)
419         s.stub.serveKeepstoreMounts()
420         s.stub.serveKeepstoreIndexIgnoringPrefix()
421         trashReqs := s.stub.serveKeepstoreTrash()
422         pullReqs := s.stub.serveKeepstorePull()
423         srv := s.newServer(&opts)
424         bal, err := srv.runOnce(context.Background())
425         c.Check(err, check.ErrorMatches, ".*Index response included block .* despite asking for prefix \"abc\"")
426         c.Check(trashReqs.Count(), check.Equals, 4)
427         c.Check(pullReqs.Count(), check.Equals, 0)
428         c.Check(bal.stats.trashes, check.Equals, 0)
429         c.Check(bal.stats.pulls, check.Equals, 0)
430 }
431
432 func (s *runSuite) TestRefuseNonAdmin(c *check.C) {
433         opts := RunOptions{
434                 CommitPulls: true,
435                 CommitTrash: true,
436                 Logger:      ctxlog.TestLogger(c),
437         }
438         s.stub.serveCurrentUserNotAdmin()
439         s.stub.serveZeroCollections()
440         s.stub.serveKeepServices(stubServices)
441         s.stub.serveKeepstoreMounts()
442         trashReqs := s.stub.serveKeepstoreTrash()
443         pullReqs := s.stub.serveKeepstorePull()
444         srv := s.newServer(&opts)
445         _, err := srv.runOnce(context.Background())
446         c.Check(err, check.ErrorMatches, "current user .* is not .* admin user")
447         c.Check(trashReqs.Count(), check.Equals, 0)
448         c.Check(pullReqs.Count(), check.Equals, 0)
449 }
450
451 func (s *runSuite) TestInvalidChunkPrefix(c *check.C) {
452         for _, trial := range []struct {
453                 prefix string
454                 errRe  string
455         }{
456                 {"123ABC", "invalid char \"A\" in chunk prefix.*"},
457                 {"123xyz", "invalid char \"x\" in chunk prefix.*"},
458                 {"123456789012345678901234567890123", "invalid chunk prefix .* longer than a block hash"},
459         } {
460                 s.SetUpTest(c)
461                 c.Logf("trying invalid prefix %q", trial.prefix)
462                 opts := RunOptions{
463                         CommitPulls: true,
464                         CommitTrash: true,
465                         ChunkPrefix: trial.prefix,
466                         Logger:      ctxlog.TestLogger(c),
467                 }
468                 s.stub.serveCurrentUserAdmin()
469                 s.stub.serveFooBarFileCollections()
470                 s.stub.serveKeepServices(stubServices)
471                 s.stub.serveKeepstoreMounts()
472                 trashReqs := s.stub.serveKeepstoreTrash()
473                 pullReqs := s.stub.serveKeepstorePull()
474                 srv := s.newServer(&opts)
475                 _, err := srv.runOnce(context.Background())
476                 c.Check(err, check.ErrorMatches, trial.errRe)
477                 c.Check(trashReqs.Count(), check.Equals, 0)
478                 c.Check(pullReqs.Count(), check.Equals, 0)
479         }
480 }
481
482 func (s *runSuite) TestRefuseSameDeviceDifferentVolumes(c *check.C) {
483         opts := RunOptions{
484                 CommitPulls: true,
485                 CommitTrash: true,
486                 Logger:      ctxlog.TestLogger(c),
487         }
488         s.stub.serveCurrentUserAdmin()
489         s.stub.serveZeroCollections()
490         s.stub.serveKeepServices(stubServices)
491         s.stub.mux.HandleFunc("/mounts", func(w http.ResponseWriter, r *http.Request) {
492                 hostid := r.Host[:5] // "keep0.zzzzz.arvadosapi.com:25107" => "keep0"
493                 json.NewEncoder(w).Encode([]arvados.KeepMount{{
494                         UUID:           "zzzzz-ivpuk-0000000000" + hostid,
495                         DeviceID:       "keep0-vol0",
496                         StorageClasses: map[string]bool{"default": true},
497                 }})
498         })
499         trashReqs := s.stub.serveKeepstoreTrash()
500         pullReqs := s.stub.serveKeepstorePull()
501         srv := s.newServer(&opts)
502         _, err := srv.runOnce(context.Background())
503         c.Check(err, check.ErrorMatches, "cannot continue with config errors.*")
504         c.Check(trashReqs.Count(), check.Equals, 0)
505         c.Check(pullReqs.Count(), check.Equals, 0)
506 }
507
508 func (s *runSuite) TestWriteLostBlocks(c *check.C) {
509         lostf, err := ioutil.TempFile("", "keep-balance-lost-blocks-test-")
510         c.Assert(err, check.IsNil)
511         s.config.Collections.BlobMissingReport = lostf.Name()
512         defer os.Remove(lostf.Name())
513         opts := RunOptions{
514                 CommitPulls: true,
515                 CommitTrash: true,
516                 Logger:      ctxlog.TestLogger(c),
517         }
518         s.stub.serveCurrentUserAdmin()
519         s.stub.serveFooBarFileCollections()
520         s.stub.serveKeepServices(stubServices)
521         s.stub.serveKeepstoreMounts()
522         s.stub.serveKeepstoreIndexFoo1()
523         s.stub.serveKeepstoreTrash()
524         s.stub.serveKeepstorePull()
525         srv := s.newServer(&opts)
526         c.Assert(err, check.IsNil)
527         _, err = srv.runOnce(context.Background())
528         c.Check(err, check.IsNil)
529         lost, err := ioutil.ReadFile(lostf.Name())
530         c.Assert(err, check.IsNil)
531         c.Check(string(lost), check.Matches, `(?ms).*37b51d194a7513e45b56f6524f2d51f2.* fa7aeb5140e2848d39b416daeef4ffc5\+45.*`)
532 }
533
534 func (s *runSuite) TestDryRun(c *check.C) {
535         opts := RunOptions{
536                 CommitPulls: false,
537                 CommitTrash: false,
538                 Logger:      ctxlog.TestLogger(c),
539         }
540         s.stub.serveCurrentUserAdmin()
541         collReqs := s.stub.serveFooBarFileCollections()
542         s.stub.serveKeepServices(stubServices)
543         s.stub.serveKeepstoreMounts()
544         s.stub.serveKeepstoreIndexFoo4Bar1()
545         trashReqs := s.stub.serveKeepstoreTrash()
546         pullReqs := s.stub.serveKeepstorePull()
547         srv := s.newServer(&opts)
548         bal, err := srv.runOnce(context.Background())
549         c.Check(err, check.IsNil)
550         for _, req := range collReqs.reqs {
551                 c.Check(req.Form.Get("include_trash"), check.Equals, "true")
552                 c.Check(req.Form.Get("include_old_versions"), check.Equals, "true")
553         }
554         c.Check(trashReqs.Count(), check.Equals, 0)
555         c.Check(pullReqs.Count(), check.Equals, 0)
556         c.Check(bal.stats.pulls, check.Not(check.Equals), 0)
557         c.Check(bal.stats.underrep.replicas, check.Not(check.Equals), 0)
558         c.Check(bal.stats.overrep.replicas, check.Not(check.Equals), 0)
559 }
560
561 func (s *runSuite) TestCommit(c *check.C) {
562         s.config.Collections.BlobMissingReport = c.MkDir() + "/keep-balance-lost-blocks-test-"
563         s.config.ManagementToken = "xyzzy"
564         opts := RunOptions{
565                 CommitPulls: true,
566                 CommitTrash: true,
567                 Logger:      ctxlog.TestLogger(c),
568                 Dumper:      ctxlog.TestLogger(c),
569         }
570         s.stub.serveCurrentUserAdmin()
571         s.stub.serveFooBarFileCollections()
572         s.stub.serveKeepServices(stubServices)
573         s.stub.serveKeepstoreMounts()
574         s.stub.serveKeepstoreIndexFoo4Bar1()
575         trashReqs := s.stub.serveKeepstoreTrash()
576         pullReqs := s.stub.serveKeepstorePull()
577         srv := s.newServer(&opts)
578         bal, err := srv.runOnce(context.Background())
579         c.Check(err, check.IsNil)
580         c.Check(trashReqs.Count(), check.Equals, 8)
581         c.Check(pullReqs.Count(), check.Equals, 4)
582         // "foo" block is overreplicated by 2
583         c.Check(bal.stats.trashes, check.Equals, 2)
584         // "bar" block is underreplicated by 1, and its only copy is
585         // in a poor rendezvous position
586         c.Check(bal.stats.pulls, check.Equals, 2)
587
588         lost, err := ioutil.ReadFile(s.config.Collections.BlobMissingReport)
589         c.Assert(err, check.IsNil)
590         c.Check(string(lost), check.Not(check.Matches), `(?ms).*acbd18db4cc2f85cedef654fccc4a4d8.*`)
591
592         metrics := arvadostest.GatherMetricsAsString(srv.Metrics.reg)
593         c.Check(metrics, check.Matches, `(?ms).*\narvados_keep_total_bytes 15\n.*`)
594         c.Check(metrics, check.Matches, `(?ms).*\narvados_keepbalance_changeset_compute_seconds_sum [0-9\.]+\n.*`)
595         c.Check(metrics, check.Matches, `(?ms).*\narvados_keepbalance_changeset_compute_seconds_count 1\n.*`)
596         c.Check(metrics, check.Matches, `(?ms).*\narvados_keep_dedup_byte_ratio [1-9].*`)
597         c.Check(metrics, check.Matches, `(?ms).*\narvados_keep_dedup_block_ratio [1-9].*`)
598 }
599
600 func (s *runSuite) TestChunkPrefix(c *check.C) {
601         s.config.Collections.BlobMissingReport = c.MkDir() + "/keep-balance-lost-blocks-test-"
602         opts := RunOptions{
603                 CommitPulls: true,
604                 CommitTrash: true,
605                 ChunkPrefix: "ac", // catch "foo" but not "bar"
606                 Logger:      ctxlog.TestLogger(c),
607                 Dumper:      ctxlog.TestLogger(c),
608         }
609         s.stub.serveCurrentUserAdmin()
610         s.stub.serveFooBarFileCollections()
611         s.stub.serveKeepServices(stubServices)
612         s.stub.serveKeepstoreMounts()
613         s.stub.serveKeepstoreIndexFoo4Bar1()
614         trashReqs := s.stub.serveKeepstoreTrash()
615         pullReqs := s.stub.serveKeepstorePull()
616         srv := s.newServer(&opts)
617         bal, err := srv.runOnce(context.Background())
618         c.Check(err, check.IsNil)
619         c.Check(trashReqs.Count(), check.Equals, 8)
620         c.Check(pullReqs.Count(), check.Equals, 4)
621         // "foo" block is overreplicated by 2
622         c.Check(bal.stats.trashes, check.Equals, 2)
623         // "bar" block is underreplicated but does not match prefix
624         c.Check(bal.stats.pulls, check.Equals, 0)
625
626         lost, err := ioutil.ReadFile(s.config.Collections.BlobMissingReport)
627         c.Assert(err, check.IsNil)
628         c.Check(string(lost), check.Equals, "")
629 }
630
631 func (s *runSuite) TestRunForever(c *check.C) {
632         s.config.ManagementToken = "xyzzy"
633         opts := RunOptions{
634                 CommitPulls: true,
635                 CommitTrash: true,
636                 Logger:      ctxlog.TestLogger(c),
637                 Dumper:      ctxlog.TestLogger(c),
638         }
639         s.stub.serveCurrentUserAdmin()
640         s.stub.serveFooBarFileCollections()
641         s.stub.serveKeepServices(stubServices)
642         s.stub.serveKeepstoreMounts()
643         s.stub.serveKeepstoreIndexFoo4Bar1()
644         trashReqs := s.stub.serveKeepstoreTrash()
645         pullReqs := s.stub.serveKeepstorePull()
646
647         ctx, cancel := context.WithCancel(context.Background())
648         defer cancel()
649         s.config.Collections.BalancePeriod = arvados.Duration(time.Millisecond)
650         srv := s.newServer(&opts)
651
652         done := make(chan bool)
653         go func() {
654                 srv.runForever(ctx)
655                 close(done)
656         }()
657
658         // Each run should send 4 pull lists + 4 trash lists. The
659         // first run should also send 4 empty trash lists at
660         // startup. We should complete all four runs in much less than
661         // a second.
662         for t0 := time.Now(); time.Since(t0) < 10*time.Second; {
663                 if pullReqs.Count() >= 16 && trashReqs.Count() == pullReqs.Count()+4 {
664                         break
665                 }
666                 time.Sleep(time.Millisecond)
667         }
668         cancel()
669         <-done
670         c.Check(pullReqs.Count() >= 16, check.Equals, true)
671         c.Check(trashReqs.Count(), check.Equals, pullReqs.Count()+4)
672
673         metrics := arvadostest.GatherMetricsAsString(srv.Metrics.reg)
674         c.Check(metrics, check.Matches, `(?ms).*\narvados_keepbalance_changeset_compute_seconds_count `+fmt.Sprintf("%d", pullReqs.Count()/4)+`\n.*`)
675 }