+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: Apache-2.0
+
/* Provides low-level Get/Put primitives for accessing Arvados Keep blocks. */
package keepclient
"time"
"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
- "git.curoverse.com/arvados.git/sdk/go/streamer"
+ "git.curoverse.com/arvados.git/sdk/go/asyncbuf"
)
// A Keep "block" is 64MB.
bufsize = BLOCKSIZE
}
- t := streamer.AsyncStreamFromReader(bufsize, HashCheckingReader{r, md5.New(), hash})
- defer t.Close()
-
- return kc.putReplicas(hash, t, dataBytes)
+ buf := asyncbuf.NewBuffer(make([]byte, 0, bufsize))
+ go func() {
+ _, err := io.Copy(buf, HashCheckingReader{r, md5.New(), hash})
+ buf.CloseWithError(err)
+ }()
+ return kc.putReplicas(hash, buf.NewReader, dataBytes)
}
// PutHB writes a block to Keep. The hash of the bytes is given in
//
// Return values are the same as for PutHR.
func (kc *KeepClient) PutHB(hash string, buf []byte) (string, int, error) {
- t := streamer.AsyncStreamFromSlice(buf)
- defer t.Close()
- return kc.putReplicas(hash, t, int64(len(buf)))
+ newReader := func() io.Reader { return bytes.NewBuffer(buf) }
+ return kc.putReplicas(hash, newReader, int64(len(buf)))
}
// PutB writes a block to Keep. It computes the hash itself.
return ioutil.NopCloser(bytes.NewReader(nil)), 0, "", nil
}
+ var expectLength int64
+ if parts := strings.SplitN(locator, "+", 3); len(parts) < 2 {
+ expectLength = -1
+ } else if n, err := strconv.ParseInt(parts[1], 10, 64); err != nil {
+ expectLength = -1
+ } else {
+ expectLength = n
+ }
+
var errs []string
tries_remaining := 1 + kc.Retries
// can try again.
errs = append(errs, fmt.Sprintf("%s: %v", url, err))
retryList = append(retryList, host)
- } else if resp.StatusCode != http.StatusOK {
+ continue
+ }
+ if resp.StatusCode != http.StatusOK {
var respbody []byte
respbody, _ = ioutil.ReadAll(&io.LimitedReader{R: resp.Body, N: 4096})
resp.Body.Close()
} else if resp.StatusCode == 404 {
count404++
}
- } else {
- // Success.
- if method == "GET" {
- return HashCheckingReader{
- Reader: resp.Body,
- Hash: md5.New(),
- Check: locator[0:32],
- }, resp.ContentLength, url, nil
- } else {
+ continue
+ }
+ if expectLength < 0 {
+ if resp.ContentLength < 0 {
resp.Body.Close()
- return nil, resp.ContentLength, url, nil
+ return nil, 0, "", fmt.Errorf("error reading %q: no size hint, no Content-Length header in response", locator)
}
+ expectLength = resp.ContentLength
+ } else if resp.ContentLength >= 0 && expectLength != resp.ContentLength {
+ resp.Body.Close()
+ return nil, 0, "", fmt.Errorf("error reading %q: size hint %d != Content-Length %d", locator, expectLength, resp.ContentLength)
+ }
+ // Success
+ if method == "GET" {
+ return HashCheckingReader{
+ Reader: resp.Body,
+ Hash: md5.New(),
+ Check: locator[0:32],
+ }, expectLength, url, nil
+ } else {
+ resp.Body.Close()
+ return nil, expectLength, url, nil
}
-
}
serversToTry = retryList
}
return kc.getOrHead("GET", locator)
}
+// ReadAt() retrieves a portion of block from the cache if it's
+// present, otherwise from the network.
+func (kc *KeepClient) ReadAt(locator string, p []byte, off int) (int, error) {
+ return kc.cache().ReadAt(kc, locator, p, off)
+}
+
// Ask() verifies that a block with the given hash is available and
// readable, according to at least one Keep service. Unlike Get, it
// does not retrieve the data or verify that the data content matches
}
}
+func (kc *KeepClient) ClearBlockCache() {
+ kc.cache().Clear()
+}
+
var (
// There are four global http.Client objects for the four
// possible permutations of TLS behavior (verify/skip-verify)