2960: Move streaming from volume to keepstore layer.
authorTom Clegg <tom@curii.com>
Thu, 15 Feb 2024 20:20:41 +0000 (15:20 -0500)
committerTom Clegg <tom@curii.com>
Thu, 15 Feb 2024 20:20:41 +0000 (15:20 -0500)
Avoids using 2x buffers when comparing existing data during
BlockWrite.

Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom@curii.com>

13 files changed:
services/keepstore/azure_blob_volume.go
services/keepstore/azure_blob_volume_test.go
services/keepstore/keepstore.go
services/keepstore/keepstore_test.go
services/keepstore/router_test.go
services/keepstore/s3_volume.go
services/keepstore/s3_volume_test.go
services/keepstore/streamwriterat.go
services/keepstore/unix_volume.go
services/keepstore/unix_volume_test.go
services/keepstore/volume.go
services/keepstore/volume_generic_test.go
services/keepstore/volume_test.go

index 31660614f3c8fd213e7859ae9634e798ac83754b..2c8a79350c86b02e08eea2007c58a8f2e632ca47 100644 (file)
@@ -147,24 +147,22 @@ func (v *azureBlobVolume) checkTrashed(loc string) (bool, map[string]string, err
 // If the block is younger than azureWriteRaceInterval and is
 // unexpectedly empty, assume a BlockWrite operation is in progress,
 // and wait for it to finish writing.
-func (v *azureBlobVolume) BlockRead(ctx context.Context, hash string, writeTo io.Writer) (int, error) {
+func (v *azureBlobVolume) BlockRead(ctx context.Context, hash string, w io.WriterAt) error {
        trashed, _, err := v.checkTrashed(hash)
        if err != nil {
-               return 0, err
+               return err
        }
        if trashed {
-               return 0, os.ErrNotExist
+               return os.ErrNotExist
        }
        buf, err := v.bufferPool.GetContext(ctx)
        if err != nil {
-               return 0, err
+               return err
        }
        defer v.bufferPool.Put(buf)
-       streamer := newStreamWriterAt(writeTo, 65536, buf)
-       defer streamer.Close()
        var deadline time.Time
-       size, err := v.get(ctx, hash, streamer)
-       for err == nil && size == 0 && streamer.WroteAt() == 0 && hash != "d41d8cd98f00b204e9800998ecf8427e" {
+       wrote, err := v.get(ctx, hash, w)
+       for err == nil && wrote == 0 && hash != "d41d8cd98f00b204e9800998ecf8427e" {
                // Seeing a brand new empty block probably means we're
                // in a race with CreateBlob, which under the hood
                // (apparently) does "CreateEmpty" and "CommitData"
@@ -185,20 +183,15 @@ func (v *azureBlobVolume) BlockRead(ctx context.Context, hash string, writeTo io
                }
                select {
                case <-ctx.Done():
-                       return 0, ctx.Err()
+                       return ctx.Err()
                case <-time.After(v.WriteRacePollTime.Duration()):
                }
-               size, err = v.get(ctx, hash, streamer)
+               wrote, err = v.get(ctx, hash, w)
        }
        if !deadline.IsZero() {
-               ctxlog.FromContext(ctx).Printf("Race ended with size==%d", size)
-       }
-       if err != nil {
-               streamer.Close()
-               return streamer.Wrote(), err
+               ctxlog.FromContext(ctx).Printf("Race ended with size==%d", wrote)
        }
-       err = streamer.Close()
-       return streamer.Wrote(), err
+       return err
 }
 
 func (v *azureBlobVolume) get(ctx context.Context, hash string, dst io.WriterAt) (int, error) {
@@ -212,6 +205,7 @@ func (v *azureBlobVolume) get(ctx context.Context, hash string, dst io.WriterAt)
 
        pieces := 1
        expectSize := BlockSize
+       sizeKnown := false
        if pieceSize < BlockSize {
                // Unfortunately the handler doesn't tell us how long
                // the blob is expected to be, so we have to ask
@@ -225,15 +219,15 @@ func (v *azureBlobVolume) get(ctx context.Context, hash string, dst io.WriterAt)
                }
                expectSize = int(props.ContentLength)
                pieces = (expectSize + pieceSize - 1) / pieceSize
+               sizeKnown = true
        }
 
        if expectSize == 0 {
                return 0, nil
        }
 
-       // We'll update this actualSize if/when we get the last piece.
-       actualSize := -1
        errors := make(chan error, pieces)
+       var wrote atomic.Int64
        var wg sync.WaitGroup
        wg.Add(pieces)
        for p := 0; p < pieces; p++ {
@@ -289,31 +283,24 @@ func (v *azureBlobVolume) get(ctx context.Context, hash string, dst io.WriterAt)
                                rdr.Close()
                        }()
                        n, err := io.CopyN(io.NewOffsetWriter(dst, int64(startPos)), rdr, int64(endPos-startPos))
-                       if pieces == 1 && (err == io.ErrUnexpectedEOF || err == io.EOF) {
+                       wrote.Add(n)
+                       if pieces == 1 && !sizeKnown && (err == io.ErrUnexpectedEOF || err == io.EOF) {
                                // If we don't know the actual size,
                                // and just tried reading 64 MiB, it's
                                // normal to encounter EOF.
                        } else if err != nil {
-                               if ctx.Err() == nil {
-                                       errors <- err
-                               }
+                               errors <- err
                                cancel()
                                return
                        }
-                       if p == pieces-1 {
-                               actualSize = startPos + int(n)
-                       }
                }(p)
        }
        wg.Wait()
        close(errors)
        if len(errors) > 0 {
-               return 0, v.translateError(<-errors)
-       }
-       if ctx.Err() != nil {
-               return 0, ctx.Err()
+               return int(wrote.Load()), v.translateError(<-errors)
        }
-       return actualSize, nil
+       return int(wrote.Load()), ctx.Err()
 }
 
 // BlockWrite stores a block on the volume. If it already exists, its
index c629c9dc156367ab636b918d7a3671694a3e511f..b8acd980a1c6a57c8537ded3f2d4a90bb51ef331 100644 (file)
@@ -13,7 +13,6 @@ import (
        "encoding/xml"
        "flag"
        "fmt"
-       "io"
        "io/ioutil"
        "math/rand"
        "net"
@@ -490,15 +489,13 @@ func (s *stubbedAzureBlobSuite) TestAzureBlobVolumeRangeFenceposts(c *check.C) {
                if err != nil {
                        c.Error(err)
                }
-               gotData := bytes.NewBuffer(nil)
-               gotLen, err := v.BlockRead(context.Background(), hash, gotData)
+               gotData := &brbuffer{}
+               err = v.BlockRead(context.Background(), hash, gotData)
                if err != nil {
                        c.Error(err)
                }
                gotHash := fmt.Sprintf("%x", md5.Sum(gotData.Bytes()))
-               if gotLen != size {
-                       c.Errorf("length mismatch: got %d != %d", gotLen, size)
-               }
+               c.Check(gotData.Len(), check.Equals, size)
                if gotHash != hash {
                        c.Errorf("hash mismatch: got %s != %s", gotHash, hash)
                }
@@ -532,7 +529,7 @@ func (s *stubbedAzureBlobSuite) TestAzureBlobVolumeCreateBlobRace(c *check.C) {
        wg.Add(1)
        go func() {
                defer wg.Done()
-               _, err := v.BlockRead(context.Background(), TestHash, io.Discard)
+               err := v.BlockRead(context.Background(), TestHash, brdiscard)
                if err != nil {
                        c.Error(err)
                }
@@ -570,15 +567,13 @@ func (s *stubbedAzureBlobSuite) TestAzureBlobVolumeCreateBlobRaceDeadline(c *che
        allDone := make(chan struct{})
        go func() {
                defer close(allDone)
-               buf := bytes.NewBuffer(nil)
-               n, err := v.BlockRead(context.Background(), TestHash, buf)
+               buf := &brbuffer{}
+               err := v.BlockRead(context.Background(), TestHash, buf)
                if err != nil {
                        c.Error(err)
                        return
                }
-               if n != 0 {
-                       c.Errorf("Got %+q (n=%d), expected empty buf", buf.Bytes(), n)
-               }
+               c.Check(buf.String(), check.Equals, "")
        }()
        select {
        case <-allDone:
@@ -596,8 +591,7 @@ func (s *stubbedAzureBlobSuite) TestAzureBlobVolumeCreateBlobRaceDeadline(c *che
 func (s *stubbedAzureBlobSuite) TestAzureBlobVolumeContextCancelBlockRead(c *check.C) {
        s.testAzureBlobVolumeContextCancel(c, func(ctx context.Context, v *testableAzureBlobVolume) error {
                v.BlockWriteRaw(TestHash, TestBlock)
-               _, err := v.BlockRead(ctx, TestHash, io.Discard)
-               return err
+               return v.BlockRead(ctx, TestHash, brdiscard)
        })
 }
 
@@ -667,7 +661,7 @@ func (s *stubbedAzureBlobSuite) TestStats(c *check.C) {
        c.Check(stats(), check.Matches, `.*"Errors":0,.*`)
 
        loc := "acbd18db4cc2f85cedef654fccc4a4d8"
-       _, err := volume.BlockRead(context.Background(), loc, io.Discard)
+       err := volume.BlockRead(context.Background(), loc, brdiscard)
        c.Check(err, check.NotNil)
        c.Check(stats(), check.Matches, `.*"Ops":[^0],.*`)
        c.Check(stats(), check.Matches, `.*"Errors":[^0],.*`)
@@ -679,9 +673,9 @@ func (s *stubbedAzureBlobSuite) TestStats(c *check.C) {
        c.Check(stats(), check.Matches, `.*"OutBytes":3,.*`)
        c.Check(stats(), check.Matches, `.*"CreateOps":1,.*`)
 
-       _, err = volume.BlockRead(context.Background(), loc, io.Discard)
+       err = volume.BlockRead(context.Background(), loc, brdiscard)
        c.Check(err, check.IsNil)
-       _, err = volume.BlockRead(context.Background(), loc, io.Discard)
+       err = volume.BlockRead(context.Background(), loc, brdiscard)
        c.Check(err, check.IsNil)
        c.Check(stats(), check.Matches, `.*"InBytes":6,.*`)
 }
index 62b6d15e565cfe6e60671055f073a6f9c11de864..c9a80230597c5eacc5bac7e66d48d82439420610 100644 (file)
@@ -243,13 +243,58 @@ func (ks *keepstore) BlockRead(ctx context.Context, opts arvados.BlockReadOption
        } else {
                out = io.MultiWriter(out, hashcheck)
        }
+
+       buf, err := ks.bufferPool.GetContext(ctx)
+       if err != nil {
+               return 0, err
+       }
+       defer ks.bufferPool.Put(buf)
+       streamer := newStreamWriterAt(out, 65536, buf)
+       defer streamer.Close()
+
        var errToCaller error = os.ErrNotExist
        for _, mnt := range ks.rendezvous(li.hash, ks.mountsR) {
                if ctx.Err() != nil {
                        return 0, ctx.Err()
                }
-               n, err = mnt.BlockRead(ctx, li.hash, out)
-               if err == nil && li.size > 0 && n != li.size {
+               err := mnt.BlockRead(ctx, li.hash, streamer)
+               if err != nil {
+                       if streamer.WroteAt() != 0 {
+                               // BlockRead encountered an error
+                               // after writing some data, so it's
+                               // too late to try another
+                               // volume. Flush streamer before
+                               // calling Wrote() to ensure our
+                               // return value accurately reflects
+                               // the number of bytes written to
+                               // opts.WriteTo.
+                               streamer.Close()
+                               return streamer.Wrote(), err
+                       }
+                       if !os.IsNotExist(err) {
+                               errToCaller = err
+                       }
+                       continue
+               }
+               if li.size == 0 {
+                       // hashCheckingWriter isn't in use because we
+                       // don't know the expected size. All we can do
+                       // is check after writing all the data, and
+                       // trust the caller is doing a HEAD request so
+                       // it's not too late to set an error code in
+                       // the response header.
+                       err = streamer.Close()
+                       if hash := fmt.Sprintf("%x", hashcheck.Sum(nil)); hash != li.hash && err == nil {
+                               err = errChecksum
+                       }
+                       if rw, ok := opts.WriteTo.(http.ResponseWriter); ok {
+                               // We didn't set the content-length header
+                               // above because we didn't know the block size
+                               // until now.
+                               rw.Header().Set("Content-Length", fmt.Sprintf("%d", streamer.WroteAt()))
+                       }
+                       return streamer.WroteAt(), err
+               } else if streamer.WroteAt() != li.size {
                        // If the backend read fewer bytes than
                        // expected but returns no error, we can
                        // classify this as a checksum error (even
@@ -260,42 +305,17 @@ func (ks *keepstore) BlockRead(ctx context.Context, opts arvados.BlockReadOption
                        // it anyway, but if it's a HEAD request the
                        // caller can still change the response status
                        // code.
-                       return n, errChecksum
-               }
-               if err == nil && li.size == 0 {
-                       // hashCheckingWriter isn't in use because we
-                       // don't know the expected size. All we can do
-                       // is check after writing all the data, and
-                       // trust the caller is doing a HEAD request so
-                       // it's not too late to set an error code in
-                       // the response header.
-                       if hash := fmt.Sprintf("%x", hashcheck.Sum(nil)); hash != li.hash {
-                               return n, errChecksum
-                       }
-               }
-               if rw, ok := opts.WriteTo.(http.ResponseWriter); ok && li.size == 0 && err == nil {
-                       // We didn't set the content-length header
-                       // above because we didn't know the block size
-                       // until now.
-                       rw.Header().Set("Content-Length", fmt.Sprintf("%d", n))
-               }
-               if n > 0 || err == nil {
-                       // success, or there's an error but we can't
-                       // retry because we've already sent some data.
-                       return n, err
-               }
-               if !os.IsNotExist(err) {
-                       // If some volume returns a transient error,
-                       // return it to the caller instead of "Not
-                       // found" so it can retry.
-                       errToCaller = err
+                       return streamer.WroteAt(), errChecksum
                }
+               // Ensure streamer flushes all buffered data without
+               // errors.
+               err = streamer.Close()
+               return streamer.Wrote(), err
        }
        return 0, errToCaller
 }
 
 func (ks *keepstore) blockReadRemote(ctx context.Context, opts arvados.BlockReadOptions) (int, error) {
-       ks.logger.Infof("blockReadRemote(%s)", opts.Locator)
        token := ctxToken(ctx)
        if token == "" {
                return 0, errNoTokenProvided
@@ -459,7 +479,7 @@ func (ks *keepstore) BlockWrite(ctx context.Context, opts arvados.BlockWriteOpti
                        continue
                }
                cmp := &checkEqual{Expect: opts.Data}
-               if _, err := mnt.BlockRead(ctx, hash, cmp); err == nil {
+               if err := mnt.BlockRead(ctx, hash, cmp); err == nil {
                        if !cmp.Equal() {
                                return resp, errCollision
                        }
@@ -564,25 +584,28 @@ func (*keepstore) rendezvous(locator string, mnts []*mount) []*mount {
        return mnts
 }
 
-// checkEqual reports whether the data written to it (via io.Writer
+// checkEqual reports whether the data written to it (via io.WriterAt
 // interface) is equal to the expected data.
 //
 // Expect should not be changed after the first Write.
+//
+// Results are undefined if WriteAt is called with overlapping ranges.
 type checkEqual struct {
-       Expect     []byte
-       equalUntil int
+       Expect   []byte
+       equal    atomic.Int64
+       notequal atomic.Bool
 }
 
 func (ce *checkEqual) Equal() bool {
-       return ce.equalUntil == len(ce.Expect)
+       return !ce.notequal.Load() && ce.equal.Load() == int64(len(ce.Expect))
 }
 
-func (ce *checkEqual) Write(p []byte) (int, error) {
-       endpos := ce.equalUntil + len(p)
-       if ce.equalUntil >= 0 && endpos <= len(ce.Expect) && bytes.Equal(p, ce.Expect[ce.equalUntil:endpos]) {
-               ce.equalUntil = endpos
+func (ce *checkEqual) WriteAt(p []byte, offset int64) (int, error) {
+       endpos := int(offset) + len(p)
+       if offset >= 0 && endpos <= len(ce.Expect) && bytes.Equal(p, ce.Expect[int(offset):endpos]) {
+               ce.equal.Add(int64(len(p)))
        } else {
-               ce.equalUntil = -1
+               ce.notequal.Store(true)
        }
        return len(p), nil
 }
index 3a01476096be7fcd9d3a15758acb3e69d605289b..28049506f6370f0fea8b28066a2b16cfa544e3c7 100644 (file)
@@ -372,7 +372,7 @@ func (s *keepstoreSuite) TestBlockTrash(c *C) {
                return ks.BlockTrash(ctx, loc)
        }
        checkexists := func(volidx int) bool {
-               _, err := vol[volidx].BlockRead(ctx, fooHash, io.Discard)
+               err := vol[volidx].BlockRead(ctx, fooHash, brdiscard)
                if !os.IsNotExist(err) {
                        c.Check(err, IsNil)
                }
@@ -573,7 +573,7 @@ func (s *keepstoreSuite) TestUntrashHandlerWithNoWritableVolumes(c *C) {
        for _, mnt := range ks.mounts {
                err := mnt.BlockWrite(context.Background(), fooHash, []byte("foo"))
                c.Assert(err, IsNil)
-               _, err = mnt.BlockRead(context.Background(), fooHash, io.Discard)
+               err = mnt.BlockRead(context.Background(), fooHash, brdiscard)
                c.Assert(err, IsNil)
        }
 
@@ -581,7 +581,7 @@ func (s *keepstoreSuite) TestUntrashHandlerWithNoWritableVolumes(c *C) {
        c.Check(os.IsNotExist(err), Equals, true)
 
        for _, mnt := range ks.mounts {
-               _, err := mnt.BlockRead(context.Background(), fooHash, io.Discard)
+               err := mnt.BlockRead(context.Background(), fooHash, brdiscard)
                c.Assert(err, IsNil)
        }
 }
@@ -693,7 +693,7 @@ type stubVolume struct {
        // corresponding func (if non-nil). If the func returns an
        // error, that error is returned to caller. Otherwise, the
        // stub continues normally.
-       blockRead    func(ctx context.Context, hash string, writeTo io.Writer) (int, error)
+       blockRead    func(ctx context.Context, hash string, writeTo io.WriterAt) error
        blockWrite   func(ctx context.Context, hash string, data []byte) error
        deviceID     func() string
        blockTouch   func(hash string) error
@@ -710,19 +710,19 @@ func (v *stubVolume) log(op, hash string) {
        v.stubLog.Printf("%s %s %s", v.params.UUID[24:27], op, hash[:3])
 }
 
-func (v *stubVolume) BlockRead(ctx context.Context, hash string, writeTo io.Writer) (int, error) {
+func (v *stubVolume) BlockRead(ctx context.Context, hash string, writeTo io.WriterAt) error {
        v.log("read", hash)
        if v.blockRead != nil {
-               n, err := v.blockRead(ctx, hash, writeTo)
+               err := v.blockRead(ctx, hash, writeTo)
                if err != nil {
-                       return n, err
+                       return err
                }
        }
        v.mtx.Lock()
        ent, ok := v.data[hash]
        v.mtx.Unlock()
        if !ok || !ent.trash.IsZero() {
-               return 0, os.ErrNotExist
+               return os.ErrNotExist
        }
        wrote := 0
        for writesize := 1000; wrote < len(ent.data); writesize = writesize * 2 {
@@ -730,13 +730,13 @@ func (v *stubVolume) BlockRead(ctx context.Context, hash string, writeTo io.Writ
                if len(data) > writesize {
                        data = data[:writesize]
                }
-               n, err := writeTo.Write(data)
+               n, err := writeTo.WriteAt(data, int64(wrote))
                wrote += n
                if err != nil {
-                       return wrote, err
+                       return err
                }
        }
-       return wrote, nil
+       return nil
 }
 
 func (v *stubVolume) BlockWrite(ctx context.Context, hash string, data []byte) error {
index f4bcdd4ae4df19ba8baccb87e7775282957b7fcb..ee7be4768c91499e667c8ba50aa512c7b3a930a3 100644 (file)
@@ -268,7 +268,7 @@ func (s *routerSuite) TestBlockTrash(c *C) {
        resp := call(router, "DELETE", "http://example/"+fooHash+"+3", s.cluster.SystemRootToken, nil, nil)
        c.Check(resp.Code, Equals, http.StatusOK)
        c.Check(vol0.stubLog.String(), Matches, `(?ms).* trash .*`)
-       _, err = vol0.BlockRead(context.Background(), fooHash, io.Discard)
+       err = vol0.BlockRead(context.Background(), fooHash, brdiscard)
        c.Assert(err, Equals, os.ErrNotExist)
 }
 
@@ -281,12 +281,12 @@ func (s *routerSuite) TestBlockUntrash(c *C) {
        c.Assert(err, IsNil)
        err = vol0.BlockTrash(fooHash)
        c.Assert(err, IsNil)
-       _, err = vol0.BlockRead(context.Background(), fooHash, io.Discard)
+       err = vol0.BlockRead(context.Background(), fooHash, brdiscard)
        c.Assert(err, Equals, os.ErrNotExist)
        resp := call(router, "PUT", "http://example/untrash/"+fooHash+"+3", s.cluster.SystemRootToken, nil, nil)
        c.Check(resp.Code, Equals, http.StatusOK)
        c.Check(vol0.stubLog.String(), Matches, `(?ms).* untrash .*`)
-       _, err = vol0.BlockRead(context.Background(), fooHash, io.Discard)
+       err = vol0.BlockRead(context.Background(), fooHash, brdiscard)
        c.Check(err, IsNil)
 }
 
@@ -356,8 +356,8 @@ func (s *routerSuite) TestRequireAdminMgtToken(c *C) {
 func (s *routerSuite) TestVolumeErrorStatusCode(c *C) {
        router, cancel := testRouter(c, s.cluster, nil)
        defer cancel()
-       router.keepstore.mountsW[0].volume.(*stubVolume).blockRead = func(_ context.Context, hash string, w io.Writer) (int, error) {
-               return 0, httpserver.ErrorWithStatus(errors.New("test error"), http.StatusBadGateway)
+       router.keepstore.mountsW[0].volume.(*stubVolume).blockRead = func(_ context.Context, hash string, w io.WriterAt) error {
+               return httpserver.ErrorWithStatus(errors.New("test error"), http.StatusBadGateway)
        }
 
        // To test whether we fall back to volume 1 after volume 0
@@ -472,10 +472,10 @@ func (s *routerSuite) TestCancelOnDisconnect(c *C) {
        defer cancel()
 
        unblock := make(chan struct{})
-       router.keepstore.mountsW[0].volume.(*stubVolume).blockRead = func(ctx context.Context, hash string, w io.Writer) (int, error) {
+       router.keepstore.mountsW[0].volume.(*stubVolume).blockRead = func(ctx context.Context, hash string, w io.WriterAt) error {
                <-unblock
                c.Check(ctx.Err(), NotNil)
-               return 0, ctx.Err()
+               return ctx.Err()
        }
        go func() {
                time.Sleep(time.Second / 10)
index bd79d49e167fd77f8e768185189efd9cf620fc2c..d4b90540eac4aeb1e78f9ba0cb8ef4b0f7a2541a 100644 (file)
@@ -411,24 +411,13 @@ func (v *s3Volume) head(key string) (result *s3.HeadObjectOutput, err error) {
 
 // BlockRead reads a Keep block that has been stored as a block blob
 // in the S3 bucket.
-func (v *s3Volume) BlockRead(ctx context.Context, hash string, writeTo io.Writer) (int, error) {
+func (v *s3Volume) BlockRead(ctx context.Context, hash string, w io.WriterAt) error {
        key := v.key(hash)
-       buf, err := v.bufferPool.GetContext(ctx)
-       if err != nil {
-               return 0, err
-       }
-       defer v.bufferPool.Put(buf)
-
-       streamer := newStreamWriterAt(writeTo, 65536, buf)
-       defer streamer.Close()
-       err = v.readWorker(ctx, key, streamer)
+       err := v.readWorker(ctx, key, w)
        if err != nil {
                err = v.translateError(err)
                if !os.IsNotExist(err) {
-                       return 0, err
-               }
-               if streamer.WroteAt() > 0 {
-                       return 0, errors.New("bug? readWorker returned ErrNotExist after writing to streamer")
+                       return err
                }
 
                _, err = v.head("recent/" + key)
@@ -436,25 +425,21 @@ func (v *s3Volume) BlockRead(ctx context.Context, hash string, writeTo io.Writer
                if err != nil {
                        // If we can't read recent/X, there's no point in
                        // trying fixRace. Give up.
-                       return 0, err
+                       return err
                }
                if !v.fixRace(key) {
                        err = os.ErrNotExist
-                       return 0, err
+                       return err
                }
 
-               err = v.readWorker(ctx, key, streamer)
+               err = v.readWorker(ctx, key, w)
                if err != nil {
                        v.logger.Warnf("reading %s after successful fixRace: %s", hash, err)
                        err = v.translateError(err)
-                       return 0, err
+                       return err
                }
        }
-       err = streamer.Close()
-       if err != nil {
-               return 0, v.translateError(err)
-       }
-       return streamer.Wrote(), nil
+       return nil
 }
 
 func (v *s3Volume) readWorker(ctx context.Context, key string, dst io.WriterAt) error {
index d814949f447ce5f55f6ab581180397c6a40b6e21..fb68e1c0574c338e9c016404e456f623acdcb477 100644 (file)
@@ -221,7 +221,7 @@ func (s *stubbedS3Suite) TestStats(c *check.C) {
        c.Check(stats(), check.Matches, `.*"Ops":0,.*`)
 
        loc := "acbd18db4cc2f85cedef654fccc4a4d8"
-       _, err := v.BlockRead(context.Background(), loc, io.Discard)
+       err := v.BlockRead(context.Background(), loc, brdiscard)
        c.Check(err, check.NotNil)
        c.Check(stats(), check.Matches, `.*"Ops":[^0],.*`)
        c.Check(stats(), check.Matches, `.*"s3.requestFailure 404 NoSuchKey[^"]*":[^0].*`)
@@ -232,9 +232,9 @@ func (s *stubbedS3Suite) TestStats(c *check.C) {
        c.Check(stats(), check.Matches, `.*"OutBytes":3,.*`)
        c.Check(stats(), check.Matches, `.*"PutOps":2,.*`)
 
-       _, err = v.BlockRead(context.Background(), loc, io.Discard)
+       err = v.BlockRead(context.Background(), loc, brdiscard)
        c.Check(err, check.IsNil)
-       _, err = v.BlockRead(context.Background(), loc, io.Discard)
+       err = v.BlockRead(context.Background(), loc, brdiscard)
        c.Check(err, check.IsNil)
        c.Check(stats(), check.Matches, `.*"InBytes":6,.*`)
 }
@@ -261,8 +261,7 @@ func (h *s3AWSBlockingHandler) ServeHTTP(w http.ResponseWriter, r *http.Request)
 
 func (s *stubbedS3Suite) TestGetContextCancel(c *check.C) {
        s.testContextCancel(c, func(ctx context.Context, v *testableS3Volume) error {
-               _, err := v.BlockRead(ctx, fooHash, io.Discard)
-               return err
+               return v.BlockRead(ctx, fooHash, brdiscard)
        })
 }
 
@@ -480,7 +479,7 @@ func (s *stubbedS3Suite) TestBackendStates(c *check.C) {
 
                        // Check canGet
                        loc, blk := setupScenario()
-                       _, err := v.BlockRead(context.Background(), loc, io.Discard)
+                       err := v.BlockRead(context.Background(), loc, brdiscard)
                        c.Check(err == nil, check.Equals, scenario.canGet)
                        if err != nil {
                                c.Check(os.IsNotExist(err), check.Equals, true)
@@ -490,7 +489,7 @@ func (s *stubbedS3Suite) TestBackendStates(c *check.C) {
                        loc, _ = setupScenario()
                        err = v.BlockTrash(loc)
                        c.Check(err == nil, check.Equals, scenario.canTrash)
-                       _, err = v.BlockRead(context.Background(), loc, io.Discard)
+                       err = v.BlockRead(context.Background(), loc, brdiscard)
                        c.Check(err == nil, check.Equals, scenario.canGetAfterTrash)
                        if err != nil {
                                c.Check(os.IsNotExist(err), check.Equals, true)
@@ -505,7 +504,7 @@ func (s *stubbedS3Suite) TestBackendStates(c *check.C) {
                                // should be able to Get after Untrash --
                                // regardless of timestamps, errors, race
                                // conditions, etc.
-                               _, err = v.BlockRead(context.Background(), loc, io.Discard)
+                               err = v.BlockRead(context.Background(), loc, brdiscard)
                                c.Check(err, check.IsNil)
                        }
 
index 3426dadc1ffea0a6f2e8a0c850d29c1275dbd88a..02dce6e216103b0976cc23d19ecaadfe2039c989 100644 (file)
@@ -19,10 +19,10 @@ import (
 // streamWriterAt writes the data to the provided io.Writer in
 // sequential order.
 //
-// streamWriterAt can also be used as an asynchronous buffer: the
-// caller can use the io.Writer interface to write into a memory
-// buffer and return without waiting for the wrapped writer to catch
-// up.
+// streamWriterAt can also be wrapped with an io.OffsetWriter to
+// provide an asynchronous buffer: the caller can use the io.Writer
+// interface to write into a memory buffer and return without waiting
+// for the wrapped writer to catch up.
 //
 // Close returns when all data has been written through.
 type streamWriterAt struct {
@@ -87,14 +87,7 @@ func (swa *streamWriterAt) writeToWriter() {
        }
 }
 
-// Write implements io.Writer.
-func (swa *streamWriterAt) Write(p []byte) (int, error) {
-       n, err := swa.WriteAt(p, int64(swa.writepos))
-       swa.writepos += n
-       return n, err
-}
-
-// WriteAt implements io.WriterAt.
+// WriteAt implements io.WriterAt. WriteAt is goroutine-safe.
 func (swa *streamWriterAt) WriteAt(p []byte, offset int64) (int, error) {
        pos := int(offset)
        n := 0
index f652a500238d29f1505866abd02c8c7c21b909f6..92cf12ac189803d4f72f120708aced520a252c7f 100644 (file)
@@ -198,21 +198,6 @@ func (v *unixVolume) Mtime(loc string) (time.Time, error) {
        return fi.ModTime(), nil
 }
 
-// Lock the locker (if one is in use), open the file for reading, and
-// call the given function if and when the file is ready to read.
-func (v *unixVolume) getFunc(ctx context.Context, path string, fn func(io.Reader) error) error {
-       if err := v.lock(ctx); err != nil {
-               return err
-       }
-       defer v.unlock()
-       f, err := v.os.Open(path)
-       if err != nil {
-               return err
-       }
-       defer f.Close()
-       return fn(newCountingReader(ioutil.NopCloser(f), v.os.stats.TickInBytes))
-}
-
 // stat is os.Stat() with some extra sanity checks.
 func (v *unixVolume) stat(path string) (os.FileInfo, error) {
        stat, err := v.os.Stat(path)
@@ -227,41 +212,28 @@ func (v *unixVolume) stat(path string) (os.FileInfo, error) {
 }
 
 // BlockRead reads a block from the volume.
-func (v *unixVolume) BlockRead(ctx context.Context, hash string, w io.Writer) (int, error) {
+func (v *unixVolume) BlockRead(ctx context.Context, hash string, w io.WriterAt) error {
        path := v.blockPath(hash)
        stat, err := v.stat(path)
        if err != nil {
-               return 0, v.translateError(err)
+               return v.translateError(err)
        }
-       var streamer *streamWriterAt
-       if v.locker != nil {
-               buf, err := v.bufferPool.GetContext(ctx)
-               if err != nil {
-                       return 0, err
-               }
-               defer v.bufferPool.Put(buf)
-               streamer = newStreamWriterAt(w, 65536, buf)
-               defer streamer.Close()
-               w = streamer
-       }
-       var n int64
-       err = v.getFunc(ctx, path, func(rdr io.Reader) error {
-               n, err = io.Copy(w, rdr)
-               if err == nil && n != stat.Size() {
-                       err = io.ErrUnexpectedEOF
-               }
+       if err := v.lock(ctx); err != nil {
+               return err
+       }
+       defer v.unlock()
+       f, err := v.os.Open(path)
+       if err != nil {
                return err
-       })
-       if streamer != nil {
-               // If we're using the streamer (and there's no error
-               // so far) flush any remaining buffered data now that
-               // getFunc has released the serialize lock.
-               if err == nil {
-                       err = streamer.Close()
-               }
-               return streamer.WroteAt(), err
        }
-       return int(n), err
+       defer f.Close()
+       src := newCountingReader(ioutil.NopCloser(f), v.os.stats.TickInBytes)
+       dst := io.NewOffsetWriter(w, 0)
+       n, err := io.Copy(dst, src)
+       if err == nil && n != stat.Size() {
+               err = io.ErrUnexpectedEOF
+       }
+       return err
 }
 
 // BlockWrite stores a block on the volume. If it already exists, its
index 715e23a9eaaac42329ced3d6b91d69da70f3c26e..bcdb5f6358652eb02fe8024b268d02b23f2eb8cf 100644 (file)
@@ -8,9 +8,7 @@ import (
        "bytes"
        "context"
        "encoding/json"
-       "errors"
        "fmt"
-       "io"
        "io/ioutil"
        "os"
        "sync"
@@ -123,16 +121,9 @@ func (s *unixVolumeSuite) TestGetNotFound(c *check.C) {
        defer v.Teardown()
        v.BlockWrite(context.Background(), TestHash, TestBlock)
 
-       buf := bytes.NewBuffer(nil)
-       _, err := v.BlockRead(context.Background(), TestHash2, buf)
-       switch {
-       case os.IsNotExist(err):
-               break
-       case err == nil:
-               c.Errorf("Read should have failed, returned %+q", buf.Bytes())
-       default:
-               c.Errorf("Read expected ErrNotExist, got: %s", err)
-       }
+       buf := &brbuffer{}
+       err := v.BlockRead(context.Background(), TestHash2, buf)
+       c.Check(err, check.FitsTypeOf, os.ErrNotExist)
 }
 
 func (s *unixVolumeSuite) TestPut(c *check.C) {
@@ -182,94 +173,6 @@ func (s *unixVolumeSuite) TestIsFull(c *check.C) {
        }
 }
 
-func (s *unixVolumeSuite) TestUnixVolumeGetFuncWorkerError(c *check.C) {
-       v := s.newTestableUnixVolume(c, s.params, false)
-       defer v.Teardown()
-
-       v.BlockWrite(context.Background(), TestHash, TestBlock)
-       mockErr := errors.New("Mock error")
-       err := v.getFunc(context.Background(), v.blockPath(TestHash), func(rdr io.Reader) error {
-               return mockErr
-       })
-       if err != mockErr {
-               c.Errorf("Got %v, expected %v", err, mockErr)
-       }
-}
-
-func (s *unixVolumeSuite) TestUnixVolumeGetFuncFileError(c *check.C) {
-       v := s.newTestableUnixVolume(c, s.params, false)
-       defer v.Teardown()
-
-       funcCalled := false
-       err := v.getFunc(context.Background(), v.blockPath(TestHash), func(rdr io.Reader) error {
-               funcCalled = true
-               return nil
-       })
-       if err == nil {
-               c.Errorf("Expected error opening non-existent file")
-       }
-       if funcCalled {
-               c.Errorf("Worker func should not have been called")
-       }
-}
-
-func (s *unixVolumeSuite) TestUnixVolumeGetFuncWorkerWaitsOnMutex(c *check.C) {
-       v := s.newTestableUnixVolume(c, s.params, false)
-       defer v.Teardown()
-
-       v.BlockWrite(context.Background(), TestHash, TestBlock)
-
-       mtx := NewMockMutex()
-       v.locker = mtx
-
-       funcCalled := make(chan struct{})
-       go v.getFunc(context.Background(), v.blockPath(TestHash), func(rdr io.Reader) error {
-               funcCalled <- struct{}{}
-               return nil
-       })
-       select {
-       case mtx.AllowLock <- struct{}{}:
-       case <-funcCalled:
-               c.Fatal("Function was called before mutex was acquired")
-       case <-time.After(5 * time.Second):
-               c.Fatal("Timed out before mutex was acquired")
-       }
-       select {
-       case <-funcCalled:
-       case mtx.AllowUnlock <- struct{}{}:
-               c.Fatal("Mutex was released before function was called")
-       case <-time.After(5 * time.Second):
-               c.Fatal("Timed out waiting for funcCalled")
-       }
-       select {
-       case mtx.AllowUnlock <- struct{}{}:
-       case <-time.After(5 * time.Second):
-               c.Fatal("Timed out waiting for getFunc() to release mutex")
-       }
-}
-
-type MockMutex struct {
-       AllowLock   chan struct{}
-       AllowUnlock chan struct{}
-}
-
-func NewMockMutex() *MockMutex {
-       return &MockMutex{
-               AllowLock:   make(chan struct{}),
-               AllowUnlock: make(chan struct{}),
-       }
-}
-
-// Lock waits for someone to send to AllowLock.
-func (m *MockMutex) Lock() {
-       <-m.AllowLock
-}
-
-// Unlock waits for someone to send to AllowUnlock.
-func (m *MockMutex) Unlock() {
-       <-m.AllowUnlock
-}
-
 func (s *unixVolumeSuite) TestUnixVolumeContextCancelBlockWrite(c *check.C) {
        v := s.newTestableUnixVolume(c, s.params, true)
        defer v.Teardown()
@@ -300,9 +203,10 @@ func (s *unixVolumeSuite) TestUnixVolumeContextCancelBlockRead(c *check.C) {
                time.Sleep(50 * time.Millisecond)
                cancel()
        }()
-       n, err := v.BlockRead(ctx, TestHash, io.Discard)
-       if n > 0 || err != context.Canceled {
-               c.Errorf("BlockRead() returned %d, %s -- expected short read / canceled", n, err)
+       buf := &brbuffer{}
+       err = v.BlockRead(ctx, TestHash, buf)
+       if buf.Len() != 0 || err != context.Canceled {
+               c.Errorf("BlockRead() returned %q, %s -- expected short read / canceled", buf.String(), err)
        }
 }
 
@@ -317,7 +221,7 @@ func (s *unixVolumeSuite) TestStats(c *check.C) {
        c.Check(stats(), check.Matches, `.*"StatOps":1,.*`) // (*unixVolume)check() calls Stat() once
        c.Check(stats(), check.Matches, `.*"Errors":0,.*`)
 
-       _, err := vol.BlockRead(context.Background(), fooHash, io.Discard)
+       err := vol.BlockRead(context.Background(), fooHash, brdiscard)
        c.Check(err, check.NotNil)
        c.Check(stats(), check.Matches, `.*"StatOps":[^0],.*`)
        c.Check(stats(), check.Matches, `.*"Errors":[^0],.*`)
@@ -339,8 +243,8 @@ func (s *unixVolumeSuite) TestStats(c *check.C) {
        c.Check(stats(), check.Matches, `.*"OpenOps":1,.*`)
        c.Check(stats(), check.Matches, `.*"UtimesOps":2,.*`)
 
-       buf := bytes.NewBuffer(nil)
-       _, err = vol.BlockRead(context.Background(), fooHash, buf)
+       buf := &brbuffer{}
+       err = vol.BlockRead(context.Background(), fooHash, buf)
        c.Check(err, check.IsNil)
        c.Check(buf.String(), check.Equals, "foo")
        c.Check(stats(), check.Matches, `.*"InBytes":3,.*`)
index a0b6fda7d3390155e35821196892406b00b933a2..f1b6781da6574312bf905f1feb080df44b25da45 100644 (file)
@@ -15,7 +15,7 @@ import (
 
 // volume is the interface to a back-end storage device.
 type volume interface {
-       BlockRead(ctx context.Context, hash string, writeTo io.Writer) (int, error)
+       BlockRead(ctx context.Context, hash string, writeTo io.WriterAt) error
        BlockWrite(ctx context.Context, hash string, data []byte) error
        DeviceID() string
        BlockTouch(hash string) error
index 22667743ddc2acc0a80d2034e9b3c5b2eb362085..16084058b7d57f3e8cef4b1ec1063337d8e62f84 100644 (file)
@@ -9,7 +9,6 @@ import (
        "context"
        "crypto/md5"
        "fmt"
-       "io"
        "os"
        "regexp"
        "sort"
@@ -127,8 +126,8 @@ func (s *genericVolumeSuite) testGet(t TB, factory TestableVolumeFactory) {
                t.Error(err)
        }
 
-       buf := bytes.NewBuffer(nil)
-       _, err = v.BlockRead(context.Background(), TestHash, buf)
+       buf := &brbuffer{}
+       err = v.BlockRead(context.Background(), TestHash, buf)
        if err != nil {
                t.Error(err)
        }
@@ -144,7 +143,7 @@ func (s *genericVolumeSuite) testGetNoSuchBlock(t TB, factory TestableVolumeFact
        v := s.newVolume(t, factory)
        defer v.Teardown()
 
-       if _, err := v.BlockRead(context.Background(), barHash, io.Discard); err == nil {
+       if err := v.BlockRead(context.Background(), barHash, brdiscard); err == nil {
                t.Errorf("Expected error while getting non-existing block %v", barHash)
        }
 }
@@ -177,8 +176,8 @@ func (s *genericVolumeSuite) testPutBlockWithDifferentContent(t TB, factory Test
        v.BlockWrite(context.Background(), testHash, testDataA)
 
        putErr := v.BlockWrite(context.Background(), testHash, testDataB)
-       buf := bytes.NewBuffer(nil)
-       _, getErr := v.BlockRead(context.Background(), testHash, buf)
+       buf := &brbuffer{}
+       getErr := v.BlockRead(context.Background(), testHash, buf)
        if putErr == nil {
                // Put must not return a nil error unless it has
                // overwritten the existing data.
@@ -217,8 +216,8 @@ func (s *genericVolumeSuite) testPutMultipleBlocks(t TB, factory TestableVolumeF
                t.Errorf("Got err putting block %q: %q, expected nil", TestBlock3, err)
        }
 
-       buf := bytes.NewBuffer(nil)
-       _, err = v.BlockRead(context.Background(), TestHash, buf)
+       buf := &brbuffer{}
+       err = v.BlockRead(context.Background(), TestHash, buf)
        if err != nil {
                t.Error(err)
        } else {
@@ -228,7 +227,7 @@ func (s *genericVolumeSuite) testPutMultipleBlocks(t TB, factory TestableVolumeF
        }
 
        buf.Reset()
-       _, err = v.BlockRead(context.Background(), TestHash2, buf)
+       err = v.BlockRead(context.Background(), TestHash2, buf)
        if err != nil {
                t.Error(err)
        } else {
@@ -238,7 +237,7 @@ func (s *genericVolumeSuite) testPutMultipleBlocks(t TB, factory TestableVolumeF
        }
 
        buf.Reset()
-       _, err = v.BlockRead(context.Background(), TestHash3, buf)
+       err = v.BlockRead(context.Background(), TestHash3, buf)
        if err != nil {
                t.Error(err)
        } else {
@@ -404,8 +403,8 @@ func (s *genericVolumeSuite) testDeleteNewBlock(t TB, factory TestableVolumeFact
        if err := v.BlockTrash(TestHash); err != nil {
                t.Error(err)
        }
-       buf := bytes.NewBuffer(nil)
-       _, err := v.BlockRead(context.Background(), TestHash, buf)
+       buf := &brbuffer{}
+       err := v.BlockRead(context.Background(), TestHash, buf)
        if err != nil {
                t.Error(err)
        } else if buf.String() != string(TestBlock) {
@@ -428,7 +427,7 @@ func (s *genericVolumeSuite) testDeleteOldBlock(t TB, factory TestableVolumeFact
        if err := v.BlockTrash(TestHash); err != nil {
                t.Error(err)
        }
-       if _, err := v.BlockRead(context.Background(), TestHash, io.Discard); err == nil || !os.IsNotExist(err) {
+       if err := v.BlockRead(context.Background(), TestHash, brdiscard); err == nil || !os.IsNotExist(err) {
                t.Errorf("os.IsNotExist(%v) should have been true", err)
        }
 
@@ -517,7 +516,7 @@ func (s *genericVolumeSuite) testMetrics(t TB, readonly bool, factory TestableVo
                v.BlockWrite(context.Background(), TestHash, TestBlock)
        }
 
-       _, err = v.BlockRead(context.Background(), TestHash, io.Discard)
+       err = v.BlockRead(context.Background(), TestHash, brdiscard)
        if err != nil {
                t.Error(err)
        }
@@ -546,8 +545,8 @@ func (s *genericVolumeSuite) testGetConcurrent(t TB, factory TestableVolumeFacto
 
        sem := make(chan int)
        go func() {
-               buf := bytes.NewBuffer(nil)
-               _, err := v.BlockRead(context.Background(), TestHash, buf)
+               buf := &brbuffer{}
+               err := v.BlockRead(context.Background(), TestHash, buf)
                if err != nil {
                        t.Errorf("err1: %v", err)
                }
@@ -558,8 +557,8 @@ func (s *genericVolumeSuite) testGetConcurrent(t TB, factory TestableVolumeFacto
        }()
 
        go func() {
-               buf := bytes.NewBuffer(nil)
-               _, err := v.BlockRead(context.Background(), TestHash2, buf)
+               buf := &brbuffer{}
+               err := v.BlockRead(context.Background(), TestHash2, buf)
                if err != nil {
                        t.Errorf("err2: %v", err)
                }
@@ -570,8 +569,8 @@ func (s *genericVolumeSuite) testGetConcurrent(t TB, factory TestableVolumeFacto
        }()
 
        go func() {
-               buf := bytes.NewBuffer(nil)
-               _, err := v.BlockRead(context.Background(), TestHash3, buf)
+               buf := &brbuffer{}
+               err := v.BlockRead(context.Background(), TestHash3, buf)
                if err != nil {
                        t.Errorf("err3: %v", err)
                }
@@ -619,8 +618,8 @@ func (s *genericVolumeSuite) testPutConcurrent(t TB, factory TestableVolumeFacto
 
        // Check that we actually wrote the blocks.
        for _, blk := range blks {
-               buf := bytes.NewBuffer(nil)
-               _, err := v.BlockRead(context.Background(), blk.hash, buf)
+               buf := &brbuffer{}
+               err := v.BlockRead(context.Background(), blk.hash, buf)
                if err != nil {
                        t.Errorf("get %s: %v", blk.hash, err)
                } else if buf.String() != string(blk.data) {
@@ -644,13 +643,13 @@ func (s *genericVolumeSuite) testPutFullBlock(t TB, factory TestableVolumeFactor
                t.Error(err)
        }
 
-       buf := bytes.NewBuffer(nil)
-       _, err = v.BlockRead(context.Background(), hash, buf)
+       buf := &brbuffer{}
+       err = v.BlockRead(context.Background(), hash, buf)
        if err != nil {
                t.Error(err)
        }
        if buf.String() != string(wdata) {
-               t.Error("buf %+q != wdata %+q", buf, wdata)
+               t.Errorf("buf (len %d) != wdata (len %d)", buf.Len(), len(wdata))
        }
 }
 
@@ -668,8 +667,8 @@ func (s *genericVolumeSuite) testTrashUntrash(t TB, readonly bool, factory Testa
        v.BlockWrite(context.Background(), TestHash, TestBlock)
        v.TouchWithDate(TestHash, time.Now().Add(-2*s.cluster.Collections.BlobSigningTTL.Duration()))
 
-       buf := bytes.NewBuffer(nil)
-       _, err := v.BlockRead(context.Background(), TestHash, buf)
+       buf := &brbuffer{}
+       err := v.BlockRead(context.Background(), TestHash, buf)
        if err != nil {
                t.Error(err)
        }
@@ -684,7 +683,7 @@ func (s *genericVolumeSuite) testTrashUntrash(t TB, readonly bool, factory Testa
                return
        }
        buf.Reset()
-       _, err = v.BlockRead(context.Background(), TestHash, buf)
+       err = v.BlockRead(context.Background(), TestHash, buf)
        if err == nil || !os.IsNotExist(err) {
                t.Errorf("os.IsNotExist(%v) should have been true", err)
        }
@@ -697,7 +696,7 @@ func (s *genericVolumeSuite) testTrashUntrash(t TB, readonly bool, factory Testa
 
        // Get the block - after trash and untrash sequence
        buf.Reset()
-       _, err = v.BlockRead(context.Background(), TestHash, buf)
+       err = v.BlockRead(context.Background(), TestHash, buf)
        if err != nil {
                t.Error(err)
        }
@@ -712,8 +711,8 @@ func (s *genericVolumeSuite) testTrashEmptyTrashUntrash(t TB, factory TestableVo
        defer v.Teardown()
 
        checkGet := func() error {
-               buf := bytes.NewBuffer(nil)
-               _, err := v.BlockRead(context.Background(), TestHash, buf)
+               buf := &brbuffer{}
+               err := v.BlockRead(context.Background(), TestHash, buf)
                if err != nil {
                        return err
                }
index 5a17b3a7dc103355273ceb75f13fc3e320addcd8..f64041b04852e7fa9ca71235b2716d84843771f3 100644 (file)
@@ -5,6 +5,7 @@
 package keepstore
 
 import (
+       "sync"
        "time"
 )
 
@@ -38,3 +39,52 @@ type TestableVolume interface {
        // Clean up, delete temporary files.
        Teardown()
 }
+
+// brbuffer is like bytes.Buffer, but it implements io.WriterAt.
+// Convenient for testing (volume)BlockRead implementations.
+type brbuffer struct {
+       mtx sync.Mutex
+       buf []byte
+}
+
+func (b *brbuffer) WriteAt(p []byte, offset int64) (int, error) {
+       b.mtx.Lock()
+       defer b.mtx.Unlock()
+       if short := int(offset) + len(p) - len(b.buf); short > 0 {
+               b.buf = append(b.buf, make([]byte, short)...)
+       }
+       return copy(b.buf[offset:], p), nil
+}
+
+func (b *brbuffer) Bytes() []byte {
+       b.mtx.Lock()
+       defer b.mtx.Unlock()
+       return b.buf
+}
+
+func (b *brbuffer) String() string {
+       b.mtx.Lock()
+       defer b.mtx.Unlock()
+       return string(b.buf)
+}
+
+func (b *brbuffer) Len() int {
+       b.mtx.Lock()
+       defer b.mtx.Unlock()
+       return len(b.buf)
+}
+
+func (b *brbuffer) Reset() {
+       b.mtx.Lock()
+       defer b.mtx.Unlock()
+       b.buf = nil
+}
+
+// a brdiscarder is like io.Discard, but it implements
+// io.WriterAt. Convenient for testing (volume)BlockRead
+// implementations when the output is not checked.
+type brdiscarder struct{}
+
+func (brdiscarder) WriteAt(p []byte, offset int64) (int, error) { return len(p), nil }
+
+var brdiscard = brdiscarder{}