heldopen map[string]*openFileEnt
heldopenMax int
heldopenLock sync.Mutex
+
+ // The "writing" fields allow multiple concurrent/sequential
+ // ReadAt calls to be notified as a single
+ // read-block-from-backend-into-cache goroutine fills the
+ // cache file.
+ writing map[string]*writeprogress
+ writingCond *sync.Cond
+ writingLock sync.Mutex
+}
+
+type writeprogress struct { // progress of one backend-to-cache copy, shared by the copying goroutine and any ReadAt/quickReadAt callers waiting on it
+ cond *sync.Cond // broadcast whenever size or done changes; cond.L is held while reading or writing done, size and err
+ done bool // set when the copy goroutine has finished; size and err have their final values
+ size int // bytes copied into cache file so far
+ err error // error encountered while copying from backend to cache (including opening, locking or closing the cache file); nil if none
}
type openFileEnt struct {
return resp, err
}
+type funcwriter func([]byte) (int, error) // funcwriter lets a plain function be used where a value with a Write method is needed (ReadAt passes one as BlockReadOptions.WriteTo)
+
+func (fw funcwriter) Write(p []byte) (int, error) { // Write calls fw(p) and returns its results unchanged
+ return fw(p)
+}
+
// ReadAt reads the entire block from the wrapped KeepGateway into the
// cache if needed, and copies the requested portion into the provided
// slice.
+//
+// ReadAt returns as soon as the requested portion (len(dst) bytes starting
+// at offset) is available in the cache. The remainder of the block may continue
+// to be copied into the cache in the background. A copy error is returned only if it occurred before the requested bytes were copied.
func (cache *DiskCache) ReadAt(locator string, dst []byte, offset int) (int, error) {
cache.gotidy()
cachefilename := cache.cacheFile(locator)
if n, err := cache.quickReadAt(cachefilename, dst, offset); err == nil {
return n, err
}
- f, err := cache.openFile(cachefilename, os.O_CREATE|os.O_RDWR)
+ readf, err := cache.openFile(cachefilename, os.O_CREATE|os.O_RDONLY) // read-only handle for this call; the copy goroutine below opens its own write handle
if err != nil {
- return 0, fmt.Errorf("ReadAt: %s", cachefilename, err)
+ return 0, fmt.Errorf("ReadAt: %w", err)
}
- defer f.Close()
+ defer readf.Close()
- err = syscall.Flock(int(f.Fd()), syscall.LOCK_SH)
+ err = syscall.Flock(int(readf.Fd()), syscall.LOCK_SH)
if err != nil {
return 0, fmt.Errorf("flock(%s, lock_sh) failed: %w", cachefilename, err)
}
- size, err := f.Seek(0, io.SeekEnd)
- if err != nil {
- return 0, fmt.Errorf("seek(%s, seek_end) failed: %w", cachefilename, err)
- }
- if size < int64(len(dst)+offset) {
- // The cache file seems to be truncated or empty
- // (possibly because we just created it). Wait for an
- // exclusive lock, then check again (in case someone
- // else is doing the same thing) before trying to
- // retrieve the entire block.
- err = syscall.Flock(int(f.Fd()), syscall.LOCK_EX)
- if err != nil {
- return 0, fmt.Errorf("flock(%s, lock_ex) failed: %w", cachefilename, err)
- }
- }
- size, err = f.Seek(0, io.SeekEnd)
- if err != nil {
- return 0, fmt.Errorf("seek(%s, seek_end) failed: %w", cachefilename, err)
- }
- if size < int64(len(dst)+offset) {
- // The cache file is truncated or empty, and we own it
- // now. Fill it.
- _, err = f.Seek(0, io.SeekStart)
- if err != nil {
- return 0, fmt.Errorf("seek(%s, seek_start) failed: %w", cachefilename, err)
+ cache.writingLock.Lock() // held while looking up or modifying cache.writing
+ progress := cache.writing[cachefilename]
+ if progress != nil { // a copy for this cache file is already registered; just wait on it below
+ cache.writingLock.Unlock()
+ } else { // no copy registered: register a new one and start it
+ progress = &writeprogress{}
+ progress.cond = sync.NewCond(&sync.Mutex{})
+ if cache.writing == nil {
+ cache.writing = map[string]*writeprogress{}
}
- n, err := cache.KeepGateway.BlockRead(context.Background(), BlockReadOptions{Locator: locator, WriteTo: f})
- if err != nil {
- return 0, err
- }
- f.Truncate(int64(n))
+ cache.writing[cachefilename] = progress
+ cache.writingLock.Unlock()
+
+ // Start a goroutine to copy from backend to the cache file
+ // (via writef). As data arrives, wake up any waiting loops (see
+ // below) so ReadAt() requests for partial data can return as
+ // soon as the relevant bytes have been copied.
+ go func() {
+ var size int
+ var writef *os.File
+ var err error
+ defer func() { // on exit: record the final result, wake all waiters, then remove the entry from cache.writing
+ closeErr := writef.Close() // NOTE(review): writef is presumably still nil if openFile below failed — confirm Close on a nil *os.File only returns an error (it is ignored here because err is already set)
+ if err == nil {
+ err = closeErr
+ }
+ progress.cond.L.Lock()
+ progress.err = err
+ progress.done = true
+ progress.size = size
+ progress.cond.L.Unlock()
+ progress.cond.Broadcast()
+ cache.writingLock.Lock()
+ delete(cache.writing, cachefilename)
+ cache.writingLock.Unlock()
+ }()
+ writef, err = cache.openFile(cachefilename, os.O_WRONLY)
+ if err != nil {
+ err = fmt.Errorf("ReadAt: %w", err)
+ return
+ }
+ err = syscall.Flock(int(writef.Fd()), syscall.LOCK_SH)
+ if err != nil {
+ err = fmt.Errorf("flock(%s, lock_sh) failed: %w", cachefilename, err)
+ return
+ }
+ size, err = cache.KeepGateway.BlockRead(context.Background(), BlockReadOptions{
+ Locator: locator,
+ WriteTo: funcwriter(func(p []byte) (int, error) {
+ n, err := writef.Write(p) // this err is local to the callback and shadows the goroutine's err
+ if n > 0 { // publish progress whenever any bytes were written, even if Write also returned an error
+ progress.cond.L.Lock()
+ progress.size += n
+ progress.cond.L.Unlock()
+ progress.cond.Broadcast()
+ }
+ return n, err
+ })})
+ }()
+ }
+ progress.cond.L.Lock()
+ for !progress.done && progress.size < len(dst)+offset { // wait until the requested range has been copied or the copy has ended
+ progress.cond.Wait()
+ }
+ ok := progress.size >= len(dst)+offset // true if the requested range was fully copied
+ err = progress.err
+ progress.cond.L.Unlock()
+
+ if !ok && err != nil {
+ // If the copy-from-backend goroutine encountered an
+ // error before copying enough bytes to satisfy our
+ // request, we return that error.
+ return 0, err
+ } else {
+ // Regardless of whether the copy-from-backend
+ // goroutine succeeded, or failed after copying the
+ // bytes we need, the only errors we need to report
+ // are errors reading from the cache file.
+ return readf.ReadAt(dst, int64(offset))
}
- return f.ReadAt(dst, int64(offset))
}
var quickReadAtLostRace = errors.New("quickReadAt: lost race")
// Other goroutine closed the file before we got RLock
return 0, quickReadAtLostRace
}
+
+ // If another goroutine is currently writing the file, wait
+ // for it to catch up to the end of the range we need.
+ cache.writingLock.Lock()
+ progress := cache.writing[cachefilename]
+ cache.writingLock.Unlock()
+ if progress != nil {
+ progress.cond.L.Lock()
+ for !progress.done && progress.size < len(dst)+offset {
+ progress.cond.Wait()
+ }
+ progress.cond.L.Unlock()
+ }
+
n, err := heldopen.f.ReadAt(dst, int64(offset))
if err != nil {
// wait for any concurrent users to finish, then