20318: Remove memory-backed keep block cache.
authorTom Clegg <tom@curii.com>
Tue, 19 Dec 2023 19:07:23 +0000 (14:07 -0500)
committerTom Clegg <tom@curii.com>
Tue, 19 Dec 2023 19:07:23 +0000 (14:07 -0500)
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom@curii.com>

lib/crunchrun/crunchrun.go
lib/crunchrun/crunchrun_test.go
lib/mount/command.go
sdk/go/keepclient/block_cache.go [deleted file]
sdk/go/keepclient/collectionreader_test.go
sdk/go/keepclient/gateway_shim.go
sdk/go/keepclient/keepclient.go

index 0e0d3c43e40df34947b61fe63421b11481ee3abc..bde13424dd2db954a631a9fe3a70e236a2873b62 100644 (file)
@@ -78,7 +78,6 @@ type IKeepClient interface {
        ReadAt(locator string, p []byte, off int) (int, error)
        ManifestFileReader(m manifest.Manifest, filename string) (arvados.File, error)
        LocalLocator(locator string) (string, error)
-       ClearBlockCache()
        SetStorageClasses(sc []string)
 }
 
@@ -2033,7 +2032,6 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s
                log.Printf("%s: %v", containerUUID, err)
                return 1
        }
-       kc.BlockCache = &keepclient.BlockCache{MaxBlocks: 2}
        kc.Retries = 4
 
        cr, err := NewContainerRunner(arvados.NewClientFromEnv(), api, kc, containerUUID)
index c53382135136b37b2cfae004fb6f318b13dbc2f5..276dd366617e71a17bd7acb907a30941459ce7a5 100644 (file)
@@ -368,9 +368,6 @@ func (client *KeepTestClient) ReadAt(string, []byte, int) (int, error) {
        return 0, errors.New("not implemented")
 }
 
-func (client *KeepTestClient) ClearBlockCache() {
-}
-
 func (client *KeepTestClient) Close() {
        client.Content = nil
 }
index f88d977c4c9bb059e6712fac1c727a71ed22dea7..666f2cf4acebfce59918e64894dda894e332438d 100644 (file)
@@ -43,7 +43,6 @@ func (c *mountCommand) RunCommand(prog string, args []string, stdin io.Reader, s
        flags := flag.NewFlagSet(prog, flag.ContinueOnError)
        ro := flags.Bool("ro", false, "read-only")
        experimental := flags.Bool("experimental", false, "acknowledge this is an experimental command, and should not be used in production (required)")
-       blockCache := flags.Int("block-cache", 4, "read cache size (number of 64MiB blocks)")
        pprof := flags.String("pprof", "", "serve Go profile data at `[addr]:port`")
        if ok, code := cmd.ParseFlags(flags, prog, args, "[FUSE mount options]", stderr); !ok {
                return code
@@ -69,7 +68,6 @@ func (c *mountCommand) RunCommand(prog string, args []string, stdin io.Reader, s
                logger.Print(err)
                return 1
        }
-       kc.BlockCache = &keepclient.BlockCache{MaxBlocks: *blockCache}
        host := fuse.NewFileSystemHost(&keepFS{
                Client:     client,
                KeepClient: kc,
diff --git a/sdk/go/keepclient/block_cache.go b/sdk/go/keepclient/block_cache.go
deleted file mode 100644 (file)
index 37eee4d..0000000
+++ /dev/null
@@ -1,134 +0,0 @@
-// Copyright (C) The Arvados Authors. All rights reserved.
-//
-// SPDX-License-Identifier: Apache-2.0
-
-package keepclient
-
-import (
-       "bytes"
-       "context"
-       "io"
-       "sort"
-       "strconv"
-       "strings"
-       "sync"
-       "time"
-
-       "git.arvados.org/arvados.git/sdk/go/arvados"
-)
-
-var DefaultBlockCache = &BlockCache{}
-
-type BlockCache struct {
-       // Maximum number of blocks to keep in the cache. If 0, a
-       // default size (currently 4) is used instead.
-       MaxBlocks int
-
-       cache map[string]*cacheBlock
-       mtx   sync.Mutex
-}
-
-const defaultMaxBlocks = 4
-
-// Sweep deletes the least recently used blocks from the cache until
-// there are no more than MaxBlocks left.
-func (c *BlockCache) Sweep() {
-       max := c.MaxBlocks
-       if max == 0 {
-               max = defaultMaxBlocks
-       }
-       c.mtx.Lock()
-       defer c.mtx.Unlock()
-       if len(c.cache) <= max {
-               return
-       }
-       lru := make([]time.Time, 0, len(c.cache))
-       for _, b := range c.cache {
-               lru = append(lru, b.lastUse)
-       }
-       sort.Sort(sort.Reverse(timeSlice(lru)))
-       threshold := lru[max]
-       for loc, b := range c.cache {
-               if !b.lastUse.After(threshold) {
-                       delete(c.cache, loc)
-               }
-       }
-}
-
-// ReadAt returns data from the cache, first retrieving it from Keep if
-// necessary.
-func (c *BlockCache) ReadAt(upstream arvados.KeepGateway, locator string, p []byte, off int) (int, error) {
-       buf, err := c.get(upstream, locator)
-       if err != nil {
-               return 0, err
-       }
-       if off > len(buf) {
-               return 0, io.ErrUnexpectedEOF
-       }
-       return copy(p, buf[off:]), nil
-}
-
-// Get a block from the cache, first retrieving it from Keep if
-// necessary.
-func (c *BlockCache) get(upstream arvados.KeepGateway, locator string) ([]byte, error) {
-       cacheKey := locator[:32]
-       bufsize := BLOCKSIZE
-       if parts := strings.SplitN(locator, "+", 3); len(parts) >= 2 {
-               datasize, err := strconv.ParseInt(parts[1], 10, 32)
-               if err == nil && datasize >= 0 {
-                       bufsize = int(datasize)
-               }
-       }
-       c.mtx.Lock()
-       if c.cache == nil {
-               c.cache = make(map[string]*cacheBlock)
-       }
-       b, ok := c.cache[cacheKey]
-       if !ok || b.err != nil {
-               b = &cacheBlock{
-                       fetched: make(chan struct{}),
-                       lastUse: time.Now(),
-               }
-               c.cache[cacheKey] = b
-               go func() {
-                       buf := bytes.NewBuffer(make([]byte, 0, bufsize))
-                       _, err := upstream.BlockRead(context.Background(), arvados.BlockReadOptions{Locator: locator, WriteTo: buf})
-                       c.mtx.Lock()
-                       b.data, b.err = buf.Bytes(), err
-                       c.mtx.Unlock()
-                       close(b.fetched)
-                       go c.Sweep()
-               }()
-       }
-       c.mtx.Unlock()
-
-       // Wait (with mtx unlocked) for the fetch goroutine to finish,
-       // in case it hasn't already.
-       <-b.fetched
-
-       c.mtx.Lock()
-       b.lastUse = time.Now()
-       c.mtx.Unlock()
-       return b.data, b.err
-}
-
-func (c *BlockCache) Clear() {
-       c.mtx.Lock()
-       c.cache = nil
-       c.mtx.Unlock()
-}
-
-type timeSlice []time.Time
-
-func (ts timeSlice) Len() int { return len(ts) }
-
-func (ts timeSlice) Less(i, j int) bool { return ts[i].Before(ts[j]) }
-
-func (ts timeSlice) Swap(i, j int) { ts[i], ts[j] = ts[j], ts[i] }
-
-type cacheBlock struct {
-       data    []byte
-       err     error
-       fetched chan struct{}
-       lastUse time.Time
-}
index 65dcd9ac8add0c24cf2c7e7460dcafb4530fe8ac..c1bad8557d70896572c9ef5d24615df5175e6ac6 100644 (file)
@@ -237,14 +237,8 @@ func (s *CollectionReaderUnit) TestCollectionReaderManyBlocks(c *check.C) {
 }
 
 func (s *CollectionReaderUnit) TestCollectionReaderCloseEarly(c *check.C) {
-       // Disable disk cache
-       defer func(home string) {
-               os.Setenv("HOME", home)
-       }(os.Getenv("HOME"))
-       os.Setenv("HOME", "")
-
-       // Disable memory cache
-       s.kc.BlockCache = &BlockCache{}
+       // Disable cache
+       s.kc.gatewayStack = &keepViaHTTP{s.kc}
 
        s.kc.PutB([]byte("foo"))
        s.kc.PutB([]byte("bar"))
index 35c191afe6fda077f10c9901fae444e2aa5f08e8..eeb187e10735f0cc0d6f94757dece828df7301a4 100644 (file)
@@ -67,28 +67,3 @@ func (kvh *keepViaHTTP) LocalLocator(locator string) (string, error) {
        }
        return loc, nil
 }
-
-// keepViaBlockCache implements arvados.KeepGateway by using the given
-// KeepClient's BlockCache with the wrapped KeepGateway.
-//
-// Note the whole KeepClient gets passed in instead of just its
-// cache. This ensures the new BlockCache gets used if it changes
-// after keepViaBlockCache is initialized.
-type keepViaBlockCache struct {
-       kc *KeepClient
-       arvados.KeepGateway
-}
-
-func (kvbc *keepViaBlockCache) ReadAt(locator string, dst []byte, offset int) (int, error) {
-       return kvbc.kc.cache().ReadAt(kvbc.KeepGateway, locator, dst, offset)
-}
-
-func (kvbc *keepViaBlockCache) BlockRead(ctx context.Context, opts arvados.BlockReadOptions) (int, error) {
-       rdr, _, _, _, err := kvbc.kc.getOrHead("GET", opts.Locator, nil)
-       if err != nil {
-               return 0, err
-       }
-       defer rdr.Close()
-       n, err := io.Copy(opts.WriteTo, rdr)
-       return int(n), err
-}
index 2712e9f3a580ef610615a5f88cdde47a11ae8469..4e935812be85d951c578794379e8ebcb7d99aff0 100644 (file)
@@ -110,7 +110,6 @@ type KeepClient struct {
        lock                  sync.RWMutex
        HTTPClient            HTTPClient
        Retries               int
-       BlockCache            *BlockCache
        RequestID             string
        StorageClasses        []string
        DefaultStorageClasses []string // Set by cluster's exported config
@@ -542,17 +541,6 @@ func (kc *KeepClient) getSortedRoots(locator string) []string {
        return found
 }
 
-func (kc *KeepClient) cache() *BlockCache {
-       if kc.BlockCache != nil {
-               return kc.BlockCache
-       }
-       return DefaultBlockCache
-}
-
-func (kc *KeepClient) ClearBlockCache() {
-       kc.cache().Clear()
-}
-
 func (kc *KeepClient) SetStorageClasses(sc []string) {
        // make a copy so the caller can't mess with it.
        kc.StorageClasses = append([]string{}, sc...)