20318: Sync cache state after 1% churn instead of 5 minute timer.
[arvados.git] / sdk / go / arvados / keep_cache.go
index b9c7fea4b0995551308ca160496cbcfaa49ced86..b6b2a9da669f49ed292ddbdf4b412958333857c4 100644 (file)
@@ -17,6 +17,7 @@ import (
        "sort"
        "strconv"
        "strings"
+       "sync"
        "sync/atomic"
        "syscall"
        "time"
@@ -36,25 +37,87 @@ type KeepGateway interface {
 type DiskCache struct {
        KeepGateway
        Dir     string
-       MaxSize int64
+       // MaxSize limits total cache usage. If it does not specify a
+       // byte size, tidy() uses its percentage (default 10%) of the
+       // filesystem space available in Dir. When several DiskCaches
+       // use the same Dir, the MaxSize of the first one to run
+       // setup() applies.
+       MaxSize ByteSizeOrPercent
        Logger  logrus.FieldLogger
 
+       // sharedCache holds the state shared by all DiskCaches that
+       // use the same Dir. It is assigned by setup(), which
+       // BlockWrite, ReadAt, and BlockRead run via setupOnce.
+       *sharedCache
+       setupOnce sync.Once
+}
+
+var (
+       // sharedCachesLock guards sharedCaches.
+       sharedCachesLock sync.Mutex
+       // sharedCaches maps a cache directory to the sharedCache
+       // used by every DiskCache configured with that Dir (see
+       // setup).
+       sharedCaches = make(map[string]*sharedCache)
+)
+
+// sharedCache has fields that coordinate the cache usage in a single
+// cache directory; it can be shared by multiple DiskCaches.
+//
+// This serves to share a single pool of held-open filehandles, a
+// single tidying goroutine, etc., even when the program (like
+// keep-web) uses multiple KeepGateway stacks that use different auth
+// tokens, etc.
+type sharedCache struct {
+       // dir and maxSize come from the first DiskCache that used
+       // this directory (see setup).
+       dir     string
+       maxSize ByteSizeOrPercent
+
+       // tidying is adjusted atomically by gotidy(); only the caller
+       // that raises it to 1 may start a tidy() goroutine.
        tidying        int32 // see tidy()
-       tidyHoldUntil  time.Time
+       // defaultMaxSize is the size limit that tidy() computes and
+       // stores (atomically) when maxSize does not specify a byte
+       // size.
        defaultMaxSize int64
+
+       // The "heldopen" fields are used to open cache files for
+       // reading, and leave them open for future/concurrent ReadAt
+       // operations. See quickReadAt. heldopenMax is chosen from
+       // RLIMIT_NOFILE the first time quickReadAt runs.
+       heldopen     map[string]*openFileEnt
+       heldopenMax  int
+       heldopenLock sync.Mutex
+
+       // The "writing" fields allow multiple concurrent/sequential
+       // ReadAt calls to be notified as a single
+       // read-block-from-backend-into-cache goroutine fills the
+       // cache file.
+       //
+       // NOTE(review): writingCond is not used in the code shown
+       // here (each writeprogress has its own cond) — confirm
+       // whether it can be removed.
+       writing     map[string]*writeprogress
+       writingCond *sync.Cond
+       writingLock sync.Mutex
+
+       sizeMeasured    int64 // actual size on disk after last tidy(); zero if not measured yet
+       sizeEstimated   int64 // last measured size, plus files we have written since
+       lastFileCount   int64 // number of files on disk at last count
+       writesSinceTidy int64 // number of files written since last tidy()
 }
 
-var (
+// writeprogress tracks a goroutine that is copying a block from the
+// backend into a cache file, so ReadAt/quickReadAt callers can wait
+// until the bytes they need have been written.
+type writeprogress struct {
+       cond *sync.Cond // broadcast whenever size or done changes
+       done bool       // size and err have their final values
+       size int        // bytes copied into cache file so far
+       err  error      // error encountered while copying from backend to cache
+}
+
+// openFileEnt is a heldopen entry: a cache file opened (and flock'ed
+// LOCK_SH) by quickReadAt. Readers hold the read lock while using f;
+// setting or closing f is done while holding the write lock.
+type openFileEnt struct {
+       sync.RWMutex
+       f   *os.File
+       err error // if err is non-nil, f should not be used.
+}
+
+const (
+       // cacheFileSuffix is appended to the block hash to form the
+       // name of each cache file (see cacheFile).
         cacheFileSuffix = ".keepcacheblock"
+       // tmpFileSuffix ends the names of temporary files that
+       // BlockWrite creates before renaming them into place.
         tmpFileSuffix   = ".tmp"
  )
 
+// setup attaches cache to the sharedCache for cache.Dir, creating one
+// (with cache.MaxSize as its size limit) if no other DiskCache has
+// used that directory yet. It is meant to be run via cache.setupOnce.
+func (cache *DiskCache) setup() {
+       sharedCachesLock.Lock()
+       defer sharedCachesLock.Unlock()
+       shared := sharedCaches[cache.Dir]
+       if shared == nil {
+               shared = &sharedCache{dir: cache.Dir, maxSize: cache.MaxSize}
+               sharedCaches[cache.Dir] = shared
+       }
+       cache.sharedCache = shared
+}
+
+// cacheFile returns the path of the cache file for locator:
+// {dir}/{first 3 chars of hash}/{hash}.keepcacheblock, where hash is
+// locator up to (not including) its first "+", unless that "+" is
+// at position 0.
 func (cache *DiskCache) cacheFile(locator string) string {
        hash := locator
        if i := strings.Index(hash, "+"); i > 0 {
                hash = hash[:i]
        }
-       return filepath.Join(cache.Dir, hash[:3], hash+cacheFileSuffix)
+       // NOTE(review): hash[:3] panics if hash is shorter than 3
+       // bytes; callers presumably pass well-formed locators —
+       // confirm.
+       return filepath.Join(cache.dir, hash[:3], hash+cacheFileSuffix)
 }
 
 // Open a cache file, creating the parent dir if necessary.
@@ -93,9 +156,9 @@ func (cache *DiskCache) debugf(format string, args ...interface{}) {
 // BlockWrite writes through to the wrapped KeepGateway, and (if
 // possible) retains a copy of the written block in the cache.
 func (cache *DiskCache) BlockWrite(ctx context.Context, opts BlockWriteOptions) (BlockWriteResponse, error) {
-       cache.gotidy()
+       cache.setupOnce.Do(cache.setup)
        unique := fmt.Sprintf("%x.%p%s", os.Getpid(), &opts, tmpFileSuffix)
-       tmpfilename := filepath.Join(cache.Dir, "tmp", unique)
+       tmpfilename := filepath.Join(cache.dir, "tmp", unique)
        tmpfile, err := cache.openFile(tmpfilename, os.O_CREATE|os.O_EXCL|os.O_RDWR)
        if err != nil {
                cache.debugf("BlockWrite: open(%s) failed: %s", tmpfilename, err)
@@ -157,6 +220,8 @@ func (cache *DiskCache) BlockWrite(ctx context.Context, opts BlockWriteOptions)
                if err != nil {
                        cache.debugf("BlockWrite: rename(%s, %s) failed: %s", tmpfilename, cachefilename, err)
                }
+               atomic.AddInt64(&cache.sizeEstimated, int64(n))
+               cache.gotidy()
        }()
 
        // Write through to the wrapped KeepGateway from the pipe,
@@ -187,71 +252,270 @@ func (cache *DiskCache) BlockWrite(ctx context.Context, opts BlockWriteOptions)
        return resp, err
 }
 
+// funcwriter adapts a plain function to the io.Writer interface.
+type funcwriter func([]byte) (int, error)
+
+// Write passes p to the wrapped function and returns its results.
+func (fw funcwriter) Write(p []byte) (n int, err error) {
+       n, err = fw(p)
+       return n, err
+}
+
 // ReadAt reads the entire block from the wrapped KeepGateway into the
 // cache if needed, and copies the requested portion into the provided
 // slice.
+//
+// ReadAt returns as soon as the requested portion is available in the
+// cache. The remainder of the block may continue to be copied into
+// the cache in the background.
+//
+// The quickReadAt (held-open filehandle) path is tried first; only if
+// it fails does ReadAt open the cache file itself and, if no other
+// goroutine is already doing so, start fetching the block from the
+// backend.
 func (cache *DiskCache) ReadAt(locator string, dst []byte, offset int) (int, error) {
-       cache.gotidy()
+       cache.setupOnce.Do(cache.setup)
        cachefilename := cache.cacheFile(locator)
-       f, err := cache.openFile(cachefilename, os.O_CREATE|os.O_RDWR)
+       if n, err := cache.quickReadAt(cachefilename, dst, offset); err == nil {
+               return n, err
+       }
+       readf, err := cache.openFile(cachefilename, os.O_CREATE|os.O_RDONLY)
        if err != nil {
-               cache.debugf("ReadAt: open(%s) failed: %s", cachefilename, err)
-               return cache.KeepGateway.ReadAt(locator, dst, offset)
+               return 0, fmt.Errorf("ReadAt: %w", err)
        }
-       defer f.Close()
+       defer readf.Close()
 
-       err = syscall.Flock(int(f.Fd()), syscall.LOCK_SH)
+       err = syscall.Flock(int(readf.Fd()), syscall.LOCK_SH)
        if err != nil {
                return 0, fmt.Errorf("flock(%s, lock_sh) failed: %w", cachefilename, err)
        }
 
-       size, err := f.Seek(0, io.SeekEnd)
-       if err != nil {
-               return 0, fmt.Errorf("seek(%s, seek_end) failed: %w", cachefilename, err)
-       }
-       if size < int64(len(dst)+offset) {
-               // The cache file seems to be truncated or empty
-               // (possibly because we just created it). Wait for an
-               // exclusive lock, then check again (in case someone
-               // else is doing the same thing) before trying to
-               // retrieve the entire block.
-               err = syscall.Flock(int(f.Fd()), syscall.LOCK_EX)
-               if err != nil {
-                       return 0, fmt.Errorf("flock(%s, lock_ex) failed: %w", cachefilename, err)
+       // Find the writeprogress of a goroutine that is already
+       // copying this block from the backend into the cache file,
+       // or register a new one and start that goroutine.
+       cache.writingLock.Lock()
+       progress := cache.writing[cachefilename]
+       if progress != nil {
+               cache.writingLock.Unlock()
+       } else {
+               progress = &writeprogress{}
+               progress.cond = sync.NewCond(&sync.Mutex{})
+               if cache.writing == nil {
+                       cache.writing = map[string]*writeprogress{}
                }
+               cache.writing[cachefilename] = progress
+               cache.writingLock.Unlock()
+
+               // Start a goroutine to copy from backend to the
+               // cache file (writef). As data arrives, wake up any
+               // waiting loops (see below) so ReadAt() requests for
+               // partial data can return as soon as the relevant
+               // bytes have been copied.
+               go func() {
+                       var size int
+                       var writef *os.File
+                       var err error
+                       defer func() {
+                               // If openFile failed, writef is nil and
+                               // Close returns an error rather than
+                               // panicking; err is already set in that
+                               // case, so closeErr is ignored.
+                               closeErr := writef.Close()
+                               if err == nil {
+                                       err = closeErr
+                               }
+                               progress.cond.L.Lock()
+                               progress.err = err
+                               progress.done = true
+                               progress.size = size
+                               progress.cond.L.Unlock()
+                               progress.cond.Broadcast()
+                               cache.writingLock.Lock()
+                               delete(cache.writing, cachefilename)
+                               cache.writingLock.Unlock()
+                       }()
+                       writef, err = cache.openFile(cachefilename, os.O_WRONLY)
+                       if err != nil {
+                               err = fmt.Errorf("ReadAt: %w", err)
+                               return
+                       }
+                       err = syscall.Flock(int(writef.Fd()), syscall.LOCK_SH)
+                       if err != nil {
+                               err = fmt.Errorf("flock(%s, lock_sh) failed: %w", cachefilename, err)
+                               return
+                       }
+                       size, err = cache.KeepGateway.BlockRead(context.Background(), BlockReadOptions{
+                               Locator: locator,
+                               WriteTo: funcwriter(func(p []byte) (int, error) {
+                                       n, err := writef.Write(p)
+                                       if n > 0 {
+                                               progress.cond.L.Lock()
+                                               progress.size += n
+                                               progress.cond.L.Unlock()
+                                               progress.cond.Broadcast()
+                                       }
+                                       return n, err
+                               })})
+                       // NOTE(review): sizeEstimated grows by size even
+                       // when BlockRead returned an error.
+                       atomic.AddInt64(&cache.sizeEstimated, int64(size))
+                       cache.gotidy()
+               }()
+       }
+       progress.cond.L.Lock()
+       for !progress.done && progress.size < len(dst)+offset {
+               progress.cond.Wait()
+       }
+       ok := progress.size >= len(dst)+offset
+       err = progress.err
+       progress.cond.L.Unlock()
+
+       if !ok && err != nil {
+               // If the copy-from-backend goroutine encountered an
+               // error before copying enough bytes to satisfy our
+               // request, we return that error.
+               return 0, err
+       } else {
+               // Regardless of whether the copy-from-backend
+               // goroutine succeeded, or failed after copying the
+               // bytes we need, the only errors we need to report
+               // are errors reading from the cache file.
+               return readf.ReadAt(dst, int64(offset))
        }
-       size, err = f.Seek(0, io.SeekEnd)
-       if err != nil {
-               return 0, fmt.Errorf("seek(%s, seek_end) failed: %w", cachefilename, err)
+}
+
+// quickReadAtLostRace is returned by quickReadAt when the held-open
+// filehandle it found was closed before it acquired the read lock.
+var quickReadAtLostRace = errors.New("quickReadAt: lost race")
+
+// deleteHeldopen removes the heldopen entry (not the cache file
+// itself) for cachefilename if that entry is expect (quickReadAt()
+// usage), or removes whatever entry is present if expect is nil
+// (tidy() usage).
+//
+// If expect is non-nil, expect's filehandle is closed even if expect
+// was no longer the registered entry.
+//
+// If expect is nil and an entry was found (and removed), that
+// entry's filehandle is closed.
+func (cache *DiskCache) deleteHeldopen(cachefilename string, expect *openFileEnt) {
+       needclose := expect
+
+       cache.heldopenLock.Lock()
+       found := cache.heldopen[cachefilename]
+       if found != nil && (expect == nil || expect == found) {
+               delete(cache.heldopen, cachefilename)
+               needclose = found
+       }
+       cache.heldopenLock.Unlock()
+
+       if needclose != nil {
+               // The write lock waits for quickReadAt callers still
+               // holding a read lock on this entry, so the file is
+               // not closed out from under an in-progress read.
+               needclose.Lock()
+               defer needclose.Unlock()
+               if needclose.f != nil {
+                       needclose.f.Close()
+                       needclose.f = nil
+               }
        }
-       if size < int64(len(dst)+offset) {
-               // The cache file is truncated or empty, and we own it
-               // now. Fill it.
-               _, err = f.Seek(0, io.SeekStart)
+}
+
+// quickReadAt attempts to use a cached-filehandle approach to read
+// from the indicated file. The expectation is that the caller
+// (ReadAt) will try a more robust approach when this fails, so
+// quickReadAt doesn't try especially hard to ensure success in
+// races. In particular, when there are concurrent calls, and one
+// fails, that can cause others to fail too.
+//
+// It returns the result of ReadAt on the held-open filehandle, or
+// the error from opening/locking the file, or quickReadAtLostRace.
+func (cache *DiskCache) quickReadAt(cachefilename string, dst []byte, offset int) (int, error) {
+       isnew := false
+       cache.heldopenLock.Lock()
+       if cache.heldopenMax == 0 {
+               // Choose a reasonable limit on open cache files based
+               // on RLIMIT_NOFILE. Note Go automatically raises
+               // softlimit to hardlimit, so it's typically 1048576,
+               // not 1024.
+               lim := syscall.Rlimit{}
+               err := syscall.Getrlimit(syscall.RLIMIT_NOFILE, &lim)
                if err != nil {
-                       return 0, fmt.Errorf("seek(%s, seek_start) failed: %w", cachefilename, err)
+                       cache.heldopenMax = 100
+               } else if lim.Cur > 400000 {
+                       cache.heldopenMax = 10000
+               } else {
+                       cache.heldopenMax = int(lim.Cur / 40)
+               }
+       }
+       heldopen := cache.heldopen[cachefilename]
+       if heldopen == nil {
+               isnew = true
+               heldopen = &openFileEnt{}
+               if cache.heldopen == nil {
+                       cache.heldopen = make(map[string]*openFileEnt, cache.heldopenMax)
+               } else if len(cache.heldopen) > cache.heldopenMax {
+                       // Rather than go to the trouble of tracking
+                       // last access time, just close all files, and
+                       // open again as needed. Even in the worst
+                       // pathological case, this causes one extra
+                       // open+close per read, which is not
+                       // especially bad (see benchmarks).
+                       go func(m map[string]*openFileEnt) {
+                               for _, heldopen := range m {
+                                       // Release each entry as soon as
+                                       // its file is closed, instead of
+                                       // deferring every Unlock until
+                                       // the whole loop finishes.
+                                       heldopen.Lock()
+                                       if heldopen.f != nil {
+                                               heldopen.f.Close()
+                                               heldopen.f = nil
+                                       }
+                                       heldopen.Unlock()
+                               }
+                       }(cache.heldopen)
+                       // Start a fresh map: the entry for
+                       // cachefilename is stored in it just below,
+                       // and assigning into a nil map would panic.
+                       cache.heldopen = make(map[string]*openFileEnt, cache.heldopenMax)
+               }
+               cache.heldopen[cachefilename] = heldopen
+               heldopen.Lock()
+       }
+       cache.heldopenLock.Unlock()
+
+       if isnew {
+               // Open and flock the file, then release the write
+               // lock acquired above to unblock any other goroutines
+               // that are waiting in heldopen.RLock() below.
+               f, err := os.Open(cachefilename)
+               if err == nil {
+                       err = syscall.Flock(int(f.Fd()), syscall.LOCK_SH)
+                       if err == nil {
+                               heldopen.f = f
+                       } else {
+                               f.Close()
+                       }
                }
-               n, err := cache.KeepGateway.BlockRead(context.Background(), BlockReadOptions{Locator: locator, WriteTo: f})
                if err != nil {
-                       return 0, err
+                       heldopen.err = err
+                       go cache.deleteHeldopen(cachefilename, heldopen)
+               }
+               heldopen.Unlock()
+       }
+       // Acquire read lock to ensure (1) initialization is complete,
+       // if it's done by a different goroutine, and (2) any "delete
+       // old/unused entries" waits for our read to finish before
+       // closing the file.
+       heldopen.RLock()
+       defer heldopen.RUnlock()
+       if heldopen.err != nil {
+               // Other goroutine encountered an error during setup
+               return 0, heldopen.err
+       } else if heldopen.f == nil {
+               // Other goroutine closed the file before we got RLock
+               return 0, quickReadAtLostRace
+       }
+
+       // If another goroutine is currently writing the file, wait
+       // for it to catch up to the end of the range we need.
+       cache.writingLock.Lock()
+       progress := cache.writing[cachefilename]
+       cache.writingLock.Unlock()
+       if progress != nil {
+               progress.cond.L.Lock()
+               for !progress.done && progress.size < len(dst)+offset {
+                       progress.cond.Wait()
                }
-               f.Truncate(int64(n))
+               progress.cond.L.Unlock()
+               // If size<needed && progress.err!=nil here, we'll end
+               // up reporting a less helpful "EOF reading from cache
+               // file" below, instead of the actual error fetching
+               // from upstream to cache file.  This is OK though,
+               // because our caller (ReadAt) doesn't even report our
+               // error, it just retries.
        }
-       return f.ReadAt(dst, int64(offset))
-}
 
-// ReadAt reads the entire block from the wrapped KeepGateway into the
-// cache if needed, and writes it to the provided writer.
-func (cache *DiskCache) BlockRead(ctx context.Context, opts BlockReadOptions) (int, error) {
-       cache.gotidy()
-       cachefilename := cache.cacheFile(opts.Locator)
-       f, err := cache.openFile(cachefilename, os.O_CREATE|os.O_RDWR)
+       n, err := heldopen.f.ReadAt(dst, int64(offset))
        if err != nil {
-               cache.debugf("BlockRead: open(%s) failed: %s", cachefilename, err)
-               return cache.KeepGateway.BlockRead(ctx, opts)
+               // wait for any concurrent users to finish, then
+               // delete this cache entry in case reopening the
+               // backing file helps.
+               go cache.deleteHeldopen(cachefilename, heldopen)
        }
-       defer f.Close()
+       return n, err
+}
 
+// BlockRead reads the entire block indicated by opts.Locator (whose
+// length comes from the locator's size hint) through the cache, in
+// chunks of up to 128 KiB via ReadAt, and writes it to opts.WriteTo.
+// It returns the number of bytes written to opts.WriteTo, and stops
+// early (returning ctx.Err()) if ctx is cancelled between chunks.
+func (cache *DiskCache) BlockRead(ctx context.Context, opts BlockReadOptions) (int, error) {
+       cache.setupOnce.Do(cache.setup)
        i := strings.Index(opts.Locator, "+")
        if i < 0 || i >= len(opts.Locator) {
                return 0, errors.New("invalid block locator: no size hint")
@@ -266,66 +530,84 @@ func (cache *DiskCache) BlockRead(ctx context.Context, opts BlockReadOptions) (i
                return 0, errors.New("invalid block locator: invalid size hint")
        }
 
-       err = syscall.Flock(int(f.Fd()), syscall.LOCK_SH)
-       if err != nil {
-               return 0, err
-       }
-       filesize, err := f.Seek(0, io.SeekEnd)
-       if err != nil {
-               return 0, err
-       }
-       _, err = f.Seek(0, io.SeekStart)
-       if err != nil {
-               return 0, err
-       }
-       if filesize == blocksize {
-               n, err := io.Copy(opts.WriteTo, f)
-               return int(n), err
-       }
-       err = syscall.Flock(int(f.Fd()), syscall.LOCK_EX)
-       if err != nil {
-               return 0, err
-       }
-       opts.WriteTo = io.MultiWriter(f, opts.WriteTo)
-       n, err := cache.KeepGateway.BlockRead(ctx, opts)
-       if err != nil {
-               return int(n), err
+       offset := 0
+       buf := make([]byte, 131072)
+       for offset < int(blocksize) {
+               if ctx.Err() != nil {
+                       return offset, ctx.Err()
+               }
+               // Shrink the buffer for the final (partial) chunk so
+               // ReadAt is never asked for bytes past the end of
+               // the block. (Growing it instead would exceed its
+               // capacity and panic.)
+               if int(blocksize)-offset < len(buf) {
+                       buf = buf[:int(blocksize)-offset]
+               }
+               nr, err := cache.ReadAt(opts.Locator, buf, offset)
+               if nr > 0 {
+                       // Only the first nr bytes of buf are valid.
+                       nw, werr := opts.WriteTo.Write(buf[:nr])
+                       if werr != nil {
+                               return offset + nw, werr
+                       }
+               }
+               offset += nr
+               if err != nil {
+                       return offset, err
+               }
        }
-       f.Truncate(int64(n))
-       return n, nil
+       return offset, nil
 }
 
-// Start a tidy() goroutine, unless one is already running / recently
-// finished.
+// gotidy starts a tidy() goroutine, unless one is already running in
+// this process, or the measured cache usage (plus the sizes of files
+// written since) is still below the size limit and fewer than 1% of
+// the last-counted number of cache files have been written since the
+// last tidy.
 func (cache *DiskCache) gotidy() {
-       // Return quickly if another tidy goroutine is running in this process.
+       writes := atomic.AddInt64(&cache.writesSinceTidy, 1)
+       // Skip if another tidy goroutine is running in this process.
        n := atomic.AddInt32(&cache.tidying, 1)
-       if n != 1 || time.Now().Before(cache.tidyHoldUntil) {
+       if n != 1 {
+               atomic.AddInt32(&cache.tidying, -1)
+               return
+       }
+       // Use the same effective size limit as tidy(): the configured
+       // byte size if there is one, otherwise the computed default.
+       // tidy() only sets defaultMaxSize when maxSize does not give
+       // a byte size, so comparing against defaultMaxSize alone would
+       // never skip (and would walk the whole cache directory on
+       // every write) when an explicit byte size is configured.
+       maxsize := int64(cache.maxSize.ByteSize())
+       if maxsize < 1 {
+               maxsize = atomic.LoadInt64(&cache.defaultMaxSize)
+       }
+       // Skip if sizeEstimated is based on an actual measurement and
+       // is below maxsize, and we haven't done very many writes
+       // since last tidy (defined as 1% of number of cache files at
+       // last count).
+       if atomic.LoadInt64(&cache.sizeMeasured) > 0 &&
+               atomic.LoadInt64(&cache.sizeEstimated) < maxsize &&
+               writes < cache.lastFileCount/100 {
                atomic.AddInt32(&cache.tidying, -1)
                return
        }
        go func() {
                cache.tidy()
-               cache.tidyHoldUntil = time.Now().Add(10 * time.Second)
+               atomic.StoreInt64(&cache.writesSinceTidy, 0)
                atomic.AddInt32(&cache.tidying, -1)
        }()
 }
 
 // Delete cache files as needed to control disk usage.
 func (cache *DiskCache) tidy() {
-       maxsize := cache.MaxSize
+       maxsize := int64(cache.maxSize.ByteSize())
        if maxsize < 1 {
-               if maxsize = atomic.LoadInt64(&cache.defaultMaxSize); maxsize == 0 {
+               maxsize = atomic.LoadInt64(&cache.defaultMaxSize)
+               if maxsize == 0 {
+                       // defaultMaxSize not yet computed. Use 10% of
+                       // filesystem capacity (or different
+                       // percentage if indicated by cache.maxSize)
+                       pct := cache.maxSize.Percent()
+                       if pct == 0 {
+                               pct = 10
+                       }
                        var stat unix.Statfs_t
-                       if nil == unix.Statfs(cache.Dir, &stat) {
-                               maxsize = int64(stat.Bavail) * stat.Bsize / 10
+                       if nil == unix.Statfs(cache.dir, &stat) {
+                               maxsize = int64(stat.Bavail) * stat.Bsize * pct / 100
+                               atomic.StoreInt64(&cache.defaultMaxSize, maxsize)
+                       } else {
+                               // In this case we will set
+                               // defaultMaxSize below after
+                               // measuring current usage.
                        }
-                       atomic.StoreInt64(&cache.defaultMaxSize, maxsize)
                }
        }
 
        // Bail if a tidy goroutine is running in a different process.
-       lockfile, err := cache.openFile(filepath.Join(cache.Dir, "tmp", "tidy.lock"), os.O_CREATE|os.O_WRONLY)
+       lockfile, err := cache.openFile(filepath.Join(cache.dir, "tmp", "tidy.lock"), os.O_CREATE|os.O_WRONLY)
        if err != nil {
                return
        }
@@ -342,7 +624,7 @@ func (cache *DiskCache) tidy() {
        }
        var ents []entT
        var totalsize int64
-       filepath.Walk(cache.Dir, func(path string, info fs.FileInfo, err error) error {
+       filepath.Walk(cache.dir, func(path string, info fs.FileInfo, err error) error {
                if err != nil {
                        cache.debugf("tidy: skipping dir %s: %s", path, err)
                        return nil
@@ -376,7 +658,8 @@ func (cache *DiskCache) tidy() {
 
        // If MaxSize wasn't specified and we failed to come up with a
        // defaultSize above, use the larger of {current cache size, 1
-       // GiB} as the defaultSize for subsequent tidy() operations.
+       // GiB} as the defaultMaxSize for subsequent tidy()
+       // operations.
        if maxsize == 0 {
                if totalsize < 1<<30 {
                        atomic.StoreInt64(&cache.defaultMaxSize, 1<<30)
@@ -387,20 +670,37 @@ func (cache *DiskCache) tidy() {
                return
        }
 
-       if totalsize <= maxsize {
+       // If we're below MaxSize or there's only one block in the
+       // cache, just update the usage estimate and return.
+       //
+       // (We never delete the last block because that would merely
+       // cause the same block to get re-fetched repeatedly from the
+       // backend.)
+       if totalsize <= maxsize || len(ents) == 1 {
+               atomic.StoreInt64(&cache.sizeMeasured, totalsize)
+               atomic.StoreInt64(&cache.sizeEstimated, totalsize)
+               cache.lastFileCount = int64(len(ents))
                return
        }
 
-       // Delete oldest entries until totalsize < maxsize.
+       // Set a new size target of maxsize minus 5%.  This makes some
+       // room for sizeEstimate to grow before it triggers another
+       // tidy. We don't want to walk/sort an entire large cache
+       // directory each time we write a block.
+       target := maxsize - (maxsize / 20)
+
+       // Delete oldest entries until totalsize < target or we're
+       // down to a single cached block.
        sort.Slice(ents, func(i, j int) bool {
                return ents[i].atime.Before(ents[j].atime)
        })
        deleted := 0
        for _, ent := range ents {
                os.Remove(ent.path)
+               go cache.deleteHeldopen(ent.path, nil)
                deleted++
                totalsize -= ent.size
-               if totalsize <= maxsize {
+               if totalsize <= target || deleted == len(ents)-1 {
                        break
                }
        }
@@ -411,4 +711,7 @@ func (cache *DiskCache) tidy() {
                        "totalsize": totalsize,
                }).Debugf("DiskCache: remaining cache usage after deleting")
        }
+       atomic.StoreInt64(&cache.sizeMeasured, totalsize)
+       atomic.StoreInt64(&cache.sizeEstimated, totalsize)
+       cache.lastFileCount = int64(len(ents) - deleted)
 }