From 4e69128e5e7aeb1a9c5e4462adb38ecc5f5bb8ea Mon Sep 17 00:00:00 2001 From: Tom Clegg Date: Tue, 19 Dec 2023 16:48:58 -0500 Subject: [PATCH 1/1] 20318: Return requested range while fetching remainder of block. Arvados-DCO-1.1-Signed-off-by: Tom Clegg --- sdk/go/arvados/keep_cache.go | 156 +++++++++++++++++++++++++++-------- 1 file changed, 121 insertions(+), 35 deletions(-) diff --git a/sdk/go/arvados/keep_cache.go b/sdk/go/arvados/keep_cache.go index b8b3d9f588..567e0cbec3 100644 --- a/sdk/go/arvados/keep_cache.go +++ b/sdk/go/arvados/keep_cache.go @@ -50,6 +50,21 @@ type DiskCache struct { heldopen map[string]*openFileEnt heldopenMax int heldopenLock sync.Mutex + + // The "writing" fields allow multiple concurrent/sequential + // ReadAt calls to be notified as a single + // read-block-from-backend-into-cache goroutine fills the + // cache file. + writing map[string]*writeprogress + writingCond *sync.Cond + writingLock sync.Mutex +} + +type writeprogress struct { + cond *sync.Cond // broadcast whenever size or done changes + done bool // size and err have their final values + size int // bytes copied into cache file so far + err error // error encountered while copying from backend to cache } type openFileEnt struct { @@ -202,59 +217,116 @@ func (cache *DiskCache) BlockWrite(ctx context.Context, opts BlockWriteOptions) return resp, err } +type funcwriter func([]byte) (int, error) + +func (fw funcwriter) Write(p []byte) (int, error) { + return fw(p) +} + // ReadAt reads the entire block from the wrapped KeepGateway into the // cache if needed, and copies the requested portion into the provided // slice. +// +// ReadAt returns as soon as the requested portion is available in the +// cache. The remainder of the block may continue to be copied into +// the cache in the background. func (cache *DiskCache) ReadAt(locator string, dst []byte, offset int) (int, error) { cache.gotidy() cachefilename := cache.cacheFile(locator) if n, err := cache.quickReadAt(cachefilename, dst, offset); err == nil { return n, err } - f, err := cache.openFile(cachefilename, os.O_CREATE|os.O_RDWR) + readf, err := cache.openFile(cachefilename, os.O_CREATE|os.O_RDONLY) if err != nil { - return 0, fmt.Errorf("ReadAt: %s", cachefilename, err) + return 0, fmt.Errorf("ReadAt: %w", err) } - defer f.Close() + defer readf.Close() - err = syscall.Flock(int(f.Fd()), syscall.LOCK_SH) + err = syscall.Flock(int(readf.Fd()), syscall.LOCK_SH) if err != nil { return 0, fmt.Errorf("flock(%s, lock_sh) failed: %w", cachefilename, err) } - size, err := f.Seek(0, io.SeekEnd) - if err != nil { - return 0, fmt.Errorf("seek(%s, seek_end) failed: %w", cachefilename, err) - } - if size < int64(len(dst)+offset) { - // The cache file seems to be truncated or empty - // (possibly because we just created it). Wait for an - // exclusive lock, then check again (in case someone - // else is doing the same thing) before trying to - // retrieve the entire block. - err = syscall.Flock(int(f.Fd()), syscall.LOCK_EX) - if err != nil { - return 0, fmt.Errorf("flock(%s, lock_ex) failed: %w", cachefilename, err) - } - } - size, err = f.Seek(0, io.SeekEnd) - if err != nil { - return 0, fmt.Errorf("seek(%s, seek_end) failed: %w", cachefilename, err) - } - if size < int64(len(dst)+offset) { - // The cache file is truncated or empty, and we own it - // now. Fill it. - _, err = f.Seek(0, io.SeekStart) - if err != nil { - return 0, fmt.Errorf("seek(%s, seek_start) failed: %w", cachefilename, err) + cache.writingLock.Lock() + progress := cache.writing[cachefilename] + if progress != nil { + cache.writingLock.Unlock() + } else { + progress = &writeprogress{} + progress.cond = sync.NewCond(&sync.Mutex{}) + if cache.writing == nil { + cache.writing = map[string]*writeprogress{} } - n, err := cache.KeepGateway.BlockRead(context.Background(), BlockReadOptions{Locator: locator, WriteTo: f}) - if err != nil { - return 0, err - } - f.Truncate(int64(n)) + cache.writing[cachefilename] = progress + cache.writingLock.Unlock() + + // Start a goroutine to copy from backend to f. As + // data arrives, wake up any waiting loops (see below) + // so ReadAt() requests for partial data can return as + // soon as the relevant bytes have been copied. + go func() { + var size int + var writef *os.File + var err error + defer func() { + closeErr := writef.Close() + if err == nil { + err = closeErr + } + progress.cond.L.Lock() + progress.err = err + progress.done = true + progress.size = size + progress.cond.L.Unlock() + progress.cond.Broadcast() + cache.writingLock.Lock() + delete(cache.writing, cachefilename) + cache.writingLock.Unlock() + }() + writef, err = cache.openFile(cachefilename, os.O_WRONLY) + if err != nil { + err = fmt.Errorf("ReadAt: %w", err) + return + } + err = syscall.Flock(int(writef.Fd()), syscall.LOCK_SH) + if err != nil { + err = fmt.Errorf("flock(%s, lock_sh) failed: %w", cachefilename, err) + return + } + size, err = cache.KeepGateway.BlockRead(context.Background(), BlockReadOptions{ + Locator: locator, + WriteTo: funcwriter(func(p []byte) (int, error) { + n, err := writef.Write(p) + if n > 0 { + progress.cond.L.Lock() + progress.size += n + progress.cond.L.Unlock() + progress.cond.Broadcast() + } + return n, err + })}) + }() + } + progress.cond.L.Lock() + for !progress.done && progress.size < len(dst)+offset { + progress.cond.Wait() + } + ok := progress.size >= len(dst)+offset + err = progress.err + progress.cond.L.Unlock() + + if !ok && err != nil { + // If the copy-from-backend goroutine encountered an + // error before copying enough bytes to satisfy our + // request, we return that error. + return 0, err + } else { + // Regardless of whether the copy-from-backend + // goroutine succeeded, or failed after copying the + // bytes we need, the only errors we need to report + // are errors reading from the cache file. + return readf.ReadAt(dst, int64(offset)) } - return f.ReadAt(dst, int64(offset)) } var quickReadAtLostRace = errors.New("quickReadAt: lost race") @@ -374,6 +446,20 @@ func (cache *DiskCache) quickReadAt(cachefilename string, dst []byte, offset int // Other goroutine closed the file before we got RLock return 0, quickReadAtLostRace } + + // If another goroutine is currently writing the file, wait + // for it to catch up to the end of the range we need. + cache.writingLock.Lock() + progress := cache.writing[cachefilename] + cache.writingLock.Unlock() + if progress != nil { + progress.cond.L.Lock() + for !progress.done && progress.size < len(dst)+offset { + progress.cond.Wait() + } + progress.cond.L.Unlock() + } + n, err := heldopen.f.ReadAt(dst, int64(offset)) if err != nil { // wait for any concurrent users to finish, then -- 2.30.2