12032: Fix compatibility with PostgreSQL 9.4.
[arvados.git] / services / keep-balance / balance_run_test.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package main
6
7 import (
8         _ "encoding/json"
9         "fmt"
10         "io"
11         "io/ioutil"
12         "log"
13         "net/http"
14         "net/http/httptest"
15         "strings"
16         "sync"
17         "time"
18
19         "git.curoverse.com/arvados.git/sdk/go/arvados"
20
21         check "gopkg.in/check.v1"
22 )
23
24 var _ = check.Suite(&runSuite{})
25
26 type reqTracker struct {
27         reqs []http.Request
28         sync.Mutex
29 }
30
31 func (rt *reqTracker) Count() int {
32         rt.Lock()
33         defer rt.Unlock()
34         return len(rt.reqs)
35 }
36
37 func (rt *reqTracker) Add(req *http.Request) int {
38         rt.Lock()
39         defer rt.Unlock()
40         rt.reqs = append(rt.reqs, *req)
41         return len(rt.reqs)
42 }
43
44 // stubServer is an HTTP transport that intercepts and processes all
45 // requests using its own handlers.
46 type stubServer struct {
47         mux      *http.ServeMux
48         srv      *httptest.Server
49         mutex    sync.Mutex
50         Requests reqTracker
51         logf     func(string, ...interface{})
52 }
53
54 // Start initializes the stub server and returns an *http.Client that
55 // uses the stub server to handle all requests.
56 //
57 // A stubServer that has been started should eventually be shut down
58 // with Close().
59 func (s *stubServer) Start() *http.Client {
60         // Set up a config.Client that forwards all requests to s.mux
61         // via s.srv. Test cases will attach handlers to s.mux to get
62         // the desired responses.
63         s.mux = http.NewServeMux()
64         s.srv = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
65                 s.mutex.Lock()
66                 s.Requests.Add(r)
67                 s.mutex.Unlock()
68                 w.Header().Set("Content-Type", "application/json")
69                 s.mux.ServeHTTP(w, r)
70         }))
71         return &http.Client{Transport: s}
72 }
73
74 func (s *stubServer) RoundTrip(req *http.Request) (*http.Response, error) {
75         w := httptest.NewRecorder()
76         s.mux.ServeHTTP(w, req)
77         return &http.Response{
78                 StatusCode: w.Code,
79                 Status:     fmt.Sprintf("%d %s", w.Code, http.StatusText(w.Code)),
80                 Header:     w.HeaderMap,
81                 Body:       ioutil.NopCloser(w.Body)}, nil
82 }
83
84 // Close releases resources used by the server.
85 func (s *stubServer) Close() {
86         s.srv.Close()
87 }
88
89 func (s *stubServer) serveStatic(path, data string) *reqTracker {
90         rt := &reqTracker{}
91         s.mux.HandleFunc(path, func(w http.ResponseWriter, r *http.Request) {
92                 rt.Add(r)
93                 if r.Body != nil {
94                         ioutil.ReadAll(r.Body)
95                         r.Body.Close()
96                 }
97                 io.WriteString(w, data)
98         })
99         return rt
100 }
101
102 func (s *stubServer) serveCurrentUserAdmin() *reqTracker {
103         return s.serveStatic("/arvados/v1/users/current",
104                 `{"uuid":"zzzzz-tpzed-000000000000000","is_admin":true,"is_active":true}`)
105 }
106
107 func (s *stubServer) serveCurrentUserNotAdmin() *reqTracker {
108         return s.serveStatic("/arvados/v1/users/current",
109                 `{"uuid":"zzzzz-tpzed-000000000000000","is_admin":false,"is_active":true}`)
110 }
111
112 func (s *stubServer) serveDiscoveryDoc() *reqTracker {
113         return s.serveStatic("/discovery/v1/apis/arvados/v1/rest",
114                 `{"defaultCollectionReplication":2}`)
115 }
116
117 func (s *stubServer) serveZeroCollections() *reqTracker {
118         return s.serveStatic("/arvados/v1/collections",
119                 `{"items":[],"items_available":0}`)
120 }
121
122 func (s *stubServer) serveFooBarFileCollections() *reqTracker {
123         rt := &reqTracker{}
124         s.mux.HandleFunc("/arvados/v1/collections", func(w http.ResponseWriter, r *http.Request) {
125                 r.ParseForm()
126                 rt.Add(r)
127                 if strings.Contains(r.Form.Get("filters"), `modified_at`) {
128                         io.WriteString(w, `{"items_available":0,"items":[]}`)
129                 } else {
130                         io.WriteString(w, `{"items_available":2,"items":[
131                                 {"uuid":"zzzzz-4zz18-ehbhgtheo8909or","portable_data_hash":"fa7aeb5140e2848d39b416daeef4ffc5+45","manifest_text":". 37b51d194a7513e45b56f6524f2d51f2+3 0:3:bar\n","modified_at":"2014-02-03T17:22:54Z"},
132                                 {"uuid":"zzzzz-4zz18-znfnqtbbv4spc3w","portable_data_hash":"1f4b0bc7583c2a7f9102c395f4ffc5e3+45","manifest_text":". acbd18db4cc2f85cedef654fccc4a4d8+3 0:3:foo\n","modified_at":"2014-02-03T17:22:54Z"}]}`)
133                 }
134         })
135         return rt
136 }
137
138 func (s *stubServer) serveCollectionsButSkipOne() *reqTracker {
139         rt := &reqTracker{}
140         s.mux.HandleFunc("/arvados/v1/collections", func(w http.ResponseWriter, r *http.Request) {
141                 r.ParseForm()
142                 rt.Add(r)
143                 if strings.Contains(r.Form.Get("filters"), `"modified_at","\u003c="`) {
144                         io.WriteString(w, `{"items_available":3,"items":[]}`)
145                 } else if strings.Contains(r.Form.Get("filters"), `"modified_at","\u003e`) {
146                         io.WriteString(w, `{"items_available":0,"items":[]}`)
147                 } else if strings.Contains(r.Form.Get("filters"), `"modified_at","="`) && strings.Contains(r.Form.Get("filters"), `"uuid","\u003e"`) {
148                         io.WriteString(w, `{"items_available":0,"items":[]}`)
149                 } else {
150                         io.WriteString(w, `{"items_available":2,"items":[
151                                 {"uuid":"zzzzz-4zz18-ehbhgtheo8909or","portable_data_hash":"fa7aeb5140e2848d39b416daeef4ffc5+45","manifest_text":". 37b51d194a7513e45b56f6524f2d51f2+3 0:3:bar\n","modified_at":"2014-02-03T17:22:54Z"},
152                                 {"uuid":"zzzzz-4zz18-znfnqtbbv4spc3w","portable_data_hash":"1f4b0bc7583c2a7f9102c395f4ffc5e3+45","manifest_text":". acbd18db4cc2f85cedef654fccc4a4d8+3 0:3:foo\n","modified_at":"2014-02-03T17:22:54Z"}]}`)
153                 }
154         })
155         return rt
156 }
157
158 func (s *stubServer) serveZeroKeepServices() *reqTracker {
159         return s.serveStatic("/arvados/v1/keep_services",
160                 `{"items":[],"items_available":0}`)
161 }
162
163 func (s *stubServer) serveFourDiskKeepServices() *reqTracker {
164         return s.serveStatic("/arvados/v1/keep_services", `{"items_available":5,"items":[
165                 {"uuid":"zzzzz-bi6l4-000000000000000","service_host":"keep0.zzzzz.arvadosapi.com","service_port":25107,"service_ssl_flag":false,"service_type":"disk"},
166                 {"uuid":"zzzzz-bi6l4-000000000000001","service_host":"keep1.zzzzz.arvadosapi.com","service_port":25107,"service_ssl_flag":false,"service_type":"disk"},
167                 {"uuid":"zzzzz-bi6l4-000000000000002","service_host":"keep2.zzzzz.arvadosapi.com","service_port":25107,"service_ssl_flag":false,"service_type":"disk"},
168                 {"uuid":"zzzzz-bi6l4-000000000000003","service_host":"keep3.zzzzz.arvadosapi.com","service_port":25107,"service_ssl_flag":false,"service_type":"disk"},
169                 {"uuid":"zzzzz-bi6l4-h0a0xwut9qa6g3a","service_host":"keep.zzzzz.arvadosapi.com","service_port":25333,"service_ssl_flag":true,"service_type":"proxy"}]}`)
170 }
171
172 func (s *stubServer) serveKeepstoreIndexFoo4Bar1() *reqTracker {
173         rt := &reqTracker{}
174         s.mux.HandleFunc("/index/", func(w http.ResponseWriter, r *http.Request) {
175                 count := rt.Add(r)
176                 if r.Host == "keep0.zzzzz.arvadosapi.com:25107" {
177                         io.WriteString(w, "37b51d194a7513e45b56f6524f2d51f2+3 12345678\n")
178                 }
179                 fmt.Fprintf(w, "acbd18db4cc2f85cedef654fccc4a4d8+3 %d\n\n", 12345678+count)
180         })
181         return rt
182 }
183
184 func (s *stubServer) serveKeepstoreTrash() *reqTracker {
185         return s.serveStatic("/trash", `{}`)
186 }
187
188 func (s *stubServer) serveKeepstorePull() *reqTracker {
189         return s.serveStatic("/pull", `{}`)
190 }
191
192 type runSuite struct {
193         stub   stubServer
194         config Config
195 }
196
197 // make a log.Logger that writes to the current test's c.Log().
198 func (s *runSuite) logger(c *check.C) *log.Logger {
199         r, w := io.Pipe()
200         go func() {
201                 buf := make([]byte, 10000)
202                 for {
203                         n, err := r.Read(buf)
204                         if n > 0 {
205                                 if buf[n-1] == '\n' {
206                                         n--
207                                 }
208                                 c.Log(string(buf[:n]))
209                         }
210                         if err != nil {
211                                 break
212                         }
213                 }
214         }()
215         return log.New(w, "", log.LstdFlags)
216 }
217
218 func (s *runSuite) SetUpTest(c *check.C) {
219         s.config = Config{
220                 Client: arvados.Client{
221                         AuthToken: "xyzzy",
222                         APIHost:   "zzzzz.arvadosapi.com",
223                         Client:    s.stub.Start()},
224                 KeepServiceTypes: []string{"disk"}}
225         s.stub.serveDiscoveryDoc()
226         s.stub.logf = c.Logf
227 }
228
229 func (s *runSuite) TearDownTest(c *check.C) {
230         s.stub.Close()
231 }
232
233 func (s *runSuite) TestRefuseZeroCollections(c *check.C) {
234         opts := RunOptions{
235                 CommitPulls: true,
236                 CommitTrash: true,
237                 Logger:      s.logger(c),
238         }
239         s.stub.serveCurrentUserAdmin()
240         s.stub.serveZeroCollections()
241         s.stub.serveFourDiskKeepServices()
242         s.stub.serveKeepstoreIndexFoo4Bar1()
243         trashReqs := s.stub.serveKeepstoreTrash()
244         pullReqs := s.stub.serveKeepstorePull()
245         _, err := (&Balancer{}).Run(s.config, opts)
246         c.Check(err, check.ErrorMatches, "received zero collections")
247         c.Check(trashReqs.Count(), check.Equals, 4)
248         c.Check(pullReqs.Count(), check.Equals, 0)
249 }
250
251 func (s *runSuite) TestServiceTypes(c *check.C) {
252         opts := RunOptions{
253                 CommitPulls: true,
254                 CommitTrash: true,
255                 Logger:      s.logger(c),
256         }
257         s.config.KeepServiceTypes = []string{"unlisted-type"}
258         s.stub.serveCurrentUserAdmin()
259         s.stub.serveFooBarFileCollections()
260         s.stub.serveFourDiskKeepServices()
261         indexReqs := s.stub.serveKeepstoreIndexFoo4Bar1()
262         trashReqs := s.stub.serveKeepstoreTrash()
263         _, err := (&Balancer{}).Run(s.config, opts)
264         c.Check(err, check.IsNil)
265         c.Check(indexReqs.Count(), check.Equals, 0)
266         c.Check(trashReqs.Count(), check.Equals, 0)
267 }
268
269 func (s *runSuite) TestRefuseNonAdmin(c *check.C) {
270         opts := RunOptions{
271                 CommitPulls: true,
272                 CommitTrash: true,
273                 Logger:      s.logger(c),
274         }
275         s.stub.serveCurrentUserNotAdmin()
276         s.stub.serveZeroCollections()
277         s.stub.serveFourDiskKeepServices()
278         trashReqs := s.stub.serveKeepstoreTrash()
279         pullReqs := s.stub.serveKeepstorePull()
280         _, err := (&Balancer{}).Run(s.config, opts)
281         c.Check(err, check.ErrorMatches, "current user .* is not .* admin user")
282         c.Check(trashReqs.Count(), check.Equals, 0)
283         c.Check(pullReqs.Count(), check.Equals, 0)
284 }
285
286 func (s *runSuite) TestDetectSkippedCollections(c *check.C) {
287         opts := RunOptions{
288                 CommitPulls: true,
289                 CommitTrash: true,
290                 Logger:      s.logger(c),
291         }
292         s.stub.serveCurrentUserAdmin()
293         s.stub.serveCollectionsButSkipOne()
294         s.stub.serveFourDiskKeepServices()
295         s.stub.serveKeepstoreIndexFoo4Bar1()
296         trashReqs := s.stub.serveKeepstoreTrash()
297         pullReqs := s.stub.serveKeepstorePull()
298         _, err := (&Balancer{}).Run(s.config, opts)
299         c.Check(err, check.ErrorMatches, `Retrieved 2 collections with modtime <= .* but server now reports there are 3 collections.*`)
300         c.Check(trashReqs.Count(), check.Equals, 4)
301         c.Check(pullReqs.Count(), check.Equals, 0)
302 }
303
304 func (s *runSuite) TestDryRun(c *check.C) {
305         opts := RunOptions{
306                 CommitPulls: false,
307                 CommitTrash: false,
308                 Logger:      s.logger(c),
309         }
310         s.stub.serveCurrentUserAdmin()
311         collReqs := s.stub.serveFooBarFileCollections()
312         s.stub.serveFourDiskKeepServices()
313         s.stub.serveKeepstoreIndexFoo4Bar1()
314         trashReqs := s.stub.serveKeepstoreTrash()
315         pullReqs := s.stub.serveKeepstorePull()
316         var bal Balancer
317         _, err := bal.Run(s.config, opts)
318         c.Check(err, check.IsNil)
319         for _, req := range collReqs.reqs {
320                 c.Check(req.Form.Get("include_trash"), check.Equals, "true")
321         }
322         c.Check(trashReqs.Count(), check.Equals, 0)
323         c.Check(pullReqs.Count(), check.Equals, 0)
324         stats := bal.getStatistics()
325         c.Check(stats.pulls, check.Not(check.Equals), 0)
326         c.Check(stats.underrep.replicas, check.Not(check.Equals), 0)
327         c.Check(stats.overrep.replicas, check.Not(check.Equals), 0)
328 }
329
330 func (s *runSuite) TestCommit(c *check.C) {
331         opts := RunOptions{
332                 CommitPulls: true,
333                 CommitTrash: true,
334                 Logger:      s.logger(c),
335                 Dumper:      s.logger(c),
336         }
337         s.stub.serveCurrentUserAdmin()
338         s.stub.serveFooBarFileCollections()
339         s.stub.serveFourDiskKeepServices()
340         s.stub.serveKeepstoreIndexFoo4Bar1()
341         trashReqs := s.stub.serveKeepstoreTrash()
342         pullReqs := s.stub.serveKeepstorePull()
343         var bal Balancer
344         _, err := bal.Run(s.config, opts)
345         c.Check(err, check.IsNil)
346         c.Check(trashReqs.Count(), check.Equals, 8)
347         c.Check(pullReqs.Count(), check.Equals, 4)
348         stats := bal.getStatistics()
349         // "foo" block is overreplicated by 2
350         c.Check(stats.trashes, check.Equals, 2)
351         // "bar" block is underreplicated by 1, and its only copy is
352         // in a poor rendezvous position
353         c.Check(stats.pulls, check.Equals, 2)
354 }
355
356 func (s *runSuite) TestRunForever(c *check.C) {
357         opts := RunOptions{
358                 CommitPulls: true,
359                 CommitTrash: true,
360                 Logger:      s.logger(c),
361                 Dumper:      s.logger(c),
362         }
363         s.stub.serveCurrentUserAdmin()
364         s.stub.serveFooBarFileCollections()
365         s.stub.serveFourDiskKeepServices()
366         s.stub.serveKeepstoreIndexFoo4Bar1()
367         trashReqs := s.stub.serveKeepstoreTrash()
368         pullReqs := s.stub.serveKeepstorePull()
369
370         stop := make(chan interface{})
371         s.config.RunPeriod = arvados.Duration(time.Millisecond)
372         go RunForever(s.config, opts, stop)
373
374         // Each run should send 4 pull lists + 4 trash lists. The
375         // first run should also send 4 empty trash lists at
376         // startup. We should complete all four runs in much less than
377         // a second.
378         for t0 := time.Now(); pullReqs.Count() < 16 && time.Since(t0) < 10*time.Second; {
379                 time.Sleep(time.Millisecond)
380         }
381         stop <- true
382         c.Check(pullReqs.Count() >= 16, check.Equals, true)
383         c.Check(trashReqs.Count(), check.Equals, pullReqs.Count()+4)
384 }