+ cache.writing[cachefilename] = progress
+ cache.writingLock.Unlock()
+
+ // Start a goroutine to copy from backend to f. As
+ // data arrives, wake up any waiting loops (see below)
+ // so ReadAt() requests for partial data can return as
+ // soon as the relevant bytes have been copied.
+ go func() {
+ var size int
+ var writef *os.File
+ var err error
+ defer func() {
+ closeErr := writef.Close()
+ if err == nil {
+ err = closeErr
+ }
+ progress.cond.L.Lock()
+ progress.err = err
+ progress.done = true
+ progress.size = size
+ progress.cond.L.Unlock()
+ progress.cond.Broadcast()
+ cache.writingLock.Lock()
+ delete(cache.writing, cachefilename)
+ cache.writingLock.Unlock()
+ }()
+ writef, err = cache.openFile(cachefilename, os.O_WRONLY)
+ if err != nil {
+ err = fmt.Errorf("ReadAt: %w", err)
+ return
+ }
+ err = syscall.Flock(int(writef.Fd()), syscall.LOCK_SH)
+ if err != nil {
+ err = fmt.Errorf("flock(%s, lock_sh) failed: %w", cachefilename, err)
+ return
+ }
+ size, err = cache.KeepGateway.BlockRead(context.Background(), BlockReadOptions{
+ Locator: locator,
+ WriteTo: funcwriter(func(p []byte) (int, error) {
+ n, err := writef.Write(p)
+ if n > 0 {
+ progress.cond.L.Lock()
+ progress.size += n
+ progress.cond.L.Unlock()
+ progress.cond.Broadcast()
+ }
+ return n, err
+ })})
+ atomic.AddInt64(&cache.sizeEstimated, int64(size))
+ cache.gotidy()
+ }()
+ }
+ progress.cond.L.Lock()
+ for !progress.done && progress.size < len(dst)+offset {
+ progress.cond.Wait()
+ }
+ ok := progress.size >= len(dst)+offset
+ err = progress.err
+ progress.cond.L.Unlock()
+
+ if !ok && err != nil {
+ // If the copy-from-backend goroutine encountered an
+ // error before copying enough bytes to satisfy our
+ // request, we return that error.
+ return 0, err
+ } else {
+ // Regardless of whether the copy-from-backend
+ // goroutine succeeded, or failed after copying the
+ // bytes we need, the only errors we need to report
+ // are errors reading from the cache file.
+ return readf.ReadAt(dst, int64(offset))