Merge branch '21535-multi-wf-delete'
[arvados.git] / sdk / go / arvados / client.go
index 6316d1bedaceb35bc7d3f5578aff3425078aabb7..09185d1d6be2f7c8dc1136603b9ce11dc0f304e2 100644 (file)
@@ -16,18 +16,23 @@ import (
        "io/fs"
        "io/ioutil"
        "log"
+       "math"
        "math/big"
+       mathrand "math/rand"
        "net"
        "net/http"
        "net/url"
        "os"
        "regexp"
+       "strconv"
        "strings"
+       "sync"
        "sync/atomic"
        "time"
 
        "git.arvados.org/arvados.git/sdk/go/httpserver"
        "github.com/hashicorp/go-retryablehttp"
+       "github.com/sirupsen/logrus"
 )
 
 // A Client is an HTTP client with an API endpoint and a set of
@@ -73,6 +78,14 @@ type Client struct {
        // context deadline to establish a maximum request time.
        Timeout time.Duration
 
+       // Maximum disk cache size in bytes or percent of total
+       // filesystem size. If zero, use default, currently 10% of
+       // filesystem size.
+       DiskCacheSize ByteSizeOrPercent
+
+       // Where to write debug logs. May be nil.
+       Logger logrus.FieldLogger
+
        dd *DiscoveryDocument
 
        defaultRequestID string
@@ -85,7 +98,10 @@ type Client struct {
        // differs from an outgoing connection limit (a feature
        // provided by http.Transport) when concurrent calls are
        // multiplexed on a single http2 connection.
-       requestLimiter requestLimiter
+       //
+       // getRequestLimiter() should always be used, because this can
+       // be nil.
+       requestLimiter *requestLimiter
 
        last503 atomic.Value
 }
@@ -147,7 +163,8 @@ func NewClientFromConfig(cluster *Cluster) (*Client, error) {
                APIHost:        ctrlURL.Host,
                Insecure:       cluster.TLS.Insecure,
                Timeout:        5 * time.Minute,
-               requestLimiter: requestLimiter{maxlimit: int64(cluster.API.MaxConcurrentRequests / 4)},
+               DiskCacheSize:  cluster.Collections.WebDAVCache.DiskCacheSize,
+               requestLimiter: &requestLimiter{maxlimit: int64(cluster.API.MaxConcurrentRequests / 4)},
        }, nil
 }
 
@@ -235,6 +252,8 @@ var reqIDGen = httpserver.IDGenerator{Prefix: "req-"}
 
 var nopCancelFunc context.CancelFunc = func() {}
 
+var reqErrorRe = regexp.MustCompile(`net/http: invalid header `)
+
 // Do augments (*http.Client)Do(): adds Authorization and X-Request-Id
 // headers, delays in order to comply with rate-limiting restrictions,
 // and retries failed requests when appropriate.
@@ -271,9 +290,11 @@ func (c *Client) Do(req *http.Request) (*http.Response, error) {
        var lastResp *http.Response
        var lastRespBody io.ReadCloser
        var lastErr error
+       var checkRetryCalled int
 
        rclient := retryablehttp.NewClient()
        rclient.HTTPClient = c.httpClient()
+       rclient.Backoff = exponentialBackoff
        if c.Timeout > 0 {
                rclient.RetryWaitMax = c.Timeout / 10
                rclient.RetryMax = 32
@@ -283,11 +304,20 @@ func (c *Client) Do(req *http.Request) (*http.Response, error) {
                rclient.RetryMax = 0
        }
        rclient.CheckRetry = func(ctx context.Context, resp *http.Response, respErr error) (bool, error) {
-               if c.requestLimiter.Report(resp, respErr) {
+               checkRetryCalled++
+               if c.getRequestLimiter().Report(resp, respErr) {
                        c.last503.Store(time.Now())
                }
                if c.Timeout == 0 {
-                       return false, err
+                       return false, nil
+               }
+               // This check can be removed when
+               // https://github.com/hashicorp/go-retryablehttp/pull/210
+               // (or equivalent) is merged and we update go.mod.
+               // Until then, it is needed to pass
+               // TestNonRetryableStdlibError.
+               if respErr != nil && reqErrorRe.MatchString(respErr.Error()) {
+                       return false, nil
                }
                retrying, err := retryablehttp.DefaultRetryPolicy(ctx, resp, respErr)
                if retrying {
@@ -310,21 +340,29 @@ func (c *Client) Do(req *http.Request) (*http.Response, error) {
        }
        rclient.Logger = nil
 
-       c.requestLimiter.Acquire(ctx)
+       limiter := c.getRequestLimiter()
+       limiter.Acquire(ctx)
        if ctx.Err() != nil {
-               c.requestLimiter.Release()
+               limiter.Release()
                cancel()
                return nil, ctx.Err()
        }
        resp, err := rclient.Do(rreq)
        if (errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled)) && (lastResp != nil || lastErr != nil) {
-               resp, err = lastResp, lastErr
+               resp = lastResp
+               err = lastErr
+               if checkRetryCalled > 0 && err != nil {
+                       // Mimic retryablehttp's "giving up after X
+                       // attempts" message, even if we gave up
+                       // because of time rather than maxretries.
+                       err = fmt.Errorf("%s %s giving up after %d attempt(s): %w", req.Method, req.URL.String(), checkRetryCalled, err)
+               }
                if resp != nil {
                        resp.Body = lastRespBody
                }
        }
        if err != nil {
-               c.requestLimiter.Release()
+               limiter.Release()
                cancel()
                return nil, err
        }
@@ -334,7 +372,7 @@ func (c *Client) Do(req *http.Request) (*http.Response, error) {
        resp.Body = cancelOnClose{
                ReadCloser: resp.Body,
                cancel: func() {
-                       c.requestLimiter.Release()
+                       limiter.Release()
                        cancel()
                },
        }
@@ -348,6 +386,30 @@ func (c *Client) Last503() time.Time {
        return t
 }
 
+// globalRequestLimiter entries (one for each APIHost) don't have a
+// hard limit on outgoing connections, but do add a delay and reduce
+// concurrency after 503 errors.
+var (
+       globalRequestLimiter     = map[string]*requestLimiter{}
+       globalRequestLimiterLock sync.Mutex
+)
+
+// Get this client's requestLimiter, or a global requestLimiter
+// singleton for c's APIHost if this client doesn't have its own.
+func (c *Client) getRequestLimiter() *requestLimiter {
+       if c.requestLimiter != nil {
+               return c.requestLimiter
+       }
+       globalRequestLimiterLock.Lock()
+       defer globalRequestLimiterLock.Unlock()
+       limiter := globalRequestLimiter[c.APIHost]
+       if limiter == nil {
+               limiter = &requestLimiter{}
+               globalRequestLimiter[c.APIHost] = limiter
+       }
+       return limiter
+}
+
 // cancelOnClose calls a provided CancelFunc when its wrapped
 // ReadCloser's Close() method is called.
 type cancelOnClose struct {
@@ -370,6 +432,40 @@ func isRedirectStatus(code int) bool {
        }
 }
 
+const minExponentialBackoffBase = time.Second
+
+// Implements retryablehttp.Backoff using the server-provided
+// Retry-After header if available, otherwise nearly-full jitter
+// exponential backoff (similar to
+// https://aws.amazon.com/blogs/architecture/exponential-backoff-and-jitter/),
+// in all cases respecting the provided min and max.
+func exponentialBackoff(min, max time.Duration, attemptNum int, resp *http.Response) time.Duration {
+       if attemptNum > 0 && min < minExponentialBackoffBase {
+               min = minExponentialBackoffBase
+       }
+       var t time.Duration
+       if resp != nil && (resp.StatusCode == http.StatusTooManyRequests || resp.StatusCode == http.StatusServiceUnavailable) {
+               if s := resp.Header.Get("Retry-After"); s != "" {
+                       if sleep, err := strconv.ParseInt(s, 10, 64); err == nil {
+                               t = time.Second * time.Duration(sleep)
+                       } else if stamp, err := time.Parse(time.RFC1123, s); err == nil {
+                               t = stamp.Sub(time.Now())
+                       }
+               }
+       }
+       if t == 0 {
+               jitter := mathrand.New(mathrand.NewSource(int64(time.Now().Nanosecond()))).Float64()
+               t = min + time.Duration((math.Pow(2, float64(attemptNum))*float64(min)-float64(min))*jitter)
+       }
+       if t < min {
+               return min
+       } else if t > max {
+               return max
+       } else {
+               return t
+       }
+}
+
 // DoAndDecode performs req and unmarshals the response (which must be
 // JSON) into dst. Use this instead of RequestAndDecode if you need
 // more control of the http.Request object.
@@ -502,6 +598,12 @@ func (c *Client) RequestAndDecodeContext(ctx context.Context, dst interface{}, m
        if err != nil {
                return err
        }
+       if dst == nil {
+               if urlValues == nil {
+                       urlValues = url.Values{}
+               }
+               urlValues["select"] = []string{`["uuid"]`}
+       }
        if urlValues == nil {
                // Nothing to send
        } else if body != nil || ((method == "GET" || method == "HEAD") && len(urlValues.Encode()) < 1000) {
@@ -575,7 +677,11 @@ func (c *Client) apiURL(path string) string {
        if scheme == "" {
                scheme = "https"
        }
-       return scheme + "://" + c.APIHost + "/" + path
+       // Double-slash in URLs tend to cause subtle hidden problems
+       // (e.g., they can behave differently when a load balancer is
+       // in the picture). Here we ensure exactly one "/" regardless
+       // of whether the given APIHost or path has a superfluous one.
+       return scheme + "://" + strings.TrimSuffix(c.APIHost, "/") + "/" + strings.TrimPrefix(path, "/")
 }
 
 // DiscoveryDocument is the Arvados server's description of itself.
@@ -586,6 +692,7 @@ type DiscoveryDocument struct {
        GitURL                       string              `json:"gitUrl"`
        Schemas                      map[string]Schema   `json:"schemas"`
        Resources                    map[string]Resource `json:"resources"`
+       Revision                     string              `json:"revision"`
 }
 
 type Resource struct {