20831: Add DiscoveryDocument to arvados.API
[arvados.git] / sdk / go / arvados / client.go
index ea3cb6899e7bd5ac9f92cc0d3127e6aa3a485f13..735a44d24c9263d3248fd5c9ae2b0574ca15ca22 100644 (file)
@@ -7,21 +7,30 @@ package arvados
 import (
        "bytes"
        "context"
+       "crypto/rand"
        "crypto/tls"
        "encoding/json"
        "errors"
        "fmt"
        "io"
+       "io/fs"
        "io/ioutil"
        "log"
+       "math"
+       "math/big"
+       mathrand "math/rand"
+       "net"
        "net/http"
        "net/url"
        "os"
        "regexp"
+       "strconv"
        "strings"
+       "sync/atomic"
        "time"
 
        "git.arvados.org/arvados.git/sdk/go/httpserver"
+       "github.com/hashicorp/go-retryablehttp"
 )
 
 // A Client is an HTTP client with an API endpoint and a set of
@@ -60,9 +69,11 @@ type Client struct {
 
        // Timeout for requests. NewClientFromConfig and
        // NewClientFromEnv return a Client with a default 5 minute
-       // timeout.  To disable this timeout and rely on each
-       // http.Request's context deadline instead, set Timeout to
-       // zero.
+       // timeout. Within this time, retryable errors are
+       // automatically retried with exponential backoff.
+       //
+       // To disable automatic retries, set Timeout to zero and use a
+       // context deadline to establish a maximum request time.
        Timeout time.Duration
 
        dd *DiscoveryDocument
@@ -72,6 +83,14 @@ type Client struct {
        // APIHost and AuthToken were loaded from ARVADOS_* env vars
        // (used to customize "no host/token" error messages)
        loadedFromEnv bool
+
+       // Track/limit concurrent outgoing API calls. Note this
+       // differs from an outgoing connection limit (a feature
+       // provided by http.Transport) when concurrent calls are
+       // multiplexed on a single http2 connection.
+       requestLimiter requestLimiter
+
+       last503 atomic.Value
 }
 
 // InsecureHTTPClient is the default http.Client used by a Client with
@@ -93,20 +112,103 @@ func NewClientFromConfig(cluster *Cluster) (*Client, error) {
        if ctrlURL.Host == "" {
                return nil, fmt.Errorf("no host in config Services.Controller.ExternalURL: %v", ctrlURL)
        }
+       var hc *http.Client
+       if srvaddr := os.Getenv("ARVADOS_SERVER_ADDRESS"); srvaddr != "" {
+               // When this client is used to make a request to
+               // https://{ctrlhost}:port/ (any port), it dials the
+               // indicated port on ARVADOS_SERVER_ADDRESS instead.
+               //
+               // This is invoked by arvados-server boot to ensure
+               // that server->server traffic (e.g.,
+               // keepproxy->controller) only hits local interfaces,
+               // even if the Controller.ExternalURL host is a load
+               // balancer / gateway and not a local interface
+               // address (e.g., when running on a cloud VM).
+               //
+               // This avoids unnecessary delay/cost of routing
+               // external traffic, and also allows controller to
+               // recognize other services as internal clients based
+               // on the connection source address.
+               divertedHost := (*url.URL)(&cluster.Services.Controller.ExternalURL).Hostname()
+               var dialer net.Dialer
+               hc = &http.Client{
+                       Transport: &http.Transport{
+                               TLSClientConfig: &tls.Config{InsecureSkipVerify: cluster.TLS.Insecure},
+                               DialContext: func(ctx context.Context, network, addr string) (net.Conn, error) {
+                                       host, port, err := net.SplitHostPort(addr)
+                                       if err == nil && network == "tcp" && host == divertedHost {
+                                               addr = net.JoinHostPort(srvaddr, port)
+                                       }
+                                       return dialer.DialContext(ctx, network, addr)
+                               },
+                       },
+               }
+       }
        return &Client{
-               Scheme:   ctrlURL.Scheme,
-               APIHost:  ctrlURL.Host,
-               Insecure: cluster.TLS.Insecure,
-               Timeout:  5 * time.Minute,
+               Client:         hc,
+               Scheme:         ctrlURL.Scheme,
+               APIHost:        ctrlURL.Host,
+               Insecure:       cluster.TLS.Insecure,
+               Timeout:        5 * time.Minute,
+               requestLimiter: requestLimiter{maxlimit: int64(cluster.API.MaxConcurrentRequests / 4)},
        }, nil
 }
 
 // NewClientFromEnv creates a new Client that uses the default HTTP
-// client with the API endpoint and credentials given by the
-// ARVADOS_API_* environment variables.
+// client, and loads API endpoint and credentials from ARVADOS_*
+// environment variables (if set) and
+// $HOME/.config/arvados/settings.conf (if readable).
+//
+// If a config exists in both locations, the environment variable is
+// used.
+//
+// If there is an error (other than ENOENT) reading settings.conf,
+// NewClientFromEnv logs the error to log.Default(), then proceeds as
+// if settings.conf did not exist.
+//
+// Space characters are trimmed when reading the settings file, so
+// these are equivalent:
+//
+//     ARVADOS_API_HOST=localhost\n
+//     ARVADOS_API_HOST=localhost\r\n
+//     ARVADOS_API_HOST = localhost \n
+//     \tARVADOS_API_HOST = localhost\n
 func NewClientFromEnv() *Client {
+       vars := map[string]string{}
+       home := os.Getenv("HOME")
+       conffile := home + "/.config/arvados/settings.conf"
+       if home == "" {
+               // no $HOME => just use env vars
+       } else if settings, err := os.ReadFile(conffile); errors.Is(err, fs.ErrNotExist) {
+               // no config file => just use env vars
+       } else if err != nil {
+               // config file unreadable => log message, then use env vars
+               log.Printf("continuing without loading %s: %s", conffile, err)
+       } else {
+               for _, line := range bytes.Split(settings, []byte{'\n'}) {
+                       kv := bytes.SplitN(line, []byte{'='}, 2)
+                       k := string(bytes.TrimSpace(kv[0]))
+                       if len(kv) != 2 || !strings.HasPrefix(k, "ARVADOS_") {
+                               // Same behavior as python sdk:
+                               // silently skip leading # (comments),
+                               // blank lines, typos, and non-Arvados
+                               // vars.
+                               continue
+                       }
+                       vars[k] = string(bytes.TrimSpace(kv[1]))
+               }
+       }
+       for _, env := range os.Environ() {
+               if !strings.HasPrefix(env, "ARVADOS_") {
+                       continue
+               }
+               kv := strings.SplitN(env, "=", 2)
+               if len(kv) == 2 {
+                       vars[kv[0]] = kv[1]
+               }
+       }
        var svcs []string
-       for _, s := range strings.Split(os.Getenv("ARVADOS_KEEP_SERVICES"), " ") {
+       for _, s := range strings.Split(vars["ARVADOS_KEEP_SERVICES"], " ") {
                if s == "" {
                        continue
                } else if u, err := url.Parse(s); err != nil {
@@ -118,13 +220,13 @@ func NewClientFromEnv() *Client {
                }
        }
        var insecure bool
-       if s := strings.ToLower(os.Getenv("ARVADOS_API_HOST_INSECURE")); s == "1" || s == "yes" || s == "true" {
+       if s := strings.ToLower(vars["ARVADOS_API_HOST_INSECURE"]); s == "1" || s == "yes" || s == "true" {
                insecure = true
        }
        return &Client{
                Scheme:          "https",
-               APIHost:         os.Getenv("ARVADOS_API_HOST"),
-               AuthToken:       os.Getenv("ARVADOS_API_TOKEN"),
+               APIHost:         vars["ARVADOS_API_HOST"],
+               AuthToken:       vars["ARVADOS_API_TOKEN"],
                Insecure:        insecure,
                KeepServiceURIs: svcs,
                Timeout:         5 * time.Minute,
@@ -134,10 +236,14 @@ func NewClientFromEnv() *Client {
 
 var reqIDGen = httpserver.IDGenerator{Prefix: "req-"}
 
-// Do adds Authorization and X-Request-Id headers and then calls
-// (*http.Client)Do().
+var nopCancelFunc context.CancelFunc = func() {}
+
+// Do augments (*http.Client)Do(): adds Authorization and X-Request-Id
+// headers, delays in order to comply with rate-limiting restrictions,
+// and retries failed requests when appropriate.
 func (c *Client) Do(req *http.Request) (*http.Response, error) {
-       if auth, _ := req.Context().Value(contextKeyAuthorization{}).(string); auth != "" {
+       ctx := req.Context()
+       if auth, _ := ctx.Value(contextKeyAuthorization{}).(string); auth != "" {
                req.Header.Add("Authorization", auth)
        } else if c.AuthToken != "" {
                req.Header.Add("Authorization", "OAuth2 "+c.AuthToken)
@@ -145,7 +251,7 @@ func (c *Client) Do(req *http.Request) (*http.Response, error) {
 
        if req.Header.Get("X-Request-Id") == "" {
                var reqid string
-               if ctxreqid, _ := req.Context().Value(contextKeyRequestID{}).(string); ctxreqid != "" {
+               if ctxreqid, _ := ctx.Value(contextKeyRequestID{}).(string); ctxreqid != "" {
                        reqid = ctxreqid
                } else if c.defaultRequestID != "" {
                        reqid = c.defaultRequestID
@@ -158,25 +264,94 @@ func (c *Client) Do(req *http.Request) (*http.Response, error) {
                        req.Header.Set("X-Request-Id", reqid)
                }
        }
-       var cancel context.CancelFunc
+
+       rreq, err := retryablehttp.FromRequest(req)
+       if err != nil {
+               return nil, err
+       }
+
+       cancel := nopCancelFunc
+       var lastResp *http.Response
+       var lastRespBody io.ReadCloser
+       var lastErr error
+
+       rclient := retryablehttp.NewClient()
+       rclient.HTTPClient = c.httpClient()
+       rclient.Backoff = exponentialBackoff
        if c.Timeout > 0 {
-               ctx := req.Context()
+               rclient.RetryWaitMax = c.Timeout / 10
+               rclient.RetryMax = 32
                ctx, cancel = context.WithDeadline(ctx, time.Now().Add(c.Timeout))
-               req = req.WithContext(ctx)
-       }
-       resp, err := c.httpClient().Do(req)
-       if err == nil && cancel != nil {
-               // We need to call cancel() eventually, but we can't
-               // use "defer cancel()" because the context has to
-               // stay alive until the caller has finished reading
-               // the response body.
-               resp.Body = cancelOnClose{ReadCloser: resp.Body, cancel: cancel}
-       } else if cancel != nil {
+               rreq = rreq.WithContext(ctx)
+       } else {
+               rclient.RetryMax = 0
+       }
+       rclient.CheckRetry = func(ctx context.Context, resp *http.Response, respErr error) (bool, error) {
+               if c.requestLimiter.Report(resp, respErr) {
+                       c.last503.Store(time.Now())
+               }
+               if c.Timeout == 0 {
+                       return false, err
+               }
+               retrying, err := retryablehttp.DefaultRetryPolicy(ctx, resp, respErr)
+               if retrying {
+                       lastResp, lastRespBody, lastErr = resp, nil, respErr
+                       if respErr == nil {
+                               // Save the response and body so we
+                               // can return it instead of "deadline
+                               // exceeded". retryablehttp.Client
+                               // will drain and discard resp.body,
+                               // so we need to stash it separately.
+                               buf, err := ioutil.ReadAll(resp.Body)
+                               if err == nil {
+                                       lastRespBody = io.NopCloser(bytes.NewReader(buf))
+                               } else {
+                                       lastResp, lastErr = nil, err
+                               }
+                       }
+               }
+               return retrying, err
+       }
+       rclient.Logger = nil
+
+       c.requestLimiter.Acquire(ctx)
+       if ctx.Err() != nil {
+               c.requestLimiter.Release()
+               cancel()
+               return nil, ctx.Err()
+       }
+       resp, err := rclient.Do(rreq)
+       if (errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled)) && (lastResp != nil || lastErr != nil) {
+               resp, err = lastResp, lastErr
+               if resp != nil {
+                       resp.Body = lastRespBody
+               }
+       }
+       if err != nil {
+               c.requestLimiter.Release()
                cancel()
+               return nil, err
+       }
+       // We need to call cancel() eventually, but we can't use
+       // "defer cancel()" because the context has to stay alive
+       // until the caller has finished reading the response body.
+       resp.Body = cancelOnClose{
+               ReadCloser: resp.Body,
+               cancel: func() {
+                       c.requestLimiter.Release()
+                       cancel()
+               },
        }
        return resp, err
 }
 
+// Last503 returns the time of the most recent HTTP 503 (Service
+// Unavailable) response. Zero time indicates never.
+func (c *Client) Last503() time.Time {
+       t, _ := c.last503.Load().(time.Time)
+       return t
+}
+
 // cancelOnClose calls a provided CancelFunc when its wrapped
 // ReadCloser's Close() method is called.
 type cancelOnClose struct {
@@ -199,6 +374,40 @@ func isRedirectStatus(code int) bool {
        }
 }
 
+const minExponentialBackoffBase = time.Second
+
+// Implements retryablehttp.Backoff using the server-provided
+// Retry-After header if available, otherwise nearly-full jitter
+// exponential backoff (similar to
+// https://aws.amazon.com/blogs/architecture/exponential-backoff-and-jitter/),
+// in all cases respecting the provided min and max.
+func exponentialBackoff(min, max time.Duration, attemptNum int, resp *http.Response) time.Duration {
+       if attemptNum > 0 && min < minExponentialBackoffBase {
+               min = minExponentialBackoffBase
+       }
+       var t time.Duration
+       if resp != nil && (resp.StatusCode == http.StatusTooManyRequests || resp.StatusCode == http.StatusServiceUnavailable) {
+               if s := resp.Header.Get("Retry-After"); s != "" {
+                       if sleep, err := strconv.ParseInt(s, 10, 64); err == nil {
+                               t = time.Second * time.Duration(sleep)
+                       } else if stamp, err := time.Parse(time.RFC1123, s); err == nil {
+                               t = stamp.Sub(time.Now())
+                       }
+               }
+       }
+       if t == 0 {
+               jitter := mathrand.New(mathrand.NewSource(int64(time.Now().Nanosecond()))).Float64()
+               t = min + time.Duration((math.Pow(2, float64(attemptNum))*float64(min)-float64(min))*jitter)
+       }
+       if t < min {
+               return min
+       } else if t > max {
+               return max
+       } else {
+               return t
+       }
+}
+
 // DoAndDecode performs req and unmarshals the response (which must be
 // JSON) into dst. Use this instead of RequestAndDecode if you need
 // more control of the http.Request object.
@@ -217,6 +426,8 @@ func (c *Client) DoAndDecode(dst interface{}, req *http.Request) error {
                return err
        }
        switch {
+       case resp.StatusCode == http.StatusNoContent:
+               return nil
        case resp.StatusCode == http.StatusOK && dst == nil:
                return nil
        case resp.StatusCode == http.StatusOK:
@@ -242,11 +453,11 @@ func (c *Client) DoAndDecode(dst interface{}, req *http.Request) error {
 
 // Convert an arbitrary struct to url.Values. For example,
 //
-//     Foo{Bar: []int{1,2,3}, Baz: "waz"}
+//     Foo{Bar: []int{1,2,3}, Baz: "waz"}
 //
 // becomes
 //
-//     url.Values{`bar`:`{"a":[1,2,3]}`,`Baz`:`waz`}
+//     url.Values{`bar`:`{"a":[1,2,3]}`,`Baz`:`waz`}
 //
 // params itself is returned if it is already an url.Values.
 func anythingToValues(params interface{}) (url.Values, error) {
@@ -321,15 +532,20 @@ func (c *Client) RequestAndDecodeContext(ctx context.Context, dst interface{}, m
        if c.APIHost == "" {
                if c.loadedFromEnv {
                        return errors.New("ARVADOS_API_HOST and/or ARVADOS_API_TOKEN environment variables are not set")
-               } else {
-                       return errors.New("arvados.Client cannot perform request: APIHost is not set")
                }
+               return errors.New("arvados.Client cannot perform request: APIHost is not set")
        }
        urlString := c.apiURL(path)
        urlValues, err := anythingToValues(params)
        if err != nil {
                return err
        }
+       if dst == nil {
+               if urlValues == nil {
+                       urlValues = url.Values{}
+               }
+               urlValues["select"] = []string{`["uuid"]`}
+       }
        if urlValues == nil {
                // Nothing to send
        } else if body != nil || ((method == "GET" || method == "HEAD") && len(urlValues.Encode()) < 1000) {
@@ -414,6 +630,7 @@ type DiscoveryDocument struct {
        GitURL                       string              `json:"gitUrl"`
        Schemas                      map[string]Schema   `json:"schemas"`
        Resources                    map[string]Resource `json:"resources"`
+       Revision                     string              `json:"revision"`
 }
 
 type Resource struct {
@@ -514,3 +731,17 @@ func (c *Client) PathForUUID(method, uuid string) (string, error) {
        }
        return path, nil
 }
+
+var maxUUIDInt = (&big.Int{}).Exp(big.NewInt(36), big.NewInt(15), nil)
+
+func RandomUUID(clusterID, infix string) string {
+       n, err := rand.Int(rand.Reader, maxUUIDInt)
+       if err != nil {
+               panic(err)
+       }
+       nstr := n.Text(36)
+       for len(nstr) < 15 {
+               nstr = "0" + nstr
+       }
+       return clusterID + "-" + infix + "-" + nstr
+}