package keepclient
import (
+ "bytes"
"crypto/md5"
- "crypto/tls"
"errors"
"fmt"
"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
"git.curoverse.com/arvados.git/sdk/go/streamer"
"io"
"io/ioutil"
- "log"
"net/http"
- "os"
"regexp"
"strconv"
"strings"
// A Keep "block" is 64MB.
const BLOCKSIZE = 64 * 1024 * 1024
-var BlockNotFound = errors.New("Block not found")
+// Error is an error that can also report whether it is temporary.
+type Error interface {
+ error
+ Temporary() bool // true if the error is temporary, false otherwise
+}
+
+// multipleResponseError pairs an error with an isTemp flag; through its Temporary method, *multipleResponseError satisfies Error.
+type multipleResponseError struct {
+ error // underlying error, embedded so that its Error method is promoted
+ isTemp bool // value reported by Temporary
+}
+
+func (e *multipleResponseError) Temporary() bool { // Temporary reports the isTemp flag.
+ return e.isTemp
+}
+
+// BlockNotFound is an *ErrNotFound with the message "Block not found" whose Temporary method returns false.
+var BlockNotFound = &ErrNotFound{multipleResponseError{
+ error: errors.New("Block not found"),
+ isTemp: false,
+}}
+
+// ErrNotFound embeds a multipleResponseError, so its Temporary result may be either true or false.
+type ErrNotFound struct {
+ multipleResponseError // supplies the promoted Error and Temporary methods
+}
+
var InsufficientReplicasError = errors.New("Could not write sufficient replicas")
var OversizeBlockError = errors.New("Exceeded maximum block size (" + strconv.Itoa(BLOCKSIZE) + ")") // message embeds the BLOCKSIZE value
var MissingArvadosApiHost = errors.New("Missing required environment variable ARVADOS_API_HOST")
var MissingArvadosApiToken = errors.New("Missing required environment variable ARVADOS_API_TOKEN")
var InvalidLocatorError = errors.New("Invalid locator")
+// ErrNoSuchKeepServer is returned by GetIndex when no keep server matches the given UUID.
+var ErrNoSuchKeepServer = errors.New("No keep server matching the given UUID is found")
+
+// ErrIncompleteIndex is returned by GetIndex when the index response does not end with a blank line.
+var ErrIncompleteIndex = errors.New("Got incomplete index")
+
+
const X_Keep_Desired_Replicas = "X-Keep-Desired-Replicas"
const X_Keep_Replicas_Stored = "X-Keep-Replicas-Stored"
// KeepClient holds information about Arvados and Keep servers.
type KeepClient struct {
- Arvados *arvadosclient.ArvadosClient
- Want_replicas int
- Using_proxy bool
- localRoots *map[string]string
+ Arvados *arvadosclient.ArvadosClient // API client; its ApiToken is sent in the Authorization header of Keep requests
+ Want_replicas int // New sets this from the "defaultCollectionReplication" discovery value, falling back to 2
+ localRoots *map[string]string
writableLocalRoots *map[string]string
- gatewayRoots *map[string]string
- lock sync.RWMutex
- Client *http.Client
+ gatewayRoots *map[string]string
+ lock sync.RWMutex
+ Client *http.Client // used by getOrHead and GetIndex to send requests
+ Retries int // extra passes getOrHead makes over retryable servers after the first; New sets 2
+ BlockCache *BlockCache // when nil, cache() falls back to DefaultBlockCache
+
+ // set to 1 if all writable services are of disk type, otherwise 0
+ replicasPerService int
+
+ // Any non-disk typed services found in the list of keepservers?
+ foundNonDiskSvc bool
}
-// Create a new KeepClient. This will contact the API server to discover Keep
-// servers.
+// MakeKeepClient creates a new KeepClient with New and then discovers Keep servers by calling DiscoverKeepServers. The client is returned together with any discovery error.
func MakeKeepClient(arv *arvadosclient.ArvadosClient) (*KeepClient, error) {
- var matchTrue = regexp.MustCompile("^(?i:1|yes|true)$")
- insecure := matchTrue.MatchString(os.Getenv("ARVADOS_API_HOST_INSECURE"))
+ kc := New(arv)
+ return kc, kc.DiscoverKeepServers()
+}
+
+// New creates a new KeepClient struct for the given Arvados client.
+// It does not discover keep servers; that is the caller's responsibility.
+func New(arv *arvadosclient.ArvadosClient) *KeepClient {
+ defaultReplicationLevel := 2 // fallback when the discovery value is unavailable or unusable
+ value, err := arv.Discovery("defaultCollectionReplication")
+ if err == nil { // a failed Discovery call is ignored and the fallback is kept
+ v, ok := value.(float64)
+ if ok && v > 0 { // only a positive float64 overrides the fallback
+ defaultReplicationLevel = int(v)
+ }
+ }
+
kc := &KeepClient{
Arvados: arv,
- Want_replicas: 2,
- Using_proxy: false,
+ Want_replicas: defaultReplicationLevel,
Client: &http.Client{Transport: &http.Transport{
- TLSClientConfig: &tls.Config{InsecureSkipVerify: insecure}}},
+ TLSClientConfig: arvadosclient.MakeTLSConfig(arv.ApiInsecure)}},
+ Retries: 2,
}
- return kc, kc.DiscoverKeepServers()
+ return kc
}
// Put a block given the block hash, a reader, and the number of bytes
}
}
+// getOrHead issues a GET or HEAD request (selected by method) for locator
+// against the servers returned by getSortedRoots, in that order, and
+// returns as soon as one of them answers with HTTP 200.
+//
+// The results are, in order: a reader for the block data (nil after a
+// successful HEAD), the Content-Length of the response, the URL that
+// answered, and an error. For GET the reader is a HashCheckingReader
+// that checks the data against the first 32 characters of locator; if
+// locator is shorter than that, InvalidLocatorError is returned instead
+// of slicing out of range.
+//
+// Servers that fail with a transport error or with HTTP 408, 429 or 5xx
+// are tried again on later passes; at most 1 + kc.Retries passes are
+// made. When no request succeeds, the error is BlockNotFound if the
+// number of 404 answers equals the number of servers (which includes the
+// case of having no servers at all). Otherwise it is an *ErrNotFound
+// listing every failure, whose Temporary method reports whether any
+// server was still eligible for a retry.
+func (kc *KeepClient) getOrHead(method string, locator string) (io.ReadCloser, int64, string, error) {
+	// The zero-length block (MD5 of no data, size 0) needs no round trip.
+	if strings.HasPrefix(locator, "d41d8cd98f00b204e9800998ecf8427e+0") {
+		return ioutil.NopCloser(bytes.NewReader(nil)), 0, "", nil
+	}
+
+	var errs []string
+
+	serversToTry := kc.getSortedRoots(locator)
+	numServers := len(serversToTry)
+	count404 := 0
+
+	// Stop early once there is nothing left to retry; further passes
+	// over an empty list would do no work.
+	for triesRemaining := 1 + kc.Retries; triesRemaining > 0 && len(serversToTry) > 0; triesRemaining-- {
+		var retryList []string
+
+		for _, host := range serversToTry {
+			url := host + "/" + locator
+
+			req, err := http.NewRequest(method, url, nil)
+			if err != nil {
+				// A request that cannot even be built will not
+				// improve on retry, so the host is not re-queued.
+				errs = append(errs, fmt.Sprintf("%s: %v", url, err))
+				continue
+			}
+			req.Header.Add("Authorization", fmt.Sprintf("OAuth2 %s", kc.Arvados.ApiToken))
+
+			resp, err := kc.Client.Do(req)
+			if err != nil {
+				// Probably a network error, may be transient,
+				// can try again.
+				errs = append(errs, fmt.Sprintf("%s: %v", url, err))
+				retryList = append(retryList, host)
+				continue
+			}
+
+			if resp.StatusCode != http.StatusOK {
+				// The body is only used to enrich the error message,
+				// so a read failure is deliberately ignored and at
+				// most 4096 bytes are kept.
+				respbody, _ := ioutil.ReadAll(&io.LimitedReader{R: resp.Body, N: 4096})
+				resp.Body.Close()
+				errs = append(errs, fmt.Sprintf("%s: HTTP %d %q",
+					url, resp.StatusCode, bytes.TrimSpace(respbody)))
+
+				switch code := resp.StatusCode; {
+				case code == 408 || code == 429 || code >= 500:
+					// Timeout, too many requests, or other
+					// server side failure, transient
+					// error, can try again.
+					retryList = append(retryList, host)
+				case code == 404:
+					count404++
+				}
+				continue
+			}
+
+			// Success.
+			if method != "GET" {
+				resp.Body.Close()
+				return nil, resp.ContentLength, url, nil
+			}
+			if len(locator) < 32 {
+				// Too short to contain the hash the reader must
+				// verify; report it rather than panic on the slice.
+				resp.Body.Close()
+				return nil, 0, "", InvalidLocatorError
+			}
+			return HashCheckingReader{
+				Reader: resp.Body,
+				Hash:   md5.New(),
+				Check:  locator[0:32],
+			}, resp.ContentLength, url, nil
+		}
+		serversToTry = retryList
+	}
+	DebugPrintf("DEBUG: %s %s failed: %v", method, locator, errs)
+
+	if count404 == numServers {
+		return nil, 0, "", BlockNotFound
+	}
+	return nil, 0, "", &ErrNotFound{multipleResponseError{
+		error: fmt.Errorf("%s %s failed: %v", method, locator, errs),
+		// Servers still queued for retry mean the failure may be transient.
+		isTemp: len(serversToTry) > 0,
+	}}
+}
+
// Get() retrieves a block, given a locator. Returns a reader, the
// expected data length, the URL the block is being fetched from, and
// an error.
// reader returned by this method will return a BadChecksum error
// instead of EOF.
func (kc *KeepClient) Get(locator string) (io.ReadCloser, int64, string, error) {
- var errs []string
- for _, host := range kc.getSortedRoots(locator) {
- url := host + "/" + locator
- req, err := http.NewRequest("GET", url, nil)
- if err != nil {
- continue
- }
- req.Header.Add("Authorization", fmt.Sprintf("OAuth2 %s", kc.Arvados.ApiToken))
- resp, err := kc.Client.Do(req)
- if err != nil || resp.StatusCode != http.StatusOK {
- if resp != nil {
- var respbody []byte
- if resp.Body != nil {
- respbody, _ = ioutil.ReadAll(&io.LimitedReader{resp.Body, 4096})
- }
- errs = append(errs, fmt.Sprintf("%s: %d %s",
- url, resp.StatusCode, strings.TrimSpace(string(respbody))))
- } else {
- errs = append(errs, fmt.Sprintf("%s: %v", url, err))
- }
- continue
- }
- return HashCheckingReader{
- Reader: resp.Body,
- Hash: md5.New(),
- Check: locator[0:32],
- }, resp.ContentLength, url, nil
- }
- log.Printf("DEBUG: GET %s failed: %v", locator, errs)
- return nil, 0, "", BlockNotFound
+ return kc.getOrHead("GET", locator) // for GET, getOrHead wraps the response body in a HashCheckingReader
}
// Ask() verifies that a block with the given hash is available and
// Returns the data size (content length) reported by the Keep service
// and the URI reporting the data size.
func (kc *KeepClient) Ask(locator string) (int64, string, error) {
- for _, host := range kc.getSortedRoots(locator) {
- url := host + "/" + locator
- req, err := http.NewRequest("HEAD", url, nil)
- if err != nil {
- continue
- }
- req.Header.Add("Authorization", fmt.Sprintf("OAuth2 %s", kc.Arvados.ApiToken))
- if resp, err := kc.Client.Do(req); err == nil && resp.StatusCode == http.StatusOK {
- return resp.ContentLength, url, nil
- }
+ _, size, url, err := kc.getOrHead("HEAD", locator) // the reader returned by getOrHead is discarded
+ return size, url, err
+}
+
+// GetIndex retrieves a list of blocks stored on the given server whose hashes
+// begin with the given prefix. It returns an error, and no reader, if the
+// complete index cannot be retrieved.
+//
+// This is meant to be used only by system components and admin tools.
+// It will return an error unless the client is using a "data manager token"
+// recognized by the Keep services.
+func (kc *KeepClient) GetIndex(keepServiceUUID, prefix string) (io.Reader, error) {
+ url := kc.LocalRoots()[keepServiceUUID] // an empty result is treated as "no such server"
+ if url == "" {
+ return nil, ErrNoSuchKeepServer
+ }
+
+ url += "/index"
+ if prefix != "" {
+ url += "/" + prefix
+ }
+
+ req, err := http.NewRequest("GET", url, nil)
+ if err != nil {
+ return nil, err
+ }
+
+ req.Header.Add("Authorization", fmt.Sprintf("OAuth2 %s", kc.Arvados.ApiToken))
+ resp, err := kc.Client.Do(req)
+ if err != nil {
+ return nil, err
+ }
+
+ defer resp.Body.Close()
+
+ if resp.StatusCode != http.StatusOK {
+ return nil, fmt.Errorf("Got http status code: %d", resp.StatusCode)
+ }
+
+ var respBody []byte
+ respBody, err = ioutil.ReadAll(resp.Body) // the whole index is buffered in memory before it is checked
+ if err != nil {
+ return nil, err
+ }
+
+ // Got index; verify that it is complete
+ // The response should be "\n" if no locators matched the prefix
+ // Else, it should be a list of locators followed by a blank line
+ if !bytes.Equal(respBody, []byte("\n")) && !bytes.HasSuffix(respBody, []byte("\n\n")) {
+ return nil, ErrIncompleteIndex
}
- return 0, "", BlockNotFound
+
+ // Got complete index; strip the trailing newline and send
+ return bytes.NewReader(respBody[0 : len(respBody)-1]), nil
}
// LocalRoots() returns the map of local (i.e., disk and proxy) Keep
// caller can reuse/modify them after SetServiceRoots returns, but
// they should not be modified by any other goroutine while
// SetServiceRoots is running.
-func (kc *KeepClient) SetServiceRoots(newLocals, newWritableLocals map[string]string, newGateways map[string]string) {
+func (kc *KeepClient) SetServiceRoots(newLocals, newWritableLocals, newGateways map[string]string) {
locals := make(map[string]string)
for uuid, root := range newLocals {
locals[uuid] = root
return found
}
+// cache returns the block cache this client should use: its own
+// BlockCache when one is set, DefaultBlockCache otherwise.
+func (kc *KeepClient) cache() *BlockCache {
+	if own := kc.BlockCache; own != nil {
+		return own
+	}
+	return DefaultBlockCache
+}
+
type Locator struct {
Hash string
Size int // -1 if data size is not known