Merge branch '21201-doc-bundle-update'
[arvados.git] / services / keep-balance / balance_run_test.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package keepbalance
6
7 import (
8         "context"
9         "encoding/json"
10         "fmt"
11         "io"
12         "io/ioutil"
13         "net/http"
14         "net/http/httptest"
15         "os"
16         "strings"
17         "sync"
18         "time"
19
20         "git.arvados.org/arvados.git/lib/config"
21         "git.arvados.org/arvados.git/sdk/go/arvados"
22         "git.arvados.org/arvados.git/sdk/go/arvadostest"
23         "git.arvados.org/arvados.git/sdk/go/ctxlog"
24         "github.com/jmoiron/sqlx"
25         "github.com/prometheus/client_golang/prometheus"
26         check "gopkg.in/check.v1"
27 )
28
29 var _ = check.Suite(&runSuite{})
30
// reqTracker accumulates copies of the HTTP requests passed to Add.
// The embedded mutex is held by Add and Count while they access reqs.
type reqTracker struct {
	sync.Mutex
	reqs []http.Request
}
35
36 func (rt *reqTracker) Count() int {
37         rt.Lock()
38         defer rt.Unlock()
39         return len(rt.reqs)
40 }
41
42 func (rt *reqTracker) Add(req *http.Request) int {
43         rt.Lock()
44         defer rt.Unlock()
45         rt.reqs = append(rt.reqs, *req)
46         return len(rt.reqs)
47 }
48
49 var stubServices = []arvados.KeepService{
50         {
51                 UUID:           "zzzzz-bi6l4-000000000000000",
52                 ServiceHost:    "keep0.zzzzz.arvadosapi.com",
53                 ServicePort:    25107,
54                 ServiceSSLFlag: false,
55                 ServiceType:    "disk",
56         },
57         {
58                 UUID:           "zzzzz-bi6l4-000000000000001",
59                 ServiceHost:    "keep1.zzzzz.arvadosapi.com",
60                 ServicePort:    25107,
61                 ServiceSSLFlag: false,
62                 ServiceType:    "disk",
63         },
64         {
65                 UUID:           "zzzzz-bi6l4-000000000000002",
66                 ServiceHost:    "keep2.zzzzz.arvadosapi.com",
67                 ServicePort:    25107,
68                 ServiceSSLFlag: false,
69                 ServiceType:    "disk",
70         },
71         {
72                 UUID:           "zzzzz-bi6l4-000000000000003",
73                 ServiceHost:    "keep3.zzzzz.arvadosapi.com",
74                 ServicePort:    25107,
75                 ServiceSSLFlag: false,
76                 ServiceType:    "disk",
77         },
78         {
79                 UUID:           "zzzzz-bi6l4-h0a0xwut9qa6g3a",
80                 ServiceHost:    "keep.zzzzz.arvadosapi.com",
81                 ServicePort:    25333,
82                 ServiceSSLFlag: true,
83                 ServiceType:    "proxy",
84         },
85 }
86
87 var stubMounts = map[string][]arvados.KeepMount{
88         "keep0.zzzzz.arvadosapi.com:25107": {{
89                 UUID:           "zzzzz-ivpuk-000000000000000",
90                 DeviceID:       "keep0-vol0",
91                 StorageClasses: map[string]bool{"default": true},
92                 AllowWrite:     true,
93                 AllowTrash:     true,
94         }},
95         "keep1.zzzzz.arvadosapi.com:25107": {{
96                 UUID:           "zzzzz-ivpuk-100000000000000",
97                 DeviceID:       "keep1-vol0",
98                 StorageClasses: map[string]bool{"default": true},
99                 AllowWrite:     true,
100                 AllowTrash:     true,
101         }},
102         "keep2.zzzzz.arvadosapi.com:25107": {{
103                 UUID:           "zzzzz-ivpuk-200000000000000",
104                 DeviceID:       "keep2-vol0",
105                 StorageClasses: map[string]bool{"default": true},
106                 AllowWrite:     true,
107                 AllowTrash:     true,
108         }},
109         "keep3.zzzzz.arvadosapi.com:25107": {{
110                 UUID:           "zzzzz-ivpuk-300000000000000",
111                 DeviceID:       "keep3-vol0",
112                 StorageClasses: map[string]bool{"default": true},
113                 AllowWrite:     true,
114                 AllowTrash:     true,
115         }},
116 }
117
118 // stubServer is an HTTP transport that intercepts and processes all
119 // requests using its own handlers.
120 type stubServer struct {
121         mux      *http.ServeMux
122         srv      *httptest.Server
123         mutex    sync.Mutex
124         Requests reqTracker
125         logf     func(string, ...interface{})
126 }
127
128 // Start initializes the stub server and returns an *http.Client that
129 // uses the stub server to handle all requests.
130 //
131 // A stubServer that has been started should eventually be shut down
132 // with Close().
133 func (s *stubServer) Start() *http.Client {
134         // Set up a config.Client that forwards all requests to s.mux
135         // via s.srv. Test cases will attach handlers to s.mux to get
136         // the desired responses.
137         s.mux = http.NewServeMux()
138         s.srv = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
139                 s.mutex.Lock()
140                 s.Requests.Add(r)
141                 s.mutex.Unlock()
142                 w.Header().Set("Content-Type", "application/json")
143                 s.mux.ServeHTTP(w, r)
144         }))
145         return &http.Client{Transport: s}
146 }
147
148 func (s *stubServer) RoundTrip(req *http.Request) (*http.Response, error) {
149         w := httptest.NewRecorder()
150         s.mux.ServeHTTP(w, req)
151         return &http.Response{
152                 StatusCode: w.Code,
153                 Status:     fmt.Sprintf("%d %s", w.Code, http.StatusText(w.Code)),
154                 Header:     w.HeaderMap,
155                 Body:       ioutil.NopCloser(w.Body)}, nil
156 }
157
158 // Close releases resources used by the server.
159 func (s *stubServer) Close() {
160         s.srv.Close()
161 }
162
163 func (s *stubServer) serveStatic(path, data string) *reqTracker {
164         rt := &reqTracker{}
165         s.mux.HandleFunc(path, func(w http.ResponseWriter, r *http.Request) {
166                 rt.Add(r)
167                 if r.Body != nil {
168                         ioutil.ReadAll(r.Body)
169                         r.Body.Close()
170                 }
171                 io.WriteString(w, data)
172         })
173         return rt
174 }
175
176 func (s *stubServer) serveCurrentUserAdmin() *reqTracker {
177         return s.serveStatic("/arvados/v1/users/current",
178                 `{"uuid":"zzzzz-tpzed-000000000000000","is_admin":true,"is_active":true}`)
179 }
180
181 func (s *stubServer) serveCurrentUserNotAdmin() *reqTracker {
182         return s.serveStatic("/arvados/v1/users/current",
183                 `{"uuid":"zzzzz-tpzed-000000000000000","is_admin":false,"is_active":true}`)
184 }
185
186 func (s *stubServer) serveDiscoveryDoc() *reqTracker {
187         return s.serveStatic("/discovery/v1/apis/arvados/v1/rest",
188                 `{"defaultCollectionReplication":2}`)
189 }
190
191 func (s *stubServer) serveZeroCollections() *reqTracker {
192         return s.serveStatic("/arvados/v1/collections",
193                 `{"items":[],"items_available":0}`)
194 }
195
196 func (s *stubServer) serveFooBarFileCollections() *reqTracker {
197         rt := &reqTracker{}
198         s.mux.HandleFunc("/arvados/v1/collections", func(w http.ResponseWriter, r *http.Request) {
199                 r.ParseForm()
200                 rt.Add(r)
201                 if strings.Contains(r.Form.Get("filters"), `modified_at`) {
202                         io.WriteString(w, `{"items_available":0,"items":[]}`)
203                 } else {
204                         io.WriteString(w, `{"items_available":3,"items":[
205                                 {"uuid":"zzzzz-4zz18-aaaaaaaaaaaaaaa","portable_data_hash":"fa7aeb5140e2848d39b416daeef4ffc5+45","manifest_text":". 37b51d194a7513e45b56f6524f2d51f2+3 0:3:bar\n","modified_at":"2014-02-03T17:22:54Z"},
206                                 {"uuid":"zzzzz-4zz18-ehbhgtheo8909or","portable_data_hash":"fa7aeb5140e2848d39b416daeef4ffc5+45","manifest_text":". 37b51d194a7513e45b56f6524f2d51f2+3 0:3:bar\n","modified_at":"2014-02-03T17:22:54Z"},
207                                 {"uuid":"zzzzz-4zz18-znfnqtbbv4spc3w","portable_data_hash":"1f4b0bc7583c2a7f9102c395f4ffc5e3+45","manifest_text":". acbd18db4cc2f85cedef654fccc4a4d8+3 0:3:foo\n","modified_at":"2014-02-03T17:22:54Z"}]}`)
208                 }
209         })
210         return rt
211 }
212
213 func (s *stubServer) serveCollectionsButSkipOne() *reqTracker {
214         rt := &reqTracker{}
215         s.mux.HandleFunc("/arvados/v1/collections", func(w http.ResponseWriter, r *http.Request) {
216                 r.ParseForm()
217                 rt.Add(r)
218                 if strings.Contains(r.Form.Get("filters"), `"modified_at","\u003c="`) {
219                         io.WriteString(w, `{"items_available":3,"items":[]}`)
220                 } else if strings.Contains(r.Form.Get("filters"), `"modified_at","\u003e`) {
221                         io.WriteString(w, `{"items_available":0,"items":[]}`)
222                 } else if strings.Contains(r.Form.Get("filters"), `"modified_at","="`) && strings.Contains(r.Form.Get("filters"), `"uuid","\u003e"`) {
223                         io.WriteString(w, `{"items_available":0,"items":[]}`)
224                 } else if strings.Contains(r.Form.Get("filters"), `"modified_at","=",null`) {
225                         io.WriteString(w, `{"items_available":0,"items":[]}`)
226                 } else {
227                         io.WriteString(w, `{"items_available":2,"items":[
228                                 {"uuid":"zzzzz-4zz18-ehbhgtheo8909or","portable_data_hash":"fa7aeb5140e2848d39b416daeef4ffc5+45","manifest_text":". 37b51d194a7513e45b56f6524f2d51f2+3 0:3:bar\n","modified_at":"2014-02-03T17:22:54Z"},
229                                 {"uuid":"zzzzz-4zz18-znfnqtbbv4spc3w","portable_data_hash":"1f4b0bc7583c2a7f9102c395f4ffc5e3+45","manifest_text":". acbd18db4cc2f85cedef654fccc4a4d8+3 0:3:foo\n","modified_at":"2014-02-03T17:22:54Z"}]}`)
230                 }
231         })
232         return rt
233 }
234
235 func (s *stubServer) serveZeroKeepServices() *reqTracker {
236         return s.serveJSON("/arvados/v1/keep_services", arvados.KeepServiceList{})
237 }
238
239 func (s *stubServer) serveKeepServices(svcs []arvados.KeepService) *reqTracker {
240         return s.serveJSON("/arvados/v1/keep_services", arvados.KeepServiceList{
241                 ItemsAvailable: len(svcs),
242                 Items:          svcs,
243         })
244 }
245
246 func (s *stubServer) serveJSON(path string, resp interface{}) *reqTracker {
247         rt := &reqTracker{}
248         s.mux.HandleFunc(path, func(w http.ResponseWriter, r *http.Request) {
249                 rt.Add(r)
250                 json.NewEncoder(w).Encode(resp)
251         })
252         return rt
253 }
254
255 func (s *stubServer) serveKeepstoreMounts() *reqTracker {
256         rt := &reqTracker{}
257         s.mux.HandleFunc("/mounts", func(w http.ResponseWriter, r *http.Request) {
258                 rt.Add(r)
259                 json.NewEncoder(w).Encode(stubMounts[r.Host])
260         })
261         return rt
262 }
263
264 func (s *stubServer) serveKeepstoreIndexFoo4Bar1() *reqTracker {
265         fooLine := func(mt int) string { return fmt.Sprintf("acbd18db4cc2f85cedef654fccc4a4d8+3 %d\n", 12345678+mt) }
266         barLine := "37b51d194a7513e45b56f6524f2d51f2+3 12345678\n"
267         rt := &reqTracker{}
268         s.mux.HandleFunc("/index/", func(w http.ResponseWriter, r *http.Request) {
269                 count := rt.Add(r)
270                 if r.Host == "keep0.zzzzz.arvadosapi.com:25107" && strings.HasPrefix(barLine, r.URL.Path[7:]) {
271                         io.WriteString(w, barLine)
272                 }
273                 if strings.HasPrefix(fooLine(count), r.URL.Path[7:]) {
274                         io.WriteString(w, fooLine(count))
275                 }
276                 io.WriteString(w, "\n")
277         })
278         for _, mounts := range stubMounts {
279                 for i, mnt := range mounts {
280                         i := i
281                         s.mux.HandleFunc(fmt.Sprintf("/mounts/%s/blocks", mnt.UUID), func(w http.ResponseWriter, r *http.Request) {
282                                 count := rt.Add(r)
283                                 r.ParseForm()
284                                 if i == 0 && r.Host == "keep0.zzzzz.arvadosapi.com:25107" && strings.HasPrefix(barLine, r.Form.Get("prefix")) {
285                                         io.WriteString(w, barLine)
286                                 }
287                                 if i == 0 && strings.HasPrefix(fooLine(count), r.Form.Get("prefix")) {
288                                         io.WriteString(w, fooLine(count))
289                                 }
290                                 io.WriteString(w, "\n")
291                         })
292                 }
293         }
294         return rt
295 }
296
297 func (s *stubServer) serveKeepstoreIndexFoo1() *reqTracker {
298         fooLine := "acbd18db4cc2f85cedef654fccc4a4d8+3 12345678\n"
299         rt := &reqTracker{}
300         s.mux.HandleFunc("/index/", func(w http.ResponseWriter, r *http.Request) {
301                 rt.Add(r)
302                 if r.Host == "keep0.zzzzz.arvadosapi.com:25107" && strings.HasPrefix(fooLine, r.URL.Path[7:]) {
303                         io.WriteString(w, fooLine)
304                 }
305                 io.WriteString(w, "\n")
306         })
307         for _, mounts := range stubMounts {
308                 for i, mnt := range mounts {
309                         i := i
310                         s.mux.HandleFunc(fmt.Sprintf("/mounts/%s/blocks", mnt.UUID), func(w http.ResponseWriter, r *http.Request) {
311                                 rt.Add(r)
312                                 if i == 0 && strings.HasPrefix(fooLine, r.Form.Get("prefix")) {
313                                         io.WriteString(w, fooLine)
314                                 }
315                                 io.WriteString(w, "\n")
316                         })
317                 }
318         }
319         return rt
320 }
321
322 func (s *stubServer) serveKeepstoreIndexIgnoringPrefix() *reqTracker {
323         fooLine := "acbd18db4cc2f85cedef654fccc4a4d8+3 12345678\n"
324         rt := &reqTracker{}
325         s.mux.HandleFunc("/index/", func(w http.ResponseWriter, r *http.Request) {
326                 rt.Add(r)
327                 io.WriteString(w, fooLine)
328                 io.WriteString(w, "\n")
329         })
330         for _, mounts := range stubMounts {
331                 for _, mnt := range mounts {
332                         s.mux.HandleFunc(fmt.Sprintf("/mounts/%s/blocks", mnt.UUID), func(w http.ResponseWriter, r *http.Request) {
333                                 rt.Add(r)
334                                 io.WriteString(w, fooLine)
335                                 io.WriteString(w, "\n")
336                         })
337                 }
338         }
339         return rt
340 }
341
342 func (s *stubServer) serveKeepstoreTrash() *reqTracker {
343         return s.serveStatic("/trash", `{}`)
344 }
345
346 func (s *stubServer) serveKeepstorePull() *reqTracker {
347         return s.serveStatic("/pull", `{}`)
348 }
349
350 type runSuite struct {
351         stub   stubServer
352         config *arvados.Cluster
353         db     *sqlx.DB
354         client *arvados.Client
355 }
356
357 func (s *runSuite) newServer(options *RunOptions) *Server {
358         srv := &Server{
359                 Cluster:    s.config,
360                 ArvClient:  s.client,
361                 RunOptions: *options,
362                 Metrics:    newMetrics(prometheus.NewRegistry()),
363                 Logger:     options.Logger,
364                 Dumper:     options.Dumper,
365                 DB:         s.db,
366         }
367         return srv
368 }
369
370 func (s *runSuite) SetUpTest(c *check.C) {
371         cfg, err := config.NewLoader(nil, ctxlog.TestLogger(c)).Load()
372         c.Assert(err, check.Equals, nil)
373         s.config, err = cfg.GetCluster("")
374         c.Assert(err, check.Equals, nil)
375         s.db, err = sqlx.Open("postgres", s.config.PostgreSQL.Connection.String())
376         c.Assert(err, check.IsNil)
377
378         s.config.Collections.BalancePeriod = arvados.Duration(time.Second)
379         arvadostest.SetServiceURL(&s.config.Services.Keepbalance, "http://localhost:/")
380
381         s.client = &arvados.Client{
382                 AuthToken: "xyzzy",
383                 APIHost:   "zzzzz.arvadosapi.com",
384                 Client:    s.stub.Start()}
385
386         s.stub.serveDiscoveryDoc()
387         s.stub.logf = c.Logf
388 }
389
390 func (s *runSuite) TearDownTest(c *check.C) {
391         s.stub.Close()
392 }
393
394 func (s *runSuite) TestRefuseZeroCollections(c *check.C) {
395         defer arvados.NewClientFromEnv().RequestAndDecode(nil, "POST", "database/reset", nil, nil)
396         _, err := s.db.Exec(`delete from collections`)
397         c.Assert(err, check.IsNil)
398         opts := RunOptions{
399                 Logger: ctxlog.TestLogger(c),
400         }
401         s.stub.serveCurrentUserAdmin()
402         s.stub.serveZeroCollections()
403         s.stub.serveKeepServices(stubServices)
404         s.stub.serveKeepstoreMounts()
405         s.stub.serveKeepstoreIndexFoo4Bar1()
406         trashReqs := s.stub.serveKeepstoreTrash()
407         pullReqs := s.stub.serveKeepstorePull()
408         srv := s.newServer(&opts)
409         _, err = srv.runOnce(context.Background())
410         c.Check(err, check.ErrorMatches, "received zero collections")
411         c.Check(trashReqs.Count(), check.Equals, 4)
412         c.Check(pullReqs.Count(), check.Equals, 0)
413 }
414
415 func (s *runSuite) TestRefuseBadIndex(c *check.C) {
416         opts := RunOptions{
417                 ChunkPrefix: "abc",
418                 Logger:      ctxlog.TestLogger(c),
419         }
420         s.stub.serveCurrentUserAdmin()
421         s.stub.serveFooBarFileCollections()
422         s.stub.serveKeepServices(stubServices)
423         s.stub.serveKeepstoreMounts()
424         s.stub.serveKeepstoreIndexIgnoringPrefix()
425         trashReqs := s.stub.serveKeepstoreTrash()
426         pullReqs := s.stub.serveKeepstorePull()
427         srv := s.newServer(&opts)
428         bal, err := srv.runOnce(context.Background())
429         c.Check(err, check.ErrorMatches, ".*Index response included block .* despite asking for prefix \"abc\"")
430         c.Check(trashReqs.Count(), check.Equals, 4)
431         c.Check(pullReqs.Count(), check.Equals, 0)
432         c.Check(bal.stats.trashes, check.Equals, 0)
433         c.Check(bal.stats.pulls, check.Equals, 0)
434 }
435
436 func (s *runSuite) TestRefuseNonAdmin(c *check.C) {
437         opts := RunOptions{
438                 Logger: ctxlog.TestLogger(c),
439         }
440         s.stub.serveCurrentUserNotAdmin()
441         s.stub.serveZeroCollections()
442         s.stub.serveKeepServices(stubServices)
443         s.stub.serveKeepstoreMounts()
444         trashReqs := s.stub.serveKeepstoreTrash()
445         pullReqs := s.stub.serveKeepstorePull()
446         srv := s.newServer(&opts)
447         _, err := srv.runOnce(context.Background())
448         c.Check(err, check.ErrorMatches, "current user .* is not .* admin user")
449         c.Check(trashReqs.Count(), check.Equals, 0)
450         c.Check(pullReqs.Count(), check.Equals, 0)
451 }
452
453 func (s *runSuite) TestInvalidChunkPrefix(c *check.C) {
454         for _, trial := range []struct {
455                 prefix string
456                 errRe  string
457         }{
458                 {"123ABC", "invalid char \"A\" in chunk prefix.*"},
459                 {"123xyz", "invalid char \"x\" in chunk prefix.*"},
460                 {"123456789012345678901234567890123", "invalid chunk prefix .* longer than a block hash"},
461         } {
462                 s.SetUpTest(c)
463                 c.Logf("trying invalid prefix %q", trial.prefix)
464                 opts := RunOptions{
465                         ChunkPrefix: trial.prefix,
466                         Logger:      ctxlog.TestLogger(c),
467                 }
468                 s.stub.serveCurrentUserAdmin()
469                 s.stub.serveFooBarFileCollections()
470                 s.stub.serveKeepServices(stubServices)
471                 s.stub.serveKeepstoreMounts()
472                 trashReqs := s.stub.serveKeepstoreTrash()
473                 pullReqs := s.stub.serveKeepstorePull()
474                 srv := s.newServer(&opts)
475                 _, err := srv.runOnce(context.Background())
476                 c.Check(err, check.ErrorMatches, trial.errRe)
477                 c.Check(trashReqs.Count(), check.Equals, 0)
478                 c.Check(pullReqs.Count(), check.Equals, 0)
479         }
480 }
481
482 func (s *runSuite) TestRefuseSameDeviceDifferentVolumes(c *check.C) {
483         opts := RunOptions{
484                 Logger: ctxlog.TestLogger(c),
485         }
486         s.stub.serveCurrentUserAdmin()
487         s.stub.serveZeroCollections()
488         s.stub.serveKeepServices(stubServices)
489         s.stub.mux.HandleFunc("/mounts", func(w http.ResponseWriter, r *http.Request) {
490                 hostid := r.Host[:5] // "keep0.zzzzz.arvadosapi.com:25107" => "keep0"
491                 json.NewEncoder(w).Encode([]arvados.KeepMount{{
492                         UUID:           "zzzzz-ivpuk-0000000000" + hostid,
493                         DeviceID:       "keep0-vol0",
494                         StorageClasses: map[string]bool{"default": true},
495                 }})
496         })
497         trashReqs := s.stub.serveKeepstoreTrash()
498         pullReqs := s.stub.serveKeepstorePull()
499         srv := s.newServer(&opts)
500         _, err := srv.runOnce(context.Background())
501         c.Check(err, check.ErrorMatches, "cannot continue with config errors.*")
502         c.Check(trashReqs.Count(), check.Equals, 0)
503         c.Check(pullReqs.Count(), check.Equals, 0)
504 }
505
506 func (s *runSuite) TestWriteLostBlocks(c *check.C) {
507         lostf, err := ioutil.TempFile("", "keep-balance-lost-blocks-test-")
508         c.Assert(err, check.IsNil)
509         s.config.Collections.BlobMissingReport = lostf.Name()
510         defer os.Remove(lostf.Name())
511         opts := RunOptions{
512                 Logger: ctxlog.TestLogger(c),
513         }
514         s.stub.serveCurrentUserAdmin()
515         s.stub.serveFooBarFileCollections()
516         s.stub.serveKeepServices(stubServices)
517         s.stub.serveKeepstoreMounts()
518         s.stub.serveKeepstoreIndexFoo1()
519         s.stub.serveKeepstoreTrash()
520         s.stub.serveKeepstorePull()
521         srv := s.newServer(&opts)
522         c.Assert(err, check.IsNil)
523         _, err = srv.runOnce(context.Background())
524         c.Check(err, check.IsNil)
525         lost, err := ioutil.ReadFile(lostf.Name())
526         c.Assert(err, check.IsNil)
527         c.Check(string(lost), check.Matches, `(?ms).*37b51d194a7513e45b56f6524f2d51f2.* fa7aeb5140e2848d39b416daeef4ffc5\+45.*`)
528 }
529
530 func (s *runSuite) TestDryRun(c *check.C) {
531         s.config.Collections.BalanceTrashLimit = 0
532         s.config.Collections.BalancePullLimit = 0
533         opts := RunOptions{
534                 Logger: ctxlog.TestLogger(c),
535         }
536         s.stub.serveCurrentUserAdmin()
537         collReqs := s.stub.serveFooBarFileCollections()
538         s.stub.serveKeepServices(stubServices)
539         s.stub.serveKeepstoreMounts()
540         s.stub.serveKeepstoreIndexFoo4Bar1()
541         trashReqs := s.stub.serveKeepstoreTrash()
542         pullReqs := s.stub.serveKeepstorePull()
543         srv := s.newServer(&opts)
544         bal, err := srv.runOnce(context.Background())
545         c.Check(err, check.IsNil)
546         for _, req := range collReqs.reqs {
547                 c.Check(req.Form.Get("include_trash"), check.Equals, "true")
548                 c.Check(req.Form.Get("include_old_versions"), check.Equals, "true")
549         }
550         c.Check(trashReqs.Count(), check.Equals, 0)
551         c.Check(pullReqs.Count(), check.Equals, 0)
552         c.Check(bal.stats.pulls, check.Equals, 0)
553         c.Check(bal.stats.pullsDeferred, check.Not(check.Equals), 0)
554         c.Check(bal.stats.trashes, check.Equals, 0)
555         c.Check(bal.stats.trashesDeferred, check.Not(check.Equals), 0)
556         c.Check(bal.stats.underrep.replicas, check.Not(check.Equals), 0)
557         c.Check(bal.stats.overrep.replicas, check.Not(check.Equals), 0)
558 }
559
560 func (s *runSuite) TestCommit(c *check.C) {
561         s.config.Collections.BlobMissingReport = c.MkDir() + "/keep-balance-lost-blocks-test-"
562         s.config.ManagementToken = "xyzzy"
563         opts := RunOptions{
564                 Logger: ctxlog.TestLogger(c),
565                 Dumper: ctxlog.TestLogger(c),
566         }
567         s.stub.serveCurrentUserAdmin()
568         s.stub.serveFooBarFileCollections()
569         s.stub.serveKeepServices(stubServices)
570         s.stub.serveKeepstoreMounts()
571         s.stub.serveKeepstoreIndexFoo4Bar1()
572         trashReqs := s.stub.serveKeepstoreTrash()
573         pullReqs := s.stub.serveKeepstorePull()
574         srv := s.newServer(&opts)
575         bal, err := srv.runOnce(context.Background())
576         c.Check(err, check.IsNil)
577         c.Check(trashReqs.Count(), check.Equals, 8)
578         c.Check(pullReqs.Count(), check.Equals, 4)
579         // "foo" block is overreplicated by 2
580         c.Check(bal.stats.trashes, check.Equals, 2)
581         // "bar" block is underreplicated by 1, and its only copy is
582         // in a poor rendezvous position
583         c.Check(bal.stats.pulls, check.Equals, 2)
584
585         lost, err := ioutil.ReadFile(s.config.Collections.BlobMissingReport)
586         c.Assert(err, check.IsNil)
587         c.Check(string(lost), check.Not(check.Matches), `(?ms).*acbd18db4cc2f85cedef654fccc4a4d8.*`)
588
589         metrics := arvadostest.GatherMetricsAsString(srv.Metrics.reg)
590         c.Check(metrics, check.Matches, `(?ms).*\narvados_keep_total_bytes 15\n.*`)
591         c.Check(metrics, check.Matches, `(?ms).*\narvados_keepbalance_changeset_compute_seconds_sum [0-9\.]+\n.*`)
592         c.Check(metrics, check.Matches, `(?ms).*\narvados_keepbalance_changeset_compute_seconds_count 1\n.*`)
593         c.Check(metrics, check.Matches, `(?ms).*\narvados_keep_dedup_byte_ratio [1-9].*`)
594         c.Check(metrics, check.Matches, `(?ms).*\narvados_keep_dedup_block_ratio [1-9].*`)
595 }
596
597 func (s *runSuite) TestChunkPrefix(c *check.C) {
598         s.config.Collections.BlobMissingReport = c.MkDir() + "/keep-balance-lost-blocks-test-"
599         opts := RunOptions{
600                 ChunkPrefix: "ac", // catch "foo" but not "bar"
601                 Logger:      ctxlog.TestLogger(c),
602                 Dumper:      ctxlog.TestLogger(c),
603         }
604         s.stub.serveCurrentUserAdmin()
605         s.stub.serveFooBarFileCollections()
606         s.stub.serveKeepServices(stubServices)
607         s.stub.serveKeepstoreMounts()
608         s.stub.serveKeepstoreIndexFoo4Bar1()
609         trashReqs := s.stub.serveKeepstoreTrash()
610         pullReqs := s.stub.serveKeepstorePull()
611         srv := s.newServer(&opts)
612         bal, err := srv.runOnce(context.Background())
613         c.Check(err, check.IsNil)
614         c.Check(trashReqs.Count(), check.Equals, 8)
615         c.Check(pullReqs.Count(), check.Equals, 4)
616         // "foo" block is overreplicated by 2
617         c.Check(bal.stats.trashes, check.Equals, 2)
618         // "bar" block is underreplicated but does not match prefix
619         c.Check(bal.stats.pulls, check.Equals, 0)
620
621         lost, err := ioutil.ReadFile(s.config.Collections.BlobMissingReport)
622         c.Assert(err, check.IsNil)
623         c.Check(string(lost), check.Equals, "")
624 }
625
626 func (s *runSuite) TestRunForever(c *check.C) {
627         s.config.ManagementToken = "xyzzy"
628         opts := RunOptions{
629                 Logger: ctxlog.TestLogger(c),
630                 Dumper: ctxlog.TestLogger(c),
631         }
632         s.stub.serveCurrentUserAdmin()
633         s.stub.serveFooBarFileCollections()
634         s.stub.serveKeepServices(stubServices)
635         s.stub.serveKeepstoreMounts()
636         s.stub.serveKeepstoreIndexFoo4Bar1()
637         trashReqs := s.stub.serveKeepstoreTrash()
638         pullReqs := s.stub.serveKeepstorePull()
639
640         ctx, cancel := context.WithCancel(context.Background())
641         defer cancel()
642         s.config.Collections.BalancePeriod = arvados.Duration(time.Millisecond)
643         srv := s.newServer(&opts)
644
645         done := make(chan bool)
646         go func() {
647                 srv.runForever(ctx)
648                 close(done)
649         }()
650
651         // Each run should send 4 pull lists + 4 trash lists. The
652         // first run should also send 4 empty trash lists at
653         // startup. We should complete all four runs in much less than
654         // a second.
655         for t0 := time.Now(); time.Since(t0) < 10*time.Second; {
656                 if pullReqs.Count() >= 16 && trashReqs.Count() == pullReqs.Count()+4 {
657                         break
658                 }
659                 time.Sleep(time.Millisecond)
660         }
661         cancel()
662         <-done
663         c.Check(pullReqs.Count() >= 16, check.Equals, true)
664         c.Check(trashReqs.Count(), check.Equals, pullReqs.Count()+4)
665
666         metrics := arvadostest.GatherMetricsAsString(srv.Metrics.reg)
667         c.Check(metrics, check.Matches, `(?ms).*\narvados_keepbalance_changeset_compute_seconds_count `+fmt.Sprintf("%d", pullReqs.Count()/4)+`\n.*`)
668 }