+ // Fast path: if quickReadAt can satisfy the request from the
+ // existing cache file, return immediately without contacting the
+ // backend.
+ if n, err := cache.quickReadAt(cachefilename, dst, offset); err == nil {
+ return n, nil
+ }
+
+ // Slow path: either join an in-progress backend fetch for this
+ // block, or become the goroutine that performs it.
+ cache.writingLock.Lock()
+ progress := cache.writing[cachefilename]
+ if progress == nil {
+ // Nobody else is fetching from backend, so we'll add
+ // a new entry to cache.writing, fetch in a separate
+ // goroutine.
+ progress = &writeprogress{}
+ progress.cond = sync.NewCond(&sync.Mutex{})
+ if cache.writing == nil {
+ cache.writing = map[string]*writeprogress{}
+ }
+ cache.writing[cachefilename] = progress
+
+ // Start a goroutine to copy from backend to f. As
+ // data arrives, wake up any waiting loops (see below)
+ // so ReadAt() requests for partial data can return as
+ // soon as the relevant bytes have been copied.
+ go func() {
+ var size int
+ var err error
+ defer func() {
+ // Flush file data to disk before publishing
+ // the final done/err/size state to waiters.
+ if err == nil && progress.sharedf != nil {
+ err = progress.sharedf.Sync()
+ }
+ progress.cond.L.Lock()
+ progress.err = err
+ progress.done = true
+ progress.size = size
+ progress.cond.L.Unlock()
+ progress.cond.Broadcast()
+ cache.writingLock.Lock()
+ delete(cache.writing, cachefilename)
+ cache.writingLock.Unlock()
+
+ // Wait for other goroutines to wake
+ // up, notice we're done, and use our
+ // sharedf to read their data, before
+ // we close sharedf.
+ //
+ // Nobody can join the WaitGroup after
+ // the progress entry is deleted from
+ // cache.writing above. Therefore,
+ // this Wait ensures nobody else is
+ // accessing progress, and we don't
+ // need to lock anything.
+ progress.readers.Wait()
+ progress.sharedf.Close()
+ }()
+ progress.sharedf, err = cache.openFile(cachefilename, os.O_CREATE|os.O_RDWR)
+ if err != nil {
+ err = fmt.Errorf("ReadAt: %w", err)
+ return
+ }
+ // Hold a shared advisory lock on the cache file while
+ // writing it -- presumably so a concurrent cache
+ // cleanup process (using LOCK_EX) won't remove or
+ // truncate it mid-fetch. NOTE(review): confirm the
+ // tidy/cleanup side actually takes an exclusive flock.
+ err = syscall.Flock(int(progress.sharedf.Fd()), syscall.LOCK_SH)
+ if err != nil {
+ err = fmt.Errorf("flock(%s, lock_sh) failed: %w", cachefilename, err)
+ return
+ }
+ size, err = cache.KeepGateway.BlockRead(context.Background(), BlockReadOptions{
+ Locator: locator,
+ WriteTo: funcwriter(func(p []byte) (int, error) {
+ // Each chunk written advances progress.size and
+ // wakes waiters, so readers needing only a prefix
+ // of the block can proceed before the fetch ends.
+ n, err := progress.sharedf.Write(p)
+ if n > 0 {
+ progress.cond.L.Lock()
+ progress.size += n
+ progress.cond.L.Unlock()
+ progress.cond.Broadcast()
+ }
+ return n, err
+ })})
+ // NOTE(review): sizeEstimated is adjusted and gotidy()
+ // triggered even when BlockRead returned an error
+ // (size may reflect a partial write) -- confirm this
+ // is the intended accounting for failed fetches.
+ atomic.AddInt64(&cache.sizeEstimated, int64(size))
+ cache.gotidy()
+ }()
+ }
+ // We add ourselves to the readers WaitGroup so the
+ // fetch-from-backend goroutine doesn't close the shared
+ // filehandle before we read the data we need from it.
+ progress.readers.Add(1)
+ defer progress.readers.Done()
+ cache.writingLock.Unlock()
+
+ // Block until the fetcher has written at least the byte range we
+ // need (offset..offset+len(dst)), or has finished -- successfully
+ // or with an error (done covers both).
+ progress.cond.L.Lock()
+ for !progress.done && progress.size < len(dst)+offset {
+ progress.cond.Wait()
+ }
+ // Snapshot the shared handle and error under the lock; the code
+ // after this fragment presumably reads dst's bytes from sharedf.
+ sharedf := progress.sharedf
+ err := progress.err
+ progress.cond.L.Unlock()
+