2960: Buffer reads when serialize enabled on unix volume.
author Tom Clegg <tom@curii.com>
Thu, 15 Feb 2024 01:25:04 +0000 (20:25 -0500)
committer Tom Clegg <tom@curii.com>
Thu, 15 Feb 2024 01:25:04 +0000 (20:25 -0500)
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom@curii.com>

services/keepstore/streamwriterat.go
services/keepstore/unix_volume.go
services/keepstore/unix_volume_test.go

index 365b55f233febb040f8faeb969fd0d257c804860..3426dadc1ffea0a6f2e8a0c850d29c1275dbd88a 100644 (file)
@@ -19,10 +19,16 @@ import (
 // streamWriterAt writes the data to the provided io.Writer in
 // sequential order.
 //
+// streamWriterAt can also be used as an asynchronous buffer: the
+// caller can use the io.Writer interface to write into a memory
+// buffer and return without waiting for the wrapped writer to catch
+// up.
+//
 // Close returns when all data has been written through.
 type streamWriterAt struct {
        writer     io.Writer
        buf        []byte
+       writepos   int         // target offset if Write is called
        partsize   int         // size of each part written through to writer
        endpos     int         // portion of buf actually used, judging by WriteAt calls so far
        partfilled []int       // number of bytes written to each part so far
@@ -81,6 +87,13 @@ func (swa *streamWriterAt) writeToWriter() {
        }
 }
 
+// Write implements io.Writer.
+func (swa *streamWriterAt) Write(p []byte) (int, error) {
+       n, err := swa.WriteAt(p, int64(swa.writepos))
+       swa.writepos += n
+       return n, err
+}
+
 // WriteAt implements io.WriterAt.
 func (swa *streamWriterAt) WriteAt(p []byte, offset int64) (int, error) {
        pos := int(offset)
index f01ad97553fed07991bb2a667fbc65b8105ea8ca..f652a500238d29f1505866abd02c8c7c21b909f6 100644 (file)
@@ -33,11 +33,12 @@ func init() {
 
 func newUnixVolume(params newVolumeParams) (volume, error) {
        v := &unixVolume{
-               uuid:    params.UUID,
-               cluster: params.Cluster,
-               volume:  params.ConfigVolume,
-               logger:  params.Logger,
-               metrics: params.MetricsVecs,
+               uuid:       params.UUID,
+               cluster:    params.Cluster,
+               volume:     params.ConfigVolume,
+               logger:     params.Logger,
+               metrics:    params.MetricsVecs,
+               bufferPool: params.BufferPool,
        }
        err := json.Unmarshal(params.ConfigVolume.DriverParameters, &v)
        if err != nil {
@@ -71,11 +72,12 @@ type unixVolume struct {
        Root      string // path to the volume's root directory
        Serialize bool
 
-       uuid    string
-       cluster *arvados.Cluster
-       volume  arvados.Volume
-       logger  logrus.FieldLogger
-       metrics *volumeMetricsVecs
+       uuid       string
+       cluster    *arvados.Cluster
+       volume     arvados.Volume
+       logger     logrus.FieldLogger
+       metrics    *volumeMetricsVecs
+       bufferPool *bufferPool
 
        // something to lock during IO, typically a sync.Mutex (or nil
        // to skip locking)
@@ -231,6 +233,17 @@ func (v *unixVolume) BlockRead(ctx context.Context, hash string, w io.Writer) (i
        if err != nil {
                return 0, v.translateError(err)
        }
+       var streamer *streamWriterAt
+       if v.locker != nil {
+               buf, err := v.bufferPool.GetContext(ctx)
+               if err != nil {
+                       return 0, err
+               }
+               defer v.bufferPool.Put(buf)
+               streamer = newStreamWriterAt(w, 65536, buf)
+               defer streamer.Close()
+               w = streamer
+       }
        var n int64
        err = v.getFunc(ctx, path, func(rdr io.Reader) error {
                n, err = io.Copy(w, rdr)
@@ -239,6 +252,15 @@ func (v *unixVolume) BlockRead(ctx context.Context, hash string, w io.Writer) (i
                }
                return err
        })
+       if streamer != nil {
+               // If we're using the streamer (and there's no error
+               // so far) flush any remaining buffered data now that
+               // getFunc has released the serialize lock.
+               if err == nil {
+                       err = streamer.Close()
+               }
+               return streamer.WroteAt(), err
+       }
        return int(n), err
 }
 
index de8d3c42d8299a7c2a5557f9a69192d74ce39283..715e23a9eaaac42329ced3d6b91d69da70f3c26e 100644 (file)
@@ -78,13 +78,14 @@ func (s *unixVolumeSuite) newTestableUnixVolume(c *check.C, params newVolumePara
        }
        v := &testableUnixVolume{
                unixVolume: unixVolume{
-                       Root:    d,
-                       locker:  locker,
-                       uuid:    params.UUID,
-                       cluster: params.Cluster,
-                       logger:  params.Logger,
-                       volume:  params.ConfigVolume,
-                       metrics: params.MetricsVecs,
+                       Root:       d,
+                       locker:     locker,
+                       uuid:       params.UUID,
+                       cluster:    params.Cluster,
+                       logger:     params.Logger,
+                       volume:     params.ConfigVolume,
+                       metrics:    params.MetricsVecs,
+                       bufferPool: params.BufferPool,
                },
                t: c,
        }