Merge branch 'master' into 14260-runtime-token
[arvados.git] / services / keep-balance / balance_run_test.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package main
6
7 import (
8         "encoding/json"
9         "fmt"
10         "io"
11         "io/ioutil"
12         "net/http"
13         "net/http/httptest"
14         "strings"
15         "sync"
16         "time"
17
18         "git.curoverse.com/arvados.git/sdk/go/arvados"
19         "github.com/Sirupsen/logrus"
20
21         check "gopkg.in/check.v1"
22 )
23
// Register runSuite with the gocheck framework so that its Test*
// methods are run as part of this package's test suite.
var _ = check.Suite(&runSuite{})
25
// reqTracker accumulates copies of the HTTP requests seen by a stub
// handler. The embedded mutex guards reqs, so Count and Add may be
// called from concurrent handlers.
type reqTracker struct {
	reqs []http.Request
	sync.Mutex
}

// Count reports how many requests have been recorded so far.
func (t *reqTracker) Count() int {
	t.Lock()
	n := len(t.reqs)
	t.Unlock()
	return n
}

// Add records a copy of *req and returns the total number of requests
// recorded, including this one.
func (t *reqTracker) Add(req *http.Request) int {
	t.Lock()
	t.reqs = append(t.reqs, *req)
	total := len(t.reqs)
	t.Unlock()
	return total
}
43
// stubServices is the keep service list used by the tests in this
// file: four non-SSL "disk" services (keep0 through keep3, all on
// port 25107) followed by one SSL "proxy" service.
var stubServices = []arvados.KeepService{
	{
		UUID:           "zzzzz-bi6l4-000000000000000",
		ServiceHost:    "keep0.zzzzz.arvadosapi.com",
		ServicePort:    25107,
		ServiceSSLFlag: false,
		ServiceType:    "disk",
	},
	{
		UUID:           "zzzzz-bi6l4-000000000000001",
		ServiceHost:    "keep1.zzzzz.arvadosapi.com",
		ServicePort:    25107,
		ServiceSSLFlag: false,
		ServiceType:    "disk",
	},
	{
		UUID:           "zzzzz-bi6l4-000000000000002",
		ServiceHost:    "keep2.zzzzz.arvadosapi.com",
		ServicePort:    25107,
		ServiceSSLFlag: false,
		ServiceType:    "disk",
	},
	{
		UUID:           "zzzzz-bi6l4-000000000000003",
		ServiceHost:    "keep3.zzzzz.arvadosapi.com",
		ServicePort:    25107,
		ServiceSSLFlag: false,
		ServiceType:    "disk",
	},
	{
		// The only entry that is not of type "disk".
		UUID:           "zzzzz-bi6l4-h0a0xwut9qa6g3a",
		ServiceHost:    "keep.zzzzz.arvadosapi.com",
		ServicePort:    25333,
		ServiceSSLFlag: true,
		ServiceType:    "proxy",
	},
}
81
// stubMounts maps the "host:port" of each stub disk service to the
// mounts reported for it. Every disk service has exactly one mount;
// the proxy service has no entry.
var stubMounts = map[string][]arvados.KeepMount{
	"keep0.zzzzz.arvadosapi.com:25107": {{
		UUID:     "zzzzz-ivpuk-000000000000000",
		DeviceID: "keep0-vol0",
	}},
	"keep1.zzzzz.arvadosapi.com:25107": {{
		UUID:     "zzzzz-ivpuk-100000000000000",
		DeviceID: "keep1-vol0",
	}},
	"keep2.zzzzz.arvadosapi.com:25107": {{
		UUID:     "zzzzz-ivpuk-200000000000000",
		DeviceID: "keep2-vol0",
	}},
	"keep3.zzzzz.arvadosapi.com:25107": {{
		UUID:     "zzzzz-ivpuk-300000000000000",
		DeviceID: "keep3-vol0",
	}},
}
100
// stubServer is an HTTP transport that intercepts and processes all
// requests using its own handlers.
type stubServer struct {
	// mux holds the handlers that test cases register via the
	// serve* methods.
	mux *http.ServeMux
	// srv is the httptest server created by Start and shut down
	// by Close.
	srv *httptest.Server
	// mutex is held by the srv handler while it records a request
	// in Requests.
	mutex sync.Mutex
	// Requests records the requests handled by srv.
	Requests reqTracker
	// logf is a printf-style logging function supplied by the
	// test (SetUpTest assigns c.Logf).
	logf func(string, ...interface{})
}
110
111 // Start initializes the stub server and returns an *http.Client that
112 // uses the stub server to handle all requests.
113 //
114 // A stubServer that has been started should eventually be shut down
115 // with Close().
116 func (s *stubServer) Start() *http.Client {
117         // Set up a config.Client that forwards all requests to s.mux
118         // via s.srv. Test cases will attach handlers to s.mux to get
119         // the desired responses.
120         s.mux = http.NewServeMux()
121         s.srv = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
122                 s.mutex.Lock()
123                 s.Requests.Add(r)
124                 s.mutex.Unlock()
125                 w.Header().Set("Content-Type", "application/json")
126                 s.mux.ServeHTTP(w, r)
127         }))
128         return &http.Client{Transport: s}
129 }
130
131 func (s *stubServer) RoundTrip(req *http.Request) (*http.Response, error) {
132         w := httptest.NewRecorder()
133         s.mux.ServeHTTP(w, req)
134         return &http.Response{
135                 StatusCode: w.Code,
136                 Status:     fmt.Sprintf("%d %s", w.Code, http.StatusText(w.Code)),
137                 Header:     w.HeaderMap,
138                 Body:       ioutil.NopCloser(w.Body)}, nil
139 }
140
// Close releases resources used by the server by shutting down the
// httptest server created in Start. It must not be called before
// Start, because s.srv is nil until then.
func (s *stubServer) Close() {
	s.srv.Close()
}
145
146 func (s *stubServer) serveStatic(path, data string) *reqTracker {
147         rt := &reqTracker{}
148         s.mux.HandleFunc(path, func(w http.ResponseWriter, r *http.Request) {
149                 rt.Add(r)
150                 if r.Body != nil {
151                         ioutil.ReadAll(r.Body)
152                         r.Body.Close()
153                 }
154                 io.WriteString(w, data)
155         })
156         return rt
157 }
158
159 func (s *stubServer) serveCurrentUserAdmin() *reqTracker {
160         return s.serveStatic("/arvados/v1/users/current",
161                 `{"uuid":"zzzzz-tpzed-000000000000000","is_admin":true,"is_active":true}`)
162 }
163
164 func (s *stubServer) serveCurrentUserNotAdmin() *reqTracker {
165         return s.serveStatic("/arvados/v1/users/current",
166                 `{"uuid":"zzzzz-tpzed-000000000000000","is_admin":false,"is_active":true}`)
167 }
168
169 func (s *stubServer) serveDiscoveryDoc() *reqTracker {
170         return s.serveStatic("/discovery/v1/apis/arvados/v1/rest",
171                 `{"defaultCollectionReplication":2}`)
172 }
173
174 func (s *stubServer) serveZeroCollections() *reqTracker {
175         return s.serveStatic("/arvados/v1/collections",
176                 `{"items":[],"items_available":0}`)
177 }
178
179 func (s *stubServer) serveFooBarFileCollections() *reqTracker {
180         rt := &reqTracker{}
181         s.mux.HandleFunc("/arvados/v1/collections", func(w http.ResponseWriter, r *http.Request) {
182                 r.ParseForm()
183                 rt.Add(r)
184                 if strings.Contains(r.Form.Get("filters"), `modified_at`) {
185                         io.WriteString(w, `{"items_available":0,"items":[]}`)
186                 } else {
187                         io.WriteString(w, `{"items_available":3,"items":[
188                                 {"uuid":"zzzzz-4zz18-aaaaaaaaaaaaaaa","portable_data_hash":"fa7aeb5140e2848d39b416daeef4ffc5+45","manifest_text":". 37b51d194a7513e45b56f6524f2d51f2+3 0:3:bar\n","modified_at":"2014-02-03T17:22:54Z"},
189                                 {"uuid":"zzzzz-4zz18-ehbhgtheo8909or","portable_data_hash":"fa7aeb5140e2848d39b416daeef4ffc5+45","manifest_text":". 37b51d194a7513e45b56f6524f2d51f2+3 0:3:bar\n","modified_at":"2014-02-03T17:22:54Z"},
190                                 {"uuid":"zzzzz-4zz18-znfnqtbbv4spc3w","portable_data_hash":"1f4b0bc7583c2a7f9102c395f4ffc5e3+45","manifest_text":". acbd18db4cc2f85cedef654fccc4a4d8+3 0:3:foo\n","modified_at":"2014-02-03T17:22:54Z"}]}`)
191                 }
192         })
193         return rt
194 }
195
196 func (s *stubServer) serveCollectionsButSkipOne() *reqTracker {
197         rt := &reqTracker{}
198         s.mux.HandleFunc("/arvados/v1/collections", func(w http.ResponseWriter, r *http.Request) {
199                 r.ParseForm()
200                 rt.Add(r)
201                 if strings.Contains(r.Form.Get("filters"), `"modified_at","\u003c="`) {
202                         io.WriteString(w, `{"items_available":3,"items":[]}`)
203                 } else if strings.Contains(r.Form.Get("filters"), `"modified_at","\u003e`) {
204                         io.WriteString(w, `{"items_available":0,"items":[]}`)
205                 } else if strings.Contains(r.Form.Get("filters"), `"modified_at","="`) && strings.Contains(r.Form.Get("filters"), `"uuid","\u003e"`) {
206                         io.WriteString(w, `{"items_available":0,"items":[]}`)
207                 } else {
208                         io.WriteString(w, `{"items_available":2,"items":[
209                                 {"uuid":"zzzzz-4zz18-ehbhgtheo8909or","portable_data_hash":"fa7aeb5140e2848d39b416daeef4ffc5+45","manifest_text":". 37b51d194a7513e45b56f6524f2d51f2+3 0:3:bar\n","modified_at":"2014-02-03T17:22:54Z"},
210                                 {"uuid":"zzzzz-4zz18-znfnqtbbv4spc3w","portable_data_hash":"1f4b0bc7583c2a7f9102c395f4ffc5e3+45","manifest_text":". acbd18db4cc2f85cedef654fccc4a4d8+3 0:3:foo\n","modified_at":"2014-02-03T17:22:54Z"}]}`)
211                 }
212         })
213         return rt
214 }
215
216 func (s *stubServer) serveZeroKeepServices() *reqTracker {
217         return s.serveJSON("/arvados/v1/keep_services", arvados.KeepServiceList{})
218 }
219
220 func (s *stubServer) serveKeepServices(svcs []arvados.KeepService) *reqTracker {
221         return s.serveJSON("/arvados/v1/keep_services", arvados.KeepServiceList{
222                 ItemsAvailable: len(svcs),
223                 Items:          svcs,
224         })
225 }
226
227 func (s *stubServer) serveJSON(path string, resp interface{}) *reqTracker {
228         rt := &reqTracker{}
229         s.mux.HandleFunc(path, func(w http.ResponseWriter, r *http.Request) {
230                 rt.Add(r)
231                 json.NewEncoder(w).Encode(resp)
232         })
233         return rt
234 }
235
236 func (s *stubServer) serveKeepstoreMounts() *reqTracker {
237         rt := &reqTracker{}
238         s.mux.HandleFunc("/mounts", func(w http.ResponseWriter, r *http.Request) {
239                 rt.Add(r)
240                 json.NewEncoder(w).Encode(stubMounts[r.Host])
241         })
242         return rt
243 }
244
245 func (s *stubServer) serveKeepstoreIndexFoo4Bar1() *reqTracker {
246         rt := &reqTracker{}
247         s.mux.HandleFunc("/index/", func(w http.ResponseWriter, r *http.Request) {
248                 count := rt.Add(r)
249                 if r.Host == "keep0.zzzzz.arvadosapi.com:25107" {
250                         io.WriteString(w, "37b51d194a7513e45b56f6524f2d51f2+3 12345678\n")
251                 }
252                 fmt.Fprintf(w, "acbd18db4cc2f85cedef654fccc4a4d8+3 %d\n\n", 12345678+count)
253         })
254         for _, mounts := range stubMounts {
255                 for i, mnt := range mounts {
256                         i := i
257                         s.mux.HandleFunc(fmt.Sprintf("/mounts/%s/blocks", mnt.UUID), func(w http.ResponseWriter, r *http.Request) {
258                                 count := rt.Add(r)
259                                 if i == 0 && r.Host == "keep0.zzzzz.arvadosapi.com:25107" {
260                                         io.WriteString(w, "37b51d194a7513e45b56f6524f2d51f2+3 12345678\n")
261                                 }
262                                 if i == 0 {
263                                         fmt.Fprintf(w, "acbd18db4cc2f85cedef654fccc4a4d8+3 %d\n", 12345678+count)
264                                 }
265                                 fmt.Fprintf(w, "\n")
266                         })
267                 }
268         }
269         return rt
270 }
271
272 func (s *stubServer) serveKeepstoreTrash() *reqTracker {
273         return s.serveStatic("/trash", `{}`)
274 }
275
276 func (s *stubServer) serveKeepstorePull() *reqTracker {
277         return s.serveStatic("/pull", `{}`)
278 }
279
// runSuite is the gocheck suite for the tests in this file.
type runSuite struct {
	// stub is restarted by SetUpTest and closed by TearDownTest.
	stub stubServer
	// config is rebuilt by SetUpTest; individual tests may adjust
	// it before passing it to NewServer.
	config Config
}
284
285 // make a log.Logger that writes to the current test's c.Log().
286 func (s *runSuite) logger(c *check.C) *logrus.Logger {
287         r, w := io.Pipe()
288         go func() {
289                 buf := make([]byte, 10000)
290                 for {
291                         n, err := r.Read(buf)
292                         if n > 0 {
293                                 if buf[n-1] == '\n' {
294                                         n--
295                                 }
296                                 c.Log(string(buf[:n]))
297                         }
298                         if err != nil {
299                                 break
300                         }
301                 }
302         }()
303         logger := logrus.New()
304         logger.Out = w
305         return logger
306 }
307
308 func (s *runSuite) SetUpTest(c *check.C) {
309         s.config = Config{
310                 Client: arvados.Client{
311                         AuthToken: "xyzzy",
312                         APIHost:   "zzzzz.arvadosapi.com",
313                         Client:    s.stub.Start()},
314                 KeepServiceTypes: []string{"disk"},
315                 RunPeriod:        arvados.Duration(time.Second),
316         }
317         s.stub.serveDiscoveryDoc()
318         s.stub.logf = c.Logf
319 }
320
// TearDownTest runs after each test and shuts down the stub server
// that SetUpTest started.
func (s *runSuite) TearDownTest(c *check.C) {
	s.stub.Close()
}
324
325 func (s *runSuite) TestRefuseZeroCollections(c *check.C) {
326         opts := RunOptions{
327                 CommitPulls: true,
328                 CommitTrash: true,
329                 Logger:      s.logger(c),
330         }
331         s.stub.serveCurrentUserAdmin()
332         s.stub.serveZeroCollections()
333         s.stub.serveKeepServices(stubServices)
334         s.stub.serveKeepstoreMounts()
335         s.stub.serveKeepstoreIndexFoo4Bar1()
336         trashReqs := s.stub.serveKeepstoreTrash()
337         pullReqs := s.stub.serveKeepstorePull()
338         srv, err := NewServer(s.config, opts)
339         c.Assert(err, check.IsNil)
340         _, err = srv.Run()
341         c.Check(err, check.ErrorMatches, "received zero collections")
342         c.Check(trashReqs.Count(), check.Equals, 4)
343         c.Check(pullReqs.Count(), check.Equals, 0)
344 }
345
346 func (s *runSuite) TestServiceTypes(c *check.C) {
347         opts := RunOptions{
348                 CommitPulls: true,
349                 CommitTrash: true,
350                 Logger:      s.logger(c),
351         }
352         s.config.KeepServiceTypes = []string{"unlisted-type"}
353         s.stub.serveCurrentUserAdmin()
354         s.stub.serveFooBarFileCollections()
355         s.stub.serveKeepServices(stubServices)
356         s.stub.serveKeepstoreMounts()
357         indexReqs := s.stub.serveKeepstoreIndexFoo4Bar1()
358         trashReqs := s.stub.serveKeepstoreTrash()
359         srv, err := NewServer(s.config, opts)
360         c.Assert(err, check.IsNil)
361         _, err = srv.Run()
362         c.Check(err, check.IsNil)
363         c.Check(indexReqs.Count(), check.Equals, 0)
364         c.Check(trashReqs.Count(), check.Equals, 0)
365 }
366
367 func (s *runSuite) TestRefuseNonAdmin(c *check.C) {
368         opts := RunOptions{
369                 CommitPulls: true,
370                 CommitTrash: true,
371                 Logger:      s.logger(c),
372         }
373         s.stub.serveCurrentUserNotAdmin()
374         s.stub.serveZeroCollections()
375         s.stub.serveKeepServices(stubServices)
376         s.stub.serveKeepstoreMounts()
377         trashReqs := s.stub.serveKeepstoreTrash()
378         pullReqs := s.stub.serveKeepstorePull()
379         srv, err := NewServer(s.config, opts)
380         c.Assert(err, check.IsNil)
381         _, err = srv.Run()
382         c.Check(err, check.ErrorMatches, "current user .* is not .* admin user")
383         c.Check(trashReqs.Count(), check.Equals, 0)
384         c.Check(pullReqs.Count(), check.Equals, 0)
385 }
386
387 func (s *runSuite) TestDetectSkippedCollections(c *check.C) {
388         opts := RunOptions{
389                 CommitPulls: true,
390                 CommitTrash: true,
391                 Logger:      s.logger(c),
392         }
393         s.stub.serveCurrentUserAdmin()
394         s.stub.serveCollectionsButSkipOne()
395         s.stub.serveKeepServices(stubServices)
396         s.stub.serveKeepstoreMounts()
397         s.stub.serveKeepstoreIndexFoo4Bar1()
398         trashReqs := s.stub.serveKeepstoreTrash()
399         pullReqs := s.stub.serveKeepstorePull()
400         srv, err := NewServer(s.config, opts)
401         c.Assert(err, check.IsNil)
402         _, err = srv.Run()
403         c.Check(err, check.ErrorMatches, `Retrieved 2 collections with modtime <= .* but server now reports there are 3 collections.*`)
404         c.Check(trashReqs.Count(), check.Equals, 4)
405         c.Check(pullReqs.Count(), check.Equals, 0)
406 }
407
408 func (s *runSuite) TestDryRun(c *check.C) {
409         opts := RunOptions{
410                 CommitPulls: false,
411                 CommitTrash: false,
412                 Logger:      s.logger(c),
413         }
414         s.stub.serveCurrentUserAdmin()
415         collReqs := s.stub.serveFooBarFileCollections()
416         s.stub.serveKeepServices(stubServices)
417         s.stub.serveKeepstoreMounts()
418         s.stub.serveKeepstoreIndexFoo4Bar1()
419         trashReqs := s.stub.serveKeepstoreTrash()
420         pullReqs := s.stub.serveKeepstorePull()
421         srv, err := NewServer(s.config, opts)
422         c.Assert(err, check.IsNil)
423         bal, err := srv.Run()
424         c.Check(err, check.IsNil)
425         for _, req := range collReqs.reqs {
426                 c.Check(req.Form.Get("include_trash"), check.Equals, "true")
427         }
428         c.Check(trashReqs.Count(), check.Equals, 0)
429         c.Check(pullReqs.Count(), check.Equals, 0)
430         c.Check(bal.stats.pulls, check.Not(check.Equals), 0)
431         c.Check(bal.stats.underrep.replicas, check.Not(check.Equals), 0)
432         c.Check(bal.stats.overrep.replicas, check.Not(check.Equals), 0)
433 }
434
435 func (s *runSuite) TestCommit(c *check.C) {
436         s.config.Listen = ":"
437         s.config.ManagementToken = "xyzzy"
438         opts := RunOptions{
439                 CommitPulls: true,
440                 CommitTrash: true,
441                 Logger:      s.logger(c),
442                 Dumper:      s.logger(c),
443         }
444         s.stub.serveCurrentUserAdmin()
445         s.stub.serveFooBarFileCollections()
446         s.stub.serveKeepServices(stubServices)
447         s.stub.serveKeepstoreMounts()
448         s.stub.serveKeepstoreIndexFoo4Bar1()
449         trashReqs := s.stub.serveKeepstoreTrash()
450         pullReqs := s.stub.serveKeepstorePull()
451         srv, err := NewServer(s.config, opts)
452         c.Assert(err, check.IsNil)
453         bal, err := srv.Run()
454         c.Check(err, check.IsNil)
455         c.Check(trashReqs.Count(), check.Equals, 8)
456         c.Check(pullReqs.Count(), check.Equals, 4)
457         // "foo" block is overreplicated by 2
458         c.Check(bal.stats.trashes, check.Equals, 2)
459         // "bar" block is underreplicated by 1, and its only copy is
460         // in a poor rendezvous position
461         c.Check(bal.stats.pulls, check.Equals, 2)
462
463         metrics := s.getMetrics(c, srv)
464         c.Check(metrics, check.Matches, `(?ms).*\narvados_keep_total_bytes 15\n.*`)
465         c.Check(metrics, check.Matches, `(?ms).*\narvados_keepbalance_changeset_compute_seconds_sum [0-9\.]+\n.*`)
466         c.Check(metrics, check.Matches, `(?ms).*\narvados_keepbalance_changeset_compute_seconds_count 1\n.*`)
467         c.Check(metrics, check.Matches, `(?ms).*\narvados_keep_dedup_byte_ratio 1\.5\n.*`)
468         c.Check(metrics, check.Matches, `(?ms).*\narvados_keep_dedup_block_ratio 1\.5\n.*`)
469 }
470
471 func (s *runSuite) TestRunForever(c *check.C) {
472         s.config.Listen = ":"
473         s.config.ManagementToken = "xyzzy"
474         opts := RunOptions{
475                 CommitPulls: true,
476                 CommitTrash: true,
477                 Logger:      s.logger(c),
478                 Dumper:      s.logger(c),
479         }
480         s.stub.serveCurrentUserAdmin()
481         s.stub.serveFooBarFileCollections()
482         s.stub.serveKeepServices(stubServices)
483         s.stub.serveKeepstoreMounts()
484         s.stub.serveKeepstoreIndexFoo4Bar1()
485         trashReqs := s.stub.serveKeepstoreTrash()
486         pullReqs := s.stub.serveKeepstorePull()
487
488         stop := make(chan interface{})
489         s.config.RunPeriod = arvados.Duration(time.Millisecond)
490         srv, err := NewServer(s.config, opts)
491         c.Assert(err, check.IsNil)
492
493         done := make(chan bool)
494         go func() {
495                 srv.RunForever(stop)
496                 close(done)
497         }()
498
499         // Each run should send 4 pull lists + 4 trash lists. The
500         // first run should also send 4 empty trash lists at
501         // startup. We should complete all four runs in much less than
502         // a second.
503         for t0 := time.Now(); pullReqs.Count() < 16 && time.Since(t0) < 10*time.Second; {
504                 time.Sleep(time.Millisecond)
505         }
506         stop <- true
507         <-done
508         c.Check(pullReqs.Count() >= 16, check.Equals, true)
509         c.Check(trashReqs.Count(), check.Equals, pullReqs.Count()+4)
510         c.Check(s.getMetrics(c, srv), check.Matches, `(?ms).*\narvados_keepbalance_changeset_compute_seconds_count `+fmt.Sprintf("%d", pullReqs.Count()/4)+`\n.*`)
511 }
512
513 func (s *runSuite) getMetrics(c *check.C, srv *Server) string {
514         resp, err := http.Get("http://" + srv.listening + "/metrics")
515         c.Assert(err, check.IsNil)
516         c.Check(resp.StatusCode, check.Equals, http.StatusUnauthorized)
517
518         resp, err = http.Get("http://" + srv.listening + "/metrics?api_token=xyzzy")
519         c.Assert(err, check.IsNil)
520         c.Check(resp.StatusCode, check.Equals, http.StatusOK)
521         buf, err := ioutil.ReadAll(resp.Body)
522         c.Check(err, check.IsNil)
523         return string(buf)
524 }