2960: Refactor keepstore into a streaming server.
[arvados.git] / sdk / go / keepclient / keepclient.go
index 2712e9f3a580ef610615a5f88cdde47a11ae8469..64f7e47b7e14e85aa86d369da58c6fd8d1273ad7 100644 (file)
@@ -7,6 +7,7 @@
 package keepclient
 
 import (
+       "bufio"
        "bytes"
        "context"
        "crypto/md5"
@@ -74,6 +75,8 @@ type ErrNotFound struct {
        multipleResponseError
 }
 
+func (*ErrNotFound) HTTPStatus() int { return http.StatusNotFound }
+
 type InsufficientReplicasError struct{ error }
 
 type OversizeBlockError struct{ error }
@@ -100,6 +103,8 @@ type HTTPClient interface {
        Do(*http.Request) (*http.Response, error)
 }
 
+const DiskCacheDisabled = arvados.ByteSizeOrPercent(1)
+
 // KeepClient holds information about Arvados and Keep servers.
 type KeepClient struct {
        Arvados               *arvadosclient.ArvadosClient
@@ -110,10 +115,10 @@ type KeepClient struct {
        lock                  sync.RWMutex
        HTTPClient            HTTPClient
        Retries               int
-       BlockCache            *BlockCache
        RequestID             string
        StorageClasses        []string
-       DefaultStorageClasses []string // Set by cluster's exported config
+       DefaultStorageClasses []string                  // Set by cluster's exported config
+       DiskCacheSize         arvados.ByteSizeOrPercent // See also DiskCacheDisabled
 
        // set to 1 if all writable services are of disk type, otherwise 0
        replicasPerService int
@@ -127,6 +132,27 @@ type KeepClient struct {
        gatewayStack arvados.KeepGateway
 }
 
+func (kc *KeepClient) Clone() *KeepClient {
+       kc.lock.Lock()
+       defer kc.lock.Unlock()
+       return &KeepClient{
+               Arvados:               kc.Arvados,
+               Want_replicas:         kc.Want_replicas,
+               localRoots:            kc.localRoots,
+               writableLocalRoots:    kc.writableLocalRoots,
+               gatewayRoots:          kc.gatewayRoots,
+               HTTPClient:            kc.HTTPClient,
+               Retries:               kc.Retries,
+               RequestID:             kc.RequestID,
+               StorageClasses:        kc.StorageClasses,
+               DefaultStorageClasses: kc.DefaultStorageClasses,
+               DiskCacheSize:         kc.DiskCacheSize,
+               replicasPerService:    kc.replicasPerService,
+               foundNonDiskSvc:       kc.foundNonDiskSvc,
+               disableDiscovery:      kc.disableDiscovery,
+       }
+}
+
 func (kc *KeepClient) loadDefaultClasses() error {
        scData, err := kc.Arvados.ClusterConfig("StorageClasses")
        if err != nil {
@@ -365,9 +391,15 @@ func (kc *KeepClient) upstreamGateway() arvados.KeepGateway {
                makedirs(home, userCacheDir)
                cachedir = filepath.Join(home, userCacheDir)
        }
-       kc.gatewayStack = &arvados.DiskCache{
-               Dir:         cachedir,
-               KeepGateway: &keepViaHTTP{kc},
+       backend := &keepViaHTTP{kc}
+       if kc.DiskCacheSize == DiskCacheDisabled {
+               kc.gatewayStack = backend
+       } else {
+               kc.gatewayStack = &arvados.DiskCache{
+                       Dir:         cachedir,
+                       MaxSize:     kc.DiskCacheSize,
+                       KeepGateway: backend,
+               }
        }
        return kc.gatewayStack
 }
@@ -379,16 +411,65 @@ func (kc *KeepClient) LocalLocator(locator string) (string, error) {
        return kc.upstreamGateway().LocalLocator(locator)
 }
 
-// Get retrieves a block, given a locator. Returns a reader, the
-// expected data length, the URL the block is being fetched from, and
-// an error.
+// Get retrieves the specified block from the local cache or a backend
+// server. Returns a reader, the expected data length (or -1 if not
+// known), and an error.
+//
+// The third return value (formerly a source URL in previous versions)
+// is an empty string.
 //
 // If the block checksum does not match, the final Read() on the
 // reader returned by this method will return a BadChecksum error
 // instead of EOF.
+//
+// New code should use BlockRead and/or ReadAt instead of Get.
 func (kc *KeepClient) Get(locator string) (io.ReadCloser, int64, string, error) {
-       rdr, size, url, _, err := kc.getOrHead("GET", locator, nil)
-       return rdr, size, url, err
+       loc, err := MakeLocator(locator)
+       if err != nil {
+               return nil, 0, "", err
+       }
+       pr, pw := io.Pipe()
+       go func() {
+               n, err := kc.BlockRead(context.Background(), arvados.BlockReadOptions{
+                       Locator: locator,
+                       WriteTo: pw,
+               })
+               if err != nil {
+                       pw.CloseWithError(err)
+               } else if loc.Size >= 0 && n != loc.Size {
+                       pw.CloseWithError(fmt.Errorf("expected block size %d but read %d bytes", loc.Size, n))
+               } else {
+                       pw.Close()
+               }
+       }()
+       // Wait for the first byte to arrive, so that, if there's an
+       // error before we receive any data, we can return the error
+       // directly, instead of indirectly via a reader that returns
+       // an error.
+       bufr := bufio.NewReader(pr)
+       _, err = bufr.Peek(1)
+       if err != nil && err != io.EOF {
+               pr.CloseWithError(err)
+               return nil, 0, "", err
+       }
+       if err == io.EOF && (loc.Size == 0 || loc.Hash == "d41d8cd98f00b204e9800998ecf8427e") {
+               // In the special case of the zero-length block, EOF
+               // error from Peek() is normal.
+               return pr, 0, "", nil
+       }
+       return struct {
+               io.Reader
+               io.Closer
+       }{
+               Reader: bufr,
+               Closer: pr,
+       }, int64(loc.Size), "", err
+}
+
+// BlockRead retrieves a block from the cache if it's present, otherwise
+// from the network.
+func (kc *KeepClient) BlockRead(ctx context.Context, opts arvados.BlockReadOptions) (int, error) {
+       return kc.upstreamGateway().BlockRead(ctx, opts)
 }
 
 // ReadAt retrieves a portion of block from the cache if it's
@@ -542,17 +623,6 @@ func (kc *KeepClient) getSortedRoots(locator string) []string {
        return found
 }
 
-func (kc *KeepClient) cache() *BlockCache {
-       if kc.BlockCache != nil {
-               return kc.BlockCache
-       }
-       return DefaultBlockCache
-}
-
-func (kc *KeepClient) ClearBlockCache() {
-       kc.cache().Clear()
-}
-
 func (kc *KeepClient) SetStorageClasses(sc []string) {
        // make a copy so the caller can't mess with it.
        kc.StorageClasses = append([]string{}, sc...)