import (
"bytes"
+ "context"
"crypto/md5"
"errors"
"fmt"
"sync"
"time"
+ "git.arvados.org/arvados.git/sdk/go/arvados"
"git.arvados.org/arvados.git/sdk/go/arvadosclient"
- "git.arvados.org/arvados.git/sdk/go/asyncbuf"
"git.arvados.org/arvados.git/sdk/go/httpserver"
)
// ErrIncompleteIndex is returned when the Index response does not end with a new empty line
var ErrIncompleteIndex = errors.New("Got incomplete index")
-const XKeepDesiredReplicas = "X-Keep-Desired-Replicas"
-const XKeepReplicasStored = "X-Keep-Replicas-Stored"
+// Names of the X-Keep-* fields exchanged with Keep services.
+// NOTE(review): presumably HTTP header names; their use is outside
+// this hunk -- confirm.
+const (
+ XKeepDesiredReplicas = "X-Keep-Desired-Replicas"
+ XKeepReplicasStored = "X-Keep-Replicas-Stored"
+ XKeepStorageClasses = "X-Keep-Storage-Classes"
+ XKeepStorageClassesConfirmed = "X-Keep-Storage-Classes-Confirmed"
+)
+// HTTPClient is anything that can execute an HTTP request through a
+// Do method with the same signature as (*http.Client).Do, so an
+// *http.Client satisfies it.
type HTTPClient interface {
Do(*http.Request) (*http.Response, error)
}
-// Information about Arvados and Keep servers.
+// KeepClient holds information about Arvados and Keep servers.
type KeepClient struct {
Arvados *arvadosclient.ArvadosClient
Want_replicas int
// Returns an InsufficientReplicasError if 0 <= replicas <
-// kc.Wants_replicas.
+// kc.Want_replicas.
func (kc *KeepClient) PutHR(hash string, r io.Reader, dataBytes int64) (string, int, error) {
- // Buffer for reads from 'r'
- var bufsize int
- if dataBytes > 0 {
- if dataBytes > BLOCKSIZE {
- return "", 0, ErrOversizeBlock
- }
- bufsize = int(dataBytes)
- } else {
- bufsize = BLOCKSIZE
- }
-
- buf := asyncbuf.NewBuffer(make([]byte, 0, bufsize))
- go func() {
- _, err := io.Copy(buf, HashCheckingReader{r, md5.New(), hash})
- buf.CloseWithError(err)
- }()
- return kc.putReplicas(hash, buf.NewReader, dataBytes)
+ // Delegate to BlockWrite. This signature has no context parameter,
+ // so context.Background() is used and the write cannot be cancelled
+ // by the caller.
+ //
+ // NOTE(review): the removed code rejected dataBytes > BLOCKSIZE with
+ // ErrOversizeBlock and hash-checked r while buffering; neither is
+ // done here any more -- confirm BlockWrite covers both.
+ resp, err := kc.BlockWrite(context.Background(), arvados.BlockWriteOptions{
+ Hash: hash,
+ Reader: r,
+ DataSize: int(dataBytes),
+ })
+ // resp.Locator and resp.Replicas are passed through as-is, even
+ // when err is non-nil.
+ return resp.Locator, resp.Replicas, err
}
// PutHB writes a block to Keep. The hash of the bytes is given in
//
// Return values are the same as for PutHR.
func (kc *KeepClient) PutHB(hash string, buf []byte) (string, int, error) {
- newReader := func() io.Reader { return bytes.NewBuffer(buf) }
- return kc.putReplicas(hash, newReader, int64(len(buf)))
+ // Delegate to BlockWrite, handing over buf itself (no copy is made
+ // here) together with the caller-supplied hash. context.Background()
+ // is used because this signature has no context parameter.
+ resp, err := kc.BlockWrite(context.Background(), arvados.BlockWriteOptions{
+ Hash: hash,
+ Data: buf,
+ })
+ // resp.Locator and resp.Replicas are passed through as-is, even
+ // when err is non-nil.
+ return resp.Locator, resp.Replicas, err
}
// PutB writes a block to Keep. It computes the hash itself.
//
// Return values are the same as for PutHR.
func (kc *KeepClient) PutB(buffer []byte) (string, int, error) {
- hash := fmt.Sprintf("%x", md5.Sum(buffer))
- return kc.PutHB(hash, buffer)
+ // The Hash option is deliberately left unset: the MD5 is no longer
+ // computed in this method.
+ // NOTE(review): presumably BlockWrite hashes Data when Hash is
+ // empty -- confirm.
+ resp, err := kc.BlockWrite(context.Background(), arvados.BlockWriteOptions{
+ Data: buffer,
+ })
+ // resp.Locator and resp.Replicas are passed through as-is, even
+ // when err is non-nil.
+ return resp.Locator, resp.Replicas, err
}
// PutR writes a block to Keep. It first reads all data from r into a buffer
var retryList []string
for triesRemaining > 0 {
- triesRemaining -= 1
+ triesRemaining--
retryList = nil
for _, host := range serversToTry {
return loc, nil
}
-// Get() retrieves a block, given a locator. Returns a reader, the
+// Get retrieves a block, given a locator. Returns a reader, the
// expected data length, the URL the block is being fetched from, and
// an error.
//
return rdr, size, url, err
}
-// ReadAt() retrieves a portion of block from the cache if it's
+// ReadAt retrieves a portion of a block from the cache if it's
// present, otherwise from the network.
+//
+// The call is forwarded to the block cache's ReadAt with kc, locator,
+// p and off unchanged; its results are returned directly.
func (kc *KeepClient) ReadAt(locator string, p []byte, off int) (int, error) {
return kc.cache().ReadAt(kc, locator, p, off)
}
-// Ask() verifies that a block with the given hash is available and
+// Ask verifies that a block with the given hash is available and
// readable, according to at least one Keep service. Unlike Get, it
// does not retrieve the data or verify that the data content matches
// the hash specified by the locator.
return bytes.NewReader(respBody[0 : len(respBody)-1]), nil
}
-// LocalRoots() returns the map of local (i.e., disk and proxy) Keep
+// LocalRoots returns the map of local (i.e., disk and proxy) Keep
// services: uuid -> baseURI.
+//
+// discoverServices is invoked on every call before the map is read.
+// The returned value is the client's own localRoots map, not a copy.
func (kc *KeepClient) LocalRoots() map[string]string {
kc.discoverServices()
return kc.localRoots
}
-// GatewayRoots() returns the map of Keep remote gateway services:
+// GatewayRoots returns the map of Keep remote gateway services:
// uuid -> baseURI.
+//
+// discoverServices is invoked on every call before the map is read.
+// The returned value is the client's own gatewayRoots map, not a copy.
func (kc *KeepClient) GatewayRoots() map[string]string {
kc.discoverServices()
return kc.gatewayRoots
}
-// WritableLocalRoots() returns the map of writable local Keep services:
+// WritableLocalRoots returns the map of writable local Keep services:
// uuid -> baseURI.
func (kc *KeepClient) WritableLocalRoots() map[string]string {
kc.discoverServices()
kc.cache().Clear()
}
+// SetStorageClasses sets kc.StorageClasses to a copy of sc. A nil or
+// empty sc results in an empty, non-nil slice.
+func (kc *KeepClient) SetStorageClasses(sc []string) {
+ // Copy sc so later changes to the caller's slice do not affect kc.
+ kc.StorageClasses = append([]string{}, sc...)
+}
+
var (
// There are four global http.Client objects for the four
// possible permutations of TLS behavior (verify/skip-verify)