type DiskCache struct {
KeepGateway
Dir string
- MaxSize int64
+ // MaxSize limits cache disk usage, either as a byte count or
+ // as a percentage of the filesystem (see tidy()). When
+ // several DiskCaches use the same Dir, only the MaxSize of
+ // the first one to run setup() takes effect.
+ MaxSize ByteSizeOrPercent
Logger logrus.FieldLogger
+ // sharedCache holds the state shared by every DiskCache that
+ // uses the same Dir. It is nil until setup() has run.
+ *sharedCache
+ // setupOnce makes setup() run once per DiskCache. BlockWrite,
+ // ReadAt and BlockRead call setupOnce.Do(cache.setup) before
+ // touching any sharedCache field.
+ setupOnce sync.Once
+}
+
+// sharedCaches maps a cache directory to the sharedCache used by
+// every DiskCache configured with that directory. Access is guarded
+// by sharedCachesLock.
+var (
+	sharedCachesLock sync.Mutex
+	sharedCaches     = make(map[string]*sharedCache)
+)
+
+// sharedCache has fields that coordinate the cache usage in a single
+// cache directory; it can be shared by multiple DiskCaches.
+//
+// This serves to share a single pool of held-open filehandles, a
+// single tidying goroutine, etc., even when the program (like
+// keep-web) uses multiple KeepGateway stacks that use different auth
+// tokens, etc.
+//
+// Instances are created by DiskCache.setup() and kept in the
+// package-level sharedCaches map, keyed by dir.
+type sharedCache struct {
+ // dir is the cache directory, copied from DiskCache.Dir.
+ dir string
+ // maxSize is copied from the MaxSize of the DiskCache that
+ // created this sharedCache.
+ maxSize ByteSizeOrPercent
+
tidying int32 // see tidy()
- tidyHoldUntil time.Time
defaultMaxSize int64
// The "heldopen" fields are used to open cache files for
heldopen map[string]*openFileEnt
heldopenMax int
heldopenLock sync.Mutex
+
+ // The "writing" fields allow multiple concurrent/sequential
+ // ReadAt calls to be notified as a single
+ // read-block-from-backend-into-cache goroutine fills the
+ // cache file.
+ //
+ // writing is keyed by cache file name and guarded by
+ // writingLock. ReadAt adds an entry before starting the copy
+ // goroutine, and that goroutine removes it when it finishes.
+ writing map[string]*writeprogress
+ writingCond *sync.Cond
+ writingLock sync.Mutex
+
+ sizeMeasured int64 // actual size on disk after last tidy(); zero if not measured yet
+ sizeEstimated int64 // last measured size, plus bytes added by BlockWrite/ReadAt since
+ lastFileCount int64 // number of files on disk at last count
+ writesSinceTidy int64 // number of gotidy() calls (one per cache write) since last tidy()
+}
+
+// writeprogress tracks one in-progress copy of a block from the
+// backend into a cache file. ReadAt creates one for each cache file
+// it starts filling; readers wait on cond until size covers the
+// range they need or done is set. The done, size and err fields are
+// read and written while holding cond.L.
+type writeprogress struct {
+ cond *sync.Cond // broadcast whenever size or done changes
+ done bool // size and err have their final values
+ size int // bytes copied into cache file so far
+ err error // error encountered while copying from backend to cache
}
type openFileEnt struct {
}
const (
- cacheFileSuffix = ".keepcacheblock"
- tmpFileSuffix = ".tmp"
- tidyHoldDuration = 10 * time.Second
+ // cacheFileSuffix is appended to a block hash to form the
+ // name of its cache file (see cacheFile()).
+ cacheFileSuffix = ".keepcacheblock"
+ // tmpFileSuffix ends the names of the temporary files that
+ // BlockWrite creates under the cache's "tmp" directory.
+ tmpFileSuffix = ".tmp"
)
+// setup attaches cache to the sharedCache registered for cache.Dir,
+// creating and registering a new one if this is the first DiskCache
+// to use that directory. A newly created sharedCache takes its
+// maxSize from cache.MaxSize; an existing one is reused unchanged.
+func (cache *DiskCache) setup() {
+	sharedCachesLock.Lock()
+	defer sharedCachesLock.Unlock()
+
+	key := cache.Dir
+	shared := sharedCaches[key]
+	if shared == nil {
+		shared = &sharedCache{dir: key, maxSize: cache.MaxSize}
+		sharedCaches[key] = shared
+	}
+	cache.sharedCache = shared
+}
+
+// cacheFile returns the path of the cache file for the given
+// locator: <cache dir>/<first 3 bytes of hash>/<hash><cacheFileSuffix>.
+// The hash is the part of locator before the first "+", or the whole
+// locator if it contains no "+" or starts with one.
+//
+// It panics if the hash is shorter than 3 bytes.
func (cache *DiskCache) cacheFile(locator string) string {
hash := locator
if i := strings.Index(hash, "+"); i > 0 {
hash = hash[:i]
}
- return filepath.Join(cache.Dir, hash[:3], hash+cacheFileSuffix)
+ return filepath.Join(cache.dir, hash[:3], hash+cacheFileSuffix)
}
// Open a cache file, creating the parent dir if necessary.
// BlockWrite writes through to the wrapped KeepGateway, and (if
// possible) retains a copy of the written block in the cache.
func (cache *DiskCache) BlockWrite(ctx context.Context, opts BlockWriteOptions) (BlockWriteResponse, error) {
- cache.gotidy()
+ cache.setupOnce.Do(cache.setup)
unique := fmt.Sprintf("%x.%p%s", os.Getpid(), &opts, tmpFileSuffix)
- tmpfilename := filepath.Join(cache.Dir, "tmp", unique)
+ tmpfilename := filepath.Join(cache.dir, "tmp", unique)
tmpfile, err := cache.openFile(tmpfilename, os.O_CREATE|os.O_EXCL|os.O_RDWR)
if err != nil {
cache.debugf("BlockWrite: open(%s) failed: %s", tmpfilename, err)
if err != nil {
cache.debugf("BlockWrite: rename(%s, %s) failed: %s", tmpfilename, cachefilename, err)
}
+ atomic.AddInt64(&cache.sizeEstimated, int64(n))
+ cache.gotidy()
}()
// Write through to the wrapped KeepGateway from the pipe,
return resp, err
}
+// funcwriter adapts a plain function to the io.Writer interface.
+type funcwriter func([]byte) (int, error)
+
+// Write passes p to the underlying function and returns its results
+// unchanged.
+func (write funcwriter) Write(p []byte) (n int, err error) {
+	n, err = write(p)
+	return n, err
+}
+
// ReadAt reads the entire block from the wrapped KeepGateway into the
// cache if needed, and copies the requested portion into the provided
// slice.
+//
+// ReadAt returns as soon as the requested portion is available in the
+// cache. The remainder of the block may continue to be copied into
+// the cache in the background.
+//
+// It returns an error if the cache file cannot be opened or locked,
+// if the copy from the backend fails before the requested range has
+// been written, or if reading from the cache file fails.
func (cache *DiskCache) ReadAt(locator string, dst []byte, offset int) (int, error) {
- cache.gotidy()
+ cache.setupOnce.Do(cache.setup)
cachefilename := cache.cacheFile(locator)
if n, err := cache.quickReadAt(cachefilename, dst, offset); err == nil {
return n, err
}
- f, err := cache.openFile(cachefilename, os.O_CREATE|os.O_RDWR)
+ readf, err := cache.openFile(cachefilename, os.O_CREATE|os.O_RDONLY)
if err != nil {
- cache.debugf("ReadAt: open(%s) failed: %s", cachefilename, err)
- return cache.KeepGateway.ReadAt(locator, dst, offset)
+ return 0, fmt.Errorf("ReadAt: %w", err)
}
- defer f.Close()
+ defer readf.Close()
- err = syscall.Flock(int(f.Fd()), syscall.LOCK_SH)
+ err = syscall.Flock(int(readf.Fd()), syscall.LOCK_SH)
if err != nil {
return 0, fmt.Errorf("flock(%s, lock_sh) failed: %w", cachefilename, err)
}
- size, err := f.Seek(0, io.SeekEnd)
- if err != nil {
- return 0, fmt.Errorf("seek(%s, seek_end) failed: %w", cachefilename, err)
- }
- if size < int64(len(dst)+offset) {
- // The cache file seems to be truncated or empty
- // (possibly because we just created it). Wait for an
- // exclusive lock, then check again (in case someone
- // else is doing the same thing) before trying to
- // retrieve the entire block.
- err = syscall.Flock(int(f.Fd()), syscall.LOCK_EX)
- if err != nil {
- return 0, fmt.Errorf("flock(%s, lock_ex) failed: %w", cachefilename, err)
+ // Join the copy that is already filling this cache file, if
+ // there is one; otherwise register a new progress tracker
+ // and start the copy ourselves.
+ cache.writingLock.Lock()
+ progress := cache.writing[cachefilename]
+ if progress != nil {
+ cache.writingLock.Unlock()
+ } else {
+ progress = &writeprogress{}
+ progress.cond = sync.NewCond(&sync.Mutex{})
+ if cache.writing == nil {
+ cache.writing = map[string]*writeprogress{}
}
+ cache.writing[cachefilename] = progress
+ cache.writingLock.Unlock()
+
+ // Start a goroutine to copy from backend to the cache
+ // file. As data arrives, wake up any waiting loops
+ // (see below) so ReadAt() requests for partial data
+ // can return as soon as the relevant bytes have been
+ // copied.
+ go func() {
+ var size int
+ var writef *os.File
+ var err error
+ defer func() {
+ // writef is nil here if openFile failed
+ // below; Close on a nil *os.File returns
+ // an error instead of panicking, and err
+ // is already set in that case.
+ closeErr := writef.Close()
+ if err == nil {
+ err = closeErr
+ }
+ // Publish the final state and wake all
+ // waiters, then unregister this copy.
+ progress.cond.L.Lock()
+ progress.err = err
+ progress.done = true
+ progress.size = size
+ progress.cond.L.Unlock()
+ progress.cond.Broadcast()
+ cache.writingLock.Lock()
+ delete(cache.writing, cachefilename)
+ cache.writingLock.Unlock()
+ }()
+ writef, err = cache.openFile(cachefilename, os.O_WRONLY)
+ if err != nil {
+ err = fmt.Errorf("ReadAt: %w", err)
+ return
+ }
+ err = syscall.Flock(int(writef.Fd()), syscall.LOCK_SH)
+ if err != nil {
+ err = fmt.Errorf("flock(%s, lock_sh) failed: %w", cachefilename, err)
+ return
+ }
+ // context.Background() is used because the copy
+ // keeps running after the ReadAt call that
+ // started it has returned.
+ size, err = cache.KeepGateway.BlockRead(context.Background(), BlockReadOptions{
+ Locator: locator,
+ WriteTo: funcwriter(func(p []byte) (int, error) {
+ n, err := writef.Write(p)
+ if n > 0 {
+ progress.cond.L.Lock()
+ progress.size += n
+ progress.cond.L.Unlock()
+ progress.cond.Broadcast()
+ }
+ return n, err
+ })})
+ atomic.AddInt64(&cache.sizeEstimated, int64(size))
+ cache.gotidy()
+ }()
+ }
+ // Wait until the copy has covered the requested range or has
+ // finished (successfully or not).
+ progress.cond.L.Lock()
+ for !progress.done && progress.size < len(dst)+offset {
+ progress.cond.Wait()
+ }
+ ok := progress.size >= len(dst)+offset
+ err = progress.err
+ progress.cond.L.Unlock()
+
+ if !ok && err != nil {
+ // If the copy-from-backend goroutine encountered an
+ // error before copying enough bytes to satisfy our
+ // request, we return that error.
+ return 0, err
+ } else {
+ // Regardless of whether the copy-from-backend
+ // goroutine succeeded, or failed after copying the
+ // bytes we need, the only errors we need to report
+ // are errors reading from the cache file.
+ return readf.ReadAt(dst, int64(offset))
}
- size, err = f.Seek(0, io.SeekEnd)
- if err != nil {
- return 0, fmt.Errorf("seek(%s, seek_end) failed: %w", cachefilename, err)
- }
- if size < int64(len(dst)+offset) {
- // The cache file is truncated or empty, and we own it
- // now. Fill it.
- _, err = f.Seek(0, io.SeekStart)
- if err != nil {
- return 0, fmt.Errorf("seek(%s, seek_start) failed: %w", cachefilename, err)
- }
- n, err := cache.KeepGateway.BlockRead(context.Background(), BlockReadOptions{Locator: locator, WriteTo: f})
- if err != nil {
- return 0, err
- }
- f.Truncate(int64(n))
- }
- return f.ReadAt(dst, int64(offset))
}
+// quickReadAtLostRace is returned by quickReadAt when another
+// goroutine closed the held-open file before the caller acquired its
+// read lock.
var quickReadAtLostRace = errors.New("quickReadAt: lost race")
-func (cache *DiskCache) deleteHeldopen(cachefilename string, heldopen *openFileEnt) {
+// Remove the heldopen entry for the indicated cachefilename if it
+// matches expect (quickReadAt() usage), or if expect is nil (tidy()
+// usage).
+//
+// If expect is non-nil, close expect's filehandle, whether or not it
+// was still the entry registered for cachefilename.
+//
+// If expect is nil and an entry is found and deleted, close that
+// entry's filehandle.
+func (cache *DiskCache) deleteHeldopen(cachefilename string, expect *openFileEnt) {
+ needclose := expect
+
cache.heldopenLock.Lock()
- if cache.heldopen[cachefilename] == heldopen {
+ found := cache.heldopen[cachefilename]
+ if found != nil && (expect == nil || expect == found) {
delete(cache.heldopen, cachefilename)
+ needclose = found
}
cache.heldopenLock.Unlock()
+
+ // The close happens after heldopenLock has been released;
+ // only the entry's own lock is held while closing.
+ if needclose != nil {
+ needclose.Lock()
+ defer needclose.Unlock()
+ if needclose.f != nil {
+ // Best effort: the error from Close is not reported.
+ needclose.f.Close()
+ needclose.f = nil
+ }
+ }
}
// quickReadAt attempts to use a cached-filehandle approach to read
lim := syscall.Rlimit{}
err := syscall.Getrlimit(syscall.RLIMIT_NOFILE, &lim)
if err != nil {
- cache.heldopenMax = 256
- } else if lim.Cur > 40000 {
+ cache.heldopenMax = 100
+ } else if lim.Cur > 400000 {
cache.heldopenMax = 10000
} else {
- cache.heldopenMax = int(lim.Cur / 4)
+ cache.heldopenMax = int(lim.Cur / 40)
}
}
heldopen := cache.heldopen[cachefilename]
// Other goroutine closed the file before we got RLock
return 0, quickReadAtLostRace
}
+
+ // If another goroutine is currently writing the file, wait
+ // for it to catch up to the end of the range we need.
+ cache.writingLock.Lock()
+ progress := cache.writing[cachefilename]
+ cache.writingLock.Unlock()
+ if progress != nil {
+ progress.cond.L.Lock()
+ for !progress.done && progress.size < len(dst)+offset {
+ progress.cond.Wait()
+ }
+ progress.cond.L.Unlock()
+ // If size<needed && progress.err!=nil here, we'll end
+ // up reporting a less helpful "EOF reading from cache
+ // file" below, instead of the actual error fetching
+ // from upstream to cache file. This is OK though,
+ // because our caller (ReadAt) doesn't even report our
+ // error, it just retries.
+ }
+
n, err := heldopen.f.ReadAt(dst, int64(offset))
if err != nil {
// wait for any concurrent users to finish, then
return n, err
}
-// BlockRead reads the entire block from the wrapped KeepGateway into
-// the cache if needed, and writes it to the provided writer.
+// BlockRead writes the entire block named by opts.Locator to
+// opts.WriteTo, fetching it through ReadAt (and therefore through
+// the cache) in chunks of at most 128 KiB.
+//
+// It returns the number of bytes written to opts.WriteTo, together
+// with the first error reported by ctx, ReadAt, or opts.WriteTo. The
+// locator must carry a size hint.
func (cache *DiskCache) BlockRead(ctx context.Context, opts BlockReadOptions) (int, error) {
- cache.gotidy()
- cachefilename := cache.cacheFile(opts.Locator)
- f, err := cache.openFile(cachefilename, os.O_CREATE|os.O_RDWR)
- if err != nil {
- cache.debugf("BlockRead: open(%s) failed: %s", cachefilename, err)
- return cache.KeepGateway.BlockRead(ctx, opts)
- }
- defer f.Close()
-
+ cache.setupOnce.Do(cache.setup)
i := strings.Index(opts.Locator, "+")
if i < 0 || i >= len(opts.Locator) {
return 0, errors.New("invalid block locator: no size hint")
return 0, errors.New("invalid block locator: invalid size hint")
}
- err = syscall.Flock(int(f.Fd()), syscall.LOCK_SH)
- if err != nil {
- return 0, err
- }
- filesize, err := f.Seek(0, io.SeekEnd)
- if err != nil {
- return 0, err
- }
- _, err = f.Seek(0, io.SeekStart)
- if err != nil {
- return 0, err
- }
- if filesize == blocksize {
- n, err := io.Copy(opts.WriteTo, f)
- return int(n), err
- }
- err = syscall.Flock(int(f.Fd()), syscall.LOCK_EX)
- if err != nil {
- return 0, err
- }
- opts.WriteTo = io.MultiWriter(f, opts.WriteTo)
- n, err := cache.KeepGateway.BlockRead(ctx, opts)
- if err != nil {
- return int(n), err
+ offset := 0
+ buf := make([]byte, 131072)
+ for offset < int(blocksize) {
+ if ctx.Err() != nil {
+ return offset, ctx.Err()
+ }
+ // Shrink the buffer for the final chunk so ReadAt is
+ // never asked for bytes past the end of the block.
+ // (Growing it, as a ">" comparison would do, slices
+ // beyond the buffer's capacity and panics.)
+ if int(blocksize)-offset < len(buf) {
+ buf = buf[:int(blocksize)-offset]
+ }
+ nr, err := cache.ReadAt(opts.Locator, buf, offset)
+ if nr > 0 {
+ // Forward only the bytes ReadAt actually
+ // filled in, not the whole buffer.
+ nw, err := opts.WriteTo.Write(buf[:nr])
+ if err != nil {
+ return offset + nw, err
+ }
+ }
+ offset += nr
+ if err != nil {
+ return offset, err
+ }
}
- f.Truncate(int64(n))
- return n, nil
+ return offset, nil
}
// Start a tidy() goroutine, unless one is already running / recently
// finished.
+//
+// A tidy is also skipped while the cache has been measured, its
+// estimated size is below the size limit, and few blocks have been
+// written since the last tidy.
func (cache *DiskCache) gotidy() {
- // Return quickly if another tidy goroutine is running in this process.
+ // Count this call as one write since the last tidy().
+ writes := atomic.AddInt64(&cache.writesSinceTidy, 1)
+ // Skip if another tidy goroutine is running in this process.
n := atomic.AddInt32(&cache.tidying, 1)
- if n != 1 || time.Now().Before(cache.tidyHoldUntil) {
+ if n != 1 {
+ atomic.AddInt32(&cache.tidying, -1)
+ return
+ }
+ // Skip if sizeEstimated is based on an actual measurement and
+ // is below maxSize, and we haven't done very many writes
+ // since last tidy (defined as 1% of number of cache files at
+ // last count).
+ //
+ // NOTE(review): the comparison uses defaultMaxSize, not
+ // cache.maxSize. In the visible part of tidy(),
+ // defaultMaxSize is stored only when maxSize has no explicit
+ // byte size, so with an explicit MaxSize this condition may
+ // never hold and tidy() would run on every call -- confirm.
+ if cache.sizeMeasured > 0 &&
+ atomic.LoadInt64(&cache.sizeEstimated) < atomic.LoadInt64(&cache.defaultMaxSize) &&
+ writes < cache.lastFileCount/100 {
atomic.AddInt32(&cache.tidying, -1)
return
}
go func() {
cache.tidy()
- cache.tidyHoldUntil = time.Now().Add(tidyHoldDuration)
+ // Reset the write counter before releasing the tidying
+ // flag.
+ atomic.StoreInt64(&cache.writesSinceTidy, 0)
atomic.AddInt32(&cache.tidying, -1)
}()
}
// Delete cache files as needed to control disk usage.
func (cache *DiskCache) tidy() {
- maxsize := cache.MaxSize
+ maxsize := int64(cache.maxSize.ByteSize())
if maxsize < 1 {
- if maxsize = atomic.LoadInt64(&cache.defaultMaxSize); maxsize == 0 {
+ maxsize = atomic.LoadInt64(&cache.defaultMaxSize)
+ if maxsize == 0 {
+ // defaultMaxSize not yet computed. Use 10% of
+ // filesystem capacity (or different
+ // percentage if indicated by cache.maxSize)
+ pct := cache.maxSize.Percent()
+ if pct == 0 {
+ pct = 10
+ }
var stat unix.Statfs_t
- if nil == unix.Statfs(cache.Dir, &stat) {
- maxsize = int64(stat.Bavail) * stat.Bsize / 10
+ if nil == unix.Statfs(cache.dir, &stat) {
+ maxsize = int64(stat.Bavail) * stat.Bsize * pct / 100
+ atomic.StoreInt64(&cache.defaultMaxSize, maxsize)
+ } else {
+ // In this case we will set
+ // defaultMaxSize below after
+ // measuring current usage.
}
- atomic.StoreInt64(&cache.defaultMaxSize, maxsize)
}
}
// Bail if a tidy goroutine is running in a different process.
- lockfile, err := cache.openFile(filepath.Join(cache.Dir, "tmp", "tidy.lock"), os.O_CREATE|os.O_WRONLY)
+ lockfile, err := cache.openFile(filepath.Join(cache.dir, "tmp", "tidy.lock"), os.O_CREATE|os.O_WRONLY)
if err != nil {
return
}
}
var ents []entT
var totalsize int64
- filepath.Walk(cache.Dir, func(path string, info fs.FileInfo, err error) error {
+ filepath.Walk(cache.dir, func(path string, info fs.FileInfo, err error) error {
if err != nil {
cache.debugf("tidy: skipping dir %s: %s", path, err)
return nil
// If MaxSize wasn't specified and we failed to come up with a
// defaultSize above, use the larger of {current cache size, 1
- // GiB} as the defaultSize for subsequent tidy() operations.
+ // GiB} as the defaultMaxSize for subsequent tidy()
+ // operations.
if maxsize == 0 {
if totalsize < 1<<30 {
atomic.StoreInt64(&cache.defaultMaxSize, 1<<30)
return
}
- if totalsize <= maxsize {
+ // If we're below MaxSize or there's only one block in the
+ // cache, just update the usage estimate and return.
+ //
+ // (We never delete the last block because that would merely
+ // cause the same block to get re-fetched repeatedly from the
+ // backend.)
+ if totalsize <= maxsize || len(ents) == 1 {
+ atomic.StoreInt64(&cache.sizeMeasured, totalsize)
+ atomic.StoreInt64(&cache.sizeEstimated, totalsize)
+ cache.lastFileCount = int64(len(ents))
return
}
- // Delete oldest entries until totalsize < maxsize.
+ // Set a new size target of maxsize minus 5%. This makes some
+ // room for sizeEstimate to grow before it triggers another
+ // tidy. We don't want to walk/sort an entire large cache
+ // directory each time we write a block.
+ target := maxsize - (maxsize / 20)
+
+ // Delete oldest entries until totalsize < target or we're
+ // down to a single cached block.
sort.Slice(ents, func(i, j int) bool {
return ents[i].atime.Before(ents[j].atime)
})
deleted := 0
for _, ent := range ents {
os.Remove(ent.path)
+ go cache.deleteHeldopen(ent.path, nil)
deleted++
totalsize -= ent.size
- if totalsize <= maxsize {
+ if totalsize <= target || deleted == len(ents)-1 {
break
}
}
"totalsize": totalsize,
}).Debugf("DiskCache: remaining cache usage after deleting")
}
+ atomic.StoreInt64(&cache.sizeMeasured, totalsize)
+ atomic.StoreInt64(&cache.sizeEstimated, totalsize)
+ cache.lastFileCount = int64(len(ents) - deleted)
}