12876: Merge branch 'master' into 12876-arvados-client
[arvados.git] / sdk / go / keepclient / keepclient.go
index 029c6ee7f3a5834a8af149b64917aff2d5dc4bcb..54a4a374b991b44c5a5e51878be980a1b78f9609 100644 (file)
@@ -1,3 +1,7 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: Apache-2.0
+
 /* Provides low-level Get/Put primitives for accessing Arvados Keep blocks. */
 package keepclient
 
@@ -17,7 +21,7 @@ import (
        "time"
 
        "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
-       "git.curoverse.com/arvados.git/sdk/go/streamer"
+       "git.curoverse.com/arvados.git/sdk/go/asyncbuf"
 )
 
 // A Keep "block" is 64MB.
@@ -152,10 +156,12 @@ func (kc *KeepClient) PutHR(hash string, r io.Reader, dataBytes int64) (string,
                bufsize = BLOCKSIZE
        }
 
-       t := streamer.AsyncStreamFromReader(bufsize, HashCheckingReader{r, md5.New(), hash})
-       defer t.Close()
-
-       return kc.putReplicas(hash, t, dataBytes)
+       buf := asyncbuf.NewBuffer(make([]byte, 0, bufsize))
+       go func() {
+               _, err := io.Copy(buf, HashCheckingReader{r, md5.New(), hash})
+               buf.CloseWithError(err)
+       }()
+       return kc.putReplicas(hash, buf.NewReader, dataBytes)
 }
 
 // PutHB writes a block to Keep. The hash of the bytes is given in
@@ -163,9 +169,8 @@ func (kc *KeepClient) PutHR(hash string, r io.Reader, dataBytes int64) (string,
 //
 // Return values are the same as for PutHR.
 func (kc *KeepClient) PutHB(hash string, buf []byte) (string, int, error) {
-       t := streamer.AsyncStreamFromSlice(buf)
-       defer t.Close()
-       return kc.putReplicas(hash, t, int64(len(buf)))
+       newReader := func() io.Reader { return bytes.NewBuffer(buf) }
+       return kc.putReplicas(hash, newReader, int64(len(buf)))
 }
 
 // PutB writes a block to Keep. It computes the hash itself.
@@ -242,6 +247,10 @@ func (kc *KeepClient) getOrHead(method string, locator string) (io.ReadCloser, i
                                } else if resp.StatusCode == 404 {
                                        count404++
                                }
+                       } else if resp.ContentLength < 0 {
+                               // Missing Content-Length
+                               resp.Body.Close()
+                               return nil, 0, "", fmt.Errorf("Missing Content-Length of block")
                        } else {
                                // Success.
                                if method == "GET" {
@@ -284,6 +293,12 @@ func (kc *KeepClient) Get(locator string) (io.ReadCloser, int64, string, error)
        return kc.getOrHead("GET", locator)
 }
 
+// ReadAt() retrieves a portion of block from the cache if it's
+// present, otherwise from the network.
+func (kc *KeepClient) ReadAt(locator string, p []byte, off int) (int, error) {
+       return kc.cache().ReadAt(kc, locator, p, off)
+}
+
 // Ask() verifies that a block with the given hash is available and
 // readable, according to at least one Keep service. Unlike Get, it
 // does not retrieve the data or verify that the data content matches
@@ -430,6 +445,10 @@ func (kc *KeepClient) cache() *BlockCache {
        }
 }
 
+func (kc *KeepClient) ClearBlockCache() {
+       kc.cache().Clear()
+}
+
 var (
        // There are four global http.Client objects for the four
        // possible permutations of TLS behavior (verify/skip-verify)