From 22361307cf41f916afd562e7f33fcdaacefe5f9d Mon Sep 17 00:00:00 2001 From: Tom Clegg Date: Wed, 14 Feb 2024 14:43:14 -0500 Subject: [PATCH] 21023: Add exponential-backoff delay between keepclient retries. Arvados-DCO-1.1-Signed-off-by: Tom Clegg --- sdk/go/keepclient/keepclient.go | 45 +++++-- sdk/go/keepclient/keepclient_test.go | 177 +++++++++++++++++---------- sdk/go/keepclient/support.go | 20 ++- services/keepproxy/keepproxy.go | 1 + services/keepproxy/keepproxy_test.go | 2 +- services/keepstore/command.go | 1 + 6 files changed, 167 insertions(+), 79 deletions(-) diff --git a/sdk/go/keepclient/keepclient.go b/sdk/go/keepclient/keepclient.go index 2bd7996b59..a455b96fb7 100644 --- a/sdk/go/keepclient/keepclient.go +++ b/sdk/go/keepclient/keepclient.go @@ -15,6 +15,7 @@ import ( "fmt" "io" "io/ioutil" + "math/rand" "net" "net/http" "os" @@ -44,6 +45,8 @@ var ( DefaultProxyTLSHandshakeTimeout = 10 * time.Second DefaultProxyKeepAlive = 120 * time.Second + DefaultRetryDelay = 2 * time.Second // see KeepClient.RetryDelay + rootCacheDir = "/var/cache/arvados/keep" userCacheDir = ".cache/arvados/keep" // relative to HOME ) @@ -105,14 +108,25 @@ const DiskCacheDisabled = arvados.ByteSizeOrPercent(1) // KeepClient holds information about Arvados and Keep servers. type KeepClient struct { - Arvados *arvadosclient.ArvadosClient - Want_replicas int - localRoots map[string]string - writableLocalRoots map[string]string - gatewayRoots map[string]string - lock sync.RWMutex - HTTPClient HTTPClient - Retries int + Arvados *arvadosclient.ArvadosClient + Want_replicas int + localRoots map[string]string + writableLocalRoots map[string]string + gatewayRoots map[string]string + lock sync.RWMutex + HTTPClient HTTPClient + + // Number of times to automatically retry a read/write + // operation after a transient failure. + Retries int + + // Initial delay after first attempt, used when automatic + // retry is invoked. If zero, DefaultRetryDelay is used. + // Delays after subsequent attempts are increased by a random + // factor between 1.75x and 2x, up to a maximum of 10x the + // initial delay. + RetryDelay time.Duration + RequestID string StorageClasses []string DefaultStorageClasses []string // Set by cluster's exported config @@ -141,6 +155,7 @@ func (kc *KeepClient) Clone() *KeepClient { gatewayRoots: kc.gatewayRoots, HTTPClient: kc.HTTPClient, Retries: kc.Retries, + RetryDelay: kc.RetryDelay, RequestID: kc.RequestID, StorageClasses: kc.StorageClasses, DefaultStorageClasses: kc.DefaultStorageClasses, @@ -269,6 +284,11 @@ func (kc *KeepClient) getOrHead(method string, locator string, header http.Heade var errs []string + retryDelay := kc.RetryDelay + if retryDelay < 1 { + retryDelay = DefaultRetryDelay + } + maxRetryDelay := retryDelay * 10 triesRemaining := 1 + kc.Retries serversToTry := kc.getSortedRoots(locator) @@ -348,6 +368,15 @@ func (kc *KeepClient) getOrHead(method string, locator string, header http.Heade return nil, expectLength, url, resp.Header, nil } serversToTry = retryList + if len(serversToTry) > 0 && triesRemaining > 0 { + time.Sleep(retryDelay) + // Increase delay by a random factor between + // 1.75x and 2x + retryDelay = time.Duration((2 - rand.Float64()/4) * float64(retryDelay)) + if retryDelay > maxRetryDelay { + retryDelay = maxRetryDelay + } + } } DebugPrintf("DEBUG: %s %s failed: %v", method, locator, errs) diff --git a/sdk/go/keepclient/keepclient_test.go b/sdk/go/keepclient/keepclient_test.go index 94591bd064..19a6d4e03b 100644 --- a/sdk/go/keepclient/keepclient_test.go +++ b/sdk/go/keepclient/keepclient_test.go @@ -17,6 +17,7 @@ import ( "os" "strings" "sync" + "sync/atomic" "testing" "time" @@ -26,8 +27,8 @@ import ( . "gopkg.in/check.v1" ) -// Gocheck boilerplate func Test(t *testing.T) { + DefaultRetryDelay = 50 * time.Millisecond TestingT(t) } @@ -421,17 +422,17 @@ func (fh FailHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) { } type FailThenSucceedHandler struct { + morefails int // fail 1 + this many times before succeeding handled chan string - count int + count atomic.Int64 successhandler http.Handler reqIDs []string } func (fh *FailThenSucceedHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) { fh.reqIDs = append(fh.reqIDs, req.Header.Get("X-Request-Id")) - if fh.count == 0 { + if int(fh.count.Add(1)) <= fh.morefails+1 { resp.WriteHeader(500) - fh.count++ fh.handled <- fmt.Sprintf("http://%s", req.Host) } else { fh.successhandler.ServeHTTP(resp, req) @@ -560,14 +561,7 @@ func (s *StandaloneSuite) TestPutHR(c *C) { kc.SetServiceRoots(localRoots, writableLocalRoots, nil) - reader, writer := io.Pipe() - - go func() { - writer.Write([]byte("foo")) - writer.Close() - }() - - kc.PutHR(hash, reader, 3) + kc.PutHR(hash, bytes.NewBuffer([]byte("foo")), 3) shuff := NewRootSorter(kc.LocalRoots(), hash).GetSortedRoots() @@ -804,40 +798,64 @@ func (s *StandaloneSuite) TestGetFail(c *C) { } func (s *StandaloneSuite) TestGetFailRetry(c *C) { + defer func(orig time.Duration) { DefaultRetryDelay = orig }(DefaultRetryDelay) + DefaultRetryDelay = time.Second / 8 + hash := fmt.Sprintf("%x+3", md5.Sum([]byte("foo"))) - st := &FailThenSucceedHandler{ - handled: make(chan string, 1), - successhandler: StubGetHandler{ - c, - hash, - "abc123", - http.StatusOK, - []byte("foo")}} + for _, delay := range []time.Duration{0, time.Nanosecond, time.Second / 8, time.Second / 16} { + c.Logf("=== initial delay %v", delay) - ks := RunFakeKeepServer(st) - defer ks.listener.Close() + st := &FailThenSucceedHandler{ + morefails: 2, + handled: make(chan string, 4), + successhandler: StubGetHandler{ + c, + hash, + "abc123", + http.StatusOK, + []byte("foo")}} - arv, err := arvadosclient.MakeArvadosClient() - c.Check(err, IsNil) - kc, _ := MakeKeepClient(arv) - arv.ApiToken = "abc123" - kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil) + ks := RunFakeKeepServer(st) + defer ks.listener.Close() - r, n, _, err := kc.Get(hash) - c.Assert(err, IsNil) - c.Check(n, Equals, int64(3)) + arv, err := arvadosclient.MakeArvadosClient() + c.Check(err, IsNil) + kc, _ := MakeKeepClient(arv) + arv.ApiToken = "abc123" + kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil) + kc.Retries = 3 + kc.RetryDelay = delay + kc.DiskCacheSize = DiskCacheDisabled - content, err := ioutil.ReadAll(r) - c.Check(err, IsNil) - c.Check(content, DeepEquals, []byte("foo")) - c.Check(r.Close(), IsNil) + t0 := time.Now() + r, n, _, err := kc.Get(hash) + c.Assert(err, IsNil) + c.Check(n, Equals, int64(3)) + elapsed := time.Since(t0) - c.Logf("%q", st.reqIDs) - c.Assert(len(st.reqIDs) > 1, Equals, true) - for _, reqid := range st.reqIDs { - c.Check(reqid, Not(Equals), "") - c.Check(reqid, Equals, st.reqIDs[0]) + nonsleeptime := time.Second / 10 + expect := kc.RetryDelay + if expect == 0 { + expect = DefaultRetryDelay + } + min := expect + expect*7/4 + expect*7/4*7/4 + max := expect + expect*2 + expect*2*2 + nonsleeptime + c.Check(elapsed >= min, Equals, true, Commentf("elapsed %v / expect min %v", elapsed, min)) + c.Check(elapsed <= max, Equals, true, Commentf("elapsed %v / expect max %v", elapsed, max)) + + content, err := ioutil.ReadAll(r) + c.Check(err, IsNil) + c.Check(content, DeepEquals, []byte("foo")) + c.Check(r.Close(), IsNil) + + c.Logf("%q", st.reqIDs) + if c.Check(st.reqIDs, Not(HasLen), 0) { + for _, reqid := range st.reqIDs { + c.Check(reqid, Not(Equals), "") + c.Check(reqid, Equals, st.reqIDs[0]) + } + } } } @@ -1484,42 +1502,65 @@ func (s *StandaloneSuite) TestGetIndexWithNoSuchPrefix(c *C) { } func (s *StandaloneSuite) TestPutBRetry(c *C) { - st := &FailThenSucceedHandler{ - handled: make(chan string, 1), - successhandler: &StubPutHandler{ - c: c, - expectPath: Md5String("foo"), - expectAPIToken: "abc123", - expectBody: "foo", - expectStorageClass: "default", - returnStorageClasses: "", - handled: make(chan string, 5), - }, - } + defer func(orig time.Duration) { DefaultRetryDelay = orig }(DefaultRetryDelay) + DefaultRetryDelay = time.Second / 8 + + for _, delay := range []time.Duration{0, time.Nanosecond, time.Second / 8, time.Second / 16} { + c.Logf("=== initial delay %v", delay) + + st := &FailThenSucceedHandler{ + morefails: 5, // handler will fail 6x in total, 3 for each server + handled: make(chan string, 10), + successhandler: &StubPutHandler{ + c: c, + expectPath: Md5String("foo"), + expectAPIToken: "abc123", + expectBody: "foo", + expectStorageClass: "default", + returnStorageClasses: "", + handled: make(chan string, 5), + }, + } - arv, _ := arvadosclient.MakeArvadosClient() - kc, _ := MakeKeepClient(arv) + arv, _ := arvadosclient.MakeArvadosClient() + kc, _ := MakeKeepClient(arv) + kc.Retries = 3 + kc.RetryDelay = delay + kc.DiskCacheSize = DiskCacheDisabled + kc.Want_replicas = 2 - kc.Want_replicas = 2 - arv.ApiToken = "abc123" - localRoots := make(map[string]string) - writableLocalRoots := make(map[string]string) + arv.ApiToken = "abc123" + localRoots := make(map[string]string) + writableLocalRoots := make(map[string]string) - ks := RunSomeFakeKeepServers(st, 2) + ks := RunSomeFakeKeepServers(st, 2) - for i, k := range ks { - localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url - writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url - defer k.listener.Close() - } + for i, k := range ks { + localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url + writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url + defer k.listener.Close() + } - kc.SetServiceRoots(localRoots, writableLocalRoots, nil) + kc.SetServiceRoots(localRoots, writableLocalRoots, nil) - hash, replicas, err := kc.PutB([]byte("foo")) + t0 := time.Now() + hash, replicas, err := kc.PutB([]byte("foo")) - c.Check(err, IsNil) - c.Check(hash, Equals, "") - c.Check(replicas, Equals, 2) + c.Check(err, IsNil) + c.Check(hash, Equals, "") + c.Check(replicas, Equals, 2) + elapsed := time.Since(t0) + + nonsleeptime := time.Second / 10 + expect := kc.RetryDelay + if expect == 0 { + expect = DefaultRetryDelay + } + min := expect + expect*7/4 + expect*7/4*7/4 + max := expect + expect*2 + expect*2*2 + nonsleeptime + c.Check(elapsed >= min, Equals, true, Commentf("elapsed %v / expect min %v", elapsed, min)) + c.Check(elapsed <= max, Equals, true, Commentf("elapsed %v / expect max %v", elapsed, max)) + } } func (s *ServerRequiredSuite) TestMakeKeepClientWithNonDiskTypeService(c *C) { diff --git a/sdk/go/keepclient/support.go b/sdk/go/keepclient/support.go index 6acaf64baa..d5e3d0ec1c 100644 --- a/sdk/go/keepclient/support.go +++ b/sdk/go/keepclient/support.go @@ -13,10 +13,12 @@ import ( "io" "io/ioutil" "log" + "math/rand" "net/http" "os" "strconv" "strings" + "time" "git.arvados.org/arvados.git/sdk/go/arvados" "git.arvados.org/arvados.git/sdk/go/arvadosclient" @@ -218,6 +220,11 @@ func (kc *KeepClient) httpBlockWrite(ctx context.Context, req arvados.BlockWrite replicasPerThread = req.Replicas } + retryDelay := kc.RetryDelay + if retryDelay < 1 { + retryDelay = DefaultRetryDelay + } + maxRetryDelay := retryDelay * 10 retriesRemaining := req.Attempts var retryServers []string @@ -306,14 +313,23 @@ func (kc *KeepClient) httpBlockWrite(ctx context.Context, req arvados.BlockWrite } if status.statusCode == 0 || status.statusCode == 408 || status.statusCode == 429 || - (status.statusCode >= 500 && status.statusCode != 503) { + (status.statusCode >= 500 && status.statusCode != http.StatusInsufficientStorage) { // Timeout, too many requests, or other server side failure - // Do not retry when status code is 503, which means the keep server is full + // (do not auto-retry status 507 "full") retryServers = append(retryServers, status.url[0:strings.LastIndex(status.url, "/")]) } } sv = retryServers + if len(sv) > 0 { + time.Sleep(retryDelay) + // Increase delay by a random factor between + // 1.75x and 2x + retryDelay = time.Duration((2 - rand.Float64()/4) * float64(retryDelay)) + if retryDelay > maxRetryDelay { + retryDelay = maxRetryDelay + } + } } return resp, nil diff --git a/services/keepproxy/keepproxy.go b/services/keepproxy/keepproxy.go index 39ffd45cbe..8afe20f911 100644 --- a/services/keepproxy/keepproxy.go +++ b/services/keepproxy/keepproxy.go @@ -320,6 +320,7 @@ func (h *proxyHandler) Get(resp http.ResponseWriter, req *http.Request) { kc := h.makeKeepClient(req) kc.DiskCacheSize = keepclient.DiskCacheDisabled + kc.RetryDelay = time.Second var pass bool var tok string diff --git a/services/keepproxy/keepproxy_test.go b/services/keepproxy/keepproxy_test.go index 7efba2348b..473be4d3a6 100644 --- a/services/keepproxy/keepproxy_test.go +++ b/services/keepproxy/keepproxy_test.go @@ -32,8 +32,8 @@ import ( . "gopkg.in/check.v1" ) -// Gocheck boilerplate func Test(t *testing.T) { + keepclient.DefaultRetryDelay = time.Millisecond TestingT(t) } diff --git a/services/keepstore/command.go b/services/keepstore/command.go index 48c8256a3c..7de9fa833e 100644 --- a/services/keepstore/command.go +++ b/services/keepstore/command.go @@ -206,6 +206,7 @@ func (h *handler) setup(ctx context.Context, cluster *arvados.Cluster, token str Arvados: ac, Want_replicas: 1, DiskCacheSize: keepclient.DiskCacheDisabled, + Retries: 0, } h.keepClient.Arvados.ApiToken = fmt.Sprintf("%x", rand.Int63()) -- 2.39.5