Merge branch '21041-installer-from-HEAD'. Closes #21041
[arvados.git] / services / keep-balance / balance_run_test.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package keepbalance
6
7 import (
8         "context"
9         "encoding/json"
10         "fmt"
11         "io"
12         "io/ioutil"
13         "net/http"
14         "net/http/httptest"
15         "os"
16         "strings"
17         "sync"
18         "time"
19
20         "git.arvados.org/arvados.git/lib/config"
21         "git.arvados.org/arvados.git/sdk/go/arvados"
22         "git.arvados.org/arvados.git/sdk/go/arvadostest"
23         "git.arvados.org/arvados.git/sdk/go/ctxlog"
24         "github.com/jmoiron/sqlx"
25         "github.com/prometheus/client_golang/prometheus"
26         check "gopkg.in/check.v1"
27 )
28
29 var _ = check.Suite(&runSuite{})
30
// reqTracker accumulates copies of the HTTP requests passed to Add.
// Its methods take the embedded mutex, so they may be called from
// concurrent handlers.
type reqTracker struct {
	// Mutex guards reqs.
	sync.Mutex
	reqs []http.Request
}

// Count returns the number of requests recorded so far.
func (rt *reqTracker) Count() int {
	rt.Lock()
	n := len(rt.reqs)
	rt.Unlock()
	return n
}

// Add records a copy of *req and returns the number of requests
// recorded so far, including this one.
func (rt *reqTracker) Add(req *http.Request) int {
	rt.Lock()
	rt.reqs = append(rt.reqs, *req)
	n := len(rt.reqs)
	rt.Unlock()
	return n
}
48
49 var stubServices = []arvados.KeepService{
50         {
51                 UUID:           "zzzzz-bi6l4-000000000000000",
52                 ServiceHost:    "keep0.zzzzz.arvadosapi.com",
53                 ServicePort:    25107,
54                 ServiceSSLFlag: false,
55                 ServiceType:    "disk",
56         },
57         {
58                 UUID:           "zzzzz-bi6l4-000000000000001",
59                 ServiceHost:    "keep1.zzzzz.arvadosapi.com",
60                 ServicePort:    25107,
61                 ServiceSSLFlag: false,
62                 ServiceType:    "disk",
63         },
64         {
65                 UUID:           "zzzzz-bi6l4-000000000000002",
66                 ServiceHost:    "keep2.zzzzz.arvadosapi.com",
67                 ServicePort:    25107,
68                 ServiceSSLFlag: false,
69                 ServiceType:    "disk",
70         },
71         {
72                 UUID:           "zzzzz-bi6l4-000000000000003",
73                 ServiceHost:    "keep3.zzzzz.arvadosapi.com",
74                 ServicePort:    25107,
75                 ServiceSSLFlag: false,
76                 ServiceType:    "disk",
77         },
78         {
79                 UUID:           "zzzzz-bi6l4-h0a0xwut9qa6g3a",
80                 ServiceHost:    "keep.zzzzz.arvadosapi.com",
81                 ServicePort:    25333,
82                 ServiceSSLFlag: true,
83                 ServiceType:    "proxy",
84         },
85 }
86
87 var stubMounts = map[string][]arvados.KeepMount{
88         "keep0.zzzzz.arvadosapi.com:25107": {{
89                 UUID:           "zzzzz-ivpuk-000000000000000",
90                 DeviceID:       "keep0-vol0",
91                 StorageClasses: map[string]bool{"default": true},
92                 AllowWrite:     true,
93                 AllowTrash:     true,
94         }},
95         "keep1.zzzzz.arvadosapi.com:25107": {{
96                 UUID:           "zzzzz-ivpuk-100000000000000",
97                 DeviceID:       "keep1-vol0",
98                 StorageClasses: map[string]bool{"default": true},
99                 AllowWrite:     true,
100                 AllowTrash:     true,
101         }},
102         "keep2.zzzzz.arvadosapi.com:25107": {{
103                 UUID:           "zzzzz-ivpuk-200000000000000",
104                 DeviceID:       "keep2-vol0",
105                 StorageClasses: map[string]bool{"default": true},
106                 AllowWrite:     true,
107                 AllowTrash:     true,
108         }},
109         "keep3.zzzzz.arvadosapi.com:25107": {{
110                 UUID:           "zzzzz-ivpuk-300000000000000",
111                 DeviceID:       "keep3-vol0",
112                 StorageClasses: map[string]bool{"default": true},
113                 AllowWrite:     true,
114                 AllowTrash:     true,
115         }},
116 }
117
118 // stubServer is an HTTP transport that intercepts and processes all
119 // requests using its own handlers.
120 type stubServer struct {
121         mux      *http.ServeMux
122         srv      *httptest.Server
123         mutex    sync.Mutex
124         Requests reqTracker
125         logf     func(string, ...interface{})
126 }
127
128 // Start initializes the stub server and returns an *http.Client that
129 // uses the stub server to handle all requests.
130 //
131 // A stubServer that has been started should eventually be shut down
132 // with Close().
133 func (s *stubServer) Start() *http.Client {
134         // Set up a config.Client that forwards all requests to s.mux
135         // via s.srv. Test cases will attach handlers to s.mux to get
136         // the desired responses.
137         s.mux = http.NewServeMux()
138         s.srv = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
139                 s.mutex.Lock()
140                 s.Requests.Add(r)
141                 s.mutex.Unlock()
142                 w.Header().Set("Content-Type", "application/json")
143                 s.mux.ServeHTTP(w, r)
144         }))
145         return &http.Client{Transport: s}
146 }
147
148 func (s *stubServer) RoundTrip(req *http.Request) (*http.Response, error) {
149         w := httptest.NewRecorder()
150         s.mux.ServeHTTP(w, req)
151         return &http.Response{
152                 StatusCode: w.Code,
153                 Status:     fmt.Sprintf("%d %s", w.Code, http.StatusText(w.Code)),
154                 Header:     w.HeaderMap,
155                 Body:       ioutil.NopCloser(w.Body)}, nil
156 }
157
158 // Close releases resources used by the server.
159 func (s *stubServer) Close() {
160         s.srv.Close()
161 }
162
163 func (s *stubServer) serveStatic(path, data string) *reqTracker {
164         rt := &reqTracker{}
165         s.mux.HandleFunc(path, func(w http.ResponseWriter, r *http.Request) {
166                 rt.Add(r)
167                 if r.Body != nil {
168                         ioutil.ReadAll(r.Body)
169                         r.Body.Close()
170                 }
171                 io.WriteString(w, data)
172         })
173         return rt
174 }
175
176 func (s *stubServer) serveCurrentUserAdmin() *reqTracker {
177         return s.serveStatic("/arvados/v1/users/current",
178                 `{"uuid":"zzzzz-tpzed-000000000000000","is_admin":true,"is_active":true}`)
179 }
180
181 func (s *stubServer) serveCurrentUserNotAdmin() *reqTracker {
182         return s.serveStatic("/arvados/v1/users/current",
183                 `{"uuid":"zzzzz-tpzed-000000000000000","is_admin":false,"is_active":true}`)
184 }
185
186 func (s *stubServer) serveDiscoveryDoc() *reqTracker {
187         return s.serveStatic("/discovery/v1/apis/arvados/v1/rest",
188                 `{"defaultCollectionReplication":2}`)
189 }
190
191 func (s *stubServer) serveZeroCollections() *reqTracker {
192         return s.serveStatic("/arvados/v1/collections",
193                 `{"items":[],"items_available":0}`)
194 }
195
196 func (s *stubServer) serveFooBarFileCollections() *reqTracker {
197         rt := &reqTracker{}
198         s.mux.HandleFunc("/arvados/v1/collections", func(w http.ResponseWriter, r *http.Request) {
199                 r.ParseForm()
200                 rt.Add(r)
201                 if strings.Contains(r.Form.Get("filters"), `modified_at`) {
202                         io.WriteString(w, `{"items_available":0,"items":[]}`)
203                 } else {
204                         io.WriteString(w, `{"items_available":3,"items":[
205                                 {"uuid":"zzzzz-4zz18-aaaaaaaaaaaaaaa","portable_data_hash":"fa7aeb5140e2848d39b416daeef4ffc5+45","manifest_text":". 37b51d194a7513e45b56f6524f2d51f2+3 0:3:bar\n","modified_at":"2014-02-03T17:22:54Z"},
206                                 {"uuid":"zzzzz-4zz18-ehbhgtheo8909or","portable_data_hash":"fa7aeb5140e2848d39b416daeef4ffc5+45","manifest_text":". 37b51d194a7513e45b56f6524f2d51f2+3 0:3:bar\n","modified_at":"2014-02-03T17:22:54Z"},
207                                 {"uuid":"zzzzz-4zz18-znfnqtbbv4spc3w","portable_data_hash":"1f4b0bc7583c2a7f9102c395f4ffc5e3+45","manifest_text":". acbd18db4cc2f85cedef654fccc4a4d8+3 0:3:foo\n","modified_at":"2014-02-03T17:22:54Z"}]}`)
208                 }
209         })
210         return rt
211 }
212
213 func (s *stubServer) serveCollectionsButSkipOne() *reqTracker {
214         rt := &reqTracker{}
215         s.mux.HandleFunc("/arvados/v1/collections", func(w http.ResponseWriter, r *http.Request) {
216                 r.ParseForm()
217                 rt.Add(r)
218                 if strings.Contains(r.Form.Get("filters"), `"modified_at","\u003c="`) {
219                         io.WriteString(w, `{"items_available":3,"items":[]}`)
220                 } else if strings.Contains(r.Form.Get("filters"), `"modified_at","\u003e`) {
221                         io.WriteString(w, `{"items_available":0,"items":[]}`)
222                 } else if strings.Contains(r.Form.Get("filters"), `"modified_at","="`) && strings.Contains(r.Form.Get("filters"), `"uuid","\u003e"`) {
223                         io.WriteString(w, `{"items_available":0,"items":[]}`)
224                 } else if strings.Contains(r.Form.Get("filters"), `"modified_at","=",null`) {
225                         io.WriteString(w, `{"items_available":0,"items":[]}`)
226                 } else {
227                         io.WriteString(w, `{"items_available":2,"items":[
228                                 {"uuid":"zzzzz-4zz18-ehbhgtheo8909or","portable_data_hash":"fa7aeb5140e2848d39b416daeef4ffc5+45","manifest_text":". 37b51d194a7513e45b56f6524f2d51f2+3 0:3:bar\n","modified_at":"2014-02-03T17:22:54Z"},
229                                 {"uuid":"zzzzz-4zz18-znfnqtbbv4spc3w","portable_data_hash":"1f4b0bc7583c2a7f9102c395f4ffc5e3+45","manifest_text":". acbd18db4cc2f85cedef654fccc4a4d8+3 0:3:foo\n","modified_at":"2014-02-03T17:22:54Z"}]}`)
230                 }
231         })
232         return rt
233 }
234
235 func (s *stubServer) serveZeroKeepServices() *reqTracker {
236         return s.serveJSON("/arvados/v1/keep_services", arvados.KeepServiceList{})
237 }
238
239 func (s *stubServer) serveKeepServices(svcs []arvados.KeepService) *reqTracker {
240         return s.serveJSON("/arvados/v1/keep_services", arvados.KeepServiceList{
241                 ItemsAvailable: len(svcs),
242                 Items:          svcs,
243         })
244 }
245
246 func (s *stubServer) serveJSON(path string, resp interface{}) *reqTracker {
247         rt := &reqTracker{}
248         s.mux.HandleFunc(path, func(w http.ResponseWriter, r *http.Request) {
249                 rt.Add(r)
250                 json.NewEncoder(w).Encode(resp)
251         })
252         return rt
253 }
254
255 func (s *stubServer) serveKeepstoreMounts() *reqTracker {
256         rt := &reqTracker{}
257         s.mux.HandleFunc("/mounts", func(w http.ResponseWriter, r *http.Request) {
258                 rt.Add(r)
259                 json.NewEncoder(w).Encode(stubMounts[r.Host])
260         })
261         return rt
262 }
263
264 func (s *stubServer) serveKeepstoreIndexFoo4Bar1() *reqTracker {
265         fooLine := func(mt int) string { return fmt.Sprintf("acbd18db4cc2f85cedef654fccc4a4d8+3 %d\n", 12345678+mt) }
266         barLine := "37b51d194a7513e45b56f6524f2d51f2+3 12345678\n"
267         rt := &reqTracker{}
268         s.mux.HandleFunc("/index/", func(w http.ResponseWriter, r *http.Request) {
269                 count := rt.Add(r)
270                 if r.Host == "keep0.zzzzz.arvadosapi.com:25107" && strings.HasPrefix(barLine, r.URL.Path[7:]) {
271                         io.WriteString(w, barLine)
272                 }
273                 if strings.HasPrefix(fooLine(count), r.URL.Path[7:]) {
274                         io.WriteString(w, fooLine(count))
275                 }
276                 io.WriteString(w, "\n")
277         })
278         for _, mounts := range stubMounts {
279                 for i, mnt := range mounts {
280                         i := i
281                         s.mux.HandleFunc(fmt.Sprintf("/mounts/%s/blocks", mnt.UUID), func(w http.ResponseWriter, r *http.Request) {
282                                 count := rt.Add(r)
283                                 r.ParseForm()
284                                 if i == 0 && r.Host == "keep0.zzzzz.arvadosapi.com:25107" && strings.HasPrefix(barLine, r.Form.Get("prefix")) {
285                                         io.WriteString(w, barLine)
286                                 }
287                                 if i == 0 && strings.HasPrefix(fooLine(count), r.Form.Get("prefix")) {
288                                         io.WriteString(w, fooLine(count))
289                                 }
290                                 io.WriteString(w, "\n")
291                         })
292                 }
293         }
294         return rt
295 }
296
297 func (s *stubServer) serveKeepstoreIndexFoo1() *reqTracker {
298         fooLine := "acbd18db4cc2f85cedef654fccc4a4d8+3 12345678\n"
299         rt := &reqTracker{}
300         s.mux.HandleFunc("/index/", func(w http.ResponseWriter, r *http.Request) {
301                 rt.Add(r)
302                 if r.Host == "keep0.zzzzz.arvadosapi.com:25107" && strings.HasPrefix(fooLine, r.URL.Path[7:]) {
303                         io.WriteString(w, fooLine)
304                 }
305                 io.WriteString(w, "\n")
306         })
307         for _, mounts := range stubMounts {
308                 for i, mnt := range mounts {
309                         i := i
310                         s.mux.HandleFunc(fmt.Sprintf("/mounts/%s/blocks", mnt.UUID), func(w http.ResponseWriter, r *http.Request) {
311                                 rt.Add(r)
312                                 if i == 0 && strings.HasPrefix(fooLine, r.Form.Get("prefix")) {
313                                         io.WriteString(w, fooLine)
314                                 }
315                                 io.WriteString(w, "\n")
316                         })
317                 }
318         }
319         return rt
320 }
321
322 func (s *stubServer) serveKeepstoreIndexIgnoringPrefix() *reqTracker {
323         fooLine := "acbd18db4cc2f85cedef654fccc4a4d8+3 12345678\n"
324         rt := &reqTracker{}
325         s.mux.HandleFunc("/index/", func(w http.ResponseWriter, r *http.Request) {
326                 rt.Add(r)
327                 io.WriteString(w, fooLine)
328                 io.WriteString(w, "\n")
329         })
330         for _, mounts := range stubMounts {
331                 for _, mnt := range mounts {
332                         s.mux.HandleFunc(fmt.Sprintf("/mounts/%s/blocks", mnt.UUID), func(w http.ResponseWriter, r *http.Request) {
333                                 rt.Add(r)
334                                 io.WriteString(w, fooLine)
335                                 io.WriteString(w, "\n")
336                         })
337                 }
338         }
339         return rt
340 }
341
342 func (s *stubServer) serveKeepstoreTrash() *reqTracker {
343         return s.serveStatic("/trash", `{}`)
344 }
345
346 func (s *stubServer) serveKeepstorePull() *reqTracker {
347         return s.serveStatic("/pull", `{}`)
348 }
349
350 type runSuite struct {
351         stub   stubServer
352         config *arvados.Cluster
353         db     *sqlx.DB
354         client *arvados.Client
355 }
356
357 func (s *runSuite) newServer(options *RunOptions) *Server {
358         srv := &Server{
359                 Cluster:    s.config,
360                 ArvClient:  s.client,
361                 RunOptions: *options,
362                 Metrics:    newMetrics(prometheus.NewRegistry()),
363                 Logger:     options.Logger,
364                 Dumper:     options.Dumper,
365                 DB:         s.db,
366         }
367         return srv
368 }
369
370 func (s *runSuite) SetUpTest(c *check.C) {
371         cfg, err := config.NewLoader(nil, ctxlog.TestLogger(c)).Load()
372         c.Assert(err, check.Equals, nil)
373         s.config, err = cfg.GetCluster("")
374         c.Assert(err, check.Equals, nil)
375         s.db, err = sqlx.Open("postgres", s.config.PostgreSQL.Connection.String())
376         c.Assert(err, check.IsNil)
377
378         s.config.Collections.BalancePeriod = arvados.Duration(time.Second)
379         arvadostest.SetServiceURL(&s.config.Services.Keepbalance, "http://localhost:/")
380
381         s.client = &arvados.Client{
382                 AuthToken: "xyzzy",
383                 APIHost:   "zzzzz.arvadosapi.com",
384                 Client:    s.stub.Start()}
385
386         s.stub.serveDiscoveryDoc()
387         s.stub.logf = c.Logf
388 }
389
390 func (s *runSuite) TearDownTest(c *check.C) {
391         s.stub.Close()
392 }
393
394 func (s *runSuite) TestRefuseZeroCollections(c *check.C) {
395         defer arvados.NewClientFromEnv().RequestAndDecode(nil, "POST", "database/reset", nil, nil)
396         _, err := s.db.Exec(`delete from collections`)
397         c.Assert(err, check.IsNil)
398         opts := RunOptions{
399                 CommitPulls: true,
400                 CommitTrash: true,
401                 Logger:      ctxlog.TestLogger(c),
402         }
403         s.stub.serveCurrentUserAdmin()
404         s.stub.serveZeroCollections()
405         s.stub.serveKeepServices(stubServices)
406         s.stub.serveKeepstoreMounts()
407         s.stub.serveKeepstoreIndexFoo4Bar1()
408         trashReqs := s.stub.serveKeepstoreTrash()
409         pullReqs := s.stub.serveKeepstorePull()
410         srv := s.newServer(&opts)
411         _, err = srv.runOnce(context.Background())
412         c.Check(err, check.ErrorMatches, "received zero collections")
413         c.Check(trashReqs.Count(), check.Equals, 4)
414         c.Check(pullReqs.Count(), check.Equals, 0)
415 }
416
417 func (s *runSuite) TestRefuseBadIndex(c *check.C) {
418         opts := RunOptions{
419                 CommitPulls: true,
420                 CommitTrash: true,
421                 ChunkPrefix: "abc",
422                 Logger:      ctxlog.TestLogger(c),
423         }
424         s.stub.serveCurrentUserAdmin()
425         s.stub.serveFooBarFileCollections()
426         s.stub.serveKeepServices(stubServices)
427         s.stub.serveKeepstoreMounts()
428         s.stub.serveKeepstoreIndexIgnoringPrefix()
429         trashReqs := s.stub.serveKeepstoreTrash()
430         pullReqs := s.stub.serveKeepstorePull()
431         srv := s.newServer(&opts)
432         bal, err := srv.runOnce(context.Background())
433         c.Check(err, check.ErrorMatches, ".*Index response included block .* despite asking for prefix \"abc\"")
434         c.Check(trashReqs.Count(), check.Equals, 4)
435         c.Check(pullReqs.Count(), check.Equals, 0)
436         c.Check(bal.stats.trashes, check.Equals, 0)
437         c.Check(bal.stats.pulls, check.Equals, 0)
438 }
439
440 func (s *runSuite) TestRefuseNonAdmin(c *check.C) {
441         opts := RunOptions{
442                 CommitPulls: true,
443                 CommitTrash: true,
444                 Logger:      ctxlog.TestLogger(c),
445         }
446         s.stub.serveCurrentUserNotAdmin()
447         s.stub.serveZeroCollections()
448         s.stub.serveKeepServices(stubServices)
449         s.stub.serveKeepstoreMounts()
450         trashReqs := s.stub.serveKeepstoreTrash()
451         pullReqs := s.stub.serveKeepstorePull()
452         srv := s.newServer(&opts)
453         _, err := srv.runOnce(context.Background())
454         c.Check(err, check.ErrorMatches, "current user .* is not .* admin user")
455         c.Check(trashReqs.Count(), check.Equals, 0)
456         c.Check(pullReqs.Count(), check.Equals, 0)
457 }
458
459 func (s *runSuite) TestInvalidChunkPrefix(c *check.C) {
460         for _, trial := range []struct {
461                 prefix string
462                 errRe  string
463         }{
464                 {"123ABC", "invalid char \"A\" in chunk prefix.*"},
465                 {"123xyz", "invalid char \"x\" in chunk prefix.*"},
466                 {"123456789012345678901234567890123", "invalid chunk prefix .* longer than a block hash"},
467         } {
468                 s.SetUpTest(c)
469                 c.Logf("trying invalid prefix %q", trial.prefix)
470                 opts := RunOptions{
471                         CommitPulls: true,
472                         CommitTrash: true,
473                         ChunkPrefix: trial.prefix,
474                         Logger:      ctxlog.TestLogger(c),
475                 }
476                 s.stub.serveCurrentUserAdmin()
477                 s.stub.serveFooBarFileCollections()
478                 s.stub.serveKeepServices(stubServices)
479                 s.stub.serveKeepstoreMounts()
480                 trashReqs := s.stub.serveKeepstoreTrash()
481                 pullReqs := s.stub.serveKeepstorePull()
482                 srv := s.newServer(&opts)
483                 _, err := srv.runOnce(context.Background())
484                 c.Check(err, check.ErrorMatches, trial.errRe)
485                 c.Check(trashReqs.Count(), check.Equals, 0)
486                 c.Check(pullReqs.Count(), check.Equals, 0)
487         }
488 }
489
490 func (s *runSuite) TestRefuseSameDeviceDifferentVolumes(c *check.C) {
491         opts := RunOptions{
492                 CommitPulls: true,
493                 CommitTrash: true,
494                 Logger:      ctxlog.TestLogger(c),
495         }
496         s.stub.serveCurrentUserAdmin()
497         s.stub.serveZeroCollections()
498         s.stub.serveKeepServices(stubServices)
499         s.stub.mux.HandleFunc("/mounts", func(w http.ResponseWriter, r *http.Request) {
500                 hostid := r.Host[:5] // "keep0.zzzzz.arvadosapi.com:25107" => "keep0"
501                 json.NewEncoder(w).Encode([]arvados.KeepMount{{
502                         UUID:           "zzzzz-ivpuk-0000000000" + hostid,
503                         DeviceID:       "keep0-vol0",
504                         StorageClasses: map[string]bool{"default": true},
505                 }})
506         })
507         trashReqs := s.stub.serveKeepstoreTrash()
508         pullReqs := s.stub.serveKeepstorePull()
509         srv := s.newServer(&opts)
510         _, err := srv.runOnce(context.Background())
511         c.Check(err, check.ErrorMatches, "cannot continue with config errors.*")
512         c.Check(trashReqs.Count(), check.Equals, 0)
513         c.Check(pullReqs.Count(), check.Equals, 0)
514 }
515
516 func (s *runSuite) TestWriteLostBlocks(c *check.C) {
517         lostf, err := ioutil.TempFile("", "keep-balance-lost-blocks-test-")
518         c.Assert(err, check.IsNil)
519         s.config.Collections.BlobMissingReport = lostf.Name()
520         defer os.Remove(lostf.Name())
521         opts := RunOptions{
522                 CommitPulls: true,
523                 CommitTrash: true,
524                 Logger:      ctxlog.TestLogger(c),
525         }
526         s.stub.serveCurrentUserAdmin()
527         s.stub.serveFooBarFileCollections()
528         s.stub.serveKeepServices(stubServices)
529         s.stub.serveKeepstoreMounts()
530         s.stub.serveKeepstoreIndexFoo1()
531         s.stub.serveKeepstoreTrash()
532         s.stub.serveKeepstorePull()
533         srv := s.newServer(&opts)
534         c.Assert(err, check.IsNil)
535         _, err = srv.runOnce(context.Background())
536         c.Check(err, check.IsNil)
537         lost, err := ioutil.ReadFile(lostf.Name())
538         c.Assert(err, check.IsNil)
539         c.Check(string(lost), check.Matches, `(?ms).*37b51d194a7513e45b56f6524f2d51f2.* fa7aeb5140e2848d39b416daeef4ffc5\+45.*`)
540 }
541
542 func (s *runSuite) TestDryRun(c *check.C) {
543         opts := RunOptions{
544                 CommitPulls: false,
545                 CommitTrash: false,
546                 Logger:      ctxlog.TestLogger(c),
547         }
548         s.stub.serveCurrentUserAdmin()
549         collReqs := s.stub.serveFooBarFileCollections()
550         s.stub.serveKeepServices(stubServices)
551         s.stub.serveKeepstoreMounts()
552         s.stub.serveKeepstoreIndexFoo4Bar1()
553         trashReqs := s.stub.serveKeepstoreTrash()
554         pullReqs := s.stub.serveKeepstorePull()
555         srv := s.newServer(&opts)
556         bal, err := srv.runOnce(context.Background())
557         c.Check(err, check.IsNil)
558         for _, req := range collReqs.reqs {
559                 c.Check(req.Form.Get("include_trash"), check.Equals, "true")
560                 c.Check(req.Form.Get("include_old_versions"), check.Equals, "true")
561         }
562         c.Check(trashReqs.Count(), check.Equals, 0)
563         c.Check(pullReqs.Count(), check.Equals, 0)
564         c.Check(bal.stats.pulls, check.Not(check.Equals), 0)
565         c.Check(bal.stats.underrep.replicas, check.Not(check.Equals), 0)
566         c.Check(bal.stats.overrep.replicas, check.Not(check.Equals), 0)
567 }
568
569 func (s *runSuite) TestCommit(c *check.C) {
570         s.config.Collections.BlobMissingReport = c.MkDir() + "/keep-balance-lost-blocks-test-"
571         s.config.ManagementToken = "xyzzy"
572         opts := RunOptions{
573                 CommitPulls: true,
574                 CommitTrash: true,
575                 Logger:      ctxlog.TestLogger(c),
576                 Dumper:      ctxlog.TestLogger(c),
577         }
578         s.stub.serveCurrentUserAdmin()
579         s.stub.serveFooBarFileCollections()
580         s.stub.serveKeepServices(stubServices)
581         s.stub.serveKeepstoreMounts()
582         s.stub.serveKeepstoreIndexFoo4Bar1()
583         trashReqs := s.stub.serveKeepstoreTrash()
584         pullReqs := s.stub.serveKeepstorePull()
585         srv := s.newServer(&opts)
586         bal, err := srv.runOnce(context.Background())
587         c.Check(err, check.IsNil)
588         c.Check(trashReqs.Count(), check.Equals, 8)
589         c.Check(pullReqs.Count(), check.Equals, 4)
590         // "foo" block is overreplicated by 2
591         c.Check(bal.stats.trashes, check.Equals, 2)
592         // "bar" block is underreplicated by 1, and its only copy is
593         // in a poor rendezvous position
594         c.Check(bal.stats.pulls, check.Equals, 2)
595
596         lost, err := ioutil.ReadFile(s.config.Collections.BlobMissingReport)
597         c.Assert(err, check.IsNil)
598         c.Check(string(lost), check.Not(check.Matches), `(?ms).*acbd18db4cc2f85cedef654fccc4a4d8.*`)
599
600         metrics := arvadostest.GatherMetricsAsString(srv.Metrics.reg)
601         c.Check(metrics, check.Matches, `(?ms).*\narvados_keep_total_bytes 15\n.*`)
602         c.Check(metrics, check.Matches, `(?ms).*\narvados_keepbalance_changeset_compute_seconds_sum [0-9\.]+\n.*`)
603         c.Check(metrics, check.Matches, `(?ms).*\narvados_keepbalance_changeset_compute_seconds_count 1\n.*`)
604         c.Check(metrics, check.Matches, `(?ms).*\narvados_keep_dedup_byte_ratio [1-9].*`)
605         c.Check(metrics, check.Matches, `(?ms).*\narvados_keep_dedup_block_ratio [1-9].*`)
606 }
607
608 func (s *runSuite) TestChunkPrefix(c *check.C) {
609         s.config.Collections.BlobMissingReport = c.MkDir() + "/keep-balance-lost-blocks-test-"
610         opts := RunOptions{
611                 CommitPulls: true,
612                 CommitTrash: true,
613                 ChunkPrefix: "ac", // catch "foo" but not "bar"
614                 Logger:      ctxlog.TestLogger(c),
615                 Dumper:      ctxlog.TestLogger(c),
616         }
617         s.stub.serveCurrentUserAdmin()
618         s.stub.serveFooBarFileCollections()
619         s.stub.serveKeepServices(stubServices)
620         s.stub.serveKeepstoreMounts()
621         s.stub.serveKeepstoreIndexFoo4Bar1()
622         trashReqs := s.stub.serveKeepstoreTrash()
623         pullReqs := s.stub.serveKeepstorePull()
624         srv := s.newServer(&opts)
625         bal, err := srv.runOnce(context.Background())
626         c.Check(err, check.IsNil)
627         c.Check(trashReqs.Count(), check.Equals, 8)
628         c.Check(pullReqs.Count(), check.Equals, 4)
629         // "foo" block is overreplicated by 2
630         c.Check(bal.stats.trashes, check.Equals, 2)
631         // "bar" block is underreplicated but does not match prefix
632         c.Check(bal.stats.pulls, check.Equals, 0)
633
634         lost, err := ioutil.ReadFile(s.config.Collections.BlobMissingReport)
635         c.Assert(err, check.IsNil)
636         c.Check(string(lost), check.Equals, "")
637 }
638
639 func (s *runSuite) TestRunForever(c *check.C) {
640         s.config.ManagementToken = "xyzzy"
641         opts := RunOptions{
642                 CommitPulls: true,
643                 CommitTrash: true,
644                 Logger:      ctxlog.TestLogger(c),
645                 Dumper:      ctxlog.TestLogger(c),
646         }
647         s.stub.serveCurrentUserAdmin()
648         s.stub.serveFooBarFileCollections()
649         s.stub.serveKeepServices(stubServices)
650         s.stub.serveKeepstoreMounts()
651         s.stub.serveKeepstoreIndexFoo4Bar1()
652         trashReqs := s.stub.serveKeepstoreTrash()
653         pullReqs := s.stub.serveKeepstorePull()
654
655         ctx, cancel := context.WithCancel(context.Background())
656         defer cancel()
657         s.config.Collections.BalancePeriod = arvados.Duration(time.Millisecond)
658         srv := s.newServer(&opts)
659
660         done := make(chan bool)
661         go func() {
662                 srv.runForever(ctx)
663                 close(done)
664         }()
665
666         // Each run should send 4 pull lists + 4 trash lists. The
667         // first run should also send 4 empty trash lists at
668         // startup. We should complete all four runs in much less than
669         // a second.
670         for t0 := time.Now(); time.Since(t0) < 10*time.Second; {
671                 if pullReqs.Count() >= 16 && trashReqs.Count() == pullReqs.Count()+4 {
672                         break
673                 }
674                 time.Sleep(time.Millisecond)
675         }
676         cancel()
677         <-done
678         c.Check(pullReqs.Count() >= 16, check.Equals, true)
679         c.Check(trashReqs.Count(), check.Equals, pullReqs.Count()+4)
680
681         metrics := arvadostest.GatherMetricsAsString(srv.Metrics.reg)
682         c.Check(metrics, check.Matches, `(?ms).*\narvados_keepbalance_changeset_compute_seconds_count `+fmt.Sprintf("%d", pullReqs.Count()/4)+`\n.*`)
683 }