X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/461fdaa1b96142b8065c131ae0334046fc71ea56..279efb4dd345bcb1beee2c77ac14d66e57103b9f:/sdk/go/arvados/keep_cache.go
diff --git a/sdk/go/arvados/keep_cache.go b/sdk/go/arvados/keep_cache.go
index af80daa2e0..b6b2a9da66 100644
--- a/sdk/go/arvados/keep_cache.go
+++ b/sdk/go/arvados/keep_cache.go
@@ -37,11 +37,30 @@ type KeepGateway interface {
 type DiskCache struct {
 	KeepGateway
 	Dir     string
-	MaxSize int64
+	MaxSize ByteSizeOrPercent
 	Logger  logrus.FieldLogger
 
+	*sharedCache
+	setupOnce sync.Once
+}
+
+var (
+	sharedCachesLock sync.Mutex
+	sharedCaches     = map[string]*sharedCache{}
+)
+
+// sharedCache has fields that coordinate the cache usage in a single
+// cache directory; it can be shared by multiple DiskCaches.
+//
+// This serves to share a single pool of held-open filehandles, a
+// single tidying goroutine, etc., even when the program (like
+// keep-web) uses multiple KeepGateway stacks that use different auth
+// tokens, etc.
+type sharedCache struct {
+	dir     string
+	maxSize ByteSizeOrPercent
+
 	tidying        int32 // see tidy()
-	tidyHoldUntil  time.Time
 	defaultMaxSize int64
 
 	// The "heldopen" fields are used to open cache files for
@@ -59,8 +78,10 @@ type DiskCache struct {
 	writingCond *sync.Cond
 	writingLock sync.Mutex
 
-	sizeMeasured  int64 // actual size on disk after last tidy(); zero if not measured yet
-	sizeEstimated int64 // last measured size, plus files we have written since
+	sizeMeasured    int64 // actual size on disk after last tidy(); zero if not measured yet
+	sizeEstimated   int64 // last measured size, plus files we have written since
+	lastFileCount   int64 // number of files on disk at last count
+	writesSinceTidy int64 // number of files written since last tidy()
 }
 
 type writeprogress struct {
@@ -77,17 +98,34 @@ type openFileEnt struct {
 }
 
 const (
-	cacheFileSuffix  = ".keepcacheblock"
-	tmpFileSuffix    = ".tmp"
-	tidyHoldDuration = 5 * time.Minute // time to re-check cache size even if estimated size is below max
+	cacheFileSuffix = ".keepcacheblock"
+	tmpFileSuffix   = ".tmp"
 )
 
+// setup attaches cache to the sharedCache for its directory,
+// creating the sharedCache if this is the first DiskCache to use
+// that directory. The directory name is cleaned first so
+// equivalent spellings (e.g., with a trailing slash) share state.
+//
+// The first DiskCache to use a given directory determines its
+// size limit; MaxSize values of later DiskCaches using the same
+// directory are ignored.
+func (cache *DiskCache) setup() {
+	sharedCachesLock.Lock()
+	defer sharedCachesLock.Unlock()
+	dir := filepath.Clean(cache.Dir)
+	if sharedCaches[dir] == nil {
+		sharedCaches[dir] = &sharedCache{dir: dir, maxSize: cache.MaxSize}
+	}
+	cache.sharedCache = sharedCaches[dir]
+}
+
 func (cache *DiskCache) cacheFile(locator string) string {
 	hash := locator
 	if i := strings.Index(hash, "+"); i > 0 {
 		hash = hash[:i]
 	}
-	return filepath.Join(cache.Dir, hash[:3], hash+cacheFileSuffix)
+	return filepath.Join(cache.dir, hash[:3], hash+cacheFileSuffix)
 }
 
 // Open a cache file, creating the parent dir if necessary.
@@ -126,8 +164,9 @@ func (cache *DiskCache) debugf(format string, args ...interface{}) {
 // BlockWrite writes through to the wrapped KeepGateway, and (if
 // possible) retains a copy of the written block in the cache.
 func (cache *DiskCache) BlockWrite(ctx context.Context, opts BlockWriteOptions) (BlockWriteResponse, error) {
+	cache.setupOnce.Do(cache.setup)
 	unique := fmt.Sprintf("%x.%p%s", os.Getpid(), &opts, tmpFileSuffix)
-	tmpfilename := filepath.Join(cache.Dir, "tmp", unique)
+	tmpfilename := filepath.Join(cache.dir, "tmp", unique)
 	tmpfile, err := cache.openFile(tmpfilename, os.O_CREATE|os.O_EXCL|os.O_RDWR)
 	if err != nil {
 		cache.debugf("BlockWrite: open(%s) failed: %s", tmpfilename, err)
@@ -235,6 +274,7 @@ func (fw funcwriter) Write(p []byte) (int, error) {
 // cache. The remainder of the block may continue to be copied into
 // the cache in the background.
 func (cache *DiskCache) ReadAt(locator string, dst []byte, offset int) (int, error) {
+	cache.setupOnce.Do(cache.setup)
 	cachefilename := cache.cacheFile(locator)
 	if n, err := cache.quickReadAt(cachefilename, dst, offset); err == nil {
 		return n, err
@@ -382,11 +422,11 @@ func (cache *DiskCache) quickReadAt(cachefilename string, dst []byte, offset int
 		lim := syscall.Rlimit{}
 		err := syscall.Getrlimit(syscall.RLIMIT_NOFILE, &lim)
 		if err != nil {
-			cache.heldopenMax = 256
-		} else if lim.Cur > 40000 {
+			cache.heldopenMax = 100
+		} else if lim.Cur > 400000 {
 			cache.heldopenMax = 10000
 		} else {
-			cache.heldopenMax = int(lim.Cur / 4)
+			cache.heldopenMax = int(lim.Cur / 40)
 		}
 	}
 	heldopen := cache.heldopen[cachefilename]
@@ -483,6 +523,7 @@ func (cache *DiskCache) quickReadAt(cachefilename string, dst []byte, offset int
 
 // BlockRead reads an entire block using a 128 KiB buffer.
 func (cache *DiskCache) BlockRead(ctx context.Context, opts BlockReadOptions) (int, error) {
+	cache.setupOnce.Do(cache.setup)
 	i := strings.Index(opts.Locator, "+")
 	if i < 0 || i >= len(opts.Locator) {
 		return 0, errors.New("invalid block locator: no size hint")
@@ -524,6 +565,7 @@ func (cache *DiskCache) BlockRead(ctx context.Context, opts BlockReadOptions) (i
 // Start a tidy() goroutine, unless one is already running / recently
 // finished.
 func (cache *DiskCache) gotidy() {
+	writes := atomic.AddInt64(&cache.writesSinceTidy, 1)
 	// Skip if another tidy goroutine is running in this process.
 	n := atomic.AddInt32(&cache.tidying, 1)
 	if n != 1 {
@@ -531,36 +573,68 @@ func (cache *DiskCache) gotidy() {
 		return
 	}
 	// Skip if sizeEstimated is based on an actual measurement and
-	// is below MaxSize, and we haven't reached the "recheck
-	// anyway" time threshold.
-	if cache.sizeMeasured > 0 &&
-		atomic.LoadInt64(&cache.sizeEstimated) < cache.MaxSize &&
-		time.Now().Before(cache.tidyHoldUntil) {
+	// is below maxSize, and we haven't done very many writes
+	// since last tidy (defined as 1% of number of cache files at
+	// last count).
+	//
+	// Compare against effectiveMaxSize() rather than
+	// defaultMaxSize: the latter stays zero when an absolute
+	// maxSize is configured, which would defeat this check and
+	// start a tidy() after every write.
+	if atomic.LoadInt64(&cache.sizeMeasured) > 0 &&
+		atomic.LoadInt64(&cache.sizeEstimated) < cache.effectiveMaxSize() &&
+		writes < atomic.LoadInt64(&cache.lastFileCount)/100 {
 		atomic.AddInt32(&cache.tidying, -1)
 		return
 	}
 	go func() {
 		cache.tidy()
-		cache.tidyHoldUntil = time.Now().Add(tidyHoldDuration)
+		atomic.StoreInt64(&cache.writesSinceTidy, 0)
 		atomic.AddInt32(&cache.tidying, -1)
 	}()
 }
+
+// effectiveMaxSize returns the cache size limit currently in
+// effect: the configured absolute size if there is one, otherwise
+// the default computed by tidy() (zero until tidy() has computed
+// it).
+func (cache *DiskCache) effectiveMaxSize() int64 {
+	if maxsize := int64(cache.maxSize.ByteSize()); maxsize > 0 {
+		return maxsize
+	}
+	return atomic.LoadInt64(&cache.defaultMaxSize)
+}
 
 // Delete cache files as needed to control disk usage.
 func (cache *DiskCache) tidy() {
-	maxsize := cache.MaxSize
+	maxsize := int64(cache.maxSize.ByteSize())
 	if maxsize < 1 {
-		if maxsize = atomic.LoadInt64(&cache.defaultMaxSize); maxsize == 0 {
+		maxsize = atomic.LoadInt64(&cache.defaultMaxSize)
+		if maxsize == 0 {
+			// defaultMaxSize not yet computed. Use 10% of
+			// filesystem capacity (or different
+			// percentage if indicated by cache.maxSize).
+			// Use total blocks rather than available
+			// blocks so the limit doesn't shrink as the
+			// cache (or anything else) fills the disk.
+			pct := cache.maxSize.Percent()
+			if pct == 0 {
+				pct = 10
+			}
 			var stat unix.Statfs_t
-			if nil == unix.Statfs(cache.Dir, &stat) {
-				maxsize = int64(stat.Bavail) * stat.Bsize / 10
+			if nil == unix.Statfs(cache.dir, &stat) {
+				maxsize = int64(stat.Blocks) * stat.Bsize * pct / 100
+				atomic.StoreInt64(&cache.defaultMaxSize, maxsize)
+			} else {
+				// In this case we will set
+				// defaultMaxSize below after
+				// measuring current usage.
 			}
-			atomic.StoreInt64(&cache.defaultMaxSize, maxsize)
 		}
 	}
 
 	// Bail if a tidy goroutine is running in a different process.
-	lockfile, err := cache.openFile(filepath.Join(cache.Dir, "tmp", "tidy.lock"), os.O_CREATE|os.O_WRONLY)
+	lockfile, err := cache.openFile(filepath.Join(cache.dir, "tmp", "tidy.lock"), os.O_CREATE|os.O_WRONLY)
 	if err != nil {
 		return
 	}
@@ -577,7 +651,7 @@ func (cache *DiskCache) tidy() {
 	}
 	var ents []entT
 	var totalsize int64
-	filepath.Walk(cache.Dir, func(path string, info fs.FileInfo, err error) error {
+	filepath.Walk(cache.dir, func(path string, info fs.FileInfo, err error) error {
 		if err != nil {
 			cache.debugf("tidy: skipping dir %s: %s", path, err)
 			return nil
@@ -611,7 +685,8 @@ func (cache *DiskCache) tidy() {
 
 	// If MaxSize wasn't specified and we failed to come up with a
 	// defaultSize above, use the larger of {current cache size, 1
-	// GiB} as the defaultSize for subsequent tidy() operations.
+	// GiB} as the defaultMaxSize for subsequent tidy()
+	// operations.
 	if maxsize == 0 {
 		if totalsize < 1<<30 {
 			atomic.StoreInt64(&cache.defaultMaxSize, 1<<30)
@@ -631,6 +706,7 @@ func (cache *DiskCache) tidy() {
 	if totalsize <= maxsize || len(ents) == 1 {
 		atomic.StoreInt64(&cache.sizeMeasured, totalsize)
 		atomic.StoreInt64(&cache.sizeEstimated, totalsize)
+		atomic.StoreInt64(&cache.lastFileCount, int64(len(ents)))
 		return
 	}
 
@@ -664,4 +740,5 @@ func (cache *DiskCache) tidy() {
 	}
 	atomic.StoreInt64(&cache.sizeMeasured, totalsize)
 	atomic.StoreInt64(&cache.sizeEstimated, totalsize)
+	atomic.StoreInt64(&cache.lastFileCount, int64(len(ents)-deleted))
 }