func newUnixVolume(params newVolumeParams) (volume, error) {
v := &unixVolume{
- uuid: params.UUID,
- cluster: params.Cluster,
- volume: params.ConfigVolume,
- logger: params.Logger,
- metrics: params.MetricsVecs,
+ uuid: params.UUID,
+ cluster: params.Cluster,
+ volume: params.ConfigVolume,
+ logger: params.Logger,
+ metrics: params.MetricsVecs,
+ bufferPool: params.BufferPool,
}
err := json.Unmarshal(params.ConfigVolume.DriverParameters, &v)
if err != nil {
Root string // path to the volume's root directory
Serialize bool
- uuid string
- cluster *arvados.Cluster
- volume arvados.Volume
- logger logrus.FieldLogger
- metrics *volumeMetricsVecs
+ uuid string
+ cluster *arvados.Cluster
+ volume arvados.Volume
+ logger logrus.FieldLogger
+ metrics *volumeMetricsVecs
+ bufferPool *bufferPool
// something to lock during IO, typically a sync.Mutex (or nil
// to skip locking)
if err != nil {
return 0, v.translateError(err)
}
+ var streamer *streamWriterAt
+ if v.locker != nil {
+ buf, err := v.bufferPool.GetContext(ctx)
+ if err != nil {
+ return 0, err
+ }
+ defer v.bufferPool.Put(buf)
+ streamer = newStreamWriterAt(w, 65536, buf)
+ defer streamer.Close()
+ w = streamer
+ }
var n int64
err = v.getFunc(ctx, path, func(rdr io.Reader) error {
n, err = io.Copy(w, rdr)
}
return err
})
+ if streamer != nil {
+ // If we're using the streamer (and there's no error
+ // so far) flush any remaining buffered data now that
+ // getFunc has released the serialize lock.
+ if err == nil {
+ err = streamer.Close()
+ }
+ return streamer.WroteAt(), err
+ }
return int(n), err
}