2960: Refactor keepstore into a streaming server.
[arvados.git] / services / keepstore / router_test.go
diff --git a/services/keepstore/router_test.go b/services/keepstore/router_test.go
new file mode 100644 (file)
index 0000000..a729ee0
--- /dev/null
@@ -0,0 +1,510 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package keepstore
+
+import (
+       "bytes"
+       "context"
+       "crypto/md5"
+       "errors"
+       "fmt"
+       "io"
+       "net/http"
+       "net/http/httptest"
+       "os"
+       "sort"
+       "strings"
+       "time"
+
+       "git.arvados.org/arvados.git/sdk/go/arvados"
+       "git.arvados.org/arvados.git/sdk/go/arvadostest"
+       "git.arvados.org/arvados.git/sdk/go/httpserver"
+       "github.com/prometheus/client_golang/prometheus"
+       . "gopkg.in/check.v1"
+)
+
+// routerSuite tests that the router correctly translates HTTP
+// requests to the appropriate keepstore functionality, and translates
+// the results to HTTP responses.
+type routerSuite struct {
+       cluster *arvados.Cluster
+}
+
+var _ = Suite(&routerSuite{})
+
+func testRouter(t TB, cluster *arvados.Cluster, reg *prometheus.Registry) (*router, context.CancelFunc) {
+       if reg == nil {
+               reg = prometheus.NewRegistry()
+       }
+       ctx, cancel := context.WithCancel(context.Background())
+       ks, kcancel := testKeepstore(t, cluster, reg)
+       go func() {
+               <-ctx.Done()
+               kcancel()
+       }()
+       puller := newPuller(ctx, ks, reg)
+       trasher := newTrasher(ctx, ks, reg)
+       return newRouter(ks, puller, trasher).(*router), cancel
+}
+
+func (s *routerSuite) SetUpTest(c *C) {
+       s.cluster = testCluster(c)
+       s.cluster.Volumes = map[string]arvados.Volume{
+               "zzzzz-nyw5e-000000000000000": {Replication: 1, Driver: "stub", StorageClasses: map[string]bool{"testclass1": true}},
+               "zzzzz-nyw5e-111111111111111": {Replication: 1, Driver: "stub", StorageClasses: map[string]bool{"testclass2": true}},
+       }
+       s.cluster.StorageClasses = map[string]arvados.StorageClassConfig{
+               "testclass1": arvados.StorageClassConfig{
+                       Default: true,
+               },
+               "testclass2": arvados.StorageClassConfig{
+                       Default: true,
+               },
+       }
+}
+
+func (s *routerSuite) TestBlockRead_Token(c *C) {
+       router, cancel := testRouter(c, s.cluster, nil)
+       defer cancel()
+
+       err := router.keepstore.mountsW[0].BlockWrite(context.Background(), fooHash, []byte("foo"))
+       c.Assert(err, IsNil)
+       locSigned := router.keepstore.signLocator(arvadostest.ActiveTokenV2, fooHash+"+3")
+       c.Assert(locSigned, Not(Equals), fooHash+"+3")
+
+       // No token provided
+       resp := call(router, "GET", "http://example/"+locSigned, "", nil, nil)
+       c.Check(resp.Code, Equals, http.StatusUnauthorized)
+       c.Check(resp.Body.String(), Matches, "no token provided in Authorization header\n")
+
+       // Different token => invalid signature
+       resp = call(router, "GET", "http://example/"+locSigned, "badtoken", nil, nil)
+       c.Check(resp.Code, Equals, http.StatusBadRequest)
+       c.Check(resp.Body.String(), Equals, "invalid signature\n")
+
+       // Correct token
+       resp = call(router, "GET", "http://example/"+locSigned, arvadostest.ActiveTokenV2, nil, nil)
+       c.Check(resp.Code, Equals, http.StatusOK)
+       c.Check(resp.Body.String(), Equals, "foo")
+
+       // HEAD
+       resp = call(router, "HEAD", "http://example/"+locSigned, arvadostest.ActiveTokenV2, nil, nil)
+       c.Check(resp.Code, Equals, http.StatusOK)
+       c.Check(resp.Result().ContentLength, Equals, int64(3))
+       c.Check(resp.Body.String(), Equals, "")
+}
+
+// As a special case we allow HEAD requests that only provide a hash
+// without a size hint. This accommodates uses of keep-block-check
+// where it's inconvenient to attach size hints to known hashes.
+//
+// GET requests must provide a size hint -- otherwise we can't
+// propagate a checksum mismatch error.
+func (s *routerSuite) TestBlockRead_NoSizeHint(c *C) {
+       s.cluster.Collections.BlobSigning = true
+       router, cancel := testRouter(c, s.cluster, nil)
+       defer cancel()
+       err := router.keepstore.mountsW[0].BlockWrite(context.Background(), fooHash, []byte("foo"))
+       c.Assert(err, IsNil)
+
+       // hash+signature
+       hashSigned := router.keepstore.signLocator(arvadostest.ActiveTokenV2, fooHash)
+       resp := call(router, "GET", "http://example/"+hashSigned, arvadostest.ActiveTokenV2, nil, nil)
+       c.Check(resp.Code, Equals, http.StatusMethodNotAllowed)
+
+       resp = call(router, "HEAD", "http://example/"+fooHash, "", nil, nil)
+       c.Check(resp.Code, Equals, http.StatusUnauthorized)
+       resp = call(router, "HEAD", "http://example/"+fooHash+"+3", "", nil, nil)
+       c.Check(resp.Code, Equals, http.StatusUnauthorized)
+
+       s.cluster.Collections.BlobSigning = false
+       router, cancel = testRouter(c, s.cluster, nil)
+       defer cancel()
+       err = router.keepstore.mountsW[0].BlockWrite(context.Background(), fooHash, []byte("foo"))
+       c.Assert(err, IsNil)
+
+       resp = call(router, "GET", "http://example/"+fooHash, "", nil, nil)
+       c.Check(resp.Code, Equals, http.StatusMethodNotAllowed)
+
+       resp = call(router, "HEAD", "http://example/"+fooHash, "", nil, nil)
+       c.Check(resp.Code, Equals, http.StatusOK)
+       c.Check(resp.Body.String(), Equals, "")
+       c.Check(resp.Result().ContentLength, Equals, int64(3))
+       c.Check(resp.Header().Get("Content-Length"), Equals, "3")
+}
+
+// By the time we discover the checksum mismatch, it's too late to
+// change the response code, but the expected block size is given in
+// the Content-Length response header, so a generic http client can
+// detect the problem.
+func (s *routerSuite) TestBlockRead_ChecksumMismatch(c *C) {
+       router, cancel := testRouter(c, s.cluster, nil)
+       defer cancel()
+
+       gooddata := make([]byte, 10_000_000)
+       gooddata[0] = 'a'
+       hash := fmt.Sprintf("%x", md5.Sum(gooddata))
+       locSigned := router.keepstore.signLocator(arvadostest.ActiveTokenV2, fmt.Sprintf("%s+%d", hash, len(gooddata)))
+
+       for _, baddata := range [][]byte{
+               make([]byte, 3),
+               make([]byte, len(gooddata)),
+               make([]byte, len(gooddata)-1),
+               make([]byte, len(gooddata)+1),
+               make([]byte, len(gooddata)*2),
+       } {
+               c.Logf("=== baddata len %d", len(baddata))
+               err := router.keepstore.mountsW[0].BlockWrite(context.Background(), hash, baddata)
+               c.Assert(err, IsNil)
+
+               resp := call(router, "GET", "http://example/"+locSigned, arvadostest.ActiveTokenV2, nil, nil)
+               if !c.Check(resp.Code, Equals, http.StatusOK) {
+                       c.Logf("resp.Body: %s", resp.Body.String())
+               }
+               c.Check(resp.Body.Len(), Not(Equals), len(gooddata))
+               c.Check(resp.Result().ContentLength, Equals, int64(len(gooddata)))
+
+               resp = call(router, "HEAD", "http://example/"+locSigned, arvadostest.ActiveTokenV2, nil, nil)
+               c.Check(resp.Code, Equals, http.StatusBadGateway)
+
+               hashSigned := router.keepstore.signLocator(arvadostest.ActiveTokenV2, hash)
+               resp = call(router, "HEAD", "http://example/"+hashSigned, arvadostest.ActiveTokenV2, nil, nil)
+               c.Check(resp.Code, Equals, http.StatusBadGateway)
+       }
+}
+
+func (s *routerSuite) TestBlockWrite(c *C) {
+       router, cancel := testRouter(c, s.cluster, nil)
+       defer cancel()
+
+       resp := call(router, "PUT", "http://example/"+fooHash, arvadostest.ActiveTokenV2, []byte("foo"), nil)
+       c.Check(resp.Code, Equals, http.StatusOK)
+       locator := strings.TrimSpace(resp.Body.String())
+
+       resp = call(router, "GET", "http://example/"+locator, arvadostest.ActiveTokenV2, nil, nil)
+       c.Check(resp.Code, Equals, http.StatusOK)
+       c.Check(resp.Body.String(), Equals, "foo")
+}
+
+func (s *routerSuite) TestBlockWrite_Headers(c *C) {
+       router, cancel := testRouter(c, s.cluster, nil)
+       defer cancel()
+
+       resp := call(router, "PUT", "http://example/"+fooHash, arvadostest.ActiveTokenV2, []byte("foo"), http.Header{"X-Arvados-Replicas-Desired": []string{"2"}})
+       c.Check(resp.Code, Equals, http.StatusOK)
+       c.Check(resp.Header().Get("X-Keep-Replicas-Stored"), Equals, "1")
+       c.Check(sortCommaSeparated(resp.Header().Get("X-Keep-Storage-Classes-Confirmed")), Equals, "testclass1=1")
+
+       resp = call(router, "PUT", "http://example/"+fooHash, arvadostest.ActiveTokenV2, []byte("foo"), http.Header{"X-Keep-Storage-Classes": []string{"testclass1"}})
+       c.Check(resp.Code, Equals, http.StatusOK)
+       c.Check(resp.Header().Get("X-Keep-Replicas-Stored"), Equals, "1")
+       c.Check(resp.Header().Get("X-Keep-Storage-Classes-Confirmed"), Equals, "testclass1=1")
+
+       resp = call(router, "PUT", "http://example/"+fooHash, arvadostest.ActiveTokenV2, []byte("foo"), http.Header{"X-Keep-Storage-Classes": []string{" , testclass2 , "}})
+       c.Check(resp.Code, Equals, http.StatusOK)
+       c.Check(resp.Header().Get("X-Keep-Replicas-Stored"), Equals, "1")
+       c.Check(resp.Header().Get("X-Keep-Storage-Classes-Confirmed"), Equals, "testclass2=1")
+}
+
+func sortCommaSeparated(s string) string {
+       slice := strings.Split(s, ", ")
+       sort.Strings(slice)
+       return strings.Join(slice, ", ")
+}
+
+func (s *routerSuite) TestBlockTouch(c *C) {
+       router, cancel := testRouter(c, s.cluster, nil)
+       defer cancel()
+
+       resp := call(router, "TOUCH", "http://example/"+fooHash+"+3", s.cluster.SystemRootToken, nil, nil)
+       c.Check(resp.Code, Equals, http.StatusNotFound)
+
+       vol0 := router.keepstore.mountsW[0].volume.(*stubVolume)
+       err := vol0.BlockWrite(context.Background(), fooHash, []byte("foo"))
+       c.Assert(err, IsNil)
+       vol1 := router.keepstore.mountsW[1].volume.(*stubVolume)
+       err = vol1.BlockWrite(context.Background(), fooHash, []byte("foo"))
+       c.Assert(err, IsNil)
+
+       t1 := time.Now()
+       resp = call(router, "TOUCH", "http://example/"+fooHash+"+3", s.cluster.SystemRootToken, nil, nil)
+       c.Check(resp.Code, Equals, http.StatusOK)
+       t2 := time.Now()
+
+       // Unauthorized request is a no-op
+       resp = call(router, "TOUCH", "http://example/"+fooHash+"+3", arvadostest.ActiveTokenV2, nil, nil)
+       c.Check(resp.Code, Equals, http.StatusForbidden)
+
+       // Volume 0 mtime should be updated
+       t, err := vol0.Mtime(fooHash)
+       c.Check(err, IsNil)
+       c.Check(t.After(t1), Equals, true)
+       c.Check(t.Before(t2), Equals, true)
+
+       // Volume 1 mtime should not be updated
+       t, err = vol1.Mtime(fooHash)
+       c.Check(err, IsNil)
+       c.Check(t.Before(t1), Equals, true)
+
+       err = vol0.BlockTrash(fooHash)
+       c.Assert(err, IsNil)
+       err = vol1.BlockTrash(fooHash)
+       c.Assert(err, IsNil)
+       resp = call(router, "TOUCH", "http://example/"+fooHash+"+3", s.cluster.SystemRootToken, nil, nil)
+       c.Check(resp.Code, Equals, http.StatusNotFound)
+}
+
+func (s *routerSuite) TestBlockTrash(c *C) {
+       router, cancel := testRouter(c, s.cluster, nil)
+       defer cancel()
+
+       vol0 := router.keepstore.mountsW[0].volume.(*stubVolume)
+       err := vol0.BlockWrite(context.Background(), fooHash, []byte("foo"))
+       c.Assert(err, IsNil)
+       err = vol0.blockTouchWithTime(fooHash, time.Now().Add(-s.cluster.Collections.BlobSigningTTL.Duration()))
+       c.Assert(err, IsNil)
+       resp := call(router, "DELETE", "http://example/"+fooHash+"+3", s.cluster.SystemRootToken, nil, nil)
+       c.Check(resp.Code, Equals, http.StatusOK)
+       c.Check(vol0.stubLog.String(), Matches, `(?ms).* trash .*`)
+       _, err = vol0.BlockRead(context.Background(), fooHash, io.Discard)
+       c.Assert(err, Equals, os.ErrNotExist)
+}
+
+func (s *routerSuite) TestBlockUntrash(c *C) {
+       router, cancel := testRouter(c, s.cluster, nil)
+       defer cancel()
+
+       vol0 := router.keepstore.mountsW[0].volume.(*stubVolume)
+       err := vol0.BlockWrite(context.Background(), fooHash, []byte("foo"))
+       c.Assert(err, IsNil)
+       err = vol0.BlockTrash(fooHash)
+       c.Assert(err, IsNil)
+       _, err = vol0.BlockRead(context.Background(), fooHash, io.Discard)
+       c.Assert(err, Equals, os.ErrNotExist)
+       resp := call(router, "PUT", "http://example/untrash/"+fooHash+"+3", s.cluster.SystemRootToken, nil, nil)
+       c.Check(resp.Code, Equals, http.StatusOK)
+       c.Check(vol0.stubLog.String(), Matches, `(?ms).* untrash .*`)
+       _, err = vol0.BlockRead(context.Background(), fooHash, io.Discard)
+       c.Check(err, IsNil)
+}
+
+func (s *routerSuite) TestBadRequest(c *C) {
+       router, cancel := testRouter(c, s.cluster, nil)
+       defer cancel()
+
+       for _, trial := range []string{
+               "GET /",
+               "GET /xyz",
+               "GET /aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaabcdefg",
+               "GET /untrash",
+               "GET /mounts/blocks/123",
+               "GET /trash",
+               "GET /pull",
+               "GET /debug.json",
+               "GET /status.json",
+               "POST /",
+               "POST /aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa",
+               "POST /trash",
+               "PROPFIND /",
+               "MAKE-COFFEE /",
+       } {
+               c.Logf("=== %s", trial)
+               methodpath := strings.Split(trial, " ")
+               req := httptest.NewRequest(methodpath[0], "http://example"+methodpath[1], nil)
+               resp := httptest.NewRecorder()
+               router.ServeHTTP(resp, req)
+               c.Check(resp.Code, Equals, http.StatusBadRequest)
+       }
+}
+
+func (s *routerSuite) TestRequireAdminMgtToken(c *C) {
+       router, cancel := testRouter(c, s.cluster, nil)
+       defer cancel()
+
+       for _, token := range []string{"badtoken", ""} {
+               for _, trial := range []string{
+                       "PUT /pull",
+                       "PUT /trash",
+                       "GET /index",
+                       "GET /index/",
+                       "GET /index/1234",
+                       "PUT /untrash/aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa",
+               } {
+                       c.Logf("=== %s", trial)
+                       methodpath := strings.Split(trial, " ")
+                       req := httptest.NewRequest(methodpath[0], "http://example"+methodpath[1], nil)
+                       if token != "" {
+                               req.Header.Set("Authorization", "Bearer "+token)
+                       }
+                       resp := httptest.NewRecorder()
+                       router.ServeHTTP(resp, req)
+                       if token == "" {
+                               c.Check(resp.Code, Equals, http.StatusUnauthorized)
+                       } else {
+                               c.Check(resp.Code, Equals, http.StatusForbidden)
+                       }
+               }
+       }
+       req := httptest.NewRequest("TOUCH", "http://example/aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa", nil)
+       resp := httptest.NewRecorder()
+       router.ServeHTTP(resp, req)
+       c.Check(resp.Code, Equals, http.StatusUnauthorized)
+}
+
+func (s *routerSuite) TestVolumeErrorStatusCode(c *C) {
+       router, cancel := testRouter(c, s.cluster, nil)
+       defer cancel()
+       router.keepstore.mountsW[0].volume.(*stubVolume).blockRead = func(_ context.Context, hash string, w io.Writer) (int, error) {
+               return 0, httpserver.ErrorWithStatus(errors.New("test error"), http.StatusBadGateway)
+       }
+
+       // To test whether we fall back to volume 1 after volume 0
+       // returns an error, we need to use a block whose rendezvous
+       // order has volume 0 first. Luckily "bar" is such a block.
+       c.Assert(router.keepstore.rendezvous(barHash, router.keepstore.mountsR)[0].UUID, DeepEquals, router.keepstore.mountsR[0].UUID)
+
+       locSigned := router.keepstore.signLocator(arvadostest.ActiveTokenV2, barHash+"+3")
+
+       // Volume 0 fails with an error that specifies an HTTP status
+       // code, so that code should be propagated to caller.
+       resp := call(router, "GET", "http://example/"+locSigned, arvadostest.ActiveTokenV2, nil, nil)
+       c.Check(resp.Code, Equals, http.StatusBadGateway)
+       c.Check(resp.Body.String(), Equals, "test error\n")
+
+       c.Assert(router.keepstore.mountsW[1].volume.BlockWrite(context.Background(), barHash, []byte("bar")), IsNil)
+
+       // If the requested block is available on the second volume,
+       // it doesn't matter that the first volume failed.
+       resp = call(router, "GET", "http://example/"+locSigned, arvadostest.ActiveTokenV2, nil, nil)
+       c.Check(resp.Code, Equals, http.StatusOK)
+       c.Check(resp.Body.String(), Equals, "bar")
+}
+
+func (s *routerSuite) TestIndex(c *C) {
+       router, cancel := testRouter(c, s.cluster, nil)
+       defer cancel()
+
+       resp := call(router, "GET", "http://example/index", s.cluster.SystemRootToken, nil, nil)
+       c.Check(resp.Code, Equals, http.StatusOK)
+       c.Check(resp.Body.String(), Equals, "\n")
+
+       resp = call(router, "GET", "http://example/index?prefix=fff", s.cluster.SystemRootToken, nil, nil)
+       c.Check(resp.Code, Equals, http.StatusOK)
+       c.Check(resp.Body.String(), Equals, "\n")
+
+       t0 := time.Now().Add(-time.Hour)
+       vol0 := router.keepstore.mounts["zzzzz-nyw5e-000000000000000"].volume.(*stubVolume)
+       err := vol0.BlockWrite(context.Background(), fooHash, []byte("foo"))
+       c.Assert(err, IsNil)
+       err = vol0.blockTouchWithTime(fooHash, t0)
+       c.Assert(err, IsNil)
+       err = vol0.BlockWrite(context.Background(), barHash, []byte("bar"))
+       c.Assert(err, IsNil)
+       err = vol0.blockTouchWithTime(barHash, t0)
+       c.Assert(err, IsNil)
+       t1 := time.Now().Add(-time.Minute)
+       vol1 := router.keepstore.mounts["zzzzz-nyw5e-111111111111111"].volume.(*stubVolume)
+       err = vol1.BlockWrite(context.Background(), barHash, []byte("bar"))
+       c.Assert(err, IsNil)
+       err = vol1.blockTouchWithTime(barHash, t1)
+       c.Assert(err, IsNil)
+
+       for _, path := range []string{
+               "/index?prefix=acb",
+               "/index/acb",
+               "/index/?prefix=acb",
+               "/mounts/zzzzz-nyw5e-000000000000000/blocks?prefix=acb",
+               "/mounts/zzzzz-nyw5e-000000000000000/blocks/?prefix=acb",
+               "/mounts/zzzzz-nyw5e-000000000000000/blocks/acb",
+       } {
+               c.Logf("=== %s", path)
+               resp = call(router, "GET", "http://example"+path, s.cluster.SystemRootToken, nil, nil)
+               c.Check(resp.Code, Equals, http.StatusOK)
+               c.Check(resp.Body.String(), Equals, fooHash+"+3 "+fmt.Sprintf("%d", t0.UnixNano())+"\n\n")
+       }
+
+       for _, path := range []string{
+               "/index?prefix=37",
+               "/index/37",
+               "/index/?prefix=37",
+       } {
+               c.Logf("=== %s", path)
+               resp = call(router, "GET", "http://example"+path, s.cluster.SystemRootToken, nil, nil)
+               c.Check(resp.Code, Equals, http.StatusOK)
+               c.Check(resp.Body.String(), Equals, ""+
+                       barHash+"+3 "+fmt.Sprintf("%d", t0.UnixNano())+"\n"+
+                       barHash+"+3 "+fmt.Sprintf("%d", t1.UnixNano())+"\n\n")
+       }
+
+       for _, path := range []string{
+               "/mounts/zzzzz-nyw5e-111111111111111/blocks",
+               "/mounts/zzzzz-nyw5e-111111111111111/blocks/",
+               "/mounts/zzzzz-nyw5e-111111111111111/blocks?prefix=37",
+               "/mounts/zzzzz-nyw5e-111111111111111/blocks/?prefix=37",
+               "/mounts/zzzzz-nyw5e-111111111111111/blocks/37",
+       } {
+               c.Logf("=== %s", path)
+               resp = call(router, "GET", "http://example"+path, s.cluster.SystemRootToken, nil, nil)
+               c.Check(resp.Code, Equals, http.StatusOK)
+               c.Check(resp.Body.String(), Equals, barHash+"+3 "+fmt.Sprintf("%d", t1.UnixNano())+"\n\n")
+       }
+
+       for _, path := range []string{
+               "/index",
+               "/index?prefix=",
+               "/index/",
+               "/index/?prefix=",
+       } {
+               c.Logf("=== %s", path)
+               resp = call(router, "GET", "http://example"+path, s.cluster.SystemRootToken, nil, nil)
+               c.Check(resp.Code, Equals, http.StatusOK)
+               c.Check(strings.Split(resp.Body.String(), "\n"), HasLen, 5)
+       }
+
+}
+
+// Check that the context passed to a volume method gets cancelled
+// when the http client hangs up.
+func (s *routerSuite) TestCancelOnDisconnect(c *C) {
+       router, cancel := testRouter(c, s.cluster, nil)
+       defer cancel()
+
+       unblock := make(chan struct{})
+       router.keepstore.mountsW[0].volume.(*stubVolume).blockRead = func(ctx context.Context, hash string, w io.Writer) (int, error) {
+               <-unblock
+               c.Check(ctx.Err(), NotNil)
+               return 0, ctx.Err()
+       }
+       go func() {
+               time.Sleep(time.Second / 10)
+               cancel()
+               close(unblock)
+       }()
+       locSigned := router.keepstore.signLocator(arvadostest.ActiveTokenV2, fooHash+"+3")
+       ctx, cancel := context.WithCancel(context.Background())
+       defer cancel()
+       req, err := http.NewRequestWithContext(ctx, "GET", "http://example/"+locSigned, nil)
+       c.Assert(err, IsNil)
+       req.Header.Set("Authorization", "Bearer "+arvadostest.ActiveTokenV2)
+       resp := httptest.NewRecorder()
+       router.ServeHTTP(resp, req)
+       c.Check(resp.Code, Equals, 499)
+}
+
+func call(handler http.Handler, method, path, tok string, body []byte, hdr http.Header) *httptest.ResponseRecorder {
+       resp := httptest.NewRecorder()
+       req, err := http.NewRequest(method, path, bytes.NewReader(body))
+       if err != nil {
+               panic(err)
+       }
+       for k := range hdr {
+               req.Header.Set(k, hdr.Get(k))
+       }
+       if tok != "" {
+               req.Header.Set("Authorization", "Bearer "+tok)
+       }
+       handler.ServeHTTP(resp, req)
+       return resp
+}