// BlockRead reads a Keep block that has been stored as a block blob
// in the S3 bucket.
-func (v *s3Volume) BlockRead(ctx context.Context, hash string, writeTo io.Writer) (int, error) {
+func (v *s3Volume) BlockRead(ctx context.Context, hash string, w io.WriterAt) error {
key := v.key(hash)
- buf, err := v.bufferPool.GetContext(ctx)
- if err != nil {
- return 0, err
- }
- defer v.bufferPool.Put(buf)
-
- streamer := newStreamWriterAt(writeTo, 65536, buf)
- defer streamer.Close()
- err = v.readWorker(ctx, key, streamer)
+ err := v.readWorker(ctx, key, w)
if err != nil {
err = v.translateError(err)
if !os.IsNotExist(err) {
- return 0, err
- }
- if streamer.WroteAt() > 0 {
- return 0, errors.New("bug? readWorker returned ErrNotExist after writing to streamer")
+ return err
}
_, err = v.head("recent/" + key)
if err != nil {
// If we can't read recent/X, there's no point in
// trying fixRace. Give up.
- return 0, err
+ return err
}
if !v.fixRace(key) {
err = os.ErrNotExist
- return 0, err
+ return err
}
- err = v.readWorker(ctx, key, streamer)
+ err = v.readWorker(ctx, key, w)
if err != nil {
v.logger.Warnf("reading %s after successful fixRace: %s", hash, err)
err = v.translateError(err)
- return 0, err
+ return err
}
}
- err = streamer.Close()
- if err != nil {
- return 0, v.translateError(err)
- }
- return streamer.Wrote(), nil
+ return nil
}
func (v *s3Volume) readWorker(ctx context.Context, key string, dst io.WriterAt) error {