Merge branch '10808-file-cache-ownership'
[arvados.git] / services / keep-balance / balance_run_test.go
1 package main
2
3 import (
4         _ "encoding/json"
5         "fmt"
6         "io"
7         "io/ioutil"
8         "log"
9         "net/http"
10         "net/http/httptest"
11         "strings"
12         "sync"
13         "time"
14
15         "git.curoverse.com/arvados.git/sdk/go/arvados"
16
17         check "gopkg.in/check.v1"
18 )
19
20 var _ = check.Suite(&runSuite{})
21
// reqTracker accumulates copies of the HTTP requests passed to Add.
// The embedded mutex guards reqs in Count and Add.
type reqTracker struct {
	reqs []http.Request
	sync.Mutex
}

// Count reports how many requests have been added so far.
func (rt *reqTracker) Count() int {
	rt.Lock()
	total := len(rt.reqs)
	rt.Unlock()
	return total
}

// Add stores a copy of *req and returns the number of requests
// recorded so far, including this one.
func (rt *reqTracker) Add(req *http.Request) int {
	rt.Lock()
	defer rt.Unlock()
	updated := append(rt.reqs, *req)
	rt.reqs = updated
	return len(updated)
}
39
40 // stubServer is an HTTP transport that intercepts and processes all
41 // requests using its own handlers.
42 type stubServer struct {
43         mux      *http.ServeMux
44         srv      *httptest.Server
45         mutex    sync.Mutex
46         Requests reqTracker
47         logf     func(string, ...interface{})
48 }
49
50 // Start initializes the stub server and returns an *http.Client that
51 // uses the stub server to handle all requests.
52 //
53 // A stubServer that has been started should eventually be shut down
54 // with Close().
55 func (s *stubServer) Start() *http.Client {
56         // Set up a config.Client that forwards all requests to s.mux
57         // via s.srv. Test cases will attach handlers to s.mux to get
58         // the desired responses.
59         s.mux = http.NewServeMux()
60         s.srv = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
61                 s.mutex.Lock()
62                 s.Requests.Add(r)
63                 s.mutex.Unlock()
64                 w.Header().Set("Content-Type", "application/json")
65                 s.mux.ServeHTTP(w, r)
66         }))
67         return &http.Client{Transport: s}
68 }
69
70 func (s *stubServer) RoundTrip(req *http.Request) (*http.Response, error) {
71         w := httptest.NewRecorder()
72         s.mux.ServeHTTP(w, req)
73         return &http.Response{
74                 StatusCode: w.Code,
75                 Status:     fmt.Sprintf("%d %s", w.Code, http.StatusText(w.Code)),
76                 Header:     w.HeaderMap,
77                 Body:       ioutil.NopCloser(w.Body)}, nil
78 }
79
80 // Close releases resources used by the server.
81 func (s *stubServer) Close() {
82         s.srv.Close()
83 }
84
85 func (s *stubServer) serveStatic(path, data string) *reqTracker {
86         rt := &reqTracker{}
87         s.mux.HandleFunc(path, func(w http.ResponseWriter, r *http.Request) {
88                 rt.Add(r)
89                 if r.Body != nil {
90                         ioutil.ReadAll(r.Body)
91                         r.Body.Close()
92                 }
93                 io.WriteString(w, data)
94         })
95         return rt
96 }
97
98 func (s *stubServer) serveCurrentUserAdmin() *reqTracker {
99         return s.serveStatic("/arvados/v1/users/current",
100                 `{"uuid":"zzzzz-tpzed-000000000000000","is_admin":true,"is_active":true}`)
101 }
102
103 func (s *stubServer) serveCurrentUserNotAdmin() *reqTracker {
104         return s.serveStatic("/arvados/v1/users/current",
105                 `{"uuid":"zzzzz-tpzed-000000000000000","is_admin":false,"is_active":true}`)
106 }
107
108 func (s *stubServer) serveDiscoveryDoc() *reqTracker {
109         return s.serveStatic("/discovery/v1/apis/arvados/v1/rest",
110                 `{"defaultCollectionReplication":2}`)
111 }
112
113 func (s *stubServer) serveZeroCollections() *reqTracker {
114         return s.serveStatic("/arvados/v1/collections",
115                 `{"items":[],"items_available":0}`)
116 }
117
118 func (s *stubServer) serveFooBarFileCollections() *reqTracker {
119         rt := &reqTracker{}
120         s.mux.HandleFunc("/arvados/v1/collections", func(w http.ResponseWriter, r *http.Request) {
121                 r.ParseForm()
122                 rt.Add(r)
123                 if strings.Contains(r.Form.Get("filters"), `modified_at`) {
124                         io.WriteString(w, `{"items_available":0,"items":[]}`)
125                 } else {
126                         io.WriteString(w, `{"items_available":2,"items":[
127                                 {"uuid":"zzzzz-4zz18-ehbhgtheo8909or","portable_data_hash":"fa7aeb5140e2848d39b416daeef4ffc5+45","manifest_text":". 37b51d194a7513e45b56f6524f2d51f2+3 0:3:bar\n","modified_at":"2014-02-03T17:22:54Z"},
128                                 {"uuid":"zzzzz-4zz18-znfnqtbbv4spc3w","portable_data_hash":"1f4b0bc7583c2a7f9102c395f4ffc5e3+45","manifest_text":". acbd18db4cc2f85cedef654fccc4a4d8+3 0:3:foo\n","modified_at":"2014-02-03T17:22:54Z"}]}`)
129                 }
130         })
131         return rt
132 }
133
134 func (s *stubServer) serveCollectionsButSkipOne() *reqTracker {
135         rt := &reqTracker{}
136         s.mux.HandleFunc("/arvados/v1/collections", func(w http.ResponseWriter, r *http.Request) {
137                 r.ParseForm()
138                 rt.Add(r)
139                 if strings.Contains(r.Form.Get("filters"), `"modified_at","\u003c="`) {
140                         io.WriteString(w, `{"items_available":3,"items":[]}`)
141                 } else if strings.Contains(r.Form.Get("filters"), `"modified_at","\u003e="`) {
142                         io.WriteString(w, `{"items_available":0,"items":[]}`)
143                 } else {
144                         io.WriteString(w, `{"items_available":2,"items":[
145                                 {"uuid":"zzzzz-4zz18-ehbhgtheo8909or","portable_data_hash":"fa7aeb5140e2848d39b416daeef4ffc5+45","manifest_text":". 37b51d194a7513e45b56f6524f2d51f2+3 0:3:bar\n","modified_at":"2014-02-03T17:22:54Z"},
146                                 {"uuid":"zzzzz-4zz18-znfnqtbbv4spc3w","portable_data_hash":"1f4b0bc7583c2a7f9102c395f4ffc5e3+45","manifest_text":". acbd18db4cc2f85cedef654fccc4a4d8+3 0:3:foo\n","modified_at":"2014-02-03T17:22:54Z"}]}`)
147                 }
148         })
149         return rt
150 }
151
152 func (s *stubServer) serveZeroKeepServices() *reqTracker {
153         return s.serveStatic("/arvados/v1/keep_services",
154                 `{"items":[],"items_available":0}`)
155 }
156
157 func (s *stubServer) serveFourDiskKeepServices() *reqTracker {
158         return s.serveStatic("/arvados/v1/keep_services", `{"items_available":5,"items":[
159                 {"uuid":"zzzzz-bi6l4-000000000000000","service_host":"keep0.zzzzz.arvadosapi.com","service_port":25107,"service_ssl_flag":false,"service_type":"disk"},
160                 {"uuid":"zzzzz-bi6l4-000000000000001","service_host":"keep1.zzzzz.arvadosapi.com","service_port":25107,"service_ssl_flag":false,"service_type":"disk"},
161                 {"uuid":"zzzzz-bi6l4-000000000000002","service_host":"keep2.zzzzz.arvadosapi.com","service_port":25107,"service_ssl_flag":false,"service_type":"disk"},
162                 {"uuid":"zzzzz-bi6l4-000000000000003","service_host":"keep3.zzzzz.arvadosapi.com","service_port":25107,"service_ssl_flag":false,"service_type":"disk"},
163                 {"uuid":"zzzzz-bi6l4-h0a0xwut9qa6g3a","service_host":"keep.zzzzz.arvadosapi.com","service_port":25333,"service_ssl_flag":true,"service_type":"proxy"}]}`)
164 }
165
166 func (s *stubServer) serveKeepstoreIndexFoo4Bar1() *reqTracker {
167         rt := &reqTracker{}
168         s.mux.HandleFunc("/index/", func(w http.ResponseWriter, r *http.Request) {
169                 count := rt.Add(r)
170                 if r.Host == "keep0.zzzzz.arvadosapi.com:25107" {
171                         io.WriteString(w, "37b51d194a7513e45b56f6524f2d51f2+3 12345678\n")
172                 }
173                 fmt.Fprintf(w, "acbd18db4cc2f85cedef654fccc4a4d8+3 %d\n\n", 12345678+count)
174         })
175         return rt
176 }
177
178 func (s *stubServer) serveKeepstoreTrash() *reqTracker {
179         return s.serveStatic("/trash", `{}`)
180 }
181
182 func (s *stubServer) serveKeepstorePull() *reqTracker {
183         return s.serveStatic("/pull", `{}`)
184 }
185
186 type runSuite struct {
187         stub   stubServer
188         config Config
189 }
190
191 // make a log.Logger that writes to the current test's c.Log().
192 func (s *runSuite) logger(c *check.C) *log.Logger {
193         r, w := io.Pipe()
194         go func() {
195                 buf := make([]byte, 10000)
196                 for {
197                         n, err := r.Read(buf)
198                         if n > 0 {
199                                 if buf[n-1] == '\n' {
200                                         n--
201                                 }
202                                 c.Log(string(buf[:n]))
203                         }
204                         if err != nil {
205                                 break
206                         }
207                 }
208         }()
209         return log.New(w, "", log.LstdFlags)
210 }
211
212 func (s *runSuite) SetUpTest(c *check.C) {
213         s.config = Config{
214                 Client: arvados.Client{
215                         AuthToken: "xyzzy",
216                         APIHost:   "zzzzz.arvadosapi.com",
217                         Client:    s.stub.Start()},
218                 KeepServiceTypes: []string{"disk"}}
219         s.stub.serveDiscoveryDoc()
220         s.stub.logf = c.Logf
221 }
222
223 func (s *runSuite) TearDownTest(c *check.C) {
224         s.stub.Close()
225 }
226
227 func (s *runSuite) TestRefuseZeroCollections(c *check.C) {
228         opts := RunOptions{
229                 CommitPulls: true,
230                 CommitTrash: true,
231                 Logger:      s.logger(c),
232         }
233         s.stub.serveCurrentUserAdmin()
234         s.stub.serveZeroCollections()
235         s.stub.serveFourDiskKeepServices()
236         s.stub.serveKeepstoreIndexFoo4Bar1()
237         trashReqs := s.stub.serveKeepstoreTrash()
238         pullReqs := s.stub.serveKeepstorePull()
239         _, err := (&Balancer{}).Run(s.config, opts)
240         c.Check(err, check.ErrorMatches, "received zero collections")
241         c.Check(trashReqs.Count(), check.Equals, 4)
242         c.Check(pullReqs.Count(), check.Equals, 0)
243 }
244
245 func (s *runSuite) TestServiceTypes(c *check.C) {
246         opts := RunOptions{
247                 CommitPulls: true,
248                 CommitTrash: true,
249                 Logger:      s.logger(c),
250         }
251         s.config.KeepServiceTypes = []string{"unlisted-type"}
252         s.stub.serveCurrentUserAdmin()
253         s.stub.serveFooBarFileCollections()
254         s.stub.serveFourDiskKeepServices()
255         indexReqs := s.stub.serveKeepstoreIndexFoo4Bar1()
256         trashReqs := s.stub.serveKeepstoreTrash()
257         _, err := (&Balancer{}).Run(s.config, opts)
258         c.Check(err, check.IsNil)
259         c.Check(indexReqs.Count(), check.Equals, 0)
260         c.Check(trashReqs.Count(), check.Equals, 0)
261 }
262
263 func (s *runSuite) TestRefuseNonAdmin(c *check.C) {
264         opts := RunOptions{
265                 CommitPulls: true,
266                 CommitTrash: true,
267                 Logger:      s.logger(c),
268         }
269         s.stub.serveCurrentUserNotAdmin()
270         s.stub.serveZeroCollections()
271         s.stub.serveFourDiskKeepServices()
272         trashReqs := s.stub.serveKeepstoreTrash()
273         pullReqs := s.stub.serveKeepstorePull()
274         _, err := (&Balancer{}).Run(s.config, opts)
275         c.Check(err, check.ErrorMatches, "current user .* is not .* admin user")
276         c.Check(trashReqs.Count(), check.Equals, 0)
277         c.Check(pullReqs.Count(), check.Equals, 0)
278 }
279
280 func (s *runSuite) TestDetectSkippedCollections(c *check.C) {
281         opts := RunOptions{
282                 CommitPulls: true,
283                 CommitTrash: true,
284                 Logger:      s.logger(c),
285         }
286         s.stub.serveCurrentUserAdmin()
287         s.stub.serveCollectionsButSkipOne()
288         s.stub.serveFourDiskKeepServices()
289         s.stub.serveKeepstoreIndexFoo4Bar1()
290         trashReqs := s.stub.serveKeepstoreTrash()
291         pullReqs := s.stub.serveKeepstorePull()
292         _, err := (&Balancer{}).Run(s.config, opts)
293         c.Check(err, check.ErrorMatches, `Retrieved 2 collections with modtime <= .* but server now reports there are 3 collections.*`)
294         c.Check(trashReqs.Count(), check.Equals, 4)
295         c.Check(pullReqs.Count(), check.Equals, 0)
296 }
297
298 func (s *runSuite) TestDryRun(c *check.C) {
299         opts := RunOptions{
300                 CommitPulls: false,
301                 CommitTrash: false,
302                 Logger:      s.logger(c),
303         }
304         s.stub.serveCurrentUserAdmin()
305         collReqs := s.stub.serveFooBarFileCollections()
306         s.stub.serveFourDiskKeepServices()
307         s.stub.serveKeepstoreIndexFoo4Bar1()
308         trashReqs := s.stub.serveKeepstoreTrash()
309         pullReqs := s.stub.serveKeepstorePull()
310         var bal Balancer
311         _, err := bal.Run(s.config, opts)
312         c.Check(err, check.IsNil)
313         for _, req := range collReqs.reqs {
314                 c.Check(req.Form.Get("include_trash"), check.Equals, "true")
315         }
316         c.Check(trashReqs.Count(), check.Equals, 0)
317         c.Check(pullReqs.Count(), check.Equals, 0)
318         stats := bal.getStatistics()
319         c.Check(stats.pulls, check.Not(check.Equals), 0)
320         c.Check(stats.underrep.replicas, check.Not(check.Equals), 0)
321         c.Check(stats.overrep.replicas, check.Not(check.Equals), 0)
322 }
323
324 func (s *runSuite) TestCommit(c *check.C) {
325         opts := RunOptions{
326                 CommitPulls: true,
327                 CommitTrash: true,
328                 Logger:      s.logger(c),
329                 Dumper:      s.logger(c),
330         }
331         s.stub.serveCurrentUserAdmin()
332         s.stub.serveFooBarFileCollections()
333         s.stub.serveFourDiskKeepServices()
334         s.stub.serveKeepstoreIndexFoo4Bar1()
335         trashReqs := s.stub.serveKeepstoreTrash()
336         pullReqs := s.stub.serveKeepstorePull()
337         var bal Balancer
338         _, err := bal.Run(s.config, opts)
339         c.Check(err, check.IsNil)
340         c.Check(trashReqs.Count(), check.Equals, 8)
341         c.Check(pullReqs.Count(), check.Equals, 4)
342         stats := bal.getStatistics()
343         // "foo" block is overreplicated by 2
344         c.Check(stats.trashes, check.Equals, 2)
345         // "bar" block is underreplicated by 1, and its only copy is
346         // in a poor rendezvous position
347         c.Check(stats.pulls, check.Equals, 2)
348 }
349
350 func (s *runSuite) TestRunForever(c *check.C) {
351         opts := RunOptions{
352                 CommitPulls: true,
353                 CommitTrash: true,
354                 Logger:      s.logger(c),
355                 Dumper:      s.logger(c),
356         }
357         s.stub.serveCurrentUserAdmin()
358         s.stub.serveFooBarFileCollections()
359         s.stub.serveFourDiskKeepServices()
360         s.stub.serveKeepstoreIndexFoo4Bar1()
361         trashReqs := s.stub.serveKeepstoreTrash()
362         pullReqs := s.stub.serveKeepstorePull()
363
364         stop := make(chan interface{})
365         s.config.RunPeriod = arvados.Duration(time.Millisecond)
366         go RunForever(s.config, opts, stop)
367
368         // Each run should send 4 pull lists + 4 trash lists. The
369         // first run should also send 4 empty trash lists at
370         // startup. We should complete all four runs in much less than
371         // a second.
372         for t0 := time.Now(); pullReqs.Count() < 16 && time.Since(t0) < 10*time.Second; {
373                 time.Sleep(time.Millisecond)
374         }
375         stop <- true
376         c.Check(pullReqs.Count() >= 16, check.Equals, true)
377         c.Check(trashReqs.Count(), check.Equals, pullReqs.Count()+4)
378 }