X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/701150f33acb6c07d35b3802cc02964321d0417c..HEAD:/sdk/go/arvados/client.go diff --git a/sdk/go/arvados/client.go b/sdk/go/arvados/client.go index c2f6361334..ead66c3d40 100644 --- a/sdk/go/arvados/client.go +++ b/sdk/go/arvados/client.go @@ -26,11 +26,13 @@ import ( "regexp" "strconv" "strings" + "sync" "sync/atomic" "time" "git.arvados.org/arvados.git/sdk/go/httpserver" "github.com/hashicorp/go-retryablehttp" + "github.com/sirupsen/logrus" ) // A Client is an HTTP client with an API endpoint and a set of @@ -76,6 +78,14 @@ type Client struct { // context deadline to establish a maximum request time. Timeout time.Duration + // Maximum disk cache size in bytes or percent of total + // filesystem size. If zero, use default, currently 10% of + // filesystem size. + DiskCacheSize ByteSizeOrPercent + + // Where to write debug logs. May be nil. + Logger logrus.FieldLogger + dd *DiscoveryDocument defaultRequestID string @@ -88,7 +98,10 @@ type Client struct { // differs from an outgoing connection limit (a feature // provided by http.Transport) when concurrent calls are // multiplexed on a single http2 connection. - requestLimiter requestLimiter + // + // getRequestLimiter() should always be used, because this can + // be nil. + requestLimiter *requestLimiter last503 atomic.Value } @@ -150,7 +163,8 @@ func NewClientFromConfig(cluster *Cluster) (*Client, error) { APIHost: ctrlURL.Host, Insecure: cluster.TLS.Insecure, Timeout: 5 * time.Minute, - requestLimiter: requestLimiter{maxlimit: int64(cluster.API.MaxConcurrentRequests / 4)}, + DiskCacheSize: cluster.Collections.WebDAVCache.DiskCacheSize, + requestLimiter: &requestLimiter{maxlimit: int64(cluster.API.MaxConcurrentRequests / 4)}, }, nil } @@ -238,8 +252,6 @@ var reqIDGen = httpserver.IDGenerator{Prefix: "req-"} var nopCancelFunc context.CancelFunc = func() {} -var reqErrorRe = regexp.MustCompile(`net/http: invalid header `) - // Do augments (*http.Client)Do(): adds Authorization and X-Request-Id // headers, delays in order to comply with rate-limiting restrictions, // and retries failed requests when appropriate. @@ -291,15 +303,12 @@ func (c *Client) Do(req *http.Request) (*http.Response, error) { } rclient.CheckRetry = func(ctx context.Context, resp *http.Response, respErr error) (bool, error) { checkRetryCalled++ - if c.requestLimiter.Report(resp, respErr) { + if c.getRequestLimiter().Report(resp, respErr) { c.last503.Store(time.Now()) } if c.Timeout == 0 { return false, nil } - if respErr != nil && reqErrorRe.MatchString(respErr.Error()) { - return false, nil - } retrying, err := retryablehttp.DefaultRetryPolicy(ctx, resp, respErr) if retrying { lastResp, lastRespBody, lastErr = resp, nil, respErr @@ -321,9 +330,10 @@ func (c *Client) Do(req *http.Request) (*http.Response, error) { } rclient.Logger = nil - c.requestLimiter.Acquire(ctx) + limiter := c.getRequestLimiter() + limiter.Acquire(ctx) if ctx.Err() != nil { - c.requestLimiter.Release() + limiter.Release() cancel() return nil, ctx.Err() } @@ -342,7 +352,7 @@ func (c *Client) Do(req *http.Request) (*http.Response, error) { } } if err != nil { - c.requestLimiter.Release() + limiter.Release() cancel() return nil, err } @@ -352,7 +362,7 @@ func (c *Client) Do(req *http.Request) (*http.Response, error) { resp.Body = cancelOnClose{ ReadCloser: resp.Body, cancel: func() { - c.requestLimiter.Release() + limiter.Release() cancel() }, } @@ -366,6 +376,30 @@ func (c *Client) Last503() time.Time { return t } +// globalRequestLimiter entries (one for each APIHost) don't have a +// hard limit on outgoing connections, but do add a delay and reduce +// concurrency after 503 errors. +var ( + globalRequestLimiter = map[string]*requestLimiter{} + globalRequestLimiterLock sync.Mutex +) + +// Get this client's requestLimiter, or a global requestLimiter +// singleton for c's APIHost if this client doesn't have its own. +func (c *Client) getRequestLimiter() *requestLimiter { + if c.requestLimiter != nil { + return c.requestLimiter + } + globalRequestLimiterLock.Lock() + defer globalRequestLimiterLock.Unlock() + limiter := globalRequestLimiter[c.APIHost] + if limiter == nil { + limiter = &requestLimiter{} + globalRequestLimiter[c.APIHost] = limiter + } + return limiter +} + // cancelOnClose calls a provided CancelFunc when its wrapped // ReadCloser's Close() method is called. type cancelOnClose struct {