1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: Apache-2.0
25 "github.com/sirupsen/logrus"
26 "golang.org/x/sys/unix"
// KeepGateway is the interface for reading and writing Keep blocks.
// DiskCache both wraps and implements it, so cache layers can be
// stacked in front of a real backend.
// NOTE(review): source lines are elided in this view (embedded line
// numbers jump), so the interface may declare more methods than shown.
29 type KeepGateway interface {
30 ReadAt(locator string, dst []byte, offset int) (int, error)
31 BlockRead(ctx context.Context, opts BlockReadOptions) (int, error)
32 BlockWrite(ctx context.Context, opts BlockWriteOptions) (BlockWriteResponse, error)
33 LocalLocator(locator string) (string, error)
36 // DiskCache wraps KeepGateway, adding a disk-based cache layer.
37 type DiskCache struct {
// Logger for debug messages (used via debugf). Presumably optional /
// nil-tolerant — the nil check in debugf is on elided lines; TODO confirm.
41 Logger logrus.FieldLogger
// tidying is manipulated with atomic.AddInt32 (see gotidy) so only
// one tidy goroutine runs per process at a time.
43 tidying int32 // see tidy()
44 tidyHoldUntil time.Time
47 // The "heldopen" fields are used to open cache files for
48 // reading, and leave them open for future/concurrent ReadAt
49 // operations. See quickReadAt.
50 heldopen map[string]*openFileEnt
52 heldopenLock sync.Mutex
54 // The "writing" fields allow multiple concurrent/sequential
55 // ReadAt calls to be notified as a single
56 // read-block-from-backend-into-cache goroutine fills the
58 writing map[string]*writeprogress
59 writingCond *sync.Cond
60 writingLock sync.Mutex
// Both size fields are updated with sync/atomic operations (see
// BlockWrite, ReadAt, tidy).
62 sizeMeasured int64 // actual size on disk after last tidy(); zero if not measured yet
63 sizeEstimated int64 // last measured size, plus files we have written since
// writeprogress tracks the state of one fill-cache-file-from-backend
// goroutine, so concurrent ReadAt/quickReadAt calls can wait on cond
// until the byte range they need has been written (or done is set).
66 type writeprogress struct {
67 cond *sync.Cond // broadcast whenever size or done changes
68 done bool // size and err have their final values
69 size int // bytes copied into cache file so far
70 err error // error encountered while copying from backend to cache
// openFileEnt is a cached open filehandle for a cache block file,
// held in DiskCache.heldopen (see quickReadAt / deleteHeldopen).
// NOTE(review): the embedded lock and the f (*os.File) field that
// other code references (heldopen.f, RUnlock) are on elided lines
// and not visible in this view.
73 type openFileEnt struct {
76 err error // if err is non-nil, f should not be used.
// Filename suffixes distinguish finished cache blocks from
// in-progress temp files; tidyHoldDuration rate-limits full cache
// re-measurement in gotidy().
80 cacheFileSuffix = ".keepcacheblock"
81 tmpFileSuffix = ".tmp"
82 tidyHoldDuration = 5 * time.Minute // time to re-check cache size even if estimated size is below max
// cacheFile returns the path of the cache file for the given
// locator: {Dir}/{first 3 hash chars}/{hash}.keepcacheblock, with
// any "+size"/other hints stripped. (The assignment to hash is on an
// elided line.)
85 func (cache *DiskCache) cacheFile(locator string) string {
87 if i := strings.Index(hash, "+"); i > 0 {
// Shard by hash prefix to keep per-directory entry counts small.
90 return filepath.Join(cache.Dir, hash[:3], hash+cacheFileSuffix)
93 // Open a cache file, creating the parent dir if necessary.
94 func (cache *DiskCache) openFile(name string, flags int) (*os.File, error) {
95 f, err := os.OpenFile(name, flags, 0600)
96 if os.IsNotExist(err) {
97 // Create the parent dir and try again. (We could have
98 // checked/created the parent dir before, but that
99 // would be less efficient in the much more common
100 // situation where it already exists.)
101 parent, _ := filepath.Split(name)
// Mkdir error is deliberately ignored: if it failed, the retried
// OpenFile below will also fail and report the real problem.
102 os.Mkdir(parent, 0700)
103 f, err = os.OpenFile(name, flags, 0600)
108 // Rename a file, creating the new path's parent dir if necessary.
109 func (cache *DiskCache) rename(old, new string) error {
// Optimistically try the rename first; creating the parent dir is
// only needed on the first write into a given shard directory.
110 if nil == os.Rename(old, new) {
113 parent, _ := filepath.Split(new)
// Mkdir error ignored: if it failed, the retried Rename reports it.
114 os.Mkdir(parent, 0700)
115 return os.Rename(old, new)
// debugf logs a debug-level message via cache.Logger.
// NOTE(review): as shown, a nil Logger would panic at the Debugf
// call; a nil guard is presumably on the elided lines 120-122 —
// confirm against the complete source.
118 func (cache *DiskCache) debugf(format string, args ...interface{}) {
119 logger := cache.Logger
123 logger.Debugf(format, args...)
126 // BlockWrite writes through to the wrapped KeepGateway, and (if
127 // possible) retains a copy of the written block in the cache.
128 func (cache *DiskCache) BlockWrite(ctx context.Context, opts BlockWriteOptions) (BlockWriteResponse, error) {
// Temp filename is unique per process (pid) and per call (&opts
// pointer value), so concurrent writes cannot collide.
129 unique := fmt.Sprintf("%x.%p%s", os.Getpid(), &opts, tmpFileSuffix)
130 tmpfilename := filepath.Join(cache.Dir, "tmp", unique)
131 tmpfile, err := cache.openFile(tmpfilename, os.O_CREATE|os.O_EXCL|os.O_RDWR)
// If we can't create the cache temp file, degrade gracefully:
// write through to the backend uncached.
133 cache.debugf("BlockWrite: open(%s) failed: %s", tmpfilename, err)
134 return cache.KeepGateway.BlockWrite(ctx, opts)
137 ctx, cancel := context.WithCancel(ctx)
// Buffered (capacity 1) so the copy goroutine can deposit an error
// and exit without a receiver; checked non-blockingly below.
139 copyerr := make(chan error, 1)
141 // Start a goroutine to copy the caller's source data to
142 // tmpfile, a hash checker, and (via pipe) the wrapped
144 pipereader, pipewriter := io.Pipe()
145 defer pipereader.Close()
147 defer tmpfile.Close()
148 defer os.Remove(tmpfilename)
149 defer pipewriter.Close()
151 // Copy from opts.Data or opts.Reader, depending on
152 // which was provided.
154 if opts.Data != nil {
155 src = bytes.NewReader(opts.Data)
160 hashcheck := md5.New()
// Tee the data three ways: cache temp file, pipe to the wrapped
// gateway, and the MD5 checker.
161 n, err := io.Copy(io.MultiWriter(tmpfile, pipewriter, hashcheck), src)
166 } else if opts.DataSize > 0 && opts.DataSize != int(n) {
167 copyerr <- fmt.Errorf("block size %d did not match provided size %d", n, opts.DataSize)
171 err = tmpfile.Close()
173 // Don't rename tmpfile into place, but allow
174 // the BlockWrite call to succeed if nothing
178 hash := fmt.Sprintf("%x", hashcheck.Sum(nil))
179 if opts.Hash != "" && opts.Hash != hash {
180 // Even if the wrapped KeepGateway doesn't
181 // notice a problem, this should count as an
183 copyerr <- fmt.Errorf("block hash %s did not match provided hash %s", hash, opts.Hash)
// Commit the cache entry by renaming the temp file into its final
// content-addressed location.
187 cachefilename := cache.cacheFile(hash)
188 err = cache.rename(tmpfilename, cachefilename)
190 cache.debugf("BlockWrite: rename(%s, %s) failed: %s", tmpfilename, cachefilename, err)
192 atomic.AddInt64(&cache.sizeEstimated, int64(n))
196 // Write through to the wrapped KeepGateway from the pipe,
197 // instead of the original reader.
199 if newopts.DataSize == 0 {
200 newopts.DataSize = len(newopts.Data)
202 newopts.Reader = pipereader
205 resp, err := cache.KeepGateway.BlockWrite(ctx, newopts)
// Non-blocking length check: a buffered error means the copy
// goroutine failed first, and its error is the more useful one.
206 if len(copyerr) > 0 {
207 // If the copy-to-pipe goroutine failed, that error
208 // will be more helpful than the resulting "context
209 // canceled" or "read [from pipereader] failed" error
210 // seen by the wrapped KeepGateway.
212 // If the wrapped KeepGateway encounters an error
213 // before all the data is copied into the pipe, it
214 // stops reading from the pipe, which causes the
215 // io.Copy() in the goroutine to block until our
216 // deferred pipereader.Close() call runs. In that case
217 // len(copyerr)==0 here, so the wrapped KeepGateway
218 // error is the one we return to our caller.
// funcwriter adapts a plain function to the io.Writer interface
// (used by ReadAt's WriteTo callback to observe copy progress).
224 type funcwriter func([]byte) (int, error)
226 func (fw funcwriter) Write(p []byte) (int, error) {
230 // ReadAt reads the entire block from the wrapped KeepGateway into the
231 // cache if needed, and copies the requested portion into the provided
234 // ReadAt returns as soon as the requested portion is available in the
235 // cache. The remainder of the block may continue to be copied into
236 // the cache in the background.
237 func (cache *DiskCache) ReadAt(locator string, dst []byte, offset int) (int, error) {
238 cachefilename := cache.cacheFile(locator)
// Fast path: reuse a cached filehandle if the block is already
// (or is currently being) written to the cache.
239 if n, err := cache.quickReadAt(cachefilename, dst, offset); err == nil {
// Slow path: open (creating if necessary) the cache file.
242 readf, err := cache.openFile(cachefilename, os.O_CREATE|os.O_RDONLY)
244 return 0, fmt.Errorf("ReadAt: %w", err)
// Shared flock presumably coordinates with tidy()'s exclusive lock
// so the file isn't deleted mid-read — TODO confirm; the other side
// of the protocol is not fully visible in this view.
248 err = syscall.Flock(int(readf.Fd()), syscall.LOCK_SH)
250 return 0, fmt.Errorf("flock(%s, lock_sh) failed: %w", cachefilename, err)
// If nobody is already filling this cache file, register a
// writeprogress entry and start the fill goroutine ourselves.
253 cache.writingLock.Lock()
254 progress := cache.writing[cachefilename]
256 cache.writingLock.Unlock()
258 progress = &writeprogress{}
259 progress.cond = sync.NewCond(&sync.Mutex{})
260 if cache.writing == nil {
261 cache.writing = map[string]*writeprogress{}
263 cache.writing[cachefilename] = progress
264 cache.writingLock.Unlock()
266 // Start a goroutine to copy from backend to f. As
267 // data arrives, wake up any waiting loops (see below)
268 // so ReadAt() requests for partial data can return as
269 // soon as the relevant bytes have been copied.
275 closeErr := writef.Close()
// Publish final state under the cond lock, wake all waiters, and
// deregister this entry from cache.writing.
279 progress.cond.L.Lock()
283 progress.cond.L.Unlock()
284 progress.cond.Broadcast()
285 cache.writingLock.Lock()
286 delete(cache.writing, cachefilename)
287 cache.writingLock.Unlock()
289 writef, err = cache.openFile(cachefilename, os.O_WRONLY)
291 err = fmt.Errorf("ReadAt: %w", err)
294 err = syscall.Flock(int(writef.Fd()), syscall.LOCK_SH)
296 err = fmt.Errorf("flock(%s, lock_sh) failed: %w", cachefilename, err)
299 size, err = cache.KeepGateway.BlockRead(context.Background(), BlockReadOptions{
// Each chunk written to the cache file bumps progress.size and
// broadcasts, so waiters needing only a prefix can proceed early.
301 WriteTo: funcwriter(func(p []byte) (int, error) {
302 n, err := writef.Write(p)
304 progress.cond.L.Lock()
306 progress.cond.L.Unlock()
307 progress.cond.Broadcast()
311 atomic.AddInt64(&cache.sizeEstimated, int64(size))
// Wait until the byte range we need is in the cache file, or the
// fill goroutine has finished (possibly with an error).
315 progress.cond.L.Lock()
316 for !progress.done && progress.size < len(dst)+offset {
319 ok := progress.size >= len(dst)+offset
321 progress.cond.L.Unlock()
323 if !ok && err != nil {
324 // If the copy-from-backend goroutine encountered an
325 // error before copying enough bytes to satisfy our
326 // request, we return that error.
329 // Regardless of whether the copy-from-backend
330 // goroutine succeeded, or failed after copying the
331 // bytes we need, the only errors we need to report
332 // are errors reading from the cache file.
333 return readf.ReadAt(dst, int64(offset))
// quickReadAtLostRace is a sentinel returned by quickReadAt when a
// concurrent goroutine invalidated our cached filehandle; the caller
// (ReadAt) just falls back to the slower open-and-read path.
337 var quickReadAtLostRace = errors.New("quickReadAt: lost race")
339 // Remove the cache entry for the indicated cachefilename if it
340 // matches expect (quickReadAt() usage), or if expect is nil (tidy()
343 // If expect is non-nil, close expect's filehandle.
345 // If expect is nil and a different cache entry is deleted, close its
347 func (cache *DiskCache) deleteHeldopen(cachefilename string, expect *openFileEnt) {
350 cache.heldopenLock.Lock()
351 found := cache.heldopen[cachefilename]
// Only delete if the entry is the one the caller expected (or the
// caller, e.g. tidy, didn't care which entry is there).
352 if found != nil && (expect == nil || expect == found) {
353 delete(cache.heldopen, cachefilename)
356 cache.heldopenLock.Unlock()
// Close outside heldopenLock; the entry's own write lock (taken on
// an elided line, paired with the Unlock below) waits for any
// concurrent readers holding RLock to finish first.
358 if needclose != nil {
360 defer needclose.Unlock()
361 if needclose.f != nil {
368 // quickReadAt attempts to use a cached-filehandle approach to read
369 // from the indicated file. The expectation is that the caller
370 // (ReadAt) will try a more robust approach when this fails, so
371 // quickReadAt doesn't try especially hard to ensure success in
372 // races. In particular, when there are concurrent calls, and one
373 // fails, that can cause others to fail too.
374 func (cache *DiskCache) quickReadAt(cachefilename string, dst []byte, offset int) (int, error) {
376 cache.heldopenLock.Lock()
// Lazily initialize the open-filehandle limit on first use.
377 if cache.heldopenMax == 0 {
378 // Choose a reasonable limit on open cache files based
379 // on RLIMIT_NOFILE. Note Go automatically raises
380 // softlimit to hardlimit, so it's typically 1048576,
382 lim := syscall.Rlimit{}
383 err := syscall.Getrlimit(syscall.RLIMIT_NOFILE, &lim)
385 cache.heldopenMax = 256
386 } else if lim.Cur > 40000 {
387 cache.heldopenMax = 10000
389 cache.heldopenMax = int(lim.Cur / 4)
392 heldopen := cache.heldopen[cachefilename]
395 heldopen = &openFileEnt{}
396 if cache.heldopen == nil {
397 cache.heldopen = make(map[string]*openFileEnt, cache.heldopenMax)
398 } else if len(cache.heldopen) > cache.heldopenMax {
399 // Rather than go to the trouble of tracking
400 // last access time, just close all files, and
401 // open again as needed. Even in the worst
402 // pathological case, this causes one extra
403 // open+close per read, which is not
404 // especially bad (see benchmarks).
// Hand the old map off to a goroutine for closing; the per-entry
// write lock (taken on an elided line, paired with the deferred
// Unlock) waits out in-flight readers.
405 go func(m map[string]*openFileEnt) {
406 for _, heldopen := range m {
408 defer heldopen.Unlock()
409 if heldopen.f != nil {
417 cache.heldopen[cachefilename] = heldopen
420 cache.heldopenLock.Unlock()
423 // Open and flock the file, then call wg.Done() to
424 // unblock any other goroutines that are waiting in
425 // the !isnew case above.
426 f, err := os.Open(cachefilename)
428 err = syscall.Flock(int(f.Fd()), syscall.LOCK_SH)
// Setup failed: drop this entry asynchronously so a later call can
// retry with a fresh filehandle.
437 go cache.deleteHeldopen(cachefilename, heldopen)
441 // Acquire read lock to ensure (1) initialization is complete,
442 // if it's done by a different goroutine, and (2) any "delete
443 // old/unused entries" waits for our read to finish before
446 defer heldopen.RUnlock()
447 if heldopen.err != nil {
448 // Other goroutine encountered an error during setup
449 return 0, heldopen.err
450 } else if heldopen.f == nil {
451 // Other goroutine closed the file before we got RLock
452 return 0, quickReadAtLostRace
455 // If another goroutine is currently writing the file, wait
456 // for it to catch up to the end of the range we need.
457 cache.writingLock.Lock()
458 progress := cache.writing[cachefilename]
459 cache.writingLock.Unlock()
461 progress.cond.L.Lock()
462 for !progress.done && progress.size < len(dst)+offset {
465 progress.cond.L.Unlock()
466 // If size<needed && progress.err!=nil here, we'll end
467 // up reporting a less helpful "EOF reading from cache
468 // file" below, instead of the actual error fetching
469 // from upstream to cache file. This is OK though,
470 // because our caller (ReadAt) doesn't even report our
471 // error, it just retries.
474 n, err := heldopen.f.ReadAt(dst, int64(offset))
476 // wait for any concurrent users to finish, then
477 // delete this cache entry in case reopening the
478 // backing file helps.
479 go cache.deleteHeldopen(cachefilename, heldopen)
484 // BlockRead reads an entire block using a 128 KiB buffer.
485 func (cache *DiskCache) BlockRead(ctx context.Context, opts BlockReadOptions) (int, error) {
// The block size is parsed out of the locator's first "+NNN" hint.
486 i := strings.Index(opts.Locator, "+")
487 if i < 0 || i >= len(opts.Locator) {
488 return 0, errors.New("invalid block locator: no size hint")
490 sizestr := opts.Locator[i+1:]
491 i = strings.Index(sizestr, "+")
493 sizestr = sizestr[:i]
495 blocksize, err := strconv.ParseInt(sizestr, 10, 32)
496 if err != nil || blocksize < 0 {
497 return 0, errors.New("invalid block locator: invalid size hint")
// Stream the block through ReadAt in buffer-sized chunks, checking
// for context cancellation between chunks.
501 buf := make([]byte, 131072)
502 for offset < int(blocksize) {
503 if ctx.Err() != nil {
504 return offset, ctx.Err()
// NOTE(review): this condition looks inverted — when the remaining
// byte count exceeds len(buf), reslicing buf past its capacity
// would panic. Presumably the intent is to *shrink* buf for the
// final short chunk (i.e. "<" rather than ">"); confirm against the
// complete source before relying on this.
506 if int(blocksize)-offset > len(buf) {
507 buf = buf[:int(blocksize)-offset]
509 nr, err := cache.ReadAt(opts.Locator, buf, offset)
511 nw, err := opts.WriteTo.Write(buf)
513 return offset + nw, err
524 // Start a tidy() goroutine, unless one is already running / recently
526 func (cache *DiskCache) gotidy() {
527 // Skip if another tidy goroutine is running in this process.
528 n := atomic.AddInt32(&cache.tidying, 1)
530 atomic.AddInt32(&cache.tidying, -1)
533 // Skip if sizeEstimated is based on an actual measurement and
534 // is below MaxSize, and we haven't reached the "recheck
535 // anyway" time threshold.
// NOTE(review): sizeMeasured is written with atomic.StoreInt64 in
// tidy() but read without atomic here (likewise tidyHoldUntil is
// written without an obvious lock) — likely benign in practice but
// technically a data race; confirm intent with the complete source.
536 if cache.sizeMeasured > 0 &&
537 atomic.LoadInt64(&cache.sizeEstimated) < cache.MaxSize &&
538 time.Now().Before(cache.tidyHoldUntil) {
539 atomic.AddInt32(&cache.tidying, -1)
544 cache.tidyHoldUntil = time.Now().Add(tidyHoldDuration)
545 atomic.AddInt32(&cache.tidying, -1)
549 // Delete cache files as needed to control disk usage.
550 func (cache *DiskCache) tidy() {
551 maxsize := cache.MaxSize
// If MaxSize isn't configured, fall back to a default of 10% of the
// cache dir's filesystem free space, computed once and cached in
// defaultMaxSize.
553 if maxsize = atomic.LoadInt64(&cache.defaultMaxSize); maxsize == 0 {
554 var stat unix.Statfs_t
555 if nil == unix.Statfs(cache.Dir, &stat) {
556 maxsize = int64(stat.Bavail) * stat.Bsize / 10
558 atomic.StoreInt64(&cache.defaultMaxSize, maxsize)
562 // Bail if a tidy goroutine is running in a different process.
563 lockfile, err := cache.openFile(filepath.Join(cache.Dir, "tmp", "tidy.lock"), os.O_CREATE|os.O_WRONLY)
567 defer lockfile.Close()
// LOCK_NB: if another process already holds the exclusive lock,
// give up now rather than queue behind it (the error handling is on
// elided lines).
568 err = syscall.Flock(int(lockfile.Fd()), syscall.LOCK_EX|syscall.LOCK_NB)
// Walk the cache dir, collecting (path, atime, size) for every
// cache/temp file so we can sort and delete oldest-first below.
580 filepath.Walk(cache.Dir, func(path string, info fs.FileInfo, err error) error {
582 cache.debugf("tidy: skipping dir %s: %s", path, err)
588 if !strings.HasSuffix(path, cacheFileSuffix) && !strings.HasSuffix(path, tmpFileSuffix) {
// NOTE(review): stat.Atim is the Linux field name (Darwin/BSD use
// Atimespec); presumably platform variants or build tags exist
// elsewhere — confirm.
592 if stat, ok := info.Sys().(*syscall.Stat_t); ok {
593 // Access time is available (hopefully the
594 // filesystem is not mounted with noatime)
595 atime = time.Unix(stat.Atim.Sec, stat.Atim.Nsec)
597 // If access time isn't available we fall back
598 // to sorting by modification time.
599 atime = info.ModTime()
601 ents = append(ents, entT{path, atime, info.Size()})
602 totalsize += info.Size()
605 if cache.Logger != nil {
606 cache.Logger.WithFields(logrus.Fields{
607 "totalsize": totalsize,
609 }).Debugf("DiskCache: checked current cache usage")
612 // If MaxSize wasn't specified and we failed to come up with a
613 // defaultSize above, use the larger of {current cache size, 1
614 // GiB} as the defaultSize for subsequent tidy() operations.
616 if totalsize < 1<<30 {
617 atomic.StoreInt64(&cache.defaultMaxSize, 1<<30)
619 atomic.StoreInt64(&cache.defaultMaxSize, totalsize)
621 cache.debugf("found initial size %d, setting defaultMaxSize %d", totalsize, cache.defaultMaxSize)
625 // If we're below MaxSize or there's only one block in the
626 // cache, just update the usage estimate and return.
628 // (We never delete the last block because that would merely
629 // cause the same block to get re-fetched repeatedly from the
631 if totalsize <= maxsize || len(ents) == 1 {
632 atomic.StoreInt64(&cache.sizeMeasured, totalsize)
633 atomic.StoreInt64(&cache.sizeEstimated, totalsize)
637 // Set a new size target of maxsize minus 5%. This makes some
638 // room for sizeEstimate to grow before it triggers another
639 // tidy. We don't want to walk/sort an entire large cache
640 // directory each time we write a block.
641 target := maxsize - (maxsize / 20)
643 // Delete oldest entries until totalsize < target or we're
644 // down to a single cached block.
645 sort.Slice(ents, func(i, j int) bool {
646 return ents[i].atime.Before(ents[j].atime)
649 for _, ent := range ents {
// Also evict any cached filehandle for the deleted path (expect ==
// nil means "delete whatever entry is there").
651 go cache.deleteHeldopen(ent.path, nil)
653 totalsize -= ent.size
654 if totalsize <= target || deleted == len(ents)-1 {
659 if cache.Logger != nil {
660 cache.Logger.WithFields(logrus.Fields{
662 "totalsize": totalsize,
663 }).Debugf("DiskCache: remaining cache usage after deleting")
665 atomic.StoreInt64(&cache.sizeMeasured, totalsize)
666 atomic.StoreInt64(&cache.sizeEstimated, totalsize)