import (
"bytes"
"context"
+ "crypto/rand"
"crypto/tls"
"encoding/json"
+ "errors"
"fmt"
"io"
+ "io/fs"
"io/ioutil"
"log"
+ "math"
+ "math/big"
+ mathrand "math/rand"
+ "net"
"net/http"
"net/url"
"os"
"regexp"
+ "strconv"
"strings"
+ "sync/atomic"
"time"
"git.arvados.org/arvados.git/sdk/go/httpserver"
+ "github.com/hashicorp/go-retryablehttp"
)
// A Client is an HTTP client with an API endpoint and a set of
// HTTP headers to add/override in outgoing requests.
SendHeader http.Header
+ // Timeout for requests. NewClientFromConfig and
+ // NewClientFromEnv return a Client with a default 5 minute
+ // timeout. Within this time, retryable errors are
+ // automatically retried with exponential backoff.
+ //
+ // To disable automatic retries, set Timeout to zero and use a
+ // context deadline to establish a maximum request time.
+ Timeout time.Duration
+
dd *DiscoveryDocument
- ctx context.Context
+ defaultRequestID string
+
+ // APIHost and AuthToken were loaded from ARVADOS_* env vars
+ // (used to customize "no host/token" error messages)
+ loadedFromEnv bool
+
+ // Track/limit concurrent outgoing API calls. Note this
+ // differs from an outgoing connection limit (a feature
+ // provided by http.Transport) when concurrent calls are
+ // multiplexed on a single http2 connection.
+ requestLimiter requestLimiter
+
+ last503 atomic.Value
}
-// The default http.Client used by a Client with Insecure==true and
-// Client==nil.
+// InsecureHTTPClient is the default http.Client used by a Client with
+// Insecure==true and Client==nil.
var InsecureHTTPClient = &http.Client{
Transport: &http.Transport{
TLSClientConfig: &tls.Config{
- InsecureSkipVerify: true}},
- Timeout: 5 * time.Minute}
+ InsecureSkipVerify: true}}}
-// The default http.Client used by a Client otherwise.
-var DefaultSecureClient = &http.Client{
- Timeout: 5 * time.Minute}
+// DefaultSecureClient is the default http.Client used by a Client otherwise.
+var DefaultSecureClient = &http.Client{}
// NewClientFromConfig creates a new Client that uses the endpoints in
// the given cluster.
if ctrlURL.Host == "" {
return nil, fmt.Errorf("no host in config Services.Controller.ExternalURL: %v", ctrlURL)
}
+ var hc *http.Client
+ if srvaddr := os.Getenv("ARVADOS_SERVER_ADDRESS"); srvaddr != "" {
+ // When this client is used to make a request to
+ // https://{ctrlhost}:port/ (any port), it dials the
+ // indicated port on ARVADOS_SERVER_ADDRESS instead.
+ //
+ // This is invoked by arvados-server boot to ensure
+ // that server->server traffic (e.g.,
+ // keepproxy->controller) only hits local interfaces,
+ // even if the Controller.ExternalURL host is a load
+ // balancer / gateway and not a local interface
+ // address (e.g., when running on a cloud VM).
+ //
+ // This avoids unnecessary delay/cost of routing
+ // external traffic, and also allows controller to
+ // recognize other services as internal clients based
+ // on the connection source address.
+ divertedHost := (*url.URL)(&cluster.Services.Controller.ExternalURL).Hostname()
+ var dialer net.Dialer
+ hc = &http.Client{
+ Transport: &http.Transport{
+ TLSClientConfig: &tls.Config{InsecureSkipVerify: cluster.TLS.Insecure},
+ DialContext: func(ctx context.Context, network, addr string) (net.Conn, error) {
+ host, port, err := net.SplitHostPort(addr)
+ if err == nil && network == "tcp" && host == divertedHost {
+ addr = net.JoinHostPort(srvaddr, port)
+ }
+ return dialer.DialContext(ctx, network, addr)
+ },
+ },
+ }
+ }
return &Client{
- Scheme: ctrlURL.Scheme,
- APIHost: ctrlURL.Host,
- Insecure: cluster.TLS.Insecure,
+ Client: hc,
+ Scheme: ctrlURL.Scheme,
+ APIHost: ctrlURL.Host,
+ Insecure: cluster.TLS.Insecure,
+ Timeout: 5 * time.Minute,
+ requestLimiter: requestLimiter{maxlimit: int64(cluster.API.MaxConcurrentRequests / 4)},
}, nil
}
// NewClientFromEnv creates a new Client that uses the default HTTP
-// client with the API endpoint and credentials given by the
-// ARVADOS_API_* environment variables.
+// client, and loads API endpoint and credentials from ARVADOS_*
+// environment variables (if set) and
+// $HOME/.config/arvados/settings.conf (if readable).
+//
+// If a config exists in both locations, the environment variable is
+// used.
+//
+// If there is an error (other than ENOENT) reading settings.conf,
+// NewClientFromEnv logs the error to log.Default(), then proceeds as
+// if settings.conf did not exist.
+//
+// Space characters are trimmed when reading the settings file, so
+// these are equivalent:
+//
+// ARVADOS_API_HOST=localhost\n
+// ARVADOS_API_HOST=localhost\r\n
+// ARVADOS_API_HOST = localhost \n
+// \tARVADOS_API_HOST = localhost\n
func NewClientFromEnv() *Client {
// vars collects ARVADOS_* settings. The settings file is read first
// and the process environment second, so environment values
// overwrite file values with the same key.
+ vars := map[string]string{}
+ home := os.Getenv("HOME")
+ conffile := home + "/.config/arvados/settings.conf"
+ if home == "" {
+ // no $HOME => just use env vars
+ } else if settings, err := os.ReadFile(conffile); errors.Is(err, fs.ErrNotExist) {
+ // no config file => just use env vars
+ } else if err != nil {
+ // config file unreadable => log message, then use env vars
+ log.Printf("continuing without loading %s: %s", conffile, err)
+ } else {
// Parse KEY=VALUE lines. Only the first '=' separates key from
// value, and whitespace around both parts is trimmed.
+ for _, line := range bytes.Split(settings, []byte{'\n'}) {
+ kv := bytes.SplitN(line, []byte{'='}, 2)
+ k := string(bytes.TrimSpace(kv[0]))
+ if len(kv) != 2 || !strings.HasPrefix(k, "ARVADOS_") {
+ // Same behavior as python sdk:
+ // silently skip leading # (comments),
+ // blank lines, typos, and non-Arvados
+ // vars.
+ continue
+ }
+ vars[k] = string(bytes.TrimSpace(kv[1]))
+ }
+ }
// Overlay ARVADOS_* entries from the environment. Unlike file
// values, these are stored without trimming.
+ for _, env := range os.Environ() {
+ if !strings.HasPrefix(env, "ARVADOS_") {
+ continue
+ }
+ kv := strings.SplitN(env, "=", 2)
+ if len(kv) == 2 {
+ vars[kv[0]] = kv[1]
+ }
+ }
// ARVADOS_KEEP_SERVICES is a space-separated list of URLs; empty
// entries (e.g. from repeated spaces) are skipped.
var svcs []string
- for _, s := range strings.Split(os.Getenv("ARVADOS_KEEP_SERVICES"), " ") {
+ for _, s := range strings.Split(vars["ARVADOS_KEEP_SERVICES"], " ") {
if s == "" {
continue
} else if u, err := url.Parse(s); err != nil {
// NOTE(review): the body of this branch, and the code that
// appends to svcs, are not visible in this view.
}
}
// Only "1", "yes" and "true" (compared case-insensitively) turn
// insecure mode on; any other value leaves it off.
var insecure bool
- if s := strings.ToLower(os.Getenv("ARVADOS_API_HOST_INSECURE")); s == "1" || s == "yes" || s == "true" {
+ if s := strings.ToLower(vars["ARVADOS_API_HOST_INSECURE"]); s == "1" || s == "yes" || s == "true" {
insecure = true
}
return &Client{
Scheme: "https",
- APIHost: os.Getenv("ARVADOS_API_HOST"),
- AuthToken: os.Getenv("ARVADOS_API_TOKEN"),
+ APIHost: vars["ARVADOS_API_HOST"],
+ AuthToken: vars["ARVADOS_API_TOKEN"],
Insecure: insecure,
KeepServiceURIs: svcs,
+ Timeout: 5 * time.Minute,
+ loadedFromEnv: true,
}
}
var reqIDGen = httpserver.IDGenerator{Prefix: "req-"}
-// Do adds Authorization and X-Request-Id headers and then calls
-// (*http.Client)Do().
+var nopCancelFunc context.CancelFunc = func() {}
+
+// Do augments (*http.Client)Do(): adds Authorization and X-Request-Id
+// headers, delays in order to comply with rate-limiting restrictions,
+// and retries failed requests when appropriate.
func (c *Client) Do(req *http.Request) (*http.Response, error) {
- if auth, _ := req.Context().Value(contextKeyAuthorization{}).(string); auth != "" {
+ ctx := req.Context()
// An Authorization value carried by the request context takes
// precedence over the client's own AuthToken.
+ if auth, _ := ctx.Value(contextKeyAuthorization{}).(string); auth != "" {
req.Header.Add("Authorization", auth)
} else if c.AuthToken != "" {
req.Header.Add("Authorization", "OAuth2 "+c.AuthToken)
}
// Only pick a request ID when the caller has not already set the
// header. Precedence: context value, then c.defaultRequestID, then a
// newly generated ID.
if req.Header.Get("X-Request-Id") == "" {
- reqid, _ := req.Context().Value(contextKeyRequestID{}).(string)
- if reqid == "" {
- reqid, _ = c.context().Value(contextKeyRequestID{}).(string)
- }
- if reqid == "" {
+ var reqid string
+ if ctxreqid, _ := ctx.Value(contextKeyRequestID{}).(string); ctxreqid != "" {
+ reqid = ctxreqid
+ } else if c.defaultRequestID != "" {
+ reqid = c.defaultRequestID
+ } else {
reqid = reqIDGen.Next()
}
// NOTE(review): as shown here, Set is reached only when req.Header
// is nil, and http.Header.Set on a nil map panics. Lines appear to
// be missing from this view -- confirm against the full file.
if req.Header == nil {
req.Header.Set("X-Request-Id", reqid)
}
}
- return c.httpClient().Do(req)
+
+ rreq, err := retryablehttp.FromRequest(req)
+ if err != nil {
+ return nil, err
+ }
+
// cancel stays a no-op unless a deadline context is created below.
// lastResp/lastRespBody/lastErr remember the outcome of the most
// recent attempt that was judged retryable.
+ cancel := nopCancelFunc
+ var lastResp *http.Response
+ var lastRespBody io.ReadCloser
+ var lastErr error
+
+ rclient := retryablehttp.NewClient()
+ rclient.HTTPClient = c.httpClient()
+ rclient.Backoff = exponentialBackoff
// A positive Timeout bounds the whole call (all attempts) with a
// context deadline; otherwise retries are switched off.
+ if c.Timeout > 0 {
+ rclient.RetryWaitMax = c.Timeout / 10
+ rclient.RetryMax = 32
+ ctx, cancel = context.WithDeadline(ctx, time.Now().Add(c.Timeout))
+ rreq = rreq.WithContext(ctx)
+ } else {
+ rclient.RetryMax = 0
+ }
+ rclient.CheckRetry = func(ctx context.Context, resp *http.Response, respErr error) (bool, error) {
// Every attempt's outcome is reported to the limiter; when Report
// returns true the current time is stored in last503.
+ if c.requestLimiter.Report(resp, respErr) {
+ c.last503.Store(time.Now())
+ }
+ if c.Timeout == 0 {
// NOTE(review): err here is the enclosing function's variable (the
// closure's own err is declared below this line). It looks like it
// is always nil at this point, so an explicit nil may be clearer --
// confirm.
+ return false, err
+ }
+ retrying, err := retryablehttp.DefaultRetryPolicy(ctx, resp, respErr)
+ if retrying {
+ lastResp, lastRespBody, lastErr = resp, nil, respErr
+ if respErr == nil {
+ // Save the response and body so we
+ // can return it instead of "deadline
+ // exceeded". retryablehttp.Client
+ // will drain and discard resp.body,
+ // so we need to stash it separately.
+ buf, err := ioutil.ReadAll(resp.Body)
+ if err == nil {
+ lastRespBody = io.NopCloser(bytes.NewReader(buf))
+ } else {
+ lastResp, lastErr = nil, err
+ }
+ }
+ }
+ return retrying, err
+ }
+ rclient.Logger = nil
+
// presumably Acquire waits for a free slot or for ctx to end; the
// Release below implies a slot is held either way -- verify against
// requestLimiter.
+ c.requestLimiter.Acquire(ctx)
+ if ctx.Err() != nil {
+ c.requestLimiter.Release()
+ cancel()
+ return nil, ctx.Err()
+ }
+ resp, err := rclient.Do(rreq)
// If the context ended after at least one retryable outcome was
// saved, return that saved outcome instead of the context error.
+ if (errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled)) && (lastResp != nil || lastErr != nil) {
+ resp, err = lastResp, lastErr
+ if resp != nil {
+ resp.Body = lastRespBody
+ }
+ }
// On failure no body is handed to the caller, so the limiter slot
// and the context are released here.
+ if err != nil {
+ c.requestLimiter.Release()
+ cancel()
+ return nil, err
+ }
+ // We need to call cancel() eventually, but we can't use
+ // "defer cancel()" because the context has to stay alive
+ // until the caller has finished reading the response body.
+ resp.Body = cancelOnClose{
+ ReadCloser: resp.Body,
+ cancel: func() {
+ c.requestLimiter.Release()
+ cancel()
+ },
+ }
+ return resp, err
+}
+
+// Last503 returns the time of the most recent HTTP 503 (Service
+// Unavailable) response. Zero time indicates never.
+func (c *Client) Last503() time.Time {
+ t, _ := c.last503.Load().(time.Time)
+ return t
+}
+
// cancelOnClose wraps a ReadCloser so that closing it also runs a
// cleanup callback. Reads are passed straight through to the embedded
// ReadCloser.
type cancelOnClose struct {
	io.ReadCloser
	// cancel is invoked once per Close call, after the wrapped
	// ReadCloser has been closed. It may be nil.
	cancel context.CancelFunc
}

// Close closes the wrapped ReadCloser and then calls cancel. It
// returns the error (if any) from the wrapped Close.
//
// cancel is deferred so that it still runs if the wrapped Close
// panics, and a nil cancel is tolerated rather than causing a panic
// after the body has already been closed.
func (coc cancelOnClose) Close() error {
	if coc.cancel != nil {
		defer coc.cancel()
	}
	return coc.ReadCloser.Close()
}
func isRedirectStatus(code int) bool {
}
}
// minExponentialBackoffBase is the smallest base delay used for any
// attempt after the first one (attemptNum > 0).
const minExponentialBackoffBase = time.Second

// exponentialBackoff implements retryablehttp.Backoff.
//
// If resp is a 429 (Too Many Requests) or 503 (Service Unavailable)
// response with a usable Retry-After header, the server-provided
// delay is used. Otherwise the delay is nearly-full jitter exponential
// backoff (similar to
// https://aws.amazon.com/blogs/architecture/exponential-backoff-and-jitter/):
// min plus a random fraction of (2^attemptNum - 1) * min.
//
// For attemptNum > 0, min is raised to minExponentialBackoffBase if it
// is smaller. The result is then clamped: values below min become min,
// and values above max become max.
func exponentialBackoff(min, max time.Duration, attemptNum int, resp *http.Response) time.Duration {
	if attemptNum > 0 && min < minExponentialBackoffBase {
		min = minExponentialBackoffBase
	}
	var t time.Duration
	if resp != nil && (resp.StatusCode == http.StatusTooManyRequests || resp.StatusCode == http.StatusServiceUnavailable) {
		t = retryAfterDelay(resp.Header.Get("Retry-After"))
	}
	if t == 0 {
		jitter := mathrand.New(mathrand.NewSource(int64(time.Now().Nanosecond()))).Float64()
		spread := (math.Pow(2, float64(attemptNum))*float64(min) - float64(min)) * jitter
		// Saturate instead of converting an out-of-range float to
		// an integer, which would yield an arbitrary (typically
		// negative) duration and collapse the delay to min.
		limit := float64(math.MaxInt64)
		if min > 0 {
			limit -= float64(min)
		}
		if spread >= limit {
			t = math.MaxInt64
		} else {
			t = min + time.Duration(spread)
		}
	}
	switch {
	case t < min:
		return min
	case t > max:
		return max
	default:
		return t
	}
}

// retryAfterDelay converts a Retry-After header value to a duration.
// It accepts an integer number of seconds or an HTTP date (any format
// accepted by http.ParseTime, plus RFC 1123 with a non-GMT zone
// name). It returns 0 if s is empty or cannot be parsed. Second
// counts too large to represent as a time.Duration saturate instead
// of wrapping around.
func retryAfterDelay(s string) time.Duration {
	if s == "" {
		return 0
	}
	if sleep, err := strconv.ParseInt(s, 10, 64); err == nil {
		switch {
		case sleep > int64(math.MaxInt64/time.Second):
			return math.MaxInt64
		case sleep < int64(math.MinInt64/time.Second):
			return math.MinInt64
		}
		return time.Duration(sleep) * time.Second
	}
	if stamp, err := http.ParseTime(s); err == nil {
		return time.Until(stamp)
	}
	if stamp, err := time.Parse(time.RFC1123, s); err == nil {
		return time.Until(stamp)
	}
	return 0
}
+
// DoAndDecode performs req and unmarshals the response (which must be
// JSON) into dst. Use this instead of RequestAndDecode if you need
// more control of the http.Request object.
return err
}
switch {
+ case resp.StatusCode == http.StatusNoContent:
+ return nil
case resp.StatusCode == http.StatusOK && dst == nil:
return nil
case resp.StatusCode == http.StatusOK:
return nil
case isRedirectStatus(resp.StatusCode):
// Copy the redirect target URL to dst.RedirectLocation.
- buf, err := json.Marshal(map[string]string{"RedirectLocation": resp.Header.Get("Location")})
+ buf, err := json.Marshal(map[string]string{"redirect_location": resp.Header.Get("Location")})
if err != nil {
return err
}
// Convert an arbitrary struct to url.Values. For example,
//
-// Foo{Bar: []int{1,2,3}, Baz: "waz"}
+// Foo{Bar: []int{1,2,3}, Baz: "waz"}
//
// becomes
//
-// url.Values{`bar`:`{"a":[1,2,3]}`,`Baz`:`waz`}
+// url.Values{`bar`:`{"a":[1,2,3]}`,`Baz`:`waz`}
//
// params itself is returned if it is already an url.Values.
func anythingToValues(params interface{}) (url.Values, error) {
//
// path must not contain a query string.
func (c *Client) RequestAndDecode(dst interface{}, method, path string, body io.Reader, params interface{}) error {
// This entry point takes no context argument; it simply forwards all
// of its arguments to RequestAndDecodeContext.
- return c.RequestAndDecodeContext(c.context(), dst, method, path, body, params)
+ return c.RequestAndDecodeContext(context.Background(), dst, method, path, body, params)
}
+// RequestAndDecodeContext does the same as RequestAndDecode, but with a context
func (c *Client) RequestAndDecodeContext(ctx context.Context, dst interface{}, method, path string, body io.Reader, params interface{}) error {
if body, ok := body.(io.Closer); ok {
// Ensure body is closed even if we error out early
defer body.Close()
}
+ if c.APIHost == "" {
+ if c.loadedFromEnv {
+ return errors.New("ARVADOS_API_HOST and/or ARVADOS_API_TOKEN environment variables are not set")
+ }
+ return errors.New("arvados.Client cannot perform request: APIHost is not set")
+ }
urlString := c.apiURL(path)
urlValues, err := anythingToValues(params)
if err != nil {
return err
}
+ if dst == nil {
+ if urlValues == nil {
+ urlValues = url.Values{}
+ }
+ urlValues["select"] = []string{`["uuid"]`}
+ }
if urlValues == nil {
// Nothing to send
} else if body != nil || ((method == "GET" || method == "HEAD") && len(urlValues.Encode()) < 1000) {
// header.
func (c *Client) WithRequestID(reqid string) *Client {
// Work on a shallow copy so the receiver itself is left unchanged;
// the caller gets a pointer to the modified copy.
// NOTE(review): if Client holds requestLimiter and atomic.Value
// fields by value, they are copied here too, and sync/atomic documents
// that a Value must not be copied after first use -- confirm this is
// intended.
cc := *c
- cc.ctx = ContextWithRequestID(cc.context(), reqid)
+ cc.defaultRequestID = reqid
return &cc
}
-func (c *Client) context() context.Context {
- if c.ctx == nil {
- return context.Background()
- }
- return c.ctx
-}
-
func (c *Client) httpClient() *http.Client {
switch {
case c.Client != nil:
}
return path, nil
}
+
// maxUUIDInt is 36^15, the exclusive upper bound for the random part
// of a UUID (the number of distinct 15-digit base-36 strings).
var maxUUIDInt = (&big.Int{}).Exp(big.NewInt(36), big.NewInt(15), nil)

// RandomUUID returns a new identifier of the form
// "{clusterID}-{infix}-{suffix}", where suffix is 15 base-36 digits
// (0-9, a-z) drawn from crypto/rand and left-padded with zeros.
//
// It panics if the random source returns an error.
func RandomUUID(clusterID, infix string) string {
	n, err := rand.Int(rand.Reader, maxUUIDInt)
	if err != nil {
		panic(err)
	}
	suffix := n.Text(36)
	if pad := 15 - len(suffix); pad > 0 {
		suffix = strings.Repeat("0", pad) + suffix
	}
	return clusterID + "-" + infix + "-" + suffix
}