"regexp"
"strconv"
"strings"
+ "sync"
"sync/atomic"
"time"
"git.arvados.org/arvados.git/sdk/go/httpserver"
"github.com/hashicorp/go-retryablehttp"
+ "github.com/sirupsen/logrus"
)
// A Client is an HTTP client with an API endpoint and a set of
// context deadline to establish a maximum request time.
Timeout time.Duration
+ // Maximum disk cache size in bytes or percent of total
+ // filesystem size. If zero, use default, currently 10% of
+ // filesystem size.
+ DiskCacheSize ByteSizeOrPercent
+
+ // Where to write debug logs. May be nil.
+ Logger logrus.FieldLogger
+
dd *DiscoveryDocument
defaultRequestID string
// differs from an outgoing connection limit (a feature
// provided by http.Transport) when concurrent calls are
// multiplexed on a single http2 connection.
- requestLimiter requestLimiter
+ //
+ // getRequestLimiter() should always be used, because this can
+ // be nil.
+ requestLimiter *requestLimiter
last503 atomic.Value
}
APIHost: ctrlURL.Host,
Insecure: cluster.TLS.Insecure,
Timeout: 5 * time.Minute,
- requestLimiter: requestLimiter{maxlimit: int64(cluster.API.MaxConcurrentRequests / 4)},
+ DiskCacheSize: cluster.Collections.WebDAVCache.DiskCacheSize,
+ requestLimiter: &requestLimiter{maxlimit: int64(cluster.API.MaxConcurrentRequests / 4)},
}, nil
}
}
rclient.CheckRetry = func(ctx context.Context, resp *http.Response, respErr error) (bool, error) {
checkRetryCalled++
- if c.requestLimiter.Report(resp, respErr) {
+ if c.getRequestLimiter().Report(resp, respErr) {
c.last503.Store(time.Now())
}
if c.Timeout == 0 {
return false, nil
}
+ // This check can be removed when
+ // https://github.com/hashicorp/go-retryablehttp/pull/210
+ // (or equivalent) is merged and we update go.mod.
+ // Until then, it is needed to pass
+ // TestNonRetryableStdlibError.
if respErr != nil && reqErrorRe.MatchString(respErr.Error()) {
return false, nil
}
}
rclient.Logger = nil
- c.requestLimiter.Acquire(ctx)
+ limiter := c.getRequestLimiter()
+ limiter.Acquire(ctx)
if ctx.Err() != nil {
- c.requestLimiter.Release()
+ limiter.Release()
cancel()
return nil, ctx.Err()
}
}
}
if err != nil {
- c.requestLimiter.Release()
+ limiter.Release()
cancel()
return nil, err
}
resp.Body = cancelOnClose{
ReadCloser: resp.Body,
cancel: func() {
- c.requestLimiter.Release()
+ limiter.Release()
cancel()
},
}
return t
}
+// globalRequestLimiter entries (one for each APIHost) don't have a
+// hard limit on outgoing connections, but do add a delay and reduce
+// concurrency after 503 errors.
+var (
+ globalRequestLimiter = map[string]*requestLimiter{}
+ globalRequestLimiterLock sync.Mutex
+)
+
+// Get this client's requestLimiter, or a global requestLimiter
+// singleton for c's APIHost if this client doesn't have its own.
+func (c *Client) getRequestLimiter() *requestLimiter {
+ if c.requestLimiter != nil {
+ return c.requestLimiter
+ }
+ globalRequestLimiterLock.Lock()
+ defer globalRequestLimiterLock.Unlock()
+ limiter := globalRequestLimiter[c.APIHost]
+ if limiter == nil {
+ limiter = &requestLimiter{}
+ globalRequestLimiter[c.APIHost] = limiter
+ }
+ return limiter
+}
+
// cancelOnClose calls a provided CancelFunc when its wrapped
// ReadCloser's Close() method is called.
type cancelOnClose struct {