"fmt"
"io"
"io/ioutil"
+ "net"
"net/http"
"regexp"
"strconv"
"strings"
"sync"
+ "time"
"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
"git.curoverse.com/arvados.git/sdk/go/streamer"
writableLocalRoots *map[string]string
gatewayRoots *map[string]string
lock sync.RWMutex
- Client HTTPClient
+ HTTPClient HTTPClient
Retries int
BlockCache *BlockCache
foundNonDiskSvc bool
}
-// MakeKeepClient creates a new KeepClient by contacting the API server to discover Keep servers.
+// MakeKeepClient creates a new KeepClient, calls
+// DiscoverKeepServers(), and returns when the client is ready to
+// use.
func MakeKeepClient(arv *arvadosclient.ArvadosClient) (*KeepClient, error) {
	client := New(arv)
	// The client is handed back together with whatever error
	// discovery reports, so the caller receives both values.
	err := client.DiscoverKeepServers()
	return client, err
}
-// New func creates a new KeepClient struct.
-// This func does not discover keep servers. It is the caller's responsibility.
+// New creates a new KeepClient. The caller must call
+// DiscoverKeepServers() before using the returned client to read or
+// write data.
func New(arv *arvadosclient.ArvadosClient) *KeepClient {
defaultReplicationLevel := 2
value, err := arv.Discovery("defaultCollectionReplication")
defaultReplicationLevel = int(v)
}
}
-
- kc := &KeepClient{
+ return &KeepClient{
Arvados: arv,
Want_replicas: defaultReplicationLevel,
- Client: &http.Client{Transport: &http.Transport{
- TLSClientConfig: arvadosclient.MakeTLSConfig(arv.ApiInsecure)}},
- Retries: 2,
+ Retries: 2,
}
- return kc
}
// Put a block given the block hash, a reader, and the number of bytes
continue
}
req.Header.Add("Authorization", fmt.Sprintf("OAuth2 %s", kc.Arvados.ApiToken))
- resp, err := kc.Client.Do(req)
+ resp, err := kc.httpClient().Do(req)
if err != nil {
// Probably a network error, may be transient,
// can try again.
}
req.Header.Add("Authorization", fmt.Sprintf("OAuth2 %s", kc.Arvados.ApiToken))
- resp, err := kc.Client.Do(req)
+ resp, err := kc.httpClient().Do(req)
if err != nil {
return nil, err
}
}
}
+// defaultClient holds the shared HTTP clients that httpClient() falls
+// back to when a KeepClient has no HTTPClient of its own. There is at
+// most one per combination of TLS behavior (verify/skip-verify) and
+// timeout settings (proxy/non-proxy), i.e. up to four in total; each
+// one is created by httpClient() the first time it is needed.
+//
+// The outer key is the ApiInsecure flag: false selects the clients
+// used for verified TLS requests, true selects the clients used for
+// unverified (insecure) TLS requests. The inner key is
+// foundNonDiskSvc.
+var defaultClient = map[bool]map[bool]HTTPClient{
+	false: make(map[bool]HTTPClient),
+	true:  make(map[bool]HTTPClient),
+}
+
+// defaultClientMtx is held by httpClient() while it reads or updates
+// defaultClient.
+var defaultClientMtx sync.Mutex
+
+// httpClient returns the HTTPClient field if it's not nil, otherwise
+// whichever of the four global http.Client objects is suitable for
+// the current environment (i.e., TLS verification on/off, keep
+// services are/aren't proxies).
+func (kc *KeepClient) httpClient() HTTPClient {
+ if kc.HTTPClient != nil {
+ return kc.HTTPClient
+ }
+ defaultClientMtx.Lock()
+ defer defaultClientMtx.Unlock()
+ if c, ok := defaultClient[kc.Arvados.ApiInsecure][kc.foundNonDiskSvc]; ok {
+ return c
+ }
+
+ var requestTimeout, connectTimeout, keepAliveInterval, tlsTimeout time.Duration
+ if kc.foundNonDiskSvc {
+ // Use longer timeouts when connecting to a proxy,
+ // because this usually means the intervening network
+ // is slower.
+ requestTimeout = 300 * time.Second
+ connectTimeout = 30 * time.Second
+ tlsTimeout = 10 * time.Second
+ keepAliveInterval = 120 * time.Second
+ } else {
+ requestTimeout = 20 * time.Second
+ connectTimeout = 2 * time.Second
+ tlsTimeout = 4 * time.Second
+ keepAliveInterval = 180 * time.Second
+ }
+ transport := &http.Transport{
+ Dial: (&net.Dialer{
+ Timeout: connectTimeout,
+ KeepAlive: keepAliveInterval,
+ }).Dial,
+ TLSClientConfig: arvadosclient.MakeTLSConfig(kc.Arvados.ApiInsecure),
+ TLSHandshakeTimeout: tlsTimeout,
+ }
+ go func() {
+ for range time.NewTicker(10 * time.Minute).C {
+ transport.CloseIdleConnections()
+ }
+ }()
+ c := &http.Client{
+ Timeout: requestTimeout,
+ Transport: transport,
+ }
+ defaultClient[kc.Arvados.ApiInsecure][kc.foundNonDiskSvc] = c
+ return c
+}
+
type Locator struct {
Hash string
Size int // -1 if data size is not known