From a32cbc86cef9c05cc63a4bd749553c13befff730 Mon Sep 17 00:00:00 2001 From: Tom Clegg Date: Wed, 20 Dec 2023 13:33:56 -0500 Subject: [PATCH] 20318: Track estimated cache usage, and tidy more diligently. Arvados-DCO-1.1-Signed-off-by: Tom Clegg --- sdk/go/arvados/keep_cache.go | 47 +++++++++++++++++++++++++------ sdk/go/arvados/keep_cache_test.go | 18 ++++++++++-- 2 files changed, 55 insertions(+), 10 deletions(-) diff --git a/sdk/go/arvados/keep_cache.go b/sdk/go/arvados/keep_cache.go index b366d6f1b0..af80daa2e0 100644 --- a/sdk/go/arvados/keep_cache.go +++ b/sdk/go/arvados/keep_cache.go @@ -58,6 +58,9 @@ type DiskCache struct { writing map[string]*writeprogress writingCond *sync.Cond writingLock sync.Mutex + + sizeMeasured int64 // actual size on disk after last tidy(); zero if not measured yet + sizeEstimated int64 // last measured size, plus files we have written since } type writeprogress struct { @@ -76,7 +79,7 @@ type openFileEnt struct { const ( cacheFileSuffix = ".keepcacheblock" tmpFileSuffix = ".tmp" - tidyHoldDuration = 10 * time.Second + tidyHoldDuration = 5 * time.Minute // time to re-check cache size even if estimated size is below max ) func (cache *DiskCache) cacheFile(locator string) string { @@ -123,7 +126,6 @@ func (cache *DiskCache) debugf(format string, args ...interface{}) { // BlockWrite writes through to the wrapped KeepGateway, and (if // possible) retains a copy of the written block in the cache. 
func (cache *DiskCache) BlockWrite(ctx context.Context, opts BlockWriteOptions) (BlockWriteResponse, error) { - cache.gotidy() unique := fmt.Sprintf("%x.%p%s", os.Getpid(), &opts, tmpFileSuffix) tmpfilename := filepath.Join(cache.Dir, "tmp", unique) tmpfile, err := cache.openFile(tmpfilename, os.O_CREATE|os.O_EXCL|os.O_RDWR) @@ -187,6 +189,8 @@ func (cache *DiskCache) BlockWrite(ctx context.Context, opts BlockWriteOptions) if err != nil { cache.debugf("BlockWrite: rename(%s, %s) failed: %s", tmpfilename, cachefilename, err) } + atomic.AddInt64(&cache.sizeEstimated, int64(n)) + cache.gotidy() }() // Write through to the wrapped KeepGateway from the pipe, @@ -231,7 +235,6 @@ func (fw funcwriter) Write(p []byte) (int, error) { // cache. The remainder of the block may continue to be copied into // the cache in the background. func (cache *DiskCache) ReadAt(locator string, dst []byte, offset int) (int, error) { - cache.gotidy() cachefilename := cache.cacheFile(locator) if n, err := cache.quickReadAt(cachefilename, dst, offset); err == nil { return n, err @@ -305,6 +308,8 @@ func (cache *DiskCache) ReadAt(locator string, dst []byte, offset int) (int, err } return n, err })}) + atomic.AddInt64(&cache.sizeEstimated, int64(size)) + cache.gotidy() }() } progress.cond.L.Lock() @@ -519,9 +524,18 @@ func (cache *DiskCache) BlockRead(ctx context.Context, opts BlockReadOptions) (i // Start a tidy() goroutine, unless one is already running / recently // finished. func (cache *DiskCache) gotidy() { - // Return quickly if another tidy goroutine is running in this process. + // Skip if another tidy goroutine is running in this process. n := atomic.AddInt32(&cache.tidying, 1) - if n != 1 || time.Now().Before(cache.tidyHoldUntil) { + if n != 1 { + atomic.AddInt32(&cache.tidying, -1) + return + } + // Skip if sizeEstimated is based on an actual measurement and + // is below MaxSize, and we haven't reached the "recheck + // anyway" time threshold. 
+ if cache.sizeMeasured > 0 && + atomic.LoadInt64(&cache.sizeEstimated) < cache.MaxSize && + time.Now().Before(cache.tidyHoldUntil) { atomic.AddInt32(&cache.tidying, -1) return } @@ -608,11 +622,26 @@ func (cache *DiskCache) tidy() { return } - if totalsize <= maxsize { + // If we're below MaxSize or there's only one block in the + // cache, just update the usage estimate and return. + // + // (We never delete the last block because that would merely + // cause the same block to get re-fetched repeatedly from the + // backend.) + if totalsize <= maxsize || len(ents) == 1 { + atomic.StoreInt64(&cache.sizeMeasured, totalsize) + atomic.StoreInt64(&cache.sizeEstimated, totalsize) return } - // Delete oldest entries until totalsize < maxsize. + // Set a new size target of maxsize minus 5%. This makes some + // room for sizeEstimated to grow before it triggers another + // tidy. We don't want to walk/sort an entire large cache + // directory each time we write a block. + target := maxsize - (maxsize / 20) + + // Delete oldest entries until totalsize < target or we're + // down to a single cached block. 
sort.Slice(ents, func(i, j int) bool { return ents[i].atime.Before(ents[j].atime) }) @@ -622,7 +651,7 @@ func (cache *DiskCache) tidy() { go cache.deleteHeldopen(ent.path, nil) deleted++ totalsize -= ent.size - if totalsize <= maxsize { + if totalsize <= target || deleted == len(ents)-1 { break } } @@ -633,4 +662,6 @@ func (cache *DiskCache) tidy() { "totalsize": totalsize, }).Debugf("DiskCache: remaining cache usage after deleting") } + atomic.StoreInt64(&cache.sizeMeasured, totalsize) + atomic.StoreInt64(&cache.sizeEstimated, totalsize) } diff --git a/sdk/go/arvados/keep_cache_test.go b/sdk/go/arvados/keep_cache_test.go index 5ae932a782..5fe1a6a08e 100644 --- a/sdk/go/arvados/keep_cache_test.go +++ b/sdk/go/arvados/keep_cache_test.go @@ -127,15 +127,29 @@ func (s *keepCacheSuite) TestMaxSize(c *check.C) { Data: make([]byte, 44000000), }) c.Check(err, check.IsNil) + + // Wait for tidy to finish, check that it doesn't delete the + // only block. time.Sleep(time.Millisecond) + for atomic.LoadInt32(&cache.tidying) > 0 { + time.Sleep(time.Millisecond) + } + c.Check(atomic.LoadInt64(&cache.sizeMeasured), check.Equals, int64(44000000)) + resp2, err := cache.BlockWrite(ctx, BlockWriteOptions{ Data: make([]byte, 32000000), }) c.Check(err, check.IsNil) delete(backend.data, resp1.Locator) delete(backend.data, resp2.Locator) - cache.tidyHoldUntil = time.Time{} - cache.tidy() + + // Wait for tidy to finish, check that it deleted the older + // block. + time.Sleep(time.Millisecond) + for atomic.LoadInt32(&cache.tidying) > 0 { + time.Sleep(time.Millisecond) + } + c.Check(atomic.LoadInt64(&cache.sizeMeasured), check.Equals, int64(32000000)) n, err := cache.ReadAt(resp1.Locator, make([]byte, 2), 0) c.Check(n, check.Equals, 0) -- 2.30.2