import (
"bytes"
"crypto/md5"
- "crypto/tls"
"errors"
"fmt"
- "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
- "git.curoverse.com/arvados.git/sdk/go/streamer"
"io"
"io/ioutil"
- "log"
"net/http"
"regexp"
"strconv"
"strings"
"sync"
+
+ "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
+ "git.curoverse.com/arvados.git/sdk/go/streamer"
)
// A Keep "block" is 64MB.
multipleResponseError
}
-var InsufficientReplicasError = errors.New("Could not write sufficient replicas")
-var OversizeBlockError = errors.New("Exceeded maximum block size (" + strconv.Itoa(BLOCKSIZE) + ")")
+type InsufficientReplicasError error // error type PutHR documents returning when fewer than kc.Want_replicas replicas are written
+
+type OversizeBlockError error // error type of ErrOversizeBlock
+
+var ErrOversizeBlock = OversizeBlockError(errors.New("Exceeded maximum block size (" + strconv.Itoa(BLOCKSIZE) + ")")) // returned by PutHR when dataBytes > BLOCKSIZE
// MissingArvadosApiHost reports that the ARVADOS_API_HOST environment
// variable is not set.
var MissingArvadosApiHost = errors.New("Missing required environment variable ARVADOS_API_HOST")
// MissingArvadosApiToken reports that the ARVADOS_API_TOKEN environment
// variable is not set.
var MissingArvadosApiToken = errors.New("Missing required environment variable ARVADOS_API_TOKEN")
// InvalidLocatorError reports an invalid locator.
var InvalidLocatorError = errors.New("Invalid locator")
// X_Keep_Desired_Replicas is a header name.
// NOTE(review): presumably the request header carrying the desired
// replica count; its use is outside this excerpt — confirm.
const X_Keep_Desired_Replicas = "X-Keep-Desired-Replicas"
// X_Keep_Replicas_Stored is a header name.
// NOTE(review): presumably the response header reporting how many
// replicas were stored; its use is outside this excerpt — confirm.
const X_Keep_Replicas_Stored = "X-Keep-Replicas-Stored"
+// HTTPClient is the request-issuing interface that KeepClient.Client
+// must satisfy. MakeKeepClient supplies an *http.Client for it.
+type HTTPClient interface {
+	// Do sends req and returns the resulting response or an error.
+	Do(req *http.Request) (*http.Response, error)
+}
+
// KeepClient holds information about Arvados and Keep servers.
type KeepClient struct {
Arvados *arvadosclient.ArvadosClient
Want_replicas int // MakeKeepClient initializes this to defaultReplicationLevel
- Using_proxy bool
localRoots *map[string]string
writableLocalRoots *map[string]string
gatewayRoots *map[string]string
lock sync.RWMutex
- Client *http.Client
+ Client HTTPClient
Retries int // getOrHead starts with 1 + Retries tries
+ BlockCache *BlockCache // when nil, cache() falls back to DefaultBlockCache
// set to 1 if all writable services are of disk type, otherwise 0
replicasPerService int
+
+ // Any non-disk typed services found in the list of keepservers?
+ foundNonDiskSvc bool
}
// MakeKeepClient creates a new KeepClient by contacting the API server to discover Keep servers.
kc := &KeepClient{
Arvados: arv,
Want_replicas: defaultReplicationLevel,
- Using_proxy: false,
Client: &http.Client{Transport: &http.Transport{
- TLSClientConfig: &tls.Config{InsecureSkipVerify: arv.ApiInsecure}}},
+ TLSClientConfig: arvadosclient.MakeTLSConfig(arv.ApiInsecure)}},
Retries: 2,
}
return kc
// Returns the locator for the written block, the number of replicas
// written, and an error.
//
-// Returns an InsufficientReplicas error if 0 <= replicas <
+// Returns an InsufficientReplicasError if 0 <= replicas <
// kc.Want_replicas.
func (kc *KeepClient) PutHR(hash string, r io.Reader, dataBytes int64) (string, int, error) {
// Buffer for reads from 'r'
var bufsize int
if dataBytes > 0 {
if dataBytes > BLOCKSIZE {
- return "", 0, OversizeBlockError
+ return "", 0, ErrOversizeBlock
}
bufsize = int(dataBytes)
} else {
}
func (kc *KeepClient) getOrHead(method string, locator string) (io.ReadCloser, int64, string, error) {
+ if strings.HasPrefix(locator, "d41d8cd98f00b204e9800998ecf8427e+0") {
+ return ioutil.NopCloser(bytes.NewReader(nil)), 0, "", nil
+ }
+
var errs []string
- var count404 int
tries_remaining := 1 + kc.Retries
+
serversToTry := kc.getSortedRoots(locator)
+
+ numServers := len(serversToTry)
+ count404 := 0
+
var retryList []string
for tries_remaining > 0 {
retryList = append(retryList, host)
} else if resp.StatusCode != http.StatusOK {
var respbody []byte
- respbody, _ = ioutil.ReadAll(&io.LimitedReader{resp.Body, 4096})
+ respbody, _ = ioutil.ReadAll(&io.LimitedReader{R: resp.Body, N: 4096})
resp.Body.Close()
errs = append(errs, fmt.Sprintf("%s: HTTP %d %q",
url, resp.StatusCode, bytes.TrimSpace(respbody)))
}
serversToTry = retryList
}
- log.Printf("DEBUG: %s %s failed: %v", method, locator, errs)
+ DebugPrintf("DEBUG: %s %s failed: %v", method, locator, errs)
var err error
- if count404 == len(kc.getSortedRoots(locator)) {
+ if count404 == numServers {
err = BlockNotFound
} else {
err = &ErrNotFound{multipleResponseError{
// caller can reuse/modify them after SetServiceRoots returns, but
// they should not be modified by any other goroutine while
// SetServiceRoots is running.
-func (kc *KeepClient) SetServiceRoots(newLocals, newWritableLocals map[string]string, newGateways map[string]string) {
+func (kc *KeepClient) SetServiceRoots(newLocals, newWritableLocals, newGateways map[string]string) {
locals := make(map[string]string)
for uuid, root := range newLocals {
locals[uuid] = root
return found
}
+// cache returns the block cache this client should use: kc.BlockCache
+// when one has been set, otherwise the package-level DefaultBlockCache.
+func (kc *KeepClient) cache() *BlockCache {
+	if kc.BlockCache != nil {
+		return kc.BlockCache
+	}
+	// No per-client cache configured; fall back to the shared default.
+	return DefaultBlockCache
+}
+
type Locator struct {
Hash string
Size int // -1 if data size is not known