1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: Apache-2.0
25 "github.com/sirupsen/logrus"
26 "golang.org/x/sys/unix"
// KeepGateway is the storage interface that DiskCache wraps (see the
// DiskCache doc comment). DiskCache itself defines ReadAt, BlockRead
// and BlockWrite methods with these same signatures.
29 type KeepGateway interface {
// ReadAt takes a block locator, a destination buffer and an integer
// offset, and returns a byte count and an error.
30 ReadAt(locator string, dst []byte, offset int) (int, error)
// BlockRead is called by DiskCache with a BlockReadOptions carrying a
// Locator and a WriteTo destination.
31 BlockRead(ctx context.Context, opts BlockReadOptions) (int, error)
// BlockWrite is called by DiskCache.BlockWrite to write the block
// through to the wrapped gateway.
32 BlockWrite(ctx context.Context, opts BlockWriteOptions) (BlockWriteResponse, error)
// LocalLocator maps one locator string to another.
// NOTE(review): its exact semantics are not visible in this file.
33 LocalLocator(locator string) (string, error)
36 // DiskCache wraps KeepGateway, adding a disk-based cache layer.
//
// NOTE(review): only some fields are visible in this listing; the
// methods in this file also use cache.Dir, cache.MaxSize,
// cache.defaultMaxSize, cache.heldopenMax and cache.KeepGateway.
37 type DiskCache struct {
// Logger receives debug-level messages (see debugf and tidy). tidy
// checks it for nil before calling it directly.
41 Logger logrus.FieldLogger
// tidying is incremented and decremented with atomic.AddInt32 in
// gotidy; gotidy proceeds only when its own increment yields 1.
43 tidying int32 // see tidy()
// tidyHoldUntil is the earliest time at which gotidy will proceed
// again; gotidy sets it to now + tidyHoldDuration.
44 tidyHoldUntil time.Time
47 // The "heldopen" fields are used to open cache files for
48 // reading, and leave them open for future/concurrent ReadAt
49 // operations. See quickReadAt.
// heldopen is indexed by cache file name (the cachefilename argument
// of quickReadAt and deleteHeldopen).
50 heldopen map[string]*openFileEnt
// heldopenLock is held around accesses to the heldopen map in
// quickReadAt and deleteHeldopen.
52 heldopenLock sync.Mutex
// openFileEnt is the per-file entry type stored in DiskCache.heldopen.
// Other code in this file reads its f field and calls Unlock/RUnlock
// on it, so it presumably also embeds a sync.RWMutex — the declaration
// of those members is not visible in this listing; confirm.
55 type openFileEnt struct {
58 err error // if err is non-nil, f should not be used.
// cacheFileSuffix is appended to the hash to form a cache file name
// (see cacheFile); tidy uses it to recognize cache files.
62 cacheFileSuffix = ".keepcacheblock"
// tmpFileSuffix ends the temp file names created by BlockWrite; tidy
// also counts files with this suffix.
63 tmpFileSuffix = ".tmp"
// tidyHoldDuration is added to the current time in gotidy to compute
// tidyHoldUntil, before which gotidy returns without tidying.
64 tidyHoldDuration = 10 * time.Second
// cacheFile returns the path of the cache file for locator: a file
// named hash+cacheFileSuffix, inside a subdirectory of cache.Dir named
// after the first three characters of hash.
//
// NOTE(review): the assignment of hash is not visible in this listing;
// presumably it is locator with everything from the first "+" removed.
67 func (cache *DiskCache) cacheFile(locator string) string {
// i > 0 is true only when a "+" exists and is not the first character.
69 if i := strings.Index(hash, "+"); i > 0 {
// hash[:3] requires hash to be at least three bytes long.
72 return filepath.Join(cache.Dir, hash[:3], hash+cacheFileSuffix)
75 // Open a cache file, creating the parent dir if necessary.
//
// flags is passed straight to os.OpenFile. New files are created with
// mode 0600 and a newly created parent directory with mode 0700.
76 func (cache *DiskCache) openFile(name string, flags int) (*os.File, error) {
77 f, err := os.OpenFile(name, flags, 0600)
78 if os.IsNotExist(err) {
79 // Create the parent dir and try again. (We could have
80 // checked/created the parent dir before, but that
81 // would be less efficient in the much more common
82 // situation where it already exists.)
83 parent, _ := filepath.Split(name)
// The Mkdir result is deliberately not checked: if it failed, the
// retried OpenFile below reports the problem. Mkdir creates a single
// directory level only.
84 os.Mkdir(parent, 0700)
85 f, err = os.OpenFile(name, flags, 0600)
90 // Rename a file, creating the new path's parent dir if necessary.
//
// The rename is attempted first; only if it fails is the parent
// directory of new created (mode 0700) and the rename retried. The
// error from the retry is the one returned.
91 func (cache *DiskCache) rename(old, new string) error {
// Fast path: the first attempt succeeded.
92 if nil == os.Rename(old, new) {
95 parent, _ := filepath.Split(new)
// Mkdir's result is not checked; a failure shows up in the retried
// Rename below.
96 os.Mkdir(parent, 0700)
97 return os.Rename(old, new)
// debugf forwards a printf-style message to cache.Logger at debug
// level.
//
// NOTE(review): the lines between the two statements below are not
// visible in this listing; tidy guards cache.Logger with a nil check,
// so presumably a similar guard sits here — confirm.
100 func (cache *DiskCache) debugf(format string, args ...interface{}) {
101 logger := cache.Logger
105 logger.Debugf(format, args...)
108 // BlockWrite writes through to the wrapped KeepGateway, and (if
109 // possible) retains a copy of the written block in the cache.
//
// The block data is streamed once and fanned out to a temp file, an
// MD5 hasher and a pipe that feeds the wrapped KeepGateway. If the
// temp file cannot be opened, the call falls back to an uncached
// write-through.
110 func (cache *DiskCache) BlockWrite(ctx context.Context, opts BlockWriteOptions) (BlockWriteResponse, error) {
// The temp file name combines the process ID (hex) and the address of
// opts with tmpFileSuffix.
112 unique := fmt.Sprintf("%x.%p%s", os.Getpid(), &opts, tmpFileSuffix)
113 tmpfilename := filepath.Join(cache.Dir, "tmp", unique)
// O_EXCL makes the open fail rather than reuse an existing file of the
// same name.
114 tmpfile, err := cache.openFile(tmpfilename, os.O_CREATE|os.O_EXCL|os.O_RDWR)
// Without a temp file there is nothing to cache: log and write through
// with the caller's original ctx and opts.
116 cache.debugf("BlockWrite: open(%s) failed: %s", tmpfilename, err)
117 return cache.KeepGateway.BlockWrite(ctx, opts)
// From here on ctx is a cancelable child of the caller's context.
120 ctx, cancel := context.WithCancel(ctx)
// Capacity 1 lets one error be queued without waiting for a receiver.
122 copyerr := make(chan error, 1)
124 // Start a goroutine to copy the caller's source data to
125 // tmpfile, a hash checker, and (via pipe) the wrapped
127 pipereader, pipewriter := io.Pipe()
128 defer pipereader.Close()
// NOTE(review): whether the next three defers belong to this function
// or to the copy goroutine is not visible in this listing. Within one
// scope they run in reverse order: close pipewriter, remove the temp
// file, then close it.
130 defer tmpfile.Close()
131 defer os.Remove(tmpfilename)
132 defer pipewriter.Close()
134 // Copy from opts.Data or opts.Reader, depending on
135 // which was provided.
137 if opts.Data != nil {
138 src = bytes.NewReader(opts.Data)
143 hashcheck := md5.New()
// A single io.Copy feeds the temp file, the pipe and the MD5 hasher at
// once; n is the number of bytes copied.
144 n, err := io.Copy(io.MultiWriter(tmpfile, pipewriter, hashcheck), src)
// The size is verified only when the caller supplied a positive
// DataSize.
149 } else if opts.DataSize > 0 && opts.DataSize != int(n) {
150 copyerr <- fmt.Errorf("block size %d did not match provided size %d", n, opts.DataSize)
// Close explicitly so a close error can be detected before renaming.
154 err = tmpfile.Close()
156 // Don't rename tmpfile into place, but allow
157 // the BlockWrite call to succeed if nothing
// hash is the lowercase hex MD5 of the bytes actually copied.
161 hash := fmt.Sprintf("%x", hashcheck.Sum(nil))
// The hash is verified only when the caller supplied opts.Hash.
162 if opts.Hash != "" && opts.Hash != hash {
163 // Even if the wrapped KeepGateway doesn't
164 // notice a problem, this should count as an
166 copyerr <- fmt.Errorf("block hash %s did not match provided hash %s", hash, opts.Hash)
// The cache file name is derived from the computed hash, not from
// opts.Hash.
170 cachefilename := cache.cacheFile(hash)
171 err = cache.rename(tmpfilename, cachefilename)
// A failed rename is reported via debugf.
173 cache.debugf("BlockWrite: rename(%s, %s) failed: %s", tmpfilename, cachefilename, err)
177 // Write through to the wrapped KeepGateway from the pipe,
178 // instead of the original reader.
// newopts is declared on a line not shown in this listing (presumably
// a copy of opts). If no DataSize was given, it is taken from
// len(Data); the reader is replaced by the read end of the pipe.
180 if newopts.DataSize == 0 {
181 newopts.DataSize = len(newopts.Data)
183 newopts.Reader = pipereader
186 resp, err := cache.KeepGateway.BlockWrite(ctx, newopts)
// len(copyerr) > 0 means an error has already been queued on the
// channel.
187 if len(copyerr) > 0 {
188 // If the copy-to-pipe goroutine failed, that error
189 // will be more helpful than the resulting "context
190 // canceled" or "read [from pipereader] failed" error
191 // seen by the wrapped KeepGateway.
193 // If the wrapped KeepGateway encounters an error
194 // before all the data is copied into the pipe, it
195 // stops reading from the pipe, which causes the
196 // io.Copy() in the goroutine to block until our
197 // deferred pipereader.Close() call runs. In that case
198 // len(copyerr)==0 here, so the wrapped KeepGateway
199 // error is the one we return to our caller.
205 // ReadAt reads the entire block from the wrapped KeepGateway into the
206 // cache if needed, and copies the requested portion into the provided
//
// It first tries quickReadAt (held-open file handles). If that fails it
// opens the cache file itself, coordinating with other readers and
// writers through flock on the file.
208 func (cache *DiskCache) ReadAt(locator string, dst []byte, offset int) (int, error) {
210 cachefilename := cache.cacheFile(locator)
// Fast path: taken only when quickReadAt returns a nil error.
211 if n, err := cache.quickReadAt(cachefilename, dst, offset); err == nil {
// O_CREATE means a missing cache file is created (empty) here.
214 f, err := cache.openFile(cachefilename, os.O_CREATE|os.O_RDWR)
// NOTE(review): this message formats err with %s, while the flock/seek
// errors below wrap it with %w — consider making them consistent.
216 return 0, fmt.Errorf("ReadAt: open(%s) failed: %s", cachefilename, err)
// Shared lock: other holders of a shared lock may proceed concurrently.
220 err = syscall.Flock(int(f.Fd()), syscall.LOCK_SH)
222 return 0, fmt.Errorf("flock(%s, lock_sh) failed: %w", cachefilename, err)
// Seeking to the end yields the current file size.
225 size, err := f.Seek(0, io.SeekEnd)
227 return 0, fmt.Errorf("seek(%s, seek_end) failed: %w", cachefilename, err)
// The file is shorter than the end of the requested range.
229 if size < int64(len(dst)+offset) {
230 // The cache file seems to be truncated or empty
231 // (possibly because we just created it). Wait for an
232 // exclusive lock, then check again (in case someone
233 // else is doing the same thing) before trying to
234 // retrieve the entire block.
235 err = syscall.Flock(int(f.Fd()), syscall.LOCK_EX)
237 return 0, fmt.Errorf("flock(%s, lock_ex) failed: %w", cachefilename, err)
// Re-measure the file size.
240 size, err = f.Seek(0, io.SeekEnd)
242 return 0, fmt.Errorf("seek(%s, seek_end) failed: %w", cachefilename, err)
244 if size < int64(len(dst)+offset) {
245 // The cache file is truncated or empty, and we own it
// Rewind so the block is written from the start of the file.
247 _, err = f.Seek(0, io.SeekStart)
249 return 0, fmt.Errorf("seek(%s, seek_start) failed: %w", cachefilename, err)
// ReadAt has no context parameter, so the fetch uses
// context.Background() and cannot be canceled by the caller.
251 n, err := cache.KeepGateway.BlockRead(context.Background(), BlockReadOptions{Locator: locator, WriteTo: f})
// Positional read; does not depend on the file's seek offset.
257 return f.ReadAt(dst, int64(offset))
// quickReadAtLostRace is returned by quickReadAt when the held-open
// entry's file handle is nil by the time the read lock is acquired.
260 var quickReadAtLostRace = errors.New("quickReadAt: lost race")
262 // Remove the cache entry for the indicated cachefilename if it
263 // matches expect (quickReadAt() usage), or if expect is nil (tidy()
266 // If expect is non-nil, close expect's filehandle.
268 // If expect is nil and a different cache entry is deleted, close its
270 func (cache *DiskCache) deleteHeldopen(cachefilename string, expect *openFileEnt) {
// The map lookup and delete happen under heldopenLock.
273 cache.heldopenLock.Lock()
274 found := cache.heldopen[cachefilename]
// Delete when an entry exists and either no specific entry was
// expected or the expected one is the one currently in the map
// (pointer comparison).
275 if found != nil && (expect == nil || expect == found) {
276 delete(cache.heldopen, cachefilename)
279 cache.heldopenLock.Unlock()
// NOTE(review): the assignment of needclose and the matching Lock()
// call are not visible in this listing. The deferred Unlock implies
// the entry's write lock is taken here, after heldopenLock has been
// released, before its file handle is examined.
281 if needclose != nil {
283 defer needclose.Unlock()
284 if needclose.f != nil {
291 // quickReadAt attempts to use a cached-filehandle approach to read
292 // from the indicated file. The expectation is that the caller
293 // (ReadAt) will try a more robust approach when this fails, so
294 // quickReadAt doesn't try especially hard to ensure success in
295 // races. In particular, when there are concurrent calls, and one
296 // fails, that can cause others to fail too.
297 func (cache *DiskCache) quickReadAt(cachefilename string, dst []byte, offset int) (int, error) {
299 cache.heldopenLock.Lock()
// heldopenMax == 0 is treated as "not yet chosen": the limit is
// computed here, under heldopenLock.
300 if cache.heldopenMax == 0 {
301 // Choose a reasonable limit on open cache files based
302 // on RLIMIT_NOFILE. Note Go automatically raises
303 // softlimit to hardlimit, so it's typically 1048576,
305 lim := syscall.Rlimit{}
306 err := syscall.Getrlimit(syscall.RLIMIT_NOFILE, &lim)
// Three outcomes: 256 (the condition is on a line not shown here,
// presumably a Getrlimit error), 10000 when the current limit exceeds
// 40000, otherwise a quarter of the current limit.
308 cache.heldopenMax = 256
309 } else if lim.Cur > 40000 {
310 cache.heldopenMax = 10000
312 cache.heldopenMax = int(lim.Cur / 4)
// A nil map yields a nil entry here, so the lookup is safe before the
// map has been allocated.
315 heldopen := cache.heldopen[cachefilename]
// New entry for this file; the map itself is allocated lazily.
318 heldopen = &openFileEnt{}
319 if cache.heldopen == nil {
320 cache.heldopen = make(map[string]*openFileEnt, cache.heldopenMax)
321 } else if len(cache.heldopen) > cache.heldopenMax {
322 // Rather than go to the trouble of tracking
323 // last access time, just close all files, and
324 // open again as needed. Even in the worst
325 // pathological case, this causes one extra
326 // open+close per read, which is not
327 // especially bad (see benchmarks).
// The old entries are processed in a separate goroutine that receives
// the map as m. NOTE(review): the lines that replace cache.heldopen
// and lock each entry are not visible in this listing.
328 go func(m map[string]*openFileEnt) {
329 for _, heldopen := range m {
331 defer heldopen.Unlock()
332 if heldopen.f != nil {
// Register the new entry so later callers find it.
340 cache.heldopen[cachefilename] = heldopen
343 cache.heldopenLock.Unlock()
346 // Open and flock the file, then call wg.Done() to
347 // unblock any other goroutines that are waiting in
348 // the !isnew case above.
// NOTE(review): no wg is visible in this listing; the entry's own lock
// (see the RUnlock below) appears to play that role — confirm and
// update the comment above if so.
349 f, err := os.Open(cachefilename)
351 err = syscall.Flock(int(f.Fd()), syscall.LOCK_SH)
// Removal of the entry is handed off to a separate goroutine.
360 go cache.deleteHeldopen(cachefilename, heldopen)
364 // Acquire read lock to ensure (1) initialization is complete,
365 // if it's done by a different goroutine, and (2) any "delete
366 // old/unused entries" waits for our read to finish before
369 defer heldopen.RUnlock()
370 if heldopen.err != nil {
371 // Other goroutine encountered an error during setup
372 return 0, heldopen.err
373 } else if heldopen.f == nil {
374 // Other goroutine closed the file before we got RLock
375 return 0, quickReadAtLostRace
// Positional read on the shared handle.
377 n, err := heldopen.f.ReadAt(dst, int64(offset))
379 // wait for any concurrent users to finish, then
380 // delete this cache entry in case reopening the
381 // backing file helps.
382 go cache.deleteHeldopen(cachefilename, heldopen)
387 // BlockRead reads the entire block from the wrapped KeepGateway into
388 // the cache if needed, and writes it to the provided writer.
//
// The locator must carry a size hint after its first "+"; the cached
// file is served only when its size equals that hint.
389 func (cache *DiskCache) BlockRead(ctx context.Context, opts BlockReadOptions) (int, error) {
391 cachefilename := cache.cacheFile(opts.Locator)
392 f, err := cache.openFile(cachefilename, os.O_CREATE|os.O_RDWR)
// If the cache file cannot be opened, read straight from the wrapped
// gateway without caching.
394 cache.debugf("BlockRead: open(%s) failed: %s", cachefilename, err)
395 return cache.KeepGateway.BlockRead(ctx, opts)
399 i := strings.Index(opts.Locator, "+")
// NOTE(review): strings.Index never returns a value >= len(s), so the
// second condition cannot be true; a locator ending in "+" is instead
// rejected below because its size string is empty.
400 if i < 0 || i >= len(opts.Locator) {
401 return 0, errors.New("invalid block locator: no size hint")
// sizestr is the text after the first "+", cut at the next "+" if any.
403 sizestr := opts.Locator[i+1:]
404 i = strings.Index(sizestr, "+")
406 sizestr = sizestr[:i]
// bitSize 32: values that do not fit in an int32 are reported as
// errors.
408 blocksize, err := strconv.ParseInt(sizestr, 10, 32)
409 if err != nil || blocksize < 0 {
410 return 0, errors.New("invalid block locator: invalid size hint")
// Shared lock while inspecting / reading the cache file.
413 err = syscall.Flock(int(f.Fd()), syscall.LOCK_SH)
// Measure the file by seeking to its end, then rewind for reading.
417 filesize, err := f.Seek(0, io.SeekEnd)
421 _, err = f.Seek(0, io.SeekStart)
// Cache hit: the file size equals the size hint, so stream it to the
// caller.
425 if filesize == blocksize {
426 n, err := io.Copy(opts.WriteTo, f)
// Otherwise take an exclusive lock before filling the file.
429 err = syscall.Flock(int(f.Fd()), syscall.LOCK_EX)
// Data from the wrapped gateway is written to both the cache file and
// the caller's writer.
433 opts.WriteTo = io.MultiWriter(f, opts.WriteTo)
434 n, err := cache.KeepGateway.BlockRead(ctx, opts)
442 // Start a tidy() goroutine, unless one is already running / recently
444 func (cache *DiskCache) gotidy() {
445 // Return quickly if another tidy goroutine is running in this process.
// The increment yields 1 only when no other caller currently holds the
// counter. Because || short-circuits, tidyHoldUntil is read only in
// that case.
446 n := atomic.AddInt32(&cache.tidying, 1)
447 if n != 1 || time.Now().Before(cache.tidyHoldUntil) {
// Undo our increment before giving up.
448 atomic.AddInt32(&cache.tidying, -1)
// NOTE(review): the lines that launch tidy() are not visible in this
// listing; presumably the two statements below run after tidy()
// returns. They set the next allowed tidy time and release the
// counter.
453 cache.tidyHoldUntil = time.Now().Add(tidyHoldDuration)
454 atomic.AddInt32(&cache.tidying, -1)
458 // Delete cache files as needed to control disk usage.
//
// Outline: determine a size limit, take a non-blocking exclusive flock
// on tmp/tidy.lock under cache.Dir, walk cache.Dir collecting size and
// access time of cache/temp files, and, if the total exceeds the
// limit, process entries from oldest to newest until it no longer does.
459 func (cache *DiskCache) tidy() {
460 maxsize := cache.MaxSize
// Use the stored default limit, computing it first if it is still 0.
462 if maxsize = atomic.LoadInt64(&cache.defaultMaxSize); maxsize == 0 {
463 var stat unix.Statfs_t
// When statfs succeeds, the default is one tenth of the space it
// reports as available (Bavail blocks of Bsize bytes).
464 if nil == unix.Statfs(cache.Dir, &stat) {
465 maxsize = int64(stat.Bavail) * stat.Bsize / 10
467 atomic.StoreInt64(&cache.defaultMaxSize, maxsize)
471 // Bail if a tidy goroutine is running in a different process.
472 lockfile, err := cache.openFile(filepath.Join(cache.Dir, "tmp", "tidy.lock"), os.O_CREATE|os.O_WRONLY)
476 defer lockfile.Close()
// LOCK_NB: fail immediately instead of waiting if the lock is held
// elsewhere.
477 err = syscall.Flock(int(lockfile.Fd()), syscall.LOCK_EX|syscall.LOCK_NB)
489 filepath.Walk(cache.Dir, func(path string, info fs.FileInfo, err error) error {
491 cache.debugf("tidy: skipping dir %s: %s", path, err)
// Only files ending in cacheFileSuffix or tmpFileSuffix are considered.
497 if !strings.HasSuffix(path, cacheFileSuffix) && !strings.HasSuffix(path, tmpFileSuffix) {
501 if stat, ok := info.Sys().(*syscall.Stat_t); ok {
502 // Access time is available (hopefully the
503 // filesystem is not mounted with noatime)
504 atime = time.Unix(stat.Atim.Sec, stat.Atim.Nsec)
506 // If access time isn't available we fall back
507 // to sorting by modification time.
508 atime = info.ModTime()
// Record path, timestamp and size; accumulate the total.
510 ents = append(ents, entT{path, atime, info.Size()})
511 totalsize += info.Size()
514 if cache.Logger != nil {
515 cache.Logger.WithFields(logrus.Fields{
516 "totalsize": totalsize,
518 }).Debugf("DiskCache: checked current cache usage")
521 // If MaxSize wasn't specified and we failed to come up with a
522 // defaultSize above, use the larger of {current cache size, 1
523 // GiB} as the defaultSize for subsequent tidy() operations.
525 if totalsize < 1<<30 {
526 atomic.StoreInt64(&cache.defaultMaxSize, 1<<30)
528 atomic.StoreInt64(&cache.defaultMaxSize, totalsize)
// NOTE(review): defaultMaxSize is read directly here, while every
// other access in this function goes through sync/atomic.
530 cache.debugf("found initial size %d, setting defaultMaxSize %d", totalsize, cache.defaultMaxSize)
// Nothing to do when usage is within the limit.
534 if totalsize <= maxsize {
538 // Delete oldest entries until totalsize <= maxsize.
539 sort.Slice(ents, func(i, j int) bool {
540 return ents[i].atime.Before(ents[j].atime)
543 for _, ent := range ents {
// Drop any held-open handle for this path, in a separate goroutine.
// NOTE(review): the statement that removes the file itself is not
// visible in this listing.
545 go cache.deleteHeldopen(ent.path, nil)
547 totalsize -= ent.size
548 if totalsize <= maxsize {
553 if cache.Logger != nil {
554 cache.Logger.WithFields(logrus.Fields{
556 "totalsize": totalsize,
557 }).Debugf("DiskCache: remaining cache usage after deleting")