"fmt"
"io"
"io/ioutil"
- "log"
"net"
"net/http"
"os"
"strings"
"sync"
+ "sync/atomic"
"testing"
"time"
. "gopkg.in/check.v1"
)
-// Gocheck boilerplate
func Test(t *testing.T) {
+ DefaultRetryDelay = 50 * time.Millisecond // lower the package-level retry delay before any suite is run
TestingT(t)
}
type ServerRequiredSuite struct{} // suite receiver; TestMakeKeepClientWithNonDiskTypeService is one of its methods
// Standalone tests
-type StandaloneSuite struct{}
+type StandaloneSuite struct {
+ origDefaultRetryDelay time.Duration // saved copy of DefaultRetryDelay; TearDownTest assigns it back to the global
+ origMinimumRetryDelay time.Duration // saved copy of MinimumRetryDelay; TearDownTest assigns it back to the global
+}
var origHOME = os.Getenv("HOME") // HOME as read at package initialization; TearDownTest sets HOME back to this value
RefreshServiceDiscovery()
// Prevent cache state from leaking between test cases
os.Setenv("HOME", c.MkDir())
+ s.origDefaultRetryDelay = DefaultRetryDelay
+ s.origMinimumRetryDelay = MinimumRetryDelay
}
func (s *StandaloneSuite) TearDownTest(c *C) {
os.Setenv("HOME", origHOME) // put back the HOME value recorded in origHOME
+ DefaultRetryDelay = s.origDefaultRetryDelay // undo any override of the retry delay globals using the copies stored on the suite
+ MinimumRetryDelay = s.origMinimumRetryDelay
}
func pythonDir() string {
}
func (s *StandaloneSuite) TestUploadToStubKeepServer(c *C) {
- log.Printf("TestUploadToStubKeepServer")
-
st := &StubPutHandler{
c: c,
expectPath: "acbd18db4cc2f85cedef654fccc4a4d8",
}
type FailThenSucceedHandler struct {
+ morefails int // extra failures after the first: requests 1 through morefails+1 are answered with a 500
handled chan string // receives "http://<host>" once for each failed request
- count int
+ count atomic.Int64 // number of requests seen so far; incremented atomically by ServeHTTP on every request
successhandler http.Handler // serves every request once the failure budget is used up
reqIDs []string // X-Request-Id header of each request; NOTE(review): appended in ServeHTTP with no visible lock — confirm requests never overlap
}
func (fh *FailThenSucceedHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
fh.reqIDs = append(fh.reqIDs, req.Header.Get("X-Request-Id"))
- if fh.count == 0 {
+ if int(fh.count.Add(1)) <= fh.morefails+1 {
resp.WriteHeader(500)
- fh.count++
fh.handled <- fmt.Sprintf("http://%s", req.Host)
} else {
fh.successhandler.ServeHTTP(resp, req)
kc.SetServiceRoots(localRoots, writableLocalRoots, nil)
- reader, writer := io.Pipe()
-
- go func() {
- writer.Write([]byte("foo"))
- writer.Close()
- }()
-
- kc.PutHR(hash, reader, 3)
+ kc.PutHR(hash, bytes.NewBuffer([]byte("foo")), 3)
shuff := NewRootSorter(kc.LocalRoots(), hash).GetSortedRoots()
}
func (s *StandaloneSuite) TestGet(c *C) {
- hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
+ hash := fmt.Sprintf("%x+3", md5.Sum([]byte("foo")))
st := StubGetHandler{
c,
}
func (s *StandaloneSuite) TestGet404(c *C) {
- hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
+ hash := fmt.Sprintf("%x+3", md5.Sum([]byte("foo")))
st := Error404Handler{make(chan string, 1)}
}
func (s *StandaloneSuite) TestGetFail(c *C) {
- hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
+ hash := fmt.Sprintf("%x+3", md5.Sum([]byte("foo")))
st := FailHandler{make(chan string, 1)}
}
func (s *StandaloneSuite) TestGetFailRetry(c *C) {
- hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
+ defer func(origDefault, origMinimum time.Duration) { // put the retry delay globals back when this test returns
+ DefaultRetryDelay = origDefault
+ MinimumRetryDelay = origMinimum
+ }(DefaultRetryDelay, MinimumRetryDelay)
+ DefaultRetryDelay = time.Second / 8
+ MinimumRetryDelay = time.Millisecond
- st := &FailThenSucceedHandler{
- handled: make(chan string, 1),
- successhandler: StubGetHandler{
- c,
- hash,
- "abc123",
- http.StatusOK,
- []byte("foo")}}
+ hash := fmt.Sprintf("%x+3", md5.Sum([]byte("foo")))
- ks := RunFakeKeepServer(st)
- defer ks.listener.Close()
+ for _, delay := range []time.Duration{0, time.Nanosecond, time.Second / 8, time.Second / 16} { // one full fail-then-succeed scenario per RetryDelay setting
+ c.Logf("=== initial delay %v", delay)
- arv, err := arvadosclient.MakeArvadosClient()
- c.Check(err, IsNil)
- kc, _ := MakeKeepClient(arv)
- arv.ApiToken = "abc123"
- kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
+ st := &FailThenSucceedHandler{
+ morefails: 2, // together with the initial failure, the first 3 requests get a 500
+ handled: make(chan string, 4),
+ successhandler: StubGetHandler{
+ c,
+ hash,
+ "abc123",
+ http.StatusOK,
+ []byte("foo")}}
- r, n, _, err := kc.Get(hash)
- c.Assert(err, IsNil)
- c.Check(n, Equals, int64(3))
+ ks := RunFakeKeepServer(st)
+ defer ks.listener.Close() // deferred to function return, not to the end of this loop iteration
- content, err := ioutil.ReadAll(r)
- c.Check(err, IsNil)
- c.Check(content, DeepEquals, []byte("foo"))
- c.Check(r.Close(), IsNil)
+ arv, err := arvadosclient.MakeArvadosClient()
+ c.Check(err, IsNil)
+ kc, _ := MakeKeepClient(arv)
+ arv.ApiToken = "abc123"
+ kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
+ kc.Retries = 3
+ kc.RetryDelay = delay
+ kc.DiskCacheSize = DiskCacheDisabled
- c.Logf("%q", st.reqIDs)
- c.Assert(len(st.reqIDs) > 1, Equals, true)
- for _, reqid := range st.reqIDs {
- c.Check(reqid, Not(Equals), "")
- c.Check(reqid, Equals, st.reqIDs[0])
+ t0 := time.Now()
+ r, n, _, err := kc.Get(hash)
+ c.Assert(err, IsNil)
+ c.Check(n, Equals, int64(3))
+ elapsed := time.Since(t0)
+
+ nonsleeptime := time.Second / 10 // allowance added to the upper bound for time not spent in retry delays
+ expect := kc.RetryDelay
+ if expect == 0 { // a zero RetryDelay is expected to behave like DefaultRetryDelay
+ expect = DefaultRetryDelay
+ }
+ min := MinimumRetryDelay * 3 // lower bound checked below: 3 × MinimumRetryDelay (the handler fails 3 times)
+ max := expect + expect*2 + expect*2*2 + nonsleeptime // upper bound: delays of expect, 2×expect and 4×expect, plus the allowance
+ c.Check(elapsed >= min, Equals, true, Commentf("elapsed %v / expect min %v", elapsed, min))
+ c.Check(elapsed <= max, Equals, true, Commentf("elapsed %v / expect max %v", elapsed, max))
+
+ content, err := ioutil.ReadAll(r)
+ c.Check(err, IsNil)
+ c.Check(content, DeepEquals, []byte("foo"))
+ c.Check(r.Close(), IsNil)
+
+ c.Logf("%q", st.reqIDs)
+ if c.Check(st.reqIDs, Not(HasLen), 0) { // only compare IDs when at least one request was recorded
+ for _, reqid := range st.reqIDs {
+ c.Check(reqid, Not(Equals), "")
+ c.Check(reqid, Equals, st.reqIDs[0]) // every attempt must reuse the first attempt's X-Request-Id
+ }
+ }
}
}
func (s *StandaloneSuite) TestGetNetError(c *C) {
- hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
+ hash := fmt.Sprintf("%x+3", md5.Sum([]byte("foo")))
arv, err := arvadosclient.MakeArvadosClient()
c.Check(err, IsNil)
func (s *StandaloneSuite) TestGetWithServiceHint(c *C) {
uuid := "zzzzz-bi6l4-123451234512345"
- hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
+ hash := fmt.Sprintf("%x+3", md5.Sum([]byte("foo")))
// This one shouldn't be used:
ks0 := RunFakeKeepServer(StubGetHandler{
// rendezvous probe order.
func (s *StandaloneSuite) TestGetWithLocalServiceHint(c *C) {
uuid := "zzzzz-bi6l4-zzzzzzzzzzzzzzz"
- hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
+ hash := fmt.Sprintf("%x+3", md5.Sum([]byte("foo")))
// This one shouldn't be used, although it appears first in
// rendezvous probe order:
func (s *StandaloneSuite) TestGetWithServiceHintFailoverToLocals(c *C) {
uuid := "zzzzz-bi6l4-123451234512345"
- hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
+ hash := fmt.Sprintf("%x+3", md5.Sum([]byte("foo")))
ksLocal := RunFakeKeepServer(StubGetHandler{
c,
}
func (s *StandaloneSuite) TestChecksum(c *C) {
- foohash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
- barhash := fmt.Sprintf("%x", md5.Sum([]byte("bar")))
+ foohash := fmt.Sprintf("%x+3", md5.Sum([]byte("foo")))
+ barhash := fmt.Sprintf("%x+3", md5.Sum([]byte("bar")))
st := BarHandler{make(chan string, 1)}
kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
r, n, _, err := kc.Get(barhash)
- c.Check(err, IsNil)
- _, err = ioutil.ReadAll(r)
- c.Check(n, Equals, int64(3))
- c.Check(err, IsNil)
+ if c.Check(err, IsNil) {
+ _, err = ioutil.ReadAll(r)
+ c.Check(n, Equals, int64(3))
+ c.Check(err, IsNil)
+ }
- <-st.handled
+ select {
+ case <-st.handled:
+ case <-time.After(time.Second):
+ c.Fatal("timed out")
+ }
r, n, _, err = kc.Get(foohash)
if err == nil {
}
c.Check(err, Equals, BadChecksum)
- <-st.handled
+ select {
+ case <-st.handled:
+ case <-time.After(time.Second):
+ c.Fatal("timed out")
+ }
}
func (s *StandaloneSuite) TestGetWithFailures(c *C) {
content := []byte("waz")
- hash := fmt.Sprintf("%x", md5.Sum(content))
+ hash := fmt.Sprintf("%x+3", md5.Sum(content))
fh := Error404Handler{
make(chan string, 4)}
r, n, _, err := kc.Get(hash)
- <-fh.handled
+ select {
+ case <-fh.handled:
+ case <-time.After(time.Second):
+ c.Fatal("timed out")
+ }
c.Assert(err, IsNil)
c.Check(n, Equals, int64(3))
kc, err := MakeKeepClient(arv)
c.Assert(err, IsNil)
- hash := fmt.Sprintf("%x", md5.Sum(content))
+ hash := fmt.Sprintf("%x+%d", md5.Sum(content), len(content))
{
n, _, err := kc.Ask(hash)
{
hash2, replicas, err := kc.PutB(content)
c.Check(err, IsNil)
- c.Check(hash2, Matches, fmt.Sprintf(`%s\+%d\b.*`, hash, len(content)))
+ c.Check(hash2, Matches, `\Q`+hash+`\E\b.*`)
c.Check(replicas, Equals, 2)
}
{
n, url2, err := kc.Ask(hash)
c.Check(err, IsNil)
c.Check(n, Equals, int64(len(content)))
- c.Check(url2, Matches, fmt.Sprintf("http://localhost:\\d+/%s", hash))
+ c.Check(url2, Matches, "http://localhost:\\d+/\\Q"+hash+"\\E")
}
{
loc, err := kc.LocalLocator(hash)
}
func (s *StandaloneSuite) TestGetIndexWithNoPrefix(c *C) {
- hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
+ hash := fmt.Sprintf("%x+3", md5.Sum([]byte("foo")))
st := StubGetIndexHandler{
c,
"/index",
"abc123",
http.StatusOK,
- []byte(hash + "+3 1443559274\n\n")}
+ []byte(hash + " 1443559274\n\n")}
ks := RunFakeKeepServer(st)
defer ks.listener.Close()
"/index/" + hash[0:3],
"abc123",
http.StatusOK,
- []byte(hash + "+3 1443559274\n\n")}
+ []byte(hash + " 1443559274\n\n")}
ks := RunFakeKeepServer(st)
defer ks.listener.Close()
}
func (s *StandaloneSuite) TestGetIndexWithNoSuchServer(c *C) {
- hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
+ hash := fmt.Sprintf("%x+3", md5.Sum([]byte("foo")))
st := StubGetIndexHandler{
c,
}
func (s *StandaloneSuite) TestPutBRetry(c *C) {
- st := &FailThenSucceedHandler{
- handled: make(chan string, 1),
- successhandler: &StubPutHandler{
- c: c,
- expectPath: Md5String("foo"),
- expectAPIToken: "abc123",
- expectBody: "foo",
- expectStorageClass: "default",
- returnStorageClasses: "",
- handled: make(chan string, 5),
- },
- }
+ DefaultRetryDelay = time.Second / 8 // package globals overridden here; StandaloneSuite.TearDownTest assigns the saved values back
+ MinimumRetryDelay = time.Millisecond
+
+ for _, delay := range []time.Duration{0, time.Nanosecond, time.Second / 8, time.Second / 16} { // one full fail-then-succeed scenario per RetryDelay setting
+ c.Logf("=== initial delay %v", delay)
+
+ st := &FailThenSucceedHandler{
+ morefails: 5, // handler will fail 6x in total, 3 for each server
+ handled: make(chan string, 10),
+ successhandler: &StubPutHandler{
+ c: c,
+ expectPath: Md5String("foo"),
+ expectAPIToken: "abc123",
+ expectBody: "foo",
+ expectStorageClass: "default",
+ returnStorageClasses: "",
+ handled: make(chan string, 5),
+ },
+ }
- arv, _ := arvadosclient.MakeArvadosClient()
- kc, _ := MakeKeepClient(arv)
+ arv, _ := arvadosclient.MakeArvadosClient()
+ kc, _ := MakeKeepClient(arv)
+ kc.Retries = 3
+ kc.RetryDelay = delay
+ kc.DiskCacheSize = DiskCacheDisabled
+ kc.Want_replicas = 2
- kc.Want_replicas = 2
- arv.ApiToken = "abc123"
- localRoots := make(map[string]string)
- writableLocalRoots := make(map[string]string)
+ arv.ApiToken = "abc123"
+ localRoots := make(map[string]string)
+ writableLocalRoots := make(map[string]string)
- ks := RunSomeFakeKeepServers(st, 2)
+ ks := RunSomeFakeKeepServers(st, 2) // both fake servers share the single handler st
- for i, k := range ks {
- localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
- writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
- defer k.listener.Close()
- }
+ for i, k := range ks {
+ localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
+ writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
+ defer k.listener.Close() // deferred to function return, so listeners from every delay iteration stay open until then
+ }
- kc.SetServiceRoots(localRoots, writableLocalRoots, nil)
+ kc.SetServiceRoots(localRoots, writableLocalRoots, nil)
- hash, replicas, err := kc.PutB([]byte("foo"))
+ t0 := time.Now()
+ hash, replicas, err := kc.PutB([]byte("foo"))
- c.Check(err, IsNil)
- c.Check(hash, Equals, "")
- c.Check(replicas, Equals, 2)
+ c.Check(err, IsNil)
+ c.Check(hash, Equals, "")
+ c.Check(replicas, Equals, 2)
+ elapsed := time.Since(t0)
+
+ nonsleeptime := time.Second / 10 // allowance added to the upper bound for time not spent in retry delays
+ expect := kc.RetryDelay
+ if expect == 0 { // a zero RetryDelay is expected to behave like DefaultRetryDelay
+ expect = DefaultRetryDelay
+ }
+ min := MinimumRetryDelay * 3 // lower bound: 3 × MinimumRetryDelay
+ max := expect + expect*2 + expect*2*2 // upper bound before allowance: delays of expect, 2×expect and 4×expect
+ max += nonsleeptime
+ checkInterval(c, elapsed, min, max)
+ }
}
func (s *ServerRequiredSuite) TestMakeKeepClientWithNonDiskTypeService(c *C) {
c.Assert(kc.foundNonDiskSvc, Equals, true)
c.Assert(kc.httpClient().(*http.Client).Timeout, Equals, 300*time.Second)
}
+
+func (s *StandaloneSuite) TestDelayCalculator_Default(c *C) {
+ MinimumRetryDelay = time.Second / 2
+ DefaultRetryDelay = time.Second
+
+ dc := delayCalculator{InitialMaxDelay: 0}
+ checkInterval(c, dc.Next(), time.Second/2, time.Second)
+ checkInterval(c, dc.Next(), time.Second/2, time.Second*2)
+ checkInterval(c, dc.Next(), time.Second/2, time.Second*4)
+ checkInterval(c, dc.Next(), time.Second/2, time.Second*8)
+ checkInterval(c, dc.Next(), time.Second/2, time.Second*10)
+ checkInterval(c, dc.Next(), time.Second/2, time.Second*10)
+}
+
+func (s *StandaloneSuite) TestDelayCalculator_SetInitial(c *C) {
+ MinimumRetryDelay = time.Second / 2
+ DefaultRetryDelay = time.Second
+
+ dc := delayCalculator{InitialMaxDelay: time.Second * 2}
+ checkInterval(c, dc.Next(), time.Second/2, time.Second*2)
+ checkInterval(c, dc.Next(), time.Second/2, time.Second*4)
+ checkInterval(c, dc.Next(), time.Second/2, time.Second*8)
+ checkInterval(c, dc.Next(), time.Second/2, time.Second*16)
+ checkInterval(c, dc.Next(), time.Second/2, time.Second*20)
+ checkInterval(c, dc.Next(), time.Second/2, time.Second*20)
+ checkInterval(c, dc.Next(), time.Second/2, time.Second*20)
+}
+
+// TestDelayCalculator_EnsureSomeLongDelays draws up to 4000 delays from a
+// calculator with InitialMaxDelay of 5s and passes as soon as one of them
+// exceeds 9x InitialMaxDelay (90% of the 10x maximum the test expects).
+// It reports an error if no such delay is ever produced.
+func (s *StandaloneSuite) TestDelayCalculator_EnsureSomeLongDelays(c *C) {
+	dc := delayCalculator{InitialMaxDelay: time.Second * 5}
+	var d time.Duration
+	n := 4000
+	for i := 0; i < n; i++ {
+		d = dc.Next()
+		// Log after drawing, so the printed delay belongs to trial i
+		// rather than to the previous trial (or zero when i == 0).
+		if i < 20 || i%10 == 0 {
+			c.Logf("i=%d, delay=%v", i, d)
+		}
+		if d > dc.InitialMaxDelay*9 {
+			return
+		}
+	}
+	// Use %v for the expected maximum so it prints as a duration ("50s")
+	// instead of a raw nanosecond count.
+	c.Errorf("after %d trials, never got a delay more than 90%% of expected max %v; last was %v", n, dc.InitialMaxDelay*10, d)
+}
+
+// If InitialMaxDelay is less than MinimumRetryDelay/10, then delay is
+// always MinimumRetryDelay.
+func (s *StandaloneSuite) TestDelayCalculator_InitialLessThanMinimum(c *C) {
+ MinimumRetryDelay = time.Second / 2
+ dc := delayCalculator{InitialMaxDelay: time.Millisecond}
+ for i := 0; i < 20; i++ {
+ c.Check(dc.Next(), Equals, time.Second/2)
+ }
+}
+
+// checkInterval records a check failure on c when t falls outside the
+// closed interval [min, max]. The two bounds are checked independently,
+// so a value can produce at most one of the two failure comments.
+func checkInterval(c *C, t, min, max time.Duration) {
+	tooShort := t < min
+	tooLong := t > max
+	c.Check(!tooShort, Equals, true, Commentf("got %v which is below expected min %v", t, min))
+	c.Check(!tooLong, Equals, true, Commentf("got %v which is above expected max %v", t, max))
+}