21023: Add exponential-backoff delay between keepclient retries.
author Tom Clegg <tom@curii.com>
Wed, 14 Feb 2024 19:43:14 +0000 (14:43 -0500)
committer Tom Clegg <tom@curii.com>
Wed, 14 Feb 2024 19:43:14 +0000 (14:43 -0500)
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom@curii.com>

sdk/go/keepclient/keepclient.go
sdk/go/keepclient/keepclient_test.go
sdk/go/keepclient/support.go
services/keepproxy/keepproxy.go
services/keepproxy/keepproxy_test.go
services/keepstore/command.go

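diff --git a/sdk/go/keepclient/keepclient.go b/sdk/go/keepclient/keepclient.go
index 2bd7996b59c0260caf1d61560316c3bc42e09357..a455b96fb723ce6bb40b2e001913ab52c42ec47f 100644
--- a/sdk/go/keepclient/keepclient.go
+++ b/sdk/go/keepclient/keepclient.go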
@@ -15,6 +15,7 @@ import (
        "fmt"
        "io"
        "io/ioutil"
+       "math/rand"
        "net"
        "net/http"
        "os"
@@ -44,6 +45,8 @@ var (
        DefaultProxyTLSHandshakeTimeout = 10 * time.Second
        DefaultProxyKeepAlive           = 120 * time.Second
 
+       DefaultRetryDelay = 2 * time.Second // see KeepClient.RetryDelay
+
        rootCacheDir = "/var/cache/arvados/keep"
        userCacheDir = ".cache/arvados/keep" // relative to HOME
 )
@@ -105,14 +108,25 @@ const DiskCacheDisabled = arvados.ByteSizeOrPercent(1)
 
 // KeepClient holds information about Arvados and Keep servers.
 type KeepClient struct {
-       Arvados               *arvadosclient.ArvadosClient
-       Want_replicas         int
-       localRoots            map[string]string
-       writableLocalRoots    map[string]string
-       gatewayRoots          map[string]string
-       lock                  sync.RWMutex
-       HTTPClient            HTTPClient
-       Retries               int
+       Arvados            *arvadosclient.ArvadosClient
+       Want_replicas      int
+       localRoots         map[string]string
+       writableLocalRoots map[string]string
+       gatewayRoots       map[string]string
+       lock               sync.RWMutex
+       HTTPClient         HTTPClient
+
+       // Number of times to automatically retry a read/write
+       // operation after a transient failure.
+       Retries int
+
+       // Initial delay after first attempt, used when automatic
+       // retry is invoked. If zero, DefaultRetryDelay is used.
+       // Delays after subsequent attempts are increased by a random
+       // factor between 1.75x and 2x, up to a maximum of 10x the
+       // initial delay.
+       RetryDelay time.Duration
+
        RequestID             string
        StorageClasses        []string
        DefaultStorageClasses []string                  // Set by cluster's exported config
@@ -141,6 +155,7 @@ func (kc *KeepClient) Clone() *KeepClient {
                gatewayRoots:          kc.gatewayRoots,
                HTTPClient:            kc.HTTPClient,
                Retries:               kc.Retries,
+               RetryDelay:            kc.RetryDelay,
                RequestID:             kc.RequestID,
                StorageClasses:        kc.StorageClasses,
                DefaultStorageClasses: kc.DefaultStorageClasses,
@@ -269,6 +284,11 @@ func (kc *KeepClient) getOrHead(method string, locator string, header http.Heade
 
        var errs []string
 
+       retryDelay := kc.RetryDelay
+       if retryDelay < 1 {
+               retryDelay = DefaultRetryDelay
+       }
+       maxRetryDelay := retryDelay * 10
        triesRemaining := 1 + kc.Retries
 
        serversToTry := kc.getSortedRoots(locator)
@@ -348,6 +368,15 @@ func (kc *KeepClient) getOrHead(method string, locator string, header http.Heade
                        return nil, expectLength, url, resp.Header, nil
                }
                serversToTry = retryList
+               if len(serversToTry) > 0 && triesRemaining > 0 {
+                       time.Sleep(retryDelay)
+                       // Increase delay by a random factor between
+                       // 1.75x and 2x
+                       retryDelay = time.Duration((2 - rand.Float64()/4) * float64(retryDelay))
+                       if retryDelay > maxRetryDelay {
+                               retryDelay = maxRetryDelay
+                       }
+               }
        }
        DebugPrintf("DEBUG: %s %s failed: %v", method, locator, errs)
 
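diff --git a/sdk/go/keepclient/keepclient_test.go b/sdk/go/keepclient/keepclient_test.go
index 94591bd0642d175b4b40158f6259fa87f4dee483..19a6d4e03b7103feeb48f33efb9a990457dc3c14 100644
--- a/sdk/go/keepclient/keepclient_test.go
+++ b/sdk/go/keepclient/keepclient_test.go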
@@ -17,6 +17,7 @@ import (
        "os"
        "strings"
        "sync"
+       "sync/atomic"
        "testing"
        "time"
 
@@ -26,8 +27,8 @@ import (
        . "gopkg.in/check.v1"
 )
 
-// Gocheck boilerplate
 func Test(t *testing.T) {
+       DefaultRetryDelay = 50 * time.Millisecond
        TestingT(t)
 }
 
@@ -421,17 +422,17 @@ func (fh FailHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
 }
 
 type FailThenSucceedHandler struct {
+       morefails      int // fail 1 + this many times before succeeding
        handled        chan string
-       count          int
+       count          atomic.Int64
        successhandler http.Handler
        reqIDs         []string
 }
 
 func (fh *FailThenSucceedHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
        fh.reqIDs = append(fh.reqIDs, req.Header.Get("X-Request-Id"))
-       if fh.count == 0 {
+       if int(fh.count.Add(1)) <= fh.morefails+1 {
                resp.WriteHeader(500)
-               fh.count++
                fh.handled <- fmt.Sprintf("http://%s", req.Host)
        } else {
                fh.successhandler.ServeHTTP(resp, req)
@@ -560,14 +561,7 @@ func (s *StandaloneSuite) TestPutHR(c *C) {
 
        kc.SetServiceRoots(localRoots, writableLocalRoots, nil)
 
-       reader, writer := io.Pipe()
-
-       go func() {
-               writer.Write([]byte("foo"))
-               writer.Close()
-       }()
-
-       kc.PutHR(hash, reader, 3)
+       kc.PutHR(hash, bytes.NewBuffer([]byte("foo")), 3)
 
        shuff := NewRootSorter(kc.LocalRoots(), hash).GetSortedRoots()
 
@@ -804,40 +798,64 @@ func (s *StandaloneSuite) TestGetFail(c *C) {
 }
 
 func (s *StandaloneSuite) TestGetFailRetry(c *C) {
+       defer func(orig time.Duration) { DefaultRetryDelay = orig }(DefaultRetryDelay)
+       DefaultRetryDelay = time.Second / 8
+
        hash := fmt.Sprintf("%x+3", md5.Sum([]byte("foo")))
 
-       st := &FailThenSucceedHandler{
-               handled: make(chan string, 1),
-               successhandler: StubGetHandler{
-                       c,
-                       hash,
-                       "abc123",
-                       http.StatusOK,
-                       []byte("foo")}}
+       for _, delay := range []time.Duration{0, time.Nanosecond, time.Second / 8, time.Second / 16} {
+               c.Logf("=== initial delay %v", delay)
 
-       ks := RunFakeKeepServer(st)
-       defer ks.listener.Close()
+               st := &FailThenSucceedHandler{
+                       morefails: 2,
+                       handled:   make(chan string, 4),
+                       successhandler: StubGetHandler{
+                               c,
+                               hash,
+                               "abc123",
+                               http.StatusOK,
+                               []byte("foo")}}
 
-       arv, err := arvadosclient.MakeArvadosClient()
-       c.Check(err, IsNil)
-       kc, _ := MakeKeepClient(arv)
-       arv.ApiToken = "abc123"
-       kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
+               ks := RunFakeKeepServer(st)
+               defer ks.listener.Close()
 
-       r, n, _, err := kc.Get(hash)
-       c.Assert(err, IsNil)
-       c.Check(n, Equals, int64(3))
+               arv, err := arvadosclient.MakeArvadosClient()
+               c.Check(err, IsNil)
+               kc, _ := MakeKeepClient(arv)
+               arv.ApiToken = "abc123"
+               kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
+               kc.Retries = 3
+               kc.RetryDelay = delay
+               kc.DiskCacheSize = DiskCacheDisabled
 
-       content, err := ioutil.ReadAll(r)
-       c.Check(err, IsNil)
-       c.Check(content, DeepEquals, []byte("foo"))
-       c.Check(r.Close(), IsNil)
+               t0 := time.Now()
+               r, n, _, err := kc.Get(hash)
+               c.Assert(err, IsNil)
+               c.Check(n, Equals, int64(3))
+               elapsed := time.Since(t0)
 
-       c.Logf("%q", st.reqIDs)
-       c.Assert(len(st.reqIDs) > 1, Equals, true)
-       for _, reqid := range st.reqIDs {
-               c.Check(reqid, Not(Equals), "")
-               c.Check(reqid, Equals, st.reqIDs[0])
+               nonsleeptime := time.Second / 10
+               expect := kc.RetryDelay
+               if expect == 0 {
+                       expect = DefaultRetryDelay
+               }
+               min := expect + expect*7/4 + expect*7/4*7/4
+               max := expect + expect*2 + expect*2*2 + nonsleeptime
+               c.Check(elapsed >= min, Equals, true, Commentf("elapsed %v / expect min %v", elapsed, min))
+               c.Check(elapsed <= max, Equals, true, Commentf("elapsed %v / expect max %v", elapsed, max))
+
+               content, err := ioutil.ReadAll(r)
+               c.Check(err, IsNil)
+               c.Check(content, DeepEquals, []byte("foo"))
+               c.Check(r.Close(), IsNil)
+
+               c.Logf("%q", st.reqIDs)
+               if c.Check(st.reqIDs, Not(HasLen), 0) {
+                       for _, reqid := range st.reqIDs {
+                               c.Check(reqid, Not(Equals), "")
+                               c.Check(reqid, Equals, st.reqIDs[0])
+                       }
+               }
        }
 }
 
@@ -1484,42 +1502,65 @@ func (s *StandaloneSuite) TestGetIndexWithNoSuchPrefix(c *C) {
 }
 
 func (s *StandaloneSuite) TestPutBRetry(c *C) {
-       st := &FailThenSucceedHandler{
-               handled: make(chan string, 1),
-               successhandler: &StubPutHandler{
-                       c:                    c,
-                       expectPath:           Md5String("foo"),
-                       expectAPIToken:       "abc123",
-                       expectBody:           "foo",
-                       expectStorageClass:   "default",
-                       returnStorageClasses: "",
-                       handled:              make(chan string, 5),
-               },
-       }
+       defer func(orig time.Duration) { DefaultRetryDelay = orig }(DefaultRetryDelay)
+       DefaultRetryDelay = time.Second / 8
+
+       for _, delay := range []time.Duration{0, time.Nanosecond, time.Second / 8, time.Second / 16} {
+               c.Logf("=== initial delay %v", delay)
+
+               st := &FailThenSucceedHandler{
+                       morefails: 5, // handler will fail 6x in total, 3 for each server
+                       handled:   make(chan string, 10),
+                       successhandler: &StubPutHandler{
+                               c:                    c,
+                               expectPath:           Md5String("foo"),
+                               expectAPIToken:       "abc123",
+                               expectBody:           "foo",
+                               expectStorageClass:   "default",
+                               returnStorageClasses: "",
+                               handled:              make(chan string, 5),
+                       },
+               }
 
-       arv, _ := arvadosclient.MakeArvadosClient()
-       kc, _ := MakeKeepClient(arv)
+               arv, _ := arvadosclient.MakeArvadosClient()
+               kc, _ := MakeKeepClient(arv)
+               kc.Retries = 3
+               kc.RetryDelay = delay
+               kc.DiskCacheSize = DiskCacheDisabled
+               kc.Want_replicas = 2
 
-       kc.Want_replicas = 2
-       arv.ApiToken = "abc123"
-       localRoots := make(map[string]string)
-       writableLocalRoots := make(map[string]string)
+               arv.ApiToken = "abc123"
+               localRoots := make(map[string]string)
+               writableLocalRoots := make(map[string]string)
 
-       ks := RunSomeFakeKeepServers(st, 2)
+               ks := RunSomeFakeKeepServers(st, 2)
 
-       for i, k := range ks {
-               localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
-               writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
-               defer k.listener.Close()
-       }
+               for i, k := range ks {
+                       localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
+                       writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
+                       defer k.listener.Close()
+               }
 
-       kc.SetServiceRoots(localRoots, writableLocalRoots, nil)
+               kc.SetServiceRoots(localRoots, writableLocalRoots, nil)
 
-       hash, replicas, err := kc.PutB([]byte("foo"))
+               t0 := time.Now()
+               hash, replicas, err := kc.PutB([]byte("foo"))
 
-       c.Check(err, IsNil)
-       c.Check(hash, Equals, "")
-       c.Check(replicas, Equals, 2)
+               c.Check(err, IsNil)
+               c.Check(hash, Equals, "")
+               c.Check(replicas, Equals, 2)
+               elapsed := time.Since(t0)
+
+               nonsleeptime := time.Second / 10
+               expect := kc.RetryDelay
+               if expect == 0 {
+                       expect = DefaultRetryDelay
+               }
+               min := expect + expect*7/4 + expect*7/4*7/4
+               max := expect + expect*2 + expect*2*2 + nonsleeptime
+               c.Check(elapsed >= min, Equals, true, Commentf("elapsed %v / expect min %v", elapsed, min))
+               c.Check(elapsed <= max, Equals, true, Commentf("elapsed %v / expect max %v", elapsed, max))
+       }
 }
 
 func (s *ServerRequiredSuite) TestMakeKeepClientWithNonDiskTypeService(c *C) {
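diff --git a/sdk/go/keepclient/support.go b/sdk/go/keepclient/support.go
index 6acaf64baa34caa265eb10bafcbf36a70d308d9b..d5e3d0ec1c4d3260cce66203e2b58bfdb374252f 100644
--- a/sdk/go/keepclient/support.go
+++ b/sdk/go/keepclient/support.go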
@@ -13,10 +13,12 @@ import (
        "io"
        "io/ioutil"
        "log"
+       "math/rand"
        "net/http"
        "os"
        "strconv"
        "strings"
+       "time"
 
        "git.arvados.org/arvados.git/sdk/go/arvados"
        "git.arvados.org/arvados.git/sdk/go/arvadosclient"
@@ -218,6 +220,11 @@ func (kc *KeepClient) httpBlockWrite(ctx context.Context, req arvados.BlockWrite
                replicasPerThread = req.Replicas
        }
 
+       retryDelay := kc.RetryDelay
+       if retryDelay < 1 {
+               retryDelay = DefaultRetryDelay
+       }
+       maxRetryDelay := retryDelay * 10
        retriesRemaining := req.Attempts
        var retryServers []string
 
@@ -306,14 +313,23 @@ func (kc *KeepClient) httpBlockWrite(ctx context.Context, req arvados.BlockWrite
                        }
 
                        if status.statusCode == 0 || status.statusCode == 408 || status.statusCode == 429 ||
-                               (status.statusCode >= 500 && status.statusCode != 503) {
+                               (status.statusCode >= 500 && status.statusCode != http.StatusInsufficientStorage) {
                                // Timeout, too many requests, or other server side failure
-                               // Do not retry when status code is 503, which means the keep server is full
+                               // (do not auto-retry status 507 "full")
                                retryServers = append(retryServers, status.url[0:strings.LastIndex(status.url, "/")])
                        }
                }
 
                sv = retryServers
+               if len(sv) > 0 {
+                       time.Sleep(retryDelay)
+                       // Increase delay by a random factor between
+                       // 1.75x and 2x
+                       retryDelay = time.Duration((2 - rand.Float64()/4) * float64(retryDelay))
+                       if retryDelay > maxRetryDelay {
+                               retryDelay = maxRetryDelay
+                       }
+               }
        }
 
        return resp, nil
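diff --git a/services/keepproxy/keepproxy.go b/services/keepproxy/keepproxy.go
index 39ffd45cbe37b69f663dc6093acd4dfe221c74a1..8afe20f9115950b66a5cff42b1d8c468eb936260 100644
--- a/services/keepproxy/keepproxy.go
+++ b/services/keepproxy/keepproxy.go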
@@ -320,6 +320,7 @@ func (h *proxyHandler) Get(resp http.ResponseWriter, req *http.Request) {
 
        kc := h.makeKeepClient(req)
        kc.DiskCacheSize = keepclient.DiskCacheDisabled
+       kc.RetryDelay = time.Second
 
        var pass bool
        var tok string
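diff --git a/services/keepproxy/keepproxy_test.go b/services/keepproxy/keepproxy_test.go
index 7efba2348b6593a1232edb6c719253ef388674dd..473be4d3a6581302748265e5f654511804ea6c25 100644
--- a/services/keepproxy/keepproxy_test.go
+++ b/services/keepproxy/keepproxy_test.go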
@@ -32,8 +32,8 @@ import (
        . "gopkg.in/check.v1"
 )
 
-// Gocheck boilerplate
 func Test(t *testing.T) {
+       keepclient.DefaultRetryDelay = time.Millisecond
        TestingT(t)
 }
 
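diff --git a/services/keepstore/command.go b/services/keepstore/command.go
index 48c8256a3ca1e22e524b60fe89ebc7de26be54e7..7de9fa833ebf5ad86c71406fa077852ce088f6fa 100644
--- a/services/keepstore/command.go
+++ b/services/keepstore/command.go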
@@ -206,6 +206,7 @@ func (h *handler) setup(ctx context.Context, cluster *arvados.Cluster, token str
                Arvados:       ac,
                Want_replicas: 1,
                DiskCacheSize: keepclient.DiskCacheDisabled,
+               Retries:       0,
        }
        h.keepClient.Arvados.ApiToken = fmt.Sprintf("%x", rand.Int63())