X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/f3b5289610db5f0b54c73ff5ec0cd80e51f1ae94..HEAD:/sdk/go/arvados/client.go diff --git a/sdk/go/arvados/client.go b/sdk/go/arvados/client.go index 6316d1beda..ead66c3d40 100644 --- a/sdk/go/arvados/client.go +++ b/sdk/go/arvados/client.go @@ -16,18 +16,23 @@ import ( "io/fs" "io/ioutil" "log" + "math" "math/big" + mathrand "math/rand" "net" "net/http" "net/url" "os" "regexp" + "strconv" "strings" + "sync" "sync/atomic" "time" "git.arvados.org/arvados.git/sdk/go/httpserver" "github.com/hashicorp/go-retryablehttp" + "github.com/sirupsen/logrus" ) // A Client is an HTTP client with an API endpoint and a set of @@ -73,6 +78,14 @@ type Client struct { // context deadline to establish a maximum request time. Timeout time.Duration + // Maximum disk cache size in bytes or percent of total + // filesystem size. If zero, use default, currently 10% of + // filesystem size. + DiskCacheSize ByteSizeOrPercent + + // Where to write debug logs. May be nil. + Logger logrus.FieldLogger + dd *DiscoveryDocument defaultRequestID string @@ -85,7 +98,10 @@ type Client struct { // differs from an outgoing connection limit (a feature // provided by http.Transport) when concurrent calls are // multiplexed on a single http2 connection. - requestLimiter requestLimiter + // + // getRequestLimiter() should always be used, because this can + // be nil. + requestLimiter *requestLimiter last503 atomic.Value } @@ -147,7 +163,8 @@ func NewClientFromConfig(cluster *Cluster) (*Client, error) { APIHost: ctrlURL.Host, Insecure: cluster.TLS.Insecure, Timeout: 5 * time.Minute, - requestLimiter: requestLimiter{maxlimit: int64(cluster.API.MaxConcurrentRequests / 4)}, + DiskCacheSize: cluster.Collections.WebDAVCache.DiskCacheSize, + requestLimiter: &requestLimiter{maxlimit: int64(cluster.API.MaxConcurrentRequests / 4)}, }, nil } @@ -271,9 +288,11 @@ func (c *Client) Do(req *http.Request) (*http.Response, error) { var lastResp *http.Response var lastRespBody io.ReadCloser var lastErr error + var checkRetryCalled int rclient := retryablehttp.NewClient() rclient.HTTPClient = c.httpClient() + rclient.Backoff = exponentialBackoff if c.Timeout > 0 { rclient.RetryWaitMax = c.Timeout / 10 rclient.RetryMax = 32 @@ -283,11 +302,12 @@ func (c *Client) Do(req *http.Request) (*http.Response, error) { rclient.RetryMax = 0 } rclient.CheckRetry = func(ctx context.Context, resp *http.Response, respErr error) (bool, error) { - if c.requestLimiter.Report(resp, respErr) { + checkRetryCalled++ + if c.getRequestLimiter().Report(resp, respErr) { c.last503.Store(time.Now()) } if c.Timeout == 0 { - return false, err + return false, nil } retrying, err := retryablehttp.DefaultRetryPolicy(ctx, resp, respErr) if retrying { @@ -310,21 +330,29 @@ func (c *Client) Do(req *http.Request) (*http.Response, error) { } rclient.Logger = nil - c.requestLimiter.Acquire(ctx) + limiter := c.getRequestLimiter() + limiter.Acquire(ctx) if ctx.Err() != nil { - c.requestLimiter.Release() + limiter.Release() cancel() return nil, ctx.Err() } resp, err := rclient.Do(rreq) if (errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled)) && (lastResp != nil || lastErr != nil) { - resp, err = lastResp, lastErr + resp = lastResp + err = lastErr + if checkRetryCalled > 0 && err != nil { + // Mimic retryablehttp's "giving up after X + // attempts" message, even if we gave up + // because of time rather than maxretries. + err = fmt.Errorf("%s %s giving up after %d attempt(s): %w", req.Method, req.URL.String(), checkRetryCalled, err) + } if resp != nil { resp.Body = lastRespBody } } if err != nil { - c.requestLimiter.Release() + limiter.Release() cancel() return nil, err } @@ -334,7 +362,7 @@ func (c *Client) Do(req *http.Request) (*http.Response, error) { resp.Body = cancelOnClose{ ReadCloser: resp.Body, cancel: func() { - c.requestLimiter.Release() + limiter.Release() cancel() }, } @@ -348,6 +376,30 @@ func (c *Client) Last503() time.Time { return t } +// globalRequestLimiter entries (one for each APIHost) don't have a +// hard limit on outgoing connections, but do add a delay and reduce +// concurrency after 503 errors. +var ( + globalRequestLimiter = map[string]*requestLimiter{} + globalRequestLimiterLock sync.Mutex +) + +// Get this client's requestLimiter, or a global requestLimiter +// singleton for c's APIHost if this client doesn't have its own. +func (c *Client) getRequestLimiter() *requestLimiter { + if c.requestLimiter != nil { + return c.requestLimiter + } + globalRequestLimiterLock.Lock() + defer globalRequestLimiterLock.Unlock() + limiter := globalRequestLimiter[c.APIHost] + if limiter == nil { + limiter = &requestLimiter{} + globalRequestLimiter[c.APIHost] = limiter + } + return limiter +} + // cancelOnClose calls a provided CancelFunc when its wrapped // ReadCloser's Close() method is called. type cancelOnClose struct { @@ -370,6 +422,40 @@ func isRedirectStatus(code int) bool { } } +const minExponentialBackoffBase = time.Second + +// Implements retryablehttp.Backoff using the server-provided +// Retry-After header if available, otherwise nearly-full jitter +// exponential backoff (similar to +// https://aws.amazon.com/blogs/architecture/exponential-backoff-and-jitter/), +// in all cases respecting the provided min and max. +func exponentialBackoff(min, max time.Duration, attemptNum int, resp *http.Response) time.Duration { + if attemptNum > 0 && min < minExponentialBackoffBase { + min = minExponentialBackoffBase + } + var t time.Duration + if resp != nil && (resp.StatusCode == http.StatusTooManyRequests || resp.StatusCode == http.StatusServiceUnavailable) { + if s := resp.Header.Get("Retry-After"); s != "" { + if sleep, err := strconv.ParseInt(s, 10, 64); err == nil { + t = time.Second * time.Duration(sleep) + } else if stamp, err := time.Parse(time.RFC1123, s); err == nil { + t = stamp.Sub(time.Now()) + } + } + } + if t == 0 { + jitter := mathrand.New(mathrand.NewSource(int64(time.Now().Nanosecond()))).Float64() + t = min + time.Duration((math.Pow(2, float64(attemptNum))*float64(min)-float64(min))*jitter) + } + if t < min { + return min + } else if t > max { + return max + } else { + return t + } +} + // DoAndDecode performs req and unmarshals the response (which must be // JSON) into dst. Use this instead of RequestAndDecode if you need // more control of the http.Request object. @@ -502,6 +588,12 @@ func (c *Client) RequestAndDecodeContext(ctx context.Context, dst interface{}, m if err != nil { return err } + if dst == nil { + if urlValues == nil { + urlValues = url.Values{} + } + urlValues["select"] = []string{`["uuid"]`} + } if urlValues == nil { // Nothing to send } else if body != nil || ((method == "GET" || method == "HEAD") && len(urlValues.Encode()) < 1000) { @@ -575,7 +667,11 @@ func (c *Client) apiURL(path string) string { if scheme == "" { scheme = "https" } - return scheme + "://" + c.APIHost + "/" + path + // Double-slash in URLs tend to cause subtle hidden problems + // (e.g., they can behave differently when a load balancer is + // in the picture). Here we ensure exactly one "/" regardless + // of whether the given APIHost or path has a superfluous one. + return scheme + "://" + strings.TrimSuffix(c.APIHost, "/") + "/" + strings.TrimPrefix(path, "/") } // DiscoveryDocument is the Arvados server's description of itself. @@ -586,6 +682,7 @@ type DiscoveryDocument struct { GitURL string `json:"gitUrl"` Schemas map[string]Schema `json:"schemas"` Resources map[string]Resource `json:"resources"` + Revision string `json:"revision"` } type Resource struct {