21254: Fix racy keep-balance test.
[arvados.git] / services / keep-balance / balance_run_test.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package keepbalance
6
7 import (
8         "context"
9         "encoding/json"
10         "fmt"
11         "io"
12         "io/ioutil"
13         "net/http"
14         "net/http/httptest"
15         "os"
16         "strings"
17         "sync"
18         "syscall"
19         "time"
20
21         "git.arvados.org/arvados.git/lib/config"
22         "git.arvados.org/arvados.git/sdk/go/arvados"
23         "git.arvados.org/arvados.git/sdk/go/arvadostest"
24         "git.arvados.org/arvados.git/sdk/go/ctxlog"
25         "github.com/jmoiron/sqlx"
26         "github.com/prometheus/client_golang/prometheus"
27         check "gopkg.in/check.v1"
28 )
29
30 var _ = check.Suite(&runSuite{})
31
// reqTracker keeps a copy of every request handed to Add. The embedded
// mutex guards reqs, so Add and Count may be called from concurrently
// running handlers.
type reqTracker struct {
	sync.Mutex
	reqs []http.Request
}

// Count reports how many requests have been recorded so far.
func (rt *reqTracker) Count() int {
	rt.Lock()
	n := len(rt.reqs)
	rt.Unlock()
	return n
}

// Add stores a shallow copy of *req and returns the total number of
// recorded requests, including this one.
func (rt *reqTracker) Add(req *http.Request) int {
	snapshot := *req
	rt.Lock()
	defer rt.Unlock()
	rt.reqs = append(rt.reqs, snapshot)
	return len(rt.reqs)
}
49
50 var stubServices = []arvados.KeepService{
51         {
52                 UUID:           "zzzzz-bi6l4-000000000000000",
53                 ServiceHost:    "keep0.zzzzz.arvadosapi.com",
54                 ServicePort:    25107,
55                 ServiceSSLFlag: false,
56                 ServiceType:    "disk",
57         },
58         {
59                 UUID:           "zzzzz-bi6l4-000000000000001",
60                 ServiceHost:    "keep1.zzzzz.arvadosapi.com",
61                 ServicePort:    25107,
62                 ServiceSSLFlag: false,
63                 ServiceType:    "disk",
64         },
65         {
66                 UUID:           "zzzzz-bi6l4-000000000000002",
67                 ServiceHost:    "keep2.zzzzz.arvadosapi.com",
68                 ServicePort:    25107,
69                 ServiceSSLFlag: false,
70                 ServiceType:    "disk",
71         },
72         {
73                 UUID:           "zzzzz-bi6l4-000000000000003",
74                 ServiceHost:    "keep3.zzzzz.arvadosapi.com",
75                 ServicePort:    25107,
76                 ServiceSSLFlag: false,
77                 ServiceType:    "disk",
78         },
79         {
80                 UUID:           "zzzzz-bi6l4-h0a0xwut9qa6g3a",
81                 ServiceHost:    "keep.zzzzz.arvadosapi.com",
82                 ServicePort:    25333,
83                 ServiceSSLFlag: true,
84                 ServiceType:    "proxy",
85         },
86 }
87
88 var stubMounts = map[string][]arvados.KeepMount{
89         "keep0.zzzzz.arvadosapi.com:25107": {{
90                 UUID:           "zzzzz-ivpuk-000000000000000",
91                 DeviceID:       "keep0-vol0",
92                 StorageClasses: map[string]bool{"default": true},
93                 AllowWrite:     true,
94                 AllowTrash:     true,
95         }},
96         "keep1.zzzzz.arvadosapi.com:25107": {{
97                 UUID:           "zzzzz-ivpuk-100000000000000",
98                 DeviceID:       "keep1-vol0",
99                 StorageClasses: map[string]bool{"default": true},
100                 AllowWrite:     true,
101                 AllowTrash:     true,
102         }},
103         "keep2.zzzzz.arvadosapi.com:25107": {{
104                 UUID:           "zzzzz-ivpuk-200000000000000",
105                 DeviceID:       "keep2-vol0",
106                 StorageClasses: map[string]bool{"default": true},
107                 AllowWrite:     true,
108                 AllowTrash:     true,
109         }},
110         "keep3.zzzzz.arvadosapi.com:25107": {{
111                 UUID:           "zzzzz-ivpuk-300000000000000",
112                 DeviceID:       "keep3-vol0",
113                 StorageClasses: map[string]bool{"default": true},
114                 AllowWrite:     true,
115                 AllowTrash:     true,
116         }},
117 }
118
119 // stubServer is an HTTP transport that intercepts and processes all
120 // requests using its own handlers.
121 type stubServer struct {
122         mux      *http.ServeMux
123         srv      *httptest.Server
124         mutex    sync.Mutex
125         Requests reqTracker
126         logf     func(string, ...interface{})
127 }
128
129 // Start initializes the stub server and returns an *http.Client that
130 // uses the stub server to handle all requests.
131 //
132 // A stubServer that has been started should eventually be shut down
133 // with Close().
134 func (s *stubServer) Start() *http.Client {
135         // Set up a config.Client that forwards all requests to s.mux
136         // via s.srv. Test cases will attach handlers to s.mux to get
137         // the desired responses.
138         s.mux = http.NewServeMux()
139         s.srv = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
140                 s.mutex.Lock()
141                 s.Requests.Add(r)
142                 s.mutex.Unlock()
143                 w.Header().Set("Content-Type", "application/json")
144                 s.mux.ServeHTTP(w, r)
145         }))
146         return &http.Client{Transport: s}
147 }
148
149 func (s *stubServer) RoundTrip(req *http.Request) (*http.Response, error) {
150         w := httptest.NewRecorder()
151         s.mux.ServeHTTP(w, req)
152         return &http.Response{
153                 StatusCode: w.Code,
154                 Status:     fmt.Sprintf("%d %s", w.Code, http.StatusText(w.Code)),
155                 Header:     w.HeaderMap,
156                 Body:       ioutil.NopCloser(w.Body)}, nil
157 }
158
159 // Close releases resources used by the server.
160 func (s *stubServer) Close() {
161         s.srv.Close()
162 }
163
164 func (s *stubServer) serveStatic(path, data string) *reqTracker {
165         rt := &reqTracker{}
166         s.mux.HandleFunc(path, func(w http.ResponseWriter, r *http.Request) {
167                 rt.Add(r)
168                 if r.Body != nil {
169                         ioutil.ReadAll(r.Body)
170                         r.Body.Close()
171                 }
172                 io.WriteString(w, data)
173         })
174         return rt
175 }
176
177 func (s *stubServer) serveCurrentUserAdmin() *reqTracker {
178         return s.serveStatic("/arvados/v1/users/current",
179                 `{"uuid":"zzzzz-tpzed-000000000000000","is_admin":true,"is_active":true}`)
180 }
181
182 func (s *stubServer) serveCurrentUserNotAdmin() *reqTracker {
183         return s.serveStatic("/arvados/v1/users/current",
184                 `{"uuid":"zzzzz-tpzed-000000000000000","is_admin":false,"is_active":true}`)
185 }
186
187 func (s *stubServer) serveDiscoveryDoc() *reqTracker {
188         return s.serveStatic("/discovery/v1/apis/arvados/v1/rest",
189                 `{"defaultCollectionReplication":2}`)
190 }
191
192 func (s *stubServer) serveZeroCollections() *reqTracker {
193         return s.serveStatic("/arvados/v1/collections",
194                 `{"items":[],"items_available":0}`)
195 }
196
197 func (s *stubServer) serveFooBarFileCollections() *reqTracker {
198         rt := &reqTracker{}
199         s.mux.HandleFunc("/arvados/v1/collections", func(w http.ResponseWriter, r *http.Request) {
200                 r.ParseForm()
201                 rt.Add(r)
202                 if strings.Contains(r.Form.Get("filters"), `modified_at`) {
203                         io.WriteString(w, `{"items_available":0,"items":[]}`)
204                 } else {
205                         io.WriteString(w, `{"items_available":3,"items":[
206                                 {"uuid":"zzzzz-4zz18-aaaaaaaaaaaaaaa","portable_data_hash":"fa7aeb5140e2848d39b416daeef4ffc5+45","manifest_text":". 37b51d194a7513e45b56f6524f2d51f2+3 0:3:bar\n","modified_at":"2014-02-03T17:22:54Z"},
207                                 {"uuid":"zzzzz-4zz18-ehbhgtheo8909or","portable_data_hash":"fa7aeb5140e2848d39b416daeef4ffc5+45","manifest_text":". 37b51d194a7513e45b56f6524f2d51f2+3 0:3:bar\n","modified_at":"2014-02-03T17:22:54Z"},
208                                 {"uuid":"zzzzz-4zz18-znfnqtbbv4spc3w","portable_data_hash":"1f4b0bc7583c2a7f9102c395f4ffc5e3+45","manifest_text":". acbd18db4cc2f85cedef654fccc4a4d8+3 0:3:foo\n","modified_at":"2014-02-03T17:22:54Z"}]}`)
209                 }
210         })
211         return rt
212 }
213
214 func (s *stubServer) serveCollectionsButSkipOne() *reqTracker {
215         rt := &reqTracker{}
216         s.mux.HandleFunc("/arvados/v1/collections", func(w http.ResponseWriter, r *http.Request) {
217                 r.ParseForm()
218                 rt.Add(r)
219                 if strings.Contains(r.Form.Get("filters"), `"modified_at","\u003c="`) {
220                         io.WriteString(w, `{"items_available":3,"items":[]}`)
221                 } else if strings.Contains(r.Form.Get("filters"), `"modified_at","\u003e`) {
222                         io.WriteString(w, `{"items_available":0,"items":[]}`)
223                 } else if strings.Contains(r.Form.Get("filters"), `"modified_at","="`) && strings.Contains(r.Form.Get("filters"), `"uuid","\u003e"`) {
224                         io.WriteString(w, `{"items_available":0,"items":[]}`)
225                 } else if strings.Contains(r.Form.Get("filters"), `"modified_at","=",null`) {
226                         io.WriteString(w, `{"items_available":0,"items":[]}`)
227                 } else {
228                         io.WriteString(w, `{"items_available":2,"items":[
229                                 {"uuid":"zzzzz-4zz18-ehbhgtheo8909or","portable_data_hash":"fa7aeb5140e2848d39b416daeef4ffc5+45","manifest_text":". 37b51d194a7513e45b56f6524f2d51f2+3 0:3:bar\n","modified_at":"2014-02-03T17:22:54Z"},
230                                 {"uuid":"zzzzz-4zz18-znfnqtbbv4spc3w","portable_data_hash":"1f4b0bc7583c2a7f9102c395f4ffc5e3+45","manifest_text":". acbd18db4cc2f85cedef654fccc4a4d8+3 0:3:foo\n","modified_at":"2014-02-03T17:22:54Z"}]}`)
231                 }
232         })
233         return rt
234 }
235
236 func (s *stubServer) serveZeroKeepServices() *reqTracker {
237         return s.serveJSON("/arvados/v1/keep_services", arvados.KeepServiceList{})
238 }
239
240 func (s *stubServer) serveKeepServices(svcs []arvados.KeepService) *reqTracker {
241         return s.serveJSON("/arvados/v1/keep_services", arvados.KeepServiceList{
242                 ItemsAvailable: len(svcs),
243                 Items:          svcs,
244         })
245 }
246
247 func (s *stubServer) serveJSON(path string, resp interface{}) *reqTracker {
248         rt := &reqTracker{}
249         s.mux.HandleFunc(path, func(w http.ResponseWriter, r *http.Request) {
250                 rt.Add(r)
251                 json.NewEncoder(w).Encode(resp)
252         })
253         return rt
254 }
255
256 func (s *stubServer) serveKeepstoreMounts() *reqTracker {
257         rt := &reqTracker{}
258         s.mux.HandleFunc("/mounts", func(w http.ResponseWriter, r *http.Request) {
259                 rt.Add(r)
260                 json.NewEncoder(w).Encode(stubMounts[r.Host])
261         })
262         return rt
263 }
264
265 func (s *stubServer) serveKeepstoreIndexFoo4Bar1() *reqTracker {
266         fooLine := func(mt int) string { return fmt.Sprintf("acbd18db4cc2f85cedef654fccc4a4d8+3 %d\n", 12345678+mt) }
267         barLine := "37b51d194a7513e45b56f6524f2d51f2+3 12345678\n"
268         rt := &reqTracker{}
269         s.mux.HandleFunc("/index/", func(w http.ResponseWriter, r *http.Request) {
270                 count := rt.Add(r)
271                 if r.Host == "keep0.zzzzz.arvadosapi.com:25107" && strings.HasPrefix(barLine, r.URL.Path[7:]) {
272                         io.WriteString(w, barLine)
273                 }
274                 if strings.HasPrefix(fooLine(count), r.URL.Path[7:]) {
275                         io.WriteString(w, fooLine(count))
276                 }
277                 io.WriteString(w, "\n")
278         })
279         for _, mounts := range stubMounts {
280                 for i, mnt := range mounts {
281                         i := i
282                         s.mux.HandleFunc(fmt.Sprintf("/mounts/%s/blocks", mnt.UUID), func(w http.ResponseWriter, r *http.Request) {
283                                 count := rt.Add(r)
284                                 r.ParseForm()
285                                 if i == 0 && r.Host == "keep0.zzzzz.arvadosapi.com:25107" && strings.HasPrefix(barLine, r.Form.Get("prefix")) {
286                                         io.WriteString(w, barLine)
287                                 }
288                                 if i == 0 && strings.HasPrefix(fooLine(count), r.Form.Get("prefix")) {
289                                         io.WriteString(w, fooLine(count))
290                                 }
291                                 io.WriteString(w, "\n")
292                         })
293                 }
294         }
295         return rt
296 }
297
298 func (s *stubServer) serveKeepstoreIndexFoo1() *reqTracker {
299         fooLine := "acbd18db4cc2f85cedef654fccc4a4d8+3 12345678\n"
300         rt := &reqTracker{}
301         s.mux.HandleFunc("/index/", func(w http.ResponseWriter, r *http.Request) {
302                 rt.Add(r)
303                 if r.Host == "keep0.zzzzz.arvadosapi.com:25107" && strings.HasPrefix(fooLine, r.URL.Path[7:]) {
304                         io.WriteString(w, fooLine)
305                 }
306                 io.WriteString(w, "\n")
307         })
308         for _, mounts := range stubMounts {
309                 for i, mnt := range mounts {
310                         i := i
311                         s.mux.HandleFunc(fmt.Sprintf("/mounts/%s/blocks", mnt.UUID), func(w http.ResponseWriter, r *http.Request) {
312                                 rt.Add(r)
313                                 if i == 0 && strings.HasPrefix(fooLine, r.Form.Get("prefix")) {
314                                         io.WriteString(w, fooLine)
315                                 }
316                                 io.WriteString(w, "\n")
317                         })
318                 }
319         }
320         return rt
321 }
322
323 func (s *stubServer) serveKeepstoreIndexIgnoringPrefix() *reqTracker {
324         fooLine := "acbd18db4cc2f85cedef654fccc4a4d8+3 12345678\n"
325         rt := &reqTracker{}
326         s.mux.HandleFunc("/index/", func(w http.ResponseWriter, r *http.Request) {
327                 rt.Add(r)
328                 io.WriteString(w, fooLine)
329                 io.WriteString(w, "\n")
330         })
331         for _, mounts := range stubMounts {
332                 for _, mnt := range mounts {
333                         s.mux.HandleFunc(fmt.Sprintf("/mounts/%s/blocks", mnt.UUID), func(w http.ResponseWriter, r *http.Request) {
334                                 rt.Add(r)
335                                 io.WriteString(w, fooLine)
336                                 io.WriteString(w, "\n")
337                         })
338                 }
339         }
340         return rt
341 }
342
343 func (s *stubServer) serveKeepstoreTrash() *reqTracker {
344         return s.serveStatic("/trash", `{}`)
345 }
346
347 func (s *stubServer) serveKeepstorePull() *reqTracker {
348         return s.serveStatic("/pull", `{}`)
349 }
350
351 type runSuite struct {
352         stub   stubServer
353         config *arvados.Cluster
354         db     *sqlx.DB
355         client *arvados.Client
356 }
357
358 func (s *runSuite) newServer(options *RunOptions) *Server {
359         srv := &Server{
360                 Cluster:    s.config,
361                 ArvClient:  s.client,
362                 RunOptions: *options,
363                 Metrics:    newMetrics(prometheus.NewRegistry()),
364                 Logger:     options.Logger,
365                 Dumper:     options.Dumper,
366                 DB:         s.db,
367         }
368         return srv
369 }
370
371 func (s *runSuite) SetUpTest(c *check.C) {
372         cfg, err := config.NewLoader(nil, ctxlog.TestLogger(c)).Load()
373         c.Assert(err, check.Equals, nil)
374         s.config, err = cfg.GetCluster("")
375         c.Assert(err, check.Equals, nil)
376         s.db, err = sqlx.Open("postgres", s.config.PostgreSQL.Connection.String())
377         c.Assert(err, check.IsNil)
378
379         s.config.Collections.BalancePeriod = arvados.Duration(time.Second)
380         arvadostest.SetServiceURL(&s.config.Services.Keepbalance, "http://localhost:/")
381
382         s.client = &arvados.Client{
383                 AuthToken: "xyzzy",
384                 APIHost:   "zzzzz.arvadosapi.com",
385                 Client:    s.stub.Start()}
386
387         s.stub.serveDiscoveryDoc()
388         s.stub.logf = c.Logf
389 }
390
391 func (s *runSuite) TearDownTest(c *check.C) {
392         s.stub.Close()
393 }
394
395 func (s *runSuite) TestRefuseZeroCollections(c *check.C) {
396         defer arvados.NewClientFromEnv().RequestAndDecode(nil, "POST", "database/reset", nil, nil)
397         _, err := s.db.Exec(`delete from collections`)
398         c.Assert(err, check.IsNil)
399         opts := RunOptions{
400                 Logger: ctxlog.TestLogger(c),
401         }
402         s.stub.serveCurrentUserAdmin()
403         s.stub.serveZeroCollections()
404         s.stub.serveKeepServices(stubServices)
405         s.stub.serveKeepstoreMounts()
406         s.stub.serveKeepstoreIndexFoo4Bar1()
407         trashReqs := s.stub.serveKeepstoreTrash()
408         pullReqs := s.stub.serveKeepstorePull()
409         srv := s.newServer(&opts)
410         _, err = srv.runOnce(context.Background())
411         c.Check(err, check.ErrorMatches, "received zero collections")
412         c.Check(trashReqs.Count(), check.Equals, 4)
413         c.Check(pullReqs.Count(), check.Equals, 0)
414 }
415
416 func (s *runSuite) TestRefuseBadIndex(c *check.C) {
417         opts := RunOptions{
418                 ChunkPrefix: "abc",
419                 Logger:      ctxlog.TestLogger(c),
420         }
421         s.stub.serveCurrentUserAdmin()
422         s.stub.serveFooBarFileCollections()
423         s.stub.serveKeepServices(stubServices)
424         s.stub.serveKeepstoreMounts()
425         s.stub.serveKeepstoreIndexIgnoringPrefix()
426         trashReqs := s.stub.serveKeepstoreTrash()
427         pullReqs := s.stub.serveKeepstorePull()
428         srv := s.newServer(&opts)
429         bal, err := srv.runOnce(context.Background())
430         c.Check(err, check.ErrorMatches, ".*Index response included block .* despite asking for prefix \"abc\"")
431         c.Check(trashReqs.Count(), check.Equals, 4)
432         c.Check(pullReqs.Count(), check.Equals, 0)
433         c.Check(bal.stats.trashes, check.Equals, 0)
434         c.Check(bal.stats.pulls, check.Equals, 0)
435 }
436
437 func (s *runSuite) TestRefuseNonAdmin(c *check.C) {
438         opts := RunOptions{
439                 Logger: ctxlog.TestLogger(c),
440         }
441         s.stub.serveCurrentUserNotAdmin()
442         s.stub.serveZeroCollections()
443         s.stub.serveKeepServices(stubServices)
444         s.stub.serveKeepstoreMounts()
445         trashReqs := s.stub.serveKeepstoreTrash()
446         pullReqs := s.stub.serveKeepstorePull()
447         srv := s.newServer(&opts)
448         _, err := srv.runOnce(context.Background())
449         c.Check(err, check.ErrorMatches, "current user .* is not .* admin user")
450         c.Check(trashReqs.Count(), check.Equals, 0)
451         c.Check(pullReqs.Count(), check.Equals, 0)
452 }
453
454 func (s *runSuite) TestInvalidChunkPrefix(c *check.C) {
455         for _, trial := range []struct {
456                 prefix string
457                 errRe  string
458         }{
459                 {"123ABC", "invalid char \"A\" in chunk prefix.*"},
460                 {"123xyz", "invalid char \"x\" in chunk prefix.*"},
461                 {"123456789012345678901234567890123", "invalid chunk prefix .* longer than a block hash"},
462         } {
463                 s.SetUpTest(c)
464                 c.Logf("trying invalid prefix %q", trial.prefix)
465                 opts := RunOptions{
466                         ChunkPrefix: trial.prefix,
467                         Logger:      ctxlog.TestLogger(c),
468                 }
469                 s.stub.serveCurrentUserAdmin()
470                 s.stub.serveFooBarFileCollections()
471                 s.stub.serveKeepServices(stubServices)
472                 s.stub.serveKeepstoreMounts()
473                 trashReqs := s.stub.serveKeepstoreTrash()
474                 pullReqs := s.stub.serveKeepstorePull()
475                 srv := s.newServer(&opts)
476                 _, err := srv.runOnce(context.Background())
477                 c.Check(err, check.ErrorMatches, trial.errRe)
478                 c.Check(trashReqs.Count(), check.Equals, 0)
479                 c.Check(pullReqs.Count(), check.Equals, 0)
480         }
481 }
482
483 func (s *runSuite) TestRefuseSameDeviceDifferentVolumes(c *check.C) {
484         opts := RunOptions{
485                 Logger: ctxlog.TestLogger(c),
486         }
487         s.stub.serveCurrentUserAdmin()
488         s.stub.serveZeroCollections()
489         s.stub.serveKeepServices(stubServices)
490         s.stub.mux.HandleFunc("/mounts", func(w http.ResponseWriter, r *http.Request) {
491                 hostid := r.Host[:5] // "keep0.zzzzz.arvadosapi.com:25107" => "keep0"
492                 json.NewEncoder(w).Encode([]arvados.KeepMount{{
493                         UUID:           "zzzzz-ivpuk-0000000000" + hostid,
494                         DeviceID:       "keep0-vol0",
495                         StorageClasses: map[string]bool{"default": true},
496                 }})
497         })
498         trashReqs := s.stub.serveKeepstoreTrash()
499         pullReqs := s.stub.serveKeepstorePull()
500         srv := s.newServer(&opts)
501         _, err := srv.runOnce(context.Background())
502         c.Check(err, check.ErrorMatches, "cannot continue with config errors.*")
503         c.Check(trashReqs.Count(), check.Equals, 0)
504         c.Check(pullReqs.Count(), check.Equals, 0)
505 }
506
507 func (s *runSuite) TestWriteLostBlocks(c *check.C) {
508         lostf, err := ioutil.TempFile("", "keep-balance-lost-blocks-test-")
509         c.Assert(err, check.IsNil)
510         s.config.Collections.BlobMissingReport = lostf.Name()
511         defer os.Remove(lostf.Name())
512         opts := RunOptions{
513                 Logger: ctxlog.TestLogger(c),
514         }
515         s.stub.serveCurrentUserAdmin()
516         s.stub.serveFooBarFileCollections()
517         s.stub.serveKeepServices(stubServices)
518         s.stub.serveKeepstoreMounts()
519         s.stub.serveKeepstoreIndexFoo1()
520         s.stub.serveKeepstoreTrash()
521         s.stub.serveKeepstorePull()
522         srv := s.newServer(&opts)
523         c.Assert(err, check.IsNil)
524         _, err = srv.runOnce(context.Background())
525         c.Check(err, check.IsNil)
526         lost, err := ioutil.ReadFile(lostf.Name())
527         c.Assert(err, check.IsNil)
528         c.Check(string(lost), check.Matches, `(?ms).*37b51d194a7513e45b56f6524f2d51f2.* fa7aeb5140e2848d39b416daeef4ffc5\+45.*`)
529 }
530
531 func (s *runSuite) TestDryRun(c *check.C) {
532         s.config.Collections.BalanceTrashLimit = 0
533         s.config.Collections.BalancePullLimit = 0
534         opts := RunOptions{
535                 Logger: ctxlog.TestLogger(c),
536         }
537         s.stub.serveCurrentUserAdmin()
538         collReqs := s.stub.serveFooBarFileCollections()
539         s.stub.serveKeepServices(stubServices)
540         s.stub.serveKeepstoreMounts()
541         s.stub.serveKeepstoreIndexFoo4Bar1()
542         trashReqs := s.stub.serveKeepstoreTrash()
543         pullReqs := s.stub.serveKeepstorePull()
544         srv := s.newServer(&opts)
545         bal, err := srv.runOnce(context.Background())
546         c.Check(err, check.IsNil)
547         for _, req := range collReqs.reqs {
548                 c.Check(req.Form.Get("include_trash"), check.Equals, "true")
549                 c.Check(req.Form.Get("include_old_versions"), check.Equals, "true")
550         }
551         c.Check(trashReqs.Count(), check.Equals, 0)
552         c.Check(pullReqs.Count(), check.Equals, 0)
553         c.Check(bal.stats.pulls, check.Equals, 0)
554         c.Check(bal.stats.pullsDeferred, check.Not(check.Equals), 0)
555         c.Check(bal.stats.trashes, check.Equals, 0)
556         c.Check(bal.stats.trashesDeferred, check.Not(check.Equals), 0)
557         c.Check(bal.stats.underrep.replicas, check.Not(check.Equals), 0)
558         c.Check(bal.stats.overrep.replicas, check.Not(check.Equals), 0)
559 }
560
561 func (s *runSuite) TestCommit(c *check.C) {
562         s.config.Collections.BlobMissingReport = c.MkDir() + "/keep-balance-lost-blocks-test-"
563         s.config.ManagementToken = "xyzzy"
564         opts := RunOptions{
565                 Logger: ctxlog.TestLogger(c),
566                 Dumper: ctxlog.TestLogger(c),
567         }
568         s.stub.serveCurrentUserAdmin()
569         s.stub.serveFooBarFileCollections()
570         s.stub.serveKeepServices(stubServices)
571         s.stub.serveKeepstoreMounts()
572         s.stub.serveKeepstoreIndexFoo4Bar1()
573         trashReqs := s.stub.serveKeepstoreTrash()
574         pullReqs := s.stub.serveKeepstorePull()
575         srv := s.newServer(&opts)
576         bal, err := srv.runOnce(context.Background())
577         c.Check(err, check.IsNil)
578         c.Check(trashReqs.Count(), check.Equals, 8)
579         c.Check(pullReqs.Count(), check.Equals, 4)
580         // "foo" block is overreplicated by 2
581         c.Check(bal.stats.trashes, check.Equals, 2)
582         // "bar" block is underreplicated by 1, and its only copy is
583         // in a poor rendezvous position
584         c.Check(bal.stats.pulls, check.Equals, 2)
585
586         lost, err := ioutil.ReadFile(s.config.Collections.BlobMissingReport)
587         c.Assert(err, check.IsNil)
588         c.Check(string(lost), check.Not(check.Matches), `(?ms).*acbd18db4cc2f85cedef654fccc4a4d8.*`)
589
590         metrics := arvadostest.GatherMetricsAsString(srv.Metrics.reg)
591         c.Check(metrics, check.Matches, `(?ms).*\narvados_keep_total_bytes 15\n.*`)
592         c.Check(metrics, check.Matches, `(?ms).*\narvados_keepbalance_changeset_compute_seconds_sum [0-9\.]+\n.*`)
593         c.Check(metrics, check.Matches, `(?ms).*\narvados_keepbalance_changeset_compute_seconds_count 1\n.*`)
594         c.Check(metrics, check.Matches, `(?ms).*\narvados_keep_dedup_byte_ratio [1-9].*`)
595         c.Check(metrics, check.Matches, `(?ms).*\narvados_keep_dedup_block_ratio [1-9].*`)
596 }
597
598 func (s *runSuite) TestChunkPrefix(c *check.C) {
599         s.config.Collections.BlobMissingReport = c.MkDir() + "/keep-balance-lost-blocks-test-"
600         opts := RunOptions{
601                 ChunkPrefix: "ac", // catch "foo" but not "bar"
602                 Logger:      ctxlog.TestLogger(c),
603                 Dumper:      ctxlog.TestLogger(c),
604         }
605         s.stub.serveCurrentUserAdmin()
606         s.stub.serveFooBarFileCollections()
607         s.stub.serveKeepServices(stubServices)
608         s.stub.serveKeepstoreMounts()
609         s.stub.serveKeepstoreIndexFoo4Bar1()
610         trashReqs := s.stub.serveKeepstoreTrash()
611         pullReqs := s.stub.serveKeepstorePull()
612         srv := s.newServer(&opts)
613         bal, err := srv.runOnce(context.Background())
614         c.Check(err, check.IsNil)
615         c.Check(trashReqs.Count(), check.Equals, 8)
616         c.Check(pullReqs.Count(), check.Equals, 4)
617         // "foo" block is overreplicated by 2
618         c.Check(bal.stats.trashes, check.Equals, 2)
619         // "bar" block is underreplicated but does not match prefix
620         c.Check(bal.stats.pulls, check.Equals, 0)
621
622         lost, err := ioutil.ReadFile(s.config.Collections.BlobMissingReport)
623         c.Assert(err, check.IsNil)
624         c.Check(string(lost), check.Equals, "")
625 }
626
627 func (s *runSuite) TestRunForever(c *check.C) {
628         s.config.ManagementToken = "xyzzy"
629         opts := RunOptions{
630                 Logger: ctxlog.TestLogger(c),
631                 Dumper: ctxlog.TestLogger(c),
632         }
633         s.stub.serveCurrentUserAdmin()
634         s.stub.serveFooBarFileCollections()
635         s.stub.serveKeepServices(stubServices)
636         s.stub.serveKeepstoreMounts()
637         s.stub.serveKeepstoreIndexFoo4Bar1()
638         trashReqs := s.stub.serveKeepstoreTrash()
639         pullReqs := s.stub.serveKeepstorePull()
640
641         ctx, cancel := context.WithCancel(context.Background())
642         defer cancel()
643         s.config.Collections.BalancePeriod = arvados.Duration(100 * time.Millisecond)
644         srv := s.newServer(&opts)
645
646         done := make(chan bool)
647         go func() {
648                 srv.runForever(ctx)
649                 close(done)
650         }()
651
652         procself, err := os.FindProcess(os.Getpid())
653         c.Assert(err, check.IsNil)
654
655         // Each run should send 4 pull lists + 4 trash lists. The
656         // first run should also send 4 empty trash lists at
657         // startup. We should complete all four runs in much less than
658         // a second.
659         completedRuns := 0
660         for t0 := time.Now(); time.Since(t0) < 10*time.Second; {
661                 pulls := pullReqs.Count()
662                 if pulls >= 16 && trashReqs.Count() == pulls+4 {
663                         break
664                 }
665                 if pulls > 4 {
666                         // Once the 2nd run has started automatically
667                         // (indicating that our BalancePeriod is
668                         // working) we switch to a long wait time to
669                         // effectively stop the timed runs, and
670                         // instead start sending a single SIGUSR1 at
671                         // the end of each (2nd or 3rd) run, to ensure
672                         // we get exactly 4 runs in total.
673                         srv.Cluster.Collections.BalancePeriod = arvados.Duration(time.Minute)
674                         if pulls%4 == 0 && pulls <= 12 && pulls/4 > completedRuns {
675                                 completedRuns = pulls / 4
676                                 c.Logf("completed run %d, sending SIGUSR1 to trigger next run", completedRuns)
677                                 procself.Signal(syscall.SIGUSR1)
678                         }
679                 }
680                 time.Sleep(time.Millisecond)
681         }
682         cancel()
683         <-done
684         c.Check(pullReqs.Count() >= 16, check.Equals, true)
685         c.Check(trashReqs.Count(), check.Equals, pullReqs.Count()+4)
686
687         metrics := arvadostest.GatherMetricsAsString(srv.Metrics.reg)
688         c.Check(metrics, check.Matches, `(?ms).*\narvados_keepbalance_changeset_compute_seconds_count `+fmt.Sprintf("%d", pullReqs.Count()/4)+`\n.*`)
689 }