// Copyright (C) The Arvados Authors. All rights reserved.
//
// SPDX-License-Identifier: Apache-2.0

package arvados

import (
	"bytes"
	"context"
	"crypto/md5"
	"errors"
	"fmt"
	"io"
	"io/fs"
	"os"
	"path/filepath"
	"sort"
	"strconv"
	"strings"
	"sync"
	"sync/atomic"
	"syscall"
	"time"

	"git.arvados.org/arvados.git/sdk/go/ctxlog"
	"github.com/sirupsen/logrus"
	"golang.org/x/sys/unix"
)
type KeepGateway interface {
	ReadAt(locator string, dst []byte, offset int) (int, error)
	BlockRead(ctx context.Context, opts BlockReadOptions) (int, error)
	BlockWrite(ctx context.Context, opts BlockWriteOptions) (BlockWriteResponse, error)
	LocalLocator(locator string) (string, error)
}
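
// Illustrative usage sketch (not part of the original source): a
// DiskCache embeds the KeepGateway it wraps, so it can be layered
// over any other KeepGateway implementation, and calls fall through
// to the backend on cache misses. The backend value, directory, and
// size below are hypothetical.
//
//	cache := &DiskCache{
//		KeepGateway: backend, // any KeepGateway, e.g. a keepclient stack
//		Dir:         "/var/cache/arvados-keep",
//		MaxSize:     ByteSizeOrPercent(10 << 30), // 10 GiB
//	}
//	buf := make([]byte, 1024)
//	n, err := cache.ReadAt(locator, buf, 0) // served from disk when cached
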
// DiskCache wraps KeepGateway, adding a disk-based cache layer.
//
// A DiskCache is automatically incorporated into the backend stack of
// each keepclient.KeepClient. Most programs do not need to use
// DiskCache directly.
type DiskCache struct {
	KeepGateway
	Dir     string
	MaxSize ByteSizeOrPercent
	Logger  logrus.FieldLogger

	*sharedCache
	setupOnce sync.Once
}

var (
	sharedCachesLock sync.Mutex
	sharedCaches     = map[string]*sharedCache{}
)
// sharedCache has fields that coordinate the cache usage in a single
// cache directory; it can be shared by multiple DiskCaches.
//
// This serves to share a single pool of held-open filehandles, a
// single tidying goroutine, etc., even when the program (like
// keep-web) uses multiple KeepGateway stacks that use different auth
// tokens, etc.
type sharedCache struct {
	dir     string
	maxSize ByteSizeOrPercent

	tidying        int32 // see tidy()
	defaultMaxSize int64

	// The "heldopen" fields are used to open cache files for
	// reading, and leave them open for future/concurrent ReadAt
	// operations. See quickReadAt.
	heldopen     map[string]*openFileEnt
	heldopenMax  int
	heldopenLock sync.Mutex

	// The "writing" fields allow multiple concurrent/sequential
	// ReadAt calls to be notified as a single
	// read-block-from-backend-into-cache goroutine fills the
	// cache file.
	writing     map[string]*writeprogress
	writingCond *sync.Cond
	writingLock sync.Mutex

	sizeMeasured    int64 // actual size on disk after last tidy(); zero if not measured yet
	sizeEstimated   int64 // last measured size, plus files we have written since
	lastFileCount   int64 // number of files on disk at last count
	writesSinceTidy int64 // number of files written since last tidy()
}
type writeprogress struct {
	cond    *sync.Cond     // broadcast whenever size or done changes
	done    bool           // size and err have their final values
	size    int            // bytes copied into cache file so far
	err     error          // error encountered while copying from backend to cache
	sharedf *os.File       // readable filehandle, usable if done && err==nil
	readers sync.WaitGroup // goroutines that haven't finished reading from f yet
}

type openFileEnt struct {
	sync.RWMutex
	f   *os.File
	err error // if err is non-nil, f should not be used.
}

const (
	cacheFileSuffix = ".keepcacheblock"
	tmpFileSuffix   = ".tmp"
)
func (cache *DiskCache) setup() {
	sharedCachesLock.Lock()
	defer sharedCachesLock.Unlock()
	dir := cache.Dir
	if sharedCaches[dir] == nil {
		cache.debugf("initializing sharedCache using %s with max size %d", dir, cache.MaxSize)
		sharedCaches[dir] = &sharedCache{dir: dir, maxSize: cache.MaxSize}
	} else {
		cache.debugf("using existing sharedCache using %s with max size %d (would have initialized with %d)", dir, sharedCaches[dir].maxSize, cache.MaxSize)
	}
	cache.sharedCache = sharedCaches[dir]
}
func (cache *DiskCache) cacheFile(locator string) string {
	hash := locator
	if i := strings.Index(hash, "+"); i > 0 {
		hash = hash[:i]
	}
	return filepath.Join(cache.dir, hash[:3], hash+cacheFileSuffix)
}
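
// For illustration (hedged, not from the original source): with
// dir = "/tmp/keepcache", the locator
// "acbd18db4cc2f85cedef654fccc4a4d8+3" maps to
// "/tmp/keepcache/acb/acbd18db4cc2f85cedef654fccc4a4d8.keepcacheblock".
// Everything after the first "+" (the size and any other hints) is
// dropped, and the 3-hex-digit prefix fans entries out over
// subdirectories so no single directory grows too large.
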
// Open a cache file, creating the parent dir if necessary.
func (cache *DiskCache) openFile(name string, flags int) (*os.File, error) {
	f, err := os.OpenFile(name, flags, 0600)
	if os.IsNotExist(err) {
		// Create the parent dir and try again. (We could have
		// checked/created the parent dir before, but that
		// would be less efficient in the much more common
		// situation where it already exists.)
		parent, _ := filepath.Split(name)
		os.Mkdir(parent, 0700)
		f, err = os.OpenFile(name, flags, 0600)
	}
	return f, err
}
// Rename a file, creating the new path's parent dir if necessary.
func (cache *DiskCache) rename(old, new string) error {
	if os.Rename(old, new) == nil {
		return nil
	}
	parent, _ := filepath.Split(new)
	os.Mkdir(parent, 0700)
	return os.Rename(old, new)
}
func (cache *DiskCache) debugf(format string, args ...interface{}) {
	logger := cache.Logger
	if logger == nil {
		logger = ctxlog.FromContext(context.Background())
	}
	logger.Debugf(format, args...)
}
// BlockWrite writes through to the wrapped KeepGateway, and (if
// possible) retains a copy of the written block in the cache.
func (cache *DiskCache) BlockWrite(ctx context.Context, opts BlockWriteOptions) (BlockWriteResponse, error) {
	cache.setupOnce.Do(cache.setup)
	unique := fmt.Sprintf("%x.%p%s", os.Getpid(), &opts, tmpFileSuffix)
	tmpfilename := filepath.Join(cache.dir, "tmp", unique)
	tmpfile, err := cache.openFile(tmpfilename, os.O_CREATE|os.O_EXCL|os.O_RDWR)
	if err != nil {
		cache.debugf("BlockWrite: open(%s) failed: %s", tmpfilename, err)
		return cache.KeepGateway.BlockWrite(ctx, opts)
	}

	ctx, cancel := context.WithCancel(ctx)
	defer cancel()
	copyerr := make(chan error, 1)

	// Start a goroutine to copy the caller's source data to
	// tmpfile, a hash checker, and (via pipe) the wrapped
	// KeepGateway.
	pipereader, pipewriter := io.Pipe()
	defer pipereader.Close()
	go func() {
		// Note this is a double-close (which is a no-op) in
		// the happy path.
		defer tmpfile.Close()
		// Note this is a no-op in the happy path (the
		// uniquely named tmpfilename will have been renamed).
		defer os.Remove(tmpfilename)
		defer pipewriter.Close()

		// Copy from opts.Data or opts.Reader, depending on
		// which was provided.
		var src io.Reader
		if opts.Data != nil {
			src = bytes.NewReader(opts.Data)
		} else {
			src = opts.Reader
		}

		hashcheck := md5.New()
		n, err := io.Copy(io.MultiWriter(tmpfile, pipewriter, hashcheck), src)
		if err != nil {
			copyerr <- err
			cancel()
			return
		} else if opts.DataSize > 0 && opts.DataSize != int(n) {
			copyerr <- fmt.Errorf("block size %d did not match provided size %d", n, opts.DataSize)
			cancel()
			return
		}
		err = tmpfile.Close()
		if err != nil {
			// Don't rename tmpfile into place, but allow
			// the BlockWrite call to succeed if nothing
			// else goes wrong.
			return
		}
		hash := fmt.Sprintf("%x", hashcheck.Sum(nil))
		if opts.Hash != "" && opts.Hash != hash {
			// Even if the wrapped KeepGateway doesn't
			// notice a problem, this should count as an
			// error.
			copyerr <- fmt.Errorf("block hash %s did not match provided hash %s", hash, opts.Hash)
			cancel()
			return
		}
		cachefilename := cache.cacheFile(hash)
		err = cache.rename(tmpfilename, cachefilename)
		if err != nil {
			cache.debugf("BlockWrite: rename(%s, %s) failed: %s", tmpfilename, cachefilename, err)
		}
		atomic.AddInt64(&cache.sizeEstimated, int64(n))
		cache.gotidy()
	}()

	// Write through to the wrapped KeepGateway from the pipe,
	// instead of the original reader.
	newopts := opts
	if newopts.DataSize == 0 {
		newopts.DataSize = len(newopts.Data)
	}
	newopts.Reader = pipereader
	newopts.Data = nil

	resp, err := cache.KeepGateway.BlockWrite(ctx, newopts)
	if len(copyerr) > 0 {
		// If the copy-to-pipe goroutine failed, that error
		// will be more helpful than the resulting "context
		// canceled" or "read [from pipereader] failed" error
		// seen by the wrapped KeepGateway.
		//
		// If the wrapped KeepGateway encounters an error
		// before all the data is copied into the pipe, it
		// stops reading from the pipe, which causes the
		// io.Copy() in the goroutine to block until our
		// deferred pipereader.Close() call runs. In that case
		// len(copyerr)==0 here, so the wrapped KeepGateway
		// error is the one we return to our caller.
		err = <-copyerr
	}
	return resp, err
}
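
// The BlockWrite fan-out above is the classic io.Pipe plus
// io.MultiWriter tee pattern. A minimal standalone sketch (assumed
// names, illustration only; the real code above propagates errors
// through copyerr and a deferred Close instead):
//
//	pr, pw := io.Pipe()
//	hash := md5.New()
//	go func() {
//		_, err := io.Copy(io.MultiWriter(cachefile, pw, hash), src)
//		pw.CloseWithError(err) // propagates a copy error to the reader side
//	}()
//	// ...while the wrapped gateway consumes pr as its Reader.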
type funcwriter func([]byte) (int, error)

func (fw funcwriter) Write(p []byte) (int, error) {
	return fw(p)
}
// ReadAt reads the entire block from the wrapped KeepGateway into the
// cache if needed, and copies the requested portion into the provided
// slice.
//
// ReadAt returns as soon as the requested portion is available in the
// cache. The remainder of the block may continue to be copied into
// the cache in the background.
func (cache *DiskCache) ReadAt(locator string, dst []byte, offset int) (int, error) {
	cache.setupOnce.Do(cache.setup)
	cachefilename := cache.cacheFile(locator)
	if n, err := cache.quickReadAt(cachefilename, dst, offset); err == nil {
		return n, err
	}

	cache.writingLock.Lock()
	progress := cache.writing[cachefilename]
	if progress == nil {
		// Nobody else is fetching from backend, so we'll add
		// a new entry to cache.writing, and fetch in a
		// separate goroutine.
		progress = &writeprogress{}
		progress.cond = sync.NewCond(&sync.Mutex{})
		if cache.writing == nil {
			cache.writing = map[string]*writeprogress{}
		}
		cache.writing[cachefilename] = progress

		// Start a goroutine to copy from backend to f. As
		// data arrives, wake up any waiting loops (see below)
		// so ReadAt() requests for partial data can return as
		// soon as the relevant bytes have been copied.
		go func() {
			var size int
			var err error
			defer func() {
				if err == nil && progress.sharedf != nil {
					err = progress.sharedf.Sync()
				}
				progress.cond.L.Lock()
				progress.err = err
				progress.done = true
				progress.size = size
				progress.cond.L.Unlock()
				progress.cond.Broadcast()
				cache.writingLock.Lock()
				delete(cache.writing, cachefilename)
				cache.writingLock.Unlock()

				// Wait for other goroutines to wake
				// up, notice we're done, and use our
				// sharedf to read their data, before
				// we close sharedf.
				//
				// Nobody can join the WaitGroup after
				// the progress entry is deleted from
				// cache.writing above. Therefore,
				// this Wait ensures nobody else is
				// accessing progress, and we don't
				// need to lock anything.
				progress.readers.Wait()
				progress.sharedf.Close()
			}()
			progress.sharedf, err = cache.openFile(cachefilename, os.O_CREATE|os.O_RDWR)
			if err != nil {
				err = fmt.Errorf("ReadAt: %w", err)
				return
			}
			err = syscall.Flock(int(progress.sharedf.Fd()), syscall.LOCK_SH)
			if err != nil {
				err = fmt.Errorf("flock(%s, lock_sh) failed: %w", cachefilename, err)
				return
			}
			size, err = cache.KeepGateway.BlockRead(context.Background(), BlockReadOptions{
				Locator: locator,
				WriteTo: funcwriter(func(p []byte) (int, error) {
					n, err := progress.sharedf.Write(p)
					if n > 0 {
						progress.cond.L.Lock()
						progress.size += n
						progress.cond.L.Unlock()
						progress.cond.Broadcast()
					}
					return n, err
				}),
			})
			atomic.AddInt64(&cache.sizeEstimated, int64(size))
			cache.gotidy()
		}()
	}
	// We add ourselves to the readers WaitGroup so the
	// fetch-from-backend goroutine doesn't close the shared
	// filehandle before we read the data we need from it.
	progress.readers.Add(1)
	defer progress.readers.Done()
	cache.writingLock.Unlock()

	progress.cond.L.Lock()
	for !progress.done && progress.size < len(dst)+offset {
		progress.cond.Wait()
	}
	sharedf := progress.sharedf
	err := progress.err
	progress.cond.L.Unlock()

	if err != nil {
		// If the copy-from-backend goroutine encountered an
		// error, we return that error. (Even if we read the
		// desired number of bytes, the error might be
		// something like BadChecksum so we should not ignore
		// it.)
		return 0, err
	}
	if len(dst) == 0 {
		// It's possible that sharedf==nil here (the writer
		// goroutine might not have done anything at all yet)
		// and we don't need it anyway because no bytes are
		// being read. Reading zero bytes seems pointless, but
		// if someone does it, we might as well return
		// suitable values, rather than risk a crash by
		// calling sharedf.ReadAt() when sharedf is nil.
		return 0, nil
	}
	return sharedf.ReadAt(dst, int64(offset))
}
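
// Hedged usage sketch: because ReadAt returns as soon as the
// requested range is in the cache, a caller can inspect the first
// bytes of a large block while the rest is still streaming in:
//
//	hdr := make([]byte, 512)
//	n, err := cache.ReadAt(locator, hdr, 0)
//	// n==512 may be returned while the rest of the block (up to
//	// ~64 MiB) is still being copied into the cache file.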
var quickReadAtLostRace = errors.New("quickReadAt: lost race")

// Remove the cache entry for the indicated cachefilename if it
// matches expect (quickReadAt() usage), or if expect is nil (tidy()
// usage).
//
// If expect is non-nil, close expect's filehandle.
//
// If expect is nil and a different cache entry is deleted, close its
// filehandle.
func (cache *DiskCache) deleteHeldopen(cachefilename string, expect *openFileEnt) {
	needclose := expect

	cache.heldopenLock.Lock()
	found := cache.heldopen[cachefilename]
	if found != nil && (expect == nil || expect == found) {
		delete(cache.heldopen, cachefilename)
		needclose = found
	}
	cache.heldopenLock.Unlock()

	if needclose != nil {
		needclose.Lock()
		defer needclose.Unlock()
		if needclose.f != nil {
			needclose.f.Close()
			needclose.f = nil
		}
	}
}

// quickReadAt attempts to use a cached-filehandle approach to read
// from the indicated file. The expectation is that the caller
// (ReadAt) will try a more robust approach when this fails, so
// quickReadAt doesn't try especially hard to ensure success in
// races. In particular, when there are concurrent calls, and one
// fails, that can cause others to fail too.
func (cache *DiskCache) quickReadAt(cachefilename string, dst []byte, offset int) (int, error) {
	isnew := false
	cache.heldopenLock.Lock()
	if cache.heldopenMax == 0 {
		// Choose a reasonable limit on open cache files based
		// on RLIMIT_NOFILE. Note Go automatically raises
		// softlimit to hardlimit, so it's typically 1048576,
		// not 1024.
		lim := syscall.Rlimit{}
		err := syscall.Getrlimit(syscall.RLIMIT_NOFILE, &lim)
		if err != nil {
			cache.heldopenMax = 100
		} else if lim.Cur > 400000 {
			cache.heldopenMax = 10000
		} else {
			cache.heldopenMax = int(lim.Cur / 40)
		}
	}
	heldopen := cache.heldopen[cachefilename]
	if heldopen == nil {
		isnew = true
		heldopen = &openFileEnt{}
		if cache.heldopen == nil {
			cache.heldopen = make(map[string]*openFileEnt, cache.heldopenMax)
		} else if len(cache.heldopen) > cache.heldopenMax {
			// Rather than go to the trouble of tracking
			// last access time, just close all files, and
			// open again as needed. Even in the worst
			// pathological case, this causes one extra
			// open+close per read, which is not
			// especially bad (see benchmarks).
			go func(m map[string]*openFileEnt) {
				for _, heldopen := range m {
					heldopen.Lock()
					defer heldopen.Unlock()
					if heldopen.f != nil {
						heldopen.f.Close()
						heldopen.f = nil
					}
				}
			}(cache.heldopen)
			cache.heldopen = make(map[string]*openFileEnt, cache.heldopenMax)
		}
		cache.heldopen[cachefilename] = heldopen
		heldopen.Lock()
	}
	cache.heldopenLock.Unlock()

	if isnew {
		// Open and flock the file, save the filehandle (or
		// error) in heldopen.f, and release the write lock so
		// other goroutines waiting at heldopen.RLock() below
		// can use the shared filehandle (or shared error).
		f, err := os.Open(cachefilename)
		if err == nil {
			err = syscall.Flock(int(f.Fd()), syscall.LOCK_SH)
			if err == nil {
				heldopen.f = f
			} else {
				f.Close()
			}
		}
		if err != nil {
			heldopen.err = err
			go cache.deleteHeldopen(cachefilename, heldopen)
		}
		heldopen.Unlock()
	}
	// Acquire read lock to ensure (1) initialization is complete,
	// if it's done by a different goroutine, and (2) any "delete
	// old/unused entries" waits for our read to finish before
	// closing the filehandle.
	heldopen.RLock()
	defer heldopen.RUnlock()
	if heldopen.err != nil {
		// Other goroutine encountered an error during setup
		return 0, heldopen.err
	} else if heldopen.f == nil {
		// Other goroutine closed the file before we got RLock
		return 0, quickReadAtLostRace
	}

	// If another goroutine is currently writing the file, wait
	// for it to catch up to the end of the range we need.
	cache.writingLock.Lock()
	progress := cache.writing[cachefilename]
	cache.writingLock.Unlock()
	if progress != nil {
		progress.cond.L.Lock()
		for !progress.done && progress.size < len(dst)+offset {
			progress.cond.Wait()
		}
		progress.cond.L.Unlock()
		// If size<needed && progress.err!=nil here, we'll end
		// up reporting a less helpful "EOF reading from cache
		// file" below, instead of the actual error fetching
		// from upstream to cache file. This is OK though,
		// because our caller (ReadAt) doesn't even report our
		// error, it just retries.
	}

	n, err := heldopen.f.ReadAt(dst, int64(offset))
	if err != nil {
		// Wait for any concurrent users to finish, then
		// delete this cache entry in case reopening the
		// backing file helps.
		go cache.deleteHeldopen(cachefilename, heldopen)
	}
	return n, err
}
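
// The heldopen bookkeeping above is a double-checked filehandle
// cache: the map lookup happens under heldopenLock, but the slow
// open+flock happens while holding only the new entry's write lock,
// so readers of other entries are never blocked. The same pattern in
// miniature (assumed names, illustration only):
//
//	mu.Lock()
//	ent := m[key]
//	if ent == nil {
//		ent = &entry{}
//		m[key] = ent
//		ent.Lock() // hold until initialization finishes
//		mu.Unlock()
//		ent.v, ent.err = slowInit(key)
//		ent.Unlock()
//	} else {
//		mu.Unlock()
//	}
//	ent.RLock() // blocks until the initializer calls ent.Unlock()
//	defer ent.RUnlock()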
// BlockRead reads an entire block using a 128 KiB buffer.
func (cache *DiskCache) BlockRead(ctx context.Context, opts BlockReadOptions) (int, error) {
	cache.setupOnce.Do(cache.setup)
	i := strings.Index(opts.Locator, "+")
	if i < 0 || i >= len(opts.Locator) {
		return 0, errors.New("invalid block locator: no size hint")
	}
	sizestr := opts.Locator[i+1:]
	i = strings.Index(sizestr, "+")
	if i > 0 {
		sizestr = sizestr[:i]
	}
	blocksize, err := strconv.ParseInt(sizestr, 10, 32)
	if err != nil || blocksize < 0 {
		return 0, errors.New("invalid block locator: invalid size hint")
	}

	offset := 0
	buf := make([]byte, 131072)
	for offset < int(blocksize) {
		if ctx.Err() != nil {
			return offset, ctx.Err()
		}
		if int(blocksize)-offset < len(buf) {
			buf = buf[:int(blocksize)-offset]
		}
		nr, err := cache.ReadAt(opts.Locator, buf, offset)
		if nr > 0 {
			nw, err := opts.WriteTo.Write(buf[:nr])
			if err != nil {
				return offset + nw, err
			}
		}
		offset += nr
		if err != nil {
			return offset, err
		}
	}
	return offset, nil
}
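
// Hedged illustration of the size-hint parsing above: in a locator
// like "acbd18db4cc2f85cedef654fccc4a4d8+3+Asignature", the field
// between the first and second "+" is the size hint, so blocksize
// is parsed as 3; any further "+hint" fields are ignored here.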
// Start a tidy() goroutine, unless one is already running / recently
// completed.
func (cache *DiskCache) gotidy() {
	writes := atomic.AddInt64(&cache.writesSinceTidy, 1)
	// Skip if another tidy goroutine is running in this process.
	n := atomic.AddInt32(&cache.tidying, 1)
	if n != 1 {
		atomic.AddInt32(&cache.tidying, -1)
		return
	}
	// Skip if sizeEstimated is based on an actual measurement and
	// is below maxSize, and we haven't done very many writes
	// since last tidy (defined as 1% of number of cache files at
	// last count).
	if cache.sizeMeasured > 0 &&
		atomic.LoadInt64(&cache.sizeEstimated) < atomic.LoadInt64(&cache.defaultMaxSize) &&
		writes < cache.lastFileCount/100 {
		atomic.AddInt32(&cache.tidying, -1)
		return
	}
	go func() {
		cache.tidy()
		atomic.StoreInt64(&cache.writesSinceTidy, 0)
		atomic.AddInt32(&cache.tidying, -1)
	}()
}
// Delete cache files as needed to control disk usage.
func (cache *DiskCache) tidy() {
	maxsize := int64(cache.maxSize.ByteSize())
	if maxsize < 1 {
		maxsize = atomic.LoadInt64(&cache.defaultMaxSize)
		if maxsize == 0 {
			// defaultMaxSize not yet computed. Use 10% of
			// filesystem capacity (or a different
			// percentage if indicated by cache.maxSize).
			pct := cache.maxSize.Percent()
			if pct == 0 {
				pct = 10
			}
			var stat unix.Statfs_t
			if err := unix.Statfs(cache.dir, &stat); err == nil {
				maxsize = int64(stat.Blocks) * stat.Bsize * pct / 100
				atomic.StoreInt64(&cache.defaultMaxSize, maxsize)
				cache.debugf("setting cache size %d = blocks %d * bsize %d * pct %d / 100", maxsize, stat.Blocks, stat.Bsize, pct)
			} else {
				// In this case we will set
				// defaultMaxSize below after
				// measuring current usage.
			}
		}
	}

	// Bail if a tidy goroutine is running in a different process.
	lockfile, err := cache.openFile(filepath.Join(cache.dir, "tmp", "tidy.lock"), os.O_CREATE|os.O_WRONLY)
	if err != nil {
		return
	}
	defer lockfile.Close()
	err = syscall.Flock(int(lockfile.Fd()), syscall.LOCK_EX|syscall.LOCK_NB)
	if err != nil {
		return
	}

	type entT struct {
		path  string
		atime time.Time
		size  int64
	}
	var ents []entT
	var totalsize int64
	filepath.Walk(cache.dir, func(path string, info fs.FileInfo, err error) error {
		if err != nil {
			cache.debugf("tidy: skipping dir %s: %s", path, err)
			return nil
		}
		if info.IsDir() {
			return nil
		}
		if !strings.HasSuffix(path, cacheFileSuffix) && !strings.HasSuffix(path, tmpFileSuffix) {
			return nil
		}
		var atime time.Time
		if stat, ok := info.Sys().(*syscall.Stat_t); ok {
			// Access time is available (hopefully the
			// filesystem is not mounted with noatime).
			atime = time.Unix(stat.Atim.Sec, stat.Atim.Nsec)
		} else {
			// If access time isn't available we fall back
			// to sorting by modification time.
			atime = info.ModTime()
		}
		ents = append(ents, entT{path, atime, info.Size()})
		totalsize += info.Size()
		return nil
	})
	if cache.Logger != nil {
		cache.Logger.WithFields(logrus.Fields{
			"totalsize": totalsize,
			"maxsize":   maxsize,
		}).Debugf("DiskCache: checked current cache usage")
	}

	// If MaxSize wasn't specified and we failed to come up with a
	// defaultMaxSize above, use the larger of {current cache size,
	// 1 GiB} as the defaultMaxSize for subsequent tidy()
	// operations.
	if maxsize == 0 {
		if totalsize < 1<<30 {
			atomic.StoreInt64(&cache.defaultMaxSize, 1<<30)
		} else {
			atomic.StoreInt64(&cache.defaultMaxSize, totalsize)
		}
		cache.debugf("found initial size %d, setting defaultMaxSize %d", totalsize, cache.defaultMaxSize)
		maxsize = atomic.LoadInt64(&cache.defaultMaxSize)
	}

	// If we're below MaxSize or there's only one block in the
	// cache, just update the usage estimate and return.
	//
	// (We never delete the last block because that would merely
	// cause the same block to get re-fetched repeatedly from the
	// backend.)
	if totalsize <= maxsize || len(ents) == 1 {
		atomic.StoreInt64(&cache.sizeMeasured, totalsize)
		atomic.StoreInt64(&cache.sizeEstimated, totalsize)
		cache.lastFileCount = int64(len(ents))
		return
	}

	// Set a new size target of maxsize minus 5%. This makes some
	// room for sizeEstimated to grow before it triggers another
	// tidy. We don't want to walk/sort an entire large cache
	// directory each time we write a block.
	target := maxsize - (maxsize / 20)

	// Delete the oldest entries until totalsize < target or we're
	// down to a single cached block.
	sort.Slice(ents, func(i, j int) bool {
		return ents[i].atime.Before(ents[j].atime)
	})
	deleted := 0
	for _, ent := range ents {
		os.Remove(ent.path)
		go cache.deleteHeldopen(ent.path, nil)
		deleted++
		totalsize -= ent.size
		if totalsize <= target || deleted == len(ents)-1 {
			break
		}
	}

	if cache.Logger != nil {
		cache.Logger.WithFields(logrus.Fields{
			"deleted":   deleted,
			"totalsize": totalsize,
		}).Debugf("DiskCache: remaining cache usage after deleting")
	}
	atomic.StoreInt64(&cache.sizeMeasured, totalsize)
	atomic.StoreInt64(&cache.sizeEstimated, totalsize)
	cache.lastFileCount = int64(len(ents) - deleted)
}