20318: Track estimated cache usage, and tidy more diligently.
authorTom Clegg <tom@curii.com>
Wed, 20 Dec 2023 18:33:56 +0000 (13:33 -0500)
committerTom Clegg <tom@curii.com>
Wed, 20 Dec 2023 18:33:56 +0000 (13:33 -0500)
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom@curii.com>

sdk/go/arvados/keep_cache.go
sdk/go/arvados/keep_cache_test.go

index b366d6f1b09b5605bf28b65011765acdcf9d69d8..af80daa2e07647346fd628eda6b7acdbc2338295 100644 (file)
@@ -58,6 +58,9 @@ type DiskCache struct {
        writing     map[string]*writeprogress
        writingCond *sync.Cond
        writingLock sync.Mutex
+
+       sizeMeasured  int64 // actual size on disk after last tidy(); zero if not measured yet
+       sizeEstimated int64 // last measured size, plus files we have written since
 }
 
 type writeprogress struct {
@@ -76,7 +79,7 @@ type openFileEnt struct {
 const (
        cacheFileSuffix  = ".keepcacheblock"
        tmpFileSuffix    = ".tmp"
-       tidyHoldDuration = 10 * time.Second
+       tidyHoldDuration = 5 * time.Minute // time to re-check cache size even if estimated size is below max
 )
 
 func (cache *DiskCache) cacheFile(locator string) string {
@@ -123,7 +126,6 @@ func (cache *DiskCache) debugf(format string, args ...interface{}) {
 // BlockWrite writes through to the wrapped KeepGateway, and (if
 // possible) retains a copy of the written block in the cache.
 func (cache *DiskCache) BlockWrite(ctx context.Context, opts BlockWriteOptions) (BlockWriteResponse, error) {
-       cache.gotidy()
        unique := fmt.Sprintf("%x.%p%s", os.Getpid(), &opts, tmpFileSuffix)
        tmpfilename := filepath.Join(cache.Dir, "tmp", unique)
        tmpfile, err := cache.openFile(tmpfilename, os.O_CREATE|os.O_EXCL|os.O_RDWR)
@@ -187,6 +189,8 @@ func (cache *DiskCache) BlockWrite(ctx context.Context, opts BlockWriteOptions)
                if err != nil {
                        cache.debugf("BlockWrite: rename(%s, %s) failed: %s", tmpfilename, cachefilename, err)
                }
+               atomic.AddInt64(&cache.sizeEstimated, int64(n))
+               cache.gotidy()
        }()
 
        // Write through to the wrapped KeepGateway from the pipe,
@@ -231,7 +235,6 @@ func (fw funcwriter) Write(p []byte) (int, error) {
 // cache. The remainder of the block may continue to be copied into
 // the cache in the background.
 func (cache *DiskCache) ReadAt(locator string, dst []byte, offset int) (int, error) {
-       cache.gotidy()
        cachefilename := cache.cacheFile(locator)
        if n, err := cache.quickReadAt(cachefilename, dst, offset); err == nil {
                return n, err
@@ -305,6 +308,8 @@ func (cache *DiskCache) ReadAt(locator string, dst []byte, offset int) (int, err
                                        }
                                        return n, err
                                })})
+                       atomic.AddInt64(&cache.sizeEstimated, int64(size))
+                       cache.gotidy()
                }()
        }
        progress.cond.L.Lock()
@@ -519,9 +524,18 @@ func (cache *DiskCache) BlockRead(ctx context.Context, opts BlockReadOptions) (i
 // Start a tidy() goroutine, unless one is already running / recently
 // finished.
 func (cache *DiskCache) gotidy() {
-       // Return quickly if another tidy goroutine is running in this process.
+       // Skip if another tidy goroutine is running in this process.
        n := atomic.AddInt32(&cache.tidying, 1)
-       if n != 1 || time.Now().Before(cache.tidyHoldUntil) {
+       if n != 1 {
+               atomic.AddInt32(&cache.tidying, -1)
+               return
+       }
+       // Skip if sizeEstimated is based on an actual measurement and
+       // is below MaxSize, and we haven't reached the "recheck
+       // anyway" time threshold.
+       if cache.sizeMeasured > 0 &&
+               atomic.LoadInt64(&cache.sizeEstimated) < cache.MaxSize &&
+               time.Now().Before(cache.tidyHoldUntil) {
                atomic.AddInt32(&cache.tidying, -1)
                return
        }
@@ -608,11 +622,26 @@ func (cache *DiskCache) tidy() {
                return
        }
 
-       if totalsize <= maxsize {
+       // If we're below MaxSize or there's only one block in the
+       // cache, just update the usage estimate and return.
+       //
+       // (We never delete the last block because that would merely
+       // cause the same block to get re-fetched repeatedly from the
+       // backend.)
+       if totalsize <= maxsize || len(ents) == 1 {
+               atomic.StoreInt64(&cache.sizeMeasured, totalsize)
+               atomic.StoreInt64(&cache.sizeEstimated, totalsize)
                return
        }
 
-       // Delete oldest entries until totalsize < maxsize.
+       // Set a new size target of maxsize minus 5%.  This makes some
+       // room for sizeEstimate to grow before it triggers another
+       // tidy. We don't want to walk/sort an entire large cache
+       // directory each time we write a block.
+       target := maxsize - (maxsize / 20)
+
+       // Delete oldest entries until totalsize < target or we're
+       // down to a single cached block.
        sort.Slice(ents, func(i, j int) bool {
                return ents[i].atime.Before(ents[j].atime)
        })
@@ -622,7 +651,7 @@ func (cache *DiskCache) tidy() {
                go cache.deleteHeldopen(ent.path, nil)
                deleted++
                totalsize -= ent.size
-               if totalsize <= maxsize {
+               if totalsize <= target || deleted == len(ents)-1 {
                        break
                }
        }
@@ -633,4 +662,6 @@ func (cache *DiskCache) tidy() {
                        "totalsize": totalsize,
                }).Debugf("DiskCache: remaining cache usage after deleting")
        }
+       atomic.StoreInt64(&cache.sizeMeasured, totalsize)
+       atomic.StoreInt64(&cache.sizeEstimated, totalsize)
 }
index 5ae932a782b085b662c067de5e5a6caeae5719c4..5fe1a6a08ec5e0c563ae0c0280addbe96f3d19fa 100644 (file)
@@ -127,15 +127,29 @@ func (s *keepCacheSuite) TestMaxSize(c *check.C) {
                Data: make([]byte, 44000000),
        })
        c.Check(err, check.IsNil)
+
+       // Wait for tidy to finish, check that it doesn't delete the
+       // only block.
        time.Sleep(time.Millisecond)
+       for atomic.LoadInt32(&cache.tidying) > 0 {
+               time.Sleep(time.Millisecond)
+       }
+       c.Check(atomic.LoadInt64(&cache.sizeMeasured), check.Equals, int64(44000000))
+
        resp2, err := cache.BlockWrite(ctx, BlockWriteOptions{
                Data: make([]byte, 32000000),
        })
        c.Check(err, check.IsNil)
        delete(backend.data, resp1.Locator)
        delete(backend.data, resp2.Locator)
-       cache.tidyHoldUntil = time.Time{}
-       cache.tidy()
+
+       // Wait for tidy to finish, check that it deleted the older
+       // block.
+       time.Sleep(time.Millisecond)
+       for atomic.LoadInt32(&cache.tidying) > 0 {
+               time.Sleep(time.Millisecond)
+       }
+       c.Check(atomic.LoadInt64(&cache.sizeMeasured), check.Equals, int64(32000000))
 
        n, err := cache.ReadAt(resp1.Locator, make([]byte, 2), 0)
        c.Check(n, check.Equals, 0)