15 "git.curoverse.com/arvados.git/sdk/go/arvados"
17 check "gopkg.in/check.v1"
20 var _ = check.Suite(&runSuite{})
22 type reqTracker struct {
27 func (rt *reqTracker) Count() int {
33 func (rt *reqTracker) Add(req *http.Request) int {
36 rt.reqs = append(rt.reqs, *req)
40 // stubServer is an HTTP transport that intercepts and processes all
41 // requests using its own handlers.
42 type stubServer struct {
47 logf func(string, ...interface{})
50 // Start initializes the stub server and returns an *http.Client that
51 // uses the stub server to handle all requests.
53 // A stubServer that has been started should eventually be shut down
55 func (s *stubServer) Start() *http.Client {
56 // Set up a config.Client that forwards all requests to s.mux
57 // via s.srv. Test cases will attach handlers to s.mux to get
58 // the desired responses.
59 s.mux = http.NewServeMux()
60 s.srv = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
64 w.Header().Set("Content-Type", "application/json")
67 return &http.Client{Transport: s}
70 func (s *stubServer) RoundTrip(req *http.Request) (*http.Response, error) {
71 w := httptest.NewRecorder()
72 s.mux.ServeHTTP(w, req)
73 return &http.Response{
75 Status: fmt.Sprintf("%d %s", w.Code, http.StatusText(w.Code)),
77 Body: ioutil.NopCloser(w.Body)}, nil
80 // Close releases resources used by the server.
81 func (s *stubServer) Close() {
85 func (s *stubServer) serveStatic(path, data string) *reqTracker {
87 s.mux.HandleFunc(path, func(w http.ResponseWriter, r *http.Request) {
90 ioutil.ReadAll(r.Body)
93 io.WriteString(w, data)
98 func (s *stubServer) serveCurrentUserAdmin() *reqTracker {
99 return s.serveStatic("/arvados/v1/users/current",
100 `{"uuid":"zzzzz-tpzed-000000000000000","is_admin":true,"is_active":true}`)
103 func (s *stubServer) serveCurrentUserNotAdmin() *reqTracker {
104 return s.serveStatic("/arvados/v1/users/current",
105 `{"uuid":"zzzzz-tpzed-000000000000000","is_admin":false,"is_active":true}`)
108 func (s *stubServer) serveDiscoveryDoc() *reqTracker {
109 return s.serveStatic("/discovery/v1/apis/arvados/v1/rest",
110 `{"defaultCollectionReplication":2}`)
113 func (s *stubServer) serveZeroCollections() *reqTracker {
114 return s.serveStatic("/arvados/v1/collections",
115 `{"items":[],"items_available":0}`)
118 func (s *stubServer) serveFooBarFileCollections() *reqTracker {
120 s.mux.HandleFunc("/arvados/v1/collections", func(w http.ResponseWriter, r *http.Request) {
123 if strings.Contains(r.Form.Get("filters"), `modified_at`) {
124 io.WriteString(w, `{"items_available":0,"items":[]}`)
126 io.WriteString(w, `{"items_available":2,"items":[
127 {"uuid":"zzzzz-4zz18-ehbhgtheo8909or","portable_data_hash":"fa7aeb5140e2848d39b416daeef4ffc5+45","manifest_text":". 37b51d194a7513e45b56f6524f2d51f2+3 0:3:bar\n","modified_at":"2014-02-03T17:22:54Z"},
128 {"uuid":"zzzzz-4zz18-znfnqtbbv4spc3w","portable_data_hash":"1f4b0bc7583c2a7f9102c395f4ffc5e3+45","manifest_text":". acbd18db4cc2f85cedef654fccc4a4d8+3 0:3:foo\n","modified_at":"2014-02-03T17:22:54Z"}]}`)
134 func (s *stubServer) serveCollectionsButSkipOne() *reqTracker {
136 s.mux.HandleFunc("/arvados/v1/collections", func(w http.ResponseWriter, r *http.Request) {
139 if strings.Contains(r.Form.Get("filters"), `"modified_at","\u003c="`) {
140 io.WriteString(w, `{"items_available":3,"items":[]}`)
141 } else if strings.Contains(r.Form.Get("filters"), `"modified_at","\u003e="`) {
142 io.WriteString(w, `{"items_available":0,"items":[]}`)
144 io.WriteString(w, `{"items_available":2,"items":[
145 {"uuid":"zzzzz-4zz18-ehbhgtheo8909or","portable_data_hash":"fa7aeb5140e2848d39b416daeef4ffc5+45","manifest_text":". 37b51d194a7513e45b56f6524f2d51f2+3 0:3:bar\n","modified_at":"2014-02-03T17:22:54Z"},
146 {"uuid":"zzzzz-4zz18-znfnqtbbv4spc3w","portable_data_hash":"1f4b0bc7583c2a7f9102c395f4ffc5e3+45","manifest_text":". acbd18db4cc2f85cedef654fccc4a4d8+3 0:3:foo\n","modified_at":"2014-02-03T17:22:54Z"}]}`)
152 func (s *stubServer) serveZeroKeepServices() *reqTracker {
153 return s.serveStatic("/arvados/v1/keep_services",
154 `{"items":[],"items_available":0}`)
157 func (s *stubServer) serveFourDiskKeepServices() *reqTracker {
158 return s.serveStatic("/arvados/v1/keep_services", `{"items_available":5,"items":[
159 {"uuid":"zzzzz-bi6l4-000000000000000","service_host":"keep0.zzzzz.arvadosapi.com","service_port":25107,"service_ssl_flag":false,"service_type":"disk"},
160 {"uuid":"zzzzz-bi6l4-000000000000001","service_host":"keep1.zzzzz.arvadosapi.com","service_port":25107,"service_ssl_flag":false,"service_type":"disk"},
161 {"uuid":"zzzzz-bi6l4-000000000000002","service_host":"keep2.zzzzz.arvadosapi.com","service_port":25107,"service_ssl_flag":false,"service_type":"disk"},
162 {"uuid":"zzzzz-bi6l4-000000000000003","service_host":"keep3.zzzzz.arvadosapi.com","service_port":25107,"service_ssl_flag":false,"service_type":"disk"},
163 {"uuid":"zzzzz-bi6l4-h0a0xwut9qa6g3a","service_host":"keep.zzzzz.arvadosapi.com","service_port":25333,"service_ssl_flag":true,"service_type":"proxy"}]}`)
166 func (s *stubServer) serveKeepstoreIndexFoo4Bar1() *reqTracker {
168 s.mux.HandleFunc("/index/", func(w http.ResponseWriter, r *http.Request) {
170 if r.Host == "keep0.zzzzz.arvadosapi.com:25107" {
171 io.WriteString(w, "37b51d194a7513e45b56f6524f2d51f2+3 12345678\n")
173 fmt.Fprintf(w, "acbd18db4cc2f85cedef654fccc4a4d8+3 %d\n\n", 12345678+count)
178 func (s *stubServer) serveKeepstoreTrash() *reqTracker {
179 return s.serveStatic("/trash", `{}`)
182 func (s *stubServer) serveKeepstorePull() *reqTracker {
183 return s.serveStatic("/pull", `{}`)
186 type runSuite struct {
191 // make a log.Logger that writes to the current test's c.Log().
192 func (s *runSuite) logger(c *check.C) *log.Logger {
195 buf := make([]byte, 10000)
197 n, err := r.Read(buf)
199 if buf[n-1] == '\n' {
202 c.Log(string(buf[:n]))
209 return log.New(w, "", log.LstdFlags)
212 func (s *runSuite) SetUpTest(c *check.C) {
214 Client: arvados.Client{
216 APIHost: "zzzzz.arvadosapi.com",
217 Client: s.stub.Start()},
218 KeepServiceTypes: []string{"disk"}}
219 s.stub.serveDiscoveryDoc()
223 func (s *runSuite) TearDownTest(c *check.C) {
227 func (s *runSuite) TestRefuseZeroCollections(c *check.C) {
233 s.stub.serveCurrentUserAdmin()
234 s.stub.serveZeroCollections()
235 s.stub.serveFourDiskKeepServices()
236 s.stub.serveKeepstoreIndexFoo4Bar1()
237 trashReqs := s.stub.serveKeepstoreTrash()
238 pullReqs := s.stub.serveKeepstorePull()
239 _, err := (&Balancer{}).Run(s.config, opts)
240 c.Check(err, check.ErrorMatches, "received zero collections")
241 c.Check(trashReqs.Count(), check.Equals, 4)
242 c.Check(pullReqs.Count(), check.Equals, 0)
245 func (s *runSuite) TestServiceTypes(c *check.C) {
251 s.config.KeepServiceTypes = []string{"unlisted-type"}
252 s.stub.serveCurrentUserAdmin()
253 s.stub.serveFooBarFileCollections()
254 s.stub.serveFourDiskKeepServices()
255 indexReqs := s.stub.serveKeepstoreIndexFoo4Bar1()
256 trashReqs := s.stub.serveKeepstoreTrash()
257 _, err := (&Balancer{}).Run(s.config, opts)
258 c.Check(err, check.IsNil)
259 c.Check(indexReqs.Count(), check.Equals, 0)
260 c.Check(trashReqs.Count(), check.Equals, 0)
263 func (s *runSuite) TestRefuseNonAdmin(c *check.C) {
269 s.stub.serveCurrentUserNotAdmin()
270 s.stub.serveZeroCollections()
271 s.stub.serveFourDiskKeepServices()
272 trashReqs := s.stub.serveKeepstoreTrash()
273 pullReqs := s.stub.serveKeepstorePull()
274 _, err := (&Balancer{}).Run(s.config, opts)
275 c.Check(err, check.ErrorMatches, "current user .* is not .* admin user")
276 c.Check(trashReqs.Count(), check.Equals, 0)
277 c.Check(pullReqs.Count(), check.Equals, 0)
280 func (s *runSuite) TestDetectSkippedCollections(c *check.C) {
286 s.stub.serveCurrentUserAdmin()
287 s.stub.serveCollectionsButSkipOne()
288 s.stub.serveFourDiskKeepServices()
289 s.stub.serveKeepstoreIndexFoo4Bar1()
290 trashReqs := s.stub.serveKeepstoreTrash()
291 pullReqs := s.stub.serveKeepstorePull()
292 _, err := (&Balancer{}).Run(s.config, opts)
293 c.Check(err, check.ErrorMatches, `Retrieved 2 collections with modtime <= .* but server now reports there are 3 collections.*`)
294 c.Check(trashReqs.Count(), check.Equals, 4)
295 c.Check(pullReqs.Count(), check.Equals, 0)
298 func (s *runSuite) TestDryRun(c *check.C) {
304 s.stub.serveCurrentUserAdmin()
305 collReqs := s.stub.serveFooBarFileCollections()
306 s.stub.serveFourDiskKeepServices()
307 s.stub.serveKeepstoreIndexFoo4Bar1()
308 trashReqs := s.stub.serveKeepstoreTrash()
309 pullReqs := s.stub.serveKeepstorePull()
311 _, err := bal.Run(s.config, opts)
312 c.Check(err, check.IsNil)
313 for _, req := range collReqs.reqs {
314 c.Check(req.Form.Get("include_trash"), check.Equals, "true")
316 c.Check(trashReqs.Count(), check.Equals, 0)
317 c.Check(pullReqs.Count(), check.Equals, 0)
318 stats := bal.getStatistics()
319 c.Check(stats.pulls, check.Not(check.Equals), 0)
320 c.Check(stats.underrep.replicas, check.Not(check.Equals), 0)
321 c.Check(stats.overrep.replicas, check.Not(check.Equals), 0)
324 func (s *runSuite) TestCommit(c *check.C) {
331 s.stub.serveCurrentUserAdmin()
332 s.stub.serveFooBarFileCollections()
333 s.stub.serveFourDiskKeepServices()
334 s.stub.serveKeepstoreIndexFoo4Bar1()
335 trashReqs := s.stub.serveKeepstoreTrash()
336 pullReqs := s.stub.serveKeepstorePull()
338 _, err := bal.Run(s.config, opts)
339 c.Check(err, check.IsNil)
340 c.Check(trashReqs.Count(), check.Equals, 8)
341 c.Check(pullReqs.Count(), check.Equals, 4)
342 stats := bal.getStatistics()
343 // "foo" block is overreplicated by 2
344 c.Check(stats.trashes, check.Equals, 2)
345 // "bar" block is underreplicated by 1, and its only copy is
346 // in a poor rendezvous position
347 c.Check(stats.pulls, check.Equals, 2)
350 func (s *runSuite) TestRunForever(c *check.C) {
357 s.stub.serveCurrentUserAdmin()
358 s.stub.serveFooBarFileCollections()
359 s.stub.serveFourDiskKeepServices()
360 s.stub.serveKeepstoreIndexFoo4Bar1()
361 trashReqs := s.stub.serveKeepstoreTrash()
362 pullReqs := s.stub.serveKeepstorePull()
364 stop := make(chan interface{})
365 s.config.RunPeriod = arvados.Duration(time.Millisecond)
366 go RunForever(s.config, opts, stop)
368 // Each run should send 4 pull lists + 4 trash lists. The
369 // first run should also send 4 empty trash lists at
370 // startup. We should complete all four runs in much less than
372 for t0 := time.Now(); pullReqs.Count() < 16 && time.Since(t0) < 10*time.Second; {
373 time.Sleep(time.Millisecond)
376 c.Check(pullReqs.Count() >= 16, check.Equals, true)
377 c.Check(trashReqs.Count(), check.Equals, pullReqs.Count()+4)