X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/9bc0194676e3f22b41976fefc6146d7dd965d173..72900c01e197d602e79fda8d306b17fd1e32a3ea:/sdk/go/keepclient/keepclient.go diff --git a/sdk/go/keepclient/keepclient.go b/sdk/go/keepclient/keepclient.go index 0e6fadcc35..b56cc7f724 100644 --- a/sdk/go/keepclient/keepclient.go +++ b/sdk/go/keepclient/keepclient.go @@ -4,19 +4,18 @@ package keepclient import ( "bytes" "crypto/md5" - "crypto/tls" "errors" "fmt" - "git.curoverse.com/arvados.git/sdk/go/arvadosclient" - "git.curoverse.com/arvados.git/sdk/go/streamer" "io" "io/ioutil" - "log" "net/http" "regexp" "strconv" "strings" "sync" + + "git.curoverse.com/arvados.git/sdk/go/arvadosclient" + "git.curoverse.com/arvados.git/sdk/go/streamer" ) // A Keep "block" is 64MB. @@ -49,8 +48,11 @@ type ErrNotFound struct { multipleResponseError } -var InsufficientReplicasError = errors.New("Could not write sufficient replicas") -var OversizeBlockError = errors.New("Exceeded maximum block size (" + strconv.Itoa(BLOCKSIZE) + ")") +type InsufficientReplicasError error + +type OversizeBlockError error + +var ErrOversizeBlock = OversizeBlockError(errors.New("Exceeded maximum block size (" + strconv.Itoa(BLOCKSIZE) + ")")) var MissingArvadosApiHost = errors.New("Missing required environment variable ARVADOS_API_HOST") var MissingArvadosApiToken = errors.New("Missing required environment variable ARVADOS_API_TOKEN") var InvalidLocatorError = errors.New("Invalid locator") @@ -64,20 +66,27 @@ var ErrIncompleteIndex = errors.New("Got incomplete index") const X_Keep_Desired_Replicas = "X-Keep-Desired-Replicas" const X_Keep_Replicas_Stored = "X-Keep-Replicas-Stored" +type HTTPClient interface { + Do(*http.Request) (*http.Response, error) +} + // Information about Arvados and Keep servers. type KeepClient struct { Arvados *arvadosclient.ArvadosClient Want_replicas int - Using_proxy bool localRoots *map[string]string writableLocalRoots *map[string]string gatewayRoots *map[string]string lock sync.RWMutex - Client *http.Client + Client HTTPClient Retries int + BlockCache *BlockCache // set to 1 if all writable services are of disk type, otherwise 0 replicasPerService int + + // Any non-disk typed services found in the list of keepservers? + foundNonDiskSvc bool } // MakeKeepClient creates a new KeepClient by contacting the API server to discover Keep servers. @@ -101,9 +110,8 @@ func New(arv *arvadosclient.ArvadosClient) *KeepClient { kc := &KeepClient{ Arvados: arv, Want_replicas: defaultReplicationLevel, - Using_proxy: false, Client: &http.Client{Transport: &http.Transport{ - TLSClientConfig: &tls.Config{InsecureSkipVerify: arv.ApiInsecure}}}, + TLSClientConfig: arvadosclient.MakeTLSConfig(arv.ApiInsecure)}}, Retries: 2, } return kc @@ -115,14 +123,14 @@ func New(arv *arvadosclient.ArvadosClient) *KeepClient { // Returns the locator for the written block, the number of replicas // written, and an error. // -// Returns an InsufficientReplicas error if 0 <= replicas < +// Returns an InsufficientReplicasError if 0 <= replicas < // kc.Wants_replicas. func (kc *KeepClient) PutHR(hash string, r io.Reader, dataBytes int64) (string, int, error) { // Buffer for reads from 'r' var bufsize int if dataBytes > 0 { if dataBytes > BLOCKSIZE { - return "", 0, OversizeBlockError + return "", 0, ErrOversizeBlock } bufsize = int(dataBytes) } else { @@ -168,6 +176,10 @@ func (kc *KeepClient) PutR(r io.Reader) (locator string, replicas int, err error } func (kc *KeepClient) getOrHead(method string, locator string) (io.ReadCloser, int64, string, error) { + if strings.HasPrefix(locator, "d41d8cd98f00b204e9800998ecf8427e+0") { + return ioutil.NopCloser(bytes.NewReader(nil)), 0, "", nil + } + var errs []string tries_remaining := 1 + kc.Retries @@ -232,7 +244,7 @@ func (kc *KeepClient) getOrHead(method string, locator string) (io.ReadCloser, i } serversToTry = retryList } - log.Printf("DEBUG: %s %s failed: %v", method, locator, errs) + DebugPrintf("DEBUG: %s %s failed: %v", method, locator, errs) var err error if count404 == numServers { @@ -352,7 +364,7 @@ func (kc *KeepClient) WritableLocalRoots() map[string]string { // caller can reuse/modify them after SetServiceRoots returns, but // they should not be modified by any other goroutine while // SetServiceRoots is running. -func (kc *KeepClient) SetServiceRoots(newLocals, newWritableLocals map[string]string, newGateways map[string]string) { +func (kc *KeepClient) SetServiceRoots(newLocals, newWritableLocals, newGateways map[string]string) { locals := make(map[string]string) for uuid, root := range newLocals { locals[uuid] = root @@ -403,6 +415,14 @@ func (kc *KeepClient) getSortedRoots(locator string) []string { return found } +func (kc *KeepClient) cache() *BlockCache { + if kc.BlockCache != nil { + return kc.BlockCache + } else { + return DefaultBlockCache + } +} + type Locator struct { Hash string Size int // -1 if data size is not known