20318: Fix DiskCacheSize not propagated by (*KeepClient)Clone().
[arvados.git] / sdk / go / keepclient / keepclient.go
index 8eff377ce651513cc6170848d18ea25cd260d5fb..3dc0aa0389158268af38861ec202574d8f20211c 100644 (file)
@@ -2,11 +2,13 @@
 //
 // SPDX-License-Identifier: Apache-2.0
 
-/* Provides low-level Get/Put primitives for accessing Arvados Keep blocks. */
+// Package keepclient provides low-level Get/Put primitives for accessing
+// Arvados Keep blocks.
 package keepclient
 
 import (
        "bytes"
+       "context"
        "crypto/md5"
        "errors"
        "fmt"
@@ -14,18 +16,20 @@ import (
        "io/ioutil"
        "net"
        "net/http"
+       "os"
+       "path/filepath"
        "regexp"
        "strconv"
        "strings"
        "sync"
        "time"
 
+       "git.arvados.org/arvados.git/sdk/go/arvados"
        "git.arvados.org/arvados.git/sdk/go/arvadosclient"
-       "git.arvados.org/arvados.git/sdk/go/asyncbuf"
        "git.arvados.org/arvados.git/sdk/go/httpserver"
 )
 
-// A Keep "block" is 64MB.
+// BLOCKSIZE defines the length of a Keep "block", which is 64MB.
 const BLOCKSIZE = 64 * 1024 * 1024
 
 var (
@@ -38,6 +42,9 @@ var (
        DefaultProxyConnectTimeout      = 30 * time.Second
        DefaultProxyTLSHandshakeTimeout = 10 * time.Second
        DefaultProxyKeepAlive           = 120 * time.Second
+
+       rootCacheDir = "/var/cache/arvados/keep"
+       userCacheDir = ".cache/arvados/keep" // relative to HOME
 )
 
 // Error interface with an error and boolean indicating whether the error is temporary
@@ -67,11 +74,11 @@ type ErrNotFound struct {
        multipleResponseError
 }
 
-type InsufficientReplicasError error
+type InsufficientReplicasError struct{ error }
 
-type OversizeBlockError error
+type OversizeBlockError struct{ error }
 
-var ErrOversizeBlock = OversizeBlockError(errors.New("Exceeded maximum block size (" + strconv.Itoa(BLOCKSIZE) + ")"))
+var ErrOversizeBlock = OversizeBlockError{error: errors.New("Exceeded maximum block size (" + strconv.Itoa(BLOCKSIZE) + ")")}
 var MissingArvadosApiHost = errors.New("Missing required environment variable ARVADOS_API_HOST")
 var MissingArvadosApiToken = errors.New("Missing required environment variable ARVADOS_API_TOKEN")
 var InvalidLocatorError = errors.New("Invalid locator")
@@ -82,26 +89,33 @@ var ErrNoSuchKeepServer = errors.New("No keep server matching the given UUID is
 // ErrIncompleteIndex is returned when the Index response does not end with a new empty line
 var ErrIncompleteIndex = errors.New("Got incomplete index")
 
-const X_Keep_Desired_Replicas = "X-Keep-Desired-Replicas"
-const X_Keep_Replicas_Stored = "X-Keep-Replicas-Stored"
+const (
+       XKeepDesiredReplicas         = "X-Keep-Desired-Replicas"
+       XKeepReplicasStored          = "X-Keep-Replicas-Stored"
+       XKeepStorageClasses          = "X-Keep-Storage-Classes"
+       XKeepStorageClassesConfirmed = "X-Keep-Storage-Classes-Confirmed"
+)
 
 type HTTPClient interface {
        Do(*http.Request) (*http.Response, error)
 }
 
-// Information about Arvados and Keep servers.
+const DiskCacheDisabled = arvados.ByteSizeOrPercent(1)
+
+// KeepClient holds information about Arvados and Keep servers.
 type KeepClient struct {
-       Arvados            *arvadosclient.ArvadosClient
-       Want_replicas      int
-       localRoots         map[string]string
-       writableLocalRoots map[string]string
-       gatewayRoots       map[string]string
-       lock               sync.RWMutex
-       HTTPClient         HTTPClient
-       Retries            int
-       BlockCache         *BlockCache
-       RequestID          string
-       StorageClasses     []string
+       Arvados               *arvadosclient.ArvadosClient
+       Want_replicas         int
+       localRoots            map[string]string
+       writableLocalRoots    map[string]string
+       gatewayRoots          map[string]string
+       lock                  sync.RWMutex
+       HTTPClient            HTTPClient
+       Retries               int
+       RequestID             string
+       StorageClasses        []string
+       DefaultStorageClasses []string                  // Set by cluster's exported config
+       DiskCacheSize         arvados.ByteSizeOrPercent // See also DiskCacheDisabled
 
        // set to 1 if all writable services are of disk type, otherwise 0
        replicasPerService int
@@ -111,9 +125,48 @@ type KeepClient struct {
 
        // Disable automatic discovery of keep services
        disableDiscovery bool
+
+       gatewayStack arvados.KeepGateway
+}
+
+func (kc *KeepClient) Clone() *KeepClient {
+       kc.lock.Lock()
+       defer kc.lock.Unlock()
+       return &KeepClient{
+               Arvados:               kc.Arvados,
+               Want_replicas:         kc.Want_replicas,
+               localRoots:            kc.localRoots,
+               writableLocalRoots:    kc.writableLocalRoots,
+               gatewayRoots:          kc.gatewayRoots,
+               HTTPClient:            kc.HTTPClient,
+               Retries:               kc.Retries,
+               RequestID:             kc.RequestID,
+               StorageClasses:        kc.StorageClasses,
+               DefaultStorageClasses: kc.DefaultStorageClasses,
+               DiskCacheSize:         kc.DiskCacheSize,
+               replicasPerService:    kc.replicasPerService,
+               foundNonDiskSvc:       kc.foundNonDiskSvc,
+               disableDiscovery:      kc.disableDiscovery,
+       }
 }
 
-// MakeKeepClient creates a new KeepClient, calls
+func (kc *KeepClient) loadDefaultClasses() error {
+       scData, err := kc.Arvados.ClusterConfig("StorageClasses")
+       if err != nil {
+               return err
+       }
+       classes := scData.(map[string]interface{})
+       for scName := range classes {
+               scConf, _ := classes[scName].(map[string]interface{})
+               isDefault, ok := scConf["Default"].(bool)
+               if ok && isDefault {
+                       kc.DefaultStorageClasses = append(kc.DefaultStorageClasses, scName)
+               }
+       }
+       return nil
+}
+
+// MakeKeepClient creates a new KeepClient, loads default storage classes, calls
 // DiscoverKeepServices(), and returns when the client is ready to
 // use.
 func MakeKeepClient(arv *arvadosclient.ArvadosClient) (*KeepClient, error) {
@@ -132,14 +185,19 @@ func New(arv *arvadosclient.ArvadosClient) *KeepClient {
                        defaultReplicationLevel = int(v)
                }
        }
-       return &KeepClient{
+       kc := &KeepClient{
                Arvados:       arv,
                Want_replicas: defaultReplicationLevel,
                Retries:       2,
        }
+       err = kc.loadDefaultClasses()
+       if err != nil {
+               DebugPrintf("DEBUG: Unable to load the default storage classes cluster config")
+       }
+       return kc
 }
 
-// Put a block given the block hash, a reader, and the number of bytes
+// PutHR puts a block given the block hash, a reader, and the number of bytes
 // to read from the reader (which must be between 0 and BLOCKSIZE).
 //
 // Returns the locator for the written block, the number of replicas
@@ -148,23 +206,12 @@ func New(arv *arvadosclient.ArvadosClient) *KeepClient {
 // Returns an InsufficientReplicasError if 0 <= replicas <
 // kc.Wants_replicas.
 func (kc *KeepClient) PutHR(hash string, r io.Reader, dataBytes int64) (string, int, error) {
-       // Buffer for reads from 'r'
-       var bufsize int
-       if dataBytes > 0 {
-               if dataBytes > BLOCKSIZE {
-                       return "", 0, ErrOversizeBlock
-               }
-               bufsize = int(dataBytes)
-       } else {
-               bufsize = BLOCKSIZE
-       }
-
-       buf := asyncbuf.NewBuffer(make([]byte, 0, bufsize))
-       go func() {
-               _, err := io.Copy(buf, HashCheckingReader{r, md5.New(), hash})
-               buf.CloseWithError(err)
-       }()
-       return kc.putReplicas(hash, buf.NewReader, dataBytes)
+       resp, err := kc.BlockWrite(context.Background(), arvados.BlockWriteOptions{
+               Hash:     hash,
+               Reader:   r,
+               DataSize: int(dataBytes),
+       })
+       return resp.Locator, resp.Replicas, err
 }
 
 // PutHB writes a block to Keep. The hash of the bytes is given in
@@ -172,16 +219,21 @@ func (kc *KeepClient) PutHR(hash string, r io.Reader, dataBytes int64) (string,
 //
 // Return values are the same as for PutHR.
 func (kc *KeepClient) PutHB(hash string, buf []byte) (string, int, error) {
-       newReader := func() io.Reader { return bytes.NewBuffer(buf) }
-       return kc.putReplicas(hash, newReader, int64(len(buf)))
+       resp, err := kc.BlockWrite(context.Background(), arvados.BlockWriteOptions{
+               Hash: hash,
+               Data: buf,
+       })
+       return resp.Locator, resp.Replicas, err
 }
 
 // PutB writes a block to Keep. It computes the hash itself.
 //
 // Return values are the same as for PutHR.
 func (kc *KeepClient) PutB(buffer []byte) (string, int, error) {
-       hash := fmt.Sprintf("%x", md5.Sum(buffer))
-       return kc.PutHB(hash, buffer)
+       resp, err := kc.BlockWrite(context.Background(), arvados.BlockWriteOptions{
+               Data: buffer,
+       })
+       return resp.Locator, resp.Replicas, err
 }
 
 // PutR writes a block to Keep. It first reads all data from r into a buffer
@@ -191,11 +243,11 @@ func (kc *KeepClient) PutB(buffer []byte) (string, int, error) {
 //
 // If the block hash and data size are known, PutHR is more efficient.
 func (kc *KeepClient) PutR(r io.Reader) (locator string, replicas int, err error) {
-       if buffer, err := ioutil.ReadAll(r); err != nil {
+       buffer, err := ioutil.ReadAll(r)
+       if err != nil {
                return "", 0, err
-       } else {
-               return kc.PutB(buffer)
        }
+       return kc.PutB(buffer)
 }
 
 func (kc *KeepClient) getOrHead(method string, locator string, header http.Header) (io.ReadCloser, int64, string, http.Header, error) {
@@ -216,7 +268,7 @@ func (kc *KeepClient) getOrHead(method string, locator string, header http.Heade
 
        var errs []string
 
-       tries_remaining := 1 + kc.Retries
+       triesRemaining := 1 + kc.Retries
 
        serversToTry := kc.getSortedRoots(locator)
 
@@ -225,8 +277,8 @@ func (kc *KeepClient) getOrHead(method string, locator string, header http.Heade
 
        var retryList []string
 
-       for tries_remaining > 0 {
-               tries_remaining -= 1
+       for triesRemaining > 0 {
+               triesRemaining--
                retryList = nil
 
                for _, host := range serversToTry {
@@ -310,29 +362,53 @@ func (kc *KeepClient) getOrHead(method string, locator string, header http.Heade
        return nil, 0, "", nil, err
 }
 
+// attempt to create dir/subdir/ and its parents, up to but not
+// including dir itself, using mode 0700.
+func makedirs(dir, subdir string) {
+       for _, part := range strings.Split(subdir, string(os.PathSeparator)) {
+               dir = filepath.Join(dir, part)
+               os.Mkdir(dir, 0700)
+       }
+}
+
+// upstreamGateway creates/returns the KeepGateway stack used to read
+// and write data: a disk-backed cache on top of an http backend.
+func (kc *KeepClient) upstreamGateway() arvados.KeepGateway {
+       kc.lock.Lock()
+       defer kc.lock.Unlock()
+       if kc.gatewayStack != nil {
+               return kc.gatewayStack
+       }
+       var cachedir string
+       if os.Geteuid() == 0 {
+               cachedir = rootCacheDir
+               makedirs("/", cachedir)
+       } else {
+               home := "/" + os.Getenv("HOME")
+               makedirs(home, userCacheDir)
+               cachedir = filepath.Join(home, userCacheDir)
+       }
+       backend := &keepViaHTTP{kc}
+       if kc.DiskCacheSize == DiskCacheDisabled {
+               kc.gatewayStack = backend
+       } else {
+               kc.gatewayStack = &arvados.DiskCache{
+                       Dir:         cachedir,
+                       MaxSize:     kc.DiskCacheSize,
+                       KeepGateway: backend,
+               }
+       }
+       return kc.gatewayStack
+}
+
 // LocalLocator returns a locator equivalent to the one supplied, but
 // with a valid signature from the local cluster. If the given locator
 // already has a local signature, it is returned unchanged.
 func (kc *KeepClient) LocalLocator(locator string) (string, error) {
-       if !strings.Contains(locator, "+R") {
-               // Either it has +A, or it's unsigned and we assume
-               // it's a local locator on a site with signatures
-               // disabled.
-               return locator, nil
-       }
-       sighdr := fmt.Sprintf("local, time=%s", time.Now().UTC().Format(time.RFC3339))
-       _, _, url, hdr, err := kc.getOrHead("HEAD", locator, http.Header{"X-Keep-Signature": []string{sighdr}})
-       if err != nil {
-               return "", err
-       }
-       loc := hdr.Get("X-Keep-Locator")
-       if loc == "" {
-               return "", fmt.Errorf("missing X-Keep-Locator header in HEAD response from %s", url)
-       }
-       return loc, nil
+       return kc.upstreamGateway().LocalLocator(locator)
 }
 
-// Get() retrieves a block, given a locator. Returns a reader, the
+// Get retrieves a block, given a locator. Returns a reader, the
 // expected data length, the URL the block is being fetched from, and
 // an error.
 //
@@ -344,13 +420,19 @@ func (kc *KeepClient) Get(locator string) (io.ReadCloser, int64, string, error)
        return rdr, size, url, err
 }
 
-// ReadAt() retrieves a portion of block from the cache if it's
+// ReadAt retrieves a portion of block from the cache if it's
 // present, otherwise from the network.
 func (kc *KeepClient) ReadAt(locator string, p []byte, off int) (int, error) {
-       return kc.cache().ReadAt(kc, locator, p, off)
+       return kc.upstreamGateway().ReadAt(locator, p, off)
 }
 
-// Ask() verifies that a block with the given hash is available and
+// BlockWrite writes a full block to upstream servers and saves a copy
+// in the local cache.
+func (kc *KeepClient) BlockWrite(ctx context.Context, req arvados.BlockWriteOptions) (arvados.BlockWriteResponse, error) {
+       return kc.upstreamGateway().BlockWrite(ctx, req)
+}
+
+// Ask verifies that a block with the given hash is available and
 // readable, according to at least one Keep service. Unlike Get, it
 // does not retrieve the data or verify that the data content matches
 // the hash specified by the locator.
@@ -415,7 +497,7 @@ func (kc *KeepClient) GetIndex(keepServiceUUID, prefix string) (io.Reader, error
        return bytes.NewReader(respBody[0 : len(respBody)-1]), nil
 }
 
-// LocalRoots() returns the map of local (i.e., disk and proxy) Keep
+// LocalRoots returns the map of local (i.e., disk and proxy) Keep
 // services: uuid -> baseURI.
 func (kc *KeepClient) LocalRoots() map[string]string {
        kc.discoverServices()
@@ -424,7 +506,7 @@ func (kc *KeepClient) LocalRoots() map[string]string {
        return kc.localRoots
 }
 
-// GatewayRoots() returns the map of Keep remote gateway services:
+// GatewayRoots returns the map of Keep remote gateway services:
 // uuid -> baseURI.
 func (kc *KeepClient) GatewayRoots() map[string]string {
        kc.discoverServices()
@@ -433,7 +515,7 @@ func (kc *KeepClient) GatewayRoots() map[string]string {
        return kc.gatewayRoots
 }
 
-// WritableLocalRoots() returns the map of writable local Keep services:
+// WritableLocalRoots returns the map of writable local Keep services:
 // uuid -> baseURI.
 func (kc *KeepClient) WritableLocalRoots() map[string]string {
        kc.discoverServices()
@@ -489,15 +571,9 @@ func (kc *KeepClient) getSortedRoots(locator string) []string {
        return found
 }
 
-func (kc *KeepClient) cache() *BlockCache {
-       if kc.BlockCache != nil {
-               return kc.BlockCache
-       }
-       return DefaultBlockCache
-}
-
-func (kc *KeepClient) ClearBlockCache() {
-       kc.cache().Clear()
+func (kc *KeepClient) SetStorageClasses(sc []string) {
+       // make a copy so the caller can't mess with it.
+       kc.StorageClasses = append([]string{}, sc...)
 }
 
 var (