+
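+	// Attach an X-Request-Id header unless the caller already set
+	// one: prefer a request ID stored in the context, then the
+	// client's configured default, and otherwise generate a fresh
+	// one.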
+	if req.Header.Get("X-Request-Id") == "" {
+		var reqid string
+		if ctxreqid, _ := ctx.Value(contextKeyRequestID{}).(string); ctxreqid != "" {
+			reqid = ctxreqid
+		} else if c.defaultRequestID != "" {
+			reqid = c.defaultRequestID
+		} else {
+			reqid = reqIDGen.Next()
+		}
+		if req.Header == nil {
+			req.Header = http.Header{"X-Request-Id": {reqid}}
+		} else {
+			req.Header.Set("X-Request-Id", reqid)
+		}
+	}
+
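+	// Wrap the request for hashicorp/go-retryablehttp, which
+	// preserves the request body so it can be replayed on each
+	// retry attempt.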
+	rreq, err := retryablehttp.FromRequest(req)
+	if err != nil {
+		return nil, err
+	}
+
+	cancel := nopCancelFunc
+	var lastResp *http.Response
+	var lastRespBody io.ReadCloser
+	var lastErr error
+
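+	// Configure the retrying client. With a client-level Timeout,
+	// cap each backoff wait at a tenth of the total budget and set
+	// a generous retry count, so the context deadline, not the
+	// attempt count, is what ends the retry loop. Without a
+	// Timeout, never retry.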
+	rclient := retryablehttp.NewClient()
+	rclient.HTTPClient = c.httpClient()
+	rclient.Backoff = exponentialBackoff
+	if c.Timeout > 0 {
+		rclient.RetryWaitMax = c.Timeout / 10
+		rclient.RetryMax = 32
+		ctx, cancel = context.WithDeadline(ctx, time.Now().Add(c.Timeout))
+		rreq = rreq.WithContext(ctx)
+	} else {
+		rclient.RetryMax = 0
+	}
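+	// CheckRetry reports 503s to the request limiter, then applies
+	// the default retry policy, stashing the most recent response
+	// so a later deadline error can be replaced with something more
+	// informative.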
+	rclient.CheckRetry = func(ctx context.Context, resp *http.Response, respErr error) (bool, error) {
+		if c.getRequestLimiter().Report(resp, respErr) {
+			c.last503.Store(time.Now())
+		}
+		if c.Timeout == 0 {
+			return false, respErr
+		}
+		retrying, err := retryablehttp.DefaultRetryPolicy(ctx, resp, respErr)
+		if retrying {
+			lastResp, lastRespBody, lastErr = resp, nil, respErr
+			if respErr == nil {
+				// Save the response and body so we
+				// can return it instead of "deadline
+				// exceeded". retryablehttp.Client
+				// will drain and discard resp.Body,
+				// so we need to stash it separately.
+				buf, err := io.ReadAll(resp.Body)
+				if err == nil {
+					lastRespBody = io.NopCloser(bytes.NewReader(buf))
+				} else {
+					lastResp, lastErr = nil, err
+				}
+			}
+		}
+		return retrying, err
+	}
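+	// Suppress retryablehttp's default debug logging.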
+	rclient.Logger = nil
+
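+	// Wait for the limiter to admit this request. Acquire may
+	// block; if the context expires while we wait, give up without
+	// sending anything.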
+	limiter := c.getRequestLimiter()
+	limiter.Acquire(ctx)
+	if ctx.Err() != nil {
+		limiter.Release()
+		cancel()
+		return nil, ctx.Err()
+	}
+	resp, err := rclient.Do(rreq)
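+	// If the retry loop ended with "deadline exceeded" or
+	// "canceled" but we stashed an earlier response or error,
+	// return that instead, since it is more useful to the caller.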
+	if (errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled)) && (lastResp != nil || lastErr != nil) {
+		resp, err = lastResp, lastErr
+		if resp != nil {
+			resp.Body = lastRespBody
+		}
+	}
+	if err != nil {
+		limiter.Release()
+		cancel()
+		return nil, err
+	}
+	// We need to call cancel() eventually, but we can't use
+	// "defer cancel()" because the context has to stay alive
+	// until the caller has finished reading the response body.
+	resp.Body = cancelOnClose{
+		ReadCloser: resp.Body,
+		cancel: func() {
+			limiter.Release()
+			cancel()
+		},
+	}
+	return resp, err
+}
+
+// Last503 returns the time of the most recent HTTP 503 (Service
+// Unavailable) response received by this client. A zero time means
+// no 503 response has been seen.
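+//
+// For example, a caller might shed non-urgent load while the server
+// appears overloaded (hypothetical usage sketch):
+//
+//	if time.Since(client.Last503()) < time.Minute {
+//		// Server recently reported overload; defer this work.
+//	}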
+func (c *Client) Last503() time.Time {
+	t, _ := c.last503.Load().(time.Time)
+	return t
+}
+
+// globalRequestLimiter entries (one for each APIHost) don't have a
+// hard limit on outgoing connections, but do add a delay and reduce
+// concurrency after 503 errors.
+var (
+	globalRequestLimiter     = map[string]*requestLimiter{}
+	globalRequestLimiterLock sync.Mutex
+)
+
+// getRequestLimiter returns this client's own requestLimiter if it
+// has one, otherwise a global singleton requestLimiter for c's
+// APIHost.
+func (c *Client) getRequestLimiter() *requestLimiter {
+	if c.requestLimiter != nil {
+		return c.requestLimiter
+	}
+	globalRequestLimiterLock.Lock()
+	defer globalRequestLimiterLock.Unlock()
+	limiter := globalRequestLimiter[c.APIHost]
+	if limiter == nil {
+		limiter = &requestLimiter{}
+		globalRequestLimiter[c.APIHost] = limiter
+	}
+	return limiter
+}
+
+// cancelOnClose calls a provided CancelFunc when its wrapped
+// ReadCloser's Close() method is called.
+type cancelOnClose struct {
+	io.ReadCloser
+	cancel context.CancelFunc
+}
+
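+// Close closes the wrapped ReadCloser, then invokes the provided
+// cancel function.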
+func (coc cancelOnClose) Close() error {
+	err := coc.ReadCloser.Close()
+	coc.cancel()
+	return err
+}
+
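+// isRedirectStatus reports whether code is an HTTP redirect status
+// (301, 302, 303, 307, or 308).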
+func isRedirectStatus(code int) bool {
+	switch code {
+	case http.StatusMovedPermanently, http.StatusFound, http.StatusSeeOther, http.StatusTemporaryRedirect, http.StatusPermanentRedirect:
+		return true
+	default:
+		return false
+	}
+}
+
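+// minExponentialBackoffBase is the minimum backoff base used by
+// exponentialBackoff once at least one attempt has already failed.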
+const minExponentialBackoffBase = time.Second
+
+// exponentialBackoff implements retryablehttp.Backoff. It uses the
+// server-provided Retry-After header if available, otherwise
+// nearly-full-jitter exponential backoff (similar to
+// https://aws.amazon.com/blogs/architecture/exponential-backoff-and-jitter/),
+// in all cases respecting the provided min and max.
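+//
+// For example, with min = 1s, attemptNum = 3, and no Retry-After
+// header, the jittered wait is min + (2^3*min - min)*jitter with
+// jitter in [0, 1), i.e. somewhere in [1s, 8s), before the final
+// clamp to [min, max].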
+func exponentialBackoff(min, max time.Duration, attemptNum int, resp *http.Response) time.Duration {
+	if attemptNum > 0 && min < minExponentialBackoffBase {
+		min = minExponentialBackoffBase
+	}
+	var t time.Duration
+	if resp != nil && (resp.StatusCode == http.StatusTooManyRequests || resp.StatusCode == http.StatusServiceUnavailable) {
+		if s := resp.Header.Get("Retry-After"); s != "" {
+			if sleep, err := strconv.ParseInt(s, 10, 64); err == nil {
+				t = time.Second * time.Duration(sleep)
+			} else if stamp, err := time.Parse(time.RFC1123, s); err == nil {
+				t = time.Until(stamp)
+			}
+		}
+	}
+	if t == 0 {
+		jitter := mathrand.New(mathrand.NewSource(int64(time.Now().Nanosecond()))).Float64()
+		t = min + time.Duration((math.Pow(2, float64(attemptNum))*float64(min)-float64(min))*jitter)
+	}
+	if t < min {
+		return min
+	} else if t > max {
+		return max
+	} else {
+		return t
+	}
+}