X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/a7a482db3954fa6470be74f0e00f6e1e105e0b6c..3583e494ed815632bbaa2582fd0a49110a21123b:/sdk/go/keepclient/keepclient.go?ds=sidebyside diff --git a/sdk/go/keepclient/keepclient.go b/sdk/go/keepclient/keepclient.go index b18d7e0464..3dc0aa0389 100644 --- a/sdk/go/keepclient/keepclient.go +++ b/sdk/go/keepclient/keepclient.go @@ -2,11 +2,13 @@ // // SPDX-License-Identifier: Apache-2.0 -/* Provides low-level Get/Put primitives for accessing Arvados Keep blocks. */ +// Package keepclient provides low-level Get/Put primitives for accessing +// Arvados Keep blocks. package keepclient import ( "bytes" + "context" "crypto/md5" "errors" "fmt" @@ -14,18 +16,20 @@ import ( "io/ioutil" "net" "net/http" + "os" + "path/filepath" "regexp" "strconv" "strings" "sync" "time" + "git.arvados.org/arvados.git/sdk/go/arvados" "git.arvados.org/arvados.git/sdk/go/arvadosclient" - "git.arvados.org/arvados.git/sdk/go/asyncbuf" "git.arvados.org/arvados.git/sdk/go/httpserver" ) -// A Keep "block" is 64MB. +// BLOCKSIZE defines the length of a Keep "block", which is 64MB. const BLOCKSIZE = 64 * 1024 * 1024 var ( @@ -38,6 +42,9 @@ var ( DefaultProxyConnectTimeout = 30 * time.Second DefaultProxyTLSHandshakeTimeout = 10 * time.Second DefaultProxyKeepAlive = 120 * time.Second + + rootCacheDir = "/var/cache/arvados/keep" + userCacheDir = ".cache/arvados/keep" // relative to HOME ) // Error interface with an error and boolean indicating whether the error is temporary @@ -67,11 +74,11 @@ type ErrNotFound struct { multipleResponseError } -type InsufficientReplicasError error +type InsufficientReplicasError struct{ error } -type OversizeBlockError error +type OversizeBlockError struct{ error } -var ErrOversizeBlock = OversizeBlockError(errors.New("Exceeded maximum block size (" + strconv.Itoa(BLOCKSIZE) + ")")) +var ErrOversizeBlock = OversizeBlockError{error: errors.New("Exceeded maximum block size (" + strconv.Itoa(BLOCKSIZE) + ")")} var MissingArvadosApiHost = errors.New("Missing required environment variable ARVADOS_API_HOST") var MissingArvadosApiToken = errors.New("Missing required environment variable ARVADOS_API_TOKEN") var InvalidLocatorError = errors.New("Invalid locator") @@ -82,26 +89,33 @@ var ErrNoSuchKeepServer = errors.New("No keep server matching the given UUID is // ErrIncompleteIndex is returned when the Index response does not end with a new empty line var ErrIncompleteIndex = errors.New("Got incomplete index") -const X_Keep_Desired_Replicas = "X-Keep-Desired-Replicas" -const X_Keep_Replicas_Stored = "X-Keep-Replicas-Stored" +const ( + XKeepDesiredReplicas = "X-Keep-Desired-Replicas" + XKeepReplicasStored = "X-Keep-Replicas-Stored" + XKeepStorageClasses = "X-Keep-Storage-Classes" + XKeepStorageClassesConfirmed = "X-Keep-Storage-Classes-Confirmed" +) type HTTPClient interface { Do(*http.Request) (*http.Response, error) } -// Information about Arvados and Keep servers. +const DiskCacheDisabled = arvados.ByteSizeOrPercent(1) + +// KeepClient holds information about Arvados and Keep servers. type KeepClient struct { - Arvados *arvadosclient.ArvadosClient - Want_replicas int - localRoots map[string]string - writableLocalRoots map[string]string - gatewayRoots map[string]string - lock sync.RWMutex - HTTPClient HTTPClient - Retries int - BlockCache *BlockCache - RequestID string - StorageClasses []string + Arvados *arvadosclient.ArvadosClient + Want_replicas int + localRoots map[string]string + writableLocalRoots map[string]string + gatewayRoots map[string]string + lock sync.RWMutex + HTTPClient HTTPClient + Retries int + RequestID string + StorageClasses []string + DefaultStorageClasses []string // Set by cluster's exported config + DiskCacheSize arvados.ByteSizeOrPercent // See also DiskCacheDisabled // set to 1 if all writable services are of disk type, otherwise 0 replicasPerService int @@ -111,9 +125,48 @@ type KeepClient struct { // Disable automatic discovery of keep services disableDiscovery bool + + gatewayStack arvados.KeepGateway } -// MakeKeepClient creates a new KeepClient, calls +func (kc *KeepClient) Clone() *KeepClient { + kc.lock.Lock() + defer kc.lock.Unlock() + return &KeepClient{ + Arvados: kc.Arvados, + Want_replicas: kc.Want_replicas, + localRoots: kc.localRoots, + writableLocalRoots: kc.writableLocalRoots, + gatewayRoots: kc.gatewayRoots, + HTTPClient: kc.HTTPClient, + Retries: kc.Retries, + RequestID: kc.RequestID, + StorageClasses: kc.StorageClasses, + DefaultStorageClasses: kc.DefaultStorageClasses, + DiskCacheSize: kc.DiskCacheSize, + replicasPerService: kc.replicasPerService, + foundNonDiskSvc: kc.foundNonDiskSvc, + disableDiscovery: kc.disableDiscovery, + } +} + +func (kc *KeepClient) loadDefaultClasses() error { + scData, err := kc.Arvados.ClusterConfig("StorageClasses") + if err != nil { + return err + } + classes := scData.(map[string]interface{}) + for scName := range classes { + scConf, _ := classes[scName].(map[string]interface{}) + isDefault, ok := scConf["Default"].(bool) + if ok && isDefault { + kc.DefaultStorageClasses = append(kc.DefaultStorageClasses, scName) + } + } + return nil +} + +// MakeKeepClient creates a new KeepClient, loads default storage classes, calls // DiscoverKeepServices(), and returns when the client is ready to // use. func MakeKeepClient(arv *arvadosclient.ArvadosClient) (*KeepClient, error) { @@ -132,14 +185,19 @@ func New(arv *arvadosclient.ArvadosClient) *KeepClient { defaultReplicationLevel = int(v) } } - return &KeepClient{ + kc := &KeepClient{ Arvados: arv, Want_replicas: defaultReplicationLevel, Retries: 2, } + err = kc.loadDefaultClasses() + if err != nil { + DebugPrintf("DEBUG: Unable to load the default storage classes cluster config") + } + return kc } -// Put a block given the block hash, a reader, and the number of bytes +// PutHR puts a block given the block hash, a reader, and the number of bytes // to read from the reader (which must be between 0 and BLOCKSIZE). // // Returns the locator for the written block, the number of replicas @@ -148,23 +206,12 @@ func New(arv *arvadosclient.ArvadosClient) *KeepClient { // Returns an InsufficientReplicasError if 0 <= replicas < // kc.Wants_replicas. func (kc *KeepClient) PutHR(hash string, r io.Reader, dataBytes int64) (string, int, error) { - // Buffer for reads from 'r' - var bufsize int - if dataBytes > 0 { - if dataBytes > BLOCKSIZE { - return "", 0, ErrOversizeBlock - } - bufsize = int(dataBytes) - } else { - bufsize = BLOCKSIZE - } - - buf := asyncbuf.NewBuffer(make([]byte, 0, bufsize)) - go func() { - _, err := io.Copy(buf, HashCheckingReader{r, md5.New(), hash}) - buf.CloseWithError(err) - }() - return kc.putReplicas(hash, buf.NewReader, dataBytes) + resp, err := kc.BlockWrite(context.Background(), arvados.BlockWriteOptions{ + Hash: hash, + Reader: r, + DataSize: int(dataBytes), + }) + return resp.Locator, resp.Replicas, err } // PutHB writes a block to Keep. The hash of the bytes is given in @@ -172,16 +219,21 @@ func (kc *KeepClient) PutHR(hash string, r io.Reader, dataBytes int64) (string, // // Return values are the same as for PutHR. func (kc *KeepClient) PutHB(hash string, buf []byte) (string, int, error) { - newReader := func() io.Reader { return bytes.NewBuffer(buf) } - return kc.putReplicas(hash, newReader, int64(len(buf))) + resp, err := kc.BlockWrite(context.Background(), arvados.BlockWriteOptions{ + Hash: hash, + Data: buf, + }) + return resp.Locator, resp.Replicas, err } // PutB writes a block to Keep. It computes the hash itself. // // Return values are the same as for PutHR. func (kc *KeepClient) PutB(buffer []byte) (string, int, error) { - hash := fmt.Sprintf("%x", md5.Sum(buffer)) - return kc.PutHB(hash, buffer) + resp, err := kc.BlockWrite(context.Background(), arvados.BlockWriteOptions{ + Data: buffer, + }) + return resp.Locator, resp.Replicas, err } // PutR writes a block to Keep. It first reads all data from r into a buffer @@ -191,11 +243,11 @@ func (kc *KeepClient) PutB(buffer []byte) (string, int, error) { // // If the block hash and data size are known, PutHR is more efficient. func (kc *KeepClient) PutR(r io.Reader) (locator string, replicas int, err error) { - if buffer, err := ioutil.ReadAll(r); err != nil { + buffer, err := ioutil.ReadAll(r) + if err != nil { return "", 0, err - } else { - return kc.PutB(buffer) } + return kc.PutB(buffer) } func (kc *KeepClient) getOrHead(method string, locator string, header http.Header) (io.ReadCloser, int64, string, http.Header, error) { @@ -216,7 +268,7 @@ func (kc *KeepClient) getOrHead(method string, locator string, header http.Heade var errs []string - tries_remaining := 1 + kc.Retries + triesRemaining := 1 + kc.Retries serversToTry := kc.getSortedRoots(locator) @@ -225,8 +277,8 @@ func (kc *KeepClient) getOrHead(method string, locator string, header http.Heade var retryList []string - for tries_remaining > 0 { - tries_remaining -= 1 + for triesRemaining > 0 { + triesRemaining-- retryList = nil for _, host := range serversToTry { @@ -290,10 +342,9 @@ func (kc *KeepClient) getOrHead(method string, locator string, header http.Heade Hash: md5.New(), Check: locator[0:32], }, expectLength, url, resp.Header, nil - } else { - resp.Body.Close() - return nil, expectLength, url, resp.Header, nil } + resp.Body.Close() + return nil, expectLength, url, resp.Header, nil } serversToTry = retryList } @@ -311,29 +362,53 @@ func (kc *KeepClient) getOrHead(method string, locator string, header http.Heade return nil, 0, "", nil, err } +// attempt to create dir/subdir/ and its parents, up to but not +// including dir itself, using mode 0700. +func makedirs(dir, subdir string) { + for _, part := range strings.Split(subdir, string(os.PathSeparator)) { + dir = filepath.Join(dir, part) + os.Mkdir(dir, 0700) + } +} + +// upstreamGateway creates/returns the KeepGateway stack used to read +// and write data: a disk-backed cache on top of an http backend. +func (kc *KeepClient) upstreamGateway() arvados.KeepGateway { + kc.lock.Lock() + defer kc.lock.Unlock() + if kc.gatewayStack != nil { + return kc.gatewayStack + } + var cachedir string + if os.Geteuid() == 0 { + cachedir = rootCacheDir + makedirs("/", cachedir) + } else { + home := "/" + os.Getenv("HOME") + makedirs(home, userCacheDir) + cachedir = filepath.Join(home, userCacheDir) + } + backend := &keepViaHTTP{kc} + if kc.DiskCacheSize == DiskCacheDisabled { + kc.gatewayStack = backend + } else { + kc.gatewayStack = &arvados.DiskCache{ + Dir: cachedir, + MaxSize: kc.DiskCacheSize, + KeepGateway: backend, + } + } + return kc.gatewayStack +} + // LocalLocator returns a locator equivalent to the one supplied, but // with a valid signature from the local cluster. If the given locator // already has a local signature, it is returned unchanged. func (kc *KeepClient) LocalLocator(locator string) (string, error) { - if !strings.Contains(locator, "+R") { - // Either it has +A, or it's unsigned and we assume - // it's a local locator on a site with signatures - // disabled. - return locator, nil - } - sighdr := fmt.Sprintf("local, time=%s", time.Now().UTC().Format(time.RFC3339)) - _, _, url, hdr, err := kc.getOrHead("HEAD", locator, http.Header{"X-Keep-Signature": []string{sighdr}}) - if err != nil { - return "", err - } - loc := hdr.Get("X-Keep-Locator") - if loc == "" { - return "", fmt.Errorf("missing X-Keep-Locator header in HEAD response from %s", url) - } - return loc, nil + return kc.upstreamGateway().LocalLocator(locator) } -// Get() retrieves a block, given a locator. Returns a reader, the +// Get retrieves a block, given a locator. Returns a reader, the // expected data length, the URL the block is being fetched from, and // an error. // @@ -345,13 +420,19 @@ func (kc *KeepClient) Get(locator string) (io.ReadCloser, int64, string, error) return rdr, size, url, err } -// ReadAt() retrieves a portion of block from the cache if it's +// ReadAt retrieves a portion of block from the cache if it's // present, otherwise from the network. func (kc *KeepClient) ReadAt(locator string, p []byte, off int) (int, error) { - return kc.cache().ReadAt(kc, locator, p, off) + return kc.upstreamGateway().ReadAt(locator, p, off) +} + +// BlockWrite writes a full block to upstream servers and saves a copy +// in the local cache. +func (kc *KeepClient) BlockWrite(ctx context.Context, req arvados.BlockWriteOptions) (arvados.BlockWriteResponse, error) { + return kc.upstreamGateway().BlockWrite(ctx, req) } -// Ask() verifies that a block with the given hash is available and +// Ask verifies that a block with the given hash is available and // readable, according to at least one Keep service. Unlike Get, it // does not retrieve the data or verify that the data content matches // the hash specified by the locator. @@ -416,7 +497,7 @@ func (kc *KeepClient) GetIndex(keepServiceUUID, prefix string) (io.Reader, error return bytes.NewReader(respBody[0 : len(respBody)-1]), nil } -// LocalRoots() returns the map of local (i.e., disk and proxy) Keep +// LocalRoots returns the map of local (i.e., disk and proxy) Keep // services: uuid -> baseURI. func (kc *KeepClient) LocalRoots() map[string]string { kc.discoverServices() @@ -425,7 +506,7 @@ func (kc *KeepClient) LocalRoots() map[string]string { return kc.localRoots } -// GatewayRoots() returns the map of Keep remote gateway services: +// GatewayRoots returns the map of Keep remote gateway services: // uuid -> baseURI. func (kc *KeepClient) GatewayRoots() map[string]string { kc.discoverServices() @@ -434,7 +515,7 @@ func (kc *KeepClient) GatewayRoots() map[string]string { return kc.gatewayRoots } -// WritableLocalRoots() returns the map of writable local Keep services: +// WritableLocalRoots returns the map of writable local Keep services: // uuid -> baseURI. func (kc *KeepClient) WritableLocalRoots() map[string]string { kc.discoverServices() @@ -490,16 +571,9 @@ func (kc *KeepClient) getSortedRoots(locator string) []string { return found } -func (kc *KeepClient) cache() *BlockCache { - if kc.BlockCache != nil { - return kc.BlockCache - } else { - return DefaultBlockCache - } -} - -func (kc *KeepClient) ClearBlockCache() { - kc.cache().Clear() +func (kc *KeepClient) SetStorageClasses(sc []string) { + // make a copy so the caller can't mess with it. + kc.StorageClasses = append([]string{}, sc...) } var ( @@ -576,9 +650,8 @@ var reqIDGen = httpserver.IDGenerator{Prefix: "req-"} func (kc *KeepClient) getRequestID() string { if kc.RequestID != "" { return kc.RequestID - } else { - return reqIDGen.Next() } + return reqIDGen.Next() } type Locator struct {