21720:
[arvados.git] / services / keep-balance / balance_run_test.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package keepbalance
6
7 import (
8         "context"
9         "encoding/json"
10         "fmt"
11         "io"
12         "io/ioutil"
13         "net/http"
14         "net/http/httptest"
15         "os"
16         "strings"
17         "sync"
18         "syscall"
19         "time"
20
21         "git.arvados.org/arvados.git/lib/config"
22         "git.arvados.org/arvados.git/sdk/go/arvados"
23         "git.arvados.org/arvados.git/sdk/go/arvadostest"
24         "git.arvados.org/arvados.git/sdk/go/ctxlog"
25         "github.com/jmoiron/sqlx"
26         "github.com/prometheus/client_golang/prometheus"
27         check "gopkg.in/check.v1"
28 )
29
30 var _ = check.Suite(&runSuite{})
31
// reqTracker accumulates copies of the HTTP requests handled by a
// stub endpoint. The embedded mutex serializes access to reqs.
type reqTracker struct {
	sync.Mutex
	reqs []http.Request
}
36
37 func (rt *reqTracker) Count() int {
38         rt.Lock()
39         defer rt.Unlock()
40         return len(rt.reqs)
41 }
42
43 func (rt *reqTracker) Add(req *http.Request) int {
44         rt.Lock()
45         defer rt.Unlock()
46         rt.reqs = append(rt.reqs, *req)
47         return len(rt.reqs)
48 }
49
50 var stubServices = []arvados.KeepService{
51         {
52                 UUID:           "zzzzz-bi6l4-000000000000000",
53                 ServiceHost:    "keep0.zzzzz.arvadosapi.com",
54                 ServicePort:    25107,
55                 ServiceSSLFlag: false,
56                 ServiceType:    "disk",
57         },
58         {
59                 UUID:           "zzzzz-bi6l4-000000000000001",
60                 ServiceHost:    "keep1.zzzzz.arvadosapi.com",
61                 ServicePort:    25107,
62                 ServiceSSLFlag: false,
63                 ServiceType:    "disk",
64         },
65         {
66                 UUID:           "zzzzz-bi6l4-000000000000002",
67                 ServiceHost:    "keep2.zzzzz.arvadosapi.com",
68                 ServicePort:    25107,
69                 ServiceSSLFlag: false,
70                 ServiceType:    "disk",
71         },
72         {
73                 UUID:           "zzzzz-bi6l4-000000000000003",
74                 ServiceHost:    "keep3.zzzzz.arvadosapi.com",
75                 ServicePort:    25107,
76                 ServiceSSLFlag: false,
77                 ServiceType:    "disk",
78         },
79         {
80                 UUID:           "zzzzz-bi6l4-h0a0xwut9qa6g3a",
81                 ServiceHost:    "keep.zzzzz.arvadosapi.com",
82                 ServicePort:    25333,
83                 ServiceSSLFlag: true,
84                 ServiceType:    "proxy",
85         },
86 }
87
88 var stubMounts = map[string][]arvados.KeepMount{
89         "keep0.zzzzz.arvadosapi.com:25107": {{
90                 UUID:           "zzzzz-ivpuk-000000000000000",
91                 DeviceID:       "keep0-vol0",
92                 StorageClasses: map[string]bool{"default": true},
93                 AllowWrite:     true,
94                 AllowTrash:     true,
95         }},
96         "keep1.zzzzz.arvadosapi.com:25107": {{
97                 UUID:           "zzzzz-ivpuk-100000000000000",
98                 DeviceID:       "keep1-vol0",
99                 StorageClasses: map[string]bool{"default": true},
100                 AllowWrite:     true,
101                 AllowTrash:     true,
102         }},
103         "keep2.zzzzz.arvadosapi.com:25107": {{
104                 UUID:           "zzzzz-ivpuk-200000000000000",
105                 DeviceID:       "keep2-vol0",
106                 StorageClasses: map[string]bool{"default": true},
107                 AllowWrite:     true,
108                 AllowTrash:     true,
109         }},
110         "keep3.zzzzz.arvadosapi.com:25107": {{
111                 UUID:           "zzzzz-ivpuk-300000000000000",
112                 DeviceID:       "keep3-vol0",
113                 StorageClasses: map[string]bool{"default": true},
114                 AllowWrite:     true,
115                 AllowTrash:     true,
116         }},
117 }
118
119 // stubServer is an HTTP transport that intercepts and processes all
120 // requests using its own handlers.
121 type stubServer struct {
122         mux      *http.ServeMux
123         srv      *httptest.Server
124         mutex    sync.Mutex
125         Requests reqTracker
126         logf     func(string, ...interface{})
127 }
128
129 // Start initializes the stub server and returns an *http.Client that
130 // uses the stub server to handle all requests.
131 //
132 // A stubServer that has been started should eventually be shut down
133 // with Close().
134 func (s *stubServer) Start() *http.Client {
135         // Set up a config.Client that forwards all requests to s.mux
136         // via s.srv. Test cases will attach handlers to s.mux to get
137         // the desired responses.
138         s.mux = http.NewServeMux()
139         s.srv = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
140                 s.mutex.Lock()
141                 s.Requests.Add(r)
142                 s.mutex.Unlock()
143                 w.Header().Set("Content-Type", "application/json")
144                 s.mux.ServeHTTP(w, r)
145         }))
146         return &http.Client{Transport: s}
147 }
148
149 func (s *stubServer) RoundTrip(req *http.Request) (*http.Response, error) {
150         w := httptest.NewRecorder()
151         s.mux.ServeHTTP(w, req)
152         return &http.Response{
153                 StatusCode: w.Code,
154                 Status:     fmt.Sprintf("%d %s", w.Code, http.StatusText(w.Code)),
155                 Header:     w.HeaderMap,
156                 Body:       ioutil.NopCloser(w.Body)}, nil
157 }
158
159 // Close releases resources used by the server.
160 func (s *stubServer) Close() {
161         s.srv.Close()
162 }
163
164 func (s *stubServer) serveStatic(path, data string) *reqTracker {
165         rt := &reqTracker{}
166         s.mux.HandleFunc(path, func(w http.ResponseWriter, r *http.Request) {
167                 rt.Add(r)
168                 if r.Body != nil {
169                         ioutil.ReadAll(r.Body)
170                         r.Body.Close()
171                 }
172                 io.WriteString(w, data)
173         })
174         return rt
175 }
176
177 func (s *stubServer) serveCurrentUserAdmin() *reqTracker {
178         return s.serveStatic("/arvados/v1/users/current",
179                 `{"uuid":"zzzzz-tpzed-000000000000000","is_admin":true,"is_active":true}`)
180 }
181
182 func (s *stubServer) serveCurrentUserNotAdmin() *reqTracker {
183         return s.serveStatic("/arvados/v1/users/current",
184                 `{"uuid":"zzzzz-tpzed-000000000000000","is_admin":false,"is_active":true}`)
185 }
186
187 func (s *stubServer) serveDiscoveryDoc() *reqTracker {
188         return s.serveStatic("/discovery/v1/apis/arvados/v1/rest",
189                 `{"defaultCollectionReplication":2}`)
190 }
191
192 func (s *stubServer) serveZeroCollections() *reqTracker {
193         return s.serveStatic("/arvados/v1/collections",
194                 `{"items":[],"items_available":0}`)
195 }
196
197 func (s *stubServer) serveFooBarFileCollections() *reqTracker {
198         rt := &reqTracker{}
199         s.mux.HandleFunc("/arvados/v1/collections", func(w http.ResponseWriter, r *http.Request) {
200                 r.ParseForm()
201                 rt.Add(r)
202                 if strings.Contains(r.Form.Get("filters"), `modified_at`) {
203                         io.WriteString(w, `{"items_available":0,"items":[]}`)
204                 } else {
205                         io.WriteString(w, `{"items_available":3,"items":[
206                                 {"uuid":"zzzzz-4zz18-aaaaaaaaaaaaaaa","portable_data_hash":"fa7aeb5140e2848d39b416daeef4ffc5+45","manifest_text":". 37b51d194a7513e45b56f6524f2d51f2+3 0:3:bar\n","modified_at":"2014-02-03T17:22:54Z"},
207                                 {"uuid":"zzzzz-4zz18-ehbhgtheo8909or","portable_data_hash":"fa7aeb5140e2848d39b416daeef4ffc5+45","manifest_text":". 37b51d194a7513e45b56f6524f2d51f2+3 0:3:bar\n","modified_at":"2014-02-03T17:22:54Z"},
208                                 {"uuid":"zzzzz-4zz18-znfnqtbbv4spc3w","portable_data_hash":"1f4b0bc7583c2a7f9102c395f4ffc5e3+45","manifest_text":". acbd18db4cc2f85cedef654fccc4a4d8+3 0:3:foo\n","modified_at":"2014-02-03T17:22:54Z"}]}`)
209                 }
210         })
211         return rt
212 }
213
214 func (s *stubServer) serveCollectionsButSkipOne() *reqTracker {
215         rt := &reqTracker{}
216         s.mux.HandleFunc("/arvados/v1/collections", func(w http.ResponseWriter, r *http.Request) {
217                 r.ParseForm()
218                 rt.Add(r)
219                 if strings.Contains(r.Form.Get("filters"), `"modified_at","\u003c="`) {
220                         io.WriteString(w, `{"items_available":3,"items":[]}`)
221                 } else if strings.Contains(r.Form.Get("filters"), `"modified_at","\u003e`) {
222                         io.WriteString(w, `{"items_available":0,"items":[]}`)
223                 } else if strings.Contains(r.Form.Get("filters"), `"modified_at","="`) && strings.Contains(r.Form.Get("filters"), `"uuid","\u003e"`) {
224                         io.WriteString(w, `{"items_available":0,"items":[]}`)
225                 } else if strings.Contains(r.Form.Get("filters"), `"modified_at","=",null`) {
226                         io.WriteString(w, `{"items_available":0,"items":[]}`)
227                 } else {
228                         io.WriteString(w, `{"items_available":2,"items":[
229                                 {"uuid":"zzzzz-4zz18-ehbhgtheo8909or","portable_data_hash":"fa7aeb5140e2848d39b416daeef4ffc5+45","manifest_text":". 37b51d194a7513e45b56f6524f2d51f2+3 0:3:bar\n","modified_at":"2014-02-03T17:22:54Z"},
230                                 {"uuid":"zzzzz-4zz18-znfnqtbbv4spc3w","portable_data_hash":"1f4b0bc7583c2a7f9102c395f4ffc5e3+45","manifest_text":". acbd18db4cc2f85cedef654fccc4a4d8+3 0:3:foo\n","modified_at":"2014-02-03T17:22:54Z"}]}`)
231                 }
232         })
233         return rt
234 }
235
236 func (s *stubServer) serveZeroKeepServices() *reqTracker {
237         return s.serveJSON("/arvados/v1/keep_services", arvados.KeepServiceList{})
238 }
239
240 func (s *stubServer) serveKeepServices(svcs []arvados.KeepService) *reqTracker {
241         return s.serveJSON("/arvados/v1/keep_services", arvados.KeepServiceList{
242                 ItemsAvailable: len(svcs),
243                 Items:          svcs,
244         })
245 }
246
247 func (s *stubServer) serveJSON(path string, resp interface{}) *reqTracker {
248         rt := &reqTracker{}
249         s.mux.HandleFunc(path, func(w http.ResponseWriter, r *http.Request) {
250                 rt.Add(r)
251                 json.NewEncoder(w).Encode(resp)
252         })
253         return rt
254 }
255
256 func (s *stubServer) serveKeepstoreMounts() *reqTracker {
257         rt := &reqTracker{}
258         s.mux.HandleFunc("/mounts", func(w http.ResponseWriter, r *http.Request) {
259                 rt.Add(r)
260                 json.NewEncoder(w).Encode(stubMounts[r.Host])
261         })
262         return rt
263 }
264
265 func (s *stubServer) serveKeepstoreIndexFoo4Bar1() *reqTracker {
266         fooLine := func(mt int) string { return fmt.Sprintf("acbd18db4cc2f85cedef654fccc4a4d8+3 %d\n", 12345678+mt) }
267         barLine := "37b51d194a7513e45b56f6524f2d51f2+3 12345678\n"
268         rt := &reqTracker{}
269         s.mux.HandleFunc("/index/", func(w http.ResponseWriter, r *http.Request) {
270                 count := rt.Add(r)
271                 if r.Host == "keep0.zzzzz.arvadosapi.com:25107" && strings.HasPrefix(barLine, r.URL.Path[7:]) {
272                         io.WriteString(w, barLine)
273                 }
274                 if strings.HasPrefix(fooLine(count), r.URL.Path[7:]) {
275                         io.WriteString(w, fooLine(count))
276                 }
277                 io.WriteString(w, "\n")
278         })
279         for _, mounts := range stubMounts {
280                 for i, mnt := range mounts {
281                         i := i
282                         s.mux.HandleFunc(fmt.Sprintf("/mounts/%s/blocks", mnt.UUID), func(w http.ResponseWriter, r *http.Request) {
283                                 count := rt.Add(r)
284                                 r.ParseForm()
285                                 if i == 0 && r.Host == "keep0.zzzzz.arvadosapi.com:25107" && strings.HasPrefix(barLine, r.Form.Get("prefix")) {
286                                         io.WriteString(w, barLine)
287                                 }
288                                 if i == 0 && strings.HasPrefix(fooLine(count), r.Form.Get("prefix")) {
289                                         io.WriteString(w, fooLine(count))
290                                 }
291                                 io.WriteString(w, "\n")
292                         })
293                 }
294         }
295         return rt
296 }
297
298 func (s *stubServer) serveKeepstoreIndexFoo1() *reqTracker {
299         fooLine := "acbd18db4cc2f85cedef654fccc4a4d8+3 12345678\n"
300         rt := &reqTracker{}
301         s.mux.HandleFunc("/index/", func(w http.ResponseWriter, r *http.Request) {
302                 rt.Add(r)
303                 if r.Host == "keep0.zzzzz.arvadosapi.com:25107" && strings.HasPrefix(fooLine, r.URL.Path[7:]) {
304                         io.WriteString(w, fooLine)
305                 }
306                 io.WriteString(w, "\n")
307         })
308         for _, mounts := range stubMounts {
309                 for i, mnt := range mounts {
310                         i := i
311                         s.mux.HandleFunc(fmt.Sprintf("/mounts/%s/blocks", mnt.UUID), func(w http.ResponseWriter, r *http.Request) {
312                                 rt.Add(r)
313                                 if i == 0 && strings.HasPrefix(fooLine, r.Form.Get("prefix")) {
314                                         io.WriteString(w, fooLine)
315                                 }
316                                 io.WriteString(w, "\n")
317                         })
318                 }
319         }
320         return rt
321 }
322
323 func (s *stubServer) serveKeepstoreIndexIgnoringPrefix() *reqTracker {
324         fooLine := "acbd18db4cc2f85cedef654fccc4a4d8+3 12345678\n"
325         rt := &reqTracker{}
326         s.mux.HandleFunc("/index/", func(w http.ResponseWriter, r *http.Request) {
327                 rt.Add(r)
328                 io.WriteString(w, fooLine)
329                 io.WriteString(w, "\n")
330         })
331         for _, mounts := range stubMounts {
332                 for _, mnt := range mounts {
333                         s.mux.HandleFunc(fmt.Sprintf("/mounts/%s/blocks", mnt.UUID), func(w http.ResponseWriter, r *http.Request) {
334                                 rt.Add(r)
335                                 io.WriteString(w, fooLine)
336                                 io.WriteString(w, "\n")
337                         })
338                 }
339         }
340         return rt
341 }
342
343 func (s *stubServer) serveKeepstoreTrash() *reqTracker {
344         return s.serveStatic("/trash", `{}`)
345 }
346
347 func (s *stubServer) serveKeepstorePull() *reqTracker {
348         return s.serveStatic("/pull", `{}`)
349 }
350
351 type runSuite struct {
352         stub   stubServer
353         config *arvados.Cluster
354         db     *sqlx.DB
355         client *arvados.Client
356 }
357
358 func (s *runSuite) newServer(options *RunOptions) *Server {
359         srv := &Server{
360                 Cluster:    s.config,
361                 ArvClient:  s.client,
362                 RunOptions: *options,
363                 Metrics:    newMetrics(prometheus.NewRegistry()),
364                 Logger:     options.Logger,
365                 Dumper:     options.Dumper,
366                 DB:         s.db,
367         }
368         return srv
369 }
370
371 func (s *runSuite) SetUpTest(c *check.C) {
372         cfg, err := config.NewLoader(nil, ctxlog.TestLogger(c)).Load()
373         c.Assert(err, check.Equals, nil)
374         s.config, err = cfg.GetCluster("")
375         c.Assert(err, check.Equals, nil)
376         s.db, err = sqlx.Open("postgres", s.config.PostgreSQL.Connection.String())
377         c.Assert(err, check.IsNil)
378
379         s.config.Collections.BalancePeriod = arvados.Duration(time.Second)
380         arvadostest.SetServiceURL(&s.config.Services.Keepbalance, "http://localhost:/")
381
382         s.client = &arvados.Client{
383                 AuthToken: "xyzzy",
384                 APIHost:   "zzzzz.arvadosapi.com",
385                 Client:    s.stub.Start()}
386
387         s.stub.serveDiscoveryDoc()
388         s.stub.logf = c.Logf
389 }
390
391 func (s *runSuite) TearDownTest(c *check.C) {
392         s.stub.Close()
393 }
394
395 func (s *runSuite) TestRefuseZeroCollections(c *check.C) {
396         defer arvados.NewClientFromEnv().RequestAndDecode(nil, "POST", "database/reset", nil, nil)
397         _, err := s.db.Exec(`delete from collections`)
398         c.Assert(err, check.IsNil)
399         opts := RunOptions{
400                 Logger: ctxlog.TestLogger(c),
401         }
402         s.stub.serveCurrentUserAdmin()
403         s.stub.serveZeroCollections()
404         s.stub.serveKeepServices(stubServices)
405         s.stub.serveKeepstoreMounts()
406         s.stub.serveKeepstoreIndexFoo4Bar1()
407         trashReqs := s.stub.serveKeepstoreTrash()
408         pullReqs := s.stub.serveKeepstorePull()
409         srv := s.newServer(&opts)
410         _, err = srv.runOnce(context.Background())
411         c.Check(err, check.ErrorMatches, "received zero collections")
412         c.Check(trashReqs.Count(), check.Equals, 4)
413         c.Check(pullReqs.Count(), check.Equals, 0)
414 }
415
416 func (s *runSuite) TestRefuseBadIndex(c *check.C) {
417         opts := RunOptions{
418                 ChunkPrefix: "abc",
419                 Logger:      ctxlog.TestLogger(c),
420         }
421         s.stub.serveCurrentUserAdmin()
422         s.stub.serveFooBarFileCollections()
423         s.stub.serveKeepServices(stubServices)
424         s.stub.serveKeepstoreMounts()
425         s.stub.serveKeepstoreIndexIgnoringPrefix()
426         trashReqs := s.stub.serveKeepstoreTrash()
427         pullReqs := s.stub.serveKeepstorePull()
428         srv := s.newServer(&opts)
429         bal, err := srv.runOnce(context.Background())
430         c.Check(err, check.ErrorMatches, ".*Index response included block .* despite asking for prefix \"abc\"")
431         c.Check(trashReqs.Count(), check.Equals, 4)
432         c.Check(pullReqs.Count(), check.Equals, 0)
433         c.Check(bal.stats.trashes, check.Equals, 0)
434         c.Check(bal.stats.pulls, check.Equals, 0)
435 }
436
437 func (s *runSuite) TestRefuseNonAdmin(c *check.C) {
438         opts := RunOptions{
439                 Logger: ctxlog.TestLogger(c),
440         }
441         s.stub.serveCurrentUserNotAdmin()
442         s.stub.serveZeroCollections()
443         s.stub.serveKeepServices(stubServices)
444         s.stub.serveKeepstoreMounts()
445         trashReqs := s.stub.serveKeepstoreTrash()
446         pullReqs := s.stub.serveKeepstorePull()
447         srv := s.newServer(&opts)
448         _, err := srv.runOnce(context.Background())
449         c.Check(err, check.ErrorMatches, "current user .* is not .* admin user")
450         c.Check(trashReqs.Count(), check.Equals, 0)
451         c.Check(pullReqs.Count(), check.Equals, 0)
452 }
453
454 func (s *runSuite) TestInvalidChunkPrefix(c *check.C) {
455         for _, trial := range []struct {
456                 prefix string
457                 errRe  string
458         }{
459                 {"123ABC", "invalid char \"A\" in chunk prefix.*"},
460                 {"123xyz", "invalid char \"x\" in chunk prefix.*"},
461                 {"123456789012345678901234567890123", "invalid chunk prefix .* longer than a block hash"},
462         } {
463                 s.SetUpTest(c)
464                 c.Logf("trying invalid prefix %q", trial.prefix)
465                 opts := RunOptions{
466                         ChunkPrefix: trial.prefix,
467                         Logger:      ctxlog.TestLogger(c),
468                 }
469                 s.stub.serveCurrentUserAdmin()
470                 s.stub.serveFooBarFileCollections()
471                 s.stub.serveKeepServices(stubServices)
472                 s.stub.serveKeepstoreMounts()
473                 trashReqs := s.stub.serveKeepstoreTrash()
474                 pullReqs := s.stub.serveKeepstorePull()
475                 srv := s.newServer(&opts)
476                 _, err := srv.runOnce(context.Background())
477                 c.Check(err, check.ErrorMatches, trial.errRe)
478                 c.Check(trashReqs.Count(), check.Equals, 0)
479                 c.Check(pullReqs.Count(), check.Equals, 0)
480         }
481 }
482
483 func (s *runSuite) TestRefuseSameDeviceDifferentVolumes(c *check.C) {
484         opts := RunOptions{
485                 Logger: ctxlog.TestLogger(c),
486         }
487         s.stub.serveCurrentUserAdmin()
488         s.stub.serveZeroCollections()
489         s.stub.serveKeepServices(stubServices)
490         s.stub.mux.HandleFunc("/mounts", func(w http.ResponseWriter, r *http.Request) {
491                 hostid := r.Host[:5] // "keep0.zzzzz.arvadosapi.com:25107" => "keep0"
492                 json.NewEncoder(w).Encode([]arvados.KeepMount{{
493                         UUID:           "zzzzz-ivpuk-0000000000" + hostid,
494                         DeviceID:       "keep0-vol0",
495                         StorageClasses: map[string]bool{"default": true},
496                 }})
497         })
498         trashReqs := s.stub.serveKeepstoreTrash()
499         pullReqs := s.stub.serveKeepstorePull()
500         srv := s.newServer(&opts)
501         _, err := srv.runOnce(context.Background())
502         c.Check(err, check.ErrorMatches, "cannot continue with config errors.*")
503         c.Check(trashReqs.Count(), check.Equals, 0)
504         c.Check(pullReqs.Count(), check.Equals, 0)
505 }
506
507 func (s *runSuite) TestWriteLostBlocks(c *check.C) {
508         lostf, err := ioutil.TempFile("", "keep-balance-lost-blocks-test-")
509         c.Assert(err, check.IsNil)
510         s.config.Collections.BlobMissingReport = lostf.Name()
511         defer os.Remove(lostf.Name())
512         opts := RunOptions{
513                 Logger: ctxlog.TestLogger(c),
514         }
515         s.stub.serveCurrentUserAdmin()
516         s.stub.serveFooBarFileCollections()
517         s.stub.serveKeepServices(stubServices)
518         s.stub.serveKeepstoreMounts()
519         s.stub.serveKeepstoreIndexFoo1()
520         s.stub.serveKeepstoreTrash()
521         s.stub.serveKeepstorePull()
522         srv := s.newServer(&opts)
523         c.Assert(err, check.IsNil)
524         _, err = srv.runOnce(context.Background())
525         c.Check(err, check.IsNil)
526         lost, err := ioutil.ReadFile(lostf.Name())
527         c.Assert(err, check.IsNil)
528         c.Check(string(lost), check.Matches, `(?ms).*37b51d194a7513e45b56f6524f2d51f2.* fa7aeb5140e2848d39b416daeef4ffc5\+45.*`)
529 }
530
531 func (s *runSuite) TestDryRun(c *check.C) {
532         s.config.Collections.BalanceTrashLimit = 0
533         s.config.Collections.BalancePullLimit = 0
534         opts := RunOptions{
535                 Logger: ctxlog.TestLogger(c),
536         }
537         s.stub.serveCurrentUserAdmin()
538         collReqs := s.stub.serveFooBarFileCollections()
539         s.stub.serveKeepServices(stubServices)
540         s.stub.serveKeepstoreMounts()
541         s.stub.serveKeepstoreIndexFoo4Bar1()
542         trashReqs := s.stub.serveKeepstoreTrash()
543         pullReqs := s.stub.serveKeepstorePull()
544         srv := s.newServer(&opts)
545         bal, err := srv.runOnce(context.Background())
546         c.Check(err, check.IsNil)
547         for _, req := range collReqs.reqs {
548                 c.Check(req.Form.Get("include_trash"), check.Equals, "true")
549                 c.Check(req.Form.Get("include_old_versions"), check.Equals, "true")
550         }
551         c.Check(trashReqs.Count(), check.Equals, 0)
552         c.Check(pullReqs.Count(), check.Equals, 0)
553         c.Check(bal.stats.pulls, check.Equals, 0)
554         c.Check(bal.stats.pullsDeferred, check.Not(check.Equals), 0)
555         c.Check(bal.stats.trashes, check.Equals, 0)
556         c.Check(bal.stats.trashesDeferred, check.Not(check.Equals), 0)
557         c.Check(bal.stats.underrep.replicas, check.Not(check.Equals), 0)
558         c.Check(bal.stats.overrep.replicas, check.Not(check.Equals), 0)
559
560         metrics := arvadostest.GatherMetricsAsString(srv.Metrics.reg)
561         c.Check(metrics, check.Matches, `(?ms).*\narvados_keep_trash_entries_deferred_count [1-9].*`)
562         c.Check(metrics, check.Matches, `(?ms).*\narvados_keep_pull_entries_deferred_count [1-9].*`)
563 }
564
565 func (s *runSuite) TestCommit(c *check.C) {
566         s.config.Collections.BlobMissingReport = c.MkDir() + "/keep-balance-lost-blocks-test-"
567         s.config.ManagementToken = "xyzzy"
568         opts := RunOptions{
569                 Logger: ctxlog.TestLogger(c),
570                 Dumper: ctxlog.TestLogger(c),
571         }
572         s.stub.serveCurrentUserAdmin()
573         s.stub.serveFooBarFileCollections()
574         s.stub.serveKeepServices(stubServices)
575         s.stub.serveKeepstoreMounts()
576         s.stub.serveKeepstoreIndexFoo4Bar1()
577         trashReqs := s.stub.serveKeepstoreTrash()
578         pullReqs := s.stub.serveKeepstorePull()
579         srv := s.newServer(&opts)
580         bal, err := srv.runOnce(context.Background())
581         c.Check(err, check.IsNil)
582         c.Check(trashReqs.Count(), check.Equals, 8)
583         c.Check(pullReqs.Count(), check.Equals, 4)
584         // "foo" block is overreplicated by 2
585         c.Check(bal.stats.trashes, check.Equals, 2)
586         // "bar" block is underreplicated by 1, and its only copy is
587         // in a poor rendezvous position
588         c.Check(bal.stats.pulls, check.Equals, 2)
589
590         lost, err := ioutil.ReadFile(s.config.Collections.BlobMissingReport)
591         c.Assert(err, check.IsNil)
592         c.Check(string(lost), check.Not(check.Matches), `(?ms).*acbd18db4cc2f85cedef654fccc4a4d8.*`)
593
594         metrics := arvadostest.GatherMetricsAsString(srv.Metrics.reg)
595         c.Check(metrics, check.Matches, `(?ms).*\narvados_keep_total_bytes 15\n.*`)
596         c.Check(metrics, check.Matches, `(?ms).*\narvados_keepbalance_changeset_compute_seconds_sum [0-9\.]+\n.*`)
597         c.Check(metrics, check.Matches, `(?ms).*\narvados_keepbalance_changeset_compute_seconds_count 1\n.*`)
598         c.Check(metrics, check.Matches, `(?ms).*\narvados_keep_dedup_byte_ratio [1-9].*`)
599         c.Check(metrics, check.Matches, `(?ms).*\narvados_keep_dedup_block_ratio [1-9].*`)
600
601         for _, cat := range []string{
602                 "dedup_byte_ratio", "dedup_block_ratio", "collection_bytes",
603                 "referenced_bytes", "referenced_blocks", "reference_count",
604                 "pull_entries_sent_count",
605                 "trash_entries_sent_count",
606         } {
607                 c.Check(metrics, check.Matches, `(?ms).*\narvados_keep_`+cat+` [1-9].*`)
608         }
609
610         for _, cat := range []string{
611                 "pull_entries_deferred_count",
612                 "trash_entries_deferred_count",
613         } {
614                 c.Check(metrics, check.Matches, `(?ms).*\narvados_keep_`+cat+` 0\n.*`)
615         }
616
617         c.Check(metrics, check.Matches, `(?ms).*\narvados_keep_replicated_block_count{replicas="0"} [1-9].*`)
618         c.Check(metrics, check.Matches, `(?ms).*\narvados_keep_replicated_block_count{replicas="1"} [1-9].*`)
619         c.Check(metrics, check.Matches, `(?ms).*\narvados_keep_replicated_block_count{replicas="9"} 0\n.*`)
620
621         for _, sub := range []string{"replicas", "blocks", "bytes"} {
622                 for _, cat := range []string{"needed", "unneeded", "unachievable", "pulling"} {
623                         c.Check(metrics, check.Matches, `(?ms).*\narvados_keep_usage_`+sub+`{status="`+cat+`",storage_class="default"} [1-9].*`)
624                 }
625                 for _, cat := range []string{"total", "garbage", "transient", "overreplicated", "underreplicated", "unachievable", "balanced", "desired", "lost"} {
626                         c.Check(metrics, check.Matches, `(?ms).*\narvados_keep_`+cat+`_`+sub+` [0-9].*`)
627                 }
628         }
629         c.Logf("%s", metrics)
630 }
631
632 func (s *runSuite) TestChunkPrefix(c *check.C) {
633         s.config.Collections.BlobMissingReport = c.MkDir() + "/keep-balance-lost-blocks-test-"
634         opts := RunOptions{
635                 ChunkPrefix: "ac", // catch "foo" but not "bar"
636                 Logger:      ctxlog.TestLogger(c),
637                 Dumper:      ctxlog.TestLogger(c),
638         }
639         s.stub.serveCurrentUserAdmin()
640         s.stub.serveFooBarFileCollections()
641         s.stub.serveKeepServices(stubServices)
642         s.stub.serveKeepstoreMounts()
643         s.stub.serveKeepstoreIndexFoo4Bar1()
644         trashReqs := s.stub.serveKeepstoreTrash()
645         pullReqs := s.stub.serveKeepstorePull()
646         srv := s.newServer(&opts)
647         bal, err := srv.runOnce(context.Background())
648         c.Check(err, check.IsNil)
649         c.Check(trashReqs.Count(), check.Equals, 8)
650         c.Check(pullReqs.Count(), check.Equals, 4)
651         // "foo" block is overreplicated by 2
652         c.Check(bal.stats.trashes, check.Equals, 2)
653         // "bar" block is underreplicated but does not match prefix
654         c.Check(bal.stats.pulls, check.Equals, 0)
655
656         lost, err := ioutil.ReadFile(s.config.Collections.BlobMissingReport)
657         c.Assert(err, check.IsNil)
658         c.Check(string(lost), check.Equals, "")
659 }
660
661 func (s *runSuite) TestRunForever_TriggeredByTimer(c *check.C) {
662         s.config.ManagementToken = "xyzzy"
663         opts := RunOptions{
664                 Logger: ctxlog.TestLogger(c),
665                 Dumper: ctxlog.TestLogger(c),
666         }
667         s.stub.serveCurrentUserAdmin()
668         s.stub.serveFooBarFileCollections()
669         s.stub.serveKeepServices(stubServices)
670         s.stub.serveKeepstoreMounts()
671         s.stub.serveKeepstoreIndexFoo4Bar1()
672         trashReqs := s.stub.serveKeepstoreTrash()
673         pullReqs := s.stub.serveKeepstorePull()
674
675         ctx, cancel := context.WithCancel(context.Background())
676         defer cancel()
677         s.config.Collections.BalancePeriod = arvados.Duration(10 * time.Millisecond)
678         srv := s.newServer(&opts)
679
680         done := make(chan bool)
681         go func() {
682                 srv.runForever(ctx)
683                 close(done)
684         }()
685
686         // Each run should send 4 pull lists + 4 trash lists. The
687         // first run should also send 4 empty trash lists at
688         // startup. We should complete at least four runs in much less
689         // than 10s.
690         for t0 := time.Now(); time.Since(t0) < 10*time.Second; {
691                 pulls := pullReqs.Count()
692                 if pulls >= 16 && trashReqs.Count() == pulls+4 {
693                         break
694                 }
695                 time.Sleep(time.Millisecond)
696         }
697         cancel()
698         <-done
699         c.Check(pullReqs.Count() >= 16, check.Equals, true)
700         c.Check(trashReqs.Count() >= 20, check.Equals, true)
701
702         // We should have completed 4 runs before calling cancel().
703         // But the next run might also have started before we called
704         // cancel(), in which case the extra run will be included in
705         // the changeset_compute_seconds_count metric.
706         completed := pullReqs.Count() / 4
707         metrics := arvadostest.GatherMetricsAsString(srv.Metrics.reg)
708         c.Check(metrics, check.Matches, fmt.Sprintf(`(?ms).*\narvados_keepbalance_changeset_compute_seconds_count (%d|%d)\n.*`, completed, completed+1))
709 }
710
711 func (s *runSuite) TestRunForever_TriggeredBySignal(c *check.C) {
712         s.config.ManagementToken = "xyzzy"
713         opts := RunOptions{
714                 Logger: ctxlog.TestLogger(c),
715                 Dumper: ctxlog.TestLogger(c),
716         }
717         s.stub.serveCurrentUserAdmin()
718         s.stub.serveFooBarFileCollections()
719         s.stub.serveKeepServices(stubServices)
720         s.stub.serveKeepstoreMounts()
721         s.stub.serveKeepstoreIndexFoo4Bar1()
722         trashReqs := s.stub.serveKeepstoreTrash()
723         pullReqs := s.stub.serveKeepstorePull()
724
725         ctx, cancel := context.WithCancel(context.Background())
726         defer cancel()
727         s.config.Collections.BalancePeriod = arvados.Duration(time.Minute)
728         srv := s.newServer(&opts)
729
730         done := make(chan bool)
731         go func() {
732                 srv.runForever(ctx)
733                 close(done)
734         }()
735
736         procself, err := os.FindProcess(os.Getpid())
737         c.Assert(err, check.IsNil)
738
739         // Each run should send 4 pull lists + 4 trash lists. The
740         // first run should also send 4 empty trash lists at
741         // startup. We should be able to complete four runs in much
742         // less than 10s.
743         completedRuns := 0
744         for t0 := time.Now(); time.Since(t0) < 10*time.Second; {
745                 pulls := pullReqs.Count()
746                 if pulls >= 16 && trashReqs.Count() == pulls+4 {
747                         break
748                 }
749                 // Once the 1st run has started automatically, we
750                 // start sending a single SIGUSR1 at the end of each
751                 // run, to ensure we get exactly 4 runs in total.
752                 if pulls > 0 && pulls%4 == 0 && pulls <= 12 && pulls/4 > completedRuns {
753                         completedRuns = pulls / 4
754                         c.Logf("completed run %d, sending SIGUSR1 to trigger next run", completedRuns)
755                         procself.Signal(syscall.SIGUSR1)
756                 }
757                 time.Sleep(time.Millisecond)
758         }
759         cancel()
760         <-done
761         c.Check(pullReqs.Count(), check.Equals, 16)
762         c.Check(trashReqs.Count(), check.Equals, 20)
763
764         metrics := arvadostest.GatherMetricsAsString(srv.Metrics.reg)
765         c.Check(metrics, check.Matches, `(?ms).*\narvados_keepbalance_changeset_compute_seconds_count 4\n.*`)
766 }