2960: Move streaming from volume to keepstore layer.
[arvados.git] / services / keepstore / unix_volume.go
index f652a500238d29f1505866abd02c8c7c21b909f6..92cf12ac189803d4f72f120708aced520a252c7f 100644 (file)
@@ -198,21 +198,6 @@ func (v *unixVolume) Mtime(loc string) (time.Time, error) {
        return fi.ModTime(), nil
 }
 
-// Lock the locker (if one is in use), open the file for reading, and
-// call the given function if and when the file is ready to read.
-func (v *unixVolume) getFunc(ctx context.Context, path string, fn func(io.Reader) error) error {
-       if err := v.lock(ctx); err != nil {
-               return err
-       }
-       defer v.unlock()
-       f, err := v.os.Open(path)
-       if err != nil {
-               return err
-       }
-       defer f.Close()
-       return fn(newCountingReader(ioutil.NopCloser(f), v.os.stats.TickInBytes))
-}
-
 // stat is os.Stat() with some extra sanity checks.
 func (v *unixVolume) stat(path string) (os.FileInfo, error) {
        stat, err := v.os.Stat(path)
@@ -227,41 +212,28 @@ func (v *unixVolume) stat(path string) (os.FileInfo, error) {
 }
 
 // BlockRead reads a block from the volume.
-func (v *unixVolume) BlockRead(ctx context.Context, hash string, w io.Writer) (int, error) {
+func (v *unixVolume) BlockRead(ctx context.Context, hash string, w io.WriterAt) error {
        path := v.blockPath(hash)
        stat, err := v.stat(path)
        if err != nil {
-               return 0, v.translateError(err)
+               return v.translateError(err)
        }
-       var streamer *streamWriterAt
-       if v.locker != nil {
-               buf, err := v.bufferPool.GetContext(ctx)
-               if err != nil {
-                       return 0, err
-               }
-               defer v.bufferPool.Put(buf)
-               streamer = newStreamWriterAt(w, 65536, buf)
-               defer streamer.Close()
-               w = streamer
-       }
-       var n int64
-       err = v.getFunc(ctx, path, func(rdr io.Reader) error {
-               n, err = io.Copy(w, rdr)
-               if err == nil && n != stat.Size() {
-                       err = io.ErrUnexpectedEOF
-               }
+       if err := v.lock(ctx); err != nil {
+               return err
+       }
+       defer v.unlock()
+       f, err := v.os.Open(path)
+       if err != nil {
                return err
-       })
-       if streamer != nil {
-               // If we're using the streamer (and there's no error
-               // so far) flush any remaining buffered data now that
-               // getFunc has released the serialize lock.
-               if err == nil {
-                       err = streamer.Close()
-               }
-               return streamer.WroteAt(), err
        }
-       return int(n), err
+       defer f.Close()
+       src := newCountingReader(ioutil.NopCloser(f), v.os.stats.TickInBytes)
+       dst := io.NewOffsetWriter(w, 0)
+       n, err := io.Copy(dst, src)
+       if err == nil && n != stat.Size() {
+               err = io.ErrUnexpectedEOF
+       }
+       return err
 }
 
 // BlockWrite stores a block on the volume. If it already exists, its