// Copyright (C) The Arvados Authors. All rights reserved.
//
// SPDX-License-Identifier: AGPL-3.0

package keepstore

import (
	"bytes"
	"context"
	"crypto/md5"
	"errors"
	"fmt"
	"io"
	"net/http"
	"net/http/httptest"
	"os"
	"sort"
	"strings"
	"time"

	"git.arvados.org/arvados.git/sdk/go/arvados"
	"git.arvados.org/arvados.git/sdk/go/arvadostest"
	"git.arvados.org/arvados.git/sdk/go/httpserver"
	"github.com/prometheus/client_golang/prometheus"
	. "gopkg.in/check.v1"
)

// routerSuite tests that the router correctly translates HTTP
// requests to the appropriate keepstore functionality, and translates
// the results to HTTP responses.
type routerSuite struct {
	cluster *arvados.Cluster
}

var _ = Suite(&routerSuite{})

func testRouter(t TB, cluster *arvados.Cluster, reg *prometheus.Registry) (*router, context.CancelFunc) {
	if reg == nil {
		reg = prometheus.NewRegistry()
	}
	ctx, cancel := context.WithCancel(context.Background())
	ks, kcancel := testKeepstore(t, cluster, reg)
	go func() {
		<-ctx.Done()
		kcancel()
	}()
	puller := newPuller(ctx, ks, reg)
	trasher := newTrasher(ctx, ks, reg)
	return newRouter(ks, puller, trasher).(*router), cancel
}

func (s *routerSuite) SetUpTest(c *C) {
	s.cluster = testCluster(c)
	s.cluster.Volumes = map[string]arvados.Volume{
		"zzzzz-nyw5e-000000000000000": {Replication: 1, Driver: "stub", StorageClasses: map[string]bool{"testclass1": true}},
		"zzzzz-nyw5e-111111111111111": {Replication: 1, Driver: "stub", StorageClasses: map[string]bool{"testclass2": true}},
	}
	s.cluster.StorageClasses = map[string]arvados.StorageClassConfig{
		"testclass1": arvados.StorageClassConfig{
			Default: true,
		},
		"testclass2": arvados.StorageClassConfig{
			Default: true,
		},
	}
}

func (s *routerSuite) TestBlockRead_Token(c *C) {
	router, cancel := testRouter(c, s.cluster, nil)
	defer cancel()

	err := router.keepstore.mountsW[0].BlockWrite(context.Background(), fooHash, []byte("foo"))
	c.Assert(err, IsNil)
	locSigned := router.keepstore.signLocator(arvadostest.ActiveTokenV2, fooHash+"+3")
	c.Assert(locSigned, Not(Equals), fooHash+"+3")

	// No token provided
	resp := call(router, "GET", "http://example/"+locSigned, "", nil, nil)
	c.Check(resp.Code, Equals, http.StatusUnauthorized)
	c.Check(resp.Body.String(), Matches, "no token provided in Authorization header\n")
	checkCORSHeaders(c, resp.Header())

	// Different token => invalid signature
	resp = call(router, "GET", "http://example/"+locSigned, "badtoken", nil, nil)
	c.Check(resp.Code, Equals, http.StatusBadRequest)
	c.Check(resp.Body.String(), Equals, "invalid signature\n")
	checkCORSHeaders(c, resp.Header())

	// Correct token
	resp = call(router, "GET", "http://example/"+locSigned, arvadostest.ActiveTokenV2, nil, nil)
	c.Check(resp.Code, Equals, http.StatusOK)
	c.Check(resp.Body.String(), Equals, "foo")
	checkCORSHeaders(c, resp.Header())

	// HEAD
	resp = call(router, "HEAD", "http://example/"+locSigned, arvadostest.ActiveTokenV2, nil, nil)
	c.Check(resp.Code, Equals, http.StatusOK)
	c.Check(resp.Result().ContentLength, Equals, int64(3))
	c.Check(resp.Body.String(), Equals, "")
	checkCORSHeaders(c, resp.Header())
}

// As a special case we allow HEAD requests that only provide a hash
// without a size hint. This accommodates uses of keep-block-check
// where it's inconvenient to attach size hints to known hashes.
//
// GET requests must provide a size hint -- otherwise we can't
// propagate a checksum mismatch error.
func (s *routerSuite) TestBlockRead_NoSizeHint(c *C) {
	s.cluster.Collections.BlobSigning = true
	router, cancel := testRouter(c, s.cluster, nil)
	defer cancel()
	err := router.keepstore.mountsW[0].BlockWrite(context.Background(), fooHash, []byte("foo"))
	c.Assert(err, IsNil)

	// hash+signature
	hashSigned := router.keepstore.signLocator(arvadostest.ActiveTokenV2, fooHash)
	resp := call(router, "GET", "http://example/"+hashSigned, arvadostest.ActiveTokenV2, nil, nil)
	c.Check(resp.Code, Equals, http.StatusMethodNotAllowed)

	resp = call(router, "HEAD", "http://example/"+fooHash, "", nil, nil)
	c.Check(resp.Code, Equals, http.StatusUnauthorized)
	resp = call(router, "HEAD", "http://example/"+fooHash+"+3", "", nil, nil)
	c.Check(resp.Code, Equals, http.StatusUnauthorized)

	s.cluster.Collections.BlobSigning = false
	router, cancel = testRouter(c, s.cluster, nil)
	defer cancel()
	err = router.keepstore.mountsW[0].BlockWrite(context.Background(), fooHash, []byte("foo"))
	c.Assert(err, IsNil)

	resp = call(router, "GET", "http://example/"+fooHash, "", nil, nil)
	c.Check(resp.Code, Equals, http.StatusMethodNotAllowed)

	resp = call(router, "HEAD", "http://example/"+fooHash, "", nil, nil)
	c.Check(resp.Code, Equals, http.StatusOK)
	c.Check(resp.Body.String(), Equals, "")
	c.Check(resp.Result().ContentLength, Equals, int64(3))
	c.Check(resp.Header().Get("Content-Length"), Equals, "3")
}

// By the time we discover the checksum mismatch, it's too late to
// change the response code, but the expected block size is given in
// the Content-Length response header, so a generic http client can
// detect the problem.
func (s *routerSuite) TestBlockRead_ChecksumMismatch(c *C) {
	router, cancel := testRouter(c, s.cluster, nil)
	defer cancel()

	gooddata := make([]byte, 10_000_000)
	gooddata[0] = 'a'
	hash := fmt.Sprintf("%x", md5.Sum(gooddata))
	locSigned := router.keepstore.signLocator(arvadostest.ActiveTokenV2, fmt.Sprintf("%s+%d", hash, len(gooddata)))

	for _, baddata := range [][]byte{
		make([]byte, 3),
		make([]byte, len(gooddata)),
		make([]byte, len(gooddata)-1),
		make([]byte, len(gooddata)+1),
		make([]byte, len(gooddata)*2),
	} {
		c.Logf("=== baddata len %d", len(baddata))
		err := router.keepstore.mountsW[0].BlockWrite(context.Background(), hash, baddata)
		c.Assert(err, IsNil)

		resp := call(router, "GET", "http://example/"+locSigned, arvadostest.ActiveTokenV2, nil, nil)
		if !c.Check(resp.Code, Equals, http.StatusOK) {
			c.Logf("resp.Body: %s", resp.Body.String())
		}
		c.Check(resp.Body.Len(), Not(Equals), len(gooddata))
		c.Check(resp.Result().ContentLength, Equals, int64(len(gooddata)))
		checkCORSHeaders(c, resp.Header())

		resp = call(router, "HEAD", "http://example/"+locSigned, arvadostest.ActiveTokenV2, nil, nil)
		c.Check(resp.Code, Equals, http.StatusBadGateway)
		checkCORSHeaders(c, resp.Header())

		hashSigned := router.keepstore.signLocator(arvadostest.ActiveTokenV2, hash)
		resp = call(router, "HEAD", "http://example/"+hashSigned, arvadostest.ActiveTokenV2, nil, nil)
		c.Check(resp.Code, Equals, http.StatusBadGateway)
		checkCORSHeaders(c, resp.Header())
	}
}

func (s *routerSuite) TestBlockWrite(c *C) {
	router, cancel := testRouter(c, s.cluster, nil)
	defer cancel()

	resp := call(router, "PUT", "http://example/"+fooHash, arvadostest.ActiveTokenV2, []byte("foo"), nil)
	c.Check(resp.Code, Equals, http.StatusOK)
	checkCORSHeaders(c, resp.Header())
	locator := strings.TrimSpace(resp.Body.String())

	resp = call(router, "GET", "http://example/"+locator, arvadostest.ActiveTokenV2, nil, nil)
	c.Check(resp.Code, Equals, http.StatusOK)
	c.Check(resp.Body.String(), Equals, "foo")
}

func (s *routerSuite) TestBlockWrite_Headers(c *C) {
	router, cancel := testRouter(c, s.cluster, nil)
	defer cancel()

	resp := call(router, "PUT", "http://example/"+fooHash, arvadostest.ActiveTokenV2, []byte("foo"), http.Header{"X-Keep-Desired-Replicas": []string{"2"}})
	c.Check(resp.Code, Equals, http.StatusOK)
	c.Check(resp.Header().Get("X-Keep-Replicas-Stored"), Equals, "1")
	c.Check(sortCommaSeparated(resp.Header().Get("X-Keep-Storage-Classes-Confirmed")), Equals, "testclass1=1")

	resp = call(router, "PUT", "http://example/"+fooHash, arvadostest.ActiveTokenV2, []byte("foo"), http.Header{"X-Keep-Storage-Classes": []string{"testclass1"}})
	c.Check(resp.Code, Equals, http.StatusOK)
	c.Check(resp.Header().Get("X-Keep-Replicas-Stored"), Equals, "1")
	c.Check(resp.Header().Get("X-Keep-Storage-Classes-Confirmed"), Equals, "testclass1=1")

	resp = call(router, "PUT", "http://example/"+fooHash, arvadostest.ActiveTokenV2, []byte("foo"), http.Header{"X-Keep-Storage-Classes": []string{" , testclass2 , "}})
	c.Check(resp.Code, Equals, http.StatusOK)
	c.Check(resp.Header().Get("X-Keep-Replicas-Stored"), Equals, "1")
	c.Check(resp.Header().Get("X-Keep-Storage-Classes-Confirmed"), Equals, "testclass2=1")

	resp = call(router, "PUT", "http://example/"+fooHash, arvadostest.ActiveTokenV2, []byte("foo"), http.Header{"X-Keep-Storage-Classes": []string{"testclass1, testclass2"}})
	c.Check(resp.Code, Equals, http.StatusOK)
	c.Check(resp.Header().Get("X-Keep-Replicas-Stored"), Equals, "2")
	confirmed := strings.Split(resp.Header().Get("X-Keep-Storage-Classes-Confirmed"), ", ")
	sort.Strings(confirmed)
	c.Check(confirmed, DeepEquals, []string{"testclass1=1", "testclass2=1"})
}

func sortCommaSeparated(s string) string {
	slice := strings.Split(s, ", ")
	sort.Strings(slice)
	return strings.Join(slice, ", ")
}

func (s *routerSuite) TestBlockTouch(c *C) {
	router, cancel := testRouter(c, s.cluster, nil)
	defer cancel()

	resp := call(router, "TOUCH", "http://example/"+fooHash+"+3", s.cluster.SystemRootToken, nil, nil)
	c.Check(resp.Code, Equals, http.StatusNotFound)

	vol0 := router.keepstore.mountsW[0].volume.(*stubVolume)
	err := vol0.BlockWrite(context.Background(), fooHash, []byte("foo"))
	c.Assert(err, IsNil)
	vol1 := router.keepstore.mountsW[1].volume.(*stubVolume)
	err = vol1.BlockWrite(context.Background(), fooHash, []byte("foo"))
	c.Assert(err, IsNil)

	t1 := time.Now()
	resp = call(router, "TOUCH", "http://example/"+fooHash+"+3", s.cluster.SystemRootToken, nil, nil)
	c.Check(resp.Code, Equals, http.StatusOK)
	t2 := time.Now()

	// Unauthorized request is a no-op
	resp = call(router, "TOUCH", "http://example/"+fooHash+"+3", arvadostest.ActiveTokenV2, nil, nil)
	c.Check(resp.Code, Equals, http.StatusForbidden)

	// Volume 0 mtime should be updated
	t, err := vol0.Mtime(fooHash)
	c.Check(err, IsNil)
	c.Check(t.After(t1), Equals, true)
	c.Check(t.Before(t2), Equals, true)

	// Volume 1 mtime should not be updated
	t, err = vol1.Mtime(fooHash)
	c.Check(err, IsNil)
	c.Check(t.Before(t1), Equals, true)

	err = vol0.BlockTrash(fooHash)
	c.Assert(err, IsNil)
	err = vol1.BlockTrash(fooHash)
	c.Assert(err, IsNil)
	resp = call(router, "TOUCH", "http://example/"+fooHash+"+3", s.cluster.SystemRootToken, nil, nil)
	c.Check(resp.Code, Equals, http.StatusNotFound)
}

func (s *routerSuite) TestBlockTrash(c *C) {
	router, cancel := testRouter(c, s.cluster, nil)
	defer cancel()

	vol0 := router.keepstore.mountsW[0].volume.(*stubVolume)
	err := vol0.BlockWrite(context.Background(), fooHash, []byte("foo"))
	c.Assert(err, IsNil)
	err = vol0.blockTouchWithTime(fooHash, time.Now().Add(-s.cluster.Collections.BlobSigningTTL.Duration()))
	c.Assert(err, IsNil)
	resp := call(router, "DELETE", "http://example/"+fooHash+"+3", s.cluster.SystemRootToken, nil, nil)
	c.Check(resp.Code, Equals, http.StatusOK)
	c.Check(vol0.stubLog.String(), Matches, `(?ms).* trash .*`)
	err = vol0.BlockRead(context.Background(), fooHash, brdiscard)
	c.Assert(err, Equals, os.ErrNotExist)
}

func (s *routerSuite) TestBlockUntrash(c *C) {
	router, cancel := testRouter(c, s.cluster, nil)
	defer cancel()

	vol0 := router.keepstore.mountsW[0].volume.(*stubVolume)
	err := vol0.BlockWrite(context.Background(), fooHash, []byte("foo"))
	c.Assert(err, IsNil)
	err = vol0.BlockTrash(fooHash)
	c.Assert(err, IsNil)
	err = vol0.BlockRead(context.Background(), fooHash, brdiscard)
	c.Assert(err, Equals, os.ErrNotExist)
	resp := call(router, "PUT", "http://example/untrash/"+fooHash+"+3", s.cluster.SystemRootToken, nil, nil)
	c.Check(resp.Code, Equals, http.StatusOK)
	c.Check(vol0.stubLog.String(), Matches, `(?ms).* untrash .*`)
	err = vol0.BlockRead(context.Background(), fooHash, brdiscard)
	c.Check(err, IsNil)
}

func (s *routerSuite) TestBadRequest(c *C) {
	router, cancel := testRouter(c, s.cluster, nil)
	defer cancel()

	for _, trial := range []string{
		"GET /",
		"GET /xyz",
		"GET /aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaabcdefg",
		"GET /untrash",
		"GET /mounts/blocks/123",
		"GET /trash",
		"GET /pull",
		"GET /debug.json",  // old endpoint, no longer exists
		"GET /status.json", // old endpoint, no longer exists
		"POST /",
		"POST /aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa",
		"POST /trash",
		"PROPFIND /",
		"MAKE-COFFEE /",
	} {
		c.Logf("=== %s", trial)
		methodpath := strings.Split(trial, " ")
		req := httptest.NewRequest(methodpath[0], "http://example"+methodpath[1], nil)
		resp := httptest.NewRecorder()
		router.ServeHTTP(resp, req)
		c.Check(resp.Code, Equals, http.StatusBadRequest)
	}
}

func (s *routerSuite) TestRequireAdminMgtToken(c *C) {
	router, cancel := testRouter(c, s.cluster, nil)
	defer cancel()

	for _, token := range []string{"badtoken", ""} {
		for _, trial := range []string{
			"PUT /pull",
			"PUT /trash",
			"GET /index",
			"GET /index/",
			"GET /index/1234",
			"PUT /untrash/aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa",
		} {
			c.Logf("=== %s", trial)
			methodpath := strings.Split(trial, " ")
			req := httptest.NewRequest(methodpath[0], "http://example"+methodpath[1], nil)
			if token != "" {
				req.Header.Set("Authorization", "Bearer "+token)
			}
			resp := httptest.NewRecorder()
			router.ServeHTTP(resp, req)
			if token == "" {
				c.Check(resp.Code, Equals, http.StatusUnauthorized)
			} else {
				c.Check(resp.Code, Equals, http.StatusForbidden)
			}
		}
	}
	req := httptest.NewRequest("TOUCH", "http://example/aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa", nil)
	resp := httptest.NewRecorder()
	router.ServeHTTP(resp, req)
	c.Check(resp.Code, Equals, http.StatusUnauthorized)
}

func (s *routerSuite) TestVolumeErrorStatusCode(c *C) {
	router, cancel := testRouter(c, s.cluster, nil)
	defer cancel()
	router.keepstore.mountsW[0].volume.(*stubVolume).blockRead = func(_ context.Context, hash string, w io.WriterAt) error {
		return httpserver.ErrorWithStatus(errors.New("test error"), http.StatusBadGateway)
	}

	// To test whether we fall back to volume 1 after volume 0
	// returns an error, we need to use a block whose rendezvous
	// order has volume 0 first. Luckily "bar" is such a block.
	c.Assert(router.keepstore.rendezvous(barHash, router.keepstore.mountsR)[0].UUID, DeepEquals, router.keepstore.mountsR[0].UUID)

	locSigned := router.keepstore.signLocator(arvadostest.ActiveTokenV2, barHash+"+3")

	// Volume 0 fails with an error that specifies an HTTP status
	// code, so that code should be propagated to caller.
	resp := call(router, "GET", "http://example/"+locSigned, arvadostest.ActiveTokenV2, nil, nil)
	c.Check(resp.Code, Equals, http.StatusBadGateway)
	c.Check(resp.Body.String(), Equals, "test error\n")

	router.keepstore.mountsW[0].volume.(*stubVolume).blockRead = func(_ context.Context, hash string, w io.WriterAt) error {
		return errors.New("no http status provided")
	}
	resp = call(router, "GET", "http://example/"+locSigned, arvadostest.ActiveTokenV2, nil, nil)
	c.Check(resp.Code, Equals, http.StatusInternalServerError)
	c.Check(resp.Body.String(), Equals, "no http status provided\n")

	c.Assert(router.keepstore.mountsW[1].volume.BlockWrite(context.Background(), barHash, []byte("bar")), IsNil)

	// If the requested block is available on the second volume,
	// it doesn't matter that the first volume failed.
	resp = call(router, "GET", "http://example/"+locSigned, arvadostest.ActiveTokenV2, nil, nil)
	c.Check(resp.Code, Equals, http.StatusOK)
	c.Check(resp.Body.String(), Equals, "bar")
}

func (s *routerSuite) TestIndex(c *C) {
	router, cancel := testRouter(c, s.cluster, nil)
	defer cancel()

	resp := call(router, "GET", "http://example/index", s.cluster.SystemRootToken, nil, nil)
	c.Check(resp.Code, Equals, http.StatusOK)
	c.Check(resp.Body.String(), Equals, "\n")

	resp = call(router, "GET", "http://example/index?prefix=fff", s.cluster.SystemRootToken, nil, nil)
	c.Check(resp.Code, Equals, http.StatusOK)
	c.Check(resp.Body.String(), Equals, "\n")

	t0 := time.Now().Add(-time.Hour)
	vol0 := router.keepstore.mounts["zzzzz-nyw5e-000000000000000"].volume.(*stubVolume)
	err := vol0.BlockWrite(context.Background(), fooHash, []byte("foo"))
	c.Assert(err, IsNil)
	err = vol0.blockTouchWithTime(fooHash, t0)
	c.Assert(err, IsNil)
	err = vol0.BlockWrite(context.Background(), barHash, []byte("bar"))
	c.Assert(err, IsNil)
	err = vol0.blockTouchWithTime(barHash, t0)
	c.Assert(err, IsNil)
	t1 := time.Now().Add(-time.Minute)
	vol1 := router.keepstore.mounts["zzzzz-nyw5e-111111111111111"].volume.(*stubVolume)
	err = vol1.BlockWrite(context.Background(), barHash, []byte("bar"))
	c.Assert(err, IsNil)
	err = vol1.blockTouchWithTime(barHash, t1)
	c.Assert(err, IsNil)

	for _, path := range []string{
		"/index?prefix=acb",
		"/index/acb",
		"/index/?prefix=acb",
		"/mounts/zzzzz-nyw5e-000000000000000/blocks?prefix=acb",
		"/mounts/zzzzz-nyw5e-000000000000000/blocks/?prefix=acb",
		"/mounts/zzzzz-nyw5e-000000000000000/blocks/acb",
	} {
		c.Logf("=== %s", path)
		resp = call(router, "GET", "http://example"+path, s.cluster.SystemRootToken, nil, nil)
		c.Check(resp.Code, Equals, http.StatusOK)
		c.Check(resp.Body.String(), Equals, fooHash+"+3 "+fmt.Sprintf("%d", t0.UnixNano())+"\n\n")
	}

	for _, path := range []string{
		"/index?prefix=37",
		"/index/37",
		"/index/?prefix=37",
	} {
		c.Logf("=== %s", path)
		resp = call(router, "GET", "http://example"+path, s.cluster.SystemRootToken, nil, nil)
		c.Check(resp.Code, Equals, http.StatusOK)
		c.Check(resp.Body.String(), Equals, ""+
			barHash+"+3 "+fmt.Sprintf("%d", t0.UnixNano())+"\n"+
			barHash+"+3 "+fmt.Sprintf("%d", t1.UnixNano())+"\n\n")
	}

	for _, path := range []string{
		"/mounts/zzzzz-nyw5e-111111111111111/blocks",
		"/mounts/zzzzz-nyw5e-111111111111111/blocks/",
		"/mounts/zzzzz-nyw5e-111111111111111/blocks?prefix=37",
		"/mounts/zzzzz-nyw5e-111111111111111/blocks/?prefix=37",
		"/mounts/zzzzz-nyw5e-111111111111111/blocks/37",
	} {
		c.Logf("=== %s", path)
		resp = call(router, "GET", "http://example"+path, s.cluster.SystemRootToken, nil, nil)
		c.Check(resp.Code, Equals, http.StatusOK)
		c.Check(resp.Body.String(), Equals, barHash+"+3 "+fmt.Sprintf("%d", t1.UnixNano())+"\n\n")
	}

	for _, path := range []string{
		"/index",
		"/index?prefix=",
		"/index/",
		"/index/?prefix=",
	} {
		c.Logf("=== %s", path)
		resp = call(router, "GET", "http://example"+path, s.cluster.SystemRootToken, nil, nil)
		c.Check(resp.Code, Equals, http.StatusOK)
		c.Check(strings.Split(resp.Body.String(), "\n"), HasLen, 5)
	}
}

// Check that the context passed to a volume method gets cancelled
// when the http client hangs up.
func (s *routerSuite) TestCancelOnDisconnect(c *C) {
	router, cancel := testRouter(c, s.cluster, nil)
	defer cancel()

	unblock := make(chan struct{})
	router.keepstore.mountsW[0].volume.(*stubVolume).blockRead = func(ctx context.Context, hash string, w io.WriterAt) error {
		<-unblock
		c.Check(ctx.Err(), NotNil)
		return ctx.Err()
	}
	go func() {
		time.Sleep(time.Second / 10)
		cancel()
		close(unblock)
	}()
	locSigned := router.keepstore.signLocator(arvadostest.ActiveTokenV2, fooHash+"+3")
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	req, err := http.NewRequestWithContext(ctx, "GET", "http://example/"+locSigned, nil)
	c.Assert(err, IsNil)
	req.Header.Set("Authorization", "Bearer "+arvadostest.ActiveTokenV2)
	resp := httptest.NewRecorder()
	router.ServeHTTP(resp, req)
	c.Check(resp.Code, Equals, 499)
}

func (s *routerSuite) TestCORSPreflight(c *C) {
	router, cancel := testRouter(c, s.cluster, nil)
	defer cancel()

	for _, path := range []string{"/", "/whatever", "/aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa+123"} {
		c.Logf("=== %s", path)
		resp := call(router, http.MethodOptions, "http://example"+path, arvadostest.ActiveTokenV2, nil, nil)
		c.Check(resp.Code, Equals, http.StatusOK)
		c.Check(resp.Body.String(), Equals, "")
		checkCORSHeaders(c, resp.Header())
	}
}

func call(handler http.Handler, method, path, tok string, body []byte, hdr http.Header) *httptest.ResponseRecorder {
	resp := httptest.NewRecorder()
	req, err := http.NewRequest(method, path, bytes.NewReader(body))
	if err != nil {
		panic(err)
	}
	for k := range hdr {
		req.Header.Set(k, hdr.Get(k))
	}
	if tok != "" {
		req.Header.Set("Authorization", "Bearer "+tok)
	}
	handler.ServeHTTP(resp, req)
	return resp
}

func checkCORSHeaders(c *C, h http.Header) {
	c.Check(h.Get("Access-Control-Allow-Methods"), Equals, "GET, HEAD, PUT, OPTIONS")
	c.Check(h.Get("Access-Control-Allow-Origin"), Equals, "*")
	c.Check(h.Get("Access-Control-Allow-Headers"), Equals, "Authorization, Content-Length, Content-Type, X-Keep-Desired-Replicas, X-Keep-Signature, X-Keep-Storage-Classes")
	c.Check(h.Get("Access-Control-Expose-Headers"), Equals, "X-Keep-Locator, X-Keep-Replicas-Stored, X-Keep-Storage-Classes-Confirmed")
}