2960: Move streaming from volume to keepstore layer.
[arvados.git] / services / keepstore / s3_volume.go
index bd79d49e167fd77f8e768185189efd9cf620fc2c..d4b90540eac4aeb1e78f9ba0cb8ef4b0f7a2541a 100644 (file)
@@ -411,24 +411,13 @@ func (v *s3Volume) head(key string) (result *s3.HeadObjectOutput, err error) {
 
 // BlockRead reads a Keep block that has been stored as a block blob
 // in the S3 bucket.
-func (v *s3Volume) BlockRead(ctx context.Context, hash string, writeTo io.Writer) (int, error) {
+func (v *s3Volume) BlockRead(ctx context.Context, hash string, w io.WriterAt) error {
        key := v.key(hash)
-       buf, err := v.bufferPool.GetContext(ctx)
-       if err != nil {
-               return 0, err
-       }
-       defer v.bufferPool.Put(buf)
-
-       streamer := newStreamWriterAt(writeTo, 65536, buf)
-       defer streamer.Close()
-       err = v.readWorker(ctx, key, streamer)
+       err := v.readWorker(ctx, key, w)
        if err != nil {
                err = v.translateError(err)
                if !os.IsNotExist(err) {
-                       return 0, err
-               }
-               if streamer.WroteAt() > 0 {
-                       return 0, errors.New("bug? readWorker returned ErrNotExist after writing to streamer")
+                       return err
                }
 
                _, err = v.head("recent/" + key)
@@ -436,25 +425,21 @@ func (v *s3Volume) BlockRead(ctx context.Context, hash string, writeTo io.Writer
                if err != nil {
                        // If we can't read recent/X, there's no point in
                        // trying fixRace. Give up.
-                       return 0, err
+                       return err
                }
                if !v.fixRace(key) {
                        err = os.ErrNotExist
-                       return 0, err
+                       return err
                }
 
-               err = v.readWorker(ctx, key, streamer)
+               err = v.readWorker(ctx, key, w)
                if err != nil {
                        v.logger.Warnf("reading %s after successful fixRace: %s", hash, err)
                        err = v.translateError(err)
-                       return 0, err
+                       return err
                }
        }
-       err = streamer.Close()
-       if err != nil {
-               return 0, v.translateError(err)
-       }
-       return streamer.Wrote(), nil
+       return nil
 }
 
 func (v *s3Volume) readWorker(ctx context.Context, key string, dst io.WriterAt) error {