X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/1996b03c10e45d4c1959b40333c57261a040bffb..7d887106d3eabb9844c4a687403a18581167a823:/sdk/go/keepclient/keepclient.go diff --git a/sdk/go/keepclient/keepclient.go b/sdk/go/keepclient/keepclient.go index eb011f070a..2f809b3256 100644 --- a/sdk/go/keepclient/keepclient.go +++ b/sdk/go/keepclient/keepclient.go @@ -13,7 +13,6 @@ import ( "io/ioutil" "log" "net/http" - "os" "regexp" "strconv" "strings" @@ -23,7 +22,33 @@ import ( // A Keep "block" is 64MB. const BLOCKSIZE = 64 * 1024 * 1024 -var BlockNotFound = errors.New("Block not found") +// Error interface with an error and boolean indicating whether the error is temporary +type Error interface { + error + Temporary() bool +} + +// multipleResponseError is of type Error +type multipleResponseError struct { + error + isTemp bool +} + +func (e *multipleResponseError) Temporary() bool { + return e.isTemp +} + +// BlockNotFound is a multipleResponseError where isTemp is false +var BlockNotFound = &ErrNotFound{multipleResponseError{ + error: errors.New("Block not found"), + isTemp: false, +}} + +// ErrNotFound is a multipleResponseError where isTemp can be true or false +type ErrNotFound struct { + multipleResponseError +} + var InsufficientReplicasError = errors.New("Could not write sufficient replicas") var OversizeBlockError = errors.New("Exceeded maximum block size (" + strconv.Itoa(BLOCKSIZE) + ")") var MissingArvadosApiHost = errors.New("Missing required environment variable ARVADOS_API_HOST") @@ -55,12 +80,15 @@ type KeepClient struct { replicasPerService int } -// Create a new KeepClient. This will contact the API server to discover Keep -// servers. +// MakeKeepClient creates a new KeepClient by contacting the API server to discover Keep servers. func MakeKeepClient(arv *arvadosclient.ArvadosClient) (*KeepClient, error) { - var matchTrue = regexp.MustCompile("^(?i:1|yes|true)$") - insecure := matchTrue.MatchString(os.Getenv("ARVADOS_API_HOST_INSECURE")) + kc := New(arv) + return kc, kc.DiscoverKeepServers() +} +// New func creates a new KeepClient struct. +// This func does not discover keep servers. It is the caller's responsibility. +func New(arv *arvadosclient.ArvadosClient) *KeepClient { defaultReplicationLevel := 2 value, err := arv.Discovery("defaultCollectionReplication") if err == nil { @@ -75,10 +103,10 @@ func MakeKeepClient(arv *arvadosclient.ArvadosClient) (*KeepClient, error) { Want_replicas: defaultReplicationLevel, Using_proxy: false, Client: &http.Client{Transport: &http.Transport{ - TLSClientConfig: &tls.Config{InsecureSkipVerify: insecure}}}, + TLSClientConfig: &tls.Config{InsecureSkipVerify: arv.ApiInsecure}}}, Retries: 2, } - return kc, kc.DiscoverKeepServers() + return kc } // Put a block given the block hash, a reader, and the number of bytes @@ -143,7 +171,12 @@ func (kc *KeepClient) getOrHead(method string, locator string) (io.ReadCloser, i var errs []string tries_remaining := 1 + kc.Retries + serversToTry := kc.getSortedRoots(locator) + + numServers := len(serversToTry) + count404 := 0 + var retryList []string for tries_remaining > 0 { @@ -167,11 +200,9 @@ func (kc *KeepClient) getOrHead(method string, locator string) (io.ReadCloser, i retryList = append(retryList, host) } else if resp.StatusCode != http.StatusOK { var respbody []byte - if resp.Body != nil { - respbody, _ = ioutil.ReadAll(&io.LimitedReader{resp.Body, 4096}) - resp.Body.Close() - } - errs = append(errs, fmt.Sprintf("%s: %d %s", + respbody, _ = ioutil.ReadAll(&io.LimitedReader{resp.Body, 4096}) + resp.Body.Close() + errs = append(errs, fmt.Sprintf("%s: HTTP %d %q", url, resp.StatusCode, bytes.TrimSpace(respbody))) if resp.StatusCode == 408 || @@ -181,6 +212,8 @@ func (kc *KeepClient) getOrHead(method string, locator string) (io.ReadCloser, i // server side failure, transient // error, can try again. retryList = append(retryList, host) + } else if resp.StatusCode == 404 { + count404++ } } else { // Success. @@ -191,6 +224,7 @@ func (kc *KeepClient) getOrHead(method string, locator string) (io.ReadCloser, i Check: locator[0:32], }, resp.ContentLength, url, nil } else { + resp.Body.Close() return nil, resp.ContentLength, url, nil } } @@ -198,9 +232,18 @@ func (kc *KeepClient) getOrHead(method string, locator string) (io.ReadCloser, i } serversToTry = retryList } - log.Printf("DEBUG: GET %s failed: %v", locator, errs) + log.Printf("DEBUG: %s %s failed: %v", method, locator, errs) - return nil, 0, "", BlockNotFound + var err error + if count404 == numServers { + err = BlockNotFound + } else { + err = &ErrNotFound{multipleResponseError{ + error: fmt.Errorf("%s %s failed: %v", method, locator, errs), + isTemp: len(serversToTry) > 0, + }} + } + return nil, 0, "", err } // Get() retrieves a block, given a locator. Returns a reader, the