"regexp"
"strconv"
"strings"
+ "sync"
"sync/atomic"
"time"
"git.arvados.org/arvados.git/sdk/go/httpserver"
"github.com/hashicorp/go-retryablehttp"
+ "github.com/sirupsen/logrus"
)
// A Client is an HTTP client with an API endpoint and a set of
// context deadline to establish a maximum request time.
Timeout time.Duration
+ // Maximum disk cache size in bytes or percent of total
+ // filesystem size. If zero, use default, currently 10% of
+ // filesystem size.
+ DiskCacheSize ByteSizeOrPercent
+
+ // Where to write debug logs. May be nil.
+ Logger logrus.FieldLogger
+
dd *DiscoveryDocument
defaultRequestID string
// differs from an outgoing connection limit (a feature
// provided by http.Transport) when concurrent calls are
// multiplexed on a single http2 connection.
- requestLimiter requestLimiter
+ //
+ // getRequestLimiter() should always be used, because this can
+ // be nil.
+ requestLimiter *requestLimiter
last503 atomic.Value
}
APIHost: ctrlURL.Host,
Insecure: cluster.TLS.Insecure,
Timeout: 5 * time.Minute,
- requestLimiter: requestLimiter{maxlimit: int64(cluster.API.MaxConcurrentRequests / 4)},
+ DiskCacheSize: cluster.Collections.WebDAVCache.DiskCacheSize,
+ requestLimiter: &requestLimiter{maxlimit: int64(cluster.API.MaxConcurrentRequests / 4)},
}, nil
}
var nopCancelFunc context.CancelFunc = func() {}
+// reqErrorRe matches error text containing "net/http: invalid
+// header ". The CheckRetry callback installed on the retryablehttp
+// client declines to retry when the request error matches it.
+var reqErrorRe = regexp.MustCompile(`net/http: invalid header `)
+
// Do augments (*http.Client)Do(): adds Authorization and X-Request-Id
// headers, delays in order to comply with rate-limiting restrictions,
// and retries failed requests when appropriate.
var lastResp *http.Response
var lastRespBody io.ReadCloser
var lastErr error
+ var checkRetryCalled int
rclient := retryablehttp.NewClient()
rclient.HTTPClient = c.httpClient()
rclient.RetryMax = 0
}
rclient.CheckRetry = func(ctx context.Context, resp *http.Response, respErr error) (bool, error) {
- if c.requestLimiter.Report(resp, respErr) {
+ checkRetryCalled++
+ if c.getRequestLimiter().Report(resp, respErr) {
c.last503.Store(time.Now())
}
if c.Timeout == 0 {
- return false, err
+ return false, nil
+ }
+ // This check can be removed when
+ // https://github.com/hashicorp/go-retryablehttp/pull/210
+ // (or equivalent) is merged and we update go.mod.
+ // Until then, it is needed to pass
+ // TestNonRetryableStdlibError.
+ if respErr != nil && reqErrorRe.MatchString(respErr.Error()) {
+ return false, nil
}
retrying, err := retryablehttp.DefaultRetryPolicy(ctx, resp, respErr)
if retrying {
}
rclient.Logger = nil
- c.requestLimiter.Acquire(ctx)
+ limiter := c.getRequestLimiter()
+ limiter.Acquire(ctx)
if ctx.Err() != nil {
- c.requestLimiter.Release()
+ limiter.Release()
cancel()
return nil, ctx.Err()
}
resp, err := rclient.Do(rreq)
if (errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled)) && (lastResp != nil || lastErr != nil) {
- resp, err = lastResp, lastErr
+ resp = lastResp
+ err = lastErr
+ if checkRetryCalled > 0 && err != nil {
+ // Mimic retryablehttp's "giving up after X
+ // attempts" message, even if we gave up
+ // because of time rather than maxretries.
+ err = fmt.Errorf("%s %s giving up after %d attempt(s): %w", req.Method, req.URL.String(), checkRetryCalled, err)
+ }
if resp != nil {
resp.Body = lastRespBody
}
}
if err != nil {
- c.requestLimiter.Release()
+ limiter.Release()
cancel()
return nil, err
}
resp.Body = cancelOnClose{
ReadCloser: resp.Body,
cancel: func() {
- c.requestLimiter.Release()
+ limiter.Release()
cancel()
},
}
return t
}
+// globalRequestLimiter entries (one for each APIHost) don't have a
+// hard limit on outgoing connections, but do add a delay and reduce
+// concurrency after 503 errors.
+//
+// globalRequestLimiterLock must be held while reading or writing
+// the globalRequestLimiter map.
+var (
+ globalRequestLimiter = map[string]*requestLimiter{}
+ globalRequestLimiterLock sync.Mutex
+)
+
+// getRequestLimiter returns the requestLimiter that requests made
+// through c should use: c's own requestLimiter if it is non-nil,
+// otherwise a global requestLimiter singleton for c's APIHost,
+// which is created on first use.
+//
+// The returned value is never nil.
+func (c *Client) getRequestLimiter() *requestLimiter {
+ if c.requestLimiter != nil {
+ return c.requestLimiter
+ }
+ // Look up (or lazily create) the shared entry for this
+ // APIHost while holding the lock that guards the global map.
+ globalRequestLimiterLock.Lock()
+ defer globalRequestLimiterLock.Unlock()
+ limiter := globalRequestLimiter[c.APIHost]
+ if limiter == nil {
+ // Store a zero-value requestLimiter so later calls for
+ // the same APIHost get the same instance.
+ limiter = &requestLimiter{}
+ globalRequestLimiter[c.APIHost] = limiter
+ }
+ return limiter
+}
+
// cancelOnClose calls a provided CancelFunc when its wrapped
// ReadCloser's Close() method is called.
type cancelOnClose struct {
if scheme == "" {
scheme = "https"
}
- return scheme + "://" + c.APIHost + "/" + path
+ // Double slashes in URLs tend to cause subtle hidden problems
+ // (e.g., they can behave differently when a load balancer is
+ // in the picture). Here we ensure exactly one "/" regardless
+ // of whether the given APIHost or path has a superfluous one.
+ return scheme + "://" + strings.TrimSuffix(c.APIHost, "/") + "/" + strings.TrimPrefix(path, "/")
}
// DiscoveryDocument is the Arvados server's description of itself.