20318: Return requested range while fetching remainder of block.
author Tom Clegg <tom@curii.com>
Tue, 19 Dec 2023 21:48:58 +0000 (16:48 -0500)
committer Tom Clegg <tom@curii.com>
Tue, 19 Dec 2023 21:48:58 +0000 (16:48 -0500)
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom@curii.com>

sdk/go/arvados/keep_cache.go

index b8b3d9f58849751194e6effaa503e7a7896fa763..567e0cbec384a97fdb58fff294d031fe8b511894 100644 (file)
@@ -50,6 +50,21 @@ type DiskCache struct {
        heldopen     map[string]*openFileEnt
        heldopenMax  int
        heldopenLock sync.Mutex
+
+       // The "writing" fields allow multiple concurrent/sequential
+       // ReadAt calls to be notified as a single
+       // read-block-from-backend-into-cache goroutine fills the
+       // cache file.
+       writing     map[string]*writeprogress
+       writingCond *sync.Cond
+       writingLock sync.Mutex
+}
+
+type writeprogress struct {
+       cond *sync.Cond // broadcast whenever size or done changes
+       done bool       // size and err have their final values
+       size int        // bytes copied into cache file so far
+       err  error      // error encountered while copying from backend to cache
 }
 
 type openFileEnt struct {
@@ -202,59 +217,116 @@ func (cache *DiskCache) BlockWrite(ctx context.Context, opts BlockWriteOptions)
        return resp, err
 }
 
+type funcwriter func([]byte) (int, error)
+
+func (fw funcwriter) Write(p []byte) (int, error) {
+       return fw(p)
+}
+
 // ReadAt reads the entire block from the wrapped KeepGateway into the
 // cache if needed, and copies the requested portion into the provided
 // slice.
+//
+// ReadAt returns as soon as the requested portion is available in the
+// cache. The remainder of the block may continue to be copied into
+// the cache in the background.
 func (cache *DiskCache) ReadAt(locator string, dst []byte, offset int) (int, error) {
        cache.gotidy()
        cachefilename := cache.cacheFile(locator)
        if n, err := cache.quickReadAt(cachefilename, dst, offset); err == nil {
                return n, err
        }
-       f, err := cache.openFile(cachefilename, os.O_CREATE|os.O_RDWR)
+       readf, err := cache.openFile(cachefilename, os.O_CREATE|os.O_RDONLY)
        if err != nil {
-               return 0, fmt.Errorf("ReadAt: %s", cachefilename, err)
+               return 0, fmt.Errorf("ReadAt: %w", err)
        }
-       defer f.Close()
+       defer readf.Close()
 
-       err = syscall.Flock(int(f.Fd()), syscall.LOCK_SH)
+       err = syscall.Flock(int(readf.Fd()), syscall.LOCK_SH)
        if err != nil {
                return 0, fmt.Errorf("flock(%s, lock_sh) failed: %w", cachefilename, err)
        }
 
-       size, err := f.Seek(0, io.SeekEnd)
-       if err != nil {
-               return 0, fmt.Errorf("seek(%s, seek_end) failed: %w", cachefilename, err)
-       }
-       if size < int64(len(dst)+offset) {
-               // The cache file seems to be truncated or empty
-               // (possibly because we just created it). Wait for an
-               // exclusive lock, then check again (in case someone
-               // else is doing the same thing) before trying to
-               // retrieve the entire block.
-               err = syscall.Flock(int(f.Fd()), syscall.LOCK_EX)
-               if err != nil {
-                       return 0, fmt.Errorf("flock(%s, lock_ex) failed: %w", cachefilename, err)
-               }
-       }
-       size, err = f.Seek(0, io.SeekEnd)
-       if err != nil {
-               return 0, fmt.Errorf("seek(%s, seek_end) failed: %w", cachefilename, err)
-       }
-       if size < int64(len(dst)+offset) {
-               // The cache file is truncated or empty, and we own it
-               // now. Fill it.
-               _, err = f.Seek(0, io.SeekStart)
-               if err != nil {
-                       return 0, fmt.Errorf("seek(%s, seek_start) failed: %w", cachefilename, err)
+       cache.writingLock.Lock()
+       progress := cache.writing[cachefilename]
+       if progress != nil {
+               cache.writingLock.Unlock()
+       } else {
+               progress = &writeprogress{}
+               progress.cond = sync.NewCond(&sync.Mutex{})
+               if cache.writing == nil {
+                       cache.writing = map[string]*writeprogress{}
                }
-               n, err := cache.KeepGateway.BlockRead(context.Background(), BlockReadOptions{Locator: locator, WriteTo: f})
-               if err != nil {
-                       return 0, err
-               }
-               f.Truncate(int64(n))
+               cache.writing[cachefilename] = progress
+               cache.writingLock.Unlock()
+
+               // Start a goroutine to copy from backend to writef. As
+               // data arrives, wake up any waiting loops (see below)
+               // so ReadAt() requests for partial data can return as
+               // soon as the relevant bytes have been copied.
+               go func() {
+                       var size int
+                       var writef *os.File
+                       var err error
+                       defer func() {
+                               closeErr := writef.Close()
+                               if err == nil {
+                                       err = closeErr
+                               }
+                               progress.cond.L.Lock()
+                               progress.err = err
+                               progress.done = true
+                               progress.size = size
+                               progress.cond.L.Unlock()
+                               progress.cond.Broadcast()
+                               cache.writingLock.Lock()
+                               delete(cache.writing, cachefilename)
+                               cache.writingLock.Unlock()
+                       }()
+                       writef, err = cache.openFile(cachefilename, os.O_WRONLY)
+                       if err != nil {
+                               err = fmt.Errorf("ReadAt: %w", err)
+                               return
+                       }
+                       err = syscall.Flock(int(writef.Fd()), syscall.LOCK_SH)
+                       if err != nil {
+                               err = fmt.Errorf("flock(%s, lock_sh) failed: %w", cachefilename, err)
+                               return
+                       }
+                       size, err = cache.KeepGateway.BlockRead(context.Background(), BlockReadOptions{
+                               Locator: locator,
+                               WriteTo: funcwriter(func(p []byte) (int, error) {
+                                       n, err := writef.Write(p)
+                                       if n > 0 {
+                                               progress.cond.L.Lock()
+                                               progress.size += n
+                                               progress.cond.L.Unlock()
+                                               progress.cond.Broadcast()
+                                       }
+                                       return n, err
+                               })})
+               }()
+       }
+       progress.cond.L.Lock()
+       for !progress.done && progress.size < len(dst)+offset {
+               progress.cond.Wait()
+       }
+       ok := progress.size >= len(dst)+offset
+       err = progress.err
+       progress.cond.L.Unlock()
+
+       if !ok && err != nil {
+               // If the copy-from-backend goroutine encountered an
+               // error before copying enough bytes to satisfy our
+               // request, we return that error.
+               return 0, err
+       } else {
+               // Regardless of whether the copy-from-backend
+               // goroutine succeeded, or failed after copying the
+               // bytes we need, the only errors we need to report
+               // are errors reading from the cache file.
+               return readf.ReadAt(dst, int64(offset))
        }
-       return f.ReadAt(dst, int64(offset))
 }
 
 var quickReadAtLostRace = errors.New("quickReadAt: lost race")
@@ -374,6 +446,20 @@ func (cache *DiskCache) quickReadAt(cachefilename string, dst []byte, offset int
                // Other goroutine closed the file before we got RLock
                return 0, quickReadAtLostRace
        }
+
+       // If another goroutine is currently writing the file, wait
+       // for it to catch up to the end of the range we need.
+       cache.writingLock.Lock()
+       progress := cache.writing[cachefilename]
+       cache.writingLock.Unlock()
+       if progress != nil {
+               progress.cond.L.Lock()
+               for !progress.done && progress.size < len(dst)+offset {
+                       progress.cond.Wait()
+               }
+               progress.cond.L.Unlock()
+       }
+
        n, err := heldopen.f.ReadAt(dst, int64(offset))
        if err != nil {
                // wait for any concurrent users to finish, then