Merge branch 'master' into 9397-prepopulate-output-directory
[arvados.git] / services / keep-balance / balance_run_test.go
1 package main
2
3 import (
4         _ "encoding/json"
5         "fmt"
6         "io"
7         "io/ioutil"
8         "log"
9         "net/http"
10         "net/http/httptest"
11         "strings"
12         "sync"
13         "time"
14
15         "git.curoverse.com/arvados.git/sdk/go/arvados"
16
17         check "gopkg.in/check.v1"
18 )
19
20 var _ = check.Suite(&runSuite{})
21
22 type reqTracker struct {
23         reqs []http.Request
24         sync.Mutex
25 }
26
27 func (rt *reqTracker) Count() int {
28         rt.Lock()
29         defer rt.Unlock()
30         return len(rt.reqs)
31 }
32
33 func (rt *reqTracker) Add(req *http.Request) int {
34         rt.Lock()
35         defer rt.Unlock()
36         rt.reqs = append(rt.reqs, *req)
37         return len(rt.reqs)
38 }
39
40 // stubServer is an HTTP transport that intercepts and processes all
41 // requests using its own handlers.
42 type stubServer struct {
43         mux      *http.ServeMux
44         srv      *httptest.Server
45         mutex    sync.Mutex
46         Requests reqTracker
47         logf     func(string, ...interface{})
48 }
49
50 // Start initializes the stub server and returns an *http.Client that
51 // uses the stub server to handle all requests.
52 //
53 // A stubServer that has been started should eventually be shut down
54 // with Close().
55 func (s *stubServer) Start() *http.Client {
56         // Set up a config.Client that forwards all requests to s.mux
57         // via s.srv. Test cases will attach handlers to s.mux to get
58         // the desired responses.
59         s.mux = http.NewServeMux()
60         s.srv = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
61                 s.mutex.Lock()
62                 s.Requests.Add(r)
63                 s.mutex.Unlock()
64                 w.Header().Set("Content-Type", "application/json")
65                 s.mux.ServeHTTP(w, r)
66         }))
67         return &http.Client{Transport: s}
68 }
69
70 func (s *stubServer) RoundTrip(req *http.Request) (*http.Response, error) {
71         w := httptest.NewRecorder()
72         s.mux.ServeHTTP(w, req)
73         return &http.Response{
74                 StatusCode: w.Code,
75                 Status:     fmt.Sprintf("%d %s", w.Code, http.StatusText(w.Code)),
76                 Header:     w.HeaderMap,
77                 Body:       ioutil.NopCloser(w.Body)}, nil
78 }
79
80 // Close releases resources used by the server.
81 func (s *stubServer) Close() {
82         s.srv.Close()
83 }
84
85 func (s *stubServer) serveStatic(path, data string) *reqTracker {
86         rt := &reqTracker{}
87         s.mux.HandleFunc(path, func(w http.ResponseWriter, r *http.Request) {
88                 rt.Add(r)
89                 if r.Body != nil {
90                         ioutil.ReadAll(r.Body)
91                         r.Body.Close()
92                 }
93                 io.WriteString(w, data)
94         })
95         return rt
96 }
97
98 func (s *stubServer) serveCurrentUserAdmin() *reqTracker {
99         return s.serveStatic("/arvados/v1/users/current",
100                 `{"uuid":"zzzzz-tpzed-000000000000000","is_admin":true,"is_active":true}`)
101 }
102
103 func (s *stubServer) serveCurrentUserNotAdmin() *reqTracker {
104         return s.serveStatic("/arvados/v1/users/current",
105                 `{"uuid":"zzzzz-tpzed-000000000000000","is_admin":false,"is_active":true}`)
106 }
107
108 func (s *stubServer) serveDiscoveryDoc() *reqTracker {
109         return s.serveStatic("/discovery/v1/apis/arvados/v1/rest",
110                 `{"defaultCollectionReplication":2}`)
111 }
112
113 func (s *stubServer) serveZeroCollections() *reqTracker {
114         return s.serveStatic("/arvados/v1/collections",
115                 `{"items":[],"items_available":0}`)
116 }
117
118 func (s *stubServer) serveFooBarFileCollections() *reqTracker {
119         rt := &reqTracker{}
120         s.mux.HandleFunc("/arvados/v1/collections", func(w http.ResponseWriter, r *http.Request) {
121                 r.ParseForm()
122                 rt.Add(r)
123                 if strings.Contains(r.Form.Get("filters"), `modified_at`) {
124                         io.WriteString(w, `{"items_available":0,"items":[]}`)
125                 } else {
126                         io.WriteString(w, `{"items_available":2,"items":[
127                                 {"uuid":"zzzzz-4zz18-ehbhgtheo8909or","portable_data_hash":"fa7aeb5140e2848d39b416daeef4ffc5+45","manifest_text":". 37b51d194a7513e45b56f6524f2d51f2+3 0:3:bar\n","modified_at":"2014-02-03T17:22:54Z"},
128                                 {"uuid":"zzzzz-4zz18-znfnqtbbv4spc3w","portable_data_hash":"1f4b0bc7583c2a7f9102c395f4ffc5e3+45","manifest_text":". acbd18db4cc2f85cedef654fccc4a4d8+3 0:3:foo\n","modified_at":"2014-02-03T17:22:54Z"}]}`)
129                 }
130         })
131         return rt
132 }
133
134 func (s *stubServer) serveCollectionsButSkipOne() *reqTracker {
135         rt := &reqTracker{}
136         s.mux.HandleFunc("/arvados/v1/collections", func(w http.ResponseWriter, r *http.Request) {
137                 r.ParseForm()
138                 rt.Add(r)
139                 if strings.Contains(r.Form.Get("filters"), `"modified_at","\u003c="`) {
140                         io.WriteString(w, `{"items_available":3,"items":[]}`)
141                 } else if strings.Contains(r.Form.Get("filters"), `"modified_at","\u003e`) {
142                         io.WriteString(w, `{"items_available":0,"items":[]}`)
143                 } else if strings.Contains(r.Form.Get("filters"), `"modified_at","="`) && strings.Contains(r.Form.Get("filters"), `"uuid","\u003e"`) {
144                         io.WriteString(w, `{"items_available":0,"items":[]}`)
145                 } else {
146                         io.WriteString(w, `{"items_available":2,"items":[
147                                 {"uuid":"zzzzz-4zz18-ehbhgtheo8909or","portable_data_hash":"fa7aeb5140e2848d39b416daeef4ffc5+45","manifest_text":". 37b51d194a7513e45b56f6524f2d51f2+3 0:3:bar\n","modified_at":"2014-02-03T17:22:54Z"},
148                                 {"uuid":"zzzzz-4zz18-znfnqtbbv4spc3w","portable_data_hash":"1f4b0bc7583c2a7f9102c395f4ffc5e3+45","manifest_text":". acbd18db4cc2f85cedef654fccc4a4d8+3 0:3:foo\n","modified_at":"2014-02-03T17:22:54Z"}]}`)
149                 }
150         })
151         return rt
152 }
153
154 func (s *stubServer) serveZeroKeepServices() *reqTracker {
155         return s.serveStatic("/arvados/v1/keep_services",
156                 `{"items":[],"items_available":0}`)
157 }
158
159 func (s *stubServer) serveFourDiskKeepServices() *reqTracker {
160         return s.serveStatic("/arvados/v1/keep_services", `{"items_available":5,"items":[
161                 {"uuid":"zzzzz-bi6l4-000000000000000","service_host":"keep0.zzzzz.arvadosapi.com","service_port":25107,"service_ssl_flag":false,"service_type":"disk"},
162                 {"uuid":"zzzzz-bi6l4-000000000000001","service_host":"keep1.zzzzz.arvadosapi.com","service_port":25107,"service_ssl_flag":false,"service_type":"disk"},
163                 {"uuid":"zzzzz-bi6l4-000000000000002","service_host":"keep2.zzzzz.arvadosapi.com","service_port":25107,"service_ssl_flag":false,"service_type":"disk"},
164                 {"uuid":"zzzzz-bi6l4-000000000000003","service_host":"keep3.zzzzz.arvadosapi.com","service_port":25107,"service_ssl_flag":false,"service_type":"disk"},
165                 {"uuid":"zzzzz-bi6l4-h0a0xwut9qa6g3a","service_host":"keep.zzzzz.arvadosapi.com","service_port":25333,"service_ssl_flag":true,"service_type":"proxy"}]}`)
166 }
167
168 func (s *stubServer) serveKeepstoreIndexFoo4Bar1() *reqTracker {
169         rt := &reqTracker{}
170         s.mux.HandleFunc("/index/", func(w http.ResponseWriter, r *http.Request) {
171                 count := rt.Add(r)
172                 if r.Host == "keep0.zzzzz.arvadosapi.com:25107" {
173                         io.WriteString(w, "37b51d194a7513e45b56f6524f2d51f2+3 12345678\n")
174                 }
175                 fmt.Fprintf(w, "acbd18db4cc2f85cedef654fccc4a4d8+3 %d\n\n", 12345678+count)
176         })
177         return rt
178 }
179
180 func (s *stubServer) serveKeepstoreTrash() *reqTracker {
181         return s.serveStatic("/trash", `{}`)
182 }
183
184 func (s *stubServer) serveKeepstorePull() *reqTracker {
185         return s.serveStatic("/pull", `{}`)
186 }
187
188 type runSuite struct {
189         stub   stubServer
190         config Config
191 }
192
193 // make a log.Logger that writes to the current test's c.Log().
194 func (s *runSuite) logger(c *check.C) *log.Logger {
195         r, w := io.Pipe()
196         go func() {
197                 buf := make([]byte, 10000)
198                 for {
199                         n, err := r.Read(buf)
200                         if n > 0 {
201                                 if buf[n-1] == '\n' {
202                                         n--
203                                 }
204                                 c.Log(string(buf[:n]))
205                         }
206                         if err != nil {
207                                 break
208                         }
209                 }
210         }()
211         return log.New(w, "", log.LstdFlags)
212 }
213
214 func (s *runSuite) SetUpTest(c *check.C) {
215         s.config = Config{
216                 Client: arvados.Client{
217                         AuthToken: "xyzzy",
218                         APIHost:   "zzzzz.arvadosapi.com",
219                         Client:    s.stub.Start()},
220                 KeepServiceTypes: []string{"disk"}}
221         s.stub.serveDiscoveryDoc()
222         s.stub.logf = c.Logf
223 }
224
225 func (s *runSuite) TearDownTest(c *check.C) {
226         s.stub.Close()
227 }
228
229 func (s *runSuite) TestRefuseZeroCollections(c *check.C) {
230         opts := RunOptions{
231                 CommitPulls: true,
232                 CommitTrash: true,
233                 Logger:      s.logger(c),
234         }
235         s.stub.serveCurrentUserAdmin()
236         s.stub.serveZeroCollections()
237         s.stub.serveFourDiskKeepServices()
238         s.stub.serveKeepstoreIndexFoo4Bar1()
239         trashReqs := s.stub.serveKeepstoreTrash()
240         pullReqs := s.stub.serveKeepstorePull()
241         _, err := (&Balancer{}).Run(s.config, opts)
242         c.Check(err, check.ErrorMatches, "received zero collections")
243         c.Check(trashReqs.Count(), check.Equals, 4)
244         c.Check(pullReqs.Count(), check.Equals, 0)
245 }
246
247 func (s *runSuite) TestServiceTypes(c *check.C) {
248         opts := RunOptions{
249                 CommitPulls: true,
250                 CommitTrash: true,
251                 Logger:      s.logger(c),
252         }
253         s.config.KeepServiceTypes = []string{"unlisted-type"}
254         s.stub.serveCurrentUserAdmin()
255         s.stub.serveFooBarFileCollections()
256         s.stub.serveFourDiskKeepServices()
257         indexReqs := s.stub.serveKeepstoreIndexFoo4Bar1()
258         trashReqs := s.stub.serveKeepstoreTrash()
259         _, err := (&Balancer{}).Run(s.config, opts)
260         c.Check(err, check.IsNil)
261         c.Check(indexReqs.Count(), check.Equals, 0)
262         c.Check(trashReqs.Count(), check.Equals, 0)
263 }
264
265 func (s *runSuite) TestRefuseNonAdmin(c *check.C) {
266         opts := RunOptions{
267                 CommitPulls: true,
268                 CommitTrash: true,
269                 Logger:      s.logger(c),
270         }
271         s.stub.serveCurrentUserNotAdmin()
272         s.stub.serveZeroCollections()
273         s.stub.serveFourDiskKeepServices()
274         trashReqs := s.stub.serveKeepstoreTrash()
275         pullReqs := s.stub.serveKeepstorePull()
276         _, err := (&Balancer{}).Run(s.config, opts)
277         c.Check(err, check.ErrorMatches, "current user .* is not .* admin user")
278         c.Check(trashReqs.Count(), check.Equals, 0)
279         c.Check(pullReqs.Count(), check.Equals, 0)
280 }
281
282 func (s *runSuite) TestDetectSkippedCollections(c *check.C) {
283         opts := RunOptions{
284                 CommitPulls: true,
285                 CommitTrash: true,
286                 Logger:      s.logger(c),
287         }
288         s.stub.serveCurrentUserAdmin()
289         s.stub.serveCollectionsButSkipOne()
290         s.stub.serveFourDiskKeepServices()
291         s.stub.serveKeepstoreIndexFoo4Bar1()
292         trashReqs := s.stub.serveKeepstoreTrash()
293         pullReqs := s.stub.serveKeepstorePull()
294         _, err := (&Balancer{}).Run(s.config, opts)
295         c.Check(err, check.ErrorMatches, `Retrieved 2 collections with modtime <= .* but server now reports there are 3 collections.*`)
296         c.Check(trashReqs.Count(), check.Equals, 4)
297         c.Check(pullReqs.Count(), check.Equals, 0)
298 }
299
300 func (s *runSuite) TestDryRun(c *check.C) {
301         opts := RunOptions{
302                 CommitPulls: false,
303                 CommitTrash: false,
304                 Logger:      s.logger(c),
305         }
306         s.stub.serveCurrentUserAdmin()
307         collReqs := s.stub.serveFooBarFileCollections()
308         s.stub.serveFourDiskKeepServices()
309         s.stub.serveKeepstoreIndexFoo4Bar1()
310         trashReqs := s.stub.serveKeepstoreTrash()
311         pullReqs := s.stub.serveKeepstorePull()
312         var bal Balancer
313         _, err := bal.Run(s.config, opts)
314         c.Check(err, check.IsNil)
315         for _, req := range collReqs.reqs {
316                 c.Check(req.Form.Get("include_trash"), check.Equals, "true")
317         }
318         c.Check(trashReqs.Count(), check.Equals, 0)
319         c.Check(pullReqs.Count(), check.Equals, 0)
320         stats := bal.getStatistics()
321         c.Check(stats.pulls, check.Not(check.Equals), 0)
322         c.Check(stats.underrep.replicas, check.Not(check.Equals), 0)
323         c.Check(stats.overrep.replicas, check.Not(check.Equals), 0)
324 }
325
326 func (s *runSuite) TestCommit(c *check.C) {
327         opts := RunOptions{
328                 CommitPulls: true,
329                 CommitTrash: true,
330                 Logger:      s.logger(c),
331                 Dumper:      s.logger(c),
332         }
333         s.stub.serveCurrentUserAdmin()
334         s.stub.serveFooBarFileCollections()
335         s.stub.serveFourDiskKeepServices()
336         s.stub.serveKeepstoreIndexFoo4Bar1()
337         trashReqs := s.stub.serveKeepstoreTrash()
338         pullReqs := s.stub.serveKeepstorePull()
339         var bal Balancer
340         _, err := bal.Run(s.config, opts)
341         c.Check(err, check.IsNil)
342         c.Check(trashReqs.Count(), check.Equals, 8)
343         c.Check(pullReqs.Count(), check.Equals, 4)
344         stats := bal.getStatistics()
345         // "foo" block is overreplicated by 2
346         c.Check(stats.trashes, check.Equals, 2)
347         // "bar" block is underreplicated by 1, and its only copy is
348         // in a poor rendezvous position
349         c.Check(stats.pulls, check.Equals, 2)
350 }
351
352 func (s *runSuite) TestRunForever(c *check.C) {
353         opts := RunOptions{
354                 CommitPulls: true,
355                 CommitTrash: true,
356                 Logger:      s.logger(c),
357                 Dumper:      s.logger(c),
358         }
359         s.stub.serveCurrentUserAdmin()
360         s.stub.serveFooBarFileCollections()
361         s.stub.serveFourDiskKeepServices()
362         s.stub.serveKeepstoreIndexFoo4Bar1()
363         trashReqs := s.stub.serveKeepstoreTrash()
364         pullReqs := s.stub.serveKeepstorePull()
365
366         stop := make(chan interface{})
367         s.config.RunPeriod = arvados.Duration(time.Millisecond)
368         go RunForever(s.config, opts, stop)
369
370         // Each run should send 4 pull lists + 4 trash lists. The
371         // first run should also send 4 empty trash lists at
372         // startup. We should complete all four runs in much less than
373         // a second.
374         for t0 := time.Now(); pullReqs.Count() < 16 && time.Since(t0) < 10*time.Second; {
375                 time.Sleep(time.Millisecond)
376         }
377         stop <- true
378         c.Check(pullReqs.Count() >= 16, check.Equals, true)
379         c.Check(trashReqs.Count(), check.Equals, pullReqs.Count()+4)
380 }