Merge branch '21705-go-deps'
[arvados.git] / sdk / go / arvados / client.go
index c2f6361334d851a259fff3a467d6f4ae13b86ba0..ead66c3d408a2212cbfb6693bf4e03711f913a4d 100644 (file)
@@ -26,11 +26,13 @@ import (
        "regexp"
        "strconv"
        "strings"
+       "sync"
        "sync/atomic"
        "time"
 
        "git.arvados.org/arvados.git/sdk/go/httpserver"
        "github.com/hashicorp/go-retryablehttp"
+       "github.com/sirupsen/logrus"
 )
 
 // A Client is an HTTP client with an API endpoint and a set of
@@ -76,6 +78,14 @@ type Client struct {
        // context deadline to establish a maximum request time.
        Timeout time.Duration
 
+       // Maximum disk cache size in bytes or percent of total
+       // filesystem size. If zero, use default, currently 10% of
+       // filesystem size.
+       DiskCacheSize ByteSizeOrPercent
+
+       // Where to write debug logs. May be nil.
+       Logger logrus.FieldLogger
+
        dd *DiscoveryDocument
 
        defaultRequestID string
@@ -88,7 +98,10 @@ type Client struct {
        // differs from an outgoing connection limit (a feature
        // provided by http.Transport) when concurrent calls are
        // multiplexed on a single http2 connection.
-       requestLimiter requestLimiter
+       //
+       // getRequestLimiter() should always be used, because this can
+       // be nil.
+       requestLimiter *requestLimiter
 
        last503 atomic.Value
 }
@@ -150,7 +163,8 @@ func NewClientFromConfig(cluster *Cluster) (*Client, error) {
                APIHost:        ctrlURL.Host,
                Insecure:       cluster.TLS.Insecure,
                Timeout:        5 * time.Minute,
-               requestLimiter: requestLimiter{maxlimit: int64(cluster.API.MaxConcurrentRequests / 4)},
+               DiskCacheSize:  cluster.Collections.WebDAVCache.DiskCacheSize,
+               requestLimiter: &requestLimiter{maxlimit: int64(cluster.API.MaxConcurrentRequests / 4)},
        }, nil
 }
 
@@ -238,8 +252,6 @@ var reqIDGen = httpserver.IDGenerator{Prefix: "req-"}
 
 var nopCancelFunc context.CancelFunc = func() {}
 
-var reqErrorRe = regexp.MustCompile(`net/http: invalid header `)
-
 // Do augments (*http.Client)Do(): adds Authorization and X-Request-Id
 // headers, delays in order to comply with rate-limiting restrictions,
 // and retries failed requests when appropriate.
@@ -291,15 +303,12 @@ func (c *Client) Do(req *http.Request) (*http.Response, error) {
        }
        rclient.CheckRetry = func(ctx context.Context, resp *http.Response, respErr error) (bool, error) {
                checkRetryCalled++
-               if c.requestLimiter.Report(resp, respErr) {
+               if c.getRequestLimiter().Report(resp, respErr) {
                        c.last503.Store(time.Now())
                }
                if c.Timeout == 0 {
                        return false, nil
                }
-               if respErr != nil && reqErrorRe.MatchString(respErr.Error()) {
-                       return false, nil
-               }
                retrying, err := retryablehttp.DefaultRetryPolicy(ctx, resp, respErr)
                if retrying {
                        lastResp, lastRespBody, lastErr = resp, nil, respErr
@@ -321,9 +330,10 @@ func (c *Client) Do(req *http.Request) (*http.Response, error) {
        }
        rclient.Logger = nil
 
-       c.requestLimiter.Acquire(ctx)
+       limiter := c.getRequestLimiter()
+       limiter.Acquire(ctx)
        if ctx.Err() != nil {
-               c.requestLimiter.Release()
+               limiter.Release()
                cancel()
                return nil, ctx.Err()
        }
@@ -342,7 +352,7 @@ func (c *Client) Do(req *http.Request) (*http.Response, error) {
                }
        }
        if err != nil {
-               c.requestLimiter.Release()
+               limiter.Release()
                cancel()
                return nil, err
        }
@@ -352,7 +362,7 @@ func (c *Client) Do(req *http.Request) (*http.Response, error) {
        resp.Body = cancelOnClose{
                ReadCloser: resp.Body,
                cancel: func() {
-                       c.requestLimiter.Release()
+                       limiter.Release()
                        cancel()
                },
        }
@@ -366,6 +376,30 @@ func (c *Client) Last503() time.Time {
        return t
 }
 
+// globalRequestLimiter entries (one for each APIHost) don't have a
+// hard limit on outgoing connections, but do add a delay and reduce
+// concurrency after 503 errors.
+var (
+       globalRequestLimiter     = map[string]*requestLimiter{}
+       globalRequestLimiterLock sync.Mutex
+)
+
+// Get this client's requestLimiter, or a global requestLimiter
+// singleton for c's APIHost if this client doesn't have its own.
+func (c *Client) getRequestLimiter() *requestLimiter {
+       if c.requestLimiter != nil {
+               return c.requestLimiter
+       }
+       globalRequestLimiterLock.Lock()
+       defer globalRequestLimiterLock.Unlock()
+       limiter := globalRequestLimiter[c.APIHost]
+       if limiter == nil {
+               limiter = &requestLimiter{}
+               globalRequestLimiter[c.APIHost] = limiter
+       }
+       return limiter
+}
+
 // cancelOnClose calls a provided CancelFunc when its wrapped
 // ReadCloser's Close() method is called.
 type cancelOnClose struct {