"fmt"
"io"
"io/ioutil"
+ "math/rand"
"net"
"net/http"
"os"
DefaultProxyTLSHandshakeTimeout = 10 * time.Second
DefaultProxyKeepAlive = 120 * time.Second
+ DefaultRetryDelay = 2 * time.Second // see KeepClient.RetryDelay
+
rootCacheDir = "/var/cache/arvados/keep"
userCacheDir = ".cache/arvados/keep" // relative to HOME
)
// KeepClient holds information about Arvados and Keep servers.
type KeepClient struct {
- Arvados *arvadosclient.ArvadosClient
- Want_replicas int
- localRoots map[string]string
- writableLocalRoots map[string]string
- gatewayRoots map[string]string
- lock sync.RWMutex
- HTTPClient HTTPClient
- Retries int
+ Arvados *arvadosclient.ArvadosClient
+ Want_replicas int
+ localRoots map[string]string
+ writableLocalRoots map[string]string
+ gatewayRoots map[string]string
+ lock sync.RWMutex
+ HTTPClient HTTPClient
+
+ // Number of times to automatically retry a read/write
+ // operation after a transient failure.
+ Retries int
+
+ // Initial delay after first attempt, used when automatic
+ // retry is invoked. If zero, DefaultRetryDelay is used.
+ // Delays after subsequent attempts are increased by a random
+ // factor between 1.75x and 2x, up to a maximum of 10x the
+ // initial delay.
+ RetryDelay time.Duration
+
RequestID string
StorageClasses []string
DefaultStorageClasses []string // Set by cluster's exported config
gatewayRoots: kc.gatewayRoots,
HTTPClient: kc.HTTPClient,
Retries: kc.Retries,
+ RetryDelay: kc.RetryDelay,
RequestID: kc.RequestID,
StorageClasses: kc.StorageClasses,
DefaultStorageClasses: kc.DefaultStorageClasses,
var errs []string
+ retryDelay := kc.RetryDelay
+ if retryDelay < 1 {
+ retryDelay = DefaultRetryDelay
+ }
+ maxRetryDelay := retryDelay * 10
triesRemaining := 1 + kc.Retries
serversToTry := kc.getSortedRoots(locator)
return nil, expectLength, url, resp.Header, nil
}
serversToTry = retryList
+ if len(serversToTry) > 0 && triesRemaining > 0 {
+ time.Sleep(retryDelay)
+ // Increase delay by a random factor between
+ // 1.75x and 2x
+ retryDelay = time.Duration((2 - rand.Float64()/4) * float64(retryDelay))
+ if retryDelay > maxRetryDelay {
+ retryDelay = maxRetryDelay
+ }
+ }
}
DebugPrintf("DEBUG: %s %s failed: %v", method, locator, errs)
"os"
"strings"
"sync"
+ "sync/atomic"
"testing"
"time"
. "gopkg.in/check.v1"
)
-// Gocheck boilerplate
func Test(t *testing.T) {
+ DefaultRetryDelay = 50 * time.Millisecond // lower the package-wide default retry delay before running the gocheck suites; presumably to keep retrying tests fast
TestingT(t)
}
}
type FailThenSucceedHandler struct {
+ morefails int // fail 1 + this many times before succeeding, i.e. the first morefails+1 requests get a 500 response
handled chan string
- count int
+ count atomic.Int64 // number of requests seen so far; ServeHTTP adds 1 for every request
successhandler http.Handler
reqIDs []string
}
func (fh *FailThenSucceedHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
fh.reqIDs = append(fh.reqIDs, req.Header.Get("X-Request-Id"))
- if fh.count == 0 {
+ if int(fh.count.Add(1)) <= fh.morefails+1 {
resp.WriteHeader(500)
- fh.count++
fh.handled <- fmt.Sprintf("http://%s", req.Host)
} else {
fh.successhandler.ServeHTTP(resp, req)
kc.SetServiceRoots(localRoots, writableLocalRoots, nil)
- reader, writer := io.Pipe()
-
- go func() {
- writer.Write([]byte("foo"))
- writer.Close()
- }()
-
- kc.PutHR(hash, reader, 3)
+ kc.PutHR(hash, bytes.NewBuffer([]byte("foo")), 3)
shuff := NewRootSorter(kc.LocalRoots(), hash).GetSortedRoots()
}
func (s *StandaloneSuite) TestGetFailRetry(c *C) {
+ defer func(orig time.Duration) { DefaultRetryDelay = orig }(DefaultRetryDelay) // put the previous default back when this test returns
+ DefaultRetryDelay = time.Second / 8
+
hash := fmt.Sprintf("%x+3", md5.Sum([]byte("foo")))
- st := &FailThenSucceedHandler{
- handled: make(chan string, 1),
- successhandler: StubGetHandler{
- c,
- hash,
- "abc123",
- http.StatusOK,
- []byte("foo")}}
+ for _, delay := range []time.Duration{0, time.Nanosecond, time.Second / 8, time.Second / 16} { // a delay of 0 makes the timing check below expect DefaultRetryDelay instead
+ c.Logf("=== initial delay %v", delay)
- ks := RunFakeKeepServer(st)
- defer ks.listener.Close()
+ st := &FailThenSucceedHandler{
+ morefails: 2, // 3 failed responses in total before the stub handler takes over
+ handled: make(chan string, 4), // buffered: ServeHTTP sends one value per failed response
+ successhandler: StubGetHandler{
+ c,
+ hash,
+ "abc123",
+ http.StatusOK,
+ []byte("foo")}}
- arv, err := arvadosclient.MakeArvadosClient()
- c.Check(err, IsNil)
- kc, _ := MakeKeepClient(arv)
- arv.ApiToken = "abc123"
- kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
+ ks := RunFakeKeepServer(st)
+ defer ks.listener.Close() // NOTE(review): deferred inside the loop, so every iteration's listener stays open until the test function returns
- r, n, _, err := kc.Get(hash)
- c.Assert(err, IsNil)
- c.Check(n, Equals, int64(3))
+ arv, err := arvadosclient.MakeArvadosClient()
+ c.Check(err, IsNil)
+ kc, _ := MakeKeepClient(arv)
+ arv.ApiToken = "abc123"
+ kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
+ kc.Retries = 3
+ kc.RetryDelay = delay
+ kc.DiskCacheSize = DiskCacheDisabled
- content, err := ioutil.ReadAll(r)
- c.Check(err, IsNil)
- c.Check(content, DeepEquals, []byte("foo"))
- c.Check(r.Close(), IsNil)
+ t0 := time.Now()
+ r, n, _, err := kc.Get(hash)
+ c.Assert(err, IsNil)
+ c.Check(n, Equals, int64(3))
+ elapsed := time.Since(t0)
- c.Logf("%q", st.reqIDs)
- c.Assert(len(st.reqIDs) > 1, Equals, true)
- for _, reqid := range st.reqIDs {
- c.Check(reqid, Not(Equals), "")
- c.Check(reqid, Equals, st.reqIDs[0])
+ nonsleeptime := time.Second / 10 // slack added to the upper bound for time not spent sleeping
+ expect := kc.RetryDelay
+ if expect == 0 {
+ expect = DefaultRetryDelay
+ }
+ min := expect + expect*7/4 + expect*7/4*7/4 // lower bound: three delays, each 7/4 of the previous one
+ max := expect + expect*2 + expect*2*2 + nonsleeptime // upper bound: three delays, each double the previous one, plus slack
+ c.Check(elapsed >= min, Equals, true, Commentf("elapsed %v / expect min %v", elapsed, min))
+ c.Check(elapsed <= max, Equals, true, Commentf("elapsed %v / expect max %v", elapsed, max))
+
+ content, err := ioutil.ReadAll(r)
+ c.Check(err, IsNil)
+ c.Check(content, DeepEquals, []byte("foo"))
+ c.Check(r.Close(), IsNil)
+
+ c.Logf("%q", st.reqIDs)
+ if c.Check(st.reqIDs, Not(HasLen), 0) { // only compare IDs when at least one request ID was recorded
+ for _, reqid := range st.reqIDs {
+ c.Check(reqid, Not(Equals), "")
+ c.Check(reqid, Equals, st.reqIDs[0])
+ }
+ }
}
}
}
func (s *StandaloneSuite) TestPutBRetry(c *C) {
- st := &FailThenSucceedHandler{
- handled: make(chan string, 1),
- successhandler: &StubPutHandler{
- c: c,
- expectPath: Md5String("foo"),
- expectAPIToken: "abc123",
- expectBody: "foo",
- expectStorageClass: "default",
- returnStorageClasses: "",
- handled: make(chan string, 5),
- },
- }
+ defer func(orig time.Duration) { DefaultRetryDelay = orig }(DefaultRetryDelay) // put the previous default back when this test returns
+ DefaultRetryDelay = time.Second / 8
+
+ for _, delay := range []time.Duration{0, time.Nanosecond, time.Second / 8, time.Second / 16} { // a delay of 0 makes the timing check below expect DefaultRetryDelay instead
+ c.Logf("=== initial delay %v", delay)
+
+ st := &FailThenSucceedHandler{
+ morefails: 5, // handler will fail 6x in total, 3 for each server
+ handled: make(chan string, 10), // buffered: ServeHTTP sends one value per failed response
+ successhandler: &StubPutHandler{
+ c: c,
+ expectPath: Md5String("foo"),
+ expectAPIToken: "abc123",
+ expectBody: "foo",
+ expectStorageClass: "default",
+ returnStorageClasses: "",
+ handled: make(chan string, 5),
+ },
+ }
- arv, _ := arvadosclient.MakeArvadosClient()
- kc, _ := MakeKeepClient(arv)
+ arv, _ := arvadosclient.MakeArvadosClient()
+ kc, _ := MakeKeepClient(arv)
+ kc.Retries = 3
+ kc.RetryDelay = delay
+ kc.DiskCacheSize = DiskCacheDisabled
+ kc.Want_replicas = 2
- kc.Want_replicas = 2
- arv.ApiToken = "abc123"
- localRoots := make(map[string]string)
- writableLocalRoots := make(map[string]string)
+ arv.ApiToken = "abc123"
+ localRoots := make(map[string]string)
+ writableLocalRoots := make(map[string]string)
- ks := RunSomeFakeKeepServers(st, 2)
+ ks := RunSomeFakeKeepServers(st, 2) // both fake servers share the single handler st, so its failure count is shared too
- for i, k := range ks {
- localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
- writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
- defer k.listener.Close()
- }
+ for i, k := range ks {
+ localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
+ writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
+ defer k.listener.Close() // NOTE(review): deferred inside nested loops, so every listener stays open until the test function returns
+ }
- kc.SetServiceRoots(localRoots, writableLocalRoots, nil)
+ kc.SetServiceRoots(localRoots, writableLocalRoots, nil)
- hash, replicas, err := kc.PutB([]byte("foo"))
+ t0 := time.Now()
+ hash, replicas, err := kc.PutB([]byte("foo"))
- c.Check(err, IsNil)
- c.Check(hash, Equals, "")
- c.Check(replicas, Equals, 2)
+ c.Check(err, IsNil)
+ c.Check(hash, Equals, "")
+ c.Check(replicas, Equals, 2)
+ elapsed := time.Since(t0)
+
+ nonsleeptime := time.Second / 10 // slack added to the upper bound for time not spent sleeping
+ expect := kc.RetryDelay
+ if expect == 0 {
+ expect = DefaultRetryDelay
+ }
+ min := expect + expect*7/4 + expect*7/4*7/4 // lower bound: three delays, each 7/4 of the previous one
+ max := expect + expect*2 + expect*2*2 + nonsleeptime // upper bound: three delays, each double the previous one, plus slack
+ c.Check(elapsed >= min, Equals, true, Commentf("elapsed %v / expect min %v", elapsed, min))
+ c.Check(elapsed <= max, Equals, true, Commentf("elapsed %v / expect max %v", elapsed, max))
+ }
}
func (s *ServerRequiredSuite) TestMakeKeepClientWithNonDiskTypeService(c *C) {