writing map[string]*writeprogress
writingCond *sync.Cond
writingLock sync.Mutex
+
+ sizeMeasured int64 // actual size on disk after last tidy(); zero if not measured yet
+ sizeEstimated int64 // last measured size, plus files we have written since
}
type writeprogress struct {
const (
cacheFileSuffix = ".keepcacheblock"
tmpFileSuffix = ".tmp"
- tidyHoldDuration = 10 * time.Second
+ tidyHoldDuration = 5 * time.Minute // max interval between actual cache-size re-checks, even while the estimated size stays below MaxSize
)
func (cache *DiskCache) cacheFile(locator string) string {
// BlockWrite writes through to the wrapped KeepGateway, and (if
// possible) retains a copy of the written block in the cache.
func (cache *DiskCache) BlockWrite(ctx context.Context, opts BlockWriteOptions) (BlockWriteResponse, error) {
- cache.gotidy()
unique := fmt.Sprintf("%x.%p%s", os.Getpid(), &opts, tmpFileSuffix)
tmpfilename := filepath.Join(cache.Dir, "tmp", unique)
tmpfile, err := cache.openFile(tmpfilename, os.O_CREATE|os.O_EXCL|os.O_RDWR)
if err != nil {
cache.debugf("BlockWrite: rename(%s, %s) failed: %s", tmpfilename, cachefilename, err)
}
+ atomic.AddInt64(&cache.sizeEstimated, int64(n))
+ cache.gotidy()
}()
// Write through to the wrapped KeepGateway from the pipe,
// cache. The remainder of the block may continue to be copied into
// the cache in the background.
func (cache *DiskCache) ReadAt(locator string, dst []byte, offset int) (int, error) {
- cache.gotidy()
cachefilename := cache.cacheFile(locator)
if n, err := cache.quickReadAt(cachefilename, dst, offset); err == nil {
return n, err
}
return n, err
})})
+ atomic.AddInt64(&cache.sizeEstimated, int64(size))
+ cache.gotidy()
}()
}
progress.cond.L.Lock()
// Start a tidy() goroutine, unless one is already running / recently
// finished.
func (cache *DiskCache) gotidy() {
- // Return quickly if another tidy goroutine is running in this process.
+ // Skip if another tidy goroutine is running in this process.
n := atomic.AddInt32(&cache.tidying, 1)
- if n != 1 || time.Now().Before(cache.tidyHoldUntil) {
+ if n != 1 {
+ atomic.AddInt32(&cache.tidying, -1)
+ return
+ }
+ // Skip if sizeEstimated is based on an actual measurement and
+ // is below MaxSize, and we haven't reached the "recheck
+ // anyway" time threshold.
+ if cache.sizeMeasured > 0 &&
+ atomic.LoadInt64(&cache.sizeEstimated) < cache.MaxSize &&
+ time.Now().Before(cache.tidyHoldUntil) {
atomic.AddInt32(&cache.tidying, -1)
return
}
return
}
- if totalsize <= maxsize {
+ // If we're below MaxSize or there's only one block in the
+ // cache, just update the usage estimate and return.
+ //
+ // (We never delete the last block because that would merely
+ // cause the same block to get re-fetched repeatedly from the
+ // backend.)
+ if totalsize <= maxsize || len(ents) == 1 {
+ atomic.StoreInt64(&cache.sizeMeasured, totalsize)
+ atomic.StoreInt64(&cache.sizeEstimated, totalsize)
return
}
- // Delete oldest entries until totalsize < maxsize.
+ // Set a new size target of maxsize minus 5%. This makes some
+ // room for sizeEstimate to grow before it triggers another
+ // tidy. We don't want to walk/sort an entire large cache
+ // directory each time we write a block.
+ target := maxsize - (maxsize / 20)
+
+ // Delete oldest entries until totalsize < target or we're
+ // down to a single cached block.
sort.Slice(ents, func(i, j int) bool {
return ents[i].atime.Before(ents[j].atime)
})
go cache.deleteHeldopen(ent.path, nil)
deleted++
totalsize -= ent.size
- if totalsize <= maxsize {
+ if totalsize <= target || deleted == len(ents)-1 {
break
}
}
"totalsize": totalsize,
}).Debugf("DiskCache: remaining cache usage after deleting")
}
+ atomic.StoreInt64(&cache.sizeMeasured, totalsize)
+ atomic.StoreInt64(&cache.sizeEstimated, totalsize)
}
Data: make([]byte, 44000000),
})
c.Check(err, check.IsNil)
+
+ // Wait for tidy to finish, check that it doesn't delete the
+ // only block.
time.Sleep(time.Millisecond)
+ for atomic.LoadInt32(&cache.tidying) > 0 {
+ time.Sleep(time.Millisecond)
+ }
+ c.Check(atomic.LoadInt64(&cache.sizeMeasured), check.Equals, int64(44000000))
+
resp2, err := cache.BlockWrite(ctx, BlockWriteOptions{
Data: make([]byte, 32000000),
})
c.Check(err, check.IsNil)
delete(backend.data, resp1.Locator)
delete(backend.data, resp2.Locator)
- cache.tidyHoldUntil = time.Time{}
- cache.tidy()
+
+ // Wait for tidy to finish, check that it deleted the older
+ // block.
+ time.Sleep(time.Millisecond)
+ for atomic.LoadInt32(&cache.tidying) > 0 {
+ time.Sleep(time.Millisecond)
+ }
+ c.Check(atomic.LoadInt64(&cache.sizeMeasured), check.Equals, int64(32000000))
n, err := cache.ReadAt(resp1.Locator, make([]byte, 2), 0)
c.Check(n, check.Equals, 0)