20200: Add limiter for log create requests
[arvados.git] / sdk / go / arvados / client.go
index 5a498b01f0c98716ae0c34466272bf43ea7bf812..05176214ae1eb188fd4c8a0113b76e115f095a99 100644 (file)
@@ -23,6 +23,7 @@ import (
        "os"
        "regexp"
        "strings"
+       "sync/atomic"
        "time"
 
        "git.arvados.org/arvados.git/sdk/go/httpserver"
@@ -76,6 +77,14 @@ type Client struct {
        // APIHost and AuthToken were loaded from ARVADOS_* env vars
        // (used to customize "no host/token" error messages)
        loadedFromEnv bool
+
+       // Track/limit concurrent outgoing API calls. Note this
+       // differs from an outgoing connection limit (a feature
+       // provided by http.Transport) when concurrent calls are
+       // multiplexed on a single http2 connection.
+       requestLimiter requestLimiter
+
+       last503 atomic.Value
 }
 
 // InsecureHTTPClient is the default http.Client used by a Client with
@@ -220,10 +229,12 @@ func NewClientFromEnv() *Client {
 
 var reqIDGen = httpserver.IDGenerator{Prefix: "req-"}
 
-// Do adds Authorization and X-Request-Id headers and then calls
+// Do adds Authorization and X-Request-Id headers, delays in order to
+// comply with rate-limiting restrictions, and then calls
 // (*http.Client)Do().
 func (c *Client) Do(req *http.Request) (*http.Response, error) {
-       if auth, _ := req.Context().Value(contextKeyAuthorization{}).(string); auth != "" {
+       ctx := req.Context()
+       if auth, _ := ctx.Value(contextKeyAuthorization{}).(string); auth != "" {
                req.Header.Add("Authorization", auth)
        } else if c.AuthToken != "" {
                req.Header.Add("Authorization", "OAuth2 "+c.AuthToken)
@@ -231,7 +242,7 @@ func (c *Client) Do(req *http.Request) (*http.Response, error) {
 
        if req.Header.Get("X-Request-Id") == "" {
                var reqid string
-               if ctxreqid, _ := req.Context().Value(contextKeyRequestID{}).(string); ctxreqid != "" {
+               if ctxreqid, _ := ctx.Value(contextKeyRequestID{}).(string); ctxreqid != "" {
                        reqid = ctxreqid
                } else if c.defaultRequestID != "" {
                        reqid = c.defaultRequestID
@@ -246,23 +257,48 @@ func (c *Client) Do(req *http.Request) (*http.Response, error) {
        }
        var cancel context.CancelFunc
        if c.Timeout > 0 {
-               ctx := req.Context()
                ctx, cancel = context.WithDeadline(ctx, time.Now().Add(c.Timeout))
                req = req.WithContext(ctx)
+       } else {
+               cancel = context.CancelFunc(func() {})
+       }
+
+       c.requestLimiter.Acquire(ctx)
+       if ctx.Err() != nil {
+               c.requestLimiter.Release()
+               return nil, ctx.Err()
+       }
+
+       // Attach Release() to cancel func, see cancelOnClose below.
+       cancelOrig := cancel
+       cancel = func() {
+               c.requestLimiter.Release()
+               cancelOrig()
        }
+
        resp, err := c.httpClient().Do(req)
-       if err == nil && cancel != nil {
+       if c.requestLimiter.Report(resp, err) {
+               c.last503.Store(time.Now())
+       }
+       if err == nil {
                // We need to call cancel() eventually, but we can't
                // use "defer cancel()" because the context has to
                // stay alive until the caller has finished reading
                // the response body.
                resp.Body = cancelOnClose{ReadCloser: resp.Body, cancel: cancel}
-       } else if cancel != nil {
+       } else {
                cancel()
        }
        return resp, err
 }
 
+// Last503 returns the time of the most recent HTTP 503 (Service
+// Unavailable) response. Zero time indicates never.
+func (c *Client) Last503() time.Time {
+       t, _ := c.last503.Load().(time.Time)
+       return t
+}
+
 // cancelOnClose calls a provided CancelFunc when its wrapped
 // ReadCloser's Close() method is called.
 type cancelOnClose struct {