20318: Sync cache state after 1% churn instead of 5 minute timer.
[arvados.git] / sdk / go / arvados / keep_cache.go
index af80daa2e07647346fd628eda6b7acdbc2338295..b6b2a9da669f49ed292ddbdf4b412958333857c4 100644 (file)
@@ -37,11 +37,30 @@ type KeepGateway interface {
 type DiskCache struct {
        KeepGateway
        Dir     string
-       MaxSize int64
+       MaxSize ByteSizeOrPercent
        Logger  logrus.FieldLogger
 
+       *sharedCache
+       setupOnce sync.Once
+}
+
+var (
+       sharedCachesLock sync.Mutex
+       sharedCaches     = map[string]*sharedCache{}
+)
+
+// sharedCache has fields that coordinate the cache usage in a single
+// cache directory; it can be shared by multiple DiskCaches.
+//
+// This serves to share a single pool of held-open filehandles, a
+// single tidying goroutine, etc., even when the program (like
+// keep-web) uses multiple KeepGateway stacks that use different auth
+// tokens, etc.
+type sharedCache struct {
+       dir     string
+       maxSize ByteSizeOrPercent
+
        tidying        int32 // see tidy()
-       tidyHoldUntil  time.Time
        defaultMaxSize int64
 
        // The "heldopen" fields are used to open cache files for
@@ -59,8 +78,10 @@ type DiskCache struct {
        writingCond *sync.Cond
        writingLock sync.Mutex
 
-       sizeMeasured  int64 // actual size on disk after last tidy(); zero if not measured yet
-       sizeEstimated int64 // last measured size, plus files we have written since
+       sizeMeasured    int64 // actual size on disk after last tidy(); zero if not measured yet
+       sizeEstimated   int64 // last measured size, plus files we have written since
+       lastFileCount   int64 // number of files on disk at last count
+       writesSinceTidy int64 // number of files written since last tidy()
 }
 
 type writeprogress struct {
@@ -77,17 +98,26 @@ type openFileEnt struct {
 }
 
 const (
-       cacheFileSuffix  = ".keepcacheblock"
-       tmpFileSuffix    = ".tmp"
-       tidyHoldDuration = 5 * time.Minute // time to re-check cache size even if estimated size is below max
+       cacheFileSuffix = ".keepcacheblock"
+       tmpFileSuffix   = ".tmp"
 )
 
+func (cache *DiskCache) setup() {
+       sharedCachesLock.Lock()
+       defer sharedCachesLock.Unlock()
+       dir := cache.Dir
+       if sharedCaches[dir] == nil {
+               sharedCaches[dir] = &sharedCache{dir: dir, maxSize: cache.MaxSize}
+       }
+       cache.sharedCache = sharedCaches[dir]
+}
+
 func (cache *DiskCache) cacheFile(locator string) string {
        hash := locator
        if i := strings.Index(hash, "+"); i > 0 {
                hash = hash[:i]
        }
-       return filepath.Join(cache.Dir, hash[:3], hash+cacheFileSuffix)
+       return filepath.Join(cache.dir, hash[:3], hash+cacheFileSuffix)
 }
 
 // Open a cache file, creating the parent dir if necessary.
@@ -126,8 +156,9 @@ func (cache *DiskCache) debugf(format string, args ...interface{}) {
 // BlockWrite writes through to the wrapped KeepGateway, and (if
 // possible) retains a copy of the written block in the cache.
 func (cache *DiskCache) BlockWrite(ctx context.Context, opts BlockWriteOptions) (BlockWriteResponse, error) {
+       cache.setupOnce.Do(cache.setup)
        unique := fmt.Sprintf("%x.%p%s", os.Getpid(), &opts, tmpFileSuffix)
-       tmpfilename := filepath.Join(cache.Dir, "tmp", unique)
+       tmpfilename := filepath.Join(cache.dir, "tmp", unique)
        tmpfile, err := cache.openFile(tmpfilename, os.O_CREATE|os.O_EXCL|os.O_RDWR)
        if err != nil {
                cache.debugf("BlockWrite: open(%s) failed: %s", tmpfilename, err)
@@ -235,6 +266,7 @@ func (fw funcwriter) Write(p []byte) (int, error) {
 // cache. The remainder of the block may continue to be copied into
 // the cache in the background.
 func (cache *DiskCache) ReadAt(locator string, dst []byte, offset int) (int, error) {
+       cache.setupOnce.Do(cache.setup)
        cachefilename := cache.cacheFile(locator)
        if n, err := cache.quickReadAt(cachefilename, dst, offset); err == nil {
                return n, err
@@ -382,11 +414,11 @@ func (cache *DiskCache) quickReadAt(cachefilename string, dst []byte, offset int
                lim := syscall.Rlimit{}
                err := syscall.Getrlimit(syscall.RLIMIT_NOFILE, &lim)
                if err != nil {
-                       cache.heldopenMax = 256
-               } else if lim.Cur > 40000 {
+                       cache.heldopenMax = 100
+               } else if lim.Cur > 400000 {
                        cache.heldopenMax = 10000
                } else {
-                       cache.heldopenMax = int(lim.Cur / 4)
+                       cache.heldopenMax = int(lim.Cur / 40)
                }
        }
        heldopen := cache.heldopen[cachefilename]
@@ -483,6 +515,7 @@ func (cache *DiskCache) quickReadAt(cachefilename string, dst []byte, offset int
 
 // BlockRead reads an entire block using a 128 KiB buffer.
 func (cache *DiskCache) BlockRead(ctx context.Context, opts BlockReadOptions) (int, error) {
+       cache.setupOnce.Do(cache.setup)
        i := strings.Index(opts.Locator, "+")
        if i < 0 || i >= len(opts.Locator) {
                return 0, errors.New("invalid block locator: no size hint")
@@ -524,6 +557,7 @@ func (cache *DiskCache) BlockRead(ctx context.Context, opts BlockReadOptions) (i
 // Start a tidy() goroutine, unless one is already running / recently
 // finished.
 func (cache *DiskCache) gotidy() {
+       writes := atomic.AddInt64(&cache.writesSinceTidy, 1)
        // Skip if another tidy goroutine is running in this process.
        n := atomic.AddInt32(&cache.tidying, 1)
        if n != 1 {
@@ -531,36 +565,49 @@ func (cache *DiskCache) gotidy() {
                return
        }
        // Skip if sizeEstimated is based on an actual measurement and
-       // is below MaxSize, and we haven't reached the "recheck
-       // anyway" time threshold.
+       // is below maxSize, and we haven't done very many writes
+       // since last tidy (defined as 1% of number of cache files at
+       // last count).
        if cache.sizeMeasured > 0 &&
-               atomic.LoadInt64(&cache.sizeEstimated) < cache.MaxSize &&
-               time.Now().Before(cache.tidyHoldUntil) {
+               atomic.LoadInt64(&cache.sizeEstimated) < atomic.LoadInt64(&cache.defaultMaxSize) &&
+               writes < cache.lastFileCount/100 {
                atomic.AddInt32(&cache.tidying, -1)
                return
        }
        go func() {
                cache.tidy()
-               cache.tidyHoldUntil = time.Now().Add(tidyHoldDuration)
+               atomic.StoreInt64(&cache.writesSinceTidy, 0)
                atomic.AddInt32(&cache.tidying, -1)
        }()
 }
 
 // Delete cache files as needed to control disk usage.
 func (cache *DiskCache) tidy() {
-       maxsize := cache.MaxSize
+       maxsize := int64(cache.maxSize.ByteSize())
        if maxsize < 1 {
-               if maxsize = atomic.LoadInt64(&cache.defaultMaxSize); maxsize == 0 {
+               maxsize = atomic.LoadInt64(&cache.defaultMaxSize)
+               if maxsize == 0 {
+                       // defaultMaxSize not yet computed. Use 10% of
+                       // filesystem capacity (or different
+                       // percentage if indicated by cache.maxSize)
+                       pct := cache.maxSize.Percent()
+                       if pct == 0 {
+                               pct = 10
+                       }
                        var stat unix.Statfs_t
-                       if nil == unix.Statfs(cache.Dir, &stat) {
-                               maxsize = int64(stat.Bavail) * stat.Bsize / 10
+                       if nil == unix.Statfs(cache.dir, &stat) {
+                               maxsize = int64(stat.Bavail) * stat.Bsize * pct / 100
+                               atomic.StoreInt64(&cache.defaultMaxSize, maxsize)
+                       } else {
+                               // In this case we will set
+                               // defaultMaxSize below after
+                               // measuring current usage.
                        }
-                       atomic.StoreInt64(&cache.defaultMaxSize, maxsize)
                }
        }
 
        // Bail if a tidy goroutine is running in a different process.
-       lockfile, err := cache.openFile(filepath.Join(cache.Dir, "tmp", "tidy.lock"), os.O_CREATE|os.O_WRONLY)
+       lockfile, err := cache.openFile(filepath.Join(cache.dir, "tmp", "tidy.lock"), os.O_CREATE|os.O_WRONLY)
        if err != nil {
                return
        }
@@ -577,7 +624,7 @@ func (cache *DiskCache) tidy() {
        }
        var ents []entT
        var totalsize int64
-       filepath.Walk(cache.Dir, func(path string, info fs.FileInfo, err error) error {
+       filepath.Walk(cache.dir, func(path string, info fs.FileInfo, err error) error {
                if err != nil {
                        cache.debugf("tidy: skipping dir %s: %s", path, err)
                        return nil
@@ -611,7 +658,8 @@ func (cache *DiskCache) tidy() {
 
        // If MaxSize wasn't specified and we failed to come up with a
        // defaultSize above, use the larger of {current cache size, 1
-       // GiB} as the defaultSize for subsequent tidy() operations.
+       // GiB} as the defaultMaxSize for subsequent tidy()
+       // operations.
        if maxsize == 0 {
                if totalsize < 1<<30 {
                        atomic.StoreInt64(&cache.defaultMaxSize, 1<<30)
@@ -631,6 +679,7 @@ func (cache *DiskCache) tidy() {
        if totalsize <= maxsize || len(ents) == 1 {
                atomic.StoreInt64(&cache.sizeMeasured, totalsize)
                atomic.StoreInt64(&cache.sizeEstimated, totalsize)
+               cache.lastFileCount = int64(len(ents))
                return
        }
 
@@ -664,4 +713,5 @@ func (cache *DiskCache) tidy() {
        }
        atomic.StoreInt64(&cache.sizeMeasured, totalsize)
        atomic.StoreInt64(&cache.sizeEstimated, totalsize)
+       cache.lastFileCount = int64(len(ents) - deleted)
 }