// streamWriterAt writes the data to the provided io.Writer in
// sequential order.
//
// streamWriterAt can also be used as an asynchronous buffer: the
// caller can use the io.Writer interface to write into a memory
// buffer and return without waiting for the wrapped writer to catch
// up.
//
// Close returns when all data has been written through.
type streamWriterAt struct {
	writer     io.Writer
	buf        []byte
	writepos   int   // target offset if Write is called
	partsize   int   // size of each part written through to writer
	endpos     int   // portion of buf actually used, judging by WriteAt calls so far
	partfilled []int // number of bytes written to each part so far
}
+// Write implements io.Writer.
+func (swa *streamWriterAt) Write(p []byte) (int, error) {
+ n, err := swa.WriteAt(p, int64(swa.writepos))
+ swa.writepos += n
+ return n, err
+}
+
// WriteAt implements io.WriterAt.
func (swa *streamWriterAt) WriteAt(p []byte, offset int64) (int, error) {
pos := int(offset)
func newUnixVolume(params newVolumeParams) (volume, error) {
v := &unixVolume{
- uuid: params.UUID,
- cluster: params.Cluster,
- volume: params.ConfigVolume,
- logger: params.Logger,
- metrics: params.MetricsVecs,
+ uuid: params.UUID,
+ cluster: params.Cluster,
+ volume: params.ConfigVolume,
+ logger: params.Logger,
+ metrics: params.MetricsVecs,
+ bufferPool: params.BufferPool,
}
err := json.Unmarshal(params.ConfigVolume.DriverParameters, &v)
if err != nil {
Root string // path to the volume's root directory
Serialize bool
- uuid string
- cluster *arvados.Cluster
- volume arvados.Volume
- logger logrus.FieldLogger
- metrics *volumeMetricsVecs
+ uuid string
+ cluster *arvados.Cluster
+ volume arvados.Volume
+ logger logrus.FieldLogger
+ metrics *volumeMetricsVecs
+ bufferPool *bufferPool
// something to lock during IO, typically a sync.Mutex (or nil
// to skip locking)
if err != nil {
return 0, v.translateError(err)
}
+ var streamer *streamWriterAt
+ if v.locker != nil {
+ buf, err := v.bufferPool.GetContext(ctx)
+ if err != nil {
+ return 0, err
+ }
+ defer v.bufferPool.Put(buf)
+ streamer = newStreamWriterAt(w, 65536, buf)
+ defer streamer.Close()
+ w = streamer
+ }
var n int64
err = v.getFunc(ctx, path, func(rdr io.Reader) error {
n, err = io.Copy(w, rdr)
}
return err
})
+ if streamer != nil {
+ // If we're using the streamer (and there's no error
+ // so far) flush any remaining buffered data now that
+ // getFunc has released the serialize lock.
+ if err == nil {
+ err = streamer.Close()
+ }
+ return streamer.WroteAt(), err
+ }
return int(n), err
}
}
v := &testableUnixVolume{
unixVolume: unixVolume{
- Root: d,
- locker: locker,
- uuid: params.UUID,
- cluster: params.Cluster,
- logger: params.Logger,
- volume: params.ConfigVolume,
- metrics: params.MetricsVecs,
+ Root: d,
+ locker: locker,
+ uuid: params.UUID,
+ cluster: params.Cluster,
+ logger: params.Logger,
+ volume: params.ConfigVolume,
+ metrics: params.MetricsVecs,
+ bufferPool: params.BufferPool,
},
t: c,
}