1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: Apache-2.0
25 "github.com/sirupsen/logrus"
26 "golang.org/x/sys/unix"
// KeepGateway is the backend interface that DiskCache wraps: DiskCache's
// BlockWrite and ReadAt call through to the wrapped KeepGateway's
// BlockWrite and BlockRead methods.
29 type KeepGateway interface {
// ReadAt copies part of the block identified by locator, starting at
// offset, into dst (this is what DiskCache.ReadAt does; other
// implementations are presumably analogous).
30 ReadAt(locator string, dst []byte, offset int) (int, error)
// BlockRead reads a whole block and delivers it to opts.WriteTo.
31 BlockRead(ctx context.Context, opts BlockReadOptions) (int, error)
// BlockWrite stores a block supplied via opts.Data or opts.Reader.
32 BlockWrite(ctx context.Context, opts BlockWriteOptions) (BlockWriteResponse, error)
// NOTE(review): LocalLocator's contract is not visible in this excerpt;
// presumably it maps locator to a locally usable form — confirm.
33 LocalLocator(locator string) (string, error)
36 // DiskCache wraps KeepGateway, adding a disk-based cache layer.
//
// On first use (setupOnce -> setup) a DiskCache attaches to the
// sharedCache registered for its cache directory, so several DiskCaches
// using the same directory share held-open files and tidying state.
// Methods access sharedCache fields directly (cache.dir, cache.heldopen,
// ...), presumably through an embedded *sharedCache field that is not
// shown in this excerpt.
37 type DiskCache struct {
// MaxSize limits disk usage, either as a byte count or as a percentage
// (tidy computes the percentage from the filesystem's available space).
// Only the first DiskCache to set up a given directory determines that
// directory's limit.
40 MaxSize ByteSizeOrPercent
// Logger receives debug-level messages; tidy checks it for nil before
// using it.
41 Logger logrus.FieldLogger
// sharedCaches maps each cache directory to its sharedCache, so every
// DiskCache configured with that directory uses one instance (see
// setup). sharedCachesLock guards sharedCaches.
48 sharedCachesLock sync.Mutex
49 sharedCaches = map[string]*sharedCache{}
52 // sharedCache has fields that coordinate the cache usage in a single
53 // cache directory; it can be shared by multiple DiskCaches.
55 // This serves to share a single pool of held-open filehandles, a
56 // single tidying goroutine, etc., even when the program (like
57 // keep-web) uses multiple KeepGateway stacks that use different auth
59 type sharedCache struct {
// maxSize is copied from the MaxSize of the first DiskCache that set up
// this directory.
61 maxSize ByteSizeOrPercent
// tidying counts in-progress gotidy calls in this process; gotidy
// updates it atomically to avoid starting a tidy while one is running.
63 tidying int32 // see tidy()
66 // The "heldopen" fields are used to open cache files for
67 // reading, and leave them open for future/concurrent ReadAt
68 // operations. See quickReadAt.
69 heldopen map[string]*openFileEnt
// heldopenLock guards the heldopen map (and heldopenMax, which
// quickReadAt computes lazily while holding it).
71 heldopenLock sync.Mutex
73 // The "writing" fields allow multiple concurrent/sequential
74 // ReadAt calls to be notified as a single
75 // read-block-from-backend-into-cache goroutine fills the
// writing maps a cache file name to the writeprogress of the goroutine
// currently filling it; entries are removed when the copy finishes.
77 writing map[string]*writeprogress
// NOTE(review): writingCond is not referenced anywhere in the visible
// code (waiters use each writeprogress's own cond) — confirm whether it
// is still needed.
78 writingCond *sync.Cond
// writingLock guards the writing map.
79 writingLock sync.Mutex
// The size counters below are updated with sync/atomic by BlockWrite,
// ReadAt and tidy.
81 sizeMeasured int64 // actual size on disk after last tidy(); zero if not measured yet
82 sizeEstimated int64 // last measured size, plus files we have written since
83 lastFileCount int64 // number of files on disk at last count
84 writesSinceTidy int64 // number of files written since last tidy()
// writeprogress tracks one in-progress copy of a block from the wrapped
// backend into its cache file. ReadAt creates it and registers it in
// sharedCache.writing; readers (ReadAt, quickReadAt) wait on cond until
// enough bytes are present or done is set.
87 type writeprogress struct {
88 cond *sync.Cond // broadcast whenever size or done changes
89 done bool // size and err have their final values
90 size int // bytes copied into cache file so far
91 err error // error encountered while copying from backend to cache
// openFileEnt is a cache file held open in sharedCache.heldopen for
// reuse by quickReadAt. Code elsewhere calls Lock/Unlock/RUnlock on it
// directly, so it presumably embeds a sync.RWMutex; that field and the
// file handle f are on lines not shown in this excerpt.
94 type openFileEnt struct {
97 err error // if err is non-nil, f should not be used.
// cacheFileSuffix is appended to completed cache files (see cacheFile);
// tmpFileSuffix marks BlockWrite's temporary files. tidy counts files
// with either suffix toward cache usage.
101 cacheFileSuffix = ".keepcacheblock"
102 tmpFileSuffix = ".tmp"
// setup attaches cache to the sharedCache for its directory, creating
// and registering one if this is the first DiskCache to use that
// directory. BlockWrite, ReadAt and BlockRead run it once via setupOnce.
//
// NOTE(review): dir is assigned on a line not shown here (presumably
// from the DiskCache's configured directory). maxSize is taken only from
// the first DiskCache for a directory; a later DiskCache with a
// different MaxSize for the same directory silently shares the first
// one's limit.
105 func (cache *DiskCache) setup() {
106 sharedCachesLock.Lock()
107 defer sharedCachesLock.Unlock()
109 if sharedCaches[dir] == nil {
110 sharedCaches[dir] = &sharedCache{dir: dir, maxSize: cache.MaxSize}
112 cache.sharedCache = sharedCaches[dir]
// cacheFile returns the path of the cache file for locator: the block
// hash with cacheFileSuffix appended, inside a subdirectory of
// cache.dir named after the hash's first three characters. Any "+hint"
// suffix is presumably trimmed by the i > 0 branch, whose body is not
// shown here.
//
// NOTE(review): hash[:3] panics if the hash part is shorter than three
// characters. The visible callers pass an MD5 hex digest (BlockWrite) or
// a caller-supplied locator (ReadAt) — confirm that locators are
// validated before reaching here.
115 func (cache *DiskCache) cacheFile(locator string) string {
117 if i := strings.Index(hash, "+"); i > 0 {
120 return filepath.Join(cache.dir, hash[:3], hash+cacheFileSuffix)
123 // Open a cache file, creating the parent dir if necessary.
//
// Files are opened with the caller's flags and permission 0600; a
// missing parent directory is created with 0700. Only one directory
// level is created (os.Mkdir, not MkdirAll), and Mkdir's error is
// ignored: if it fails, the retried OpenFile reports the problem.
124 func (cache *DiskCache) openFile(name string, flags int) (*os.File, error) {
125 f, err := os.OpenFile(name, flags, 0600)
126 if os.IsNotExist(err) {
127 // Create the parent dir and try again. (We could have
128 // checked/created the parent dir before, but that
129 // would be less efficient in the much more common
130 // situation where it already exists.)
131 parent, _ := filepath.Split(name)
132 os.Mkdir(parent, 0700)
133 f, err = os.OpenFile(name, flags, 0600)
138 // Rename a file, creating the new path's parent dir if necessary.
//
// Like openFile, it tries the cheap path first. Only after a failed
// rename does it create the parent directory (single level, mode 0700,
// error ignored) and retry; the retried os.Rename's error is returned.
139 func (cache *DiskCache) rename(old, new string) error {
140 if nil == os.Rename(old, new) {
143 parent, _ := filepath.Split(new)
144 os.Mkdir(parent, 0700)
145 return os.Rename(old, new)
// debugf logs a formatted message at debug level via cache.Logger.
// NOTE(review): presumably a no-op when cache.Logger is nil (the check
// is on lines not shown here; tidy guards Logger the same way) —
// confirm.
148 func (cache *DiskCache) debugf(format string, args ...interface{}) {
149 logger := cache.Logger
153 logger.Debugf(format, args...)
156 // BlockWrite writes through to the wrapped KeepGateway, and (if
157 // possible) retains a copy of the written block in the cache.
//
// A goroutine copies the caller's data (opts.Data or opts.Reader) once,
// simultaneously into:
//   - a temp file under cache.dir/tmp,
//   - an MD5 hash checker, and
//   - a pipe that feeds the wrapped KeepGateway's BlockWrite.
// The temp file is renamed to its cache location (named after the
// computed hash) unless a size or hash mismatch or a close error is
// detected; on success sizeEstimated grows by the block size. If the
// temp file cannot be created, the write goes straight to the wrapped
// KeepGateway without caching.
158 func (cache *DiskCache) BlockWrite(ctx context.Context, opts BlockWriteOptions) (BlockWriteResponse, error) {
159 cache.setupOnce.Do(cache.setup)
// The pid plus the address of opts is intended to make the temp name
// unique among concurrent calls; O_EXCL makes a collision fail the open
// (falling back to an uncached write) rather than share a file.
160 unique := fmt.Sprintf("%x.%p%s", os.Getpid(), &opts, tmpFileSuffix)
161 tmpfilename := filepath.Join(cache.dir, "tmp", unique)
162 tmpfile, err := cache.openFile(tmpfilename, os.O_CREATE|os.O_EXCL|os.O_RDWR)
164 cache.debugf("BlockWrite: open(%s) failed: %s", tmpfilename, err)
165 return cache.KeepGateway.BlockWrite(ctx, opts)
168 ctx, cancel := context.WithCancel(ctx)
// Buffered so the copy goroutine can report one error without waiting
// for a receiver; the result check below inspects len(copyerr).
170 copyerr := make(chan error, 1)
172 // Start a goroutine to copy the caller's source data to
173 // tmpfile, a hash checker, and (via pipe) the wrapped
175 pipereader, pipewriter := io.Pipe()
176 defer pipereader.Close()
178 defer tmpfile.Close()
179 defer os.Remove(tmpfilename)
180 defer pipewriter.Close()
182 // Copy from opts.Data or opts.Reader, depending on
183 // which was provided.
185 if opts.Data != nil {
186 src = bytes.NewReader(opts.Data)
// Hash the data as it is copied so the cache file can be named by its
// actual content hash and checked against opts.Hash.
191 hashcheck := md5.New()
192 n, err := io.Copy(io.MultiWriter(tmpfile, pipewriter, hashcheck), src)
197 } else if opts.DataSize > 0 && opts.DataSize != int(n) {
198 copyerr <- fmt.Errorf("block size %d did not match provided size %d", n, opts.DataSize)
202 err = tmpfile.Close()
204 // Don't rename tmpfile into place, but allow
205 // the BlockWrite call to succeed if nothing
209 hash := fmt.Sprintf("%x", hashcheck.Sum(nil))
210 if opts.Hash != "" && opts.Hash != hash {
211 // Even if the wrapped KeepGateway doesn't
212 // notice a problem, this should count as an
214 copyerr <- fmt.Errorf("block hash %s did not match provided hash %s", hash, opts.Hash)
218 cachefilename := cache.cacheFile(hash)
219 err = cache.rename(tmpfilename, cachefilename)
221 cache.debugf("BlockWrite: rename(%s, %s) failed: %s", tmpfilename, cachefilename, err)
// Account for the newly cached block until the next tidy re-measures.
223 atomic.AddInt64(&cache.sizeEstimated, int64(n))
227 // Write through to the wrapped KeepGateway from the pipe,
228 // instead of the original reader.
230 if newopts.DataSize == 0 {
231 newopts.DataSize = len(newopts.Data)
233 newopts.Reader = pipereader
236 resp, err := cache.KeepGateway.BlockWrite(ctx, newopts)
237 if len(copyerr) > 0 {
238 // If the copy-to-pipe goroutine failed, that error
239 // will be more helpful than the resulting "context
240 // canceled" or "read [from pipereader] failed" error
241 // seen by the wrapped KeepGateway.
243 // If the wrapped KeepGateway encounters an error
244 // before all the data is copied into the pipe, it
245 // stops reading from the pipe, which causes the
246 // io.Copy() in the goroutine to block until our
247 // deferred pipereader.Close() call runs. In that case
248 // len(copyerr)==0 here, so the wrapped KeepGateway
249 // error is the one we return to our caller.
// funcwriter adapts an ordinary function to the io.Writer interface.
// ReadAt uses it as BlockReadOptions.WriteTo to receive block data from
// the wrapped KeepGateway's BlockRead.
255 type funcwriter func([]byte) (int, error)
// Write presumably just calls fw(p) and returns its result (the body is
// not shown in this excerpt).
257 func (fw funcwriter) Write(p []byte) (int, error) {
261 // ReadAt reads the entire block from the wrapped KeepGateway into the
262 // cache if needed, and copies the requested portion into the provided
265 // ReadAt returns as soon as the requested portion is available in the
266 // cache. The remainder of the block may continue to be copied into
267 // the cache in the background.
//
// ReadAt first tries quickReadAt, which uses a held-open file handle.
// Otherwise it opens the cache file (creating it if missing) and takes
// a shared flock on it. If no other goroutine is already filling this
// cache file, ReadAt does two things:
//   - registers a writeprogress in cache.writing, and
//   - starts a goroutine that copies the block from the wrapped
//     KeepGateway into the file, broadcasting progress as bytes arrive.
// ReadAt then waits until offset+len(dst) bytes are present (or the copy
// finishes) and reads dst from the cache file.
268 func (cache *DiskCache) ReadAt(locator string, dst []byte, offset int) (int, error) {
269 cache.setupOnce.Do(cache.setup)
270 cachefilename := cache.cacheFile(locator)
// Fast path. quickReadAt's errors are not reported; any failure falls
// through to the slower path below.
271 if n, err := cache.quickReadAt(cachefilename, dst, offset); err == nil {
274 readf, err := cache.openFile(cachefilename, os.O_CREATE|os.O_RDONLY)
276 return 0, fmt.Errorf("ReadAt: %w", err)
// NOTE(review): shared flock on the cache file; presumably this
// coordinates with deletion in tidy (not visible here) — confirm.
280 err = syscall.Flock(int(readf.Fd()), syscall.LOCK_SH)
282 return 0, fmt.Errorf("flock(%s, lock_sh) failed: %w", cachefilename, err)
285 cache.writingLock.Lock()
286 progress := cache.writing[cachefilename]
288 cache.writingLock.Unlock()
290 progress = &writeprogress{}
291 progress.cond = sync.NewCond(&sync.Mutex{})
292 if cache.writing == nil {
293 cache.writing = map[string]*writeprogress{}
295 cache.writing[cachefilename] = progress
296 cache.writingLock.Unlock()
298 // Start a goroutine to copy from backend to f. As
299 // data arrives, wake up any waiting loops (see below)
300 // so ReadAt() requests for partial data can return as
301 // soon as the relevant bytes have been copied.
307 closeErr := writef.Close()
311 progress.cond.L.Lock()
315 progress.cond.L.Unlock()
316 progress.cond.Broadcast()
317 cache.writingLock.Lock()
318 delete(cache.writing, cachefilename)
319 cache.writingLock.Unlock()
321 writef, err = cache.openFile(cachefilename, os.O_WRONLY)
323 err = fmt.Errorf("ReadAt: %w", err)
326 err = syscall.Flock(int(writef.Fd()), syscall.LOCK_SH)
328 err = fmt.Errorf("flock(%s, lock_sh) failed: %w", cachefilename, err)
// The backend fetch uses context.Background(), not a caller's context,
// presumably because other readers may be waiting on this same
// writeprogress.
331 size, err = cache.KeepGateway.BlockRead(context.Background(), BlockReadOptions{
333 WriteTo: funcwriter(func(p []byte) (int, error) {
334 n, err := writef.Write(p)
336 progress.cond.L.Lock()
338 progress.cond.L.Unlock()
339 progress.cond.Broadcast()
// Add the fetched block size to the cache's estimated disk usage.
343 atomic.AddInt64(&cache.sizeEstimated, int64(size))
// Wait until the requested range has been written to the cache file,
// or the copy has finished (successfully or not).
347 progress.cond.L.Lock()
348 for !progress.done && progress.size < len(dst)+offset {
351 ok := progress.size >= len(dst)+offset
353 progress.cond.L.Unlock()
355 if !ok && err != nil {
356 // If the copy-from-backend goroutine encountered an
357 // error before copying enough bytes to satisfy our
358 // request, we return that error.
361 // Regardless of whether the copy-from-backend
362 // goroutine succeeded, or failed after copying the
363 // bytes we need, the only errors we need to report
364 // are errors reading from the cache file.
365 return readf.ReadAt(dst, int64(offset))
// quickReadAtLostRace is returned by quickReadAt when another goroutine
// closed the held-open file before this read could use it. ReadAt does
// not report quickReadAt's errors; it falls back to its slower path.
369 var quickReadAtLostRace = errors.New("quickReadAt: lost race")
371 // Remove the cache entry for the indicated cachefilename if it
372 // matches expect (quickReadAt() usage), or if expect is nil (tidy()
375 // If expect is non-nil, close expect's filehandle.
377 // If expect is nil and a different cache entry is deleted, close its
//
// The visible callers (quickReadAt, tidy) run it in its own goroutine.
// The map entry is removed while holding heldopenLock. The file is
// closed afterwards while holding the entry's own lock, so the close
// presumably waits for readers that hold the entry's read lock (see
// quickReadAt).
379 func (cache *DiskCache) deleteHeldopen(cachefilename string, expect *openFileEnt) {
382 cache.heldopenLock.Lock()
383 found := cache.heldopen[cachefilename]
384 if found != nil && (expect == nil || expect == found) {
385 delete(cache.heldopen, cachefilename)
388 cache.heldopenLock.Unlock()
// Closing happens after heldopenLock is released, so waiting for this
// entry's readers does not block other heldopen map operations.
390 if needclose != nil {
392 defer needclose.Unlock()
393 if needclose.f != nil {
400 // quickReadAt attempts to use a cached-filehandle approach to read
401 // from the indicated file. The expectation is that the caller
402 // (ReadAt) will try a more robust approach when this fails, so
403 // quickReadAt doesn't try especially hard to ensure success in
404 // races. In particular, when there are concurrent calls, and one
405 // fails, that can cause others to fail too.
//
// heldopenMax is computed once, under heldopenLock:
//   - 1/40 of the RLIMIT_NOFILE soft limit,
//   - 10000 when that limit exceeds 400000,
//   - 100 when the limit presumably could not be read.
// When the heldopen map grows past heldopenMax, every held file is
// closed by a background goroutine and handles are reopened on demand.
406 func (cache *DiskCache) quickReadAt(cachefilename string, dst []byte, offset int) (int, error) {
408 cache.heldopenLock.Lock()
409 if cache.heldopenMax == 0 {
410 // Choose a reasonable limit on open cache files based
411 // on RLIMIT_NOFILE. Note Go automatically raises
412 // softlimit to hardlimit, so it's typically 1048576,
414 lim := syscall.Rlimit{}
415 err := syscall.Getrlimit(syscall.RLIMIT_NOFILE, &lim)
417 cache.heldopenMax = 100
418 } else if lim.Cur > 400000 {
419 cache.heldopenMax = 10000
421 cache.heldopenMax = int(lim.Cur / 40)
424 heldopen := cache.heldopen[cachefilename]
427 heldopen = &openFileEnt{}
428 if cache.heldopen == nil {
429 cache.heldopen = make(map[string]*openFileEnt, cache.heldopenMax)
430 } else if len(cache.heldopen) > cache.heldopenMax {
431 // Rather than go to the trouble of tracking
432 // last access time, just close all files, and
433 // open again as needed. Even in the worst
434 // pathological case, this causes one extra
435 // open+close per read, which is not
436 // especially bad (see benchmarks).
// NOTE(review): the Unlock is deferred inside the range loop, so every
// entry stays locked until the whole sweep finishes. Readers of entries
// already closed wait longer than necessary (they then see f == nil and
// return quickReadAtLostRace).
437 go func(m map[string]*openFileEnt) {
438 for _, heldopen := range m {
440 defer heldopen.Unlock()
441 if heldopen.f != nil {
449 cache.heldopen[cachefilename] = heldopen
452 cache.heldopenLock.Unlock()
455 // Open and flock the file, then call wg.Done() to
456 // unblock any other goroutines that are waiting in
457 // the !isnew case above.
// NOTE(review): no WaitGroup is visible in this excerpt. Completion of
// initialization appears to be signalled by releasing the entry's lock
// (see the read-lock comment below), so the wg.Done() wording above may
// be stale — confirm.
458 f, err := os.Open(cachefilename)
460 err = syscall.Flock(int(f.Fd()), syscall.LOCK_SH)
469 go cache.deleteHeldopen(cachefilename, heldopen)
473 // Acquire read lock to ensure (1) initialization is complete,
474 // if it's done by a different goroutine, and (2) any "delete
475 // old/unused entries" waits for our read to finish before
478 defer heldopen.RUnlock()
479 if heldopen.err != nil {
480 // Other goroutine encountered an error during setup
481 return 0, heldopen.err
482 } else if heldopen.f == nil {
483 // Other goroutine closed the file before we got RLock
484 return 0, quickReadAtLostRace
487 // If another goroutine is currently writing the file, wait
488 // for it to catch up to the end of the range we need.
489 cache.writingLock.Lock()
490 progress := cache.writing[cachefilename]
491 cache.writingLock.Unlock()
493 progress.cond.L.Lock()
494 for !progress.done && progress.size < len(dst)+offset {
497 progress.cond.L.Unlock()
498 // If size<needed && progress.err!=nil here, we'll end
499 // up reporting a less helpful "EOF reading from cache
500 // file" below, instead of the actual error fetching
501 // from upstream to cache file. This is OK though,
502 // because our caller (ReadAt) doesn't even report our
503 // error, it just retries.
506 n, err := heldopen.f.ReadAt(dst, int64(offset))
508 // wait for any concurrent users to finish, then
509 // delete this cache entry in case reopening the
510 // backing file helps.
511 go cache.deleteHeldopen(cachefilename, heldopen)
516 // BlockRead reads an entire block using a 128 KiB buffer.
//
// The block size comes from the locator's size hint (the field after
// the first "+"); hints that do not parse as a non-negative 32-bit
// integer are rejected. The block is then read from the cache in chunks
// via ReadAt, and each chunk is written to opts.WriteTo. The return
// value is the number of bytes delivered so far, along with any error,
// including ctx.Err() if ctx is cancelled between chunks.
//
// NOTE(review): two likely bugs in the chunk loop below.
//   - The buffer-shrinking test uses ">" where "<" appears intended.
//     As written, a block larger than len(buf) slices buf beyond its
//     capacity, which is a runtime panic. A block smaller than len(buf)
//     never shrinks buf, so ReadAt is asked for bytes past the end of
//     the block.
//   - WriteTo.Write is passed all of buf rather than buf[:nr], so a
//     short read would hand stale bytes to the caller.
// Also, "i >= len(opts.Locator)" can never be true after a successful
// strings.Index. A trailing "+" is still rejected, because the empty
// size string fails ParseInt.
517 func (cache *DiskCache) BlockRead(ctx context.Context, opts BlockReadOptions) (int, error) {
518 cache.setupOnce.Do(cache.setup)
519 i := strings.Index(opts.Locator, "+")
520 if i < 0 || i >= len(opts.Locator) {
521 return 0, errors.New("invalid block locator: no size hint")
523 sizestr := opts.Locator[i+1:]
524 i = strings.Index(sizestr, "+")
526 sizestr = sizestr[:i]
528 blocksize, err := strconv.ParseInt(sizestr, 10, 32)
529 if err != nil || blocksize < 0 {
530 return 0, errors.New("invalid block locator: invalid size hint")
534 buf := make([]byte, 131072)
535 for offset < int(blocksize) {
536 if ctx.Err() != nil {
537 return offset, ctx.Err()
539 if int(blocksize)-offset > len(buf) {
540 buf = buf[:int(blocksize)-offset]
542 nr, err := cache.ReadAt(opts.Locator, buf, offset)
544 nw, err := opts.WriteTo.Write(buf)
546 return offset + nw, err
557 // Start a tidy() goroutine, unless one is already running / recently
//
// gotidy counts one more write in writesSinceTidy and returns early in
// either of two cases:
//   - another gotidy is already in progress in this process, or
//   - usage has been measured, the estimate is below the default size
//     limit, and fewer than 1% of the last-counted number of cache
//     files have been written since the last tidy.
// Otherwise it runs a tidy (on lines not shown) and resets
// writesSinceTidy.
559 func (cache *DiskCache) gotidy() {
560 writes := atomic.AddInt64(&cache.writesSinceTidy, 1)
561 // Skip if another tidy goroutine is running in this process.
562 n := atomic.AddInt32(&cache.tidying, 1)
564 atomic.AddInt32(&cache.tidying, -1)
567 // Skip if sizeEstimated is based on an actual measurement and
568 // is below maxSize, and we haven't done very many writes
569 // since last tidy (defined as 1% of number of cache files at
// NOTE(review): problems with the condition below.
//   - It compares against defaultMaxSize. In the visible code, tidy only
//     stores defaultMaxSize when MaxSize has no byte value. With an
//     explicit byte MaxSize it presumably stays 0, so this test never
//     passes and every write triggers a full tidy walk.
//   - When fewer than 100 files were counted, lastFileCount/100 is 0,
//     so every write triggers a tidy.
//   - sizeMeasured and lastFileCount are read here without atomic loads,
//     while tidy writes them (lastFileCount non-atomically). That is a
//     data race if tidy runs in another goroutine — confirm.
571 if cache.sizeMeasured > 0 &&
572 atomic.LoadInt64(&cache.sizeEstimated) < atomic.LoadInt64(&cache.defaultMaxSize) &&
573 writes < cache.lastFileCount/100 {
574 atomic.AddInt32(&cache.tidying, -1)
579 atomic.StoreInt64(&cache.writesSinceTidy, 0)
580 atomic.AddInt32(&cache.tidying, -1)
584 // Delete cache files as needed to control disk usage.
585 func (cache *DiskCache) tidy() {
586 maxsize := int64(cache.maxSize.ByteSize())
588 maxsize = atomic.LoadInt64(&cache.defaultMaxSize)
590 // defaultMaxSize not yet computed. Use 10% of
591 // filesystem capacity (or different
592 // percentage if indicated by cache.maxSize)
593 pct := cache.maxSize.Percent()
597 var stat unix.Statfs_t
598 if nil == unix.Statfs(cache.dir, &stat) {
599 maxsize = int64(stat.Bavail) * stat.Bsize * pct / 100
600 atomic.StoreInt64(&cache.defaultMaxSize, maxsize)
602 // In this case we will set
603 // defaultMaxSize below after
604 // measuring current usage.
609 // Bail if a tidy goroutine is running in a different process.
610 lockfile, err := cache.openFile(filepath.Join(cache.dir, "tmp", "tidy.lock"), os.O_CREATE|os.O_WRONLY)
614 defer lockfile.Close()
615 err = syscall.Flock(int(lockfile.Fd()), syscall.LOCK_EX|syscall.LOCK_NB)
627 filepath.Walk(cache.dir, func(path string, info fs.FileInfo, err error) error {
629 cache.debugf("tidy: skipping dir %s: %s", path, err)
635 if !strings.HasSuffix(path, cacheFileSuffix) && !strings.HasSuffix(path, tmpFileSuffix) {
639 if stat, ok := info.Sys().(*syscall.Stat_t); ok {
640 // Access time is available (hopefully the
641 // filesystem is not mounted with noatime)
642 atime = time.Unix(stat.Atim.Sec, stat.Atim.Nsec)
644 // If access time isn't available we fall back
645 // to sorting by modification time.
646 atime = info.ModTime()
648 ents = append(ents, entT{path, atime, info.Size()})
649 totalsize += info.Size()
652 if cache.Logger != nil {
653 cache.Logger.WithFields(logrus.Fields{
654 "totalsize": totalsize,
656 }).Debugf("DiskCache: checked current cache usage")
659 // If MaxSize wasn't specified and we failed to come up with a
660 // defaultSize above, use the larger of {current cache size, 1
661 // GiB} as the defaultMaxSize for subsequent tidy()
664 if totalsize < 1<<30 {
665 atomic.StoreInt64(&cache.defaultMaxSize, 1<<30)
667 atomic.StoreInt64(&cache.defaultMaxSize, totalsize)
669 cache.debugf("found initial size %d, setting defaultMaxSize %d", totalsize, cache.defaultMaxSize)
673 // If we're below MaxSize or there's only one block in the
674 // cache, just update the usage estimate and return.
676 // (We never delete the last block because that would merely
677 // cause the same block to get re-fetched repeatedly from the
679 if totalsize <= maxsize || len(ents) == 1 {
680 atomic.StoreInt64(&cache.sizeMeasured, totalsize)
681 atomic.StoreInt64(&cache.sizeEstimated, totalsize)
682 cache.lastFileCount = int64(len(ents))
686 // Set a new size target of maxsize minus 5%. This makes some
687 // room for sizeEstimate to grow before it triggers another
688 // tidy. We don't want to walk/sort an entire large cache
689 // directory each time we write a block.
690 target := maxsize - (maxsize / 20)
692 // Delete oldest entries until totalsize < target or we're
693 // down to a single cached block.
694 sort.Slice(ents, func(i, j int) bool {
695 return ents[i].atime.Before(ents[j].atime)
698 for _, ent := range ents {
700 go cache.deleteHeldopen(ent.path, nil)
702 totalsize -= ent.size
703 if totalsize <= target || deleted == len(ents)-1 {
708 if cache.Logger != nil {
709 cache.Logger.WithFields(logrus.Fields{
711 "totalsize": totalsize,
712 }).Debugf("DiskCache: remaining cache usage after deleting")
714 atomic.StoreInt64(&cache.sizeMeasured, totalsize)
715 atomic.StoreInt64(&cache.sizeEstimated, totalsize)
716 cache.lastFileCount = int64(len(ents) - deleted)