20200: Add limiter for log create requests
[arvados.git] / sdk / go / arvados / client.go
index cdc07bb0afd2c80b09985ad28f18c6c0fa1abcde..05176214ae1eb188fd4c8a0113b76e115f095a99 100644 (file)
@@ -7,6 +7,7 @@ package arvados
 import (
        "bytes"
        "context"
+       "crypto/rand"
        "crypto/tls"
        "encoding/json"
        "errors"
@@ -15,12 +16,14 @@ import (
        "io/fs"
        "io/ioutil"
        "log"
+       "math/big"
        "net"
        "net/http"
        "net/url"
        "os"
        "regexp"
        "strings"
+       "sync/atomic"
        "time"
 
        "git.arvados.org/arvados.git/sdk/go/httpserver"
@@ -74,6 +77,14 @@ type Client struct {
        // APIHost and AuthToken were loaded from ARVADOS_* env vars
        // (used to customize "no host/token" error messages)
        loadedFromEnv bool
+
+       // Track/limit concurrent outgoing API calls. Note this
+       // differs from an outgoing connection limit (a feature
+       // provided by http.Transport) when concurrent calls are
+       // multiplexed on a single http2 connection.
+       requestLimiter requestLimiter
+
+       last503 atomic.Value
 }
 
 // InsecureHTTPClient is the default http.Client used by a Client with
@@ -151,10 +162,10 @@ func NewClientFromConfig(cluster *Cluster) (*Client, error) {
 // Space characters are trimmed when reading the settings file, so
 // these are equivalent:
 //
-//   ARVADOS_API_HOST=localhost\n
-//   ARVADOS_API_HOST=localhost\r\n
-//   ARVADOS_API_HOST = localhost \n
-//   \tARVADOS_API_HOST = localhost\n
+//     ARVADOS_API_HOST=localhost\n
+//     ARVADOS_API_HOST=localhost\r\n
+//     ARVADOS_API_HOST = localhost \n
+//     \tARVADOS_API_HOST = localhost\n
 func NewClientFromEnv() *Client {
        vars := map[string]string{}
        home := os.Getenv("HOME")
@@ -218,10 +229,12 @@ func NewClientFromEnv() *Client {
 
 var reqIDGen = httpserver.IDGenerator{Prefix: "req-"}
 
-// Do adds Authorization and X-Request-Id headers and then calls
+// Do adds Authorization and X-Request-Id headers, delays in order to
+// comply with rate-limiting restrictions, and then calls
 // (*http.Client)Do().
 func (c *Client) Do(req *http.Request) (*http.Response, error) {
-       if auth, _ := req.Context().Value(contextKeyAuthorization{}).(string); auth != "" {
+       ctx := req.Context()
+       if auth, _ := ctx.Value(contextKeyAuthorization{}).(string); auth != "" {
                req.Header.Add("Authorization", auth)
        } else if c.AuthToken != "" {
                req.Header.Add("Authorization", "OAuth2 "+c.AuthToken)
@@ -229,7 +242,7 @@ func (c *Client) Do(req *http.Request) (*http.Response, error) {
 
        if req.Header.Get("X-Request-Id") == "" {
                var reqid string
-               if ctxreqid, _ := req.Context().Value(contextKeyRequestID{}).(string); ctxreqid != "" {
+               if ctxreqid, _ := ctx.Value(contextKeyRequestID{}).(string); ctxreqid != "" {
                        reqid = ctxreqid
                } else if c.defaultRequestID != "" {
                        reqid = c.defaultRequestID
@@ -244,23 +257,48 @@ func (c *Client) Do(req *http.Request) (*http.Response, error) {
        }
        var cancel context.CancelFunc
        if c.Timeout > 0 {
-               ctx := req.Context()
                ctx, cancel = context.WithDeadline(ctx, time.Now().Add(c.Timeout))
                req = req.WithContext(ctx)
+       } else {
+               cancel = context.CancelFunc(func() {})
+       }
+
+       c.requestLimiter.Acquire(ctx)
+       if ctx.Err() != nil {
+               c.requestLimiter.Release()
+               return nil, ctx.Err()
        }
+
+       // Attach Release() to cancel func, see cancelOnClose below.
+       cancelOrig := cancel
+       cancel = func() {
+               c.requestLimiter.Release()
+               cancelOrig()
+       }
+
        resp, err := c.httpClient().Do(req)
-       if err == nil && cancel != nil {
+       if c.requestLimiter.Report(resp, err) {
+               c.last503.Store(time.Now())
+       }
+       if err == nil {
                // We need to call cancel() eventually, but we can't
                // use "defer cancel()" because the context has to
                // stay alive until the caller has finished reading
                // the response body.
                resp.Body = cancelOnClose{ReadCloser: resp.Body, cancel: cancel}
-       } else if cancel != nil {
+       } else {
                cancel()
        }
        return resp, err
 }
 
+// Last503 returns the time of the most recent HTTP 503 (Service
+// Unavailable) response. Zero time indicates never.
+func (c *Client) Last503() time.Time {
+       t, _ := c.last503.Load().(time.Time)
+       return t
+}
+
 // cancelOnClose calls a provided CancelFunc when its wrapped
 // ReadCloser's Close() method is called.
 type cancelOnClose struct {
@@ -328,11 +366,11 @@ func (c *Client) DoAndDecode(dst interface{}, req *http.Request) error {
 
 // Convert an arbitrary struct to url.Values. For example,
 //
-//     Foo{Bar: []int{1,2,3}, Baz: "waz"}
+//     Foo{Bar: []int{1,2,3}, Baz: "waz"}
 //
 // becomes
 //
-//     url.Values{`bar`:`{"a":[1,2,3]}`,`Baz`:`waz`}
+//     url.Values{`bar`:`{"a":[1,2,3]}`,`Baz`:`waz`}
 //
 // params itself is returned if it is already an url.Values.
 func anythingToValues(params interface{}) (url.Values, error) {
@@ -599,3 +637,17 @@ func (c *Client) PathForUUID(method, uuid string) (string, error) {
        }
        return path, nil
 }
+
+var maxUUIDInt = (&big.Int{}).Exp(big.NewInt(36), big.NewInt(15), nil)
+
+func RandomUUID(clusterID, infix string) string {
+       n, err := rand.Int(rand.Reader, maxUUIDInt)
+       if err != nil {
+               panic(err)
+       }
+       nstr := n.Text(36)
+       for len(nstr) < 15 {
+               nstr = "0" + nstr
+       }
+       return clusterID + "-" + infix + "-" + nstr
+}