"fmt"
"io"
"io/ioutil"
+ "log"
"net/http"
"net/url"
"os"
"regexp"
"strings"
+ "sync"
"time"
"git.curoverse.com/arvados.git/sdk/go/arvados"
var RetryDelay = 2 * time.Second
+var (
+ defaultInsecureHTTPClient *http.Client
+ defaultSecureHTTPClient *http.Client
+ defaultHTTPClientMtx sync.Mutex
+)
+
// Indicates an error that was returned by the API server.
type APIServerError struct {
// Address of server returning error, of the form "host:port".
"/etc/pki/tls/certs/ca-bundle.crt", // Fedora/RHEL
}
-// MakeTLSConfig sets up TLS configuration for communicating with Arvados and Keep services.
+// MakeTLSConfig sets up TLS configuration for communicating with
+// Arvados and Keep services.
func MakeTLSConfig(insecure bool) *tls.Config {
tlsconfig := tls.Config{InsecureSkipVerify: insecure}
if !insecure {
- // Look for /etc/arvados/ca-certificates.crt in addition to normal system certs.
+ // Use the first entry in CertFiles that we can read
+ // certificates from. If none of those work out, use
+ // the Go defaults.
certs := x509.NewCertPool()
for _, file := range CertFiles {
data, err := ioutil.ReadFile(file)
- if err == nil {
- success := certs.AppendCertsFromPEM(data)
- if !success {
- fmt.Printf("Unable to load any certificates from %v", file)
- } else {
- tlsconfig.RootCAs = certs
- break
+ if err != nil {
+ if !os.IsNotExist(err) {
+ log.Printf("error reading %q: %s", file, err)
}
+ continue
+ }
+ if !certs.AppendCertsFromPEM(data) {
+ log.Printf("unable to load any certificates from %v", file)
+ continue
}
+ tlsconfig.RootCAs = certs
+ break
}
- // Will use system default CA roots instead.
}
return &tlsconfig
return value, ErrInvalidArgument
}
}
+
+func (ac *ArvadosClient) httpClient() *http.Client {
+ if ac.Client != nil {
+ return ac.Client
+ }
+ c := &defaultSecureHTTPClient
+ if ac.ApiInsecure {
+ c = &defaultInsecureHTTPClient
+ }
+ if *c == nil {
+ defaultHTTPClientMtx.Lock()
+ defer defaultHTTPClientMtx.Unlock()
+ *c = &http.Client{Transport: &http.Transport{
+ TLSClientConfig: MakeTLSConfig(ac.ApiInsecure)}}
+ }
+ return *c
+}
func (s *ServerRequiredSuite) TestMakeArvadosClientSecure(c *C) {
os.Setenv("ARVADOS_API_HOST_INSECURE", "")
- kc, err := MakeArvadosClient()
+ ac, err := MakeArvadosClient()
c.Assert(err, Equals, nil)
- c.Check(kc.ApiServer, Equals, os.Getenv("ARVADOS_API_HOST"))
- c.Check(kc.ApiToken, Equals, os.Getenv("ARVADOS_API_TOKEN"))
- c.Check(kc.ApiInsecure, Equals, false)
+ c.Check(ac.ApiServer, Equals, os.Getenv("ARVADOS_API_HOST"))
+ c.Check(ac.ApiToken, Equals, os.Getenv("ARVADOS_API_TOKEN"))
+ c.Check(ac.ApiInsecure, Equals, false)
}
func (s *ServerRequiredSuite) TestMakeArvadosClientInsecure(c *C) {
os.Setenv("ARVADOS_API_HOST_INSECURE", "true")
- kc, err := MakeArvadosClient()
+ ac, err := MakeArvadosClient()
c.Assert(err, Equals, nil)
- c.Check(kc.ApiInsecure, Equals, true)
- c.Check(kc.ApiServer, Equals, os.Getenv("ARVADOS_API_HOST"))
- c.Check(kc.ApiToken, Equals, os.Getenv("ARVADOS_API_TOKEN"))
- c.Check(kc.Client.Transport.(*http.Transport).TLSClientConfig.InsecureSkipVerify, Equals, true)
+ c.Check(ac.ApiInsecure, Equals, true)
+ c.Check(ac.ApiServer, Equals, os.Getenv("ARVADOS_API_HOST"))
+ c.Check(ac.ApiToken, Equals, os.Getenv("ARVADOS_API_TOKEN"))
+ c.Check(ac.Client.Transport.(*http.Transport).TLSClientConfig.InsecureSkipVerify, Equals, true)
}
func (s *ServerRequiredSuite) TestGetInvalidUUID(c *C) {
"encoding/json"
"fmt"
"log"
- "net/http"
"os"
"os/signal"
"reflect"
if this.Arvados.KeepServiceURIs != nil {
this.foundNonDiskSvc = true
this.replicasPerService = 0
- if c, ok := this.Client.(*http.Client); ok {
- this.setClientSettingsNonDisk(c)
- }
roots := make(map[string]string)
for i, uri := range this.Arvados.KeepServiceURIs {
roots[fmt.Sprintf("00000-bi6l4-%015d", i)] = uri
gatewayRoots[service.Uuid] = url
}
- if client, ok := this.Client.(*http.Client); ok {
- if this.foundNonDiskSvc {
- this.setClientSettingsNonDisk(client)
- } else {
- this.setClientSettingsDisk(client)
- }
- }
-
this.SetServiceRoots(localRoots, writableLocalRoots, gatewayRoots)
return nil
}
"fmt"
"io"
"io/ioutil"
+ "net"
"net/http"
"regexp"
"strconv"
"strings"
"sync"
+ "time"
"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
"git.curoverse.com/arvados.git/sdk/go/streamer"
writableLocalRoots *map[string]string
gatewayRoots *map[string]string
lock sync.RWMutex
- Client HTTPClient
+ HTTPClient HTTPClient
Retries int
BlockCache *BlockCache
foundNonDiskSvc bool
}
-// MakeKeepClient creates a new KeepClient by contacting the API server to discover Keep servers.
+// MakeKeepClient creates a new KeepClient, calls
+// DiscoverKeepServices(), and returns when the client is ready to
+// use.
func MakeKeepClient(arv *arvadosclient.ArvadosClient) (*KeepClient, error) {
kc := New(arv)
return kc, kc.DiscoverKeepServers()
}
-// New func creates a new KeepClient struct.
-// This func does not discover keep servers. It is the caller's responsibility.
+// New creates a new KeepClient. The caller must call
+// DiscoverKeepServers() before using the returned client to read or
+// write data.
func New(arv *arvadosclient.ArvadosClient) *KeepClient {
defaultReplicationLevel := 2
value, err := arv.Discovery("defaultCollectionReplication")
defaultReplicationLevel = int(v)
}
}
-
- kc := &KeepClient{
+ return &KeepClient{
Arvados: arv,
Want_replicas: defaultReplicationLevel,
- Client: &http.Client{Transport: &http.Transport{
- TLSClientConfig: arvadosclient.MakeTLSConfig(arv.ApiInsecure)}},
- Retries: 2,
+ Retries: 2,
}
- return kc
}
// Put a block given the block hash, a reader, and the number of bytes
continue
}
req.Header.Add("Authorization", fmt.Sprintf("OAuth2 %s", kc.Arvados.ApiToken))
- resp, err := kc.Client.Do(req)
+ resp, err := kc.httpClient().Do(req)
if err != nil {
// Probably a network error, may be transient,
// can try again.
}
req.Header.Add("Authorization", fmt.Sprintf("OAuth2 %s", kc.Arvados.ApiToken))
- resp, err := kc.Client.Do(req)
+ resp, err := kc.httpClient().Do(req)
if err != nil {
return nil, err
}
}
}
+var (
+ // There are four global http.Client objects for the four
+ // possible permutations of TLS behavior (verify/skip-verify)
+ // and timeout settings (proxy/non-proxy).
+ defaultClient = map[bool]map[bool]HTTPClient{
+ // defaultClient[false] is used for verified TLS reqs
+ false: {},
+ // defaultClient[true] is used for unverified
+ // (insecure) TLS reqs
+ true: {},
+ }
+ defaultClientMtx sync.Mutex
+)
+
+// httpClient returns the HTTPClient field if it's not nil, otherwise
+// whichever of the four global http.Client objects is suitable for
+// the current environment (i.e., TLS verification on/off, keep
+// services are/aren't proxies).
+func (kc *KeepClient) httpClient() HTTPClient {
+ if kc.HTTPClient != nil {
+ return kc.HTTPClient
+ }
+ defaultClientMtx.Lock()
+ defer defaultClientMtx.Unlock()
+ if c, ok := defaultClient[kc.Arvados.ApiInsecure][kc.foundNonDiskSvc]; ok {
+ return c
+ }
+
+ var requestTimeout, connectTimeout, keepAliveInterval, tlsTimeout time.Duration
+ if kc.foundNonDiskSvc {
+ // Use longer timeouts when connecting to a proxy,
+ // because this usually means the intervening network
+ // is slower.
+ requestTimeout = 300 * time.Second
+ connectTimeout = 30 * time.Second
+ tlsTimeout = 10 * time.Second
+ keepAliveInterval = 120 * time.Second
+ } else {
+ requestTimeout = 20 * time.Second
+ connectTimeout = 2 * time.Second
+ tlsTimeout = 4 * time.Second
+ keepAliveInterval = 180 * time.Second
+ }
+ transport := &http.Transport{
+ Dial: (&net.Dialer{
+ Timeout: connectTimeout,
+ KeepAlive: keepAliveInterval,
+ }).Dial,
+ TLSClientConfig: arvadosclient.MakeTLSConfig(kc.Arvados.ApiInsecure),
+ TLSHandshakeTimeout: tlsTimeout,
+ }
+ go func() {
+ for range time.NewTicker(10 * time.Minute).C {
+ transport.CloseIdleConnections()
+ }
+ }()
+ c := &http.Client{
+ Timeout: requestTimeout,
+ Transport: transport,
+ }
+ defaultClient[kc.Arvados.ApiInsecure][kc.foundNonDiskSvc] = c
+ return c
+}
+
type Locator struct {
Hash string
Size int // -1 if data size is not known
c.Assert(kc.replicasPerService, Equals, 0)
c.Assert(kc.foundNonDiskSvc, Equals, true)
- c.Assert(kc.Client.(*http.Client).Timeout, Equals, 300*time.Second)
+ c.Assert(kc.httpClient().(*http.Client).Timeout, Equals, 300*time.Second)
}
"io/ioutil"
"log"
"math/rand"
- "net"
"net/http"
"os"
"regexp"
"strings"
- "time"
"git.curoverse.com/arvados.git/sdk/go/streamer"
)
return fmt.Sprintf("%x", md5.Sum([]byte(s)))
}
-// Set timeouts applicable when connecting to non-disk services
-// (assumed to be over the Internet).
-func (*KeepClient) setClientSettingsNonDisk(client *http.Client) {
- // Maximum time to wait for a complete response
- client.Timeout = 300 * time.Second
-
- // TCP and TLS connection settings
- client.Transport = &http.Transport{
- Dial: (&net.Dialer{
- // The maximum time to wait to set up
- // the initial TCP connection.
- Timeout: 30 * time.Second,
-
- // The TCP keep alive heartbeat
- // interval.
- KeepAlive: 120 * time.Second,
- }).Dial,
-
- TLSHandshakeTimeout: 10 * time.Second,
- }
-}
-
-// Set timeouts applicable when connecting to keepstore services directly
-// (assumed to be on the local network).
-func (*KeepClient) setClientSettingsDisk(client *http.Client) {
- // Maximum time to wait for a complete response
- client.Timeout = 20 * time.Second
-
- // TCP and TLS connection timeouts
- client.Transport = &http.Transport{
- Dial: (&net.Dialer{
- // The maximum time to wait to set up
- // the initial TCP connection.
- Timeout: 2 * time.Second,
-
- // The TCP keep alive heartbeat
- // interval.
- KeepAlive: 180 * time.Second,
- }).Dial,
-
- TLSHandshakeTimeout: 4 * time.Second,
- }
-}
-
type svcList struct {
Items []keepService `json:"items"`
}
req.ContentLength = expectedLength
if expectedLength > 0 {
- // http.Client.Do will close the body ReadCloser when it is
- // done with it.
+ // Do() will close the body ReadCloser when it is done
+ // with it.
req.Body = body
} else {
// "For client requests, a value of 0 means unknown if Body is
req.Header.Add(X_Keep_Desired_Replicas, fmt.Sprint(this.Want_replicas))
var resp *http.Response
- if resp, err = this.Client.Do(req); err != nil {
+ if resp, err = this.httpClient().Do(req); err != nil {
DebugPrintf("DEBUG: [%08x] Upload failed %v error: %v", requestID, url, err.Error())
upload_status <- uploadStatus{err, url, 0, 0, ""}
return
import (
"bytes"
"log"
- "net/http"
"os"
"strings"
"testing"
arv, err := arvadosclient.MakeArvadosClient()
arv.ApiToken = arvadostest.DataManagerToken
c.Assert(err, check.IsNil)
- s.keepClient = &keepclient.KeepClient{
- Arvados: arv,
- Client: &http.Client{},
- }
- c.Assert(s.keepClient.DiscoverKeepServers(), check.IsNil)
+
+ s.keepClient, err = keepclient.MakeKeepClient(arv)
+ c.Assert(err, check.IsNil)
s.putReplicas(c, "foo", 4)
s.putReplicas(c, "bar", 1)
}
statusCode, statusText = http.StatusInternalServerError, err.Error()
return
}
- if client, ok := kc.Client.(*http.Client); ok && client.Transport != nil {
- // Workaround for https://dev.arvados.org/issues/9005
- if t, ok := client.Transport.(*http.Transport); ok {
- t.DisableKeepAlives = true
- }
- }
rdr, err := kc.CollectionFileReader(collection, filename)
if os.IsNotExist(err) {
statusCode = http.StatusNotFound
if cfg.DefaultReplicas > 0 {
kc.Want_replicas = cfg.DefaultReplicas
}
- kc.Client.(*http.Client).Timeout = time.Duration(cfg.Timeout)
go kc.RefreshServices(5*time.Minute, 3*time.Second)
listener, err = net.Listen("tcp", cfg.Listen)
signal.Notify(term, syscall.SIGINT)
// Start serving requests.
- router = MakeRESTRouter(!cfg.DisableGet, !cfg.DisablePut, kc)
+ router = MakeRESTRouter(!cfg.DisableGet, !cfg.DisablePut, kc, time.Duration(cfg.Timeout))
http.Serve(listener, router)
log.Println("shutting down")
http.Handler
*keepclient.KeepClient
*ApiTokenCache
+ timeout time.Duration
+ transport *http.Transport
}
// MakeRESTRouter returns an http.Handler that passes GET and PUT
// requests to the appropriate handlers.
-func MakeRESTRouter(enable_get bool, enable_put bool, kc *keepclient.KeepClient) http.Handler {
+func MakeRESTRouter(enable_get bool, enable_put bool, kc *keepclient.KeepClient, timeout time.Duration) http.Handler {
rest := mux.NewRouter()
h := &proxyHandler{
Handler: rest,
KeepClient: kc,
+ timeout: timeout,
+ transport: &http.Transport{
+ Dial: (&net.Dialer{
+ Timeout: 20 * time.Second,
+ KeepAlive: 10 * time.Second,
+ }).Dial,
+ TLSClientConfig: arvadosclient.MakeTLSConfig(kc.Arvados.ApiInsecure),
+ TLSHandshakeTimeout: 10 * time.Second,
+ },
ApiTokenCache: &ApiTokenCache{
tokens: make(map[string]int64),
expireTime: 300,
},
}
+ go func(t *http.Transport) {
+ for range time.NewTicker(5 * time.Minute).C {
+ t.CloseIdleConnections()
+ }
+ }(h.transport)
+
if enable_get {
rest.HandleFunc(`/{locator:[0-9a-f]{32}\+.*}`, h.Get).Methods("GET", "HEAD")
rest.HandleFunc(`/{locator:[0-9a-f]{32}}`, h.Get).Methods("GET", "HEAD")
}
}()
- kc := *h.KeepClient
- kc.Client = &proxyClient{client: kc.Client, proto: req.Proto}
+ kc := h.makeKeepClient(req)
var pass bool
var tok string
- if pass, tok = CheckAuthorizationHeader(&kc, h.ApiTokenCache, req); !pass {
+ if pass, tok = CheckAuthorizationHeader(kc, h.ApiTokenCache, req); !pass {
status, err = http.StatusForbidden, BadAuthorizationHeader
return
}
SetCorsHeaders(resp)
resp.Header().Set("Via", "HTTP/1.1 "+viaAlias)
- kc := *h.KeepClient
- kc.Client = &proxyClient{client: kc.Client, proto: req.Proto}
+ kc := h.makeKeepClient(req)
var err error
var expectLength int64
var pass bool
var tok string
- if pass, tok = CheckAuthorizationHeader(&kc, h.ApiTokenCache, req); !pass {
+ if pass, tok = CheckAuthorizationHeader(kc, h.ApiTokenCache, req); !pass {
err = BadAuthorizationHeader
status = http.StatusForbidden
return
}
}()
- kc := *h.KeepClient
-
- ok, token := CheckAuthorizationHeader(&kc, h.ApiTokenCache, req)
+ kc := h.makeKeepClient(req)
+ ok, token := CheckAuthorizationHeader(kc, h.ApiTokenCache, req)
if !ok {
status, err = http.StatusForbidden, BadAuthorizationHeader
return
status = http.StatusOK
resp.Write([]byte("\n"))
}
+
+func (h *proxyHandler) makeKeepClient(req *http.Request) *keepclient.KeepClient {
+ kc := *h.KeepClient
+ kc.HTTPClient = &proxyClient{
+ client: &http.Client{
+ Timeout: h.timeout,
+ Transport: h.transport,
+ },
+ proto: req.Proto,
+ }
+ return &kc
+}
// fixes the invalid Content-Length header. In order to test
// our server behavior, we have to call the handler directly
// using an httptest.ResponseRecorder.
- rtr := MakeRESTRouter(true, true, kc)
+ rtr := MakeRESTRouter(true, true, kc, 10*time.Second)
type testcase struct {
sendLength string
keepClient := &keepclient.KeepClient{
Arvados: &arvadosclient.ArvadosClient{},
Want_replicas: 1,
- Client: &http.Client{},
}
// Initialize the pullq and worker
"errors"
"io"
"io/ioutil"
- "net/http"
"os"
"strings"
"testing"
keepClient = &keepclient.KeepClient{
Arvados: arv,
Want_replicas: 1,
- Client: &http.Client{},
}
// discover keep services
log.Fatal(err)
}
kc.Want_replicas = *Replicas
- kc.Client.(*http.Client).Timeout = 10 * time.Minute
+ kc.HTTPClient = &http.Client{
+ Timeout: 10 * time.Minute,
+ Transport: &http.Transport{
+ TLSClientConfig: arvadosclient.MakeTLSConfig(arv.ApiInsecure),
+ },
+ }
overrideServices(kc)