Merge branch '21654-express-upgrade'. Refs #21654
[arvados.git] / services / keepstore / unix_volume.go
index f01ad97553fed07991bb2a667fbc65b8105ea8ca..92cf12ac189803d4f72f120708aced520a252c7f 100644 (file)
@@ -33,11 +33,12 @@ func init() {
 
 func newUnixVolume(params newVolumeParams) (volume, error) {
        v := &unixVolume{
-               uuid:    params.UUID,
-               cluster: params.Cluster,
-               volume:  params.ConfigVolume,
-               logger:  params.Logger,
-               metrics: params.MetricsVecs,
+               uuid:       params.UUID,
+               cluster:    params.Cluster,
+               volume:     params.ConfigVolume,
+               logger:     params.Logger,
+               metrics:    params.MetricsVecs,
+               bufferPool: params.BufferPool,
        }
        err := json.Unmarshal(params.ConfigVolume.DriverParameters, &v)
        if err != nil {
@@ -71,11 +72,12 @@ type unixVolume struct {
        Root      string // path to the volume's root directory
        Serialize bool
 
-       uuid    string
-       cluster *arvados.Cluster
-       volume  arvados.Volume
-       logger  logrus.FieldLogger
-       metrics *volumeMetricsVecs
+       uuid       string
+       cluster    *arvados.Cluster
+       volume     arvados.Volume
+       logger     logrus.FieldLogger
+       metrics    *volumeMetricsVecs
+       bufferPool *bufferPool
 
        // something to lock during IO, typically a sync.Mutex (or nil
        // to skip locking)
@@ -196,21 +198,6 @@ func (v *unixVolume) Mtime(loc string) (time.Time, error) {
        return fi.ModTime(), nil
 }
 
-// Lock the locker (if one is in use), open the file for reading, and
-// call the given function if and when the file is ready to read.
-func (v *unixVolume) getFunc(ctx context.Context, path string, fn func(io.Reader) error) error {
-       if err := v.lock(ctx); err != nil {
-               return err
-       }
-       defer v.unlock()
-       f, err := v.os.Open(path)
-       if err != nil {
-               return err
-       }
-       defer f.Close()
-       return fn(newCountingReader(ioutil.NopCloser(f), v.os.stats.TickInBytes))
-}
-
 // stat is os.Stat() with some extra sanity checks.
 func (v *unixVolume) stat(path string) (os.FileInfo, error) {
        stat, err := v.os.Stat(path)
@@ -225,21 +212,28 @@ func (v *unixVolume) stat(path string) (os.FileInfo, error) {
 }
 
 // BlockRead reads a block from the volume.
-func (v *unixVolume) BlockRead(ctx context.Context, hash string, w io.Writer) (int, error) {
+func (v *unixVolume) BlockRead(ctx context.Context, hash string, w io.WriterAt) error {
        path := v.blockPath(hash)
        stat, err := v.stat(path)
        if err != nil {
-               return 0, v.translateError(err)
+               return v.translateError(err)
        }
-       var n int64
-       err = v.getFunc(ctx, path, func(rdr io.Reader) error {
-               n, err = io.Copy(w, rdr)
-               if err == nil && n != stat.Size() {
-                       err = io.ErrUnexpectedEOF
-               }
+       if err := v.lock(ctx); err != nil {
                return err
-       })
-       return int(n), err
+       }
+       defer v.unlock()
+       f, err := v.os.Open(path)
+       if err != nil {
+               return err
+       }
+       defer f.Close()
+       src := newCountingReader(ioutil.NopCloser(f), v.os.stats.TickInBytes)
+       dst := io.NewOffsetWriter(w, 0)
+       n, err := io.Copy(dst, src)
+       if err == nil && n != stat.Size() {
+               err = io.ErrUnexpectedEOF
+       }
+       return err
 }
 
 // BlockWrite stores a block on the volume. If it already exists, its