From 022107bd52092c658208e74161581c6bedda4a5f Mon Sep 17 00:00:00 2001 From: Tom Clegg Date: Thu, 15 Feb 2024 15:20:41 -0500 Subject: [PATCH] 2960: Move streaming from volume to keepstore layer. Avoids using 2x buffers when comparing existing data during BlockWrite. Arvados-DCO-1.1-Signed-off-by: Tom Clegg --- services/keepstore/azure_blob_volume.go | 49 +++----- services/keepstore/azure_blob_volume_test.go | 28 ++--- services/keepstore/keepstore.go | 107 ++++++++++------- services/keepstore/keepstore_test.go | 22 ++-- services/keepstore/router_test.go | 14 +-- services/keepstore/s3_volume.go | 31 ++--- services/keepstore/s3_volume_test.go | 15 ++- services/keepstore/streamwriterat.go | 17 +-- services/keepstore/unix_volume.go | 60 +++------- services/keepstore/unix_volume_test.go | 116 ++----------------- services/keepstore/volume.go | 2 +- services/keepstore/volume_generic_test.go | 61 +++++----- services/keepstore/volume_test.go | 50 ++++++++ 13 files changed, 239 insertions(+), 333 deletions(-) diff --git a/services/keepstore/azure_blob_volume.go b/services/keepstore/azure_blob_volume.go index 31660614f3..2c8a79350c 100644 --- a/services/keepstore/azure_blob_volume.go +++ b/services/keepstore/azure_blob_volume.go @@ -147,24 +147,22 @@ func (v *azureBlobVolume) checkTrashed(loc string) (bool, map[string]string, err // If the block is younger than azureWriteRaceInterval and is // unexpectedly empty, assume a BlockWrite operation is in progress, // and wait for it to finish writing. -func (v *azureBlobVolume) BlockRead(ctx context.Context, hash string, writeTo io.Writer) (int, error) { +func (v *azureBlobVolume) BlockRead(ctx context.Context, hash string, w io.WriterAt) error { trashed, _, err := v.checkTrashed(hash) if err != nil { - return 0, err + return err } if trashed { - return 0, os.ErrNotExist + return os.ErrNotExist } buf, err := v.bufferPool.GetContext(ctx) if err != nil { - return 0, err + return err } defer v.bufferPool.Put(buf) - streamer := newStreamWriterAt(writeTo, 65536, buf) - defer streamer.Close() var deadline time.Time - size, err := v.get(ctx, hash, streamer) - for err == nil && size == 0 && streamer.WroteAt() == 0 && hash != "d41d8cd98f00b204e9800998ecf8427e" { + wrote, err := v.get(ctx, hash, w) + for err == nil && wrote == 0 && hash != "d41d8cd98f00b204e9800998ecf8427e" { // Seeing a brand new empty block probably means we're // in a race with CreateBlob, which under the hood // (apparently) does "CreateEmpty" and "CommitData" @@ -185,20 +183,15 @@ func (v *azureBlobVolume) BlockRead(ctx context.Context, hash string, writeTo io } select { case <-ctx.Done(): - return 0, ctx.Err() + return ctx.Err() case <-time.After(v.WriteRacePollTime.Duration()): } - size, err = v.get(ctx, hash, streamer) + wrote, err = v.get(ctx, hash, w) } if !deadline.IsZero() { - ctxlog.FromContext(ctx).Printf("Race ended with size==%d", size) - } - if err != nil { - streamer.Close() - return streamer.Wrote(), err + ctxlog.FromContext(ctx).Printf("Race ended with size==%d", wrote) } - err = streamer.Close() - return streamer.Wrote(), err + return err } func (v *azureBlobVolume) get(ctx context.Context, hash string, dst io.WriterAt) (int, error) { @@ -212,6 +205,7 @@ func (v *azureBlobVolume) get(ctx context.Context, hash string, dst io.WriterAt) pieces := 1 expectSize := BlockSize + sizeKnown := false if pieceSize < BlockSize { // Unfortunately the handler doesn't tell us how long // the blob is expected to be, so we have to ask @@ -225,15 +219,15 @@ func (v *azureBlobVolume) get(ctx 
context.Context, hash string, dst io.WriterAt) } expectSize = int(props.ContentLength) pieces = (expectSize + pieceSize - 1) / pieceSize + sizeKnown = true } if expectSize == 0 { return 0, nil } - // We'll update this actualSize if/when we get the last piece. - actualSize := -1 errors := make(chan error, pieces) + var wrote atomic.Int64 var wg sync.WaitGroup wg.Add(pieces) for p := 0; p < pieces; p++ { @@ -289,31 +283,24 @@ func (v *azureBlobVolume) get(ctx context.Context, hash string, dst io.WriterAt) rdr.Close() }() n, err := io.CopyN(io.NewOffsetWriter(dst, int64(startPos)), rdr, int64(endPos-startPos)) - if pieces == 1 && (err == io.ErrUnexpectedEOF || err == io.EOF) { + wrote.Add(n) + if pieces == 1 && !sizeKnown && (err == io.ErrUnexpectedEOF || err == io.EOF) { // If we don't know the actual size, // and just tried reading 64 MiB, it's // normal to encounter EOF. } else if err != nil { - if ctx.Err() == nil { - errors <- err - } + errors <- err cancel() return } - if p == pieces-1 { - actualSize = startPos + int(n) - } }(p) } wg.Wait() close(errors) if len(errors) > 0 { - return 0, v.translateError(<-errors) - } - if ctx.Err() != nil { - return 0, ctx.Err() + return int(wrote.Load()), v.translateError(<-errors) } - return actualSize, nil + return int(wrote.Load()), ctx.Err() } // BlockWrite stores a block on the volume. If it already exists, its diff --git a/services/keepstore/azure_blob_volume_test.go b/services/keepstore/azure_blob_volume_test.go index c629c9dc15..b8acd980a1 100644 --- a/services/keepstore/azure_blob_volume_test.go +++ b/services/keepstore/azure_blob_volume_test.go @@ -13,7 +13,6 @@ import ( "encoding/xml" "flag" "fmt" - "io" "io/ioutil" "math/rand" "net" @@ -490,15 +489,13 @@ func (s *stubbedAzureBlobSuite) TestAzureBlobVolumeRangeFenceposts(c *check.C) { if err != nil { c.Error(err) } - gotData := bytes.NewBuffer(nil) - gotLen, err := v.BlockRead(context.Background(), hash, gotData) + gotData := &brbuffer{} + err = v.BlockRead(context.Background(), hash, gotData) if err != nil { c.Error(err) } gotHash := fmt.Sprintf("%x", md5.Sum(gotData.Bytes())) - if gotLen != size { - c.Errorf("length mismatch: got %d != %d", gotLen, size) - } + c.Check(gotData.Len(), check.Equals, size) if gotHash != hash { c.Errorf("hash mismatch: got %s != %s", gotHash, hash) } @@ -532,7 +529,7 @@ func (s *stubbedAzureBlobSuite) TestAzureBlobVolumeCreateBlobRace(c *check.C) { wg.Add(1) go func() { defer wg.Done() - _, err := v.BlockRead(context.Background(), TestHash, io.Discard) + err := v.BlockRead(context.Background(), TestHash, brdiscard) if err != nil { c.Error(err) } @@ -570,15 +567,13 @@ func (s *stubbedAzureBlobSuite) TestAzureBlobVolumeCreateBlobRaceDeadline(c *che allDone := make(chan struct{}) go func() { defer close(allDone) - buf := bytes.NewBuffer(nil) - n, err := v.BlockRead(context.Background(), TestHash, buf) + buf := &brbuffer{} + err := v.BlockRead(context.Background(), TestHash, buf) if err != nil { c.Error(err) return } - if n != 0 { - c.Errorf("Got %+q (n=%d), expected empty buf", buf.Bytes(), n) - } + c.Check(buf.String(), check.Equals, "") }() select { case <-allDone: @@ -596,8 +591,7 @@ func (s *stubbedAzureBlobSuite) TestAzureBlobVolumeCreateBlobRaceDeadline(c *che func (s *stubbedAzureBlobSuite) TestAzureBlobVolumeContextCancelBlockRead(c *check.C) { s.testAzureBlobVolumeContextCancel(c, func(ctx context.Context, v *testableAzureBlobVolume) error { v.BlockWriteRaw(TestHash, TestBlock) - _, err := v.BlockRead(ctx, TestHash, io.Discard) - return err + return 
v.BlockRead(ctx, TestHash, brdiscard) }) } @@ -667,7 +661,7 @@ func (s *stubbedAzureBlobSuite) TestStats(c *check.C) { c.Check(stats(), check.Matches, `.*"Errors":0,.*`) loc := "acbd18db4cc2f85cedef654fccc4a4d8" - _, err := volume.BlockRead(context.Background(), loc, io.Discard) + err := volume.BlockRead(context.Background(), loc, brdiscard) c.Check(err, check.NotNil) c.Check(stats(), check.Matches, `.*"Ops":[^0],.*`) c.Check(stats(), check.Matches, `.*"Errors":[^0],.*`) @@ -679,9 +673,9 @@ func (s *stubbedAzureBlobSuite) TestStats(c *check.C) { c.Check(stats(), check.Matches, `.*"OutBytes":3,.*`) c.Check(stats(), check.Matches, `.*"CreateOps":1,.*`) - _, err = volume.BlockRead(context.Background(), loc, io.Discard) + err = volume.BlockRead(context.Background(), loc, brdiscard) c.Check(err, check.IsNil) - _, err = volume.BlockRead(context.Background(), loc, io.Discard) + err = volume.BlockRead(context.Background(), loc, brdiscard) c.Check(err, check.IsNil) c.Check(stats(), check.Matches, `.*"InBytes":6,.*`) } diff --git a/services/keepstore/keepstore.go b/services/keepstore/keepstore.go index 62b6d15e56..c9a8023059 100644 --- a/services/keepstore/keepstore.go +++ b/services/keepstore/keepstore.go @@ -243,13 +243,58 @@ func (ks *keepstore) BlockRead(ctx context.Context, opts arvados.BlockReadOption } else { out = io.MultiWriter(out, hashcheck) } + + buf, err := ks.bufferPool.GetContext(ctx) + if err != nil { + return 0, err + } + defer ks.bufferPool.Put(buf) + streamer := newStreamWriterAt(out, 65536, buf) + defer streamer.Close() + var errToCaller error = os.ErrNotExist for _, mnt := range ks.rendezvous(li.hash, ks.mountsR) { if ctx.Err() != nil { return 0, ctx.Err() } - n, err = mnt.BlockRead(ctx, li.hash, out) - if err == nil && li.size > 0 && n != li.size { + err := mnt.BlockRead(ctx, li.hash, streamer) + if err != nil { + if streamer.WroteAt() != 0 { + // BlockRead encountered an error + // after writing some data, so it's + // too late to try another + // volume. Flush streamer before + // calling Wrote() to ensure our + // return value accurately reflects + // the number of bytes written to + // opts.WriteTo. + streamer.Close() + return streamer.Wrote(), err + } + if !os.IsNotExist(err) { + errToCaller = err + } + continue + } + if li.size == 0 { + // hashCheckingWriter isn't in use because we + // don't know the expected size. All we can do + // is check after writing all the data, and + // trust the caller is doing a HEAD request so + // it's not too late to set an error code in + // the response header. + err = streamer.Close() + if hash := fmt.Sprintf("%x", hashcheck.Sum(nil)); hash != li.hash && err == nil { + err = errChecksum + } + if rw, ok := opts.WriteTo.(http.ResponseWriter); ok { + // We didn't set the content-length header + // above because we didn't know the block size + // until now. + rw.Header().Set("Content-Length", fmt.Sprintf("%d", streamer.WroteAt())) + } + return streamer.WroteAt(), err + } else if streamer.WroteAt() != li.size { // If the backend read fewer bytes than // expected but returns no error, we can // classify this as a checksum error (even @@ -260,42 +305,17 @@ func (ks *keepstore) BlockRead(ctx context.Context, opts arvados.BlockReadOption // it anyway, but if it's a HEAD request the // caller can still change the response status // code. - return n, errChecksum - } - if err == nil && li.size == 0 { - // hashCheckingWriter isn't in use because we - // don't know the expected size. 
All we can do - // is check after writing all the data, and - // trust the caller is doing a HEAD request so - // it's not too late to set an error code in - // the response header. - if hash := fmt.Sprintf("%x", hashcheck.Sum(nil)); hash != li.hash { - return n, errChecksum - } - } - if rw, ok := opts.WriteTo.(http.ResponseWriter); ok && li.size == 0 && err == nil { - // We didn't set the content-length header - // above because we didn't know the block size - // until now. - rw.Header().Set("Content-Length", fmt.Sprintf("%d", n)) - } - if n > 0 || err == nil { - // success, or there's an error but we can't - // retry because we've already sent some data. - return n, err - } - if !os.IsNotExist(err) { - // If some volume returns a transient error, - // return it to the caller instead of "Not - // found" so it can retry. - errToCaller = err + return streamer.WroteAt(), errChecksum } + // Ensure streamer flushes all buffered data without + // errors. + err = streamer.Close() + return streamer.Wrote(), err } return 0, errToCaller } func (ks *keepstore) blockReadRemote(ctx context.Context, opts arvados.BlockReadOptions) (int, error) { - ks.logger.Infof("blockReadRemote(%s)", opts.Locator) token := ctxToken(ctx) if token == "" { return 0, errNoTokenProvided @@ -459,7 +479,7 @@ func (ks *keepstore) BlockWrite(ctx context.Context, opts arvados.BlockWriteOpti continue } cmp := &checkEqual{Expect: opts.Data} - if _, err := mnt.BlockRead(ctx, hash, cmp); err == nil { + if err := mnt.BlockRead(ctx, hash, cmp); err == nil { if !cmp.Equal() { return resp, errCollision } @@ -564,25 +584,28 @@ func (*keepstore) rendezvous(locator string, mnts []*mount) []*mount { return mnts } -// checkEqual reports whether the data written to it (via io.Writer +// checkEqual reports whether the data written to it (via io.WriterAt // interface) is equal to the expected data. // // Expect should not be changed after the first Write. +// +// Results are undefined if WriteAt is called with overlapping ranges. 
type checkEqual struct { - Expect []byte - equalUntil int + Expect []byte + equal atomic.Int64 + notequal atomic.Bool } func (ce *checkEqual) Equal() bool { - return ce.equalUntil == len(ce.Expect) + return !ce.notequal.Load() && ce.equal.Load() == int64(len(ce.Expect)) } -func (ce *checkEqual) Write(p []byte) (int, error) { - endpos := ce.equalUntil + len(p) - if ce.equalUntil >= 0 && endpos <= len(ce.Expect) && bytes.Equal(p, ce.Expect[ce.equalUntil:endpos]) { - ce.equalUntil = endpos +func (ce *checkEqual) WriteAt(p []byte, offset int64) (int, error) { + endpos := int(offset) + len(p) + if offset >= 0 && endpos <= len(ce.Expect) && bytes.Equal(p, ce.Expect[int(offset):endpos]) { + ce.equal.Add(int64(len(p))) } else { - ce.equalUntil = -1 + ce.notequal.Store(true) } return len(p), nil } diff --git a/services/keepstore/keepstore_test.go b/services/keepstore/keepstore_test.go index 3a01476096..28049506f6 100644 --- a/services/keepstore/keepstore_test.go +++ b/services/keepstore/keepstore_test.go @@ -372,7 +372,7 @@ func (s *keepstoreSuite) TestBlockTrash(c *C) { return ks.BlockTrash(ctx, loc) } checkexists := func(volidx int) bool { - _, err := vol[volidx].BlockRead(ctx, fooHash, io.Discard) + err := vol[volidx].BlockRead(ctx, fooHash, brdiscard) if !os.IsNotExist(err) { c.Check(err, IsNil) } @@ -573,7 +573,7 @@ func (s *keepstoreSuite) TestUntrashHandlerWithNoWritableVolumes(c *C) { for _, mnt := range ks.mounts { err := mnt.BlockWrite(context.Background(), fooHash, []byte("foo")) c.Assert(err, IsNil) - _, err = mnt.BlockRead(context.Background(), fooHash, io.Discard) + err = mnt.BlockRead(context.Background(), fooHash, brdiscard) c.Assert(err, IsNil) } @@ -581,7 +581,7 @@ func (s *keepstoreSuite) TestUntrashHandlerWithNoWritableVolumes(c *C) { c.Check(os.IsNotExist(err), Equals, true) for _, mnt := range ks.mounts { - _, err := mnt.BlockRead(context.Background(), fooHash, io.Discard) + err := mnt.BlockRead(context.Background(), fooHash, brdiscard) c.Assert(err, IsNil) } } @@ -693,7 +693,7 @@ type stubVolume struct { // corresponding func (if non-nil). If the func returns an // error, that error is returned to caller. Otherwise, the // stub continues normally. 
- blockRead func(ctx context.Context, hash string, writeTo io.Writer) (int, error) + blockRead func(ctx context.Context, hash string, writeTo io.WriterAt) error blockWrite func(ctx context.Context, hash string, data []byte) error deviceID func() string blockTouch func(hash string) error @@ -710,19 +710,19 @@ func (v *stubVolume) log(op, hash string) { v.stubLog.Printf("%s %s %s", v.params.UUID[24:27], op, hash[:3]) } -func (v *stubVolume) BlockRead(ctx context.Context, hash string, writeTo io.Writer) (int, error) { +func (v *stubVolume) BlockRead(ctx context.Context, hash string, writeTo io.WriterAt) error { v.log("read", hash) if v.blockRead != nil { - n, err := v.blockRead(ctx, hash, writeTo) + err := v.blockRead(ctx, hash, writeTo) if err != nil { - return n, err + return err } } v.mtx.Lock() ent, ok := v.data[hash] v.mtx.Unlock() if !ok || !ent.trash.IsZero() { - return 0, os.ErrNotExist + return os.ErrNotExist } wrote := 0 for writesize := 1000; wrote < len(ent.data); writesize = writesize * 2 { @@ -730,13 +730,13 @@ func (v *stubVolume) BlockRead(ctx context.Context, hash string, writeTo io.Writ if len(data) > writesize { data = data[:writesize] } - n, err := writeTo.Write(data) + n, err := writeTo.WriteAt(data, int64(wrote)) wrote += n if err != nil { - return wrote, err + return err } } - return wrote, nil + return nil } func (v *stubVolume) BlockWrite(ctx context.Context, hash string, data []byte) error { diff --git a/services/keepstore/router_test.go b/services/keepstore/router_test.go index f4bcdd4ae4..ee7be4768c 100644 --- a/services/keepstore/router_test.go +++ b/services/keepstore/router_test.go @@ -268,7 +268,7 @@ func (s *routerSuite) TestBlockTrash(c *C) { resp := call(router, "DELETE", "http://example/"+fooHash+"+3", s.cluster.SystemRootToken, nil, nil) c.Check(resp.Code, Equals, http.StatusOK) c.Check(vol0.stubLog.String(), Matches, `(?ms).* trash .*`) - _, err = vol0.BlockRead(context.Background(), fooHash, io.Discard) + err = vol0.BlockRead(context.Background(), fooHash, brdiscard) c.Assert(err, Equals, os.ErrNotExist) } @@ -281,12 +281,12 @@ func (s *routerSuite) TestBlockUntrash(c *C) { c.Assert(err, IsNil) err = vol0.BlockTrash(fooHash) c.Assert(err, IsNil) - _, err = vol0.BlockRead(context.Background(), fooHash, io.Discard) + err = vol0.BlockRead(context.Background(), fooHash, brdiscard) c.Assert(err, Equals, os.ErrNotExist) resp := call(router, "PUT", "http://example/untrash/"+fooHash+"+3", s.cluster.SystemRootToken, nil, nil) c.Check(resp.Code, Equals, http.StatusOK) c.Check(vol0.stubLog.String(), Matches, `(?ms).* untrash .*`) - _, err = vol0.BlockRead(context.Background(), fooHash, io.Discard) + err = vol0.BlockRead(context.Background(), fooHash, brdiscard) c.Check(err, IsNil) } @@ -356,8 +356,8 @@ func (s *routerSuite) TestRequireAdminMgtToken(c *C) { func (s *routerSuite) TestVolumeErrorStatusCode(c *C) { router, cancel := testRouter(c, s.cluster, nil) defer cancel() - router.keepstore.mountsW[0].volume.(*stubVolume).blockRead = func(_ context.Context, hash string, w io.Writer) (int, error) { - return 0, httpserver.ErrorWithStatus(errors.New("test error"), http.StatusBadGateway) + router.keepstore.mountsW[0].volume.(*stubVolume).blockRead = func(_ context.Context, hash string, w io.WriterAt) error { + return httpserver.ErrorWithStatus(errors.New("test error"), http.StatusBadGateway) } // To test whether we fall back to volume 1 after volume 0 @@ -472,10 +472,10 @@ func (s *routerSuite) TestCancelOnDisconnect(c *C) { defer cancel() unblock := make(chan 
struct{}) - router.keepstore.mountsW[0].volume.(*stubVolume).blockRead = func(ctx context.Context, hash string, w io.Writer) (int, error) { + router.keepstore.mountsW[0].volume.(*stubVolume).blockRead = func(ctx context.Context, hash string, w io.WriterAt) error { <-unblock c.Check(ctx.Err(), NotNil) - return 0, ctx.Err() + return ctx.Err() } go func() { time.Sleep(time.Second / 10) diff --git a/services/keepstore/s3_volume.go b/services/keepstore/s3_volume.go index bd79d49e16..d4b90540ea 100644 --- a/services/keepstore/s3_volume.go +++ b/services/keepstore/s3_volume.go @@ -411,24 +411,13 @@ func (v *s3Volume) head(key string) (result *s3.HeadObjectOutput, err error) { // BlockRead reads a Keep block that has been stored as a block blob // in the S3 bucket. -func (v *s3Volume) BlockRead(ctx context.Context, hash string, writeTo io.Writer) (int, error) { +func (v *s3Volume) BlockRead(ctx context.Context, hash string, w io.WriterAt) error { key := v.key(hash) - buf, err := v.bufferPool.GetContext(ctx) - if err != nil { - return 0, err - } - defer v.bufferPool.Put(buf) - - streamer := newStreamWriterAt(writeTo, 65536, buf) - defer streamer.Close() - err = v.readWorker(ctx, key, streamer) + err := v.readWorker(ctx, key, w) if err != nil { err = v.translateError(err) if !os.IsNotExist(err) { - return 0, err - } - if streamer.WroteAt() > 0 { - return 0, errors.New("bug? readWorker returned ErrNotExist after writing to streamer") + return err } _, err = v.head("recent/" + key) @@ -436,25 +425,21 @@ func (v *s3Volume) BlockRead(ctx context.Context, hash string, writeTo io.Writer if err != nil { // If we can't read recent/X, there's no point in // trying fixRace. Give up. - return 0, err + return err } if !v.fixRace(key) { err = os.ErrNotExist - return 0, err + return err } - err = v.readWorker(ctx, key, streamer) + err = v.readWorker(ctx, key, w) if err != nil { v.logger.Warnf("reading %s after successful fixRace: %s", hash, err) err = v.translateError(err) - return 0, err + return err } } - err = streamer.Close() - if err != nil { - return 0, v.translateError(err) - } - return streamer.Wrote(), nil + return nil } func (v *s3Volume) readWorker(ctx context.Context, key string, dst io.WriterAt) error { diff --git a/services/keepstore/s3_volume_test.go b/services/keepstore/s3_volume_test.go index d814949f44..fb68e1c057 100644 --- a/services/keepstore/s3_volume_test.go +++ b/services/keepstore/s3_volume_test.go @@ -221,7 +221,7 @@ func (s *stubbedS3Suite) TestStats(c *check.C) { c.Check(stats(), check.Matches, `.*"Ops":0,.*`) loc := "acbd18db4cc2f85cedef654fccc4a4d8" - _, err := v.BlockRead(context.Background(), loc, io.Discard) + err := v.BlockRead(context.Background(), loc, brdiscard) c.Check(err, check.NotNil) c.Check(stats(), check.Matches, `.*"Ops":[^0],.*`) c.Check(stats(), check.Matches, `.*"s3.requestFailure 404 NoSuchKey[^"]*":[^0].*`) @@ -232,9 +232,9 @@ func (s *stubbedS3Suite) TestStats(c *check.C) { c.Check(stats(), check.Matches, `.*"OutBytes":3,.*`) c.Check(stats(), check.Matches, `.*"PutOps":2,.*`) - _, err = v.BlockRead(context.Background(), loc, io.Discard) + err = v.BlockRead(context.Background(), loc, brdiscard) c.Check(err, check.IsNil) - _, err = v.BlockRead(context.Background(), loc, io.Discard) + err = v.BlockRead(context.Background(), loc, brdiscard) c.Check(err, check.IsNil) c.Check(stats(), check.Matches, `.*"InBytes":6,.*`) } @@ -261,8 +261,7 @@ func (h *s3AWSBlockingHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) func (s *stubbedS3Suite) TestGetContextCancel(c 
*check.C) { s.testContextCancel(c, func(ctx context.Context, v *testableS3Volume) error { - _, err := v.BlockRead(ctx, fooHash, io.Discard) - return err + return v.BlockRead(ctx, fooHash, brdiscard) }) } @@ -480,7 +479,7 @@ func (s *stubbedS3Suite) TestBackendStates(c *check.C) { // Check canGet loc, blk := setupScenario() - _, err := v.BlockRead(context.Background(), loc, io.Discard) + err := v.BlockRead(context.Background(), loc, brdiscard) c.Check(err == nil, check.Equals, scenario.canGet) if err != nil { c.Check(os.IsNotExist(err), check.Equals, true) @@ -490,7 +489,7 @@ func (s *stubbedS3Suite) TestBackendStates(c *check.C) { loc, _ = setupScenario() err = v.BlockTrash(loc) c.Check(err == nil, check.Equals, scenario.canTrash) - _, err = v.BlockRead(context.Background(), loc, io.Discard) + err = v.BlockRead(context.Background(), loc, brdiscard) c.Check(err == nil, check.Equals, scenario.canGetAfterTrash) if err != nil { c.Check(os.IsNotExist(err), check.Equals, true) @@ -505,7 +504,7 @@ func (s *stubbedS3Suite) TestBackendStates(c *check.C) { // should be able to Get after Untrash -- // regardless of timestamps, errors, race // conditions, etc. - _, err = v.BlockRead(context.Background(), loc, io.Discard) + err = v.BlockRead(context.Background(), loc, brdiscard) c.Check(err, check.IsNil) } diff --git a/services/keepstore/streamwriterat.go b/services/keepstore/streamwriterat.go index 3426dadc1f..02dce6e216 100644 --- a/services/keepstore/streamwriterat.go +++ b/services/keepstore/streamwriterat.go @@ -19,10 +19,10 @@ import ( // streamWriterAt writes the data to the provided io.Writer in // sequential order. // -// streamWriterAt can also be used as an asynchronous buffer: the -// caller can use the io.Writer interface to write into a memory -// buffer and return without waiting for the wrapped writer to catch -// up. +// streamWriterAt can also be wrapped with an io.OffsetWriter to +// provide an asynchronous buffer: the caller can use the io.Writer +// interface to write into a memory buffer and return without waiting +// for the wrapped writer to catch up. // // Close returns when all data has been written through. type streamWriterAt struct { @@ -87,14 +87,7 @@ func (swa *streamWriterAt) writeToWriter() { } } -// Write implements io.Writer. -func (swa *streamWriterAt) Write(p []byte) (int, error) { - n, err := swa.WriteAt(p, int64(swa.writepos)) - swa.writepos += n - return n, err -} - -// WriteAt implements io.WriterAt. +// WriteAt implements io.WriterAt. WriteAt is goroutine-safe. func (swa *streamWriterAt) WriteAt(p []byte, offset int64) (int, error) { pos := int(offset) n := 0 diff --git a/services/keepstore/unix_volume.go b/services/keepstore/unix_volume.go index f652a50023..92cf12ac18 100644 --- a/services/keepstore/unix_volume.go +++ b/services/keepstore/unix_volume.go @@ -198,21 +198,6 @@ func (v *unixVolume) Mtime(loc string) (time.Time, error) { return fi.ModTime(), nil } -// Lock the locker (if one is in use), open the file for reading, and -// call the given function if and when the file is ready to read. -func (v *unixVolume) getFunc(ctx context.Context, path string, fn func(io.Reader) error) error { - if err := v.lock(ctx); err != nil { - return err - } - defer v.unlock() - f, err := v.os.Open(path) - if err != nil { - return err - } - defer f.Close() - return fn(newCountingReader(ioutil.NopCloser(f), v.os.stats.TickInBytes)) -} - // stat is os.Stat() with some extra sanity checks. 
func (v *unixVolume) stat(path string) (os.FileInfo, error) { stat, err := v.os.Stat(path) @@ -227,41 +212,28 @@ func (v *unixVolume) stat(path string) (os.FileInfo, error) { } // BlockRead reads a block from the volume. -func (v *unixVolume) BlockRead(ctx context.Context, hash string, w io.Writer) (int, error) { +func (v *unixVolume) BlockRead(ctx context.Context, hash string, w io.WriterAt) error { path := v.blockPath(hash) stat, err := v.stat(path) if err != nil { - return 0, v.translateError(err) + return v.translateError(err) } - var streamer *streamWriterAt - if v.locker != nil { - buf, err := v.bufferPool.GetContext(ctx) - if err != nil { - return 0, err - } - defer v.bufferPool.Put(buf) - streamer = newStreamWriterAt(w, 65536, buf) - defer streamer.Close() - w = streamer - } - var n int64 - err = v.getFunc(ctx, path, func(rdr io.Reader) error { - n, err = io.Copy(w, rdr) - if err == nil && n != stat.Size() { - err = io.ErrUnexpectedEOF - } + if err := v.lock(ctx); err != nil { + return err + } + defer v.unlock() + f, err := v.os.Open(path) + if err != nil { return err - }) - if streamer != nil { - // If we're using the streamer (and there's no error - // so far) flush any remaining buffered data now that - // getFunc has released the serialize lock. - if err == nil { - err = streamer.Close() - } - return streamer.WroteAt(), err } - return int(n), err + defer f.Close() + src := newCountingReader(ioutil.NopCloser(f), v.os.stats.TickInBytes) + dst := io.NewOffsetWriter(w, 0) + n, err := io.Copy(dst, src) + if err == nil && n != stat.Size() { + err = io.ErrUnexpectedEOF + } + return err } // BlockWrite stores a block on the volume. If it already exists, its diff --git a/services/keepstore/unix_volume_test.go b/services/keepstore/unix_volume_test.go index 715e23a9ea..bcdb5f6358 100644 --- a/services/keepstore/unix_volume_test.go +++ b/services/keepstore/unix_volume_test.go @@ -8,9 +8,7 @@ import ( "bytes" "context" "encoding/json" - "errors" "fmt" - "io" "io/ioutil" "os" "sync" @@ -123,16 +121,9 @@ func (s *unixVolumeSuite) TestGetNotFound(c *check.C) { defer v.Teardown() v.BlockWrite(context.Background(), TestHash, TestBlock) - buf := bytes.NewBuffer(nil) - _, err := v.BlockRead(context.Background(), TestHash2, buf) - switch { - case os.IsNotExist(err): - break - case err == nil: - c.Errorf("Read should have failed, returned %+q", buf.Bytes()) - default: - c.Errorf("Read expected ErrNotExist, got: %s", err) - } + buf := &brbuffer{} + err := v.BlockRead(context.Background(), TestHash2, buf) + c.Check(err, check.FitsTypeOf, os.ErrNotExist) } func (s *unixVolumeSuite) TestPut(c *check.C) { @@ -182,94 +173,6 @@ func (s *unixVolumeSuite) TestIsFull(c *check.C) { } } -func (s *unixVolumeSuite) TestUnixVolumeGetFuncWorkerError(c *check.C) { - v := s.newTestableUnixVolume(c, s.params, false) - defer v.Teardown() - - v.BlockWrite(context.Background(), TestHash, TestBlock) - mockErr := errors.New("Mock error") - err := v.getFunc(context.Background(), v.blockPath(TestHash), func(rdr io.Reader) error { - return mockErr - }) - if err != mockErr { - c.Errorf("Got %v, expected %v", err, mockErr) - } -} - -func (s *unixVolumeSuite) TestUnixVolumeGetFuncFileError(c *check.C) { - v := s.newTestableUnixVolume(c, s.params, false) - defer v.Teardown() - - funcCalled := false - err := v.getFunc(context.Background(), v.blockPath(TestHash), func(rdr io.Reader) error { - funcCalled = true - return nil - }) - if err == nil { - c.Errorf("Expected error opening non-existent file") - } - if funcCalled { - 
c.Errorf("Worker func should not have been called") - } -} - -func (s *unixVolumeSuite) TestUnixVolumeGetFuncWorkerWaitsOnMutex(c *check.C) { - v := s.newTestableUnixVolume(c, s.params, false) - defer v.Teardown() - - v.BlockWrite(context.Background(), TestHash, TestBlock) - - mtx := NewMockMutex() - v.locker = mtx - - funcCalled := make(chan struct{}) - go v.getFunc(context.Background(), v.blockPath(TestHash), func(rdr io.Reader) error { - funcCalled <- struct{}{} - return nil - }) - select { - case mtx.AllowLock <- struct{}{}: - case <-funcCalled: - c.Fatal("Function was called before mutex was acquired") - case <-time.After(5 * time.Second): - c.Fatal("Timed out before mutex was acquired") - } - select { - case <-funcCalled: - case mtx.AllowUnlock <- struct{}{}: - c.Fatal("Mutex was released before function was called") - case <-time.After(5 * time.Second): - c.Fatal("Timed out waiting for funcCalled") - } - select { - case mtx.AllowUnlock <- struct{}{}: - case <-time.After(5 * time.Second): - c.Fatal("Timed out waiting for getFunc() to release mutex") - } -} - -type MockMutex struct { - AllowLock chan struct{} - AllowUnlock chan struct{} -} - -func NewMockMutex() *MockMutex { - return &MockMutex{ - AllowLock: make(chan struct{}), - AllowUnlock: make(chan struct{}), - } -} - -// Lock waits for someone to send to AllowLock. -func (m *MockMutex) Lock() { - <-m.AllowLock -} - -// Unlock waits for someone to send to AllowUnlock. -func (m *MockMutex) Unlock() { - <-m.AllowUnlock -} - func (s *unixVolumeSuite) TestUnixVolumeContextCancelBlockWrite(c *check.C) { v := s.newTestableUnixVolume(c, s.params, true) defer v.Teardown() @@ -300,9 +203,10 @@ func (s *unixVolumeSuite) TestUnixVolumeContextCancelBlockRead(c *check.C) { time.Sleep(50 * time.Millisecond) cancel() }() - n, err := v.BlockRead(ctx, TestHash, io.Discard) - if n > 0 || err != context.Canceled { - c.Errorf("BlockRead() returned %d, %s -- expected short read / canceled", n, err) + buf := &brbuffer{} + err = v.BlockRead(ctx, TestHash, buf) + if buf.Len() != 0 || err != context.Canceled { + c.Errorf("BlockRead() returned %q, %s -- expected short read / canceled", buf.String(), err) } } @@ -317,7 +221,7 @@ func (s *unixVolumeSuite) TestStats(c *check.C) { c.Check(stats(), check.Matches, `.*"StatOps":1,.*`) // (*unixVolume)check() calls Stat() once c.Check(stats(), check.Matches, `.*"Errors":0,.*`) - _, err := vol.BlockRead(context.Background(), fooHash, io.Discard) + err := vol.BlockRead(context.Background(), fooHash, brdiscard) c.Check(err, check.NotNil) c.Check(stats(), check.Matches, `.*"StatOps":[^0],.*`) c.Check(stats(), check.Matches, `.*"Errors":[^0],.*`) @@ -339,8 +243,8 @@ func (s *unixVolumeSuite) TestStats(c *check.C) { c.Check(stats(), check.Matches, `.*"OpenOps":1,.*`) c.Check(stats(), check.Matches, `.*"UtimesOps":2,.*`) - buf := bytes.NewBuffer(nil) - _, err = vol.BlockRead(context.Background(), fooHash, buf) + buf := &brbuffer{} + err = vol.BlockRead(context.Background(), fooHash, buf) c.Check(err, check.IsNil) c.Check(buf.String(), check.Equals, "foo") c.Check(stats(), check.Matches, `.*"InBytes":3,.*`) diff --git a/services/keepstore/volume.go b/services/keepstore/volume.go index a0b6fda7d3..f1b6781da6 100644 --- a/services/keepstore/volume.go +++ b/services/keepstore/volume.go @@ -15,7 +15,7 @@ import ( // volume is the interface to a back-end storage device. 
type volume interface { - BlockRead(ctx context.Context, hash string, writeTo io.Writer) (int, error) + BlockRead(ctx context.Context, hash string, writeTo io.WriterAt) error BlockWrite(ctx context.Context, hash string, data []byte) error DeviceID() string BlockTouch(hash string) error diff --git a/services/keepstore/volume_generic_test.go b/services/keepstore/volume_generic_test.go index 22667743dd..16084058b7 100644 --- a/services/keepstore/volume_generic_test.go +++ b/services/keepstore/volume_generic_test.go @@ -9,7 +9,6 @@ import ( "context" "crypto/md5" "fmt" - "io" "os" "regexp" "sort" @@ -127,8 +126,8 @@ func (s *genericVolumeSuite) testGet(t TB, factory TestableVolumeFactory) { t.Error(err) } - buf := bytes.NewBuffer(nil) - _, err = v.BlockRead(context.Background(), TestHash, buf) + buf := &brbuffer{} + err = v.BlockRead(context.Background(), TestHash, buf) if err != nil { t.Error(err) } @@ -144,7 +143,7 @@ func (s *genericVolumeSuite) testGetNoSuchBlock(t TB, factory TestableVolumeFact v := s.newVolume(t, factory) defer v.Teardown() - if _, err := v.BlockRead(context.Background(), barHash, io.Discard); err == nil { + if err := v.BlockRead(context.Background(), barHash, brdiscard); err == nil { t.Errorf("Expected error while getting non-existing block %v", barHash) } } @@ -177,8 +176,8 @@ func (s *genericVolumeSuite) testPutBlockWithDifferentContent(t TB, factory Test v.BlockWrite(context.Background(), testHash, testDataA) putErr := v.BlockWrite(context.Background(), testHash, testDataB) - buf := bytes.NewBuffer(nil) - _, getErr := v.BlockRead(context.Background(), testHash, buf) + buf := &brbuffer{} + getErr := v.BlockRead(context.Background(), testHash, buf) if putErr == nil { // Put must not return a nil error unless it has // overwritten the existing data. 
@@ -217,8 +216,8 @@ func (s *genericVolumeSuite) testPutMultipleBlocks(t TB, factory TestableVolumeF t.Errorf("Got err putting block %q: %q, expected nil", TestBlock3, err) } - buf := bytes.NewBuffer(nil) - _, err = v.BlockRead(context.Background(), TestHash, buf) + buf := &brbuffer{} + err = v.BlockRead(context.Background(), TestHash, buf) if err != nil { t.Error(err) } else { @@ -228,7 +227,7 @@ func (s *genericVolumeSuite) testPutMultipleBlocks(t TB, factory TestableVolumeF } buf.Reset() - _, err = v.BlockRead(context.Background(), TestHash2, buf) + err = v.BlockRead(context.Background(), TestHash2, buf) if err != nil { t.Error(err) } else { @@ -238,7 +237,7 @@ func (s *genericVolumeSuite) testPutMultipleBlocks(t TB, factory TestableVolumeF } buf.Reset() - _, err = v.BlockRead(context.Background(), TestHash3, buf) + err = v.BlockRead(context.Background(), TestHash3, buf) if err != nil { t.Error(err) } else { @@ -404,8 +403,8 @@ func (s *genericVolumeSuite) testDeleteNewBlock(t TB, factory TestableVolumeFact if err := v.BlockTrash(TestHash); err != nil { t.Error(err) } - buf := bytes.NewBuffer(nil) - _, err := v.BlockRead(context.Background(), TestHash, buf) + buf := &brbuffer{} + err := v.BlockRead(context.Background(), TestHash, buf) if err != nil { t.Error(err) } else if buf.String() != string(TestBlock) { @@ -428,7 +427,7 @@ func (s *genericVolumeSuite) testDeleteOldBlock(t TB, factory TestableVolumeFact if err := v.BlockTrash(TestHash); err != nil { t.Error(err) } - if _, err := v.BlockRead(context.Background(), TestHash, io.Discard); err == nil || !os.IsNotExist(err) { + if err := v.BlockRead(context.Background(), TestHash, brdiscard); err == nil || !os.IsNotExist(err) { t.Errorf("os.IsNotExist(%v) should have been true", err) } @@ -517,7 +516,7 @@ func (s *genericVolumeSuite) testMetrics(t TB, readonly bool, factory TestableVo v.BlockWrite(context.Background(), TestHash, TestBlock) } - _, err = v.BlockRead(context.Background(), TestHash, io.Discard) + err = v.BlockRead(context.Background(), TestHash, brdiscard) if err != nil { t.Error(err) } @@ -546,8 +545,8 @@ func (s *genericVolumeSuite) testGetConcurrent(t TB, factory TestableVolumeFacto sem := make(chan int) go func() { - buf := bytes.NewBuffer(nil) - _, err := v.BlockRead(context.Background(), TestHash, buf) + buf := &brbuffer{} + err := v.BlockRead(context.Background(), TestHash, buf) if err != nil { t.Errorf("err1: %v", err) } @@ -558,8 +557,8 @@ func (s *genericVolumeSuite) testGetConcurrent(t TB, factory TestableVolumeFacto }() go func() { - buf := bytes.NewBuffer(nil) - _, err := v.BlockRead(context.Background(), TestHash2, buf) + buf := &brbuffer{} + err := v.BlockRead(context.Background(), TestHash2, buf) if err != nil { t.Errorf("err2: %v", err) } @@ -570,8 +569,8 @@ func (s *genericVolumeSuite) testGetConcurrent(t TB, factory TestableVolumeFacto }() go func() { - buf := bytes.NewBuffer(nil) - _, err := v.BlockRead(context.Background(), TestHash3, buf) + buf := &brbuffer{} + err := v.BlockRead(context.Background(), TestHash3, buf) if err != nil { t.Errorf("err3: %v", err) } @@ -619,8 +618,8 @@ func (s *genericVolumeSuite) testPutConcurrent(t TB, factory TestableVolumeFacto // Check that we actually wrote the blocks. 
for _, blk := range blks { - buf := bytes.NewBuffer(nil) - _, err := v.BlockRead(context.Background(), blk.hash, buf) + buf := &brbuffer{} + err := v.BlockRead(context.Background(), blk.hash, buf) if err != nil { t.Errorf("get %s: %v", blk.hash, err) } else if buf.String() != string(blk.data) { @@ -644,13 +643,13 @@ func (s *genericVolumeSuite) testPutFullBlock(t TB, factory TestableVolumeFactor t.Error(err) } - buf := bytes.NewBuffer(nil) - _, err = v.BlockRead(context.Background(), hash, buf) + buf := &brbuffer{} + err = v.BlockRead(context.Background(), hash, buf) if err != nil { t.Error(err) } if buf.String() != string(wdata) { - t.Error("buf %+q != wdata %+q", buf, wdata) + t.Errorf("buf (len %d) != wdata (len %d)", buf.Len(), len(wdata)) } } @@ -668,8 +667,8 @@ func (s *genericVolumeSuite) testTrashUntrash(t TB, readonly bool, factory Testa v.BlockWrite(context.Background(), TestHash, TestBlock) v.TouchWithDate(TestHash, time.Now().Add(-2*s.cluster.Collections.BlobSigningTTL.Duration())) - buf := bytes.NewBuffer(nil) - _, err := v.BlockRead(context.Background(), TestHash, buf) + buf := &brbuffer{} + err := v.BlockRead(context.Background(), TestHash, buf) if err != nil { t.Error(err) } @@ -684,7 +683,7 @@ func (s *genericVolumeSuite) testTrashUntrash(t TB, readonly bool, factory Testa return } buf.Reset() - _, err = v.BlockRead(context.Background(), TestHash, buf) + err = v.BlockRead(context.Background(), TestHash, buf) if err == nil || !os.IsNotExist(err) { t.Errorf("os.IsNotExist(%v) should have been true", err) } @@ -697,7 +696,7 @@ func (s *genericVolumeSuite) testTrashUntrash(t TB, readonly bool, factory Testa // Get the block - after trash and untrash sequence buf.Reset() - _, err = v.BlockRead(context.Background(), TestHash, buf) + err = v.BlockRead(context.Background(), TestHash, buf) if err != nil { t.Error(err) } @@ -712,8 +711,8 @@ func (s *genericVolumeSuite) testTrashEmptyTrashUntrash(t TB, factory TestableVo defer v.Teardown() checkGet := func() error { - buf := bytes.NewBuffer(nil) - _, err := v.BlockRead(context.Background(), TestHash, buf) + buf := &brbuffer{} + err := v.BlockRead(context.Background(), TestHash, buf) if err != nil { return err } diff --git a/services/keepstore/volume_test.go b/services/keepstore/volume_test.go index 5a17b3a7dc..f64041b048 100644 --- a/services/keepstore/volume_test.go +++ b/services/keepstore/volume_test.go @@ -5,6 +5,7 @@ package keepstore import ( + "sync" "time" ) @@ -38,3 +39,52 @@ type TestableVolume interface { // Clean up, delete temporary files. Teardown() } + +// brbuffer is like bytes.Buffer, but it implements io.WriterAt. +// Convenient for testing (volume)BlockRead implementations. +type brbuffer struct { + mtx sync.Mutex + buf []byte +} + +func (b *brbuffer) WriteAt(p []byte, offset int64) (int, error) { + b.mtx.Lock() + defer b.mtx.Unlock() + if short := int(offset) + len(p) - len(b.buf); short > 0 { + b.buf = append(b.buf, make([]byte, short)...) + } + return copy(b.buf[offset:], p), nil +} + +func (b *brbuffer) Bytes() []byte { + b.mtx.Lock() + defer b.mtx.Unlock() + return b.buf +} + +func (b *brbuffer) String() string { + b.mtx.Lock() + defer b.mtx.Unlock() + return string(b.buf) +} + +func (b *brbuffer) Len() int { + b.mtx.Lock() + defer b.mtx.Unlock() + return len(b.buf) +} + +func (b *brbuffer) Reset() { + b.mtx.Lock() + defer b.mtx.Unlock() + b.buf = nil +} + +// a brdiscarder is like io.Discard, but it implements +// io.WriterAt. 
Convenient for testing (volume)BlockRead +// implementations when the output is not checked. +type brdiscarder struct{} + +func (brdiscarder) WriteAt(p []byte, offset int64) (int, error) { return len(p), nil } + +var brdiscard = brdiscarder{} -- 2.30.2
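
For context, a minimal standalone sketch of the io.WriterAt-based read path this patch introduces (not part of the patch; memWriterAt and fakeBlockRead are hypothetical stand-ins for the brbuffer test helper and a volume's BlockRead, using only the standard library):

    // Sketch: a volume-style BlockRead now writes into a caller-supplied
    // io.WriterAt instead of returning (int, error) from an io.Writer copy.
    package main

    import (
            "bytes"
            "fmt"
            "io"
            "sync"
    )

    // memWriterAt is a minimal goroutine-safe io.WriterAt backed by a byte
    // slice, analogous to the brbuffer helper added in volume_test.go.
    type memWriterAt struct {
            mtx sync.Mutex
            buf []byte
    }

    func (m *memWriterAt) WriteAt(p []byte, off int64) (int, error) {
            m.mtx.Lock()
            defer m.mtx.Unlock()
            // Grow the backing slice if this write lands past the end.
            if short := int(off) + len(p) - len(m.buf); short > 0 {
                    m.buf = append(m.buf, make([]byte, short)...)
            }
            return copy(m.buf[off:], p), nil
    }

    // fakeBlockRead stands in for a volume's BlockRead: it streams stored
    // data into the caller-supplied io.WriterAt via an io.OffsetWriter,
    // the same shape as unix_volume.go copying a file into
    // io.NewOffsetWriter(w, 0).
    func fakeBlockRead(stored []byte, w io.WriterAt) error {
            _, err := io.Copy(io.NewOffsetWriter(w, 0), bytes.NewReader(stored))
            return err
    }

    func main() {
            dst := &memWriterAt{}
            if err := fakeBlockRead([]byte("foo"), dst); err != nil {
                    fmt.Println("read failed:", err)
                    return
            }
            fmt.Printf("read %d bytes: %q\n", len(dst.buf), dst.buf)
    }

Because the destination is an io.WriterAt, a backend such as azure_blob_volume.go can write several pieces of a block concurrently at different offsets, while keepstore.go's streamWriterAt flushes those pieces to the HTTP response in sequential order.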