20318: Add config entry for keep-web cache size.
[arvados.git] / sdk / go / arvados / keep_cache.go
index 6aed35a215c6d22691c0c547536710c2630b85dd..a6571538761ef2411047c5ce97fd42122a93e6e6 100644 (file)
@@ -37,7 +37,7 @@ type KeepGateway interface {
 type DiskCache struct {
        KeepGateway
        Dir     string
-       MaxSize int64
+       MaxSize ByteSizeOrPercent
        Logger  logrus.FieldLogger
 
        tidying        int32 // see tidy()
@@ -50,6 +50,24 @@ type DiskCache struct {
        heldopen     map[string]*openFileEnt
        heldopenMax  int
        heldopenLock sync.Mutex
+
+       // The "writing" fields allow multiple concurrent/sequential
+       // ReadAt calls to be notified as a single
+       // read-block-from-backend-into-cache goroutine fills the
+       // cache file.
+       writing     map[string]*writeprogress
+       writingCond *sync.Cond
+       writingLock sync.Mutex
+
+       sizeMeasured  int64 // actual size on disk after last tidy(); zero if not measured yet
+       sizeEstimated int64 // last measured size, plus files we have written since
+}
+
+type writeprogress struct {
+       cond *sync.Cond // broadcast whenever size or done changes
+       done bool       // size and err have their final values
+       size int        // bytes copied into cache file so far
+       err  error      // error encountered while copying from backend to cache
 }
 
 type openFileEnt struct {
@@ -61,7 +79,7 @@ type openFileEnt struct {
 const (
        cacheFileSuffix  = ".keepcacheblock"
        tmpFileSuffix    = ".tmp"
-       tidyHoldDuration = 10 * time.Second
+       tidyHoldDuration = 5 * time.Minute // time to re-check cache size even if estimated size is below max
 )
 
 func (cache *DiskCache) cacheFile(locator string) string {
@@ -108,7 +126,6 @@ func (cache *DiskCache) debugf(format string, args ...interface{}) {
 // BlockWrite writes through to the wrapped KeepGateway, and (if
 // possible) retains a copy of the written block in the cache.
 func (cache *DiskCache) BlockWrite(ctx context.Context, opts BlockWriteOptions) (BlockWriteResponse, error) {
-       cache.gotidy()
        unique := fmt.Sprintf("%x.%p%s", os.Getpid(), &opts, tmpFileSuffix)
        tmpfilename := filepath.Join(cache.Dir, "tmp", unique)
        tmpfile, err := cache.openFile(tmpfilename, os.O_CREATE|os.O_EXCL|os.O_RDWR)
@@ -172,6 +189,8 @@ func (cache *DiskCache) BlockWrite(ctx context.Context, opts BlockWriteOptions)
                if err != nil {
                        cache.debugf("BlockWrite: rename(%s, %s) failed: %s", tmpfilename, cachefilename, err)
                }
+               atomic.AddInt64(&cache.sizeEstimated, int64(n))
+               cache.gotidy()
        }()
 
        // Write through to the wrapped KeepGateway from the pipe,
@@ -202,60 +221,117 @@ func (cache *DiskCache) BlockWrite(ctx context.Context, opts BlockWriteOptions)
        return resp, err
 }
 
+type funcwriter func([]byte) (int, error)
+
+func (fw funcwriter) Write(p []byte) (int, error) {
+       return fw(p)
+}
+
 // ReadAt reads the entire block from the wrapped KeepGateway into the
 // cache if needed, and copies the requested portion into the provided
 // slice.
+//
+// ReadAt returns as soon as the requested portion is available in the
+// cache. The remainder of the block may continue to be copied into
+// the cache in the background.
 func (cache *DiskCache) ReadAt(locator string, dst []byte, offset int) (int, error) {
-       cache.gotidy()
        cachefilename := cache.cacheFile(locator)
        if n, err := cache.quickReadAt(cachefilename, dst, offset); err == nil {
                return n, err
        }
-       f, err := cache.openFile(cachefilename, os.O_CREATE|os.O_RDWR)
+       readf, err := cache.openFile(cachefilename, os.O_CREATE|os.O_RDONLY)
        if err != nil {
-               cache.debugf("ReadAt: open(%s) failed: %s", cachefilename, err)
-               return cache.KeepGateway.ReadAt(locator, dst, offset)
+               return 0, fmt.Errorf("ReadAt: %w", err)
        }
-       defer f.Close()
+       defer readf.Close()
 
-       err = syscall.Flock(int(f.Fd()), syscall.LOCK_SH)
+       err = syscall.Flock(int(readf.Fd()), syscall.LOCK_SH)
        if err != nil {
                return 0, fmt.Errorf("flock(%s, lock_sh) failed: %w", cachefilename, err)
        }
 
-       size, err := f.Seek(0, io.SeekEnd)
-       if err != nil {
-               return 0, fmt.Errorf("seek(%s, seek_end) failed: %w", cachefilename, err)
-       }
-       if size < int64(len(dst)+offset) {
-               // The cache file seems to be truncated or empty
-               // (possibly because we just created it). Wait for an
-               // exclusive lock, then check again (in case someone
-               // else is doing the same thing) before trying to
-               // retrieve the entire block.
-               err = syscall.Flock(int(f.Fd()), syscall.LOCK_EX)
-               if err != nil {
-                       return 0, fmt.Errorf("flock(%s, lock_ex) failed: %w", cachefilename, err)
-               }
-       }
-       size, err = f.Seek(0, io.SeekEnd)
-       if err != nil {
-               return 0, fmt.Errorf("seek(%s, seek_end) failed: %w", cachefilename, err)
-       }
-       if size < int64(len(dst)+offset) {
-               // The cache file is truncated or empty, and we own it
-               // now. Fill it.
-               _, err = f.Seek(0, io.SeekStart)
-               if err != nil {
-                       return 0, fmt.Errorf("seek(%s, seek_start) failed: %w", cachefilename, err)
-               }
-               n, err := cache.KeepGateway.BlockRead(context.Background(), BlockReadOptions{Locator: locator, WriteTo: f})
-               if err != nil {
-                       return 0, err
+       cache.writingLock.Lock()
+       progress := cache.writing[cachefilename]
+       if progress != nil {
+               cache.writingLock.Unlock()
+       } else {
+               progress = &writeprogress{}
+               progress.cond = sync.NewCond(&sync.Mutex{})
+               if cache.writing == nil {
+                       cache.writing = map[string]*writeprogress{}
                }
-               f.Truncate(int64(n))
+               cache.writing[cachefilename] = progress
+               cache.writingLock.Unlock()
+
+               // Start a goroutine to copy from backend to f. As
+               // data arrives, wake up any waiting loops (see below)
+               // so ReadAt() requests for partial data can return as
+               // soon as the relevant bytes have been copied.
+               go func() {
+                       var size int
+                       var writef *os.File
+                       var err error
+                       defer func() {
+                               closeErr := writef.Close()
+                               if err == nil {
+                                       err = closeErr
+                               }
+                               progress.cond.L.Lock()
+                               progress.err = err
+                               progress.done = true
+                               progress.size = size
+                               progress.cond.L.Unlock()
+                               progress.cond.Broadcast()
+                               cache.writingLock.Lock()
+                               delete(cache.writing, cachefilename)
+                               cache.writingLock.Unlock()
+                       }()
+                       writef, err = cache.openFile(cachefilename, os.O_WRONLY)
+                       if err != nil {
+                               err = fmt.Errorf("ReadAt: %w", err)
+                               return
+                       }
+                       err = syscall.Flock(int(writef.Fd()), syscall.LOCK_SH)
+                       if err != nil {
+                               err = fmt.Errorf("flock(%s, lock_sh) failed: %w", cachefilename, err)
+                               return
+                       }
+                       size, err = cache.KeepGateway.BlockRead(context.Background(), BlockReadOptions{
+                               Locator: locator,
+                               WriteTo: funcwriter(func(p []byte) (int, error) {
+                                       n, err := writef.Write(p)
+                                       if n > 0 {
+                                               progress.cond.L.Lock()
+                                               progress.size += n
+                                               progress.cond.L.Unlock()
+                                               progress.cond.Broadcast()
+                                       }
+                                       return n, err
+                               })})
+                       atomic.AddInt64(&cache.sizeEstimated, int64(size))
+                       cache.gotidy()
+               }()
+       }
+       progress.cond.L.Lock()
+       for !progress.done && progress.size < len(dst)+offset {
+               progress.cond.Wait()
+       }
+       ok := progress.size >= len(dst)+offset
+       err = progress.err
+       progress.cond.L.Unlock()
+
+       if !ok && err != nil {
+               // If the copy-from-backend goroutine encountered an
+               // error before copying enough bytes to satisfy our
+               // request, we return that error.
+               return 0, err
+       } else {
+               // Regardless of whether the copy-from-backend
+               // goroutine succeeded, or failed after copying the
+               // bytes we need, the only errors we need to report
+               // are errors reading from the cache file.
+               return readf.ReadAt(dst, int64(offset))
        }
-       return f.ReadAt(dst, int64(offset))
 }
 
 var quickReadAtLostRace = errors.New("quickReadAt: lost race")
@@ -375,6 +451,26 @@ func (cache *DiskCache) quickReadAt(cachefilename string, dst []byte, offset int
                // Other goroutine closed the file before we got RLock
                return 0, quickReadAtLostRace
        }
+
+       // If another goroutine is currently writing the file, wait
+       // for it to catch up to the end of the range we need.
+       cache.writingLock.Lock()
+       progress := cache.writing[cachefilename]
+       cache.writingLock.Unlock()
+       if progress != nil {
+               progress.cond.L.Lock()
+               for !progress.done && progress.size < len(dst)+offset {
+                       progress.cond.Wait()
+               }
+               progress.cond.L.Unlock()
+               // If size<needed && progress.err!=nil here, we'll end
+               // up reporting a less helpful "EOF reading from cache
+               // file" below, instead of the actual error fetching
+               // from upstream to cache file.  This is OK though,
+               // because our caller (ReadAt) doesn't even report our
+               // error, it just retries.
+       }
+
        n, err := heldopen.f.ReadAt(dst, int64(offset))
        if err != nil {
                // wait for any concurrent users to finish, then
@@ -385,18 +481,8 @@ func (cache *DiskCache) quickReadAt(cachefilename string, dst []byte, offset int
        return n, err
 }
 
-// BlockRead reads the entire block from the wrapped KeepGateway into
-// the cache if needed, and writes it to the provided writer.
+// BlockRead reads an entire block using a 128 KiB buffer.
 func (cache *DiskCache) BlockRead(ctx context.Context, opts BlockReadOptions) (int, error) {
-       cache.gotidy()
-       cachefilename := cache.cacheFile(opts.Locator)
-       f, err := cache.openFile(cachefilename, os.O_CREATE|os.O_RDWR)
-       if err != nil {
-               cache.debugf("BlockRead: open(%s) failed: %s", cachefilename, err)
-               return cache.KeepGateway.BlockRead(ctx, opts)
-       }
-       defer f.Close()
-
        i := strings.Index(opts.Locator, "+")
        if i < 0 || i >= len(opts.Locator) {
                return 0, errors.New("invalid block locator: no size hint")
@@ -411,41 +497,45 @@ func (cache *DiskCache) BlockRead(ctx context.Context, opts BlockReadOptions) (i
                return 0, errors.New("invalid block locator: invalid size hint")
        }
 
-       err = syscall.Flock(int(f.Fd()), syscall.LOCK_SH)
-       if err != nil {
-               return 0, err
-       }
-       filesize, err := f.Seek(0, io.SeekEnd)
-       if err != nil {
-               return 0, err
-       }
-       _, err = f.Seek(0, io.SeekStart)
-       if err != nil {
-               return 0, err
-       }
-       if filesize == blocksize {
-               n, err := io.Copy(opts.WriteTo, f)
-               return int(n), err
-       }
-       err = syscall.Flock(int(f.Fd()), syscall.LOCK_EX)
-       if err != nil {
-               return 0, err
-       }
-       opts.WriteTo = io.MultiWriter(f, opts.WriteTo)
-       n, err := cache.KeepGateway.BlockRead(ctx, opts)
-       if err != nil {
-               return int(n), err
+       offset := 0
+       buf := make([]byte, 131072)
+       for offset < int(blocksize) {
+               if ctx.Err() != nil {
+                       return offset, ctx.Err()
+               }
+               if int(blocksize)-offset > len(buf) {
+                       buf = buf[:int(blocksize)-offset]
+               }
+               nr, err := cache.ReadAt(opts.Locator, buf, offset)
+               if nr > 0 {
+                       nw, err := opts.WriteTo.Write(buf)
+                       if err != nil {
+                               return offset + nw, err
+                       }
+               }
+               offset += nr
+               if err != nil {
+                       return offset, err
+               }
        }
-       f.Truncate(int64(n))
-       return n, nil
+       return offset, nil
 }
 
 // Start a tidy() goroutine, unless one is already running / recently
 // finished.
 func (cache *DiskCache) gotidy() {
-       // Return quickly if another tidy goroutine is running in this process.
+       // Skip if another tidy goroutine is running in this process.
        n := atomic.AddInt32(&cache.tidying, 1)
-       if n != 1 || time.Now().Before(cache.tidyHoldUntil) {
+       if n != 1 {
+               atomic.AddInt32(&cache.tidying, -1)
+               return
+       }
+       // Skip if sizeEstimated is based on an actual measurement and
+       // is below MaxSize, and we haven't reached the "recheck
+       // anyway" time threshold.
+       if cache.sizeMeasured > 0 &&
+               atomic.LoadInt64(&cache.sizeEstimated) < atomic.LoadInt64(&cache.defaultMaxSize) &&
+               time.Now().Before(cache.tidyHoldUntil) {
                atomic.AddInt32(&cache.tidying, -1)
                return
        }
@@ -458,14 +548,26 @@ func (cache *DiskCache) gotidy() {
 
 // Delete cache files as needed to control disk usage.
 func (cache *DiskCache) tidy() {
-       maxsize := cache.MaxSize
+       maxsize := int64(cache.MaxSize.ByteSize())
        if maxsize < 1 {
-               if maxsize = atomic.LoadInt64(&cache.defaultMaxSize); maxsize == 0 {
+               maxsize = atomic.LoadInt64(&cache.defaultMaxSize)
+               if maxsize == 0 {
+                       // defaultMaxSize not yet computed. Use 10% of
+                       // filesystem capacity (or different
+                       // percentage if indicated by cache.MaxSize)
+                       pct := cache.MaxSize.Percent()
+                       if pct == 0 {
+                               pct = 10
+                       }
                        var stat unix.Statfs_t
                        if nil == unix.Statfs(cache.Dir, &stat) {
-                               maxsize = int64(stat.Bavail) * stat.Bsize / 10
+                               maxsize = int64(stat.Bavail) * stat.Bsize * pct / 100
+                               atomic.StoreInt64(&cache.defaultMaxSize, maxsize)
+                       } else {
+                               // In this case we will set
+                               // defaultMaxSize below after
+                               // measuring current usage.
                        }
-                       atomic.StoreInt64(&cache.defaultMaxSize, maxsize)
                }
        }
 
@@ -521,7 +623,8 @@ func (cache *DiskCache) tidy() {
 
        // If MaxSize wasn't specified and we failed to come up with a
        // defaultSize above, use the larger of {current cache size, 1
-       // GiB} as the defaultSize for subsequent tidy() operations.
+       // GiB} as the defaultMaxSize for subsequent tidy()
+       // operations.
        if maxsize == 0 {
                if totalsize < 1<<30 {
                        atomic.StoreInt64(&cache.defaultMaxSize, 1<<30)
@@ -532,11 +635,26 @@ func (cache *DiskCache) tidy() {
                return
        }
 
-       if totalsize <= maxsize {
+       // If we're below MaxSize or there's only one block in the
+       // cache, just update the usage estimate and return.
+       //
+       // (We never delete the last block because that would merely
+       // cause the same block to get re-fetched repeatedly from the
+       // backend.)
+       if totalsize <= maxsize || len(ents) == 1 {
+               atomic.StoreInt64(&cache.sizeMeasured, totalsize)
+               atomic.StoreInt64(&cache.sizeEstimated, totalsize)
                return
        }
 
-       // Delete oldest entries until totalsize < maxsize.
+       // Set a new size target of maxsize minus 5%.  This makes some
+       // room for sizeEstimate to grow before it triggers another
+       // tidy. We don't want to walk/sort an entire large cache
+       // directory each time we write a block.
+       target := maxsize - (maxsize / 20)
+
+       // Delete oldest entries until totalsize < target or we're
+       // down to a single cached block.
        sort.Slice(ents, func(i, j int) bool {
                return ents[i].atime.Before(ents[j].atime)
        })
@@ -546,7 +664,7 @@ func (cache *DiskCache) tidy() {
                go cache.deleteHeldopen(ent.path, nil)
                deleted++
                totalsize -= ent.size
-               if totalsize <= maxsize {
+               if totalsize <= target || deleted == len(ents)-1 {
                        break
                }
        }
@@ -557,4 +675,6 @@ func (cache *DiskCache) tidy() {
                        "totalsize": totalsize,
                }).Debugf("DiskCache: remaining cache usage after deleting")
        }
+       atomic.StoreInt64(&cache.sizeMeasured, totalsize)
+       atomic.StoreInt64(&cache.sizeEstimated, totalsize)
 }