X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/52fa01051f4633e3bbbfcdf8c55994e7cd91212a..279efb4dd345bcb1beee2c77ac14d66e57103b9f:/sdk/go/arvados/keep_cache.go

diff --git a/sdk/go/arvados/keep_cache.go b/sdk/go/arvados/keep_cache.go
index b9c7fea4b0..b6b2a9da66 100644
--- a/sdk/go/arvados/keep_cache.go
+++ b/sdk/go/arvados/keep_cache.go
@@ -17,6 +17,7 @@ import (
 	"sort"
 	"strconv"
 	"strings"
+	"sync"
 	"sync/atomic"
 	"syscall"
 	"time"
@@ -36,25 +37,87 @@ type KeepGateway interface {
 type DiskCache struct {
 	KeepGateway
 	Dir     string
-	MaxSize int64
+	MaxSize ByteSizeOrPercent
 	Logger  logrus.FieldLogger

+	*sharedCache
+	setupOnce sync.Once
+}
+
+var (
+	sharedCachesLock sync.Mutex
+	sharedCaches     = map[string]*sharedCache{}
+)
+
+// sharedCache has fields that coordinate the cache usage in a single
+// cache directory; it can be shared by multiple DiskCaches.
+//
+// This serves to share a single pool of held-open filehandles, a
+// single tidying goroutine, etc., even when the program (like
+// keep-web) uses multiple KeepGateway stacks that use different auth
+// tokens, etc.
+type sharedCache struct {
+	dir     string
+	maxSize ByteSizeOrPercent
+
 	tidying        int32 // see tidy()
-	tidyHoldUntil  time.Time
 	defaultMaxSize int64
+
+	// The "heldopen" fields are used to open cache files for
+	// reading, and leave them open for future/concurrent ReadAt
+	// operations. See quickReadAt.
+	heldopen     map[string]*openFileEnt
+	heldopenMax  int
+	heldopenLock sync.Mutex
+
+	// The "writing" fields allow multiple concurrent/sequential
+	// ReadAt calls to be notified as a single
+	// read-block-from-backend-into-cache goroutine fills the
+	// cache file.
+	writing     map[string]*writeprogress
+	writingCond *sync.Cond
+	writingLock sync.Mutex
+
+	sizeMeasured    int64 // actual size on disk after last tidy(); zero if not measured yet
+	sizeEstimated   int64 // last measured size, plus files we have written since
+	lastFileCount   int64 // number of files on disk at last count
+	writesSinceTidy int64 // number of files written since last tidy()
 }

-var (
+type writeprogress struct {
+	cond *sync.Cond // broadcast whenever size or done changes
+	done bool       // size and err have their final values
+	size int        // bytes copied into cache file so far
+	err  error      // error encountered while copying from backend to cache
+}
+
+type openFileEnt struct {
+	sync.RWMutex
+	f   *os.File
+	err error // if err is non-nil, f should not be used.
+}
+
+const (
 	cacheFileSuffix = ".keepcacheblock"
 	tmpFileSuffix   = ".tmp"
 )

+func (cache *DiskCache) setup() {
+	sharedCachesLock.Lock()
+	defer sharedCachesLock.Unlock()
+	dir := cache.Dir
+	if sharedCaches[dir] == nil {
+		sharedCaches[dir] = &sharedCache{dir: dir, maxSize: cache.MaxSize}
+	}
+	cache.sharedCache = sharedCaches[dir]
+}
+
 func (cache *DiskCache) cacheFile(locator string) string {
 	hash := locator
 	if i := strings.Index(hash, "+"); i > 0 {
 		hash = hash[:i]
 	}
-	return filepath.Join(cache.Dir, hash[:3], hash+cacheFileSuffix)
+	return filepath.Join(cache.dir, hash[:3], hash+cacheFileSuffix)
 }

 // Open a cache file, creating the parent dir if necessary.
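The setup()/sharedCache arrangement above is a keyed-singleton pattern: every DiskCache pointing at the same directory ends up sharing one state object, while setupOnce guarantees each DiskCache resolves its shared state exactly once. A standalone sketch of the same pattern (hypothetical pool/state names, not part of this diff):

package main

import (
	"fmt"
	"sync"
)

// pool hands out one shared state object per key, the same way
// DiskCache.setup() maps each cache.Dir to a single *sharedCache.
type pool struct {
	mu sync.Mutex
	m  map[string]*state
}

type state struct{ dir string }

func (p *pool) get(dir string) *state {
	p.mu.Lock()
	defer p.mu.Unlock()
	if p.m == nil {
		p.m = map[string]*state{}
	}
	if p.m[dir] == nil {
		p.m[dir] = &state{dir: dir}
	}
	return p.m[dir]
}

func main() {
	p := &pool{}
	a, b := p.get("/tmp/cache"), p.get("/tmp/cache")
	fmt.Println(a == b) // true: same directory, same shared state
}

The payoff, as the comment in the diff notes, is that filehandle pools and the tidying goroutine are shared even when several KeepGateway stacks (with different auth tokens) point at the same cache directory.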
@@ -93,9 +156,9 @@ func (cache *DiskCache) debugf(format string, args ...interface{}) {
 // BlockWrite writes through to the wrapped KeepGateway, and (if
 // possible) retains a copy of the written block in the cache.
 func (cache *DiskCache) BlockWrite(ctx context.Context, opts BlockWriteOptions) (BlockWriteResponse, error) {
-	cache.gotidy()
+	cache.setupOnce.Do(cache.setup)
 	unique := fmt.Sprintf("%x.%p%s", os.Getpid(), &opts, tmpFileSuffix)
-	tmpfilename := filepath.Join(cache.Dir, "tmp", unique)
+	tmpfilename := filepath.Join(cache.dir, "tmp", unique)
 	tmpfile, err := cache.openFile(tmpfilename, os.O_CREATE|os.O_EXCL|os.O_RDWR)
 	if err != nil {
 		cache.debugf("BlockWrite: open(%s) failed: %s", tmpfilename, err)
@@ -157,6 +220,8 @@ func (cache *DiskCache) BlockWrite(ctx context.Context, opts BlockWriteOptions)
 		if err != nil {
 			cache.debugf("BlockWrite: rename(%s, %s) failed: %s", tmpfilename, cachefilename, err)
 		}
+		atomic.AddInt64(&cache.sizeEstimated, int64(n))
+		cache.gotidy()
 	}()

 	// Write through to the wrapped KeepGateway from the pipe,
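BlockWrite stages each block in a uniquely named *.tmp file and renames it into place only after the write succeeds, so concurrent readers never observe a partially written cache file. A self-contained sketch of that create-exclusive-then-rename sequence (hypothetical atomicWrite name, simplified error handling; not from the Arvados source):

package main

import (
	"fmt"
	"os"
	"path/filepath"
)

// atomicWrite stages data in a unique temp file, then renames it into
// place -- readers only ever see a missing file or a complete one,
// mirroring BlockWrite's unique-tmpfile-plus-rename sequence.
func atomicWrite(dir, name string, data []byte) error {
	// pid + pointer value gives a name unique to this call, like
	// BlockWrite's fmt.Sprintf("%x.%p%s", ...) scheme.
	tmp := filepath.Join(dir, fmt.Sprintf("%x.%p.tmp", os.Getpid(), &data))
	f, err := os.OpenFile(tmp, os.O_CREATE|os.O_EXCL|os.O_WRONLY, 0600)
	if err != nil {
		return err
	}
	if _, err := f.Write(data); err != nil {
		f.Close()
		os.Remove(tmp)
		return err
	}
	if err := f.Close(); err != nil {
		os.Remove(tmp)
		return err
	}
	return os.Rename(tmp, filepath.Join(dir, name))
}

func main() {
	dir, _ := os.MkdirTemp("", "cache")
	defer os.RemoveAll(dir)
	fmt.Println(atomicWrite(dir, "aaa.keepcacheblock", []byte("block data")))
}

Note also the two "+" lines in the hunk above: the deferred rename now bumps sizeEstimated and triggers gotidy() after the file lands, rather than tidying eagerly at the start of every write.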
@@ -187,71 +252,270 @@ func (cache *DiskCache) BlockWrite(ctx context.Context, opts BlockWriteOptions)
 	return resp, err
 }

+type funcwriter func([]byte) (int, error)
+
+func (fw funcwriter) Write(p []byte) (int, error) {
+	return fw(p)
+}
+
 // ReadAt reads the entire block from the wrapped KeepGateway into the
 // cache if needed, and copies the requested portion into the provided
 // slice.
+//
+// ReadAt returns as soon as the requested portion is available in the
+// cache. The remainder of the block may continue to be copied into
+// the cache in the background.
 func (cache *DiskCache) ReadAt(locator string, dst []byte, offset int) (int, error) {
-	cache.gotidy()
+	cache.setupOnce.Do(cache.setup)
 	cachefilename := cache.cacheFile(locator)
-	f, err := cache.openFile(cachefilename, os.O_CREATE|os.O_RDWR)
+	if n, err := cache.quickReadAt(cachefilename, dst, offset); err == nil {
+		return n, err
+	}
+	readf, err := cache.openFile(cachefilename, os.O_CREATE|os.O_RDONLY)
 	if err != nil {
-		cache.debugf("ReadAt: open(%s) failed: %s", cachefilename, err)
-		return cache.KeepGateway.ReadAt(locator, dst, offset)
+		return 0, fmt.Errorf("ReadAt: %w", err)
 	}
-	defer f.Close()
+	defer readf.Close()

-	err = syscall.Flock(int(f.Fd()), syscall.LOCK_SH)
+	err = syscall.Flock(int(readf.Fd()), syscall.LOCK_SH)
 	if err != nil {
 		return 0, fmt.Errorf("flock(%s, lock_sh) failed: %w", cachefilename, err)
 	}

-	size, err := f.Seek(0, io.SeekEnd)
-	if err != nil {
-		return 0, fmt.Errorf("seek(%s, seek_end) failed: %w", cachefilename, err)
-	}
-	if size < int64(len(dst)+offset) {
-		// The cache file seems to be truncated or empty
-		// (possibly because we just created it). Wait for an
-		// exclusive lock, then check again (in case someone
-		// else is doing the same thing) before trying to
-		// retrieve the entire block.
-		err = syscall.Flock(int(f.Fd()), syscall.LOCK_EX)
-		if err != nil {
-			return 0, fmt.Errorf("flock(%s, lock_ex) failed: %w", cachefilename, err)
+	cache.writingLock.Lock()
+	progress := cache.writing[cachefilename]
+	if progress != nil {
+		cache.writingLock.Unlock()
+	} else {
+		progress = &writeprogress{}
+		progress.cond = sync.NewCond(&sync.Mutex{})
+		if cache.writing == nil {
+			cache.writing = map[string]*writeprogress{}
 		}
+		cache.writing[cachefilename] = progress
+		cache.writingLock.Unlock()
+
+		// Start a goroutine to copy from the backend into the
+		// cache file. As data arrives, wake up any waiting
+		// loops (see below) so ReadAt() requests for partial
+		// data can return as soon as the relevant bytes have
+		// been copied.
+		go func() {
+			var size int
+			var writef *os.File
+			var err error
+			defer func() {
+				closeErr := writef.Close()
+				if err == nil {
+					err = closeErr
+				}
+				progress.cond.L.Lock()
+				progress.err = err
+				progress.done = true
+				progress.size = size
+				progress.cond.L.Unlock()
+				progress.cond.Broadcast()
+				cache.writingLock.Lock()
+				delete(cache.writing, cachefilename)
+				cache.writingLock.Unlock()
+			}()
+			writef, err = cache.openFile(cachefilename, os.O_WRONLY)
+			if err != nil {
+				err = fmt.Errorf("ReadAt: %w", err)
+				return
+			}
+			err = syscall.Flock(int(writef.Fd()), syscall.LOCK_SH)
+			if err != nil {
+				err = fmt.Errorf("flock(%s, lock_sh) failed: %w", cachefilename, err)
+				return
+			}
+			size, err = cache.KeepGateway.BlockRead(context.Background(), BlockReadOptions{
+				Locator: locator,
+				WriteTo: funcwriter(func(p []byte) (int, error) {
+					n, err := writef.Write(p)
+					if n > 0 {
+						progress.cond.L.Lock()
+						progress.size += n
+						progress.cond.L.Unlock()
+						progress.cond.Broadcast()
+					}
+					return n, err
+				})})
+			atomic.AddInt64(&cache.sizeEstimated, int64(size))
+			cache.gotidy()
+		}()
+	}
+	progress.cond.L.Lock()
+	for !progress.done && progress.size < len(dst)+offset {
+		progress.cond.Wait()
+	}
+	ok := progress.size >= len(dst)+offset
+	err = progress.err
+	progress.cond.L.Unlock()
+
+	if !ok && err != nil {
+		// If the copy-from-backend goroutine encountered an
+		// error before copying enough bytes to satisfy our
+		// request, we return that error.
+		return 0, err
+	} else {
+		// Regardless of whether the copy-from-backend
+		// goroutine succeeded, or failed after copying the
+		// bytes we need, the only errors we need to report
+		// are errors reading from the cache file.
+		return readf.ReadAt(dst, int64(offset))
 	}
-	size, err = f.Seek(0, io.SeekEnd)
-	if err != nil {
-		return 0, fmt.Errorf("seek(%s, seek_end) failed: %w", cachefilename, err)
+}
+
+var quickReadAtLostRace = errors.New("quickReadAt: lost race")
+
+// Remove the cache entry for the indicated cachefilename if it
+// matches expect (quickReadAt() usage), or if expect is nil (tidy()
+// usage).
+//
+// If expect is non-nil, close expect's filehandle.
+//
+// If expect is nil and a different cache entry is deleted, close its
+// filehandle.
+func (cache *DiskCache) deleteHeldopen(cachefilename string, expect *openFileEnt) {
+	needclose := expect
+
+	cache.heldopenLock.Lock()
+	found := cache.heldopen[cachefilename]
+	if found != nil && (expect == nil || expect == found) {
+		delete(cache.heldopen, cachefilename)
+		needclose = found
+	}
+	cache.heldopenLock.Unlock()
+
+	if needclose != nil {
+		needclose.Lock()
+		defer needclose.Unlock()
+		if needclose.f != nil {
+			needclose.f.Close()
+			needclose.f = nil
+		}
 	}
-	if size < int64(len(dst)+offset) {
-		// The cache file is truncated or empty, and we own it
-		// now. Fill it.
-		_, err = f.Seek(0, io.SeekStart)
+}
+
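The writeprogress type added above is a small condition-variable protocol: a single filler goroutine grows size and broadcasts after each chunk, and any number of readers wait until their byte range is covered or done is set. A runnable reduction of that protocol (hypothetical progress/waitFor names, not from the Arvados source):

package main

import (
	"fmt"
	"sync"
)

// progress mirrors writeprogress: size grows monotonically, done
// marks the final state, and cond is broadcast on every change.
type progress struct {
	cond *sync.Cond
	done bool
	size int
}

// waitFor blocks until at least n bytes are available or the filler
// has finished, exactly like the wait loops in ReadAt/quickReadAt.
func (p *progress) waitFor(n int) int {
	p.cond.L.Lock()
	defer p.cond.L.Unlock()
	for !p.done && p.size < n {
		p.cond.Wait()
	}
	return p.size
}

func main() {
	p := &progress{cond: sync.NewCond(&sync.Mutex{})}
	go func() {
		for i := 0; i < 4; i++ {
			p.cond.L.Lock()
			p.size += 1024 // pretend we copied another chunk
			p.cond.L.Unlock()
			p.cond.Broadcast()
		}
		p.cond.L.Lock()
		p.done = true
		p.cond.L.Unlock()
		p.cond.Broadcast()
	}()
	fmt.Println(p.waitFor(2048)) // returns once >=2048 bytes copied
}

This is why a second reader of a block that is still being fetched does not trigger a second backend request: it simply waits on the first fetch's progress until its own range is covered.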
+// quickReadAt attempts to use a cached-filehandle approach to read
+// from the indicated file. The expectation is that the caller
+// (ReadAt) will try a more robust approach when this fails, so
+// quickReadAt doesn't try especially hard to ensure success in
+// races. In particular, when there are concurrent calls, and one
+// fails, that can cause others to fail too.
+func (cache *DiskCache) quickReadAt(cachefilename string, dst []byte, offset int) (int, error) {
+	isnew := false
+	cache.heldopenLock.Lock()
+	if cache.heldopenMax == 0 {
+		// Choose a reasonable limit on open cache files based
+		// on RLIMIT_NOFILE. Note Go automatically raises
+		// softlimit to hardlimit, so it's typically 1048576,
+		// not 1024.
+		lim := syscall.Rlimit{}
+		err := syscall.Getrlimit(syscall.RLIMIT_NOFILE, &lim)
 		if err != nil {
-			return 0, fmt.Errorf("seek(%s, seek_start) failed: %w", cachefilename, err)
+			cache.heldopenMax = 100
+		} else if lim.Cur > 400000 {
+			cache.heldopenMax = 10000
+		} else {
+			cache.heldopenMax = int(lim.Cur / 40)
+		}
+	}
+	heldopen := cache.heldopen[cachefilename]
+	if heldopen == nil {
+		isnew = true
+		heldopen = &openFileEnt{}
+		if cache.heldopen == nil {
+			cache.heldopen = make(map[string]*openFileEnt, cache.heldopenMax)
+		} else if len(cache.heldopen) > cache.heldopenMax {
+			// Rather than go to the trouble of tracking
+			// last access time, just close all files, and
+			// open again as needed. Even in the worst
+			// pathological case, this causes one extra
+			// open+close per read, which is not
+			// especially bad (see benchmarks).
+			go func(m map[string]*openFileEnt) {
+				for _, heldopen := range m {
+					heldopen.Lock()
+					defer heldopen.Unlock()
+					if heldopen.f != nil {
+						heldopen.f.Close()
+						heldopen.f = nil
+					}
+				}
+			}(cache.heldopen)
+			cache.heldopen = nil
+		}
+		cache.heldopen[cachefilename] = heldopen
+		heldopen.Lock()
+	}
+	cache.heldopenLock.Unlock()
+
+	if isnew {
+		// Open and flock the file, then release heldopen's
+		// lock to unblock any other goroutines that are
+		// waiting in the !isnew case above.
+		f, err := os.Open(cachefilename)
+		if err == nil {
+			err = syscall.Flock(int(f.Fd()), syscall.LOCK_SH)
+			if err == nil {
+				heldopen.f = f
+			} else {
+				f.Close()
+			}
 		}
-		n, err := cache.KeepGateway.BlockRead(context.Background(), BlockReadOptions{Locator: locator, WriteTo: f})
 		if err != nil {
-			return 0, err
+			heldopen.err = err
+			go cache.deleteHeldopen(cachefilename, heldopen)
+		}
+		heldopen.Unlock()
+	}
+	// Acquire read lock to ensure (1) initialization is complete,
+	// if it's done by a different goroutine, and (2) any "delete
+	// old/unused entries" waits for our read to finish before
+	// closing the file.
+	heldopen.RLock()
+	defer heldopen.RUnlock()
+	if heldopen.err != nil {
+		// Other goroutine encountered an error during setup
+		return 0, heldopen.err
+	} else if heldopen.f == nil {
+		// Other goroutine closed the file before we got RLock
+		return 0, quickReadAtLostRace
+	}

+	// If another goroutine is currently writing the file, wait
+	// for it to catch up to the end of the range we need.
+	cache.writingLock.Lock()
+	progress := cache.writing[cachefilename]
+	cache.writingLock.Unlock()
+	if progress != nil {
+		progress.cond.L.Lock()
+		for !progress.done && progress.size < len(dst)+offset {
+			progress.cond.Wait()
 		}
-		f.Truncate(int64(n))
+		progress.cond.L.Unlock()
+		// If size<needed here, the copy-from-backend goroutine
+		// stopped short; the ReadAt below will return the
+		// resulting error or short read.
+	}
+	return heldopen.f.ReadAt(dst, int64(offset))
+}
+
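quickReadAt sizes its held-open filehandle pool from RLIMIT_NOFILE rather than using a fixed constant. The same heuristic, isolated into a runnable program (hypothetical heldopenLimit name; requires a Unix platform for syscall.Getrlimit):

package main

import (
	"fmt"
	"syscall"
)

// heldopenLimit reproduces the pool-size heuristic above: fall back
// to 100 if the rlimit is unavailable, cap the pool at 10000, and
// otherwise let the cache hold open 1/40th of the allowed files,
// leaving the rest for the application.
func heldopenLimit() int {
	lim := syscall.Rlimit{}
	if err := syscall.Getrlimit(syscall.RLIMIT_NOFILE, &lim); err != nil {
		return 100
	}
	if lim.Cur > 400000 {
		return 10000
	}
	return int(lim.Cur / 40)
}

func main() {
	fmt.Println("held-open cache files allowed:", heldopenLimit())
}

When the pool exceeds this limit, the diff deliberately dumps the whole map and reopens on demand instead of tracking per-entry access times; as the comment notes, the worst case is one extra open+close per read.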
-	return f.ReadAt(dst, int64(offset))
-}
-
 func (cache *DiskCache) BlockRead(ctx context.Context, opts BlockReadOptions) (int, error) {
-	cache.gotidy()
-	cachefilename := cache.cacheFile(opts.Locator)
-	f, err := cache.openFile(cachefilename, os.O_CREATE|os.O_RDWR)
-	if err != nil {
-		cache.debugf("BlockRead: open(%s) failed: %s", cachefilename, err)
-		return cache.KeepGateway.BlockRead(ctx, opts)
-	}
-	defer f.Close()
-
+	cache.setupOnce.Do(cache.setup)
 	i := strings.Index(opts.Locator, "+")
 	if i < 0 || i >= len(opts.Locator) {
 		return 0, errors.New("invalid block locator: no size hint")
@@ -266,66 +530,84 @@ func (cache *DiskCache) BlockRead(ctx context.Context, opts BlockReadOptions) (i
 		return 0, errors.New("invalid block locator: invalid size hint")
 	}

-	err = syscall.Flock(int(f.Fd()), syscall.LOCK_SH)
-	if err != nil {
-		return 0, err
-	}
-	filesize, err := f.Seek(0, io.SeekEnd)
-	if err != nil {
-		return 0, err
-	}
-	_, err = f.Seek(0, io.SeekStart)
-	if err != nil {
-		return 0, err
-	}
-	if filesize == blocksize {
-		n, err := io.Copy(opts.WriteTo, f)
-		return int(n), err
-	}
-	err = syscall.Flock(int(f.Fd()), syscall.LOCK_EX)
-	if err != nil {
-		return 0, err
-	}
-	opts.WriteTo = io.MultiWriter(f, opts.WriteTo)
-	n, err := cache.KeepGateway.BlockRead(ctx, opts)
-	if err != nil {
-		return int(n), err
+	offset := 0
+	buf := make([]byte, 131072)
+	for offset < int(blocksize) {
+		if ctx.Err() != nil {
+			return offset, ctx.Err()
+		}
+		if int(blocksize)-offset < len(buf) {
+			buf = buf[:int(blocksize)-offset]
+		}
+		nr, err := cache.ReadAt(opts.Locator, buf, offset)
+		if nr > 0 {
+			nw, err := opts.WriteTo.Write(buf[:nr])
+			if err != nil {
+				return offset + nw, err
+			}
+		}
+		offset += nr
+		if err != nil {
+			return offset, err
+		}
 	}
-	f.Truncate(int64(n))
-	return n, nil
+	return offset, nil
 }

 // Start a tidy() goroutine, unless one is already running / recently
 // finished.
 func (cache *DiskCache) gotidy() {
-	// Return quickly if another tidy goroutine is running in this process.
+	writes := atomic.AddInt64(&cache.writesSinceTidy, 1)
+	// Skip if another tidy goroutine is running in this process.
 	n := atomic.AddInt32(&cache.tidying, 1)
-	if n != 1 || time.Now().Before(cache.tidyHoldUntil) {
+	if n != 1 {
+		atomic.AddInt32(&cache.tidying, -1)
+		return
+	}
+	// Skip if sizeEstimated is based on an actual measurement and
+	// is below maxSize, and we haven't done very many writes
+	// since last tidy (defined as 1% of number of cache files at
+	// last count).
+	if cache.sizeMeasured > 0 &&
+		atomic.LoadInt64(&cache.sizeEstimated) < atomic.LoadInt64(&cache.defaultMaxSize) &&
+		writes < cache.lastFileCount/100 {
 		atomic.AddInt32(&cache.tidying, -1)
 		return
 	}
 	go func() {
 		cache.tidy()
-		cache.tidyHoldUntil = time.Now().Add(10 * time.Second)
+		atomic.StoreInt64(&cache.writesSinceTidy, 0)
 		atomic.AddInt32(&cache.tidying, -1)
 	}()
 }
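The rewritten BlockRead streams a block through cache.ReadAt in 128 KiB slices instead of buffering the whole block. A self-contained version of that loop shape (hypothetical copyInChunks name; a 16-byte buffer stands in for the 131072-byte one):

package main

import (
	"bytes"
	"fmt"
	"io"
	"strings"
)

// copyInChunks drives an io.ReaderAt through a fixed-size buffer the
// same way BlockRead drives cache.ReadAt: shrink the buffer for the
// final partial chunk, write only the bytes actually read, and stop
// on the first error.
func copyInChunks(dst io.Writer, src io.ReaderAt, size int) (int, error) {
	offset := 0
	buf := make([]byte, 16) // BlockRead uses 131072
	for offset < size {
		if size-offset < len(buf) {
			buf = buf[:size-offset]
		}
		nr, err := src.ReadAt(buf, int64(offset))
		if nr > 0 {
			if nw, err := dst.Write(buf[:nr]); err != nil {
				return offset + nw, err
			}
		}
		offset += nr
		if err != nil {
			return offset, err
		}
	}
	return offset, nil
}

func main() {
	src := strings.NewReader("0123456789abcdefghijklmnopqrstuvwxyz")
	var dst bytes.Buffer
	n, err := copyInChunks(&dst, src, src.Len())
	fmt.Println(n, err, dst.String())
}

Combined with ReadAt's partial-data wakeups, this means BlockRead can start emitting bytes to the caller while the tail of the block is still being fetched from the backend.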

 // Delete cache files as needed to control disk usage.
 func (cache *DiskCache) tidy() {
-	maxsize := cache.MaxSize
+	maxsize := int64(cache.maxSize.ByteSize())
 	if maxsize < 1 {
-		if maxsize = atomic.LoadInt64(&cache.defaultMaxSize); maxsize == 0 {
+		maxsize = atomic.LoadInt64(&cache.defaultMaxSize)
+		if maxsize == 0 {
+			// defaultMaxSize not yet computed. Use 10% of
+			// filesystem capacity (or different
+			// percentage if indicated by cache.maxSize)
+			pct := cache.maxSize.Percent()
+			if pct == 0 {
+				pct = 10
+			}
 			var stat unix.Statfs_t
-			if nil == unix.Statfs(cache.Dir, &stat) {
-				maxsize = int64(stat.Bavail) * stat.Bsize / 10
+			if nil == unix.Statfs(cache.dir, &stat) {
+				maxsize = int64(stat.Bavail) * stat.Bsize * pct / 100
+				atomic.StoreInt64(&cache.defaultMaxSize, maxsize)
+			} else {
+				// In this case we will set
+				// defaultMaxSize below after
+				// measuring current usage.
 			}
-			atomic.StoreInt64(&cache.defaultMaxSize, maxsize)
 		}
 	}

 	// Bail if a tidy goroutine is running in a different process.
-	lockfile, err := cache.openFile(filepath.Join(cache.Dir, "tmp", "tidy.lock"), os.O_CREATE|os.O_WRONLY)
+	lockfile, err := cache.openFile(filepath.Join(cache.dir, "tmp", "tidy.lock"), os.O_CREATE|os.O_WRONLY)
 	if err != nil {
 		return
 	}
@@ -342,7 +624,7 @@ func (cache *DiskCache) tidy() {
 	}
 	var ents []entT
 	var totalsize int64
-	filepath.Walk(cache.Dir, func(path string, info fs.FileInfo, err error) error {
+	filepath.Walk(cache.dir, func(path string, info fs.FileInfo, err error) error {
 		if err != nil {
 			cache.debugf("tidy: skipping dir %s: %s", path, err)
 			return nil
@@ -376,7 +658,8 @@ func (cache *DiskCache) tidy() {

 	// If MaxSize wasn't specified and we failed to come up with a
 	// defaultSize above, use the larger of {current cache size, 1
-	// GiB} as the defaultSize for subsequent tidy() operations.
+	// GiB} as the defaultMaxSize for subsequent tidy()
+	// operations.
 	if maxsize == 0 {
 		if totalsize < 1<<30 {
 			atomic.StoreInt64(&cache.defaultMaxSize, 1<<30)
@@ -387,20 +670,37 @@ func (cache *DiskCache) tidy() {
 		return
 	}

-	if totalsize <= maxsize {
+	// If we're below MaxSize or there's only one block in the
+	// cache, just update the usage estimate and return.
+	//
+	// (We never delete the last block because that would merely
+	// cause the same block to get re-fetched repeatedly from the
+	// backend.)
+	if totalsize <= maxsize || len(ents) == 1 {
+		atomic.StoreInt64(&cache.sizeMeasured, totalsize)
+		atomic.StoreInt64(&cache.sizeEstimated, totalsize)
+		cache.lastFileCount = int64(len(ents))
 		return
 	}

-	// Delete oldest entries until totalsize < maxsize.
+	// Set a new size target of maxsize minus 5%. This makes some
+	// room for sizeEstimate to grow before it triggers another
+	// tidy. We don't want to walk/sort an entire large cache
+	// directory each time we write a block.
+	target := maxsize - (maxsize / 20)
+
+	// Delete oldest entries until totalsize < target or we're
+	// down to a single cached block.
 	sort.Slice(ents, func(i, j int) bool {
 		return ents[i].atime.Before(ents[j].atime)
 	})
 	deleted := 0
 	for _, ent := range ents {
 		os.Remove(ent.path)
+		go cache.deleteHeldopen(ent.path, nil)
 		deleted++
 		totalsize -= ent.size
-		if totalsize <= maxsize {
+		if totalsize <= target || deleted == len(ents)-1 {
 			break
 		}
 	}
@@ -411,4 +711,7 @@ func (cache *DiskCache) tidy() {
 			"totalsize": totalsize,
 		}).Debugf("DiskCache: remaining cache usage after deleting")
 	}
+	atomic.StoreInt64(&cache.sizeMeasured, totalsize)
+	atomic.StoreInt64(&cache.sizeEstimated, totalsize)
+	cache.lastFileCount = int64(len(ents) - deleted)
 }
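tidy()'s eviction step reduces to: sort candidates by atime, delete oldest-first, and stop at the 95% target or when only one entry would remain. A minimal sketch with an injected remove function (hypothetical ent/evict names; assumes the caller has already checked totalsize > maxsize and len(ents) > 1, as tidy() does before reaching this loop):

package main

import (
	"fmt"
	"sort"
	"time"
)

type ent struct {
	path  string
	size  int64
	atime time.Time
}

// evict mirrors tidy()'s deletion loop: oldest access time first,
// stopping once totalsize reaches target (maxsize minus 5%) or once
// only a single cached block would remain.
func evict(ents []ent, totalsize, maxsize int64, remove func(string)) int64 {
	target := maxsize - (maxsize / 20)
	sort.Slice(ents, func(i, j int) bool {
		return ents[i].atime.Before(ents[j].atime)
	})
	deleted := 0
	for _, e := range ents {
		remove(e.path)
		deleted++
		totalsize -= e.size
		if totalsize <= target || deleted == len(ents)-1 {
			break
		}
	}
	return totalsize
}

func main() {
	now := time.Now()
	ents := []ent{
		{"a", 40, now.Add(-3 * time.Hour)},
		{"b", 40, now.Add(-2 * time.Hour)},
		{"c", 40, now.Add(-1 * time.Hour)},
	}
	left := evict(ents, 120, 100, func(p string) { fmt.Println("rm", p) })
	fmt.Println("remaining:", left) // rm a; remaining: 80
}

Deleting down to 95% of maxsize rather than exactly maxsize, together with gotidy()'s write-count throttle, is what keeps tidy() from re-walking and re-sorting a large cache directory on every block write.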