Merge branch '21578-mount-debug'
[arvados.git] / services / keepstore / router_test.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package keepstore
6
7 import (
8         "bytes"
9         "context"
10         "crypto/md5"
11         "errors"
12         "fmt"
13         "io"
14         "net/http"
15         "net/http/httptest"
16         "os"
17         "sort"
18         "strings"
19         "time"
20
21         "git.arvados.org/arvados.git/sdk/go/arvados"
22         "git.arvados.org/arvados.git/sdk/go/arvadostest"
23         "git.arvados.org/arvados.git/sdk/go/httpserver"
24         "github.com/prometheus/client_golang/prometheus"
25         . "gopkg.in/check.v1"
26 )
27
28 // routerSuite tests that the router correctly translates HTTP
29 // requests to the appropriate keepstore functionality, and translates
30 // the results to HTTP responses.
31 type routerSuite struct {
32         cluster *arvados.Cluster
33 }
34
35 var _ = Suite(&routerSuite{})
36
37 func testRouter(t TB, cluster *arvados.Cluster, reg *prometheus.Registry) (*router, context.CancelFunc) {
38         if reg == nil {
39                 reg = prometheus.NewRegistry()
40         }
41         ctx, cancel := context.WithCancel(context.Background())
42         ks, kcancel := testKeepstore(t, cluster, reg)
43         go func() {
44                 <-ctx.Done()
45                 kcancel()
46         }()
47         puller := newPuller(ctx, ks, reg)
48         trasher := newTrasher(ctx, ks, reg)
49         return newRouter(ks, puller, trasher).(*router), cancel
50 }
51
52 func (s *routerSuite) SetUpTest(c *C) {
53         s.cluster = testCluster(c)
54         s.cluster.Volumes = map[string]arvados.Volume{
55                 "zzzzz-nyw5e-000000000000000": {Replication: 1, Driver: "stub", StorageClasses: map[string]bool{"testclass1": true}},
56                 "zzzzz-nyw5e-111111111111111": {Replication: 1, Driver: "stub", StorageClasses: map[string]bool{"testclass2": true}},
57         }
58         s.cluster.StorageClasses = map[string]arvados.StorageClassConfig{
59                 "testclass1": arvados.StorageClassConfig{
60                         Default: true,
61                 },
62                 "testclass2": arvados.StorageClassConfig{
63                         Default: true,
64                 },
65         }
66 }
67
68 func (s *routerSuite) TestBlockRead_Token(c *C) {
69         router, cancel := testRouter(c, s.cluster, nil)
70         defer cancel()
71
72         err := router.keepstore.mountsW[0].BlockWrite(context.Background(), fooHash, []byte("foo"))
73         c.Assert(err, IsNil)
74         locSigned := router.keepstore.signLocator(arvadostest.ActiveTokenV2, fooHash+"+3")
75         c.Assert(locSigned, Not(Equals), fooHash+"+3")
76
77         // No token provided
78         resp := call(router, "GET", "http://example/"+locSigned, "", nil, nil)
79         c.Check(resp.Code, Equals, http.StatusUnauthorized)
80         c.Check(resp.Body.String(), Matches, "no token provided in Authorization header\n")
81
82         // Different token => invalid signature
83         resp = call(router, "GET", "http://example/"+locSigned, "badtoken", nil, nil)
84         c.Check(resp.Code, Equals, http.StatusBadRequest)
85         c.Check(resp.Body.String(), Equals, "invalid signature\n")
86
87         // Correct token
88         resp = call(router, "GET", "http://example/"+locSigned, arvadostest.ActiveTokenV2, nil, nil)
89         c.Check(resp.Code, Equals, http.StatusOK)
90         c.Check(resp.Body.String(), Equals, "foo")
91
92         // HEAD
93         resp = call(router, "HEAD", "http://example/"+locSigned, arvadostest.ActiveTokenV2, nil, nil)
94         c.Check(resp.Code, Equals, http.StatusOK)
95         c.Check(resp.Result().ContentLength, Equals, int64(3))
96         c.Check(resp.Body.String(), Equals, "")
97 }
98
99 // As a special case we allow HEAD requests that only provide a hash
100 // without a size hint. This accommodates uses of keep-block-check
101 // where it's inconvenient to attach size hints to known hashes.
102 //
103 // GET requests must provide a size hint -- otherwise we can't
104 // propagate a checksum mismatch error.
105 func (s *routerSuite) TestBlockRead_NoSizeHint(c *C) {
106         s.cluster.Collections.BlobSigning = true
107         router, cancel := testRouter(c, s.cluster, nil)
108         defer cancel()
109         err := router.keepstore.mountsW[0].BlockWrite(context.Background(), fooHash, []byte("foo"))
110         c.Assert(err, IsNil)
111
112         // hash+signature
113         hashSigned := router.keepstore.signLocator(arvadostest.ActiveTokenV2, fooHash)
114         resp := call(router, "GET", "http://example/"+hashSigned, arvadostest.ActiveTokenV2, nil, nil)
115         c.Check(resp.Code, Equals, http.StatusMethodNotAllowed)
116
117         resp = call(router, "HEAD", "http://example/"+fooHash, "", nil, nil)
118         c.Check(resp.Code, Equals, http.StatusUnauthorized)
119         resp = call(router, "HEAD", "http://example/"+fooHash+"+3", "", nil, nil)
120         c.Check(resp.Code, Equals, http.StatusUnauthorized)
121
122         s.cluster.Collections.BlobSigning = false
123         router, cancel = testRouter(c, s.cluster, nil)
124         defer cancel()
125         err = router.keepstore.mountsW[0].BlockWrite(context.Background(), fooHash, []byte("foo"))
126         c.Assert(err, IsNil)
127
128         resp = call(router, "GET", "http://example/"+fooHash, "", nil, nil)
129         c.Check(resp.Code, Equals, http.StatusMethodNotAllowed)
130
131         resp = call(router, "HEAD", "http://example/"+fooHash, "", nil, nil)
132         c.Check(resp.Code, Equals, http.StatusOK)
133         c.Check(resp.Body.String(), Equals, "")
134         c.Check(resp.Result().ContentLength, Equals, int64(3))
135         c.Check(resp.Header().Get("Content-Length"), Equals, "3")
136 }
137
138 // By the time we discover the checksum mismatch, it's too late to
139 // change the response code, but the expected block size is given in
140 // the Content-Length response header, so a generic http client can
141 // detect the problem.
142 func (s *routerSuite) TestBlockRead_ChecksumMismatch(c *C) {
143         router, cancel := testRouter(c, s.cluster, nil)
144         defer cancel()
145
146         gooddata := make([]byte, 10_000_000)
147         gooddata[0] = 'a'
148         hash := fmt.Sprintf("%x", md5.Sum(gooddata))
149         locSigned := router.keepstore.signLocator(arvadostest.ActiveTokenV2, fmt.Sprintf("%s+%d", hash, len(gooddata)))
150
151         for _, baddata := range [][]byte{
152                 make([]byte, 3),
153                 make([]byte, len(gooddata)),
154                 make([]byte, len(gooddata)-1),
155                 make([]byte, len(gooddata)+1),
156                 make([]byte, len(gooddata)*2),
157         } {
158                 c.Logf("=== baddata len %d", len(baddata))
159                 err := router.keepstore.mountsW[0].BlockWrite(context.Background(), hash, baddata)
160                 c.Assert(err, IsNil)
161
162                 resp := call(router, "GET", "http://example/"+locSigned, arvadostest.ActiveTokenV2, nil, nil)
163                 if !c.Check(resp.Code, Equals, http.StatusOK) {
164                         c.Logf("resp.Body: %s", resp.Body.String())
165                 }
166                 c.Check(resp.Body.Len(), Not(Equals), len(gooddata))
167                 c.Check(resp.Result().ContentLength, Equals, int64(len(gooddata)))
168
169                 resp = call(router, "HEAD", "http://example/"+locSigned, arvadostest.ActiveTokenV2, nil, nil)
170                 c.Check(resp.Code, Equals, http.StatusBadGateway)
171
172                 hashSigned := router.keepstore.signLocator(arvadostest.ActiveTokenV2, hash)
173                 resp = call(router, "HEAD", "http://example/"+hashSigned, arvadostest.ActiveTokenV2, nil, nil)
174                 c.Check(resp.Code, Equals, http.StatusBadGateway)
175         }
176 }
177
178 func (s *routerSuite) TestBlockWrite(c *C) {
179         router, cancel := testRouter(c, s.cluster, nil)
180         defer cancel()
181
182         resp := call(router, "PUT", "http://example/"+fooHash, arvadostest.ActiveTokenV2, []byte("foo"), nil)
183         c.Check(resp.Code, Equals, http.StatusOK)
184         locator := strings.TrimSpace(resp.Body.String())
185
186         resp = call(router, "GET", "http://example/"+locator, arvadostest.ActiveTokenV2, nil, nil)
187         c.Check(resp.Code, Equals, http.StatusOK)
188         c.Check(resp.Body.String(), Equals, "foo")
189 }
190
191 func (s *routerSuite) TestBlockWrite_Headers(c *C) {
192         router, cancel := testRouter(c, s.cluster, nil)
193         defer cancel()
194
195         resp := call(router, "PUT", "http://example/"+fooHash, arvadostest.ActiveTokenV2, []byte("foo"), http.Header{"X-Arvados-Replicas-Desired": []string{"2"}})
196         c.Check(resp.Code, Equals, http.StatusOK)
197         c.Check(resp.Header().Get("X-Keep-Replicas-Stored"), Equals, "1")
198         c.Check(sortCommaSeparated(resp.Header().Get("X-Keep-Storage-Classes-Confirmed")), Equals, "testclass1=1")
199
200         resp = call(router, "PUT", "http://example/"+fooHash, arvadostest.ActiveTokenV2, []byte("foo"), http.Header{"X-Keep-Storage-Classes": []string{"testclass1"}})
201         c.Check(resp.Code, Equals, http.StatusOK)
202         c.Check(resp.Header().Get("X-Keep-Replicas-Stored"), Equals, "1")
203         c.Check(resp.Header().Get("X-Keep-Storage-Classes-Confirmed"), Equals, "testclass1=1")
204
205         resp = call(router, "PUT", "http://example/"+fooHash, arvadostest.ActiveTokenV2, []byte("foo"), http.Header{"X-Keep-Storage-Classes": []string{" , testclass2 , "}})
206         c.Check(resp.Code, Equals, http.StatusOK)
207         c.Check(resp.Header().Get("X-Keep-Replicas-Stored"), Equals, "1")
208         c.Check(resp.Header().Get("X-Keep-Storage-Classes-Confirmed"), Equals, "testclass2=1")
209 }
210
211 func sortCommaSeparated(s string) string {
212         slice := strings.Split(s, ", ")
213         sort.Strings(slice)
214         return strings.Join(slice, ", ")
215 }
216
217 func (s *routerSuite) TestBlockTouch(c *C) {
218         router, cancel := testRouter(c, s.cluster, nil)
219         defer cancel()
220
221         resp := call(router, "TOUCH", "http://example/"+fooHash+"+3", s.cluster.SystemRootToken, nil, nil)
222         c.Check(resp.Code, Equals, http.StatusNotFound)
223
224         vol0 := router.keepstore.mountsW[0].volume.(*stubVolume)
225         err := vol0.BlockWrite(context.Background(), fooHash, []byte("foo"))
226         c.Assert(err, IsNil)
227         vol1 := router.keepstore.mountsW[1].volume.(*stubVolume)
228         err = vol1.BlockWrite(context.Background(), fooHash, []byte("foo"))
229         c.Assert(err, IsNil)
230
231         t1 := time.Now()
232         resp = call(router, "TOUCH", "http://example/"+fooHash+"+3", s.cluster.SystemRootToken, nil, nil)
233         c.Check(resp.Code, Equals, http.StatusOK)
234         t2 := time.Now()
235
236         // Unauthorized request is a no-op
237         resp = call(router, "TOUCH", "http://example/"+fooHash+"+3", arvadostest.ActiveTokenV2, nil, nil)
238         c.Check(resp.Code, Equals, http.StatusForbidden)
239
240         // Volume 0 mtime should be updated
241         t, err := vol0.Mtime(fooHash)
242         c.Check(err, IsNil)
243         c.Check(t.After(t1), Equals, true)
244         c.Check(t.Before(t2), Equals, true)
245
246         // Volume 1 mtime should not be updated
247         t, err = vol1.Mtime(fooHash)
248         c.Check(err, IsNil)
249         c.Check(t.Before(t1), Equals, true)
250
251         err = vol0.BlockTrash(fooHash)
252         c.Assert(err, IsNil)
253         err = vol1.BlockTrash(fooHash)
254         c.Assert(err, IsNil)
255         resp = call(router, "TOUCH", "http://example/"+fooHash+"+3", s.cluster.SystemRootToken, nil, nil)
256         c.Check(resp.Code, Equals, http.StatusNotFound)
257 }
258
259 func (s *routerSuite) TestBlockTrash(c *C) {
260         router, cancel := testRouter(c, s.cluster, nil)
261         defer cancel()
262
263         vol0 := router.keepstore.mountsW[0].volume.(*stubVolume)
264         err := vol0.BlockWrite(context.Background(), fooHash, []byte("foo"))
265         c.Assert(err, IsNil)
266         err = vol0.blockTouchWithTime(fooHash, time.Now().Add(-s.cluster.Collections.BlobSigningTTL.Duration()))
267         c.Assert(err, IsNil)
268         resp := call(router, "DELETE", "http://example/"+fooHash+"+3", s.cluster.SystemRootToken, nil, nil)
269         c.Check(resp.Code, Equals, http.StatusOK)
270         c.Check(vol0.stubLog.String(), Matches, `(?ms).* trash .*`)
271         err = vol0.BlockRead(context.Background(), fooHash, brdiscard)
272         c.Assert(err, Equals, os.ErrNotExist)
273 }
274
275 func (s *routerSuite) TestBlockUntrash(c *C) {
276         router, cancel := testRouter(c, s.cluster, nil)
277         defer cancel()
278
279         vol0 := router.keepstore.mountsW[0].volume.(*stubVolume)
280         err := vol0.BlockWrite(context.Background(), fooHash, []byte("foo"))
281         c.Assert(err, IsNil)
282         err = vol0.BlockTrash(fooHash)
283         c.Assert(err, IsNil)
284         err = vol0.BlockRead(context.Background(), fooHash, brdiscard)
285         c.Assert(err, Equals, os.ErrNotExist)
286         resp := call(router, "PUT", "http://example/untrash/"+fooHash+"+3", s.cluster.SystemRootToken, nil, nil)
287         c.Check(resp.Code, Equals, http.StatusOK)
288         c.Check(vol0.stubLog.String(), Matches, `(?ms).* untrash .*`)
289         err = vol0.BlockRead(context.Background(), fooHash, brdiscard)
290         c.Check(err, IsNil)
291 }
292
293 func (s *routerSuite) TestBadRequest(c *C) {
294         router, cancel := testRouter(c, s.cluster, nil)
295         defer cancel()
296
297         for _, trial := range []string{
298                 "GET /",
299                 "GET /xyz",
300                 "GET /aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaabcdefg",
301                 "GET /untrash",
302                 "GET /mounts/blocks/123",
303                 "GET /trash",
304                 "GET /pull",
305                 "GET /debug.json",  // old endpoint, no longer exists
306                 "GET /status.json", // old endpoint, no longer exists
307                 "POST /",
308                 "POST /aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa",
309                 "POST /trash",
310                 "PROPFIND /",
311                 "MAKE-COFFEE /",
312         } {
313                 c.Logf("=== %s", trial)
314                 methodpath := strings.Split(trial, " ")
315                 req := httptest.NewRequest(methodpath[0], "http://example"+methodpath[1], nil)
316                 resp := httptest.NewRecorder()
317                 router.ServeHTTP(resp, req)
318                 c.Check(resp.Code, Equals, http.StatusBadRequest)
319         }
320 }
321
322 func (s *routerSuite) TestRequireAdminMgtToken(c *C) {
323         router, cancel := testRouter(c, s.cluster, nil)
324         defer cancel()
325
326         for _, token := range []string{"badtoken", ""} {
327                 for _, trial := range []string{
328                         "PUT /pull",
329                         "PUT /trash",
330                         "GET /index",
331                         "GET /index/",
332                         "GET /index/1234",
333                         "PUT /untrash/aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa",
334                 } {
335                         c.Logf("=== %s", trial)
336                         methodpath := strings.Split(trial, " ")
337                         req := httptest.NewRequest(methodpath[0], "http://example"+methodpath[1], nil)
338                         if token != "" {
339                                 req.Header.Set("Authorization", "Bearer "+token)
340                         }
341                         resp := httptest.NewRecorder()
342                         router.ServeHTTP(resp, req)
343                         if token == "" {
344                                 c.Check(resp.Code, Equals, http.StatusUnauthorized)
345                         } else {
346                                 c.Check(resp.Code, Equals, http.StatusForbidden)
347                         }
348                 }
349         }
350         req := httptest.NewRequest("TOUCH", "http://example/aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa", nil)
351         resp := httptest.NewRecorder()
352         router.ServeHTTP(resp, req)
353         c.Check(resp.Code, Equals, http.StatusUnauthorized)
354 }
355
356 func (s *routerSuite) TestVolumeErrorStatusCode(c *C) {
357         router, cancel := testRouter(c, s.cluster, nil)
358         defer cancel()
359         router.keepstore.mountsW[0].volume.(*stubVolume).blockRead = func(_ context.Context, hash string, w io.WriterAt) error {
360                 return httpserver.ErrorWithStatus(errors.New("test error"), http.StatusBadGateway)
361         }
362
363         // To test whether we fall back to volume 1 after volume 0
364         // returns an error, we need to use a block whose rendezvous
365         // order has volume 0 first. Luckily "bar" is such a block.
366         c.Assert(router.keepstore.rendezvous(barHash, router.keepstore.mountsR)[0].UUID, DeepEquals, router.keepstore.mountsR[0].UUID)
367
368         locSigned := router.keepstore.signLocator(arvadostest.ActiveTokenV2, barHash+"+3")
369
370         // Volume 0 fails with an error that specifies an HTTP status
371         // code, so that code should be propagated to caller.
372         resp := call(router, "GET", "http://example/"+locSigned, arvadostest.ActiveTokenV2, nil, nil)
373         c.Check(resp.Code, Equals, http.StatusBadGateway)
374         c.Check(resp.Body.String(), Equals, "test error\n")
375
376         c.Assert(router.keepstore.mountsW[1].volume.BlockWrite(context.Background(), barHash, []byte("bar")), IsNil)
377
378         // If the requested block is available on the second volume,
379         // it doesn't matter that the first volume failed.
380         resp = call(router, "GET", "http://example/"+locSigned, arvadostest.ActiveTokenV2, nil, nil)
381         c.Check(resp.Code, Equals, http.StatusOK)
382         c.Check(resp.Body.String(), Equals, "bar")
383 }
384
385 func (s *routerSuite) TestIndex(c *C) {
386         router, cancel := testRouter(c, s.cluster, nil)
387         defer cancel()
388
389         resp := call(router, "GET", "http://example/index", s.cluster.SystemRootToken, nil, nil)
390         c.Check(resp.Code, Equals, http.StatusOK)
391         c.Check(resp.Body.String(), Equals, "\n")
392
393         resp = call(router, "GET", "http://example/index?prefix=fff", s.cluster.SystemRootToken, nil, nil)
394         c.Check(resp.Code, Equals, http.StatusOK)
395         c.Check(resp.Body.String(), Equals, "\n")
396
397         t0 := time.Now().Add(-time.Hour)
398         vol0 := router.keepstore.mounts["zzzzz-nyw5e-000000000000000"].volume.(*stubVolume)
399         err := vol0.BlockWrite(context.Background(), fooHash, []byte("foo"))
400         c.Assert(err, IsNil)
401         err = vol0.blockTouchWithTime(fooHash, t0)
402         c.Assert(err, IsNil)
403         err = vol0.BlockWrite(context.Background(), barHash, []byte("bar"))
404         c.Assert(err, IsNil)
405         err = vol0.blockTouchWithTime(barHash, t0)
406         c.Assert(err, IsNil)
407         t1 := time.Now().Add(-time.Minute)
408         vol1 := router.keepstore.mounts["zzzzz-nyw5e-111111111111111"].volume.(*stubVolume)
409         err = vol1.BlockWrite(context.Background(), barHash, []byte("bar"))
410         c.Assert(err, IsNil)
411         err = vol1.blockTouchWithTime(barHash, t1)
412         c.Assert(err, IsNil)
413
414         for _, path := range []string{
415                 "/index?prefix=acb",
416                 "/index/acb",
417                 "/index/?prefix=acb",
418                 "/mounts/zzzzz-nyw5e-000000000000000/blocks?prefix=acb",
419                 "/mounts/zzzzz-nyw5e-000000000000000/blocks/?prefix=acb",
420                 "/mounts/zzzzz-nyw5e-000000000000000/blocks/acb",
421         } {
422                 c.Logf("=== %s", path)
423                 resp = call(router, "GET", "http://example"+path, s.cluster.SystemRootToken, nil, nil)
424                 c.Check(resp.Code, Equals, http.StatusOK)
425                 c.Check(resp.Body.String(), Equals, fooHash+"+3 "+fmt.Sprintf("%d", t0.UnixNano())+"\n\n")
426         }
427
428         for _, path := range []string{
429                 "/index?prefix=37",
430                 "/index/37",
431                 "/index/?prefix=37",
432         } {
433                 c.Logf("=== %s", path)
434                 resp = call(router, "GET", "http://example"+path, s.cluster.SystemRootToken, nil, nil)
435                 c.Check(resp.Code, Equals, http.StatusOK)
436                 c.Check(resp.Body.String(), Equals, ""+
437                         barHash+"+3 "+fmt.Sprintf("%d", t0.UnixNano())+"\n"+
438                         barHash+"+3 "+fmt.Sprintf("%d", t1.UnixNano())+"\n\n")
439         }
440
441         for _, path := range []string{
442                 "/mounts/zzzzz-nyw5e-111111111111111/blocks",
443                 "/mounts/zzzzz-nyw5e-111111111111111/blocks/",
444                 "/mounts/zzzzz-nyw5e-111111111111111/blocks?prefix=37",
445                 "/mounts/zzzzz-nyw5e-111111111111111/blocks/?prefix=37",
446                 "/mounts/zzzzz-nyw5e-111111111111111/blocks/37",
447         } {
448                 c.Logf("=== %s", path)
449                 resp = call(router, "GET", "http://example"+path, s.cluster.SystemRootToken, nil, nil)
450                 c.Check(resp.Code, Equals, http.StatusOK)
451                 c.Check(resp.Body.String(), Equals, barHash+"+3 "+fmt.Sprintf("%d", t1.UnixNano())+"\n\n")
452         }
453
454         for _, path := range []string{
455                 "/index",
456                 "/index?prefix=",
457                 "/index/",
458                 "/index/?prefix=",
459         } {
460                 c.Logf("=== %s", path)
461                 resp = call(router, "GET", "http://example"+path, s.cluster.SystemRootToken, nil, nil)
462                 c.Check(resp.Code, Equals, http.StatusOK)
463                 c.Check(strings.Split(resp.Body.String(), "\n"), HasLen, 5)
464         }
465
466 }
467
468 // Check that the context passed to a volume method gets cancelled
469 // when the http client hangs up.
470 func (s *routerSuite) TestCancelOnDisconnect(c *C) {
471         router, cancel := testRouter(c, s.cluster, nil)
472         defer cancel()
473
474         unblock := make(chan struct{})
475         router.keepstore.mountsW[0].volume.(*stubVolume).blockRead = func(ctx context.Context, hash string, w io.WriterAt) error {
476                 <-unblock
477                 c.Check(ctx.Err(), NotNil)
478                 return ctx.Err()
479         }
480         go func() {
481                 time.Sleep(time.Second / 10)
482                 cancel()
483                 close(unblock)
484         }()
485         locSigned := router.keepstore.signLocator(arvadostest.ActiveTokenV2, fooHash+"+3")
486         ctx, cancel := context.WithCancel(context.Background())
487         defer cancel()
488         req, err := http.NewRequestWithContext(ctx, "GET", "http://example/"+locSigned, nil)
489         c.Assert(err, IsNil)
490         req.Header.Set("Authorization", "Bearer "+arvadostest.ActiveTokenV2)
491         resp := httptest.NewRecorder()
492         router.ServeHTTP(resp, req)
493         c.Check(resp.Code, Equals, 499)
494 }
495
496 func call(handler http.Handler, method, path, tok string, body []byte, hdr http.Header) *httptest.ResponseRecorder {
497         resp := httptest.NewRecorder()
498         req, err := http.NewRequest(method, path, bytes.NewReader(body))
499         if err != nil {
500                 panic(err)
501         }
502         for k := range hdr {
503                 req.Header.Set(k, hdr.Get(k))
504         }
505         if tok != "" {
506                 req.Header.Set("Authorization", "Bearer "+tok)
507         }
508         handler.ServeHTTP(resp, req)
509         return resp
510 }