X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/39f6e9f70f683237d9488faac1c549ca19ac9dae..387d86217ab0f119285c12735a6d0f3e606c23a1:/services/keepstore/unix_volume.go diff --git a/services/keepstore/unix_volume.go b/services/keepstore/unix_volume.go index f01ad97553..92cf12ac18 100644 --- a/services/keepstore/unix_volume.go +++ b/services/keepstore/unix_volume.go @@ -33,11 +33,12 @@ func init() { func newUnixVolume(params newVolumeParams) (volume, error) { v := &unixVolume{ - uuid: params.UUID, - cluster: params.Cluster, - volume: params.ConfigVolume, - logger: params.Logger, - metrics: params.MetricsVecs, + uuid: params.UUID, + cluster: params.Cluster, + volume: params.ConfigVolume, + logger: params.Logger, + metrics: params.MetricsVecs, + bufferPool: params.BufferPool, } err := json.Unmarshal(params.ConfigVolume.DriverParameters, &v) if err != nil { @@ -71,11 +72,12 @@ type unixVolume struct { Root string // path to the volume's root directory Serialize bool - uuid string - cluster *arvados.Cluster - volume arvados.Volume - logger logrus.FieldLogger - metrics *volumeMetricsVecs + uuid string + cluster *arvados.Cluster + volume arvados.Volume + logger logrus.FieldLogger + metrics *volumeMetricsVecs + bufferPool *bufferPool // something to lock during IO, typically a sync.Mutex (or nil // to skip locking) @@ -196,21 +198,6 @@ func (v *unixVolume) Mtime(loc string) (time.Time, error) { return fi.ModTime(), nil } -// Lock the locker (if one is in use), open the file for reading, and -// call the given function if and when the file is ready to read. -func (v *unixVolume) getFunc(ctx context.Context, path string, fn func(io.Reader) error) error { - if err := v.lock(ctx); err != nil { - return err - } - defer v.unlock() - f, err := v.os.Open(path) - if err != nil { - return err - } - defer f.Close() - return fn(newCountingReader(ioutil.NopCloser(f), v.os.stats.TickInBytes)) -} - // stat is os.Stat() with some extra sanity checks. func (v *unixVolume) stat(path string) (os.FileInfo, error) { stat, err := v.os.Stat(path) @@ -225,21 +212,28 @@ func (v *unixVolume) stat(path string) (os.FileInfo, error) { } // BlockRead reads a block from the volume. -func (v *unixVolume) BlockRead(ctx context.Context, hash string, w io.Writer) (int, error) { +func (v *unixVolume) BlockRead(ctx context.Context, hash string, w io.WriterAt) error { path := v.blockPath(hash) stat, err := v.stat(path) if err != nil { - return 0, v.translateError(err) + return v.translateError(err) } - var n int64 - err = v.getFunc(ctx, path, func(rdr io.Reader) error { - n, err = io.Copy(w, rdr) - if err == nil && n != stat.Size() { - err = io.ErrUnexpectedEOF - } + if err := v.lock(ctx); err != nil { return err - }) - return int(n), err + } + defer v.unlock() + f, err := v.os.Open(path) + if err != nil { + return err + } + defer f.Close() + src := newCountingReader(ioutil.NopCloser(f), v.os.stats.TickInBytes) + dst := io.NewOffsetWriter(w, 0) + n, err := io.Copy(dst, src) + if err == nil && n != stat.Size() { + err = io.ErrUnexpectedEOF + } + return err } // BlockWrite stores a block on the volume. If it already exists, its