X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/0f5b0542513b572959e39400bae42e69aeb1a7b6..refs/heads/21766-disk-cache-size:/sdk/go/keepclient/keepclient.go diff --git a/sdk/go/keepclient/keepclient.go b/sdk/go/keepclient/keepclient.go index 68ac886ddd..1c72e583cb 100644 --- a/sdk/go/keepclient/keepclient.go +++ b/sdk/go/keepclient/keepclient.go @@ -7,6 +7,7 @@ package keepclient import ( + "bufio" "bytes" "context" "crypto/md5" @@ -16,6 +17,8 @@ import ( "io/ioutil" "net" "net/http" + "os" + "path/filepath" "regexp" "strconv" "strings" @@ -40,6 +43,12 @@ var ( DefaultProxyConnectTimeout = 30 * time.Second DefaultProxyTLSHandshakeTimeout = 10 * time.Second DefaultProxyKeepAlive = 120 * time.Second + + DefaultRetryDelay = 2 * time.Second // see KeepClient.RetryDelay + MinimumRetryDelay = time.Millisecond + + rootCacheDir = "/var/cache/arvados/keep" + userCacheDir = ".cache/arvados/keep" // relative to HOME ) // Error interface with an error and boolean indicating whether the error is temporary @@ -69,6 +78,8 @@ type ErrNotFound struct { multipleResponseError } +func (*ErrNotFound) HTTPStatus() int { return http.StatusNotFound } + type InsufficientReplicasError struct{ error } type OversizeBlockError struct{ error } @@ -89,26 +100,41 @@ const ( XKeepReplicasStored = "X-Keep-Replicas-Stored" XKeepStorageClasses = "X-Keep-Storage-Classes" XKeepStorageClassesConfirmed = "X-Keep-Storage-Classes-Confirmed" + XKeepSignature = "X-Keep-Signature" + XKeepLocator = "X-Keep-Locator" ) type HTTPClient interface { Do(*http.Request) (*http.Response, error) } +const DiskCacheDisabled = arvados.ByteSizeOrPercent(1) + // KeepClient holds information about Arvados and Keep servers. type KeepClient struct { - Arvados *arvadosclient.ArvadosClient - Want_replicas int - localRoots map[string]string - writableLocalRoots map[string]string - gatewayRoots map[string]string - lock sync.RWMutex - HTTPClient HTTPClient - Retries int - BlockCache *BlockCache + Arvados *arvadosclient.ArvadosClient + Want_replicas int + localRoots map[string]string + writableLocalRoots map[string]string + gatewayRoots map[string]string + lock sync.RWMutex + HTTPClient HTTPClient + + // Number of times to automatically retry a read/write + // operation after a transient failure. + Retries int + + // Initial maximum delay for automatic retry. If zero, + // DefaultRetryDelay is used. The delay after attempt N + // (0-based) will be a random duration between + // MinimumRetryDelay and RetryDelay * 2^N, not to exceed a cap + // of RetryDelay * 10. + RetryDelay time.Duration + RequestID string StorageClasses []string - DefaultStorageClasses []string // Set by cluster's exported config + DefaultStorageClasses []string // Set by cluster's exported config + DiskCacheSize arvados.ByteSizeOrPercent // See also DiskCacheDisabled // set to 1 if all writable services are of disk type, otherwise 0 replicasPerService int @@ -118,6 +144,30 @@ type KeepClient struct { // Disable automatic discovery of keep services disableDiscovery bool + + gatewayStack arvados.KeepGateway +} + +func (kc *KeepClient) Clone() *KeepClient { + kc.lock.Lock() + defer kc.lock.Unlock() + return &KeepClient{ + Arvados: kc.Arvados, + Want_replicas: kc.Want_replicas, + localRoots: kc.localRoots, + writableLocalRoots: kc.writableLocalRoots, + gatewayRoots: kc.gatewayRoots, + HTTPClient: kc.HTTPClient, + Retries: kc.Retries, + RetryDelay: kc.RetryDelay, + RequestID: kc.RequestID, + StorageClasses: kc.StorageClasses, + DefaultStorageClasses: kc.DefaultStorageClasses, + DiskCacheSize: kc.DiskCacheSize, + replicasPerService: kc.replicasPerService, + foundNonDiskSvc: kc.foundNonDiskSvc, + disableDiscovery: kc.disableDiscovery, + } } func (kc *KeepClient) loadDefaultClasses() error { @@ -161,8 +211,8 @@ func New(arv *arvadosclient.ArvadosClient) *KeepClient { Retries: 2, } err = kc.loadDefaultClasses() - if err != nil { - DebugPrintf("DEBUG: Unable to load the default storage classes cluster config") + if err != nil && arv.Logger != nil { + arv.Logger.WithError(err).Debug("unable to load the default storage classes cluster config") } return kc } @@ -238,6 +288,7 @@ func (kc *KeepClient) getOrHead(method string, locator string, header http.Heade var errs []string + delay := delayCalculator{InitialMaxDelay: kc.RetryDelay} triesRemaining := 1 + kc.Retries serversToTry := kc.getSortedRoots(locator) @@ -317,8 +368,13 @@ func (kc *KeepClient) getOrHead(method string, locator string, header http.Heade return nil, expectLength, url, resp.Header, nil } serversToTry = retryList + if len(serversToTry) > 0 && triesRemaining > 0 { + time.Sleep(delay.Next()) + } + } + if kc.Arvados.Logger != nil { + kc.Arvados.Logger.Debugf("DEBUG: %s %s failed: %v", method, locator, errs) } - DebugPrintf("DEBUG: %s %s failed: %v", method, locator, errs) var err error if count404 == numServers { @@ -332,44 +388,124 @@ func (kc *KeepClient) getOrHead(method string, locator string, header http.Heade return nil, 0, "", nil, err } +// attempt to create dir/subdir/ and its parents, up to but not +// including dir itself, using mode 0700. +func makedirs(dir, subdir string) { + for _, part := range strings.Split(subdir, string(os.PathSeparator)) { + dir = filepath.Join(dir, part) + os.Mkdir(dir, 0700) + } +} + +// upstreamGateway creates/returns the KeepGateway stack used to read +// and write data: a disk-backed cache on top of an http backend. +func (kc *KeepClient) upstreamGateway() arvados.KeepGateway { + kc.lock.Lock() + defer kc.lock.Unlock() + if kc.gatewayStack != nil { + return kc.gatewayStack + } + var cachedir string + if os.Geteuid() == 0 { + cachedir = rootCacheDir + makedirs("/", cachedir) + } else { + home := "/" + os.Getenv("HOME") + makedirs(home, userCacheDir) + cachedir = filepath.Join(home, userCacheDir) + } + backend := &keepViaHTTP{kc} + if kc.DiskCacheSize == DiskCacheDisabled { + kc.gatewayStack = backend + } else { + kc.gatewayStack = &arvados.DiskCache{ + Dir: cachedir, + MaxSize: kc.DiskCacheSize, + KeepGateway: backend, + Logger: kc.Arvados.Logger, + } + } + return kc.gatewayStack +} + // LocalLocator returns a locator equivalent to the one supplied, but // with a valid signature from the local cluster. If the given locator // already has a local signature, it is returned unchanged. func (kc *KeepClient) LocalLocator(locator string) (string, error) { - if !strings.Contains(locator, "+R") { - // Either it has +A, or it's unsigned and we assume - // it's a local locator on a site with signatures - // disabled. - return locator, nil - } - sighdr := fmt.Sprintf("local, time=%s", time.Now().UTC().Format(time.RFC3339)) - _, _, url, hdr, err := kc.getOrHead("HEAD", locator, http.Header{"X-Keep-Signature": []string{sighdr}}) - if err != nil { - return "", err - } - loc := hdr.Get("X-Keep-Locator") - if loc == "" { - return "", fmt.Errorf("missing X-Keep-Locator header in HEAD response from %s", url) - } - return loc, nil + return kc.upstreamGateway().LocalLocator(locator) } -// Get retrieves a block, given a locator. Returns a reader, the -// expected data length, the URL the block is being fetched from, and -// an error. +// Get retrieves the specified block from the local cache or a backend +// server. Returns a reader, the expected data length (or -1 if not +// known), and an error. +// +// The third return value (formerly a source URL in previous versions) +// is an empty string. // // If the block checksum does not match, the final Read() on the // reader returned by this method will return a BadChecksum error // instead of EOF. +// +// New code should use BlockRead and/or ReadAt instead of Get. func (kc *KeepClient) Get(locator string) (io.ReadCloser, int64, string, error) { - rdr, size, url, _, err := kc.getOrHead("GET", locator, nil) - return rdr, size, url, err + loc, err := MakeLocator(locator) + if err != nil { + return nil, 0, "", err + } + pr, pw := io.Pipe() + go func() { + n, err := kc.BlockRead(context.Background(), arvados.BlockReadOptions{ + Locator: locator, + WriteTo: pw, + }) + if err != nil { + pw.CloseWithError(err) + } else if loc.Size >= 0 && n != loc.Size { + pw.CloseWithError(fmt.Errorf("expected block size %d but read %d bytes", loc.Size, n)) + } else { + pw.Close() + } + }() + // Wait for the first byte to arrive, so that, if there's an + // error before we receive any data, we can return the error + // directly, instead of indirectly via a reader that returns + // an error. + bufr := bufio.NewReader(pr) + _, err = bufr.Peek(1) + if err != nil && err != io.EOF { + pr.CloseWithError(err) + return nil, 0, "", err + } + if err == io.EOF && (loc.Size == 0 || loc.Hash == "d41d8cd98f00b204e9800998ecf8427e") { + // In the special case of the zero-length block, EOF + // error from Peek() is normal. + return pr, 0, "", nil + } + return struct { + io.Reader + io.Closer + }{ + Reader: bufr, + Closer: pr, + }, int64(loc.Size), "", err +} + +// BlockRead retrieves a block from the cache if it's present, otherwise +// from the network. +func (kc *KeepClient) BlockRead(ctx context.Context, opts arvados.BlockReadOptions) (int, error) { + return kc.upstreamGateway().BlockRead(ctx, opts) } // ReadAt retrieves a portion of block from the cache if it's // present, otherwise from the network. func (kc *KeepClient) ReadAt(locator string, p []byte, off int) (int, error) { - return kc.cache().ReadAt(kc, locator, p, off) + return kc.upstreamGateway().ReadAt(locator, p, off) +} + +// BlockWrite writes a full block to upstream servers and saves a copy +// in the local cache. +func (kc *KeepClient) BlockWrite(ctx context.Context, req arvados.BlockWriteOptions) (arvados.BlockWriteResponse, error) { + return kc.upstreamGateway().BlockWrite(ctx, req) } // Ask verifies that a block with the given hash is available and @@ -511,17 +647,6 @@ func (kc *KeepClient) getSortedRoots(locator string) []string { return found } -func (kc *KeepClient) cache() *BlockCache { - if kc.BlockCache != nil { - return kc.BlockCache - } - return DefaultBlockCache -} - -func (kc *KeepClient) ClearBlockCache() { - kc.cache().Clear() -} - func (kc *KeepClient) SetStorageClasses(sc []string) { // make a copy so the caller can't mess with it. kc.StorageClasses = append([]string{}, sc...) @@ -605,6 +730,13 @@ func (kc *KeepClient) getRequestID() string { return reqIDGen.Next() } +func (kc *KeepClient) debugf(format string, args ...interface{}) { + if kc.Arvados.Logger == nil { + return + } + kc.Arvados.Logger.Debugf(format, args...) +} + type Locator struct { Hash string Size int // -1 if data size is not known