func newUnixVolume(params newVolumeParams) (volume, error) {
v := &unixVolume{
- uuid: params.UUID,
- cluster: params.Cluster,
- volume: params.ConfigVolume,
- logger: params.Logger,
- metrics: params.MetricsVecs,
+ uuid: params.UUID,
+ cluster: params.Cluster,
+ volume: params.ConfigVolume,
+ logger: params.Logger,
+ metrics: params.MetricsVecs,
+ bufferPool: params.BufferPool,
}
err := json.Unmarshal(params.ConfigVolume.DriverParameters, &v)
if err != nil {
Root string // path to the volume's root directory
Serialize bool
- uuid string
- cluster *arvados.Cluster
- volume arvados.Volume
- logger logrus.FieldLogger
- metrics *volumeMetricsVecs
+ uuid string
+ cluster *arvados.Cluster
+ volume arvados.Volume
+ logger logrus.FieldLogger
+ metrics *volumeMetricsVecs
+ bufferPool *bufferPool
// something to lock during IO, typically a sync.Mutex (or nil
// to skip locking)
return fi.ModTime(), nil
}
-// Lock the locker (if one is in use), open the file for reading, and
-// call the given function if and when the file is ready to read.
-func (v *unixVolume) getFunc(ctx context.Context, path string, fn func(io.Reader) error) error {
- if err := v.lock(ctx); err != nil {
- return err
- }
- defer v.unlock()
- f, err := v.os.Open(path)
- if err != nil {
- return err
- }
- defer f.Close()
- return fn(newCountingReader(ioutil.NopCloser(f), v.os.stats.TickInBytes))
-}
-
// stat is os.Stat() with some extra sanity checks.
func (v *unixVolume) stat(path string) (os.FileInfo, error) {
stat, err := v.os.Stat(path)
}
// BlockRead reads the block identified by hash from the volume and
// writes its content to w. With the io.WriterAt signature, the data
// is written starting at offset 0.
//
// The block file's size is obtained with v.stat before the file is
// opened; a stat error is returned via v.translateError. v.lock(ctx)
// is acquired before the file is opened (a failure to acquire it is
// returned as-is) and released when the call returns. Bytes read from
// the file are reported to v.os.stats.TickInBytes. If the copy
// succeeds but the number of bytes copied differs from the size
// reported by stat, io.ErrUnexpectedEOF is returned.
//
// NOTE(review): an error from v.os.Open is returned as-is, while a
// stat error goes through v.translateError -- confirm the asymmetry
// is intended.
-func (v *unixVolume) BlockRead(ctx context.Context, hash string, w io.Writer) (int, error) {
+func (v *unixVolume) BlockRead(ctx context.Context, hash string, w io.WriterAt) error {
path := v.blockPath(hash)
stat, err := v.stat(path)
if err != nil {
- return 0, v.translateError(err)
+ return v.translateError(err)
}
- var n int64
- err = v.getFunc(ctx, path, func(rdr io.Reader) error {
- n, err = io.Copy(w, rdr)
- if err == nil && n != stat.Size() {
- err = io.ErrUnexpectedEOF
- }
+ if err := v.lock(ctx); err != nil {
return err
- })
- return int(n), err
+ }
+ defer v.unlock()
+ f, err := v.os.Open(path)
+ if err != nil {
+ return err
+ }
+ defer f.Close()
+ src := newCountingReader(ioutil.NopCloser(f), v.os.stats.TickInBytes)
+ dst := io.NewOffsetWriter(w, 0)
+ n, err := io.Copy(dst, src)
+ if err == nil && n != stat.Size() {
+ err = io.ErrUnexpectedEOF
+ }
+ return err
}
// BlockWrite stores a block on the volume. If it already exists, its