X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/cdb63c3e5b6f11bfcb8244614d8a6fd309fbafce..da5858d5f794c14cf00b830166bb34b1bcd79ba5:/sdk/go/arvados/keep_cache.go

diff --git a/sdk/go/arvados/keep_cache.go b/sdk/go/arvados/keep_cache.go
index 4dabbd3506..108081d5ac 100644
--- a/sdk/go/arvados/keep_cache.go
+++ b/sdk/go/arvados/keep_cache.go
@@ -17,6 +17,7 @@ import (
 	"sort"
 	"strconv"
 	"strings"
+	"sync"
 	"sync/atomic"
 	"syscall"
 	"time"
@@ -33,29 +34,96 @@ type KeepGateway interface {
 }
 
 // DiskCache wraps KeepGateway, adding a disk-based cache layer.
+//
+// A DiskCache is automatically incorporated into the backend stack of
+// each keepclient.KeepClient. Most programs do not need to use
+// DiskCache directly.
 type DiskCache struct {
 	KeepGateway
 	Dir     string
-	MaxSize int64
+	MaxSize ByteSizeOrPercent
 	Logger  logrus.FieldLogger
+
+	*sharedCache
+	setupOnce sync.Once
+}
+
+var (
+	sharedCachesLock sync.Mutex
+	sharedCaches     = map[string]*sharedCache{}
+)
+
+// sharedCache has fields that coordinate the cache usage in a single
+// cache directory; it can be shared by multiple DiskCaches.
+//
+// This serves to share a single pool of held-open filehandles, a
+// single tidying goroutine, etc., even when the program (like
+// keep-web) uses multiple KeepGateway stacks that use different auth
+// tokens, etc.
+type sharedCache struct {
+	dir     string
+	maxSize ByteSizeOrPercent
+
 	tidying        int32 // see tidy()
-	tidyHoldUntil  time.Time
 	defaultMaxSize int64
+
+	// The "heldopen" fields are used to open cache files for
+	// reading, and leave them open for future/concurrent ReadAt
+	// operations. See quickReadAt.
+	heldopen     map[string]*openFileEnt
+	heldopenMax  int
+	heldopenLock sync.Mutex
+
+	// The "writing" fields allow multiple concurrent/sequential
+	// ReadAt calls to be notified as a single
+	// read-block-from-backend-into-cache goroutine fills the
+	// cache file.
+	writing     map[string]*writeprogress
+	writingCond *sync.Cond
+	writingLock sync.Mutex
+
+	sizeMeasured    int64 // actual size on disk after last tidy(); zero if not measured yet
+	sizeEstimated   int64 // last measured size, plus files we have written since
+	lastFileCount   int64 // number of files on disk at last count
+	writesSinceTidy int64 // number of files written since last tidy()
+}
+
+type writeprogress struct {
+	cond    *sync.Cond     // broadcast whenever size or done changes
+	done    bool           // size and err have their final values
+	size    int            // bytes copied into cache file so far
+	err     error          // error encountered while copying from backend to cache
+	sharedf *os.File       // readable filehandle, usable if done && err==nil
+	readers sync.WaitGroup // goroutines that haven't finished reading from f yet
+}
+
+type openFileEnt struct {
+	sync.RWMutex
+	f   *os.File
+	err error // if err is non-nil, f should not be used.
 }
 
 const (
-	cacheFileSuffix  = ".keepcacheblock"
-	tmpFileSuffix    = ".tmp"
-	tidyHoldDuration = 10 * time.Second
+	cacheFileSuffix = ".keepcacheblock"
+	tmpFileSuffix   = ".tmp"
 )
 
+func (cache *DiskCache) setup() {
+	sharedCachesLock.Lock()
+	defer sharedCachesLock.Unlock()
+	dir := cache.Dir
+	if sharedCaches[dir] == nil {
+		sharedCaches[dir] = &sharedCache{dir: dir, maxSize: cache.MaxSize}
+	}
+	cache.sharedCache = sharedCaches[dir]
+}
+
 func (cache *DiskCache) cacheFile(locator string) string {
 	hash := locator
 	if i := strings.Index(hash, "+"); i > 0 {
 		hash = hash[:i]
 	}
-	return filepath.Join(cache.Dir, hash[:3], hash+cacheFileSuffix)
+	return filepath.Join(cache.dir, hash[:3], hash+cacheFileSuffix)
 }
 
 // Open a cache file, creating the parent dir if necessary.
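
The writeprogress type introduced above is the coordination point between one cache-filling writer and any number of waiting readers. The following self-contained sketch (an illustration only, not part of the change; all names in it are made up) shows the same sync.Cond broadcast pattern, with a plain byte counter standing in for the cache file:

package main

import (
	"fmt"
	"sync"
	"time"
)

// progress mirrors writeprogress: one writer bumps size and
// broadcasts; readers wait until enough bytes exist (or the
// writer is done), then proceed concurrently.
type progress struct {
	cond *sync.Cond
	done bool
	size int
}

func main() {
	p := &progress{cond: sync.NewCond(&sync.Mutex{})}
	go func() {
		for i := 0; i < 10; i++ { // writer: 10 chunks of 1000 "bytes"
			time.Sleep(10 * time.Millisecond)
			p.cond.L.Lock()
			p.size += 1000
			p.cond.L.Unlock()
			p.cond.Broadcast() // wake all waiting readers
		}
		p.cond.L.Lock()
		p.done = true
		p.cond.L.Unlock()
		p.cond.Broadcast()
	}()
	var wg sync.WaitGroup
	for _, need := range []int{1500, 4000, 9999} {
		wg.Add(1)
		go func(need int) { // reader: returns as soon as `need` bytes exist
			defer wg.Done()
			p.cond.L.Lock()
			for !p.done && p.size < need {
				p.cond.Wait()
			}
			fmt.Printf("need %d: %d bytes available\n", need, p.size)
			p.cond.L.Unlock()
		}(need)
	}
	wg.Wait()
}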
@@ -94,9 +162,9 @@ func (cache *DiskCache) debugf(format string, args ...interface{}) {
 // BlockWrite writes through to the wrapped KeepGateway, and (if
 // possible) retains a copy of the written block in the cache.
 func (cache *DiskCache) BlockWrite(ctx context.Context, opts BlockWriteOptions) (BlockWriteResponse, error) {
-	cache.gotidy()
+	cache.setupOnce.Do(cache.setup)
 	unique := fmt.Sprintf("%x.%p%s", os.Getpid(), &opts, tmpFileSuffix)
-	tmpfilename := filepath.Join(cache.Dir, "tmp", unique)
+	tmpfilename := filepath.Join(cache.dir, "tmp", unique)
 	tmpfile, err := cache.openFile(tmpfilename, os.O_CREATE|os.O_EXCL|os.O_RDWR)
 	if err != nil {
 		cache.debugf("BlockWrite: open(%s) failed: %s", tmpfilename, err)
@@ -113,7 +181,11 @@ func (cache *DiskCache) BlockWrite(ctx context.Context, opts BlockWriteOptions)
 	pipereader, pipewriter := io.Pipe()
 	defer pipereader.Close()
 	go func() {
+		// Note this is a double-close (which is a no-op) in
+		// the happy path.
 		defer tmpfile.Close()
+		// Note this is a no-op in the happy path (the
+		// uniquely named tmpfilename will have been renamed).
 		defer os.Remove(tmpfilename)
 		defer pipewriter.Close()
 
@@ -158,6 +230,8 @@ func (cache *DiskCache) BlockWrite(ctx context.Context, opts BlockWriteOptions)
 		if err != nil {
 			cache.debugf("BlockWrite: rename(%s, %s) failed: %s", tmpfilename, cachefilename, err)
 		}
+		atomic.AddInt64(&cache.sizeEstimated, int64(n))
+		cache.gotidy()
 	}()
 
 	// Write through to the wrapped KeepGateway from the pipe,
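
BlockWrite's caching side tees the uploaded bytes into a uniquely named tmp file and renames it into place only on success, which is why the deferred Close/Remove calls above are harmless no-ops on the happy path. Here is a minimal standalone sketch of that pattern (illustration only; io.Discard stands in for the wrapped KeepGateway, and the file names are made up):

package main

import (
	"fmt"
	"io"
	"os"
	"path/filepath"
	"strings"
)

func main() {
	dir := os.TempDir()
	tmpfilename := filepath.Join(dir, fmt.Sprintf("%x.tmp", os.Getpid()))
	cachefilename := filepath.Join(dir, "demo.keepcacheblock")

	pipereader, pipewriter := io.Pipe()
	done := make(chan error, 1)
	go func() {
		defer os.Remove(tmpfilename) // no-op once the rename succeeds
		tmpfile, err := os.OpenFile(tmpfilename, os.O_CREATE|os.O_EXCL|os.O_RDWR, 0600)
		if err != nil {
			pipereader.CloseWithError(err)
			done <- err
			return
		}
		defer tmpfile.Close()
		_, err = io.Copy(tmpfile, pipereader)
		if err == nil {
			err = os.Rename(tmpfilename, cachefilename)
		}
		done <- err
	}()

	// "Backend" write: the TeeReader feeds the pipe as data is read.
	src := strings.NewReader("block data")
	n, err := io.Copy(io.Discard, io.TeeReader(src, pipewriter))
	pipewriter.Close() // EOF for the cache-side copy
	fmt.Println("backend got", n, "bytes, err:", err, "cache err:", <-done)
}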
@@ -188,71 +262,287 @@ func (cache *DiskCache) BlockWrite(ctx context.Context, opts BlockWriteOptions)
 	return resp, err
 }
 
+type funcwriter func([]byte) (int, error)
+
+func (fw funcwriter) Write(p []byte) (int, error) {
+	return fw(p)
+}
+
 // ReadAt reads the entire block from the wrapped KeepGateway into the
 // cache if needed, and copies the requested portion into the provided
 // slice.
+//
+// ReadAt returns as soon as the requested portion is available in the
+// cache. The remainder of the block may continue to be copied into
+// the cache in the background.
 func (cache *DiskCache) ReadAt(locator string, dst []byte, offset int) (int, error) {
-	cache.gotidy()
+	cache.setupOnce.Do(cache.setup)
 	cachefilename := cache.cacheFile(locator)
-	f, err := cache.openFile(cachefilename, os.O_CREATE|os.O_RDWR)
+	if n, err := cache.quickReadAt(cachefilename, dst, offset); err == nil {
+		return n, nil
+	}
+
+	cache.writingLock.Lock()
+	progress := cache.writing[cachefilename]
+	if progress == nil {
+		// Nobody else is fetching from backend, so we'll add
+		// a new entry to cache.writing and fetch in a
+		// separate goroutine.
+		progress = &writeprogress{}
+		progress.cond = sync.NewCond(&sync.Mutex{})
+		if cache.writing == nil {
+			cache.writing = map[string]*writeprogress{}
+		}
+		cache.writing[cachefilename] = progress
+
+		// Start a goroutine to copy from backend to f. As
+		// data arrives, wake up any waiting loops (see below)
+		// so ReadAt() requests for partial data can return as
+		// soon as the relevant bytes have been copied.
+		go func() {
+			var size int
+			var err error
+			defer func() {
+				if err == nil && progress.sharedf != nil {
+					err = progress.sharedf.Sync()
+				}
+				progress.cond.L.Lock()
+				progress.err = err
+				progress.done = true
+				progress.size = size
+				progress.cond.L.Unlock()
+				progress.cond.Broadcast()
+				cache.writingLock.Lock()
+				delete(cache.writing, cachefilename)
+				cache.writingLock.Unlock()
+
+				// Wait for other goroutines to wake
+				// up, notice we're done, and use our
+				// sharedf to read their data, before
+				// we close sharedf.
+				//
+				// Nobody can join the WaitGroup after
+				// the progress entry is deleted from
+				// cache.writing above. Therefore,
+				// this Wait ensures nobody else is
+				// accessing progress, and we don't
+				// need to lock anything.
+				progress.readers.Wait()
+				progress.sharedf.Close()
+			}()
+			progress.sharedf, err = cache.openFile(cachefilename, os.O_CREATE|os.O_RDWR)
+			if err != nil {
+				err = fmt.Errorf("ReadAt: %w", err)
+				return
+			}
+			err = syscall.Flock(int(progress.sharedf.Fd()), syscall.LOCK_SH)
+			if err != nil {
+				err = fmt.Errorf("flock(%s, lock_sh) failed: %w", cachefilename, err)
+				return
+			}
+			size, err = cache.KeepGateway.BlockRead(context.Background(), BlockReadOptions{
+				Locator: locator,
+				WriteTo: funcwriter(func(p []byte) (int, error) {
+					n, err := progress.sharedf.Write(p)
+					if n > 0 {
+						progress.cond.L.Lock()
+						progress.size += n
+						progress.cond.L.Unlock()
+						progress.cond.Broadcast()
+					}
+					return n, err
+				})})
+			atomic.AddInt64(&cache.sizeEstimated, int64(size))
+			cache.gotidy()
+		}()
+	}
+	// We add ourselves to the readers WaitGroup so the
+	// fetch-from-backend goroutine doesn't close the shared
+	// filehandle before we read the data we need from it.
+	progress.readers.Add(1)
+	defer progress.readers.Done()
+	cache.writingLock.Unlock()
+
+	progress.cond.L.Lock()
+	for !progress.done && progress.size < len(dst)+offset {
+		progress.cond.Wait()
+	}
+	sharedf := progress.sharedf
+	err := progress.err
+	progress.cond.L.Unlock()
+
 	if err != nil {
-		cache.debugf("ReadAt: open(%s) failed: %s", cachefilename, err)
-		return cache.KeepGateway.ReadAt(locator, dst, offset)
+		// If the copy-from-backend goroutine encountered an
+		// error, we return that error. (Even if we read the
+		// desired number of bytes, the error might be
+		// something like BadChecksum, so we should not ignore
+		// it.)
+		return 0, err
 	}
-	defer f.Close()
+	if len(dst) == 0 {
+		// It's possible that sharedf==nil here (the writer
+		// goroutine might not have done anything at all yet)
+		// and we don't need it anyway because no bytes are
+		// being read. Reading zero bytes seems pointless, but
+		// if someone does it, we might as well return
+		// suitable values, rather than risk a crash by
+		// calling sharedf.ReadAt() when sharedf is nil.
+		return 0, nil
+	}
+	return sharedf.ReadAt(dst, int64(offset))
+}
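
The funcwriter adapter added above is what lets the cache-filling goroutine observe progress chunk by chunk: any func([]byte) (int, error) becomes an io.Writer. A standalone sketch (illustration only, not part of the change):

package main

import (
	"fmt"
	"io"
	"strings"
)

// funcwriter as above: a function type that satisfies io.Writer,
// so a copy loop can report per-chunk progress without defining a
// new struct type. (In the cache, the callback bumps progress.size
// and calls Broadcast.)
type funcwriter func([]byte) (int, error)

func (fw funcwriter) Write(p []byte) (int, error) {
	return fw(p)
}

func main() {
	var total int
	w := funcwriter(func(p []byte) (int, error) {
		total += len(p)
		fmt.Printf("chunk of %d bytes (total %d)\n", len(p), total)
		return len(p), nil
	})
	n, err := io.Copy(w, strings.NewReader("some block data"))
	fmt.Println("copied", n, "bytes, err:", err)
}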
-	err = syscall.Flock(int(f.Fd()), syscall.LOCK_SH)
-	if err != nil {
-		return 0, fmt.Errorf("flock(%s, lock_sh) failed: %w", cachefilename, err)
+
+var quickReadAtLostRace = errors.New("quickReadAt: lost race")
+
+// Remove the cache entry for the indicated cachefilename if it
+// matches expect (quickReadAt() usage), or if expect is nil (tidy()
+// usage).
+//
+// If expect is non-nil, close expect's filehandle.
+//
+// If expect is nil and a cache entry is deleted, close its
+// filehandle.
+func (cache *DiskCache) deleteHeldopen(cachefilename string, expect *openFileEnt) {
+	needclose := expect
+
+	cache.heldopenLock.Lock()
+	found := cache.heldopen[cachefilename]
+	if found != nil && (expect == nil || expect == found) {
+		delete(cache.heldopen, cachefilename)
+		needclose = found
+	}
+	cache.heldopenLock.Unlock()
+
+	if needclose != nil {
+		needclose.Lock()
+		defer needclose.Unlock()
+		if needclose.f != nil {
+			needclose.f.Close()
+			needclose.f = nil
+		}
+	}
 }
 
-	size, err := f.Seek(0, io.SeekEnd)
-	if err != nil {
-		return 0, fmt.Errorf("seek(%s, seek_end) failed: %w", cachefilename, err)
-	}
-	if size < int64(len(dst)+offset) {
-		// The cache file seems to be truncated or empty
-		// (possibly because we just created it). Wait for an
-		// exclusive lock, then check again (in case someone
-		// else is doing the same thing) before trying to
-		// retrieve the entire block.
-		err = syscall.Flock(int(f.Fd()), syscall.LOCK_EX)
+// quickReadAt attempts to use a cached-filehandle approach to read
+// from the indicated file. The expectation is that the caller
+// (ReadAt) will try a more robust approach when this fails, so
+// quickReadAt doesn't try especially hard to ensure success in
+// races. In particular, when there are concurrent calls, and one
+// fails, that can cause others to fail too.
+func (cache *DiskCache) quickReadAt(cachefilename string, dst []byte, offset int) (int, error) {
+	isnew := false
+	cache.heldopenLock.Lock()
+	if cache.heldopenMax == 0 {
+		// Choose a reasonable limit on open cache files based
+		// on RLIMIT_NOFILE. Note Go automatically raises
+		// softlimit to hardlimit, so it's typically 1048576,
+		// not 1024.
+		lim := syscall.Rlimit{}
+		err := syscall.Getrlimit(syscall.RLIMIT_NOFILE, &lim)
 		if err != nil {
-			return 0, fmt.Errorf("flock(%s, lock_ex) failed: %w", cachefilename, err)
+			cache.heldopenMax = 100
+		} else if lim.Cur > 400000 {
+			cache.heldopenMax = 10000
+		} else {
+			cache.heldopenMax = int(lim.Cur / 40)
 		}
 	}
-	size, err = f.Seek(0, io.SeekEnd)
-	if err != nil {
-		return 0, fmt.Errorf("seek(%s, seek_end) failed: %w", cachefilename, err)
-	}
-	if size < int64(len(dst)+offset) {
-		// The cache file is truncated or empty, and we own it
-		// now. Fill it.
-		_, err = f.Seek(0, io.SeekStart)
-		if err != nil {
-			return 0, fmt.Errorf("seek(%s, seek_start) failed: %w", cachefilename, err)
+	heldopen := cache.heldopen[cachefilename]
+	if heldopen == nil {
+		isnew = true
+		heldopen = &openFileEnt{}
+		if cache.heldopen == nil {
+			cache.heldopen = make(map[string]*openFileEnt, cache.heldopenMax)
+		} else if len(cache.heldopen) > cache.heldopenMax {
+			// Rather than go to the trouble of tracking
+			// last access time, just close all files, and
+			// open again as needed. Even in the worst
+			// pathological case, this causes one extra
+			// open+close per read, which is not
+			// especially bad (see benchmarks).
+			go func(m map[string]*openFileEnt) {
+				for _, heldopen := range m {
+					heldopen.Lock()
+					defer heldopen.Unlock()
+					if heldopen.f != nil {
+						heldopen.f.Close()
+						heldopen.f = nil
+					}
+				}
+			}(cache.heldopen)
+			cache.heldopen = nil
+		}
+		cache.heldopen[cachefilename] = heldopen
+		heldopen.Lock()
+	}
+	cache.heldopenLock.Unlock()
+
+	if isnew {
+		// Open and flock the file, save the filehandle (or
+		// error) in heldopen.f, and release the write lock so
+		// other goroutines waiting at heldopen.RLock() below
+		// can use the shared filehandle (or shared error).
+		f, err := os.Open(cachefilename)
+		if err == nil {
+			err = syscall.Flock(int(f.Fd()), syscall.LOCK_SH)
+			if err == nil {
+				heldopen.f = f
+			} else {
+				f.Close()
+			}
 		}
-		n, err := cache.KeepGateway.BlockRead(context.Background(), BlockReadOptions{Locator: locator, WriteTo: f})
 		if err != nil {
-			return 0, err
+			heldopen.err = err
+			go cache.deleteHeldopen(cachefilename, heldopen)
 		}
-		f.Truncate(int64(n))
+		heldopen.Unlock()
+	}
+	// Acquire read lock to ensure (1) initialization is complete,
+	// if it's done by a different goroutine, and (2) any "delete
+	// old/unused entries" waits for our read to finish before
+	// closing the file.
+	heldopen.RLock()
+	defer heldopen.RUnlock()
+	if heldopen.err != nil {
+		// Other goroutine encountered an error during setup
+		return 0, heldopen.err
+	} else if heldopen.f == nil {
+		// Other goroutine closed the file before we got RLock
+		return 0, quickReadAtLostRace
+	}
+
+	// If another goroutine is currently writing the file, wait
+	// for it to catch up to the end of the range we need.
+	cache.writingLock.Lock()
+	progress := cache.writing[cachefilename]
+	cache.writingLock.Unlock()
+	if progress != nil {
+		progress.cond.L.Lock()
+		for !progress.done && progress.size < len(dst)+offset {
+			progress.cond.Wait()
+		}
+		progress.cond.L.Unlock()
+		// If size<needed here, the writer goroutine stopped
+		// short (progress.err is non-nil); the following
+		// ReadAt returns a short read, and the caller falls
+		// back to the non-quick path, which reports the
+		// error.
 	}
+	return heldopen.f.ReadAt(dst, int64(offset))
+}
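
quickReadAt's heldopenMax heuristic above derives the held-open filehandle budget from RLIMIT_NOFILE. The same arithmetic in a runnable, unix-only sketch (illustration only, not part of the change):

package main

import (
	"fmt"
	"syscall"
)

// Cap the pool of held-open cache files at 1/40th of RLIMIT_NOFILE,
// clamped to 10000, falling back to 100 if the limit can't be read.
func main() {
	heldopenMax := 100
	lim := syscall.Rlimit{}
	if err := syscall.Getrlimit(syscall.RLIMIT_NOFILE, &lim); err == nil {
		if lim.Cur > 400000 {
			heldopenMax = 10000
		} else {
			heldopenMax = int(lim.Cur / 40)
		}
	}
	fmt.Println("would hold open at most", heldopenMax, "cache files")
}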
 
-// BlockRead reads the entire block from the wrapped KeepGateway into
-// the cache if needed, and writes it to the provided writer.
+// BlockRead reads an entire block using a 128 KiB buffer.
 func (cache *DiskCache) BlockRead(ctx context.Context, opts BlockReadOptions) (int, error) {
-	cache.gotidy()
-	cachefilename := cache.cacheFile(opts.Locator)
-	f, err := cache.openFile(cachefilename, os.O_CREATE|os.O_RDWR)
-	if err != nil {
-		cache.debugf("BlockRead: open(%s) failed: %s", cachefilename, err)
-		return cache.KeepGateway.BlockRead(ctx, opts)
-	}
-	defer f.Close()
-
+	cache.setupOnce.Do(cache.setup)
 	i := strings.Index(opts.Locator, "+")
 	if i < 0 || i >= len(opts.Locator) {
 		return 0, errors.New("invalid block locator: no size hint")
@@ -267,66 +557,84 @@ func (cache *DiskCache) BlockRead(ctx context.Context, opts BlockReadOptions) (i
 		return 0, errors.New("invalid block locator: invalid size hint")
 	}
 
-	err = syscall.Flock(int(f.Fd()), syscall.LOCK_SH)
-	if err != nil {
-		return 0, err
-	}
-	filesize, err := f.Seek(0, io.SeekEnd)
-	if err != nil {
-		return 0, err
-	}
-	_, err = f.Seek(0, io.SeekStart)
-	if err != nil {
-		return 0, err
-	}
-	if filesize == blocksize {
-		n, err := io.Copy(opts.WriteTo, f)
-		return int(n), err
-	}
-	err = syscall.Flock(int(f.Fd()), syscall.LOCK_EX)
-	if err != nil {
-		return 0, err
-	}
-	opts.WriteTo = io.MultiWriter(f, opts.WriteTo)
-	n, err := cache.KeepGateway.BlockRead(ctx, opts)
-	if err != nil {
-		return int(n), err
+	offset := 0
+	buf := make([]byte, 131072)
+	for offset < int(blocksize) {
+		if ctx.Err() != nil {
+			return offset, ctx.Err()
+		}
+		if int(blocksize)-offset < len(buf) {
+			buf = buf[:int(blocksize)-offset]
+		}
+		nr, err := cache.ReadAt(opts.Locator, buf, offset)
+		if nr > 0 {
+			nw, err := opts.WriteTo.Write(buf[:nr])
+			if err != nil {
+				return offset + nw, err
+			}
+		}
+		offset += nr
+		if err != nil {
+			return offset, err
+		}
 	}
-	f.Truncate(int64(n))
-	return n, nil
+	return offset, nil
 }
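
The new BlockRead above streams a block through the cache in 128 KiB slices, so a partially cached block can start flowing to the caller before the backend fetch finishes. A standalone sketch of the same loop shape (illustration only; a bytes.Reader stands in for cache.ReadAt, and the io.EOF handling is an addition needed only because of that substitution):

package main

import (
	"bytes"
	"context"
	"fmt"
	"io"
)

// copyBlock reads a "block" 128 KiB at a time, writing each chunk
// out, and stops early on context cancellation or a failed read.
func copyBlock(ctx context.Context, w io.Writer, src io.ReaderAt, blocksize int) (int, error) {
	offset := 0
	buf := make([]byte, 131072)
	for offset < blocksize {
		if ctx.Err() != nil {
			return offset, ctx.Err()
		}
		if blocksize-offset < len(buf) {
			buf = buf[:blocksize-offset] // last, short chunk
		}
		nr, err := src.ReadAt(buf, int64(offset))
		if nr > 0 {
			nw, werr := w.Write(buf[:nr])
			if werr != nil {
				return offset + nw, werr
			}
		}
		offset += nr
		if err != nil && err != io.EOF {
			return offset, err
		}
	}
	return offset, nil
}

func main() {
	data := bytes.Repeat([]byte("x"), 300000) // a bit over two chunks
	n, err := copyBlock(context.Background(), io.Discard, bytes.NewReader(data), len(data))
	fmt.Println("copied", n, "bytes, err:", err)
}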
 
 // Start a tidy() goroutine, unless one is already running / recently
 // finished.
 func (cache *DiskCache) gotidy() {
-	// Return quickly if another tidy goroutine is running in this process.
+	writes := atomic.AddInt64(&cache.writesSinceTidy, 1)
+	// Skip if another tidy goroutine is running in this process.
 	n := atomic.AddInt32(&cache.tidying, 1)
-	if n != 1 || time.Now().Before(cache.tidyHoldUntil) {
+	if n != 1 {
+		atomic.AddInt32(&cache.tidying, -1)
+		return
+	}
+	// Skip if sizeEstimated is based on an actual measurement and
+	// is below maxSize, and we haven't done very many writes
+	// since last tidy (defined as 1% of number of cache files at
+	// last count).
+	if cache.sizeMeasured > 0 &&
+		atomic.LoadInt64(&cache.sizeEstimated) < atomic.LoadInt64(&cache.defaultMaxSize) &&
+		writes < cache.lastFileCount/100 {
 		atomic.AddInt32(&cache.tidying, -1)
 		return
 	}
 	go func() {
 		cache.tidy()
-		cache.tidyHoldUntil = time.Now().Add(tidyHoldDuration)
+		atomic.StoreInt64(&cache.writesSinceTidy, 0)
 		atomic.AddInt32(&cache.tidying, -1)
 	}()
 }
 
 // Delete cache files as needed to control disk usage.
 func (cache *DiskCache) tidy() {
-	maxsize := cache.MaxSize
+	maxsize := int64(cache.maxSize.ByteSize())
 	if maxsize < 1 {
-		if maxsize = atomic.LoadInt64(&cache.defaultMaxSize); maxsize == 0 {
+		maxsize = atomic.LoadInt64(&cache.defaultMaxSize)
+		if maxsize == 0 {
+			// defaultMaxSize not yet computed. Use 10% of
+			// filesystem capacity (or a different
+			// percentage if indicated by cache.maxSize)
+			pct := cache.maxSize.Percent()
+			if pct == 0 {
+				pct = 10
+			}
 			var stat unix.Statfs_t
-			if nil == unix.Statfs(cache.Dir, &stat) {
-				maxsize = int64(stat.Bavail) * stat.Bsize / 10
+			if nil == unix.Statfs(cache.dir, &stat) {
+				maxsize = int64(stat.Bavail) * stat.Bsize * pct / 100
+				atomic.StoreInt64(&cache.defaultMaxSize, maxsize)
+			} else {
+				// In this case we will set
+				// defaultMaxSize below after
+				// measuring current usage.
 			}
-			atomic.StoreInt64(&cache.defaultMaxSize, maxsize)
 		}
 	}
 
 	// Bail if a tidy goroutine is running in a different process.
-	lockfile, err := cache.openFile(filepath.Join(cache.Dir, "tmp", "tidy.lock"), os.O_CREATE|os.O_WRONLY)
+	lockfile, err := cache.openFile(filepath.Join(cache.dir, "tmp", "tidy.lock"), os.O_CREATE|os.O_WRONLY)
 	if err != nil {
 		return
 	}
@@ -343,7 +651,7 @@
 	}
 	var ents []entT
 	var totalsize int64
-	filepath.Walk(cache.Dir, func(path string, info fs.FileInfo, err error) error {
+	filepath.Walk(cache.dir, func(path string, info fs.FileInfo, err error) error {
 		if err != nil {
 			cache.debugf("tidy: skipping dir %s: %s", path, err)
 			return nil
@@ -377,7 +685,8 @@
 
 	// If MaxSize wasn't specified and we failed to come up with a
 	// defaultSize above, use the larger of {current cache size, 1
-	// GiB} as the defaultSize for subsequent tidy() operations.
+	// GiB} as the defaultMaxSize for subsequent tidy()
+	// operations.
 	if maxsize == 0 {
 		if totalsize < 1<<30 {
 			atomic.StoreInt64(&cache.defaultMaxSize, 1<<30)
@@ -388,20 +697,37 @@
 		return
 	}
 
-	if totalsize <= maxsize {
+	// If we're below MaxSize or there's only one block in the
+	// cache, just update the usage estimate and return.
+	//
+	// (We never delete the last block because that would merely
+	// cause the same block to get re-fetched repeatedly from the
+	// backend.)
+	if totalsize <= maxsize || len(ents) == 1 {
+		atomic.StoreInt64(&cache.sizeMeasured, totalsize)
+		atomic.StoreInt64(&cache.sizeEstimated, totalsize)
+		cache.lastFileCount = int64(len(ents))
 		return
 	}
 
-	// Delete oldest entries until totalsize < maxsize.
+	// Set a new size target of maxsize minus 5%. This makes some
+	// room for sizeEstimated to grow before it triggers another
+	// tidy. We don't want to walk/sort an entire large cache
+	// directory each time we write a block.
+	target := maxsize - (maxsize / 20)
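
tidy()'s default budget above is a percentage of the space available on the filesystem holding the cache directory (10% unless the configured MaxSize is itself a percentage). The same calculation as a runnable sketch (illustration only; it uses the same golang.org/x/sys/unix package the file already imports, with /tmp as a stand-in directory):

package main

import (
	"fmt"

	"golang.org/x/sys/unix"
)

func main() {
	pct := int64(10) // default when no percentage is configured
	var stat unix.Statfs_t
	if err := unix.Statfs("/tmp", &stat); err != nil {
		fmt.Println("statfs:", err)
		return
	}
	maxsize := int64(stat.Bavail) * stat.Bsize * pct / 100
	fmt.Println("default cache size would be", maxsize, "bytes")
}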
+
+	// Delete oldest entries until totalsize < target or we're
+	// down to a single cached block.
 	sort.Slice(ents, func(i, j int) bool {
 		return ents[i].atime.Before(ents[j].atime)
 	})
 	deleted := 0
 	for _, ent := range ents {
 		os.Remove(ent.path)
+		go cache.deleteHeldopen(ent.path, nil)
 		deleted++
 		totalsize -= ent.size
-		if totalsize <= maxsize {
+		if totalsize <= target || deleted == len(ents)-1 {
 			break
 		}
 	}
@@ -412,4 +738,7 @@
 			"totalsize": totalsize,
 		}).Debugf("DiskCache: remaining cache usage after deleting")
 	}
+	atomic.StoreInt64(&cache.sizeMeasured, totalsize)
+	atomic.StoreInt64(&cache.sizeEstimated, totalsize)
+	cache.lastFileCount = int64(len(ents) - deleted)
 }
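
The eviction pass above deletes oldest-first by atime until usage falls below the 95% target, but always keeps at least one block, since deleting the last one would only force a re-fetch of the same block. A miniature, self-contained version of that loop (illustration only; os.Remove is stubbed out and the entries are made up):

package main

import (
	"fmt"
	"sort"
	"time"
)

type entT struct {
	path  string
	atime time.Time
	size  int64
}

func main() {
	now := time.Now()
	ents := []entT{
		{"c", now.Add(-1 * time.Hour), 40},
		{"a", now.Add(-3 * time.Hour), 40},
		{"b", now.Add(-2 * time.Hour), 40},
	}
	var totalsize int64 = 120
	var target int64 = 50

	// Oldest atime first, then delete until we hit the target or
	// only one entry remains.
	sort.Slice(ents, func(i, j int) bool {
		return ents[i].atime.Before(ents[j].atime)
	})
	deleted := 0
	for _, ent := range ents {
		// os.Remove(ent.path) would go here
		deleted++
		totalsize -= ent.size
		if totalsize <= target || deleted == len(ents)-1 {
			break
		}
	}
	fmt.Println("deleted", deleted, "entries; remaining", totalsize, "bytes")
}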