From fd608866afd57f3f407d6103f770fad8b58eb564 Mon Sep 17 00:00:00 2001 From: Tom Clegg Date: Tue, 26 Dec 2023 15:16:28 -0500 Subject: [PATCH] 20318: Use one tidying goroutine and filehandle pool per cache dir. Arvados-DCO-1.1-Signed-off-by: Tom Clegg --- sdk/go/arvados/keep_cache.go | 57 ++++++++++++++++++++++++++++-------- 1 file changed, 45 insertions(+), 12 deletions(-) diff --git a/sdk/go/arvados/keep_cache.go b/sdk/go/arvados/keep_cache.go index a657153876..1d12f4cdc6 100644 --- a/sdk/go/arvados/keep_cache.go +++ b/sdk/go/arvados/keep_cache.go @@ -40,6 +40,26 @@ type DiskCache struct { MaxSize ByteSizeOrPercent Logger logrus.FieldLogger + *sharedCache + setupOnce sync.Once +} + +var ( + sharedCachesLock sync.Mutex + sharedCaches = map[string]*sharedCache{} +) + +// sharedCache has fields that coordinate the cache usage in a single +// cache directory; it can be shared by multiple DiskCaches. +// +// This serves to share a single pool of held-open filehandles, a +// single tidying goroutine, etc., even when the program (like +// keep-web) uses multiple KeepGateway stacks that use different auth +// tokens, etc. +type sharedCache struct { + dir string + maxSize ByteSizeOrPercent + tidying int32 // see tidy() tidyHoldUntil time.Time defaultMaxSize int64 @@ -82,12 +102,22 @@ const ( tidyHoldDuration = 5 * time.Minute // time to re-check cache size even if estimated size is below max ) +func (cache *DiskCache) setup() { + sharedCachesLock.Lock() + defer sharedCachesLock.Unlock() + dir := cache.Dir + if sharedCaches[dir] == nil { + sharedCaches[dir] = &sharedCache{dir: dir, maxSize: cache.MaxSize} + } + cache.sharedCache = sharedCaches[dir] +} + func (cache *DiskCache) cacheFile(locator string) string { hash := locator if i := strings.Index(hash, "+"); i > 0 { hash = hash[:i] } - return filepath.Join(cache.Dir, hash[:3], hash+cacheFileSuffix) + return filepath.Join(cache.dir, hash[:3], hash+cacheFileSuffix) } // Open a cache file, creating the parent dir if necessary. @@ -126,8 +156,9 @@ func (cache *DiskCache) debugf(format string, args ...interface{}) { // BlockWrite writes through to the wrapped KeepGateway, and (if // possible) retains a copy of the written block in the cache. func (cache *DiskCache) BlockWrite(ctx context.Context, opts BlockWriteOptions) (BlockWriteResponse, error) { + cache.setupOnce.Do(cache.setup) unique := fmt.Sprintf("%x.%p%s", os.Getpid(), &opts, tmpFileSuffix) - tmpfilename := filepath.Join(cache.Dir, "tmp", unique) + tmpfilename := filepath.Join(cache.dir, "tmp", unique) tmpfile, err := cache.openFile(tmpfilename, os.O_CREATE|os.O_EXCL|os.O_RDWR) if err != nil { cache.debugf("BlockWrite: open(%s) failed: %s", tmpfilename, err) @@ -235,6 +266,7 @@ func (fw funcwriter) Write(p []byte) (int, error) { // cache. The remainder of the block may continue to be copied into // the cache in the background. func (cache *DiskCache) ReadAt(locator string, dst []byte, offset int) (int, error) { + cache.setupOnce.Do(cache.setup) cachefilename := cache.cacheFile(locator) if n, err := cache.quickReadAt(cachefilename, dst, offset); err == nil { return n, err @@ -382,11 +414,11 @@ func (cache *DiskCache) quickReadAt(cachefilename string, dst []byte, offset int lim := syscall.Rlimit{} err := syscall.Getrlimit(syscall.RLIMIT_NOFILE, &lim) if err != nil { - cache.heldopenMax = 256 - } else if lim.Cur > 40000 { + cache.heldopenMax = 100 + } else if lim.Cur > 400000 { cache.heldopenMax = 10000 } else { - cache.heldopenMax = int(lim.Cur / 4) + cache.heldopenMax = int(lim.Cur / 40) } } heldopen := cache.heldopen[cachefilename] @@ -483,6 +515,7 @@ func (cache *DiskCache) quickReadAt(cachefilename string, dst []byte, offset int // BlockRead reads an entire block using a 128 KiB buffer. func (cache *DiskCache) BlockRead(ctx context.Context, opts BlockReadOptions) (int, error) { + cache.setupOnce.Do(cache.setup) i := strings.Index(opts.Locator, "+") if i < 0 || i >= len(opts.Locator) { return 0, errors.New("invalid block locator: no size hint") @@ -531,7 +564,7 @@ func (cache *DiskCache) gotidy() { return } // Skip if sizeEstimated is based on an actual measurement and - // is below MaxSize, and we haven't reached the "recheck + // is below maxSize, and we haven't reached the "recheck // anyway" time threshold. if cache.sizeMeasured > 0 && atomic.LoadInt64(&cache.sizeEstimated) < atomic.LoadInt64(&cache.defaultMaxSize) && @@ -548,19 +581,19 @@ func (cache *DiskCache) gotidy() { // Delete cache files as needed to control disk usage. func (cache *DiskCache) tidy() { - maxsize := int64(cache.MaxSize.ByteSize()) + maxsize := int64(cache.maxSize.ByteSize()) if maxsize < 1 { maxsize = atomic.LoadInt64(&cache.defaultMaxSize) if maxsize == 0 { // defaultMaxSize not yet computed. Use 10% of // filesystem capacity (or different - // percentage if indicated by cache.MaxSize) - pct := cache.MaxSize.Percent() + // percentage if indicated by cache.maxSize) + pct := cache.maxSize.Percent() if pct == 0 { pct = 10 } var stat unix.Statfs_t - if nil == unix.Statfs(cache.Dir, &stat) { + if nil == unix.Statfs(cache.dir, &stat) { maxsize = int64(stat.Bavail) * stat.Bsize * pct / 100 atomic.StoreInt64(&cache.defaultMaxSize, maxsize) } else { @@ -572,7 +605,7 @@ func (cache *DiskCache) tidy() { } // Bail if a tidy goroutine is running in a different process. - lockfile, err := cache.openFile(filepath.Join(cache.Dir, "tmp", "tidy.lock"), os.O_CREATE|os.O_WRONLY) + lockfile, err := cache.openFile(filepath.Join(cache.dir, "tmp", "tidy.lock"), os.O_CREATE|os.O_WRONLY) if err != nil { return } @@ -589,7 +622,7 @@ func (cache *DiskCache) tidy() { } var ents []entT var totalsize int64 - filepath.Walk(cache.Dir, func(path string, info fs.FileInfo, err error) error { + filepath.Walk(cache.dir, func(path string, info fs.FileInfo, err error) error { if err != nil { cache.debugf("tidy: skipping dir %s: %s", path, err) return nil -- 2.39.5