15 "git.curoverse.com/arvados.git/sdk/go/arvados"
17 check "gopkg.in/check.v1"
20 var _ = check.Suite(&runSuite{})
22 type reqTracker struct {
27 func (rt *reqTracker) Count() int {
33 func (rt *reqTracker) Add(req *http.Request) int {
36 rt.reqs = append(rt.reqs, *req)
40 // stubServer is an HTTP transport that intercepts and processes all
41 // requests using its own handlers.
42 type stubServer struct {
47 logf func(string, ...interface{})
50 // Start initializes the stub server and returns an *http.Client that
51 // uses the stub server to handle all requests.
53 // A stubServer that has been started should eventually be shut down
55 func (s *stubServer) Start() *http.Client {
56 // Set up a config.Client that forwards all requests to s.mux
57 // via s.srv. Test cases will attach handlers to s.mux to get
58 // the desired responses.
59 s.mux = http.NewServeMux()
60 s.srv = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
64 w.Header().Set("Content-Type", "application/json")
67 return &http.Client{Transport: s}
70 func (s *stubServer) RoundTrip(req *http.Request) (*http.Response, error) {
71 w := httptest.NewRecorder()
72 s.mux.ServeHTTP(w, req)
73 return &http.Response{
75 Status: fmt.Sprintf("%d %s", w.Code, http.StatusText(w.Code)),
77 Body: ioutil.NopCloser(w.Body)}, nil
80 // Close releases resources used by the server.
81 func (s *stubServer) Close() {
85 func (s *stubServer) serveStatic(path, data string) *reqTracker {
87 s.mux.HandleFunc(path, func(w http.ResponseWriter, r *http.Request) {
90 ioutil.ReadAll(r.Body)
93 io.WriteString(w, data)
98 func (s *stubServer) serveCurrentUserAdmin() *reqTracker {
99 return s.serveStatic("/arvados/v1/users/current",
100 `{"uuid":"zzzzz-tpzed-000000000000000","is_admin":true,"is_active":true}`)
103 func (s *stubServer) serveCurrentUserNotAdmin() *reqTracker {
104 return s.serveStatic("/arvados/v1/users/current",
105 `{"uuid":"zzzzz-tpzed-000000000000000","is_admin":false,"is_active":true}`)
108 func (s *stubServer) serveDiscoveryDoc() *reqTracker {
109 return s.serveStatic("/discovery/v1/apis/arvados/v1/rest",
110 `{"defaultCollectionReplication":2}`)
113 func (s *stubServer) serveZeroCollections() *reqTracker {
114 return s.serveStatic("/arvados/v1/collections",
115 `{"items":[],"items_available":0}`)
118 func (s *stubServer) serveFooBarFileCollections() *reqTracker {
120 s.mux.HandleFunc("/arvados/v1/collections", func(w http.ResponseWriter, r *http.Request) {
123 if strings.Contains(r.Form.Get("filters"), `modified_at`) {
124 io.WriteString(w, `{"items_available":0,"items":[]}`)
126 io.WriteString(w, `{"items_available":2,"items":[
127 {"uuid":"zzzzz-4zz18-ehbhgtheo8909or","portable_data_hash":"fa7aeb5140e2848d39b416daeef4ffc5+45","manifest_text":". 37b51d194a7513e45b56f6524f2d51f2+3 0:3:bar\n","modified_at":"2014-02-03T17:22:54Z"},
128 {"uuid":"zzzzz-4zz18-znfnqtbbv4spc3w","portable_data_hash":"1f4b0bc7583c2a7f9102c395f4ffc5e3+45","manifest_text":". acbd18db4cc2f85cedef654fccc4a4d8+3 0:3:foo\n","modified_at":"2014-02-03T17:22:54Z"}]}`)
134 func (s *stubServer) serveCollectionsButSkipOne() *reqTracker {
136 s.mux.HandleFunc("/arvados/v1/collections", func(w http.ResponseWriter, r *http.Request) {
139 if strings.Contains(r.Form.Get("filters"), `"modified_at","\u003c="`) {
140 io.WriteString(w, `{"items_available":3,"items":[]}`)
141 } else if strings.Contains(r.Form.Get("filters"), `"modified_at","\u003e`) {
142 io.WriteString(w, `{"items_available":0,"items":[]}`)
143 } else if strings.Contains(r.Form.Get("filters"), `"modified_at","="`) && strings.Contains(r.Form.Get("filters"), `"uuid","\u003e"`) {
144 io.WriteString(w, `{"items_available":0,"items":[]}`)
146 io.WriteString(w, `{"items_available":2,"items":[
147 {"uuid":"zzzzz-4zz18-ehbhgtheo8909or","portable_data_hash":"fa7aeb5140e2848d39b416daeef4ffc5+45","manifest_text":". 37b51d194a7513e45b56f6524f2d51f2+3 0:3:bar\n","modified_at":"2014-02-03T17:22:54Z"},
148 {"uuid":"zzzzz-4zz18-znfnqtbbv4spc3w","portable_data_hash":"1f4b0bc7583c2a7f9102c395f4ffc5e3+45","manifest_text":". acbd18db4cc2f85cedef654fccc4a4d8+3 0:3:foo\n","modified_at":"2014-02-03T17:22:54Z"}]}`)
154 func (s *stubServer) serveZeroKeepServices() *reqTracker {
155 return s.serveStatic("/arvados/v1/keep_services",
156 `{"items":[],"items_available":0}`)
159 func (s *stubServer) serveFourDiskKeepServices() *reqTracker {
160 return s.serveStatic("/arvados/v1/keep_services", `{"items_available":5,"items":[
161 {"uuid":"zzzzz-bi6l4-000000000000000","service_host":"keep0.zzzzz.arvadosapi.com","service_port":25107,"service_ssl_flag":false,"service_type":"disk"},
162 {"uuid":"zzzzz-bi6l4-000000000000001","service_host":"keep1.zzzzz.arvadosapi.com","service_port":25107,"service_ssl_flag":false,"service_type":"disk"},
163 {"uuid":"zzzzz-bi6l4-000000000000002","service_host":"keep2.zzzzz.arvadosapi.com","service_port":25107,"service_ssl_flag":false,"service_type":"disk"},
164 {"uuid":"zzzzz-bi6l4-000000000000003","service_host":"keep3.zzzzz.arvadosapi.com","service_port":25107,"service_ssl_flag":false,"service_type":"disk"},
165 {"uuid":"zzzzz-bi6l4-h0a0xwut9qa6g3a","service_host":"keep.zzzzz.arvadosapi.com","service_port":25333,"service_ssl_flag":true,"service_type":"proxy"}]}`)
168 func (s *stubServer) serveKeepstoreIndexFoo4Bar1() *reqTracker {
170 s.mux.HandleFunc("/index/", func(w http.ResponseWriter, r *http.Request) {
172 if r.Host == "keep0.zzzzz.arvadosapi.com:25107" {
173 io.WriteString(w, "37b51d194a7513e45b56f6524f2d51f2+3 12345678\n")
175 fmt.Fprintf(w, "acbd18db4cc2f85cedef654fccc4a4d8+3 %d\n\n", 12345678+count)
180 func (s *stubServer) serveKeepstoreTrash() *reqTracker {
181 return s.serveStatic("/trash", `{}`)
184 func (s *stubServer) serveKeepstorePull() *reqTracker {
185 return s.serveStatic("/pull", `{}`)
188 type runSuite struct {
193 // make a log.Logger that writes to the current test's c.Log().
194 func (s *runSuite) logger(c *check.C) *log.Logger {
197 buf := make([]byte, 10000)
199 n, err := r.Read(buf)
201 if buf[n-1] == '\n' {
204 c.Log(string(buf[:n]))
211 return log.New(w, "", log.LstdFlags)
214 func (s *runSuite) SetUpTest(c *check.C) {
216 Client: arvados.Client{
218 APIHost: "zzzzz.arvadosapi.com",
219 Client: s.stub.Start()},
220 KeepServiceTypes: []string{"disk"}}
221 s.stub.serveDiscoveryDoc()
225 func (s *runSuite) TearDownTest(c *check.C) {
229 func (s *runSuite) TestRefuseZeroCollections(c *check.C) {
235 s.stub.serveCurrentUserAdmin()
236 s.stub.serveZeroCollections()
237 s.stub.serveFourDiskKeepServices()
238 s.stub.serveKeepstoreIndexFoo4Bar1()
239 trashReqs := s.stub.serveKeepstoreTrash()
240 pullReqs := s.stub.serveKeepstorePull()
241 _, err := (&Balancer{}).Run(s.config, opts)
242 c.Check(err, check.ErrorMatches, "received zero collections")
243 c.Check(trashReqs.Count(), check.Equals, 4)
244 c.Check(pullReqs.Count(), check.Equals, 0)
247 func (s *runSuite) TestServiceTypes(c *check.C) {
253 s.config.KeepServiceTypes = []string{"unlisted-type"}
254 s.stub.serveCurrentUserAdmin()
255 s.stub.serveFooBarFileCollections()
256 s.stub.serveFourDiskKeepServices()
257 indexReqs := s.stub.serveKeepstoreIndexFoo4Bar1()
258 trashReqs := s.stub.serveKeepstoreTrash()
259 _, err := (&Balancer{}).Run(s.config, opts)
260 c.Check(err, check.IsNil)
261 c.Check(indexReqs.Count(), check.Equals, 0)
262 c.Check(trashReqs.Count(), check.Equals, 0)
265 func (s *runSuite) TestRefuseNonAdmin(c *check.C) {
271 s.stub.serveCurrentUserNotAdmin()
272 s.stub.serveZeroCollections()
273 s.stub.serveFourDiskKeepServices()
274 trashReqs := s.stub.serveKeepstoreTrash()
275 pullReqs := s.stub.serveKeepstorePull()
276 _, err := (&Balancer{}).Run(s.config, opts)
277 c.Check(err, check.ErrorMatches, "current user .* is not .* admin user")
278 c.Check(trashReqs.Count(), check.Equals, 0)
279 c.Check(pullReqs.Count(), check.Equals, 0)
282 func (s *runSuite) TestDetectSkippedCollections(c *check.C) {
288 s.stub.serveCurrentUserAdmin()
289 s.stub.serveCollectionsButSkipOne()
290 s.stub.serveFourDiskKeepServices()
291 s.stub.serveKeepstoreIndexFoo4Bar1()
292 trashReqs := s.stub.serveKeepstoreTrash()
293 pullReqs := s.stub.serveKeepstorePull()
294 _, err := (&Balancer{}).Run(s.config, opts)
295 c.Check(err, check.ErrorMatches, `Retrieved 2 collections with modtime <= .* but server now reports there are 3 collections.*`)
296 c.Check(trashReqs.Count(), check.Equals, 4)
297 c.Check(pullReqs.Count(), check.Equals, 0)
300 func (s *runSuite) TestDryRun(c *check.C) {
306 s.stub.serveCurrentUserAdmin()
307 collReqs := s.stub.serveFooBarFileCollections()
308 s.stub.serveFourDiskKeepServices()
309 s.stub.serveKeepstoreIndexFoo4Bar1()
310 trashReqs := s.stub.serveKeepstoreTrash()
311 pullReqs := s.stub.serveKeepstorePull()
313 _, err := bal.Run(s.config, opts)
314 c.Check(err, check.IsNil)
315 for _, req := range collReqs.reqs {
316 c.Check(req.Form.Get("include_trash"), check.Equals, "true")
318 c.Check(trashReqs.Count(), check.Equals, 0)
319 c.Check(pullReqs.Count(), check.Equals, 0)
320 stats := bal.getStatistics()
321 c.Check(stats.pulls, check.Not(check.Equals), 0)
322 c.Check(stats.underrep.replicas, check.Not(check.Equals), 0)
323 c.Check(stats.overrep.replicas, check.Not(check.Equals), 0)
326 func (s *runSuite) TestCommit(c *check.C) {
333 s.stub.serveCurrentUserAdmin()
334 s.stub.serveFooBarFileCollections()
335 s.stub.serveFourDiskKeepServices()
336 s.stub.serveKeepstoreIndexFoo4Bar1()
337 trashReqs := s.stub.serveKeepstoreTrash()
338 pullReqs := s.stub.serveKeepstorePull()
340 _, err := bal.Run(s.config, opts)
341 c.Check(err, check.IsNil)
342 c.Check(trashReqs.Count(), check.Equals, 8)
343 c.Check(pullReqs.Count(), check.Equals, 4)
344 stats := bal.getStatistics()
345 // "foo" block is overreplicated by 2
346 c.Check(stats.trashes, check.Equals, 2)
347 // "bar" block is underreplicated by 1, and its only copy is
348 // in a poor rendezvous position
349 c.Check(stats.pulls, check.Equals, 2)
352 func (s *runSuite) TestRunForever(c *check.C) {
359 s.stub.serveCurrentUserAdmin()
360 s.stub.serveFooBarFileCollections()
361 s.stub.serveFourDiskKeepServices()
362 s.stub.serveKeepstoreIndexFoo4Bar1()
363 trashReqs := s.stub.serveKeepstoreTrash()
364 pullReqs := s.stub.serveKeepstorePull()
366 stop := make(chan interface{})
367 s.config.RunPeriod = arvados.Duration(time.Millisecond)
368 go RunForever(s.config, opts, stop)
370 // Each run should send 4 pull lists + 4 trash lists. The
371 // first run should also send 4 empty trash lists at
372 // startup. We should complete all four runs in much less than
374 for t0 := time.Now(); pullReqs.Count() < 16 && time.Since(t0) < 10*time.Second; {
375 time.Sleep(time.Millisecond)
378 c.Check(pullReqs.Count() >= 16, check.Equals, true)
379 c.Check(trashReqs.Count(), check.Equals, pullReqs.Count()+4)