X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/07b382c82d7d834e801cf9dc85e2ed5ffcd7cd91..72900c01e197d602e79fda8d306b17fd1e32a3ea:/sdk/go/keepclient/keepclient.go diff --git a/sdk/go/keepclient/keepclient.go b/sdk/go/keepclient/keepclient.go index 4cd9f544f7..b56cc7f724 100644 --- a/sdk/go/keepclient/keepclient.go +++ b/sdk/go/keepclient/keepclient.go @@ -4,27 +4,55 @@ package keepclient import ( "bytes" "crypto/md5" - "crypto/tls" "errors" "fmt" - "git.curoverse.com/arvados.git/sdk/go/arvadosclient" - "git.curoverse.com/arvados.git/sdk/go/streamer" "io" "io/ioutil" - "log" "net/http" "regexp" "strconv" "strings" "sync" + + "git.curoverse.com/arvados.git/sdk/go/arvadosclient" + "git.curoverse.com/arvados.git/sdk/go/streamer" ) // A Keep "block" is 64MB. const BLOCKSIZE = 64 * 1024 * 1024 -var BlockNotFound = errors.New("Block not found") -var InsufficientReplicasError = errors.New("Could not write sufficient replicas") -var OversizeBlockError = errors.New("Exceeded maximum block size (" + strconv.Itoa(BLOCKSIZE) + ")") +// Error interface with an error and boolean indicating whether the error is temporary +type Error interface { + error + Temporary() bool +} + +// multipleResponseError is of type Error +type multipleResponseError struct { + error + isTemp bool +} + +func (e *multipleResponseError) Temporary() bool { + return e.isTemp +} + +// BlockNotFound is a multipleResponseError where isTemp is false +var BlockNotFound = &ErrNotFound{multipleResponseError{ + error: errors.New("Block not found"), + isTemp: false, +}} + +// ErrNotFound is a multipleResponseError where isTemp can be true or false +type ErrNotFound struct { + multipleResponseError +} + +type InsufficientReplicasError error + +type OversizeBlockError error + +var ErrOversizeBlock = OversizeBlockError(errors.New("Exceeded maximum block size (" + strconv.Itoa(BLOCKSIZE) + ")")) var MissingArvadosApiHost = errors.New("Missing required environment variable ARVADOS_API_HOST") var MissingArvadosApiToken = errors.New("Missing required environment variable ARVADOS_API_TOKEN") var InvalidLocatorError = errors.New("Invalid locator") @@ -38,19 +66,27 @@ var ErrIncompleteIndex = errors.New("Got incomplete index") const X_Keep_Desired_Replicas = "X-Keep-Desired-Replicas" const X_Keep_Replicas_Stored = "X-Keep-Replicas-Stored" +type HTTPClient interface { + Do(*http.Request) (*http.Response, error) +} + // Information about Arvados and Keep servers. type KeepClient struct { Arvados *arvadosclient.ArvadosClient Want_replicas int - Using_proxy bool localRoots *map[string]string writableLocalRoots *map[string]string gatewayRoots *map[string]string lock sync.RWMutex - Client *http.Client + Client HTTPClient + Retries int + BlockCache *BlockCache // set to 1 if all writable services are of disk type, otherwise 0 replicasPerService int + + // Any non-disk typed services found in the list of keepservers? + foundNonDiskSvc bool } // MakeKeepClient creates a new KeepClient by contacting the API server to discover Keep servers. @@ -62,12 +98,21 @@ func MakeKeepClient(arv *arvadosclient.ArvadosClient) (*KeepClient, error) { // New func creates a new KeepClient struct. // This func does not discover keep servers. It is the caller's responsibility. func New(arv *arvadosclient.ArvadosClient) *KeepClient { + defaultReplicationLevel := 2 + value, err := arv.Discovery("defaultCollectionReplication") + if err == nil { + v, ok := value.(float64) + if ok && v > 0 { + defaultReplicationLevel = int(v) + } + } + kc := &KeepClient{ Arvados: arv, - Want_replicas: 2, - Using_proxy: false, + Want_replicas: defaultReplicationLevel, Client: &http.Client{Transport: &http.Transport{ - TLSClientConfig: &tls.Config{InsecureSkipVerify: arv.ApiInsecure}}}, + TLSClientConfig: arvadosclient.MakeTLSConfig(arv.ApiInsecure)}}, + Retries: 2, } return kc } @@ -78,14 +123,14 @@ func New(arv *arvadosclient.ArvadosClient) *KeepClient { // Returns the locator for the written block, the number of replicas // written, and an error. // -// Returns an InsufficientReplicas error if 0 <= replicas < +// Returns an InsufficientReplicasError if 0 <= replicas < // kc.Wants_replicas. func (kc *KeepClient) PutHR(hash string, r io.Reader, dataBytes int64) (string, int, error) { // Buffer for reads from 'r' var bufsize int if dataBytes > 0 { if dataBytes > BLOCKSIZE { - return "", 0, OversizeBlockError + return "", 0, ErrOversizeBlock } bufsize = int(dataBytes) } else { @@ -130,6 +175,89 @@ func (kc *KeepClient) PutR(r io.Reader) (locator string, replicas int, err error } } +func (kc *KeepClient) getOrHead(method string, locator string) (io.ReadCloser, int64, string, error) { + if strings.HasPrefix(locator, "d41d8cd98f00b204e9800998ecf8427e+0") { + return ioutil.NopCloser(bytes.NewReader(nil)), 0, "", nil + } + + var errs []string + + tries_remaining := 1 + kc.Retries + + serversToTry := kc.getSortedRoots(locator) + + numServers := len(serversToTry) + count404 := 0 + + var retryList []string + + for tries_remaining > 0 { + tries_remaining -= 1 + retryList = nil + + for _, host := range serversToTry { + url := host + "/" + locator + + req, err := http.NewRequest(method, url, nil) + if err != nil { + errs = append(errs, fmt.Sprintf("%s: %v", url, err)) + continue + } + req.Header.Add("Authorization", fmt.Sprintf("OAuth2 %s", kc.Arvados.ApiToken)) + resp, err := kc.Client.Do(req) + if err != nil { + // Probably a network error, may be transient, + // can try again. + errs = append(errs, fmt.Sprintf("%s: %v", url, err)) + retryList = append(retryList, host) + } else if resp.StatusCode != http.StatusOK { + var respbody []byte + respbody, _ = ioutil.ReadAll(&io.LimitedReader{R: resp.Body, N: 4096}) + resp.Body.Close() + errs = append(errs, fmt.Sprintf("%s: HTTP %d %q", + url, resp.StatusCode, bytes.TrimSpace(respbody))) + + if resp.StatusCode == 408 || + resp.StatusCode == 429 || + resp.StatusCode >= 500 { + // Timeout, too many requests, or other + // server side failure, transient + // error, can try again. + retryList = append(retryList, host) + } else if resp.StatusCode == 404 { + count404++ + } + } else { + // Success. + if method == "GET" { + return HashCheckingReader{ + Reader: resp.Body, + Hash: md5.New(), + Check: locator[0:32], + }, resp.ContentLength, url, nil + } else { + resp.Body.Close() + return nil, resp.ContentLength, url, nil + } + } + + } + serversToTry = retryList + } + DebugPrintf("DEBUG: %s %s failed: %v", method, locator, errs) + + var err error + if count404 == numServers { + err = BlockNotFound + } else { + err = &ErrNotFound{multipleResponseError{ + error: fmt.Errorf("%s %s failed: %v", method, locator, errs), + isTemp: len(serversToTry) > 0, + }} + } + return nil, 0, "", err +} + // Get() retrieves a block, given a locator. Returns a reader, the // expected data length, the URL the block is being fetched from, and // an error. @@ -138,34 +266,7 @@ func (kc *KeepClient) PutR(r io.Reader) (locator string, replicas int, err error // reader returned by this method will return a BadChecksum error // instead of EOF. func (kc *KeepClient) Get(locator string) (io.ReadCloser, int64, string, error) { - var errs []string - for _, host := range kc.getSortedRoots(locator) { - url := host + "/" + locator - req, err := http.NewRequest("GET", url, nil) - if err != nil { - errs = append(errs, fmt.Sprintf("%s: %v", url, err)) - continue - } - req.Header.Add("Authorization", fmt.Sprintf("OAuth2 %s", kc.Arvados.ApiToken)) - resp, err := kc.Client.Do(req) - if err != nil { - errs = append(errs, fmt.Sprintf("%s: %v", url, err)) - continue - } else if resp.StatusCode != http.StatusOK { - respbody, _ := ioutil.ReadAll(&io.LimitedReader{resp.Body, 4096}) - resp.Body.Close() - errs = append(errs, fmt.Sprintf("%s: HTTP %d %q", - url, resp.StatusCode, bytes.TrimSpace(respbody))) - continue - } - return HashCheckingReader{ - Reader: resp.Body, - Hash: md5.New(), - Check: locator[0:32], - }, resp.ContentLength, url, nil - } - log.Printf("DEBUG: GET %s failed: %v", locator, errs) - return nil, 0, "", BlockNotFound + return kc.getOrHead("GET", locator) } // Ask() verifies that a block with the given hash is available and @@ -176,18 +277,8 @@ func (kc *KeepClient) Get(locator string) (io.ReadCloser, int64, string, error) // Returns the data size (content length) reported by the Keep service // and the URI reporting the data size. func (kc *KeepClient) Ask(locator string) (int64, string, error) { - for _, host := range kc.getSortedRoots(locator) { - url := host + "/" + locator - req, err := http.NewRequest("HEAD", url, nil) - if err != nil { - continue - } - req.Header.Add("Authorization", fmt.Sprintf("OAuth2 %s", kc.Arvados.ApiToken)) - if resp, err := kc.Client.Do(req); err == nil && resp.StatusCode == http.StatusOK { - return resp.ContentLength, url, nil - } - } - return 0, "", BlockNotFound + _, size, url, err := kc.getOrHead("HEAD", locator) + return size, url, err } // GetIndex retrieves a list of blocks stored on the given server whose hashes @@ -273,7 +364,7 @@ func (kc *KeepClient) WritableLocalRoots() map[string]string { // caller can reuse/modify them after SetServiceRoots returns, but // they should not be modified by any other goroutine while // SetServiceRoots is running. -func (kc *KeepClient) SetServiceRoots(newLocals, newWritableLocals map[string]string, newGateways map[string]string) { +func (kc *KeepClient) SetServiceRoots(newLocals, newWritableLocals, newGateways map[string]string) { locals := make(map[string]string) for uuid, root := range newLocals { locals[uuid] = root @@ -324,6 +415,14 @@ func (kc *KeepClient) getSortedRoots(locator string) []string { return found } +func (kc *KeepClient) cache() *BlockCache { + if kc.BlockCache != nil { + return kc.BlockCache + } else { + return DefaultBlockCache + } +} + type Locator struct { Hash string Size int // -1 if data size is not known