"io/fs"
"io/ioutil"
"log"
+ "math"
"math/big"
+ mathrand "math/rand"
"net"
"net/http"
"net/url"
"os"
"regexp"
+ "strconv"
"strings"
"sync/atomic"
"time"
"git.arvados.org/arvados.git/sdk/go/httpserver"
+ "github.com/hashicorp/go-retryablehttp"
)
// A Client is an HTTP client with an API endpoint and a set of
// Timeout for requests. NewClientFromConfig and
// NewClientFromEnv return a Client with a default 5 minute
- // timeout. To disable this timeout and rely on each
- // http.Request's context deadline instead, set Timeout to
- // zero.
+ // timeout. Within this time, retryable errors are
+ // automatically retried with exponential backoff.
+ //
+ // To disable automatic retries, set Timeout to zero and use a
+ // context deadline to establish a maximum request time.
Timeout time.Duration
dd *DiscoveryDocument
}
}
return &Client{
- Client: hc,
- Scheme: ctrlURL.Scheme,
- APIHost: ctrlURL.Host,
- Insecure: cluster.TLS.Insecure,
- Timeout: 5 * time.Minute,
+ Client: hc,
+ Scheme: ctrlURL.Scheme,
+ APIHost: ctrlURL.Host,
+ Insecure: cluster.TLS.Insecure,
+ Timeout: 5 * time.Minute,
+ requestLimiter: requestLimiter{maxlimit: int64(cluster.API.MaxConcurrentRequests / 4)},
}, nil
}
var reqIDGen = httpserver.IDGenerator{Prefix: "req-"}
-// Do adds Authorization and X-Request-Id headers, delays in order to
-// comply with rate-limiting restrictions, and then calls
-// (*http.Client)Do().
+var nopCancelFunc context.CancelFunc = func() {}
+
+// Do augments (*http.Client)Do(): adds Authorization and X-Request-Id
+// headers, delays in order to comply with rate-limiting restrictions,
+// and retries failed requests when appropriate.
func (c *Client) Do(req *http.Request) (*http.Response, error) {
ctx := req.Context()
if auth, _ := ctx.Value(contextKeyAuthorization{}).(string); auth != "" {
req.Header.Set("X-Request-Id", reqid)
}
}
- var cancel context.CancelFunc
+
+ rreq, err := retryablehttp.FromRequest(req)
+ if err != nil {
+ return nil, err
+ }
+
+ cancel := nopCancelFunc
+ var lastResp *http.Response
+ var lastRespBody io.ReadCloser
+ var lastErr error
+
+ rclient := retryablehttp.NewClient()
+ rclient.HTTPClient = c.httpClient()
+ rclient.Backoff = exponentialBackoff
if c.Timeout > 0 {
+ rclient.RetryWaitMax = c.Timeout / 10
+ rclient.RetryMax = 32
ctx, cancel = context.WithDeadline(ctx, time.Now().Add(c.Timeout))
- req = req.WithContext(ctx)
+ rreq = rreq.WithContext(ctx)
} else {
- cancel = context.CancelFunc(func() {})
+ rclient.RetryMax = 0
}
+ rclient.CheckRetry = func(ctx context.Context, resp *http.Response, respErr error) (bool, error) {
+ if c.requestLimiter.Report(resp, respErr) {
+ c.last503.Store(time.Now())
+ }
+ if c.Timeout == 0 {
+ return false, err
+ }
+ retrying, err := retryablehttp.DefaultRetryPolicy(ctx, resp, respErr)
+ if retrying {
+ lastResp, lastRespBody, lastErr = resp, nil, respErr
+ if respErr == nil {
+ // Save the response and body so we
+ // can return it instead of "deadline
+ // exceeded". retryablehttp.Client
+ // will drain and discard resp.body,
+ // so we need to stash it separately.
+ buf, err := ioutil.ReadAll(resp.Body)
+ if err == nil {
+ lastRespBody = io.NopCloser(bytes.NewReader(buf))
+ } else {
+ lastResp, lastErr = nil, err
+ }
+ }
+ }
+ return retrying, err
+ }
+ rclient.Logger = nil
c.requestLimiter.Acquire(ctx)
if ctx.Err() != nil {
c.requestLimiter.Release()
+ cancel()
return nil, ctx.Err()
}
-
- // Attach Release() to cancel func, see cancelOnClose below.
- cancelOrig := cancel
- cancel = func() {
- c.requestLimiter.Release()
- cancelOrig()
- }
-
- resp, err := c.httpClient().Do(req)
- if c.requestLimiter.Report(resp, err) {
- c.last503.Store(time.Now())
+ resp, err := rclient.Do(rreq)
+ if (errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled)) && (lastResp != nil || lastErr != nil) {
+ resp, err = lastResp, lastErr
+ if resp != nil {
+ resp.Body = lastRespBody
+ }
}
- if err == nil {
- // We need to call cancel() eventually, but we can't
- // use "defer cancel()" because the context has to
- // stay alive until the caller has finished reading
- // the response body.
- resp.Body = cancelOnClose{ReadCloser: resp.Body, cancel: cancel}
- } else {
+ if err != nil {
+ c.requestLimiter.Release()
cancel()
+ return nil, err
+ }
+ // We need to call cancel() eventually, but we can't use
+ // "defer cancel()" because the context has to stay alive
+ // until the caller has finished reading the response body.
+ resp.Body = cancelOnClose{
+ ReadCloser: resp.Body,
+ cancel: func() {
+ c.requestLimiter.Release()
+ cancel()
+ },
}
return resp, err
}
}
}
// minExponentialBackoffBase is the smallest "min" value used by
// exponentialBackoff for any attempt after the first.
const minExponentialBackoffBase = time.Second

// exponentialBackoff implements retryablehttp.Backoff.
//
// If resp is a 429 (Too Many Requests) or 503 (Service Unavailable)
// response with a usable Retry-After header, the delay requested by
// the server is used. Retry-After may be an integer number of
// seconds or an HTTP date (RFC 1123, or any other layout accepted by
// http.ParseTime). Otherwise the delay is chosen by nearly-full
// jitter exponential backoff (similar to
// https://aws.amazon.com/blogs/architecture/exponential-backoff-and-jitter/):
// a random duration between min and min*2^attemptNum.
//
// In all cases the result is clamped to the provided min and max.
// When attemptNum > 0, min is first raised to
// minExponentialBackoffBase if it is smaller than that. A Retry-After
// value that works out to exactly zero is treated the same as a
// missing header.
func exponentialBackoff(min, max time.Duration, attemptNum int, resp *http.Response) time.Duration {
	if attemptNum > 0 && min < minExponentialBackoffBase {
		min = minExponentialBackoffBase
	}
	var t time.Duration
	if resp != nil && (resp.StatusCode == http.StatusTooManyRequests || resp.StatusCode == http.StatusServiceUnavailable) {
		if s := resp.Header.Get("Retry-After"); s != "" {
			if sleep, err := strconv.ParseInt(s, 10, 64); err == nil {
				// Saturate instead of letting the
				// multiplication overflow: a wrapped
				// (negative) product would turn a very
				// long requested delay into the minimum
				// delay.
				const maxSeconds = int64(math.MaxInt64 / time.Second)
				switch {
				case sleep > maxSeconds:
					t = math.MaxInt64
				case sleep < -maxSeconds:
					t = math.MinInt64
				default:
					t = time.Second * time.Duration(sleep)
				}
			} else if stamp, err := time.Parse(time.RFC1123, s); err == nil {
				t = time.Until(stamp)
			} else if stamp, err := http.ParseTime(s); err == nil {
				// Remaining HTTP-date layouts (RFC 850,
				// ANSI C asctime).
				t = time.Until(stamp)
			}
		}
	}
	if t == 0 {
		// Fresh source seeded from the clock's nanosecond field.
		jitter := mathrand.New(mathrand.NewSource(int64(time.Now().Nanosecond()))).Float64()
		extra := (math.Pow(2, float64(attemptNum))*float64(min) - float64(min)) * jitter
		// Converting an out-of-range float to an integer type
		// is implementation-defined, so saturate in the float
		// domain first. The comparison is written so that NaN
		// also takes the saturating branch.
		if delay := float64(min) + extra; delay < float64(math.MaxInt64) {
			t = min + time.Duration(extra)
		} else {
			t = math.MaxInt64
		}
	}
	switch {
	case t < min:
		return min
	case t > max:
		return max
	default:
		return t
	}
}
+
// DoAndDecode performs req and unmarshals the response (which must be
// JSON) into dst. Use this instead of RequestAndDecode if you need
// more control of the http.Request object.
if err != nil {
return err
}
+ if dst == nil {
+ if urlValues == nil {
+ urlValues = url.Values{}
+ }
+ urlValues["select"] = []string{`["uuid"]`}
+ }
if urlValues == nil {
// Nothing to send
} else if body != nil || ((method == "GET" || method == "HEAD") && len(urlValues.Encode()) < 1000) {