20318: Sync cache state after 1% churn instead of 5 minute timer.
[arvados.git] / sdk / go / arvados / keep_cache.go
index eb9d4607bf198248e74b764b3306fc95cdb0c690..b6b2a9da669f49ed292ddbdf4b412958333857c4 100644 (file)
@@ -37,11 +37,30 @@ type KeepGateway interface {
 type DiskCache struct {
        KeepGateway
        Dir     string
-       MaxSize int64
+       MaxSize ByteSizeOrPercent
        Logger  logrus.FieldLogger
 
+       *sharedCache
+       setupOnce sync.Once
+}
+
+var (
+       sharedCachesLock sync.Mutex
+       sharedCaches     = map[string]*sharedCache{}
+)
+
+// sharedCache has fields that coordinate the cache usage in a single
+// cache directory; it can be shared by multiple DiskCaches.
+//
+// This serves to share a single pool of held-open filehandles, a
+// single tidying goroutine, etc., even when the program (like
+// keep-web) uses multiple KeepGateway stacks that use different auth
+// tokens, etc.
+type sharedCache struct {
+       dir     string
+       maxSize ByteSizeOrPercent
+
        tidying        int32 // see tidy()
-       tidyHoldUntil  time.Time
        defaultMaxSize int64
 
        // The "heldopen" fields are used to open cache files for
@@ -50,6 +69,26 @@ type DiskCache struct {
        heldopen     map[string]*openFileEnt
        heldopenMax  int
        heldopenLock sync.Mutex
+
+       // The "writing" fields allow multiple concurrent/sequential
+       // ReadAt calls to be notified as a single
+       // read-block-from-backend-into-cache goroutine fills the
+       // cache file.
+       writing     map[string]*writeprogress
+       writingCond *sync.Cond
+       writingLock sync.Mutex
+
+       sizeMeasured    int64 // actual size on disk after last tidy(); zero if not measured yet
+       sizeEstimated   int64 // last measured size, plus files we have written since
+       lastFileCount   int64 // number of files on disk at last count
+       writesSinceTidy int64 // number of files written since last tidy()
+}
+
+type writeprogress struct {
+       cond *sync.Cond // broadcast whenever size or done changes
+       done bool       // size and err have their final values
+       size int        // bytes copied into cache file so far
+       err  error      // error encountered while copying from backend to cache
 }
 
 type openFileEnt struct {
@@ -59,17 +98,26 @@ type openFileEnt struct {
 }
 
 const (
-       cacheFileSuffix  = ".keepcacheblock"
-       tmpFileSuffix    = ".tmp"
-       tidyHoldDuration = 10 * time.Second
+       cacheFileSuffix = ".keepcacheblock"
+       tmpFileSuffix   = ".tmp"
 )
 
+func (cache *DiskCache) setup() {
+       sharedCachesLock.Lock()
+       defer sharedCachesLock.Unlock()
+       dir := cache.Dir
+       if sharedCaches[dir] == nil {
+               sharedCaches[dir] = &sharedCache{dir: dir, maxSize: cache.MaxSize}
+       }
+       cache.sharedCache = sharedCaches[dir]
+}
+
 func (cache *DiskCache) cacheFile(locator string) string {
        hash := locator
        if i := strings.Index(hash, "+"); i > 0 {
                hash = hash[:i]
        }
-       return filepath.Join(cache.Dir, hash[:3], hash+cacheFileSuffix)
+       return filepath.Join(cache.dir, hash[:3], hash+cacheFileSuffix)
 }
 
 // Open a cache file, creating the parent dir if necessary.
@@ -108,9 +156,9 @@ func (cache *DiskCache) debugf(format string, args ...interface{}) {
 // BlockWrite writes through to the wrapped KeepGateway, and (if
 // possible) retains a copy of the written block in the cache.
 func (cache *DiskCache) BlockWrite(ctx context.Context, opts BlockWriteOptions) (BlockWriteResponse, error) {
-       cache.gotidy()
+       cache.setupOnce.Do(cache.setup)
        unique := fmt.Sprintf("%x.%p%s", os.Getpid(), &opts, tmpFileSuffix)
-       tmpfilename := filepath.Join(cache.Dir, "tmp", unique)
+       tmpfilename := filepath.Join(cache.dir, "tmp", unique)
        tmpfile, err := cache.openFile(tmpfilename, os.O_CREATE|os.O_EXCL|os.O_RDWR)
        if err != nil {
                cache.debugf("BlockWrite: open(%s) failed: %s", tmpfilename, err)
@@ -172,6 +220,8 @@ func (cache *DiskCache) BlockWrite(ctx context.Context, opts BlockWriteOptions)
                if err != nil {
                        cache.debugf("BlockWrite: rename(%s, %s) failed: %s", tmpfilename, cachefilename, err)
                }
+               atomic.AddInt64(&cache.sizeEstimated, int64(n))
+               cache.gotidy()
        }()
 
        // Write through to the wrapped KeepGateway from the pipe,
@@ -202,70 +252,149 @@ func (cache *DiskCache) BlockWrite(ctx context.Context, opts BlockWriteOptions)
        return resp, err
 }
 
+type funcwriter func([]byte) (int, error)
+
+func (fw funcwriter) Write(p []byte) (int, error) {
+       return fw(p)
+}
+
 // ReadAt reads the entire block from the wrapped KeepGateway into the
 // cache if needed, and copies the requested portion into the provided
 // slice.
+//
+// ReadAt returns as soon as the requested portion is available in the
+// cache. The remainder of the block may continue to be copied into
+// the cache in the background.
 func (cache *DiskCache) ReadAt(locator string, dst []byte, offset int) (int, error) {
-       cache.gotidy()
+       cache.setupOnce.Do(cache.setup)
        cachefilename := cache.cacheFile(locator)
        if n, err := cache.quickReadAt(cachefilename, dst, offset); err == nil {
                return n, err
        }
-       f, err := cache.openFile(cachefilename, os.O_CREATE|os.O_RDWR)
+       readf, err := cache.openFile(cachefilename, os.O_CREATE|os.O_RDONLY)
        if err != nil {
-               cache.debugf("ReadAt: open(%s) failed: %s", cachefilename, err)
-               return cache.KeepGateway.ReadAt(locator, dst, offset)
+               return 0, fmt.Errorf("ReadAt: %w", err)
        }
-       defer f.Close()
+       defer readf.Close()
 
-       err = syscall.Flock(int(f.Fd()), syscall.LOCK_SH)
+       err = syscall.Flock(int(readf.Fd()), syscall.LOCK_SH)
        if err != nil {
                return 0, fmt.Errorf("flock(%s, lock_sh) failed: %w", cachefilename, err)
        }
 
-       size, err := f.Seek(0, io.SeekEnd)
-       if err != nil {
-               return 0, fmt.Errorf("seek(%s, seek_end) failed: %w", cachefilename, err)
-       }
-       if size < int64(len(dst)+offset) {
-               // The cache file seems to be truncated or empty
-               // (possibly because we just created it). Wait for an
-               // exclusive lock, then check again (in case someone
-               // else is doing the same thing) before trying to
-               // retrieve the entire block.
-               err = syscall.Flock(int(f.Fd()), syscall.LOCK_EX)
-               if err != nil {
-                       return 0, fmt.Errorf("flock(%s, lock_ex) failed: %w", cachefilename, err)
+       cache.writingLock.Lock()
+       progress := cache.writing[cachefilename]
+       if progress != nil {
+               cache.writingLock.Unlock()
+       } else {
+               progress = &writeprogress{}
+               progress.cond = sync.NewCond(&sync.Mutex{})
+               if cache.writing == nil {
+                       cache.writing = map[string]*writeprogress{}
                }
+               cache.writing[cachefilename] = progress
+               cache.writingLock.Unlock()
+
+               // Start a goroutine to copy from backend to writef. As
+               // data arrives, wake up any waiting loops (see below)
+               // so ReadAt() requests for partial data can return as
+               // soon as the relevant bytes have been copied.
+               go func() {
+                       var size int
+                       var writef *os.File
+                       var err error
+                       defer func() {
+                               closeErr := writef.Close()
+                               if err == nil {
+                                       err = closeErr
+                               }
+                               progress.cond.L.Lock()
+                               progress.err = err
+                               progress.done = true
+                               progress.size = size
+                               progress.cond.L.Unlock()
+                               progress.cond.Broadcast()
+                               cache.writingLock.Lock()
+                               delete(cache.writing, cachefilename)
+                               cache.writingLock.Unlock()
+                       }()
+                       writef, err = cache.openFile(cachefilename, os.O_WRONLY)
+                       if err != nil {
+                               err = fmt.Errorf("ReadAt: %w", err)
+                               return
+                       }
+                       err = syscall.Flock(int(writef.Fd()), syscall.LOCK_SH)
+                       if err != nil {
+                               err = fmt.Errorf("flock(%s, lock_sh) failed: %w", cachefilename, err)
+                               return
+                       }
+                       size, err = cache.KeepGateway.BlockRead(context.Background(), BlockReadOptions{
+                               Locator: locator,
+                               WriteTo: funcwriter(func(p []byte) (int, error) {
+                                       n, err := writef.Write(p)
+                                       if n > 0 {
+                                               progress.cond.L.Lock()
+                                               progress.size += n
+                                               progress.cond.L.Unlock()
+                                               progress.cond.Broadcast()
+                                       }
+                                       return n, err
+                               })})
+                       atomic.AddInt64(&cache.sizeEstimated, int64(size))
+                       cache.gotidy()
+               }()
+       }
+       progress.cond.L.Lock()
+       for !progress.done && progress.size < len(dst)+offset {
+               progress.cond.Wait()
+       }
+       ok := progress.size >= len(dst)+offset
+       err = progress.err
+       progress.cond.L.Unlock()
+
+       if !ok && err != nil {
+               // If the copy-from-backend goroutine encountered an
+               // error before copying enough bytes to satisfy our
+               // request, we return that error.
+               return 0, err
+       } else {
+               // Regardless of whether the copy-from-backend
+               // goroutine succeeded, or failed after copying the
+               // bytes we need, the only errors we need to report
+               // are errors reading from the cache file.
+               return readf.ReadAt(dst, int64(offset))
        }
-       size, err = f.Seek(0, io.SeekEnd)
-       if err != nil {
-               return 0, fmt.Errorf("seek(%s, seek_end) failed: %w", cachefilename, err)
-       }
-       if size < int64(len(dst)+offset) {
-               // The cache file is truncated or empty, and we own it
-               // now. Fill it.
-               _, err = f.Seek(0, io.SeekStart)
-               if err != nil {
-                       return 0, fmt.Errorf("seek(%s, seek_start) failed: %w", cachefilename, err)
-               }
-               n, err := cache.KeepGateway.BlockRead(context.Background(), BlockReadOptions{Locator: locator, WriteTo: f})
-               if err != nil {
-                       return 0, err
-               }
-               f.Truncate(int64(n))
-       }
-       return f.ReadAt(dst, int64(offset))
 }
 
 var quickReadAtLostRace = errors.New("quickReadAt: lost race")
 
-func (cache *DiskCache) deleteHeldopen(cachefilename string, heldopen *openFileEnt) {
+// Remove the cache entry for the indicated cachefilename if it
+// matches expect (quickReadAt() usage), or if expect is nil (tidy()
+// usage).
+//
+// If expect is non-nil, close expect's filehandle.
+//
+// If expect is nil and a different cache entry is deleted, close its
+// filehandle.
+func (cache *DiskCache) deleteHeldopen(cachefilename string, expect *openFileEnt) {
+       needclose := expect
+
        cache.heldopenLock.Lock()
-       if cache.heldopen[cachefilename] == heldopen {
+       found := cache.heldopen[cachefilename]
+       if found != nil && (expect == nil || expect == found) {
                delete(cache.heldopen, cachefilename)
+               needclose = found
        }
        cache.heldopenLock.Unlock()
+
+       if needclose != nil {
+               needclose.Lock()
+               defer needclose.Unlock()
+               if needclose.f != nil {
+                       needclose.f.Close()
+                       needclose.f = nil
+               }
+       }
 }
 
 // quickReadAt attempts to use a cached-filehandle approach to read
@@ -285,11 +414,11 @@ func (cache *DiskCache) quickReadAt(cachefilename string, dst []byte, offset int
                lim := syscall.Rlimit{}
                err := syscall.Getrlimit(syscall.RLIMIT_NOFILE, &lim)
                if err != nil {
-                       cache.heldopenMax = 256
-               } else if lim.Cur > 40000 {
+                       cache.heldopenMax = 100
+               } else if lim.Cur > 400000 {
                        cache.heldopenMax = 10000
                } else {
-                       cache.heldopenMax = int(lim.Cur / 4)
+                       cache.heldopenMax = int(lim.Cur / 40)
                }
        }
        heldopen := cache.heldopen[cachefilename]
@@ -354,6 +483,26 @@ func (cache *DiskCache) quickReadAt(cachefilename string, dst []byte, offset int
                // Other goroutine closed the file before we got RLock
                return 0, quickReadAtLostRace
        }
+
+       // If another goroutine is currently writing the file, wait
+       // for it to catch up to the end of the range we need.
+       cache.writingLock.Lock()
+       progress := cache.writing[cachefilename]
+       cache.writingLock.Unlock()
+       if progress != nil {
+               progress.cond.L.Lock()
+               for !progress.done && progress.size < len(dst)+offset {
+                       progress.cond.Wait()
+               }
+               progress.cond.L.Unlock()
+               // If size<needed && progress.err!=nil here, we'll end
+               // up reporting a less helpful "EOF reading from cache
+               // file" below, instead of the actual error fetching
+               // from upstream to cache file.  This is OK though,
+               // because our caller (ReadAt) doesn't even report our
+               // error, it just retries.
+       }
+
        n, err := heldopen.f.ReadAt(dst, int64(offset))
        if err != nil {
                // wait for any concurrent users to finish, then
@@ -364,18 +513,9 @@ func (cache *DiskCache) quickReadAt(cachefilename string, dst []byte, offset int
        return n, err
 }
 
-// BlockRead reads the entire block from the wrapped KeepGateway into
-// the cache if needed, and writes it to the provided writer.
+// BlockRead reads an entire block using a 128 KiB buffer.
 func (cache *DiskCache) BlockRead(ctx context.Context, opts BlockReadOptions) (int, error) {
-       cache.gotidy()
-       cachefilename := cache.cacheFile(opts.Locator)
-       f, err := cache.openFile(cachefilename, os.O_CREATE|os.O_RDWR)
-       if err != nil {
-               cache.debugf("BlockRead: open(%s) failed: %s", cachefilename, err)
-               return cache.KeepGateway.BlockRead(ctx, opts)
-       }
-       defer f.Close()
-
+       cache.setupOnce.Do(cache.setup)
        i := strings.Index(opts.Locator, "+")
        if i < 0 || i >= len(opts.Locator) {
                return 0, errors.New("invalid block locator: no size hint")
@@ -390,66 +530,84 @@ func (cache *DiskCache) BlockRead(ctx context.Context, opts BlockReadOptions) (i
                return 0, errors.New("invalid block locator: invalid size hint")
        }
 
-       err = syscall.Flock(int(f.Fd()), syscall.LOCK_SH)
-       if err != nil {
-               return 0, err
-       }
-       filesize, err := f.Seek(0, io.SeekEnd)
-       if err != nil {
-               return 0, err
-       }
-       _, err = f.Seek(0, io.SeekStart)
-       if err != nil {
-               return 0, err
-       }
-       if filesize == blocksize {
-               n, err := io.Copy(opts.WriteTo, f)
-               return int(n), err
-       }
-       err = syscall.Flock(int(f.Fd()), syscall.LOCK_EX)
-       if err != nil {
-               return 0, err
-       }
-       opts.WriteTo = io.MultiWriter(f, opts.WriteTo)
-       n, err := cache.KeepGateway.BlockRead(ctx, opts)
-       if err != nil {
-               return int(n), err
+       offset := 0
+       buf := make([]byte, 131072)
+       for offset < int(blocksize) {
+               if ctx.Err() != nil {
+                       return offset, ctx.Err()
+               }
+               if int(blocksize)-offset < len(buf) { // final chunk: shrink buf to the bytes remaining
+                       buf = buf[:int(blocksize)-offset]
+               }
+               nr, err := cache.ReadAt(opts.Locator, buf, offset)
+               if nr > 0 {
+                       nw, err := opts.WriteTo.Write(buf[:nr]) // forward only the bytes ReadAt filled
+                       if err != nil {
+                               return offset + nw, err
+                       }
+               }
+               offset += nr
+               if err != nil {
+                       return offset, err
+               }
        }
-       f.Truncate(int64(n))
-       return n, nil
+       return offset, nil
 }
 
 // Start a tidy() goroutine, unless one is already running / recently
 // finished.
 func (cache *DiskCache) gotidy() {
-       // Return quickly if another tidy goroutine is running in this process.
+       writes := atomic.AddInt64(&cache.writesSinceTidy, 1)
+       // Skip if another tidy goroutine is running in this process.
        n := atomic.AddInt32(&cache.tidying, 1)
-       if n != 1 || time.Now().Before(cache.tidyHoldUntil) {
+       if n != 1 {
+               atomic.AddInt32(&cache.tidying, -1)
+               return
+       }
+       // Skip if sizeEstimated is based on an actual measurement and
+       // is below maxSize, and we haven't done very many writes
+       // since last tidy (defined as 1% of number of cache files at
+       // last count).
+       if cache.sizeMeasured > 0 &&
+               atomic.LoadInt64(&cache.sizeEstimated) < atomic.LoadInt64(&cache.defaultMaxSize) &&
+               writes < cache.lastFileCount/100 {
                atomic.AddInt32(&cache.tidying, -1)
                return
        }
        go func() {
                cache.tidy()
-               cache.tidyHoldUntil = time.Now().Add(tidyHoldDuration)
+               atomic.StoreInt64(&cache.writesSinceTidy, 0)
                atomic.AddInt32(&cache.tidying, -1)
        }()
 }
 
 // Delete cache files as needed to control disk usage.
 func (cache *DiskCache) tidy() {
-       maxsize := cache.MaxSize
+       maxsize := int64(cache.maxSize.ByteSize())
        if maxsize < 1 {
-               if maxsize = atomic.LoadInt64(&cache.defaultMaxSize); maxsize == 0 {
+               maxsize = atomic.LoadInt64(&cache.defaultMaxSize)
+               if maxsize == 0 {
+                       // defaultMaxSize not yet computed. Use 10% of
+                       // filesystem capacity (or different
+                       // percentage if indicated by cache.maxSize)
+                       pct := cache.maxSize.Percent()
+                       if pct == 0 {
+                               pct = 10
+                       }
                        var stat unix.Statfs_t
-                       if nil == unix.Statfs(cache.Dir, &stat) {
-                               maxsize = int64(stat.Bavail) * stat.Bsize / 10
+                       if nil == unix.Statfs(cache.dir, &stat) {
+                               maxsize = int64(stat.Bavail) * stat.Bsize * pct / 100
+                               atomic.StoreInt64(&cache.defaultMaxSize, maxsize)
+                       } else {
+                               // In this case we will set
+                               // defaultMaxSize below after
+                               // measuring current usage.
                        }
-                       atomic.StoreInt64(&cache.defaultMaxSize, maxsize)
                }
        }
 
        // Bail if a tidy goroutine is running in a different process.
-       lockfile, err := cache.openFile(filepath.Join(cache.Dir, "tmp", "tidy.lock"), os.O_CREATE|os.O_WRONLY)
+       lockfile, err := cache.openFile(filepath.Join(cache.dir, "tmp", "tidy.lock"), os.O_CREATE|os.O_WRONLY)
        if err != nil {
                return
        }
@@ -466,7 +624,7 @@ func (cache *DiskCache) tidy() {
        }
        var ents []entT
        var totalsize int64
-       filepath.Walk(cache.Dir, func(path string, info fs.FileInfo, err error) error {
+       filepath.Walk(cache.dir, func(path string, info fs.FileInfo, err error) error {
                if err != nil {
                        cache.debugf("tidy: skipping dir %s: %s", path, err)
                        return nil
@@ -500,7 +658,8 @@ func (cache *DiskCache) tidy() {
 
        // If MaxSize wasn't specified and we failed to come up with a
        // defaultSize above, use the larger of {current cache size, 1
-       // GiB} as the defaultSize for subsequent tidy() operations.
+       // GiB} as the defaultMaxSize for subsequent tidy()
+       // operations.
        if maxsize == 0 {
                if totalsize < 1<<30 {
                        atomic.StoreInt64(&cache.defaultMaxSize, 1<<30)
@@ -511,20 +670,37 @@ func (cache *DiskCache) tidy() {
                return
        }
 
-       if totalsize <= maxsize {
+       // If we're below MaxSize or there's only one block in the
+       // cache, just update the usage estimate and return.
+       //
+       // (We never delete the last block because that would merely
+       // cause the same block to get re-fetched repeatedly from the
+       // backend.)
+       if totalsize <= maxsize || len(ents) == 1 {
+               atomic.StoreInt64(&cache.sizeMeasured, totalsize)
+               atomic.StoreInt64(&cache.sizeEstimated, totalsize)
+               cache.lastFileCount = int64(len(ents))
                return
        }
 
-       // Delete oldest entries until totalsize < maxsize.
+       // Set a new size target of maxsize minus 5%.  This makes some
+       // room for sizeEstimated to grow before it triggers another
+       // tidy. We don't want to walk/sort an entire large cache
+       // directory each time we write a block.
+       target := maxsize - (maxsize / 20)
+
+       // Delete oldest entries until totalsize < target or we're
+       // down to a single cached block.
        sort.Slice(ents, func(i, j int) bool {
                return ents[i].atime.Before(ents[j].atime)
        })
        deleted := 0
        for _, ent := range ents {
                os.Remove(ent.path)
+               go cache.deleteHeldopen(ent.path, nil)
                deleted++
                totalsize -= ent.size
-               if totalsize <= maxsize {
+               if totalsize <= target || deleted == len(ents)-1 {
                        break
                }
        }
@@ -535,4 +711,7 @@ func (cache *DiskCache) tidy() {
                        "totalsize": totalsize,
                }).Debugf("DiskCache: remaining cache usage after deleting")
        }
+       atomic.StoreInt64(&cache.sizeMeasured, totalsize)
+       atomic.StoreInt64(&cache.sizeEstimated, totalsize)
+       cache.lastFileCount = int64(len(ents) - deleted)
 }