X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/6e63d6052aa6cf3559edad9666b574939bcd730d..aab9cf1b5d30c5e49152bff09dc1ee18d38973dc:/sdk/go/arvados/client.go diff --git a/sdk/go/arvados/client.go b/sdk/go/arvados/client.go index 5a498b01f0..d71ade8a81 100644 --- a/sdk/go/arvados/client.go +++ b/sdk/go/arvados/client.go @@ -16,16 +16,21 @@ import ( "io/fs" "io/ioutil" "log" + "math" "math/big" + mathrand "math/rand" "net" "net/http" "net/url" "os" "regexp" + "strconv" "strings" + "sync/atomic" "time" "git.arvados.org/arvados.git/sdk/go/httpserver" + "github.com/hashicorp/go-retryablehttp" ) // A Client is an HTTP client with an API endpoint and a set of @@ -64,9 +69,11 @@ type Client struct { // Timeout for requests. NewClientFromConfig and // NewClientFromEnv return a Client with a default 5 minute - // timeout. To disable this timeout and rely on each - // http.Request's context deadline instead, set Timeout to - // zero. + // timeout. Within this time, retryable errors are + // automatically retried with exponential backoff. + // + // To disable automatic retries, set Timeout to zero and use a + // context deadline to establish a maximum request time. Timeout time.Duration dd *DiscoveryDocument @@ -76,6 +83,14 @@ type Client struct { // APIHost and AuthToken were loaded from ARVADOS_* env vars // (used to customize "no host/token" error messages) loadedFromEnv bool + + // Track/limit concurrent outgoing API calls. Note this + // differs from an outgoing connection limit (a feature + // provided by http.Transport) when concurrent calls are + // multiplexed on a single http2 connection. + requestLimiter requestLimiter + + last503 atomic.Value } // InsecureHTTPClient is the default http.Client used by a Client with @@ -130,11 +145,12 @@ func NewClientFromConfig(cluster *Cluster) (*Client, error) { } } return &Client{ - Client: hc, - Scheme: ctrlURL.Scheme, - APIHost: ctrlURL.Host, - Insecure: cluster.TLS.Insecure, - Timeout: 5 * time.Minute, + Client: hc, + Scheme: ctrlURL.Scheme, + APIHost: ctrlURL.Host, + Insecure: cluster.TLS.Insecure, + Timeout: 5 * time.Minute, + requestLimiter: requestLimiter{maxlimit: int64(cluster.API.MaxConcurrentRequests / 4)}, }, nil } @@ -220,10 +236,14 @@ func NewClientFromEnv() *Client { var reqIDGen = httpserver.IDGenerator{Prefix: "req-"} -// Do adds Authorization and X-Request-Id headers and then calls -// (*http.Client)Do(). +var nopCancelFunc context.CancelFunc = func() {} + +// Do augments (*http.Client)Do(): adds Authorization and X-Request-Id +// headers, delays in order to comply with rate-limiting restrictions, +// and retries failed requests when appropriate. func (c *Client) Do(req *http.Request) (*http.Response, error) { - if auth, _ := req.Context().Value(contextKeyAuthorization{}).(string); auth != "" { + ctx := req.Context() + if auth, _ := ctx.Value(contextKeyAuthorization{}).(string); auth != "" { req.Header.Add("Authorization", auth) } else if c.AuthToken != "" { req.Header.Add("Authorization", "OAuth2 "+c.AuthToken) @@ -231,7 +251,7 @@ func (c *Client) Do(req *http.Request) (*http.Response, error) { if req.Header.Get("X-Request-Id") == "" { var reqid string - if ctxreqid, _ := req.Context().Value(contextKeyRequestID{}).(string); ctxreqid != "" { + if ctxreqid, _ := ctx.Value(contextKeyRequestID{}).(string); ctxreqid != "" { reqid = ctxreqid } else if c.defaultRequestID != "" { reqid = c.defaultRequestID @@ -244,25 +264,94 @@ func (c *Client) Do(req *http.Request) (*http.Response, error) { req.Header.Set("X-Request-Id", reqid) } } - var cancel context.CancelFunc + + rreq, err := retryablehttp.FromRequest(req) + if err != nil { + return nil, err + } + + cancel := nopCancelFunc + var lastResp *http.Response + var lastRespBody io.ReadCloser + var lastErr error + + rclient := retryablehttp.NewClient() + rclient.HTTPClient = c.httpClient() + rclient.Backoff = exponentialBackoff if c.Timeout > 0 { - ctx := req.Context() + rclient.RetryWaitMax = c.Timeout / 10 + rclient.RetryMax = 32 ctx, cancel = context.WithDeadline(ctx, time.Now().Add(c.Timeout)) - req = req.WithContext(ctx) - } - resp, err := c.httpClient().Do(req) - if err == nil && cancel != nil { - // We need to call cancel() eventually, but we can't - // use "defer cancel()" because the context has to - // stay alive until the caller has finished reading - // the response body. - resp.Body = cancelOnClose{ReadCloser: resp.Body, cancel: cancel} - } else if cancel != nil { + rreq = rreq.WithContext(ctx) + } else { + rclient.RetryMax = 0 + } + rclient.CheckRetry = func(ctx context.Context, resp *http.Response, respErr error) (bool, error) { + if c.requestLimiter.Report(resp, respErr) { + c.last503.Store(time.Now()) + } + if c.Timeout == 0 { + return false, err + } + retrying, err := retryablehttp.DefaultRetryPolicy(ctx, resp, respErr) + if retrying { + lastResp, lastRespBody, lastErr = resp, nil, respErr + if respErr == nil { + // Save the response and body so we + // can return it instead of "deadline + // exceeded". retryablehttp.Client + // will drain and discard resp.body, + // so we need to stash it separately. + buf, err := ioutil.ReadAll(resp.Body) + if err == nil { + lastRespBody = io.NopCloser(bytes.NewReader(buf)) + } else { + lastResp, lastErr = nil, err + } + } + } + return retrying, err + } + rclient.Logger = nil + + c.requestLimiter.Acquire(ctx) + if ctx.Err() != nil { + c.requestLimiter.Release() + cancel() + return nil, ctx.Err() + } + resp, err := rclient.Do(rreq) + if (errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled)) && (lastResp != nil || lastErr != nil) { + resp, err = lastResp, lastErr + if resp != nil { + resp.Body = lastRespBody + } + } + if err != nil { + c.requestLimiter.Release() cancel() + return nil, err + } + // We need to call cancel() eventually, but we can't use + // "defer cancel()" because the context has to stay alive + // until the caller has finished reading the response body. + resp.Body = cancelOnClose{ + ReadCloser: resp.Body, + cancel: func() { + c.requestLimiter.Release() + cancel() + }, } return resp, err } +// Last503 returns the time of the most recent HTTP 503 (Service +// Unavailable) response. Zero time indicates never. +func (c *Client) Last503() time.Time { + t, _ := c.last503.Load().(time.Time) + return t +} + // cancelOnClose calls a provided CancelFunc when its wrapped // ReadCloser's Close() method is called. type cancelOnClose struct { @@ -285,6 +374,40 @@ func isRedirectStatus(code int) bool { } } +const minExponentialBackoffBase = time.Second + +// Implements retryablehttp.Backoff using the server-provided +// Retry-After header if available, otherwise nearly-full jitter +// exponential backoff (similar to +// https://aws.amazon.com/blogs/architecture/exponential-backoff-and-jitter/), +// in all cases respecting the provided min and max. +func exponentialBackoff(min, max time.Duration, attemptNum int, resp *http.Response) time.Duration { + if attemptNum > 0 && min < minExponentialBackoffBase { + min = minExponentialBackoffBase + } + var t time.Duration + if resp != nil && (resp.StatusCode == http.StatusTooManyRequests || resp.StatusCode == http.StatusServiceUnavailable) { + if s := resp.Header.Get("Retry-After"); s != "" { + if sleep, err := strconv.ParseInt(s, 10, 64); err == nil { + t = time.Second * time.Duration(sleep) + } else if stamp, err := time.Parse(time.RFC1123, s); err == nil { + t = stamp.Sub(time.Now()) + } + } + } + if t == 0 { + jitter := mathrand.New(mathrand.NewSource(int64(time.Now().Nanosecond()))).Float64() + t = min + time.Duration((math.Pow(2, float64(attemptNum))*float64(min)-float64(min))*jitter) + } + if t < min { + return min + } else if t > max { + return max + } else { + return t + } +} + // DoAndDecode performs req and unmarshals the response (which must be // JSON) into dst. Use this instead of RequestAndDecode if you need // more control of the http.Request object. @@ -417,6 +540,12 @@ func (c *Client) RequestAndDecodeContext(ctx context.Context, dst interface{}, m if err != nil { return err } + if dst == nil { + if urlValues == nil { + urlValues = url.Values{} + } + urlValues["select"] = []string{`["uuid"]`} + } if urlValues == nil { // Nothing to send } else if body != nil || ((method == "GET" || method == "HEAD") && len(urlValues.Encode()) < 1000) {