X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/7ef752823f118079af629604ac29143e7c156687..022107bd52092c658208e74161581c6bedda4a5f:/services/keepstore/unix_volume.go diff --git a/services/keepstore/unix_volume.go b/services/keepstore/unix_volume.go index f652a50023..92cf12ac18 100644 --- a/services/keepstore/unix_volume.go +++ b/services/keepstore/unix_volume.go @@ -198,21 +198,6 @@ func (v *unixVolume) Mtime(loc string) (time.Time, error) { return fi.ModTime(), nil } -// Lock the locker (if one is in use), open the file for reading, and -// call the given function if and when the file is ready to read. -func (v *unixVolume) getFunc(ctx context.Context, path string, fn func(io.Reader) error) error { - if err := v.lock(ctx); err != nil { - return err - } - defer v.unlock() - f, err := v.os.Open(path) - if err != nil { - return err - } - defer f.Close() - return fn(newCountingReader(ioutil.NopCloser(f), v.os.stats.TickInBytes)) -} - // stat is os.Stat() with some extra sanity checks. func (v *unixVolume) stat(path string) (os.FileInfo, error) { stat, err := v.os.Stat(path) @@ -227,41 +212,28 @@ func (v *unixVolume) stat(path string) (os.FileInfo, error) { } // BlockRead reads a block from the volume. -func (v *unixVolume) BlockRead(ctx context.Context, hash string, w io.Writer) (int, error) { +func (v *unixVolume) BlockRead(ctx context.Context, hash string, w io.WriterAt) error { path := v.blockPath(hash) stat, err := v.stat(path) if err != nil { - return 0, v.translateError(err) + return v.translateError(err) } - var streamer *streamWriterAt - if v.locker != nil { - buf, err := v.bufferPool.GetContext(ctx) - if err != nil { - return 0, err - } - defer v.bufferPool.Put(buf) - streamer = newStreamWriterAt(w, 65536, buf) - defer streamer.Close() - w = streamer - } - var n int64 - err = v.getFunc(ctx, path, func(rdr io.Reader) error { - n, err = io.Copy(w, rdr) - if err == nil && n != stat.Size() { - err = io.ErrUnexpectedEOF - } + if err := v.lock(ctx); err != nil { + return err + } + defer v.unlock() + f, err := v.os.Open(path) + if err != nil { return err - }) - if streamer != nil { - // If we're using the streamer (and there's no error - // so far) flush any remaining buffered data now that - // getFunc has released the serialize lock. - if err == nil { - err = streamer.Close() - } - return streamer.WroteAt(), err } - return int(n), err + defer f.Close() + src := newCountingReader(ioutil.NopCloser(f), v.os.stats.TickInBytes) + dst := io.NewOffsetWriter(w, 0) + n, err := io.Copy(dst, src) + if err == nil && n != stat.Size() { + err = io.ErrUnexpectedEOF + } + return err } // BlockWrite stores a block on the volume. If it already exists, its