X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/936ed3a6a7484917fc10636b3dc2c5fdd9578643..79036634292cc4ed13db98a834f33d617c85b1cd:/sdk/go/arvados/client.go diff --git a/sdk/go/arvados/client.go b/sdk/go/arvados/client.go index 735a44d24c..172763fe80 100644 --- a/sdk/go/arvados/client.go +++ b/sdk/go/arvados/client.go @@ -88,7 +88,10 @@ type Client struct { // differs from an outgoing connection limit (a feature // provided by http.Transport) when concurrent calls are // multiplexed on a single http2 connection. - requestLimiter requestLimiter + // + // getRequestLimiter() should always be used, because this can + // be nil. + requestLimiter *requestLimiter last503 atomic.Value } @@ -150,7 +153,7 @@ func NewClientFromConfig(cluster *Cluster) (*Client, error) { APIHost: ctrlURL.Host, Insecure: cluster.TLS.Insecure, Timeout: 5 * time.Minute, - requestLimiter: requestLimiter{maxlimit: int64(cluster.API.MaxConcurrentRequests / 4)}, + requestLimiter: &requestLimiter{maxlimit: int64(cluster.API.MaxConcurrentRequests / 4)}, }, nil } @@ -287,7 +290,7 @@ func (c *Client) Do(req *http.Request) (*http.Response, error) { rclient.RetryMax = 0 } rclient.CheckRetry = func(ctx context.Context, resp *http.Response, respErr error) (bool, error) { - if c.requestLimiter.Report(resp, respErr) { + if c.getRequestLimiter().Report(resp, respErr) { c.last503.Store(time.Now()) } if c.Timeout == 0 { @@ -314,9 +317,10 @@ func (c *Client) Do(req *http.Request) (*http.Response, error) { } rclient.Logger = nil - c.requestLimiter.Acquire(ctx) + limiter := c.getRequestLimiter() + limiter.Acquire(ctx) if ctx.Err() != nil { - c.requestLimiter.Release() + limiter.Release() cancel() return nil, ctx.Err() } @@ -328,7 +332,7 @@ func (c *Client) Do(req *http.Request) (*http.Response, error) { } } if err != nil { - c.requestLimiter.Release() + limiter.Release() cancel() return nil, err } @@ -338,7 +342,7 @@ func (c *Client) Do(req *http.Request) (*http.Response, error) { resp.Body = cancelOnClose{ ReadCloser: resp.Body, cancel: func() { - c.requestLimiter.Release() + limiter.Release() cancel() }, } @@ -352,6 +356,19 @@ func (c *Client) Last503() time.Time { return t } +// globalRequestLimiter doesn't have a maximum number of outgoing +// connections, but is just used to backoff after 503 errors. +var globalRequestLimiter requestLimiter + +// Get this client's requestLimiter, or the global requestLimiter +// singleton if the client doesn't have its own. +func (c *Client) getRequestLimiter() *requestLimiter { + if c.requestLimiter != nil { + return c.requestLimiter + } + return &globalRequestLimiter +} + // cancelOnClose calls a provided CancelFunc when its wrapped // ReadCloser's Close() method is called. type cancelOnClose struct {