X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/a5401e349b25eb0464b06fd291e9c1a18fb494aa..cef1d949ecc75417a3575be79f683b2b0048953f:/sdk/go/keepclient/keepclient.go diff --git a/sdk/go/keepclient/keepclient.go b/sdk/go/keepclient/keepclient.go index f82e5c7c59..2f9ea44ab8 100644 --- a/sdk/go/keepclient/keepclient.go +++ b/sdk/go/keepclient/keepclient.go @@ -2,6 +2,7 @@ package keepclient import ( + "bytes" "crypto/md5" "crypto/tls" "errors" @@ -29,19 +30,29 @@ var MissingArvadosApiHost = errors.New("Missing required environment variable AR var MissingArvadosApiToken = errors.New("Missing required environment variable ARVADOS_API_TOKEN") var InvalidLocatorError = errors.New("Invalid locator") +// ErrNoSuchKeepServer is returned when GetIndex is invoked with a UUID with no matching keep server +var ErrNoSuchKeepServer = errors.New("No keep server matching the given UUID is found") + +// ErrIncompleteIndex is returned when the Index response does not end with a new empty line +var ErrIncompleteIndex = errors.New("Got incomplete index") + const X_Keep_Desired_Replicas = "X-Keep-Desired-Replicas" const X_Keep_Replicas_Stored = "X-Keep-Replicas-Stored" // Information about Arvados and Keep servers. type KeepClient struct { - Arvados *arvadosclient.ArvadosClient - Want_replicas int - Using_proxy bool - localRoots *map[string]string + Arvados *arvadosclient.ArvadosClient + Want_replicas int + Using_proxy bool + localRoots *map[string]string writableLocalRoots *map[string]string - gatewayRoots *map[string]string - lock sync.RWMutex - Client *http.Client + gatewayRoots *map[string]string + lock sync.RWMutex + Client *http.Client + Retries int + + // set to 1 if all writable services are of disk type, otherwise 0 + replicasPerService int } // Create a new KeepClient. This will contact the API server to discover Keep @@ -49,12 +60,23 @@ type KeepClient struct { func MakeKeepClient(arv *arvadosclient.ArvadosClient) (*KeepClient, error) { var matchTrue = regexp.MustCompile("^(?i:1|yes|true)$") insecure := matchTrue.MatchString(os.Getenv("ARVADOS_API_HOST_INSECURE")) + + defaultReplicationLevel := 2 + value, err := arv.Discovery("defaultCollectionReplication") + if err == nil { + v, ok := value.(float64) + if ok && v > 0 { + defaultReplicationLevel = int(v) + } + } + kc := &KeepClient{ Arvados: arv, - Want_replicas: 2, + Want_replicas: defaultReplicationLevel, Using_proxy: false, Client: &http.Client{Transport: &http.Transport{ TLSClientConfig: &tls.Config{InsecureSkipVerify: insecure}}}, + Retries: 2, } return kc, kc.DiscoverKeepServers() } @@ -117,46 +139,80 @@ func (kc *KeepClient) PutR(r io.Reader) (locator string, replicas int, err error } } -// Get() retrieves a block, given a locator. Returns a reader, the -// expected data length, the URL the block is being fetched from, and -// an error. -// -// If the block checksum does not match, the final Read() on the -// reader returned by this method will return a BadChecksum error -// instead of EOF. -func (kc *KeepClient) Get(locator string) (io.ReadCloser, int64, string, error) { +func (kc *KeepClient) getOrHead(method string, locator string) (io.ReadCloser, int64, string, error) { var errs []string - for _, host := range kc.getSortedRoots(locator) { - url := host + "/" + locator - req, err := http.NewRequest("GET", url, nil) - if err != nil { - continue - } - req.Header.Add("Authorization", fmt.Sprintf("OAuth2 %s", kc.Arvados.ApiToken)) - resp, err := kc.Client.Do(req) - if err != nil || resp.StatusCode != http.StatusOK { - if resp != nil { + + tries_remaining := 1 + kc.Retries + serversToTry := kc.getSortedRoots(locator) + var retryList []string + + for tries_remaining > 0 { + tries_remaining -= 1 + retryList = nil + + for _, host := range serversToTry { + url := host + "/" + locator + + req, err := http.NewRequest(method, url, nil) + if err != nil { + errs = append(errs, fmt.Sprintf("%s: %v", url, err)) + continue + } + req.Header.Add("Authorization", fmt.Sprintf("OAuth2 %s", kc.Arvados.ApiToken)) + resp, err := kc.Client.Do(req) + if err != nil { + // Probably a network error, may be transient, + // can try again. + errs = append(errs, fmt.Sprintf("%s: %v", url, err)) + retryList = append(retryList, host) + } else if resp.StatusCode != http.StatusOK { var respbody []byte - if resp.Body != nil { - respbody, _ = ioutil.ReadAll(&io.LimitedReader{resp.Body, 4096}) + respbody, _ = ioutil.ReadAll(&io.LimitedReader{resp.Body, 4096}) + resp.Body.Close() + errs = append(errs, fmt.Sprintf("%s: HTTP %d %q", + url, resp.StatusCode, bytes.TrimSpace(respbody))) + + if resp.StatusCode == 408 || + resp.StatusCode == 429 || + resp.StatusCode >= 500 { + // Timeout, too many requests, or other + // server side failure, transient + // error, can try again. + retryList = append(retryList, host) } - errs = append(errs, fmt.Sprintf("%s: %d %s", - url, resp.StatusCode, strings.TrimSpace(string(respbody)))) } else { - errs = append(errs, fmt.Sprintf("%s: %v", url, err)) + // Success. + if method == "GET" { + return HashCheckingReader{ + Reader: resp.Body, + Hash: md5.New(), + Check: locator[0:32], + }, resp.ContentLength, url, nil + } else { + resp.Body.Close() + return nil, resp.ContentLength, url, nil + } } - continue + } - return HashCheckingReader{ - Reader: resp.Body, - Hash: md5.New(), - Check: locator[0:32], - }, resp.ContentLength, url, nil + serversToTry = retryList } - log.Printf("DEBUG: GET %s failed: %v", locator, errs) + log.Printf("DEBUG: %s %s failed: %v", method, locator, errs) + return nil, 0, "", BlockNotFound } +// Get() retrieves a block, given a locator. Returns a reader, the +// expected data length, the URL the block is being fetched from, and +// an error. +// +// If the block checksum does not match, the final Read() on the +// reader returned by this method will return a BadChecksum error +// instead of EOF. +func (kc *KeepClient) Get(locator string) (io.ReadCloser, int64, string, error) { + return kc.getOrHead("GET", locator) +} + // Ask() verifies that a block with the given hash is available and // readable, according to at least one Keep service. Unlike Get, it // does not retrieve the data or verify that the data content matches @@ -165,18 +221,60 @@ func (kc *KeepClient) Get(locator string) (io.ReadCloser, int64, string, error) // Returns the data size (content length) reported by the Keep service // and the URI reporting the data size. func (kc *KeepClient) Ask(locator string) (int64, string, error) { - for _, host := range kc.getSortedRoots(locator) { - url := host + "/" + locator - req, err := http.NewRequest("HEAD", url, nil) - if err != nil { - continue - } - req.Header.Add("Authorization", fmt.Sprintf("OAuth2 %s", kc.Arvados.ApiToken)) - if resp, err := kc.Client.Do(req); err == nil && resp.StatusCode == http.StatusOK { - return resp.ContentLength, url, nil - } + _, size, url, err := kc.getOrHead("HEAD", locator) + return size, url, err +} + +// GetIndex retrieves a list of blocks stored on the given server whose hashes +// begin with the given prefix. The returned reader will return an error (other +// than EOF) if the complete index cannot be retrieved. +// +// This is meant to be used only by system components and admin tools. +// It will return an error unless the client is using a "data manager token" +// recognized by the Keep services. +func (kc *KeepClient) GetIndex(keepServiceUUID, prefix string) (io.Reader, error) { + url := kc.LocalRoots()[keepServiceUUID] + if url == "" { + return nil, ErrNoSuchKeepServer + } + + url += "/index" + if prefix != "" { + url += "/" + prefix + } + + req, err := http.NewRequest("GET", url, nil) + if err != nil { + return nil, err + } + + req.Header.Add("Authorization", fmt.Sprintf("OAuth2 %s", kc.Arvados.ApiToken)) + resp, err := kc.Client.Do(req) + if err != nil { + return nil, err } - return 0, "", BlockNotFound + + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + return nil, fmt.Errorf("Got http status code: %d", resp.StatusCode) + } + + var respBody []byte + respBody, err = ioutil.ReadAll(resp.Body) + if err != nil { + return nil, err + } + + // Got index; verify that it is complete + // The response should be "\n" if no locators matched the prefix + // Else, it should be a list of locators followed by a blank line + if !bytes.Equal(respBody, []byte("\n")) && !bytes.HasSuffix(respBody, []byte("\n\n")) { + return nil, ErrIncompleteIndex + } + + // Got complete index; strip the trailing newline and send + return bytes.NewReader(respBody[0 : len(respBody)-1]), nil } // LocalRoots() returns the map of local (i.e., disk and proxy) Keep