// Copyright (C) The Arvados Authors. All rights reserved.
//
// SPDX-License-Identifier: Apache-2.0

package keepclient

import (
	"bytes"
	"context"
	"crypto/md5"
	"fmt"
	"io"
	"io/ioutil"
	"net"
	"net/http"
	"os"
	"strings"
	"sync"
	"sync/atomic"
	"testing"
	"time"

	"git.arvados.org/arvados.git/sdk/go/arvados"
	"git.arvados.org/arvados.git/sdk/go/arvadosclient"
	"git.arvados.org/arvados.git/sdk/go/arvadostest"
	. "gopkg.in/check.v1"
)

func Test(t *testing.T) {
	DefaultRetryDelay = 50 * time.Millisecond
	TestingT(t)
}

// Gocheck boilerplate
var _ = Suite(&ServerRequiredSuite{})
var _ = Suite(&StandaloneSuite{})

// Tests that require the Keep server running
type ServerRequiredSuite struct{}

// Standalone tests
type StandaloneSuite struct {
	origDefaultRetryDelay time.Duration
	origMinimumRetryDelay time.Duration
}

var origHOME = os.Getenv("HOME")

func (s *StandaloneSuite) SetUpTest(c *C) {
	RefreshServiceDiscovery()
	// Prevent cache state from leaking between test cases
	os.Setenv("HOME", c.MkDir())
	s.origDefaultRetryDelay = DefaultRetryDelay
	s.origMinimumRetryDelay = MinimumRetryDelay
}

func (s *StandaloneSuite) TearDownTest(c *C) {
	os.Setenv("HOME", origHOME)
	DefaultRetryDelay = s.origDefaultRetryDelay
	MinimumRetryDelay = s.origMinimumRetryDelay
}

func pythonDir() string {
	cwd, _ := os.Getwd()
	return fmt.Sprintf("%s/../../python/tests", cwd)
}

func (s *ServerRequiredSuite) SetUpSuite(c *C) {
	arvadostest.StartKeep(2, false)
}

func (s *ServerRequiredSuite) TearDownSuite(c *C) {
	arvadostest.StopKeep(2)
	os.Setenv("HOME", origHOME)
}

func (s *ServerRequiredSuite) SetUpTest(c *C) {
	RefreshServiceDiscovery()
	// Prevent cache state from leaking between test cases
	os.Setenv("HOME", c.MkDir())
}

func (s *ServerRequiredSuite) TestMakeKeepClient(c *C) {
	arv, err := arvadosclient.MakeArvadosClient()
	c.Assert(err, IsNil)

	kc, err := MakeKeepClient(arv)

	c.Assert(err, IsNil)
	c.Check(len(kc.LocalRoots()), Equals, 2)
	for _, root := range kc.LocalRoots() {
		c.Check(root, Matches, "http://localhost:\\d+")
	}
}

func (s *ServerRequiredSuite) TestDefaultStorageClasses(c *C) {
	arv, err := arvadosclient.MakeArvadosClient()
	c.Assert(err, IsNil)

	cc, err := arv.ClusterConfig("StorageClasses")
	c.Assert(err, IsNil)
	c.Assert(cc, NotNil)
	c.Assert(cc.(map[string]interface{})["default"], NotNil)

	kc := New(arv)
	c.Assert(kc.DefaultStorageClasses, DeepEquals, []string{"default"})
}

func (s *ServerRequiredSuite) TestDefaultReplications(c *C) {
	arv, err := arvadosclient.MakeArvadosClient()
	c.Assert(err, IsNil)

	kc, err := MakeKeepClient(arv)
	c.Check(err, IsNil)
	c.Assert(kc.Want_replicas, Equals, 2)

	arv.DiscoveryDoc["defaultCollectionReplication"] = 3.0
	kc, err = MakeKeepClient(arv)
	c.Check(err, IsNil)
	c.Assert(kc.Want_replicas, Equals, 3)

	arv.DiscoveryDoc["defaultCollectionReplication"] = 1.0
	kc, err = MakeKeepClient(arv)
	c.Check(err, IsNil)
	c.Assert(kc.Want_replicas, Equals, 1)
}

type StubPutHandler struct {
	c                    *C
	expectPath           string
	expectAPIToken       string
	expectBody           string
	expectStorageClass   string
	returnStorageClasses string
	handled              chan string
	requests             []*http.Request
	mtx                  sync.Mutex
}

func (sph *StubPutHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
	sph.mtx.Lock()
	sph.requests = append(sph.requests, req)
	sph.mtx.Unlock()
	sph.c.Check(req.URL.Path, Equals, "/"+sph.expectPath)
	sph.c.Check(req.Header.Get("Authorization"), Equals, fmt.Sprintf("Bearer %s", sph.expectAPIToken))
	if sph.expectStorageClass != "*" {
		sph.c.Check(req.Header.Get("X-Keep-Storage-Classes"), Equals, sph.expectStorageClass)
	}
	body, err := ioutil.ReadAll(req.Body)
	sph.c.Check(err, IsNil)
	sph.c.Check(body, DeepEquals, []byte(sph.expectBody))
	resp.Header().Set("X-Keep-Replicas-Stored", "1")
	if sph.returnStorageClasses != "" {
		resp.Header().Set("X-Keep-Storage-Classes-Confirmed", sph.returnStorageClasses)
	}
	resp.WriteHeader(200)
	sph.handled <- fmt.Sprintf("http://%s", req.Host)
}

func RunFakeKeepServer(st http.Handler) (ks KeepServer) {
	var err error
	// If we don't explicitly bind it to localhost, ks.listener.Addr() will
	// bind to 0.0.0.0 or [::] which is not a valid address for Dial()
	ks.listener, err = net.ListenTCP("tcp", &net.TCPAddr{IP: []byte{127, 0, 0, 1}, Port: 0})
	if err != nil {
		panic("Could not listen on any port")
	}
	ks.url = fmt.Sprintf("http://%s", ks.listener.Addr().String())
	go http.Serve(ks.listener, st)
	return
}

func UploadToStubHelper(c *C, st http.Handler, f func(*KeepClient, string,
	io.ReadCloser, io.WriteCloser, chan uploadStatus)) {

	ks := RunFakeKeepServer(st)
	defer ks.listener.Close()

	arv, _ := arvadosclient.MakeArvadosClient()
	arv.ApiToken = "abc123"

	kc, _ := MakeKeepClient(arv)

	reader, writer := io.Pipe()
	uploadStatusChan := make(chan uploadStatus)

	f(kc, ks.url, reader, writer, uploadStatusChan)
}

func (s *StandaloneSuite) TestUploadToStubKeepServer(c *C) {
	st := &StubPutHandler{
		c:                    c,
		expectPath:           "acbd18db4cc2f85cedef654fccc4a4d8",
		expectAPIToken:       "abc123",
		expectBody:           "foo",
		expectStorageClass:   "",
		returnStorageClasses: "default=1",
		handled:              make(chan string),
	}

	UploadToStubHelper(c, st,
		func(kc *KeepClient, url string, reader io.ReadCloser, writer io.WriteCloser, uploadStatusChan chan uploadStatus) {
			go kc.uploadToKeepServer(url, st.expectPath, nil, reader, uploadStatusChan, len("foo"), kc.getRequestID())

			writer.Write([]byte("foo"))
			writer.Close()

			<-st.handled
			status := <-uploadStatusChan
			c.Check(status, DeepEquals, uploadStatus{nil, fmt.Sprintf("%s/%s", url, st.expectPath), 200, 1, map[string]int{"default": 1}, ""})
		})
}

func (s *StandaloneSuite) TestUploadToStubKeepServerBufferReader(c *C) {
	st := &StubPutHandler{
		c:                    c,
		expectPath:           "acbd18db4cc2f85cedef654fccc4a4d8",
		expectAPIToken:       "abc123",
		expectBody:           "foo",
		expectStorageClass:   "",
		returnStorageClasses: "default=1",
		handled:              make(chan string),
	}

	UploadToStubHelper(c, st,
		func(kc *KeepClient, url string, _ io.ReadCloser, _ io.WriteCloser, uploadStatusChan chan uploadStatus) {
			go kc.uploadToKeepServer(url, st.expectPath, nil, bytes.NewBuffer([]byte("foo")), uploadStatusChan, 3, kc.getRequestID())

			<-st.handled

			status := <-uploadStatusChan
			c.Check(status, DeepEquals, uploadStatus{nil, fmt.Sprintf("%s/%s", url, st.expectPath), 200, 1, map[string]int{"default": 1}, ""})
		})
}

func (s *StandaloneSuite) TestUploadWithStorageClasses(c *C) {
	for _, trial := range []struct {
		respHeader string
		expectMap  map[string]int
	}{
		{"", nil},
		{"foo=1", map[string]int{"foo": 1}},
		{" foo=1 , bar=2 ", map[string]int{"foo": 1, "bar": 2}},
		{" =foo=1 ", nil},
		{"foo", nil},
	} {
		st := &StubPutHandler{
			c:                    c,
			expectPath:           "acbd18db4cc2f85cedef654fccc4a4d8",
			expectAPIToken:       "abc123",
			expectBody:           "foo",
			expectStorageClass:   "",
			returnStorageClasses: trial.respHeader,
			handled:              make(chan string),
		}

		UploadToStubHelper(c, st,
			func(kc *KeepClient, url string, reader io.ReadCloser, writer io.WriteCloser, uploadStatusChan chan uploadStatus) {
				go kc.uploadToKeepServer(url, st.expectPath, nil, reader, uploadStatusChan, len("foo"), kc.getRequestID())

				writer.Write([]byte("foo"))
				writer.Close()

				<-st.handled
				status := <-uploadStatusChan
				c.Check(status, DeepEquals, uploadStatus{nil, fmt.Sprintf("%s/%s", url, st.expectPath), 200, 1, trial.expectMap, ""})
			})
	}
}

func (s *StandaloneSuite) TestPutWithoutStorageClassesClusterSupport(c *C) {
	nServers := 5
	for _, trial := range []struct {
		replicas      int
		clientClasses []string
		putClasses    []string
		minRequests   int
		maxRequests   int
		success       bool
	}{
		// Talking to an older cluster (no default storage classes exported
		// config) and no other additional storage classes requirements.
		{1, nil, nil, 1, 1, true},
		{2, nil, nil, 2, 2, true},
		{3, nil, nil, 3, 3, true},
		{nServers*2 + 1, nil, nil, nServers, nServers, false},

		{1, []string{"class1"}, nil, 1, 1, true},
		{2, []string{"class1"}, nil, 2, 2, true},
		{3, []string{"class1"}, nil, 3, 3, true},
		{1, []string{"class1", "class2"}, nil, 1, 1, true},
		{nServers*2 + 1, []string{"class1"}, nil, nServers, nServers, false},

		{1, nil, []string{"class1"}, 1, 1, true},
		{2, nil, []string{"class1"}, 2, 2, true},
		{3, nil, []string{"class1"}, 3, 3, true},
		{1, nil, []string{"class1", "class2"}, 1, 1, true},
		{nServers*2 + 1, nil, []string{"class1"}, nServers, nServers, false},
	} {
		c.Logf("%+v", trial)
		st := &StubPutHandler{
			c:                    c,
			expectPath:           "acbd18db4cc2f85cedef654fccc4a4d8",
			expectAPIToken:       "abc123",
			expectBody:           "foo",
			expectStorageClass:   "*",
			returnStorageClasses: "", // Simulate old cluster without SC keep support
			handled:              make(chan string, 100),
		}
		ks := RunSomeFakeKeepServers(st, nServers)
		arv, _ := arvadosclient.MakeArvadosClient()
		kc, _ := MakeKeepClient(arv)
		kc.Want_replicas = trial.replicas
		kc.StorageClasses = trial.clientClasses
		kc.DefaultStorageClasses = nil // Simulate an old cluster without SC defaults
		arv.ApiToken = "abc123"
		localRoots := make(map[string]string)
		writableLocalRoots := make(map[string]string)
		for i, k := range ks {
			localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
			writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
			defer k.listener.Close()
		}
		kc.SetServiceRoots(localRoots, writableLocalRoots, nil)

		_, err := kc.BlockWrite(context.Background(), arvados.BlockWriteOptions{
			Data:           []byte("foo"),
			StorageClasses: trial.putClasses,
		})
		if trial.success {
			c.Check(err, IsNil)
		} else {
			c.Check(err, NotNil)
		}
		c.Check(len(st.handled) >= trial.minRequests, Equals, true, Commentf("len(st.handled)==%d, trial.minRequests==%d", len(st.handled), trial.minRequests))
		c.Check(len(st.handled) <= trial.maxRequests, Equals, true, Commentf("len(st.handled)==%d, trial.maxRequests==%d", len(st.handled), trial.maxRequests))
		if trial.clientClasses == nil && trial.putClasses == nil {
			c.Check(st.requests[0].Header.Get("X-Keep-Storage-Classes"), Equals, "")
		}
	}
}

func (s *StandaloneSuite) TestPutWithStorageClasses(c *C) {
	nServers := 5
	for _, trial := range []struct {
		replicas       int
		defaultClasses []string
		clientClasses  []string // clientClasses takes precedence over defaultClasses
		putClasses     []string // putClasses takes precedence over clientClasses
		minRequests    int
		maxRequests    int
		success        bool
	}{
		{1, []string{"class1"}, nil, nil, 1, 1, true},
		{2, []string{"class1"}, nil, nil, 1, 2, true},
		{3, []string{"class1"}, nil, nil, 2, 3, true},
		{1, []string{"class1", "class2"}, nil, nil, 1, 1, true},

		// defaultClasses doesn't matter when any of the others is specified.
		{1, []string{"class1"}, []string{"class1"}, nil, 1, 1, true},
		{2, []string{"class1"}, []string{"class1"}, nil, 1, 2, true},
		{3, []string{"class1"}, []string{"class1"}, nil, 2, 3, true},
		{1, []string{"class1"}, []string{"class1", "class2"}, nil, 1, 1, true},
		{3, []string{"class1"}, nil, []string{"class1"}, 2, 3, true},
		{1, []string{"class1"}, nil, []string{"class1", "class2"}, 1, 1, true},
		{1, []string{"class1"}, []string{"class404"}, []string{"class1", "class2"}, 1, 1, true},
		{1, []string{"class1"}, []string{"class1"}, []string{"class404", "class2"}, nServers, nServers, false},
		{nServers*2 + 1, []string{}, []string{"class1"}, nil, nServers, nServers, false},
		{1, []string{"class1"}, []string{"class404"}, nil, nServers, nServers, false},
		{1, []string{"class1"}, []string{"class1", "class404"}, nil, nServers, nServers, false},
		{1, []string{"class1"}, nil, []string{"class1", "class404"}, nServers, nServers, false},
	} {
		c.Logf("%+v", trial)
		st := &StubPutHandler{
			c:                    c,
			expectPath:           "acbd18db4cc2f85cedef654fccc4a4d8",
			expectAPIToken:       "abc123",
			expectBody:           "foo",
			expectStorageClass:   "*",
			returnStorageClasses: "class1=2, class2=2",
			handled:              make(chan string, 100),
		}
		ks := RunSomeFakeKeepServers(st, nServers)
		arv, _ := arvadosclient.MakeArvadosClient()
		kc, _ := MakeKeepClient(arv)
		kc.Want_replicas = trial.replicas
		kc.StorageClasses = trial.clientClasses
		kc.DefaultStorageClasses = trial.defaultClasses
		arv.ApiToken = "abc123"
		localRoots := make(map[string]string)
		writableLocalRoots := make(map[string]string)
		for i, k := range ks {
			localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
			writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
			defer k.listener.Close()
		}
		kc.SetServiceRoots(localRoots, writableLocalRoots, nil)

		_, err := kc.BlockWrite(context.Background(), arvados.BlockWriteOptions{
			Data:           []byte("foo"),
			StorageClasses: trial.putClasses,
		})
		if trial.success {
			c.Check(err, IsNil)
		} else {
			c.Check(err, NotNil)
		}
		c.Check(len(st.handled) >= trial.minRequests, Equals, true, Commentf("len(st.handled)==%d, trial.minRequests==%d", len(st.handled), trial.minRequests))
		c.Check(len(st.handled) <= trial.maxRequests, Equals, true, Commentf("len(st.handled)==%d, trial.maxRequests==%d", len(st.handled), trial.maxRequests))
		if !trial.success && trial.replicas == 1 && c.Check(len(st.requests) >= 2, Equals, true) {
			// Max concurrency should be 1. First request
			// should have succeeded for class1. Second
			// request should only ask for class404.
			c.Check(st.requests[1].Header.Get("X-Keep-Storage-Classes"), Equals, "class404")
		}
	}
}

type FailHandler struct {
	handled chan string
}

func (fh FailHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
	resp.WriteHeader(500)
	fh.handled <- fmt.Sprintf("http://%s", req.Host)
}

type FailThenSucceedHandler struct {
	morefails      int // fail 1 + this many times before succeeding
	handled        chan string
	count          atomic.Int64
	successhandler http.Handler
	reqIDs         []string
}

func (fh *FailThenSucceedHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
	fh.reqIDs = append(fh.reqIDs, req.Header.Get("X-Request-Id"))
	if int(fh.count.Add(1)) <= fh.morefails+1 {
		resp.WriteHeader(500)
		fh.handled <- fmt.Sprintf("http://%s", req.Host)
	} else {
		fh.successhandler.ServeHTTP(resp, req)
	}
}

type Error404Handler struct {
	handled chan string
}

func (fh Error404Handler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
	resp.WriteHeader(404)
	fh.handled <- fmt.Sprintf("http://%s", req.Host)
}

func (s *StandaloneSuite) TestFailedUploadToStubKeepServer(c *C) {
	st := FailHandler{
		make(chan string)}

	hash := "acbd18db4cc2f85cedef654fccc4a4d8"

	UploadToStubHelper(c, st,
		func(kc *KeepClient, url string, reader io.ReadCloser,
			writer io.WriteCloser, uploadStatusChan chan uploadStatus) {

			go kc.uploadToKeepServer(url, hash, nil, reader, uploadStatusChan, 3, kc.getRequestID())

			writer.Write([]byte("foo"))
			writer.Close()

			<-st.handled

			status := <-uploadStatusChan
			c.Check(status.url, Equals, fmt.Sprintf("%s/%s", url, hash))
			c.Check(status.statusCode, Equals, 500)
		})
}

type KeepServer struct {
	listener net.Listener
	url      string
}

func RunSomeFakeKeepServers(st http.Handler, n int) (ks []KeepServer) {
	ks = make([]KeepServer, n)

	for i := 0; i < n; i++ {
		ks[i] = RunFakeKeepServer(st)
	}

	return ks
}

func (s *StandaloneSuite) TestPutB(c *C) {
	hash := Md5String("foo")

	st := &StubPutHandler{
		c:                    c,
		expectPath:           hash,
		expectAPIToken:       "abc123",
		expectBody:           "foo",
		expectStorageClass:   "default",
		returnStorageClasses: "",
		handled:              make(chan string, 5),
	}

	arv, _ := arvadosclient.MakeArvadosClient()
	kc, _ := MakeKeepClient(arv)

	kc.Want_replicas = 2
	arv.ApiToken = "abc123"
	localRoots := make(map[string]string)
	writableLocalRoots := make(map[string]string)

	ks := RunSomeFakeKeepServers(st, 5)

	for i, k := range ks {
		localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
		writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
		defer k.listener.Close()
	}

	kc.SetServiceRoots(localRoots, writableLocalRoots, nil)

	kc.PutB([]byte("foo"))

	shuff := NewRootSorter(
		kc.LocalRoots(), Md5String("foo")).GetSortedRoots()

	s1 := <-st.handled
	s2 := <-st.handled
	c.Check((s1 == shuff[0] && s2 == shuff[1]) ||
		(s1 == shuff[1] && s2 == shuff[0]),
		Equals,
		true)
}

func (s *StandaloneSuite) TestPutHR(c *C) {
	hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))

	st := &StubPutHandler{
		c:                    c,
		expectPath:           hash,
		expectAPIToken:       "abc123",
		expectBody:           "foo",
		expectStorageClass:   "default",
		returnStorageClasses: "",
		handled:              make(chan string, 5),
	}

	arv, _ := arvadosclient.MakeArvadosClient()
	kc, _ := MakeKeepClient(arv)

	kc.Want_replicas = 2
	arv.ApiToken = "abc123"
	localRoots := make(map[string]string)
	writableLocalRoots := make(map[string]string)

	ks := RunSomeFakeKeepServers(st, 5)

	for i, k := range ks {
		localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
		writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
		defer k.listener.Close()
	}

	kc.SetServiceRoots(localRoots, writableLocalRoots, nil)

	kc.PutHR(hash, bytes.NewBuffer([]byte("foo")), 3)

	shuff := NewRootSorter(kc.LocalRoots(), hash).GetSortedRoots()

	s1 := <-st.handled
	s2 := <-st.handled

	c.Check((s1 == shuff[0] && s2 == shuff[1]) ||
		(s1 == shuff[1] && s2 == shuff[0]),
		Equals,
		true)
}

func (s *StandaloneSuite) TestPutWithFail(c *C) {
	hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))

	st := &StubPutHandler{
		c:                    c,
		expectPath:           hash,
		expectAPIToken:       "abc123",
		expectBody:           "foo",
		expectStorageClass:   "default",
		returnStorageClasses: "",
		handled:              make(chan string, 4),
	}

	fh := FailHandler{
		make(chan string, 1)}

	arv, err := arvadosclient.MakeArvadosClient()
	c.Check(err, IsNil)
	kc, _ := MakeKeepClient(arv)

	kc.Want_replicas = 2
	arv.ApiToken = "abc123"
	localRoots := make(map[string]string)
	writableLocalRoots := make(map[string]string)

	ks1 := RunSomeFakeKeepServers(st, 4)
	ks2 := RunSomeFakeKeepServers(fh, 1)

	for i, k := range ks1 {
		localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
		writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
		defer k.listener.Close()
	}
	for i, k := range ks2 {
		localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i+len(ks1))] = k.url
		writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i+len(ks1))] = k.url
		defer k.listener.Close()
	}

	kc.SetServiceRoots(localRoots, writableLocalRoots, nil)

	shuff := NewRootSorter(
		kc.LocalRoots(), Md5String("foo")).GetSortedRoots()
	c.Logf("%+v", shuff)

	phash, replicas, err := kc.PutB([]byte("foo"))

	<-fh.handled

	c.Check(err, IsNil)
	c.Check(phash, Equals, "")
	c.Check(replicas, Equals, 2)

	s1 := <-st.handled
	s2 := <-st.handled

	c.Check((s1 == shuff[1] && s2 == shuff[2]) ||
		(s1 == shuff[2] && s2 == shuff[1]),
		Equals,
		true)
}

func (s *StandaloneSuite) TestPutWithTooManyFail(c *C) {
	hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))

	st := &StubPutHandler{
		c:                    c,
		expectPath:           hash,
		expectAPIToken:       "abc123",
		expectBody:           "foo",
		expectStorageClass:   "default",
		returnStorageClasses: "",
		handled:              make(chan string, 1),
	}

	fh := FailHandler{
		make(chan string, 4)}

	arv, err := arvadosclient.MakeArvadosClient()
	c.Check(err, IsNil)
	kc, _ := MakeKeepClient(arv)

	kc.Want_replicas = 2
	kc.Retries = 0
	arv.ApiToken = "abc123"
	localRoots := make(map[string]string)
	writableLocalRoots := make(map[string]string)

	ks1 := RunSomeFakeKeepServers(st, 1)
	ks2 := RunSomeFakeKeepServers(fh, 4)

	for i, k := range ks1 {
		localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
		writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
		defer k.listener.Close()
	}
	for i, k := range ks2 {
		localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i+len(ks1))] = k.url
		writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i+len(ks1))] = k.url
		defer k.listener.Close()
	}

	kc.SetServiceRoots(localRoots, writableLocalRoots, nil)

	_, replicas, err := kc.PutB([]byte("foo"))

	c.Check(err, FitsTypeOf, InsufficientReplicasError{})
	c.Check(replicas, Equals, 1)
	c.Check(<-st.handled, Equals, ks1[0].url)
}

type StubGetHandler struct {
	c              *C
	expectPath     string
	expectAPIToken string
	httpStatus     int
	body           []byte
}

func (sgh StubGetHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
	sgh.c.Check(req.URL.Path, Equals, "/"+sgh.expectPath)
	sgh.c.Check(req.Header.Get("Authorization"), Equals, fmt.Sprintf("Bearer %s", sgh.expectAPIToken))
	resp.WriteHeader(sgh.httpStatus)
	resp.Header().Set("Content-Length", fmt.Sprintf("%d", len(sgh.body)))
	resp.Write(sgh.body)
}

func (s *StandaloneSuite) TestGet(c *C) {
	hash := fmt.Sprintf("%x+3", md5.Sum([]byte("foo")))

	st := StubGetHandler{
		c,
		hash,
		"abc123",
		http.StatusOK,
		[]byte("foo")}

	ks := RunFakeKeepServer(st)
	defer ks.listener.Close()

	arv, err := arvadosclient.MakeArvadosClient()
	c.Check(err, IsNil)
	kc, _ := MakeKeepClient(arv)
	arv.ApiToken = "abc123"
	kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)

	r, n, _, err := kc.Get(hash)
	c.Assert(err, IsNil)
	c.Check(n, Equals, int64(3))

	content, err2 := ioutil.ReadAll(r)
	c.Check(err2, IsNil)
	c.Check(content, DeepEquals, []byte("foo"))
	c.Check(r.Close(), IsNil)
}

func (s *StandaloneSuite) TestGet404(c *C) {
	hash := fmt.Sprintf("%x+3", md5.Sum([]byte("foo")))

	st := Error404Handler{make(chan string, 1)}

	ks := RunFakeKeepServer(st)
	defer ks.listener.Close()

	arv, err := arvadosclient.MakeArvadosClient()
	c.Check(err, IsNil)
	kc, _ := MakeKeepClient(arv)
	arv.ApiToken = "abc123"
	kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)

	r, n, _, err := kc.Get(hash)
	c.Check(err, Equals, BlockNotFound)
	c.Check(n, Equals, int64(0))
	c.Check(r, IsNil)
}

func (s *StandaloneSuite) TestGetEmptyBlock(c *C) {
	st := Error404Handler{make(chan string, 1)}

	ks := RunFakeKeepServer(st)
	defer ks.listener.Close()

	arv, err := arvadosclient.MakeArvadosClient()
	c.Check(err, IsNil)
	kc, _ := MakeKeepClient(arv)
	arv.ApiToken = "abc123"
	kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)

	r, n, _, err := kc.Get("d41d8cd98f00b204e9800998ecf8427e+0")
	c.Check(err, IsNil)
	c.Check(n, Equals, int64(0))
	c.Assert(r, NotNil)
	buf, err := ioutil.ReadAll(r)
	c.Check(err, IsNil)
	c.Check(buf, DeepEquals, []byte{})
	c.Check(r.Close(), IsNil)
}

func (s *StandaloneSuite) TestGetFail(c *C) {
	hash := fmt.Sprintf("%x+3", md5.Sum([]byte("foo")))

	st := FailHandler{make(chan string, 1)}

	ks := RunFakeKeepServer(st)
	defer ks.listener.Close()

	arv, err := arvadosclient.MakeArvadosClient()
	c.Check(err, IsNil)
	kc, _ := MakeKeepClient(arv)
	arv.ApiToken = "abc123"
	kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
	kc.Retries = 0

	r, n, _, err := kc.Get(hash)
	errNotFound, _ := err.(*ErrNotFound)
	if c.Check(errNotFound, NotNil) {
		c.Check(strings.Contains(errNotFound.Error(), "HTTP 500"), Equals, true)
		c.Check(errNotFound.Temporary(), Equals, true)
	}
	c.Check(n, Equals, int64(0))
	c.Check(r, IsNil)
}

func (s *StandaloneSuite) TestGetFailRetry(c *C) {
	defer func(origDefault, origMinimum time.Duration) {
		DefaultRetryDelay = origDefault
		MinimumRetryDelay = origMinimum
	}(DefaultRetryDelay, MinimumRetryDelay)
	DefaultRetryDelay = time.Second / 8
	MinimumRetryDelay = time.Millisecond

	hash := fmt.Sprintf("%x+3", md5.Sum([]byte("foo")))

	for _, delay := range []time.Duration{0, time.Nanosecond, time.Second / 8, time.Second / 16} {
		c.Logf("=== initial delay %v", delay)

		st := &FailThenSucceedHandler{
			morefails: 2,
			handled:   make(chan string, 4),
			successhandler: StubGetHandler{
				c,
				hash,
				"abc123",
				http.StatusOK,
				[]byte("foo")}}

		ks := RunFakeKeepServer(st)
		defer ks.listener.Close()

		arv, err := arvadosclient.MakeArvadosClient()
		c.Check(err, IsNil)
		kc, _ := MakeKeepClient(arv)
		arv.ApiToken = "abc123"
		kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
		kc.Retries = 3
		kc.RetryDelay = delay
		kc.DiskCacheSize = DiskCacheDisabled

		t0 := time.Now()
		r, n, _, err := kc.Get(hash)
		c.Assert(err, IsNil)
		c.Check(n, Equals, int64(3))
		elapsed := time.Since(t0)

		nonsleeptime := time.Second / 10
		expect := kc.RetryDelay
		if expect == 0 {
			expect = DefaultRetryDelay
		}
		min := MinimumRetryDelay * 3
		max := expect + expect*2 + expect*2*2 + nonsleeptime
		c.Check(elapsed >= min, Equals, true, Commentf("elapsed %v / expect min %v", elapsed, min))
		c.Check(elapsed <= max, Equals, true, Commentf("elapsed %v / expect max %v", elapsed, max))

		content, err := ioutil.ReadAll(r)
		c.Check(err, IsNil)
		c.Check(content, DeepEquals, []byte("foo"))
		c.Check(r.Close(), IsNil)

		c.Logf("%q", st.reqIDs)
		if c.Check(st.reqIDs, Not(HasLen), 0) {
			for _, reqid := range st.reqIDs {
				c.Check(reqid, Not(Equals), "")
				c.Check(reqid, Equals, st.reqIDs[0])
			}
		}
	}
}

func (s *StandaloneSuite) TestGetNetError(c *C) {
	hash := fmt.Sprintf("%x+3", md5.Sum([]byte("foo")))

	arv, err := arvadosclient.MakeArvadosClient()
	c.Check(err, IsNil)
	kc, _ := MakeKeepClient(arv)
	arv.ApiToken = "abc123"
	kc.SetServiceRoots(map[string]string{"x": "http://localhost:62222"}, nil, nil)

	r, n, _, err := kc.Get(hash)
	errNotFound, _ := err.(*ErrNotFound)
	if c.Check(errNotFound, NotNil) {
		c.Check(strings.Contains(errNotFound.Error(), "connection refused"), Equals, true)
		c.Check(errNotFound.Temporary(), Equals, true)
	}
	c.Check(n, Equals, int64(0))
	c.Check(r, IsNil)
}

func (s *StandaloneSuite) TestGetWithServiceHint(c *C) {
	uuid := "zzzzz-bi6l4-123451234512345"
	hash := fmt.Sprintf("%x+3", md5.Sum([]byte("foo")))

	// This one shouldn't be used:
	ks0 := RunFakeKeepServer(StubGetHandler{
		c,
		"error if used",
		"abc123",
		http.StatusOK,
		[]byte("foo")})
	defer ks0.listener.Close()
	// This one should be used:
	ks := RunFakeKeepServer(StubGetHandler{
		c,
		hash + "+K@" + uuid,
		"abc123",
		http.StatusOK,
		[]byte("foo")})
	defer ks.listener.Close()

	arv, err := arvadosclient.MakeArvadosClient()
	c.Check(err, IsNil)
	kc, _ := MakeKeepClient(arv)
	arv.ApiToken = "abc123"
	kc.SetServiceRoots(
		map[string]string{"x": ks0.url},
		nil,
		map[string]string{uuid: ks.url})

	r, n, _, err := kc.Get(hash + "+K@" + uuid)
	c.Assert(err, IsNil)
	c.Check(n, Equals, int64(3))

	content, err := ioutil.ReadAll(r)
	c.Check(err, IsNil)
	c.Check(content, DeepEquals, []byte("foo"))
	c.Check(r.Close(), IsNil)
}

// Use a service hint to fetch from a local disk service, overriding
// rendezvous probe order.
func (s *StandaloneSuite) TestGetWithLocalServiceHint(c *C) {
	uuid := "zzzzz-bi6l4-zzzzzzzzzzzzzzz"
	hash := fmt.Sprintf("%x+3", md5.Sum([]byte("foo")))

	// This one shouldn't be used, although it appears first in
	// rendezvous probe order:
	ks0 := RunFakeKeepServer(StubGetHandler{
		c,
		"error if used",
		"abc123",
		http.StatusBadGateway,
		nil})
	defer ks0.listener.Close()
	// This one should be used:
	ks := RunFakeKeepServer(StubGetHandler{
		c,
		hash + "+K@" + uuid,
		"abc123",
		http.StatusOK,
		[]byte("foo")})
	defer ks.listener.Close()

	arv, err := arvadosclient.MakeArvadosClient()
	c.Check(err, IsNil)
	kc, _ := MakeKeepClient(arv)
	arv.ApiToken = "abc123"
	kc.SetServiceRoots(
		map[string]string{
			"zzzzz-bi6l4-yyyyyyyyyyyyyyy": ks0.url,
			"zzzzz-bi6l4-xxxxxxxxxxxxxxx": ks0.url,
			"zzzzz-bi6l4-wwwwwwwwwwwwwww": ks0.url,
			uuid:                          ks.url},
		nil,
		map[string]string{
			"zzzzz-bi6l4-yyyyyyyyyyyyyyy": ks0.url,
			"zzzzz-bi6l4-xxxxxxxxxxxxxxx": ks0.url,
			"zzzzz-bi6l4-wwwwwwwwwwwwwww": ks0.url,
			uuid:                          ks.url},
	)

	r, n, _, err := kc.Get(hash + "+K@" + uuid)
	c.Assert(err, IsNil)
	c.Check(n, Equals, int64(3))

	content, err := ioutil.ReadAll(r)
	c.Check(err, IsNil)
	c.Check(content, DeepEquals, []byte("foo"))
	c.Check(r.Close(), IsNil)
}

func (s *StandaloneSuite) TestGetWithServiceHintFailoverToLocals(c *C) {
	uuid := "zzzzz-bi6l4-123451234512345"
	hash := fmt.Sprintf("%x+3", md5.Sum([]byte("foo")))

	ksLocal := RunFakeKeepServer(StubGetHandler{
		c,
		hash + "+K@" + uuid,
		"abc123",
		http.StatusOK,
		[]byte("foo")})
	defer ksLocal.listener.Close()
	ksGateway := RunFakeKeepServer(StubGetHandler{
		c,
		hash + "+K@" + uuid,
		"abc123",
		http.StatusInternalServerError,
		[]byte("Error")})
	defer ksGateway.listener.Close()

	arv, err := arvadosclient.MakeArvadosClient()
	c.Check(err, IsNil)
	kc, _ := MakeKeepClient(arv)
	arv.ApiToken = "abc123"
	kc.SetServiceRoots(
		map[string]string{"zzzzz-bi6l4-keepdisk0000000": ksLocal.url},
		nil,
		map[string]string{uuid: ksGateway.url})

	r, n, _, err := kc.Get(hash + "+K@" + uuid)
	c.Assert(err, IsNil)
	c.Check(n, Equals, int64(3))

	content, err := ioutil.ReadAll(r)
	c.Check(err, IsNil)
	c.Check(content, DeepEquals, []byte("foo"))
	c.Check(r.Close(), IsNil)
}

type BarHandler struct {
	handled chan string
}

func (h BarHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
	resp.Write([]byte("bar"))
	h.handled <- fmt.Sprintf("http://%s", req.Host)
}

func (s *StandaloneSuite) TestChecksum(c *C) {
	foohash := fmt.Sprintf("%x+3", md5.Sum([]byte("foo")))
	barhash := fmt.Sprintf("%x+3", md5.Sum([]byte("bar")))

	st := BarHandler{make(chan string, 1)}

	ks := RunFakeKeepServer(st)
	defer ks.listener.Close()

	arv, err := arvadosclient.MakeArvadosClient()
	c.Check(err, IsNil)
	kc, _ := MakeKeepClient(arv)
	arv.ApiToken = "abc123"
	kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)

	r, n, _, err := kc.Get(barhash)
	if c.Check(err, IsNil) {
		_, err = ioutil.ReadAll(r)
		c.Check(n, Equals, int64(3))
		c.Check(err, IsNil)
	}

	select {
	case <-st.handled:
	case <-time.After(time.Second):
		c.Fatal("timed out")
	}

	r, n, _, err = kc.Get(foohash)
	if err == nil {
		buf, readerr := ioutil.ReadAll(r)
		c.Logf("%q", buf)
		err = readerr
	}
	c.Check(err, Equals, BadChecksum)

	select {
	case <-st.handled:
	case <-time.After(time.Second):
		c.Fatal("timed out")
	}
}

func (s *StandaloneSuite) TestGetWithFailures(c *C) {
	content := []byte("waz")
	hash := fmt.Sprintf("%x+3", md5.Sum(content))

	fh := Error404Handler{
		make(chan string, 4)}

	st := StubGetHandler{
		c,
		hash,
		"abc123",
		http.StatusOK,
		content}

	arv, err := arvadosclient.MakeArvadosClient()
	c.Check(err, IsNil)
	kc, _ := MakeKeepClient(arv)
	arv.ApiToken = "abc123"
	localRoots := make(map[string]string)
	writableLocalRoots := make(map[string]string)

	ks1 := RunSomeFakeKeepServers(st, 1)
	ks2 := RunSomeFakeKeepServers(fh, 4)

	for i, k := range ks1 {
		localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
		writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
		defer k.listener.Close()
	}
	for i, k := range ks2 {
		localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i+len(ks1))] = k.url
		writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i+len(ks1))] = k.url
		defer k.listener.Close()
	}

	kc.SetServiceRoots(localRoots, writableLocalRoots, nil)
	kc.Retries = 0

	// This test works only if one of the failing services is
	// attempted before the succeeding service. Otherwise,
	// <-fh.handled below will just hang! (Probe order depends on
	// the choice of block content "waz" and the UUIDs of the fake
	// servers, so we just tried different strings until we found
	// an example that passes this Assert.)
	c.Assert(NewRootSorter(localRoots, hash).GetSortedRoots()[0], Not(Equals), ks1[0].url)

	r, n, _, err := kc.Get(hash)

	select {
	case <-fh.handled:
	case <-time.After(time.Second):
		c.Fatal("timed out")
	}
	c.Assert(err, IsNil)
	c.Check(n, Equals, int64(3))

	readContent, err2 := ioutil.ReadAll(r)
	c.Check(err2, IsNil)
	c.Check(readContent, DeepEquals, content)
	c.Check(r.Close(), IsNil)
}

func (s *ServerRequiredSuite) TestPutGetHead(c *C) {
	content := []byte("TestPutGetHead")

	arv, err := arvadosclient.MakeArvadosClient()
	c.Check(err, IsNil)
	kc, err := MakeKeepClient(arv)
	c.Assert(err, IsNil)

	hash := fmt.Sprintf("%x+%d", md5.Sum(content), len(content))

	{
		n, _, err := kc.Ask(hash)
		c.Check(err, Equals, BlockNotFound)
		c.Check(n, Equals, int64(0))
	}
	{
		hash2, replicas, err := kc.PutB(content)
		c.Check(err, IsNil)
		c.Check(hash2, Matches, `\Q`+hash+`\E\b.*`)
		c.Check(replicas, Equals, 2)
	}
	{
		r, n, _, err := kc.Get(hash)
		c.Check(err, IsNil)
		c.Check(n, Equals, int64(len(content)))
		if c.Check(r, NotNil) {
			readContent, err := ioutil.ReadAll(r)
			c.Check(err, IsNil)
			if c.Check(len(readContent), Equals, len(content)) {
				c.Check(readContent, DeepEquals, content)
			}
			c.Check(r.Close(), IsNil)
		}
	}
	{
		n, url2, err := kc.Ask(hash)
		c.Check(err, IsNil)
		c.Check(n, Equals, int64(len(content)))
		c.Check(url2, Matches, "http://localhost:\\d+/\\Q"+hash+"\\E")
	}
	{
		loc, err := kc.LocalLocator(hash)
		c.Check(err, IsNil)
		c.Assert(len(loc) >= 32, Equals, true)
		c.Check(loc[:32], Equals, hash[:32])
	}
	{
		content := []byte("the perth county conspiracy")
		loc, err := kc.LocalLocator(fmt.Sprintf("%x+%d+Rzaaaa-abcde@12345", md5.Sum(content), len(content)))
		c.Check(loc, Equals, "")
		c.Check(err, ErrorMatches, `.*HEAD .*\+R.*`)
		c.Check(err, ErrorMatches, `.*HTTP 400.*`)
	}
}

type StubProxyHandler struct {
	handled chan string
}

func (h StubProxyHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
	resp.Header().Set("X-Keep-Replicas-Stored", "2")
	h.handled <- fmt.Sprintf("http://%s", req.Host)
}

func (s *StandaloneSuite) TestPutProxy(c *C) {
	st := StubProxyHandler{make(chan string, 1)}

	arv, err := arvadosclient.MakeArvadosClient()
	c.Check(err, IsNil)
	kc, _ := MakeKeepClient(arv)

	kc.Want_replicas = 2
	arv.ApiToken = "abc123"
	localRoots := make(map[string]string)
	writableLocalRoots := make(map[string]string)

	ks1 := RunSomeFakeKeepServers(st, 1)

	for i, k := range ks1 {
		localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
		writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
		defer k.listener.Close()
	}

	kc.SetServiceRoots(localRoots, writableLocalRoots, nil)

	_, replicas, err := kc.PutB([]byte("foo"))
	<-st.handled

	c.Check(err, IsNil)
	c.Check(replicas, Equals, 2)
}

func (s *StandaloneSuite) TestPutProxyInsufficientReplicas(c *C) {
	st := StubProxyHandler{make(chan string, 1)}

	arv, err := arvadosclient.MakeArvadosClient()
	c.Check(err, IsNil)
	kc, _ := MakeKeepClient(arv)

	kc.Want_replicas = 3
	arv.ApiToken = "abc123"
	localRoots := make(map[string]string)
	writableLocalRoots := make(map[string]string)

	ks1 := RunSomeFakeKeepServers(st, 1)

	for i, k := range ks1 {
		localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
		writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
		defer k.listener.Close()
	}
	kc.SetServiceRoots(localRoots, writableLocalRoots, nil)

	_, replicas, err := kc.PutB([]byte("foo"))
	<-st.handled

	c.Check(err, FitsTypeOf, InsufficientReplicasError{})
	c.Check(replicas, Equals, 2)
}

func (s *StandaloneSuite) TestMakeLocator(c *C) {
	l, err := MakeLocator("91f372a266fe2bf2823cb8ec7fda31ce+3+Aabcde@12345678")
	c.Check(err, IsNil)
	c.Check(l.Hash, Equals, "91f372a266fe2bf2823cb8ec7fda31ce")
	c.Check(l.Size, Equals, 3)
	c.Check(l.Hints, DeepEquals, []string{"3", "Aabcde@12345678"})
}

func (s *StandaloneSuite) TestMakeLocatorNoHints(c *C) {
	l, err := MakeLocator("91f372a266fe2bf2823cb8ec7fda31ce")
	c.Check(err, IsNil)
	c.Check(l.Hash, Equals, "91f372a266fe2bf2823cb8ec7fda31ce")
	c.Check(l.Size, Equals, -1)
	c.Check(l.Hints, DeepEquals, []string{})
}

func (s *StandaloneSuite) TestMakeLocatorNoSizeHint(c *C) {
	l, err := MakeLocator("91f372a266fe2bf2823cb8ec7fda31ce+Aabcde@12345678")
	c.Check(err, IsNil)
	c.Check(l.Hash, Equals, "91f372a266fe2bf2823cb8ec7fda31ce")
	c.Check(l.Size, Equals, -1)
	c.Check(l.Hints, DeepEquals, []string{"Aabcde@12345678"})
}

func (s *StandaloneSuite) TestMakeLocatorPreservesUnrecognizedHints(c *C) {
	str := "91f372a266fe2bf2823cb8ec7fda31ce+3+Unknown+Kzzzzz+Afoobar"
	l, err := MakeLocator(str)
	c.Check(err, IsNil)
	c.Check(l.Hash, Equals, "91f372a266fe2bf2823cb8ec7fda31ce")
	c.Check(l.Size, Equals, 3)
	c.Check(l.Hints, DeepEquals, []string{"3", "Unknown", "Kzzzzz", "Afoobar"})
	c.Check(l.String(), Equals, str)
}

func (s *StandaloneSuite) TestMakeLocatorInvalidInput(c *C) {
	_, err := MakeLocator("91f372a266fe2bf2823cb8ec7fda31c")
	c.Check(err, Equals, InvalidLocatorError)
}

func (s *StandaloneSuite) TestPutBWant2ReplicasWithOnlyOneWritableLocalRoot(c *C) {
	hash := Md5String("foo")

	st := &StubPutHandler{
		c:                    c,
		expectPath:           hash,
		expectAPIToken:       "abc123",
		expectBody:           "foo",
		expectStorageClass:   "default",
		returnStorageClasses: "",
		handled:              make(chan string, 5),
	}

	arv, _ := arvadosclient.MakeArvadosClient()
	kc, _ := MakeKeepClient(arv)

	kc.Want_replicas = 2
	arv.ApiToken = "abc123"
	localRoots := make(map[string]string)
	writableLocalRoots := make(map[string]string)

	ks := RunSomeFakeKeepServers(st, 5)

	for i, k := range ks {
		localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
		if i == 0 {
			writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
		}
		defer k.listener.Close()
	}

	kc.SetServiceRoots(localRoots, writableLocalRoots, nil)

	_, replicas, err := kc.PutB([]byte("foo"))

	c.Check(err, FitsTypeOf, InsufficientReplicasError{})
	c.Check(replicas, Equals, 1)

	c.Check(<-st.handled, Equals, localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", 0)])
}

func (s *StandaloneSuite) TestPutBWithNoWritableLocalRoots(c *C) {
	hash := Md5String("foo")

	st := &StubPutHandler{
		c:                    c,
		expectPath:           hash,
		expectAPIToken:       "abc123",
		expectBody:           "foo",
		expectStorageClass:   "",
		returnStorageClasses: "",
		handled:              make(chan string, 5),
	}

	arv, _ := arvadosclient.MakeArvadosClient()
	kc, _ := MakeKeepClient(arv)

	kc.Want_replicas = 2
	arv.ApiToken = "abc123"
	localRoots := make(map[string]string)
	writableLocalRoots := make(map[string]string)

	ks := RunSomeFakeKeepServers(st, 5)

	for i, k := range ks {
		localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
		defer k.listener.Close()
	}

	kc.SetServiceRoots(localRoots, writableLocalRoots, nil)

	_, replicas, err := kc.PutB([]byte("foo"))

	c.Check(err, FitsTypeOf, InsufficientReplicasError{})
	c.Check(replicas, Equals, 0)
}

type StubGetIndexHandler struct {
	c              *C
	expectPath     string
	expectAPIToken string
	httpStatus     int
	body           []byte
}

func (h StubGetIndexHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
	h.c.Check(req.URL.Path, Equals, h.expectPath)
	h.c.Check(req.Header.Get("Authorization"), Equals, fmt.Sprintf("Bearer %s", h.expectAPIToken))
	resp.WriteHeader(h.httpStatus)
	resp.Header().Set("Content-Length", fmt.Sprintf("%d", len(h.body)))
	resp.Write(h.body)
}

func (s *StandaloneSuite) TestGetIndexWithNoPrefix(c *C) {
	hash := fmt.Sprintf("%x+3", md5.Sum([]byte("foo")))

	st := StubGetIndexHandler{
		c,
		"/index",
		"abc123",
		http.StatusOK,
		[]byte(hash + " 1443559274\n\n")}

	ks := RunFakeKeepServer(st)
	defer ks.listener.Close()

	arv, err := arvadosclient.MakeArvadosClient()
	c.Assert(err, IsNil)
	kc, err := MakeKeepClient(arv)
	c.Assert(err, IsNil)
	arv.ApiToken = "abc123"
	kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)

	r, err := kc.GetIndex("x", "")
	c.Check(err, IsNil)

	content, err2 := ioutil.ReadAll(r)
	c.Check(err2, IsNil)
	c.Check(content, DeepEquals, st.body[0:len(st.body)-1])
}

func (s *StandaloneSuite) TestGetIndexWithPrefix(c *C) {
	hash := fmt.Sprintf("%x+3", md5.Sum([]byte("foo")))

	st := StubGetIndexHandler{
		c,
		"/index/" + hash[0:3],
		"abc123",
		http.StatusOK,
		[]byte(hash + " 1443559274\n\n")}

	ks := RunFakeKeepServer(st)
	defer ks.listener.Close()

	arv, err := arvadosclient.MakeArvadosClient()
	c.Check(err, IsNil)
	kc, _ := MakeKeepClient(arv)
	arv.ApiToken = "abc123"
	kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)

	r, err := kc.GetIndex("x", hash[0:3])
	c.Assert(err, IsNil)

	content, err2 := ioutil.ReadAll(r)
	c.Check(err2, IsNil)
	c.Check(content, DeepEquals, st.body[0:len(st.body)-1])
}

func (s *StandaloneSuite) TestGetIndexIncomplete(c *C) {
	hash := fmt.Sprintf("%x+3", md5.Sum([]byte("foo")))

	st := StubGetIndexHandler{
		c,
		"/index/" + hash[0:3],
		"abc123",
		http.StatusOK,
		[]byte(hash)}

	ks := RunFakeKeepServer(st)
	defer ks.listener.Close()

	arv, err := arvadosclient.MakeArvadosClient()
	c.Check(err, IsNil)
	kc, _ := MakeKeepClient(arv)
	arv.ApiToken = "abc123"
	kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)

	_, err = kc.GetIndex("x", hash[0:3])
	c.Check(err, Equals, ErrIncompleteIndex)
}

func (s *StandaloneSuite) TestGetIndexWithNoSuchServer(c *C) {
	hash := fmt.Sprintf("%x+3", md5.Sum([]byte("foo")))

	st := StubGetIndexHandler{
		c,
		"/index/" + hash[0:3],
		"abc123",
		http.StatusOK,
		[]byte(hash)}

	ks := RunFakeKeepServer(st)
	defer ks.listener.Close()

	arv, err := arvadosclient.MakeArvadosClient()
	c.Check(err, IsNil)
	kc, _ := MakeKeepClient(arv)
	arv.ApiToken = "abc123"
	kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)

	_, err = kc.GetIndex("y", hash[0:3])
	c.Check(err, Equals, ErrNoSuchKeepServer)
}

func (s *StandaloneSuite) TestGetIndexWithNoSuchPrefix(c *C) {
	st := StubGetIndexHandler{
		c,
		"/index/abcd",
		"abc123",
		http.StatusOK,
		[]byte("\n")}

	ks := RunFakeKeepServer(st)
	defer ks.listener.Close()

	arv, err := arvadosclient.MakeArvadosClient()
	c.Check(err, IsNil)
	kc, _ := MakeKeepClient(arv)
	arv.ApiToken = "abc123"
	kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)

	r, err := kc.GetIndex("x", "abcd")
	c.Check(err, IsNil)

	content, err2 := ioutil.ReadAll(r)
	c.Check(err2, IsNil)
	c.Check(content, DeepEquals, st.body[0:len(st.body)-1])
}

func (s *StandaloneSuite) TestPutBRetry(c *C) {
	DefaultRetryDelay = time.Second / 8
	MinimumRetryDelay = time.Millisecond

	for _, delay := range []time.Duration{0, time.Nanosecond, time.Second / 8, time.Second / 16} {
		c.Logf("=== initial delay %v", delay)

		st := &FailThenSucceedHandler{
			morefails: 5, // handler will fail 6x in total, 3 for each server
			handled:   make(chan string, 10),
			successhandler: &StubPutHandler{
				c:                    c,
				expectPath:           Md5String("foo"),
				expectAPIToken:       "abc123",
				expectBody:           "foo",
				expectStorageClass:   "default",
				returnStorageClasses: "",
				handled:              make(chan string, 5),
			},
		}

		arv, _ := arvadosclient.MakeArvadosClient()
		kc, _ := MakeKeepClient(arv)
		kc.Retries = 3
		kc.RetryDelay = delay
		kc.DiskCacheSize = DiskCacheDisabled
		kc.Want_replicas = 2

		arv.ApiToken = "abc123"
		localRoots := make(map[string]string)
		writableLocalRoots := make(map[string]string)

		ks := RunSomeFakeKeepServers(st, 2)

		for i, k := range ks {
			localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
			writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
			defer k.listener.Close()
		}

		kc.SetServiceRoots(localRoots, writableLocalRoots, nil)

		t0 := time.Now()
		hash, replicas, err := kc.PutB([]byte("foo"))

		c.Check(err, IsNil)
		c.Check(hash, Equals, "")
		c.Check(replicas, Equals, 2)
		elapsed := time.Since(t0)

		nonsleeptime := time.Second / 10
		expect := kc.RetryDelay
		if expect == 0 {
			expect = DefaultRetryDelay
		}
		min := MinimumRetryDelay * 3
		max := expect + expect*2 + expect*2*2
		max += nonsleeptime
		checkInterval(c, elapsed, min, max)
	}
}

func (s *ServerRequiredSuite) TestMakeKeepClientWithNonDiskTypeService(c *C) {
	arv, err := arvadosclient.MakeArvadosClient()
	c.Assert(err, IsNil)

	// Add an additional "testblobstore" keepservice
	blobKeepService := make(arvadosclient.Dict)
	err = arv.Create("keep_services",
		arvadosclient.Dict{"keep_service": arvadosclient.Dict{
			"service_host": "localhost",
			"service_port": "21321",
			"service_type": "testblobstore"}},
		&blobKeepService)
	c.Assert(err, IsNil)
	defer func() { arv.Delete("keep_services", blobKeepService["uuid"].(string), nil, nil) }()
	RefreshServiceDiscovery()

	// Make a keepclient and ensure that the testblobstore is included
	kc, err := MakeKeepClient(arv)
	c.Assert(err, IsNil)

	// verify kc.LocalRoots
	c.Check(len(kc.LocalRoots()), Equals, 3)
	for _, root := range kc.LocalRoots() {
		c.Check(root, Matches, "http://localhost:\\d+")
	}
	c.Assert(kc.LocalRoots()[blobKeepService["uuid"].(string)], Not(Equals), "")

	// verify kc.GatewayRoots
	c.Check(len(kc.GatewayRoots()), Equals, 3)
	for _, root := range kc.GatewayRoots() {
		c.Check(root, Matches, "http://localhost:\\d+")
	}
	c.Assert(kc.GatewayRoots()[blobKeepService["uuid"].(string)], Not(Equals), "")

	// verify kc.WritableLocalRoots
	c.Check(len(kc.WritableLocalRoots()), Equals, 3)
	for _, root := range kc.WritableLocalRoots() {
		c.Check(root, Matches, "http://localhost:\\d+")
	}
	c.Assert(kc.WritableLocalRoots()[blobKeepService["uuid"].(string)], Not(Equals), "")

	c.Assert(kc.replicasPerService, Equals, 0)
	c.Assert(kc.foundNonDiskSvc, Equals, true)
	c.Assert(kc.httpClient().(*http.Client).Timeout, Equals, 300*time.Second)
}

func (s *StandaloneSuite) TestDelayCalculator_Default(c *C) {
	MinimumRetryDelay = time.Second / 2
	DefaultRetryDelay = time.Second

	dc := delayCalculator{InitialMaxDelay: 0}
	checkInterval(c, dc.Next(), time.Second/2, time.Second)
	checkInterval(c, dc.Next(), time.Second/2, time.Second*2)
	checkInterval(c, dc.Next(), time.Second/2, time.Second*4)
	checkInterval(c, dc.Next(), time.Second/2, time.Second*8)
	checkInterval(c, dc.Next(), time.Second/2, time.Second*10)
	checkInterval(c, dc.Next(), time.Second/2, time.Second*10)
}

func (s *StandaloneSuite) TestDelayCalculator_SetInitial(c *C) {
	MinimumRetryDelay = time.Second / 2
	DefaultRetryDelay = time.Second

	dc := delayCalculator{InitialMaxDelay: time.Second * 2}
	checkInterval(c, dc.Next(), time.Second/2, time.Second*2)
	checkInterval(c, dc.Next(), time.Second/2, time.Second*4)
	checkInterval(c, dc.Next(), time.Second/2, time.Second*8)
	checkInterval(c, dc.Next(), time.Second/2, time.Second*16)
	checkInterval(c, dc.Next(), time.Second/2, time.Second*20)
	checkInterval(c, dc.Next(), time.Second/2, time.Second*20)
	checkInterval(c, dc.Next(), time.Second/2, time.Second*20)
}

func (s *StandaloneSuite) TestDelayCalculator_EnsureSomeLongDelays(c *C) {
	dc := delayCalculator{InitialMaxDelay: time.Second * 5}
	var d time.Duration
	n := 4000
	for i := 0; i < n; i++ {
		if i < 20 || i%10 == 0 {
			c.Logf("i=%d, delay=%v", i, d)
		}
		if d = dc.Next(); d > dc.InitialMaxDelay*9 {
			return
		}
	}
	c.Errorf("after %d trials, never got a delay more than 90%% of expected max %d; last was %v", n, dc.InitialMaxDelay*10, d)
}

// If InitialMaxDelay is less than MinimumRetryDelay/10, then delay is
// always MinimumRetryDelay.
func (s *StandaloneSuite) TestDelayCalculator_InitialLessThanMinimum(c *C) {
	MinimumRetryDelay = time.Second / 2
	dc := delayCalculator{InitialMaxDelay: time.Millisecond}
	for i := 0; i < 20; i++ {
		c.Check(dc.Next(), Equals, time.Second/2)
	}
}

func checkInterval(c *C, t, min, max time.Duration) {
	c.Check(t >= min, Equals, true, Commentf("got %v which is below expected min %v", t, min))
	c.Check(t <= max, Equals, true, Commentf("got %v which is above expected max %v", t, max))
}