type DiskCache struct {
KeepGateway
Dir string
- MaxSize int64
+ MaxSize ByteSizeOrPercent
Logger logrus.FieldLogger
+ // sharedCache is selected by Dir in setup(); all DiskCaches
+ // that use the same Dir share one instance.
+ *sharedCache
+ // setupOnce guards setup() so the sharedCache is attached at
+ // most once per DiskCache.
+ setupOnce sync.Once
+}
+
+// Process-wide registry of sharedCache instances, keyed by cache
+// directory and guarded by sharedCachesLock (see setup()).
+var (
+ sharedCachesLock sync.Mutex
+ sharedCaches = map[string]*sharedCache{}
+)
+
+// sharedCache has fields that coordinate the cache usage in a single
+// cache directory; it can be shared by multiple DiskCaches.
+//
+// This serves to share a single pool of held-open filehandles, a
+// single tidying goroutine, etc., even when the program (like
+// keep-web) uses multiple KeepGateway stacks that use different auth
+// tokens, etc.
+type sharedCache struct {
+ dir string
+ // maxSize is copied from the MaxSize of the first DiskCache
+ // that opened this directory (see setup()).
+ maxSize ByteSizeOrPercent
+
tidying int32 // see tidy()
- tidyHoldUntil time.Time
defaultMaxSize int64
// The "heldopen" fields are used to open cache files for
writingCond *sync.Cond
writingLock sync.Mutex
- sizeMeasured int64 // actual size on disk after last tidy(); zero if not measured yet
- sizeEstimated int64 // last measured size, plus files we have written since
+ sizeMeasured int64 // actual size on disk after last tidy(); zero if not measured yet
+ sizeEstimated int64 // last measured size, plus files we have written since
+ lastFileCount int64 // number of files on disk at last count
+ writesSinceTidy int64 // number of files written since last tidy()
}
type writeprogress struct {
}
const (
- cacheFileSuffix = ".keepcacheblock"
- tmpFileSuffix = ".tmp"
- tidyHoldDuration = 5 * time.Minute // time to re-check cache size even if estimated size is below max
+ // Filename suffix for finished cache block files (see cacheFile).
+ cacheFileSuffix = ".keepcacheblock"
+ // Filename suffix for temp files while a block is being written
+ // (see BlockWrite).
+ tmpFileSuffix = ".tmp"
)
+// setup attaches cache to the sharedCache registered for cache.Dir,
+// creating and registering a new one if this is the first DiskCache
+// to use that directory. It is called at most once per DiskCache,
+// via setupOnce.
+//
+// Note: if two DiskCaches specify the same Dir but different
+// MaxSize values, the first caller's MaxSize wins — maxSize is only
+// set when the sharedCache is created.
+func (cache *DiskCache) setup() {
+ sharedCachesLock.Lock()
+ defer sharedCachesLock.Unlock()
+ dir := cache.Dir
+ if sharedCaches[dir] == nil {
+ sharedCaches[dir] = &sharedCache{dir: dir, maxSize: cache.MaxSize}
+ }
+ cache.sharedCache = sharedCaches[dir]
+}
+
+// cacheFile returns the path of the cache file for the given
+// locator: <dir>/<first 3 hash chars>/<hash>.keepcacheblock. Any
+// "+size+hints" suffix on the locator is ignored.
func (cache *DiskCache) cacheFile(locator string) string {
hash := locator
if i := strings.Index(hash, "+"); i > 0 {
hash = hash[:i]
}
- return filepath.Join(cache.Dir, hash[:3], hash+cacheFileSuffix)
+ return filepath.Join(cache.dir, hash[:3], hash+cacheFileSuffix)
}
// Open a cache file, creating the parent dir if necessary.
// BlockWrite writes through to the wrapped KeepGateway, and (if
// possible) retains a copy of the written block in the cache.
func (cache *DiskCache) BlockWrite(ctx context.Context, opts BlockWriteOptions) (BlockWriteResponse, error) {
+ cache.setupOnce.Do(cache.setup)
unique := fmt.Sprintf("%x.%p%s", os.Getpid(), &opts, tmpFileSuffix)
- tmpfilename := filepath.Join(cache.Dir, "tmp", unique)
+ tmpfilename := filepath.Join(cache.dir, "tmp", unique)
tmpfile, err := cache.openFile(tmpfilename, os.O_CREATE|os.O_EXCL|os.O_RDWR)
if err != nil {
cache.debugf("BlockWrite: open(%s) failed: %s", tmpfilename, err)
// cache. The remainder of the block may continue to be copied into
// the cache in the background.
func (cache *DiskCache) ReadAt(locator string, dst []byte, offset int) (int, error) {
+ cache.setupOnce.Do(cache.setup)
cachefilename := cache.cacheFile(locator)
if n, err := cache.quickReadAt(cachefilename, dst, offset); err == nil {
return n, err
lim := syscall.Rlimit{}
err := syscall.Getrlimit(syscall.RLIMIT_NOFILE, &lim)
if err != nil {
- cache.heldopenMax = 256
- } else if lim.Cur > 40000 {
+ cache.heldopenMax = 100
+ } else if lim.Cur > 400000 {
cache.heldopenMax = 10000
} else {
- cache.heldopenMax = int(lim.Cur / 4)
+ cache.heldopenMax = int(lim.Cur / 40)
}
}
heldopen := cache.heldopen[cachefilename]
// BlockRead reads an entire block using a 128 KiB buffer.
func (cache *DiskCache) BlockRead(ctx context.Context, opts BlockReadOptions) (int, error) {
+ cache.setupOnce.Do(cache.setup)
i := strings.Index(opts.Locator, "+")
if i < 0 || i >= len(opts.Locator) {
return 0, errors.New("invalid block locator: no size hint")
// Start a tidy() goroutine, unless one is already running / recently
// finished.
func (cache *DiskCache) gotidy() {
+ writes := atomic.AddInt64(&cache.writesSinceTidy, 1)
// Skip if another tidy goroutine is running in this process.
n := atomic.AddInt32(&cache.tidying, 1)
if n != 1 {
return
}
// Skip if sizeEstimated is based on an actual measurement and
- // is below MaxSize, and we haven't reached the "recheck
- // anyway" time threshold.
+ // is below maxSize, and we haven't done very many writes
+ // since last tidy (defined as 1% of number of cache files at
+ // last count).
+ //
+ // NOTE(review): this compares against defaultMaxSize, which is
+ // only stored by tidy(); until the first tidy() completes it is
+ // zero and this skip never applies — confirm that is intended.
+ //
+ // NOTE(review): lastFileCount is written by tidy() without
+ // atomics but read here from arbitrary goroutines; this looks
+ // like a data race under -race — verify.
if cache.sizeMeasured > 0 &&
- atomic.LoadInt64(&cache.sizeEstimated) < cache.MaxSize &&
- time.Now().Before(cache.tidyHoldUntil) {
+ atomic.LoadInt64(&cache.sizeEstimated) < atomic.LoadInt64(&cache.defaultMaxSize) &&
+ writes < cache.lastFileCount/100 {
atomic.AddInt32(&cache.tidying, -1)
return
}
go func() {
cache.tidy()
- cache.tidyHoldUntil = time.Now().Add(tidyHoldDuration)
+ atomic.StoreInt64(&cache.writesSinceTidy, 0)
atomic.AddInt32(&cache.tidying, -1)
}()
}
// Delete cache files as needed to control disk usage.
func (cache *DiskCache) tidy() {
- maxsize := cache.MaxSize
+ maxsize := int64(cache.maxSize.ByteSize())
if maxsize < 1 {
- if maxsize = atomic.LoadInt64(&cache.defaultMaxSize); maxsize == 0 {
+ maxsize = atomic.LoadInt64(&cache.defaultMaxSize)
+ if maxsize == 0 {
+ // defaultMaxSize not yet computed. Use 10% of
+ // filesystem capacity (or different
+ // percentage if indicated by cache.maxSize)
+ pct := cache.maxSize.Percent()
+ if pct == 0 {
+ pct = 10
+ }
var stat unix.Statfs_t
- if nil == unix.Statfs(cache.Dir, &stat) {
- maxsize = int64(stat.Bavail) * stat.Bsize / 10
+ if nil == unix.Statfs(cache.dir, &stat) {
+ maxsize = int64(stat.Bavail) * stat.Bsize * pct / 100
+ atomic.StoreInt64(&cache.defaultMaxSize, maxsize)
+ } else {
+ // In this case we will set
+ // defaultMaxSize below after
+ // measuring current usage.
}
- atomic.StoreInt64(&cache.defaultMaxSize, maxsize)
}
}
// Bail if a tidy goroutine is running in a different process.
- lockfile, err := cache.openFile(filepath.Join(cache.Dir, "tmp", "tidy.lock"), os.O_CREATE|os.O_WRONLY)
+ lockfile, err := cache.openFile(filepath.Join(cache.dir, "tmp", "tidy.lock"), os.O_CREATE|os.O_WRONLY)
if err != nil {
return
}
}
var ents []entT
var totalsize int64
- filepath.Walk(cache.Dir, func(path string, info fs.FileInfo, err error) error {
+ filepath.Walk(cache.dir, func(path string, info fs.FileInfo, err error) error {
if err != nil {
cache.debugf("tidy: skipping dir %s: %s", path, err)
return nil
// If MaxSize wasn't specified and we failed to come up with a
// defaultSize above, use the larger of {current cache size, 1
- // GiB} as the defaultSize for subsequent tidy() operations.
+ // GiB} as the defaultMaxSize for subsequent tidy()
+ // operations.
if maxsize == 0 {
if totalsize < 1<<30 {
atomic.StoreInt64(&cache.defaultMaxSize, 1<<30)
if totalsize <= maxsize || len(ents) == 1 {
atomic.StoreInt64(&cache.sizeMeasured, totalsize)
atomic.StoreInt64(&cache.sizeEstimated, totalsize)
+ cache.lastFileCount = int64(len(ents))
return
}
}
atomic.StoreInt64(&cache.sizeMeasured, totalsize)
atomic.StoreInt64(&cache.sizeEstimated, totalsize)
+ cache.lastFileCount = int64(len(ents) - deleted)
}