20318: Route (*KeepClient)Get() through disk cache layer.
[arvados.git] / sdk / go / keepclient / keepclient.go
index 3dc0aa0389158268af38861ec202574d8f20211c..2bd7996b59c0260caf1d61560316c3bc42e09357 100644 (file)
@@ -7,6 +7,7 @@
 package keepclient
 
 import (
+       "bufio"
        "bytes"
        "context"
        "crypto/md5"
@@ -408,16 +409,65 @@ func (kc *KeepClient) LocalLocator(locator string) (string, error) {
        return kc.upstreamGateway().LocalLocator(locator)
 }
 
-// Get retrieves a block, given a locator. Returns a reader, the
-// expected data length, the URL the block is being fetched from, and
-// an error.
+// Get retrieves the specified block from the local cache or a backend
+// server. Returns a reader, the expected data length (or -1 if not
+// known), and an error.
+//
+// The third return value (formerly a source URL in previous versions)
+// is an empty string.
 //
 // If the block checksum does not match, the final Read() on the
 // reader returned by this method will return a BadChecksum error
 // instead of EOF.
+//
+// New code should use BlockRead and/or ReadAt instead of Get.
 func (kc *KeepClient) Get(locator string) (io.ReadCloser, int64, string, error) {
-       rdr, size, url, _, err := kc.getOrHead("GET", locator, nil)
-       return rdr, size, url, err
+       loc, err := MakeLocator(locator)
+       if err != nil {
+               return nil, 0, "", err
+       }
+       pr, pw := io.Pipe()
+       go func() {
+               n, err := kc.BlockRead(context.Background(), arvados.BlockReadOptions{
+                       Locator: locator,
+                       WriteTo: pw,
+               })
+               if err != nil {
+                       pw.CloseWithError(err)
+               } else if loc.Size >= 0 && n != loc.Size {
+                       pw.CloseWithError(fmt.Errorf("expected block size %d but read %d bytes", loc.Size, n))
+               } else {
+                       pw.Close()
+               }
+       }()
+       // Wait for the first byte to arrive, so that, if there's an
+       // error before we receive any data, we can return the error
+       // directly, instead of indirectly via a reader that returns
+       // an error.
+       bufr := bufio.NewReader(pr)
+       _, err = bufr.Peek(1)
+       if err != nil && err != io.EOF {
+               pr.CloseWithError(err)
+               return nil, 0, "", err
+       }
+       if err == io.EOF && (loc.Size == 0 || loc.Hash == "d41d8cd98f00b204e9800998ecf8427e") {
+               // In the special case of the zero-length block, EOF
+               // error from Peek() is normal.
+               return pr, 0, "", nil
+       }
+       return struct {
+               io.Reader
+               io.Closer
+       }{
+               Reader: bufr,
+               Closer: pr,
+       }, int64(loc.Size), "", err
+}
+
+// BlockRead retrieves a block from the cache if it's present, otherwise
+// from the network.
+func (kc *KeepClient) BlockRead(ctx context.Context, opts arvados.BlockReadOptions) (int, error) {
+       return kc.upstreamGateway().BlockRead(ctx, opts)
 }
 
 // ReadAt retrieves a portion of block from the cache if it's