+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: Apache-2.0
+
// Package keepclient provides low-level Get/Put primitives for accessing Arvados Keep blocks.
package keepclient
"crypto/md5"
"errors"
"fmt"
- "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
- "git.curoverse.com/arvados.git/sdk/go/streamer"
"io"
"io/ioutil"
+ "net"
"net/http"
"regexp"
"strconv"
"strings"
"sync"
+ "time"
+
+ "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
+ "git.curoverse.com/arvados.git/sdk/go/asyncbuf"
)
// BLOCKSIZE is the maximum size of a Keep block: 64 MiB.
const BLOCKSIZE = 64 << 20

// Timeouts used by the shared HTTP clients (see httpClient) when no
// non-disk Keep service has been found.
var (
	DefaultRequestTimeout      = 20 * time.Second
	DefaultConnectTimeout      = 2 * time.Second
	DefaultTLSHandshakeTimeout = 4 * time.Second
	DefaultKeepAlive           = 180 * time.Second
)

// Longer timeouts used by the shared HTTP clients once a non-disk
// (e.g. proxy) Keep service has been found, since the intervening
// network is usually slower.
var (
	DefaultProxyRequestTimeout      = 300 * time.Second
	DefaultProxyConnectTimeout      = 30 * time.Second
	DefaultProxyTLSHandshakeTimeout = 10 * time.Second
	DefaultProxyKeepAlive           = 120 * time.Second
)

// Error interface with an error and boolean indicating whether the error is temporary
type Error interface {
error
multipleResponseError
}
-var InsufficientReplicasError = errors.New("Could not write sufficient replicas")
-var OversizeBlockError = errors.New("Exceeded maximum block size (" + strconv.Itoa(BLOCKSIZE) + ")")
+type InsufficientReplicasError error
+
+type OversizeBlockError error
+
+var ErrOversizeBlock = OversizeBlockError(errors.New("Exceeded maximum block size (" + strconv.Itoa(BLOCKSIZE) + ")"))
var MissingArvadosApiHost = errors.New("Missing required environment variable ARVADOS_API_HOST")
var MissingArvadosApiToken = errors.New("Missing required environment variable ARVADOS_API_TOKEN")
var InvalidLocatorError = errors.New("Invalid locator")
const X_Keep_Desired_Replicas = "X-Keep-Desired-Replicas"
const X_Keep_Replicas_Stored = "X-Keep-Replicas-Stored"
// HTTPClient is the subset of *http.Client that KeepClient needs to
// send requests; any value with a compatible Do method (for example a
// test double) can be assigned to KeepClient.HTTPClient.
type HTTPClient interface {
	Do(req *http.Request) (*http.Response, error)
}

// Information about Arvados and Keep servers.
type KeepClient struct {
Arvados *arvadosclient.ArvadosClient
Want_replicas int
- localRoots *map[string]string
- writableLocalRoots *map[string]string
- gatewayRoots *map[string]string
+ localRoots map[string]string
+ writableLocalRoots map[string]string
+ gatewayRoots map[string]string
lock sync.RWMutex
- Client *http.Client
+ HTTPClient HTTPClient
Retries int
+ BlockCache *BlockCache
// set to 1 if all writable services are of disk type, otherwise 0
replicasPerService int
// Any non-disk typed services found in the list of keepservers?
foundNonDiskSvc bool
+
+ // Disable automatic discovery of keep services
+ disableDiscovery bool
}
-// MakeKeepClient creates a new KeepClient by contacting the API server to discover Keep servers.
+// MakeKeepClient creates a new KeepClient, calls
+// DiscoverKeepServices(), and returns when the client is ready to
+// use.
func MakeKeepClient(arv *arvadosclient.ArvadosClient) (*KeepClient, error) {
kc := New(arv)
- return kc, kc.DiscoverKeepServers()
+ return kc, kc.discoverServices()
}
-// New func creates a new KeepClient struct.
-// This func does not discover keep servers. It is the caller's responsibility.
+// New creates a new KeepClient. Service discovery will occur on the
+// next read/write operation.
func New(arv *arvadosclient.ArvadosClient) *KeepClient {
defaultReplicationLevel := 2
value, err := arv.Discovery("defaultCollectionReplication")
defaultReplicationLevel = int(v)
}
}
-
- kc := &KeepClient{
+ return &KeepClient{
Arvados: arv,
Want_replicas: defaultReplicationLevel,
- Client: &http.Client{Transport: &http.Transport{
- TLSClientConfig: arvadosclient.MakeTLSConfig(arv.ApiInsecure)}},
- Retries: 2,
+ Retries: 2,
}
- return kc
}
// Put a block given the block hash, a reader, and the number of bytes
// Returns the locator for the written block, the number of replicas
// written, and an error.
//
-// Returns an InsufficientReplicas error if 0 <= replicas <
+// Returns an InsufficientReplicasError if 0 <= replicas <
// kc.Wants_replicas.
func (kc *KeepClient) PutHR(hash string, r io.Reader, dataBytes int64) (string, int, error) {
// Buffer for reads from 'r'
var bufsize int
if dataBytes > 0 {
if dataBytes > BLOCKSIZE {
- return "", 0, OversizeBlockError
+ return "", 0, ErrOversizeBlock
}
bufsize = int(dataBytes)
} else {
bufsize = BLOCKSIZE
}
- t := streamer.AsyncStreamFromReader(bufsize, HashCheckingReader{r, md5.New(), hash})
- defer t.Close()
-
- return kc.putReplicas(hash, t, dataBytes)
+ buf := asyncbuf.NewBuffer(make([]byte, 0, bufsize))
+ go func() {
+ _, err := io.Copy(buf, HashCheckingReader{r, md5.New(), hash})
+ buf.CloseWithError(err)
+ }()
+ return kc.putReplicas(hash, buf.NewReader, dataBytes)
}
// PutHB writes a block to Keep. The hash of the bytes is given in
//
// Return values are the same as for PutHR.
func (kc *KeepClient) PutHB(hash string, buf []byte) (string, int, error) {
- t := streamer.AsyncStreamFromSlice(buf)
- defer t.Close()
- return kc.putReplicas(hash, t, int64(len(buf)))
+ newReader := func() io.Reader { return bytes.NewBuffer(buf) }
+ return kc.putReplicas(hash, newReader, int64(len(buf)))
}
// PutB writes a block to Keep. It computes the hash itself.
return ioutil.NopCloser(bytes.NewReader(nil)), 0, "", nil
}
+ var expectLength int64
+ if parts := strings.SplitN(locator, "+", 3); len(parts) < 2 {
+ expectLength = -1
+ } else if n, err := strconv.ParseInt(parts[1], 10, 64); err != nil {
+ expectLength = -1
+ } else {
+ expectLength = n
+ }
+
var errs []string
tries_remaining := 1 + kc.Retries
continue
}
req.Header.Add("Authorization", fmt.Sprintf("OAuth2 %s", kc.Arvados.ApiToken))
- resp, err := kc.Client.Do(req)
+ resp, err := kc.httpClient().Do(req)
if err != nil {
// Probably a network error, may be transient,
// can try again.
errs = append(errs, fmt.Sprintf("%s: %v", url, err))
retryList = append(retryList, host)
- } else if resp.StatusCode != http.StatusOK {
+ continue
+ }
+ if resp.StatusCode != http.StatusOK {
var respbody []byte
respbody, _ = ioutil.ReadAll(&io.LimitedReader{R: resp.Body, N: 4096})
resp.Body.Close()
} else if resp.StatusCode == 404 {
count404++
}
- } else {
- // Success.
- if method == "GET" {
- return HashCheckingReader{
- Reader: resp.Body,
- Hash: md5.New(),
- Check: locator[0:32],
- }, resp.ContentLength, url, nil
- } else {
+ continue
+ }
+ if expectLength < 0 {
+ if resp.ContentLength < 0 {
resp.Body.Close()
- return nil, resp.ContentLength, url, nil
+ return nil, 0, "", fmt.Errorf("error reading %q: no size hint, no Content-Length header in response", locator)
}
+ expectLength = resp.ContentLength
+ } else if resp.ContentLength >= 0 && expectLength != resp.ContentLength {
+ resp.Body.Close()
+ return nil, 0, "", fmt.Errorf("error reading %q: size hint %d != Content-Length %d", locator, expectLength, resp.ContentLength)
+ }
+ // Success
+ if method == "GET" {
+ return HashCheckingReader{
+ Reader: resp.Body,
+ Hash: md5.New(),
+ Check: locator[0:32],
+ }, expectLength, url, nil
+ } else {
+ resp.Body.Close()
+ return nil, expectLength, url, nil
}
-
}
serversToTry = retryList
}
return kc.getOrHead("GET", locator)
}
+// ReadAt() retrieves a portion of block from the cache if it's
+// present, otherwise from the network.
+func (kc *KeepClient) ReadAt(locator string, p []byte, off int) (int, error) {
+ return kc.cache().ReadAt(kc, locator, p, off)
+}
+
// Ask() verifies that a block with the given hash is available and
// readable, according to at least one Keep service. Unlike Get, it
// does not retrieve the data or verify that the data content matches
}
req.Header.Add("Authorization", fmt.Sprintf("OAuth2 %s", kc.Arvados.ApiToken))
- resp, err := kc.Client.Do(req)
+ resp, err := kc.httpClient().Do(req)
if err != nil {
return nil, err
}
// LocalRoots() returns the map of local (i.e., disk and proxy) Keep
// services: uuid -> baseURI.
func (kc *KeepClient) LocalRoots() map[string]string {
+ kc.discoverServices()
kc.lock.RLock()
defer kc.lock.RUnlock()
- return *kc.localRoots
+ return kc.localRoots
}
// GatewayRoots() returns the map of Keep remote gateway services:
// uuid -> baseURI.
func (kc *KeepClient) GatewayRoots() map[string]string {
+ kc.discoverServices()
kc.lock.RLock()
defer kc.lock.RUnlock()
- return *kc.gatewayRoots
+ return kc.gatewayRoots
}
// WritableLocalRoots() returns the map of writable local Keep services:
// uuid -> baseURI.
func (kc *KeepClient) WritableLocalRoots() map[string]string {
+ kc.discoverServices()
kc.lock.RLock()
defer kc.lock.RUnlock()
- return *kc.writableLocalRoots
+ return kc.writableLocalRoots
}
-// SetServiceRoots updates the localRoots and gatewayRoots maps,
-// without risk of disrupting operations that are already in progress.
+// SetServiceRoots disables service discovery and updates the
+// localRoots and gatewayRoots maps, without disrupting operations
+// that are already in progress.
//
-// The KeepClient makes its own copy of the supplied maps, so the
-// caller can reuse/modify them after SetServiceRoots returns, but
-// they should not be modified by any other goroutine while
-// SetServiceRoots is running.
-func (kc *KeepClient) SetServiceRoots(newLocals, newWritableLocals, newGateways map[string]string) {
- locals := make(map[string]string)
- for uuid, root := range newLocals {
- locals[uuid] = root
- }
-
- writables := make(map[string]string)
- for uuid, root := range newWritableLocals {
- writables[uuid] = root
- }
-
- gateways := make(map[string]string)
- for uuid, root := range newGateways {
- gateways[uuid] = root
- }
+// The supplied maps must not be modified after calling
+// SetServiceRoots.
+func (kc *KeepClient) SetServiceRoots(locals, writables, gateways map[string]string) {
+ kc.disableDiscovery = true
+ kc.setServiceRoots(locals, writables, gateways)
+}
+func (kc *KeepClient) setServiceRoots(locals, writables, gateways map[string]string) {
kc.lock.Lock()
defer kc.lock.Unlock()
- kc.localRoots = &locals
- kc.writableLocalRoots = &writables
- kc.gatewayRoots = &gateways
+ kc.localRoots = locals
+ kc.writableLocalRoots = writables
+ kc.gatewayRoots = gateways
}
// getSortedRoots returns a list of base URIs of Keep services, in the
return found
}
+func (kc *KeepClient) cache() *BlockCache {
+ if kc.BlockCache != nil {
+ return kc.BlockCache
+ } else {
+ return DefaultBlockCache
+ }
+}
+
+func (kc *KeepClient) ClearBlockCache() {
+ kc.cache().Clear()
+}
+
+var (
+ // There are four global http.Client objects for the four
+ // possible permutations of TLS behavior (verify/skip-verify)
+ // and timeout settings (proxy/non-proxy).
+ defaultClient = map[bool]map[bool]HTTPClient{
+ // defaultClient[false] is used for verified TLS reqs
+ false: {},
+ // defaultClient[true] is used for unverified
+ // (insecure) TLS reqs
+ true: {},
+ }
+ defaultClientMtx sync.Mutex
+)
+
+// httpClient returns the HTTPClient field if it's not nil, otherwise
+// whichever of the four global http.Client objects is suitable for
+// the current environment (i.e., TLS verification on/off, keep
+// services are/aren't proxies).
+func (kc *KeepClient) httpClient() HTTPClient {
+ if kc.HTTPClient != nil {
+ return kc.HTTPClient
+ }
+ defaultClientMtx.Lock()
+ defer defaultClientMtx.Unlock()
+ if c, ok := defaultClient[kc.Arvados.ApiInsecure][kc.foundNonDiskSvc]; ok {
+ return c
+ }
+
+ var requestTimeout, connectTimeout, keepAlive, tlsTimeout time.Duration
+ if kc.foundNonDiskSvc {
+ // Use longer timeouts when connecting to a proxy,
+ // because this usually means the intervening network
+ // is slower.
+ requestTimeout = DefaultProxyRequestTimeout
+ connectTimeout = DefaultProxyConnectTimeout
+ tlsTimeout = DefaultProxyTLSHandshakeTimeout
+ keepAlive = DefaultProxyKeepAlive
+ } else {
+ requestTimeout = DefaultRequestTimeout
+ connectTimeout = DefaultConnectTimeout
+ tlsTimeout = DefaultTLSHandshakeTimeout
+ keepAlive = DefaultKeepAlive
+ }
+
+ transport, ok := http.DefaultTransport.(*http.Transport)
+ if ok {
+ copy := *transport
+ transport = ©
+ } else {
+ // Evidently the application has replaced
+ // http.DefaultTransport with a different type, so we
+ // need to build our own from scratch using the Go 1.8
+ // defaults.
+ transport = &http.Transport{
+ MaxIdleConns: 100,
+ IdleConnTimeout: 90 * time.Second,
+ ExpectContinueTimeout: time.Second,
+ }
+ }
+ transport.DialContext = (&net.Dialer{
+ Timeout: connectTimeout,
+ KeepAlive: keepAlive,
+ DualStack: true,
+ }).DialContext
+ transport.TLSHandshakeTimeout = tlsTimeout
+ transport.TLSClientConfig = arvadosclient.MakeTLSConfig(kc.Arvados.ApiInsecure)
+ c := &http.Client{
+ Timeout: requestTimeout,
+ Transport: transport,
+ }
+ defaultClient[kc.Arvados.ApiInsecure][kc.foundNonDiskSvc] = c
+ return c
+}
+
type Locator struct {
Hash string
Size int // -1 if data size is not known