// If the block is younger than azureWriteRaceInterval and is
// unexpectedly empty, assume a BlockWrite operation is in progress,
// and wait for it to finish writing.
-func (v *azureBlobVolume) BlockRead(ctx context.Context, hash string, writeTo io.Writer) (int, error) {
+func (v *azureBlobVolume) BlockRead(ctx context.Context, hash string, w io.WriterAt) error {
trashed, _, err := v.checkTrashed(hash)
if err != nil {
- return 0, err
+ return err
}
if trashed {
- return 0, os.ErrNotExist
+ return os.ErrNotExist
}
buf, err := v.bufferPool.GetContext(ctx)
if err != nil {
- return 0, err
+ return err
}
defer v.bufferPool.Put(buf)
- streamer := newStreamWriterAt(writeTo, 65536, buf)
- defer streamer.Close()
var deadline time.Time
- size, err := v.get(ctx, hash, streamer)
- for err == nil && size == 0 && streamer.WroteAt() == 0 && hash != "d41d8cd98f00b204e9800998ecf8427e" {
+ wrote, err := v.get(ctx, hash, w)
+ for err == nil && wrote == 0 && hash != "d41d8cd98f00b204e9800998ecf8427e" {
// Seeing a brand new empty block probably means we're
// in a race with CreateBlob, which under the hood
// (apparently) does "CreateEmpty" and "CommitData"
}
select {
case <-ctx.Done():
- return 0, ctx.Err()
+ return ctx.Err()
case <-time.After(v.WriteRacePollTime.Duration()):
}
- size, err = v.get(ctx, hash, streamer)
+ wrote, err = v.get(ctx, hash, w)
}
if !deadline.IsZero() {
- ctxlog.FromContext(ctx).Printf("Race ended with size==%d", size)
- }
- if err != nil {
- streamer.Close()
- return streamer.Wrote(), err
+ ctxlog.FromContext(ctx).Printf("Race ended with size==%d", wrote)
}
- err = streamer.Close()
- return streamer.Wrote(), err
+ return err
}
func (v *azureBlobVolume) get(ctx context.Context, hash string, dst io.WriterAt) (int, error) {
pieces := 1
expectSize := BlockSize
+ sizeKnown := false
if pieceSize < BlockSize {
// Unfortunately the handler doesn't tell us how long
// the blob is expected to be, so we have to ask
}
expectSize = int(props.ContentLength)
pieces = (expectSize + pieceSize - 1) / pieceSize
+ sizeKnown = true
}
if expectSize == 0 {
return 0, nil
}
- // We'll update this actualSize if/when we get the last piece.
- actualSize := -1
errors := make(chan error, pieces)
+ var wrote atomic.Int64
var wg sync.WaitGroup
wg.Add(pieces)
for p := 0; p < pieces; p++ {
rdr.Close()
}()
n, err := io.CopyN(io.NewOffsetWriter(dst, int64(startPos)), rdr, int64(endPos-startPos))
- if pieces == 1 && (err == io.ErrUnexpectedEOF || err == io.EOF) {
+ wrote.Add(n)
+ if pieces == 1 && !sizeKnown && (err == io.ErrUnexpectedEOF || err == io.EOF) {
// If we don't know the actual size,
// and just tried reading 64 MiB, it's
// normal to encounter EOF.
} else if err != nil {
- if ctx.Err() == nil {
- errors <- err
- }
+ errors <- err
cancel()
return
}
- if p == pieces-1 {
- actualSize = startPos + int(n)
- }
}(p)
}
wg.Wait()
close(errors)
if len(errors) > 0 {
- return 0, v.translateError(<-errors)
- }
- if ctx.Err() != nil {
- return 0, ctx.Err()
+ return int(wrote.Load()), v.translateError(<-errors)
}
- return actualSize, nil
+ return int(wrote.Load()), ctx.Err()
}
// BlockWrite stores a block on the volume. If it already exists, its
"encoding/xml"
"flag"
"fmt"
- "io"
"io/ioutil"
"math/rand"
"net"
if err != nil {
c.Error(err)
}
- gotData := bytes.NewBuffer(nil)
- gotLen, err := v.BlockRead(context.Background(), hash, gotData)
+ gotData := &brbuffer{}
+ err = v.BlockRead(context.Background(), hash, gotData)
if err != nil {
c.Error(err)
}
gotHash := fmt.Sprintf("%x", md5.Sum(gotData.Bytes()))
- if gotLen != size {
- c.Errorf("length mismatch: got %d != %d", gotLen, size)
- }
+ c.Check(gotData.Len(), check.Equals, size)
if gotHash != hash {
c.Errorf("hash mismatch: got %s != %s", gotHash, hash)
}
wg.Add(1)
go func() {
defer wg.Done()
- _, err := v.BlockRead(context.Background(), TestHash, io.Discard)
+ err := v.BlockRead(context.Background(), TestHash, brdiscard)
if err != nil {
c.Error(err)
}
allDone := make(chan struct{})
go func() {
defer close(allDone)
- buf := bytes.NewBuffer(nil)
- n, err := v.BlockRead(context.Background(), TestHash, buf)
+ buf := &brbuffer{}
+ err := v.BlockRead(context.Background(), TestHash, buf)
if err != nil {
c.Error(err)
return
}
- if n != 0 {
- c.Errorf("Got %+q (n=%d), expected empty buf", buf.Bytes(), n)
- }
+ c.Check(buf.String(), check.Equals, "")
}()
select {
case <-allDone:
func (s *stubbedAzureBlobSuite) TestAzureBlobVolumeContextCancelBlockRead(c *check.C) {
s.testAzureBlobVolumeContextCancel(c, func(ctx context.Context, v *testableAzureBlobVolume) error {
v.BlockWriteRaw(TestHash, TestBlock)
- _, err := v.BlockRead(ctx, TestHash, io.Discard)
- return err
+ return v.BlockRead(ctx, TestHash, brdiscard)
})
}
c.Check(stats(), check.Matches, `.*"Errors":0,.*`)
loc := "acbd18db4cc2f85cedef654fccc4a4d8"
- _, err := volume.BlockRead(context.Background(), loc, io.Discard)
+ err := volume.BlockRead(context.Background(), loc, brdiscard)
c.Check(err, check.NotNil)
c.Check(stats(), check.Matches, `.*"Ops":[^0],.*`)
c.Check(stats(), check.Matches, `.*"Errors":[^0],.*`)
c.Check(stats(), check.Matches, `.*"OutBytes":3,.*`)
c.Check(stats(), check.Matches, `.*"CreateOps":1,.*`)
- _, err = volume.BlockRead(context.Background(), loc, io.Discard)
+ err = volume.BlockRead(context.Background(), loc, brdiscard)
c.Check(err, check.IsNil)
- _, err = volume.BlockRead(context.Background(), loc, io.Discard)
+ err = volume.BlockRead(context.Background(), loc, brdiscard)
c.Check(err, check.IsNil)
c.Check(stats(), check.Matches, `.*"InBytes":6,.*`)
}
} else {
out = io.MultiWriter(out, hashcheck)
}
+
+ buf, err := ks.bufferPool.GetContext(ctx)
+ if err != nil {
+ return 0, err
+ }
+ defer ks.bufferPool.Put(buf)
+ streamer := newStreamWriterAt(out, 65536, buf)
+ defer streamer.Close()
+
var errToCaller error = os.ErrNotExist
for _, mnt := range ks.rendezvous(li.hash, ks.mountsR) {
if ctx.Err() != nil {
return 0, ctx.Err()
}
- n, err = mnt.BlockRead(ctx, li.hash, out)
- if err == nil && li.size > 0 && n != li.size {
+ err := mnt.BlockRead(ctx, li.hash, streamer)
+ if err != nil {
+ if streamer.WroteAt() != 0 {
+ // BlockRead encountered an error
+ // after writing some data, so it's
+ // too late to try another
+ // volume. Flush streamer before
+ // calling Wrote() to ensure our
+ // return value accurately reflects
+ // the number of bytes written to
+ // opts.WriteTo.
+ streamer.Close()
+ return streamer.Wrote(), err
+ }
+ if !os.IsNotExist(err) {
+ errToCaller = err
+ }
+ continue
+ }
+ if li.size == 0 {
+ // hashCheckingWriter isn't in use because we
+ // don't know the expected size. All we can do
+ // is check after writing all the data, and
+ // trust the caller is doing a HEAD request so
+ // it's not too late to set an error code in
+ // the response header.
+ err = streamer.Close()
+ if hash := fmt.Sprintf("%x", hashcheck.Sum(nil)); hash != li.hash && err == nil {
+ err = errChecksum
+ }
+ if rw, ok := opts.WriteTo.(http.ResponseWriter); ok {
+ // We didn't set the content-length header
+ // above because we didn't know the block size
+ // until now.
+ rw.Header().Set("Content-Length", fmt.Sprintf("%d", streamer.WroteAt()))
+ }
+ return streamer.WroteAt(), err
+ } else if streamer.WroteAt() != li.size {
// If the backend read fewer bytes than
// expected but returns no error, we can
// classify this as a checksum error (even
// it anyway, but if it's a HEAD request the
// caller can still change the response status
// code.
- return n, errChecksum
- }
- if err == nil && li.size == 0 {
- // hashCheckingWriter isn't in use because we
- // don't know the expected size. All we can do
- // is check after writing all the data, and
- // trust the caller is doing a HEAD request so
- // it's not too late to set an error code in
- // the response header.
- if hash := fmt.Sprintf("%x", hashcheck.Sum(nil)); hash != li.hash {
- return n, errChecksum
- }
- }
- if rw, ok := opts.WriteTo.(http.ResponseWriter); ok && li.size == 0 && err == nil {
- // We didn't set the content-length header
- // above because we didn't know the block size
- // until now.
- rw.Header().Set("Content-Length", fmt.Sprintf("%d", n))
- }
- if n > 0 || err == nil {
- // success, or there's an error but we can't
- // retry because we've already sent some data.
- return n, err
- }
- if !os.IsNotExist(err) {
- // If some volume returns a transient error,
- // return it to the caller instead of "Not
- // found" so it can retry.
- errToCaller = err
+ return streamer.WroteAt(), errChecksum
}
+ // Ensure streamer flushes all buffered data without
+ // errors.
+ err = streamer.Close()
+ return streamer.Wrote(), err
}
return 0, errToCaller
}
func (ks *keepstore) blockReadRemote(ctx context.Context, opts arvados.BlockReadOptions) (int, error) {
- ks.logger.Infof("blockReadRemote(%s)", opts.Locator)
token := ctxToken(ctx)
if token == "" {
return 0, errNoTokenProvided
continue
}
cmp := &checkEqual{Expect: opts.Data}
- if _, err := mnt.BlockRead(ctx, hash, cmp); err == nil {
+ if err := mnt.BlockRead(ctx, hash, cmp); err == nil {
if !cmp.Equal() {
return resp, errCollision
}
return mnts
}
-// checkEqual reports whether the data written to it (via io.Writer
+// checkEqual reports whether the data written to it (via io.WriterAt
// interface) is equal to the expected data.
//
// Expect should not be changed after the first Write.
+//
+// Results are undefined if WriteAt is called with overlapping ranges.
type checkEqual struct {
- Expect []byte
- equalUntil int
+ Expect []byte
+ equal atomic.Int64
+ notequal atomic.Bool
}
func (ce *checkEqual) Equal() bool {
- return ce.equalUntil == len(ce.Expect)
+ return !ce.notequal.Load() && ce.equal.Load() == int64(len(ce.Expect))
}
-func (ce *checkEqual) Write(p []byte) (int, error) {
- endpos := ce.equalUntil + len(p)
- if ce.equalUntil >= 0 && endpos <= len(ce.Expect) && bytes.Equal(p, ce.Expect[ce.equalUntil:endpos]) {
- ce.equalUntil = endpos
+func (ce *checkEqual) WriteAt(p []byte, offset int64) (int, error) {
+ endpos := int(offset) + len(p)
+ if offset >= 0 && endpos <= len(ce.Expect) && bytes.Equal(p, ce.Expect[int(offset):endpos]) {
+ ce.equal.Add(int64(len(p)))
} else {
- ce.equalUntil = -1
+ ce.notequal.Store(true)
}
return len(p), nil
}
return ks.BlockTrash(ctx, loc)
}
checkexists := func(volidx int) bool {
- _, err := vol[volidx].BlockRead(ctx, fooHash, io.Discard)
+ err := vol[volidx].BlockRead(ctx, fooHash, brdiscard)
if !os.IsNotExist(err) {
c.Check(err, IsNil)
}
for _, mnt := range ks.mounts {
err := mnt.BlockWrite(context.Background(), fooHash, []byte("foo"))
c.Assert(err, IsNil)
- _, err = mnt.BlockRead(context.Background(), fooHash, io.Discard)
+ err = mnt.BlockRead(context.Background(), fooHash, brdiscard)
c.Assert(err, IsNil)
}
c.Check(os.IsNotExist(err), Equals, true)
for _, mnt := range ks.mounts {
- _, err := mnt.BlockRead(context.Background(), fooHash, io.Discard)
+ err := mnt.BlockRead(context.Background(), fooHash, brdiscard)
c.Assert(err, IsNil)
}
}
// corresponding func (if non-nil). If the func returns an
// error, that error is returned to caller. Otherwise, the
// stub continues normally.
- blockRead func(ctx context.Context, hash string, writeTo io.Writer) (int, error)
+ blockRead func(ctx context.Context, hash string, writeTo io.WriterAt) error
blockWrite func(ctx context.Context, hash string, data []byte) error
deviceID func() string
blockTouch func(hash string) error
v.stubLog.Printf("%s %s %s", v.params.UUID[24:27], op, hash[:3])
}
-func (v *stubVolume) BlockRead(ctx context.Context, hash string, writeTo io.Writer) (int, error) {
+func (v *stubVolume) BlockRead(ctx context.Context, hash string, writeTo io.WriterAt) error {
v.log("read", hash)
if v.blockRead != nil {
- n, err := v.blockRead(ctx, hash, writeTo)
+ err := v.blockRead(ctx, hash, writeTo)
if err != nil {
- return n, err
+ return err
}
}
v.mtx.Lock()
ent, ok := v.data[hash]
v.mtx.Unlock()
if !ok || !ent.trash.IsZero() {
- return 0, os.ErrNotExist
+ return os.ErrNotExist
}
wrote := 0
for writesize := 1000; wrote < len(ent.data); writesize = writesize * 2 {
if len(data) > writesize {
data = data[:writesize]
}
- n, err := writeTo.Write(data)
+ n, err := writeTo.WriteAt(data, int64(wrote))
wrote += n
if err != nil {
- return wrote, err
+ return err
}
}
- return wrote, nil
+ return nil
}
func (v *stubVolume) BlockWrite(ctx context.Context, hash string, data []byte) error {
resp := call(router, "DELETE", "http://example/"+fooHash+"+3", s.cluster.SystemRootToken, nil, nil)
c.Check(resp.Code, Equals, http.StatusOK)
c.Check(vol0.stubLog.String(), Matches, `(?ms).* trash .*`)
- _, err = vol0.BlockRead(context.Background(), fooHash, io.Discard)
+ err = vol0.BlockRead(context.Background(), fooHash, brdiscard)
c.Assert(err, Equals, os.ErrNotExist)
}
c.Assert(err, IsNil)
err = vol0.BlockTrash(fooHash)
c.Assert(err, IsNil)
- _, err = vol0.BlockRead(context.Background(), fooHash, io.Discard)
+ err = vol0.BlockRead(context.Background(), fooHash, brdiscard)
c.Assert(err, Equals, os.ErrNotExist)
resp := call(router, "PUT", "http://example/untrash/"+fooHash+"+3", s.cluster.SystemRootToken, nil, nil)
c.Check(resp.Code, Equals, http.StatusOK)
c.Check(vol0.stubLog.String(), Matches, `(?ms).* untrash .*`)
- _, err = vol0.BlockRead(context.Background(), fooHash, io.Discard)
+ err = vol0.BlockRead(context.Background(), fooHash, brdiscard)
c.Check(err, IsNil)
}
func (s *routerSuite) TestVolumeErrorStatusCode(c *C) {
router, cancel := testRouter(c, s.cluster, nil)
defer cancel()
- router.keepstore.mountsW[0].volume.(*stubVolume).blockRead = func(_ context.Context, hash string, w io.Writer) (int, error) {
- return 0, httpserver.ErrorWithStatus(errors.New("test error"), http.StatusBadGateway)
+ router.keepstore.mountsW[0].volume.(*stubVolume).blockRead = func(_ context.Context, hash string, w io.WriterAt) error {
+ return httpserver.ErrorWithStatus(errors.New("test error"), http.StatusBadGateway)
}
// To test whether we fall back to volume 1 after volume 0
defer cancel()
unblock := make(chan struct{})
- router.keepstore.mountsW[0].volume.(*stubVolume).blockRead = func(ctx context.Context, hash string, w io.Writer) (int, error) {
+ router.keepstore.mountsW[0].volume.(*stubVolume).blockRead = func(ctx context.Context, hash string, w io.WriterAt) error {
<-unblock
c.Check(ctx.Err(), NotNil)
- return 0, ctx.Err()
+ return ctx.Err()
}
go func() {
time.Sleep(time.Second / 10)
// BlockRead reads a Keep block that has been stored as a block blob
// in the S3 bucket.
-func (v *s3Volume) BlockRead(ctx context.Context, hash string, writeTo io.Writer) (int, error) {
+func (v *s3Volume) BlockRead(ctx context.Context, hash string, w io.WriterAt) error {
key := v.key(hash)
- buf, err := v.bufferPool.GetContext(ctx)
- if err != nil {
- return 0, err
- }
- defer v.bufferPool.Put(buf)
-
- streamer := newStreamWriterAt(writeTo, 65536, buf)
- defer streamer.Close()
- err = v.readWorker(ctx, key, streamer)
+ err := v.readWorker(ctx, key, w)
if err != nil {
err = v.translateError(err)
if !os.IsNotExist(err) {
- return 0, err
- }
- if streamer.WroteAt() > 0 {
- return 0, errors.New("bug? readWorker returned ErrNotExist after writing to streamer")
+ return err
}
_, err = v.head("recent/" + key)
if err != nil {
// If we can't read recent/X, there's no point in
// trying fixRace. Give up.
- return 0, err
+ return err
}
if !v.fixRace(key) {
err = os.ErrNotExist
- return 0, err
+ return err
}
- err = v.readWorker(ctx, key, streamer)
+ err = v.readWorker(ctx, key, w)
if err != nil {
v.logger.Warnf("reading %s after successful fixRace: %s", hash, err)
err = v.translateError(err)
- return 0, err
+ return err
}
}
- err = streamer.Close()
- if err != nil {
- return 0, v.translateError(err)
- }
- return streamer.Wrote(), nil
+ return nil
}
func (v *s3Volume) readWorker(ctx context.Context, key string, dst io.WriterAt) error {
c.Check(stats(), check.Matches, `.*"Ops":0,.*`)
loc := "acbd18db4cc2f85cedef654fccc4a4d8"
- _, err := v.BlockRead(context.Background(), loc, io.Discard)
+ err := v.BlockRead(context.Background(), loc, brdiscard)
c.Check(err, check.NotNil)
c.Check(stats(), check.Matches, `.*"Ops":[^0],.*`)
c.Check(stats(), check.Matches, `.*"s3.requestFailure 404 NoSuchKey[^"]*":[^0].*`)
c.Check(stats(), check.Matches, `.*"OutBytes":3,.*`)
c.Check(stats(), check.Matches, `.*"PutOps":2,.*`)
- _, err = v.BlockRead(context.Background(), loc, io.Discard)
+ err = v.BlockRead(context.Background(), loc, brdiscard)
c.Check(err, check.IsNil)
- _, err = v.BlockRead(context.Background(), loc, io.Discard)
+ err = v.BlockRead(context.Background(), loc, brdiscard)
c.Check(err, check.IsNil)
c.Check(stats(), check.Matches, `.*"InBytes":6,.*`)
}
func (s *stubbedS3Suite) TestGetContextCancel(c *check.C) {
s.testContextCancel(c, func(ctx context.Context, v *testableS3Volume) error {
- _, err := v.BlockRead(ctx, fooHash, io.Discard)
- return err
+ return v.BlockRead(ctx, fooHash, brdiscard)
})
}
// Check canGet
loc, blk := setupScenario()
- _, err := v.BlockRead(context.Background(), loc, io.Discard)
+ err := v.BlockRead(context.Background(), loc, brdiscard)
c.Check(err == nil, check.Equals, scenario.canGet)
if err != nil {
c.Check(os.IsNotExist(err), check.Equals, true)
loc, _ = setupScenario()
err = v.BlockTrash(loc)
c.Check(err == nil, check.Equals, scenario.canTrash)
- _, err = v.BlockRead(context.Background(), loc, io.Discard)
+ err = v.BlockRead(context.Background(), loc, brdiscard)
c.Check(err == nil, check.Equals, scenario.canGetAfterTrash)
if err != nil {
c.Check(os.IsNotExist(err), check.Equals, true)
// should be able to Get after Untrash --
// regardless of timestamps, errors, race
// conditions, etc.
- _, err = v.BlockRead(context.Background(), loc, io.Discard)
+ err = v.BlockRead(context.Background(), loc, brdiscard)
c.Check(err, check.IsNil)
}
// streamWriterAt writes the data to the provided io.Writer in
// sequential order.
//
-// streamWriterAt can also be used as an asynchronous buffer: the
-// caller can use the io.Writer interface to write into a memory
-// buffer and return without waiting for the wrapped writer to catch
-// up.
+// streamWriterAt can also be wrapped with an io.OffsetWriter to
+// provide an asynchronous buffer: the caller can use the io.Writer
+// interface to write into a memory buffer and return without waiting
+// for the wrapped writer to catch up.
//
// Close returns when all data has been written through.
type streamWriterAt struct {
}
}
-// Write implements io.Writer.
-func (swa *streamWriterAt) Write(p []byte) (int, error) {
- n, err := swa.WriteAt(p, int64(swa.writepos))
- swa.writepos += n
- return n, err
-}
-
-// WriteAt implements io.WriterAt.
+// WriteAt implements io.WriterAt. WriteAt is goroutine-safe.
func (swa *streamWriterAt) WriteAt(p []byte, offset int64) (int, error) {
pos := int(offset)
n := 0
return fi.ModTime(), nil
}
-// Lock the locker (if one is in use), open the file for reading, and
-// call the given function if and when the file is ready to read.
-func (v *unixVolume) getFunc(ctx context.Context, path string, fn func(io.Reader) error) error {
- if err := v.lock(ctx); err != nil {
- return err
- }
- defer v.unlock()
- f, err := v.os.Open(path)
- if err != nil {
- return err
- }
- defer f.Close()
- return fn(newCountingReader(ioutil.NopCloser(f), v.os.stats.TickInBytes))
-}
-
// stat is os.Stat() with some extra sanity checks.
func (v *unixVolume) stat(path string) (os.FileInfo, error) {
stat, err := v.os.Stat(path)
}
// BlockRead reads a block from the volume.
-func (v *unixVolume) BlockRead(ctx context.Context, hash string, w io.Writer) (int, error) {
+func (v *unixVolume) BlockRead(ctx context.Context, hash string, w io.WriterAt) error {
path := v.blockPath(hash)
stat, err := v.stat(path)
if err != nil {
- return 0, v.translateError(err)
+ return v.translateError(err)
}
- var streamer *streamWriterAt
- if v.locker != nil {
- buf, err := v.bufferPool.GetContext(ctx)
- if err != nil {
- return 0, err
- }
- defer v.bufferPool.Put(buf)
- streamer = newStreamWriterAt(w, 65536, buf)
- defer streamer.Close()
- w = streamer
- }
- var n int64
- err = v.getFunc(ctx, path, func(rdr io.Reader) error {
- n, err = io.Copy(w, rdr)
- if err == nil && n != stat.Size() {
- err = io.ErrUnexpectedEOF
- }
+ if err := v.lock(ctx); err != nil {
+ return err
+ }
+ defer v.unlock()
+ f, err := v.os.Open(path)
+ if err != nil {
return err
- })
- if streamer != nil {
- // If we're using the streamer (and there's no error
- // so far) flush any remaining buffered data now that
- // getFunc has released the serialize lock.
- if err == nil {
- err = streamer.Close()
- }
- return streamer.WroteAt(), err
}
- return int(n), err
+ defer f.Close()
+ src := newCountingReader(ioutil.NopCloser(f), v.os.stats.TickInBytes)
+ dst := io.NewOffsetWriter(w, 0)
+ n, err := io.Copy(dst, src)
+ if err == nil && n != stat.Size() {
+ err = io.ErrUnexpectedEOF
+ }
+ return err
}
// BlockWrite stores a block on the volume. If it already exists, its
"bytes"
"context"
"encoding/json"
- "errors"
"fmt"
- "io"
"io/ioutil"
"os"
"sync"
defer v.Teardown()
v.BlockWrite(context.Background(), TestHash, TestBlock)
- buf := bytes.NewBuffer(nil)
- _, err := v.BlockRead(context.Background(), TestHash2, buf)
- switch {
- case os.IsNotExist(err):
- break
- case err == nil:
- c.Errorf("Read should have failed, returned %+q", buf.Bytes())
- default:
- c.Errorf("Read expected ErrNotExist, got: %s", err)
- }
+ buf := &brbuffer{}
+ err := v.BlockRead(context.Background(), TestHash2, buf)
+ c.Check(err, check.FitsTypeOf, os.ErrNotExist)
}
func (s *unixVolumeSuite) TestPut(c *check.C) {
}
}
-func (s *unixVolumeSuite) TestUnixVolumeGetFuncWorkerError(c *check.C) {
- v := s.newTestableUnixVolume(c, s.params, false)
- defer v.Teardown()
-
- v.BlockWrite(context.Background(), TestHash, TestBlock)
- mockErr := errors.New("Mock error")
- err := v.getFunc(context.Background(), v.blockPath(TestHash), func(rdr io.Reader) error {
- return mockErr
- })
- if err != mockErr {
- c.Errorf("Got %v, expected %v", err, mockErr)
- }
-}
-
-func (s *unixVolumeSuite) TestUnixVolumeGetFuncFileError(c *check.C) {
- v := s.newTestableUnixVolume(c, s.params, false)
- defer v.Teardown()
-
- funcCalled := false
- err := v.getFunc(context.Background(), v.blockPath(TestHash), func(rdr io.Reader) error {
- funcCalled = true
- return nil
- })
- if err == nil {
- c.Errorf("Expected error opening non-existent file")
- }
- if funcCalled {
- c.Errorf("Worker func should not have been called")
- }
-}
-
-func (s *unixVolumeSuite) TestUnixVolumeGetFuncWorkerWaitsOnMutex(c *check.C) {
- v := s.newTestableUnixVolume(c, s.params, false)
- defer v.Teardown()
-
- v.BlockWrite(context.Background(), TestHash, TestBlock)
-
- mtx := NewMockMutex()
- v.locker = mtx
-
- funcCalled := make(chan struct{})
- go v.getFunc(context.Background(), v.blockPath(TestHash), func(rdr io.Reader) error {
- funcCalled <- struct{}{}
- return nil
- })
- select {
- case mtx.AllowLock <- struct{}{}:
- case <-funcCalled:
- c.Fatal("Function was called before mutex was acquired")
- case <-time.After(5 * time.Second):
- c.Fatal("Timed out before mutex was acquired")
- }
- select {
- case <-funcCalled:
- case mtx.AllowUnlock <- struct{}{}:
- c.Fatal("Mutex was released before function was called")
- case <-time.After(5 * time.Second):
- c.Fatal("Timed out waiting for funcCalled")
- }
- select {
- case mtx.AllowUnlock <- struct{}{}:
- case <-time.After(5 * time.Second):
- c.Fatal("Timed out waiting for getFunc() to release mutex")
- }
-}
-
-type MockMutex struct {
- AllowLock chan struct{}
- AllowUnlock chan struct{}
-}
-
-func NewMockMutex() *MockMutex {
- return &MockMutex{
- AllowLock: make(chan struct{}),
- AllowUnlock: make(chan struct{}),
- }
-}
-
-// Lock waits for someone to send to AllowLock.
-func (m *MockMutex) Lock() {
- <-m.AllowLock
-}
-
-// Unlock waits for someone to send to AllowUnlock.
-func (m *MockMutex) Unlock() {
- <-m.AllowUnlock
-}
-
func (s *unixVolumeSuite) TestUnixVolumeContextCancelBlockWrite(c *check.C) {
v := s.newTestableUnixVolume(c, s.params, true)
defer v.Teardown()
time.Sleep(50 * time.Millisecond)
cancel()
}()
- n, err := v.BlockRead(ctx, TestHash, io.Discard)
- if n > 0 || err != context.Canceled {
- c.Errorf("BlockRead() returned %d, %s -- expected short read / canceled", n, err)
+ buf := &brbuffer{}
+ err = v.BlockRead(ctx, TestHash, buf)
+ if buf.Len() != 0 || err != context.Canceled {
+ c.Errorf("BlockRead() returned %q, %s -- expected short read / canceled", buf.String(), err)
}
}
c.Check(stats(), check.Matches, `.*"StatOps":1,.*`) // (*unixVolume)check() calls Stat() once
c.Check(stats(), check.Matches, `.*"Errors":0,.*`)
- _, err := vol.BlockRead(context.Background(), fooHash, io.Discard)
+ err := vol.BlockRead(context.Background(), fooHash, brdiscard)
c.Check(err, check.NotNil)
c.Check(stats(), check.Matches, `.*"StatOps":[^0],.*`)
c.Check(stats(), check.Matches, `.*"Errors":[^0],.*`)
c.Check(stats(), check.Matches, `.*"OpenOps":1,.*`)
c.Check(stats(), check.Matches, `.*"UtimesOps":2,.*`)
- buf := bytes.NewBuffer(nil)
- _, err = vol.BlockRead(context.Background(), fooHash, buf)
+ buf := &brbuffer{}
+ err = vol.BlockRead(context.Background(), fooHash, buf)
c.Check(err, check.IsNil)
c.Check(buf.String(), check.Equals, "foo")
c.Check(stats(), check.Matches, `.*"InBytes":3,.*`)
// volume is the interface to a back-end storage device.
type volume interface {
- BlockRead(ctx context.Context, hash string, writeTo io.Writer) (int, error)
+ BlockRead(ctx context.Context, hash string, writeTo io.WriterAt) error
BlockWrite(ctx context.Context, hash string, data []byte) error
DeviceID() string
BlockTouch(hash string) error
"context"
"crypto/md5"
"fmt"
- "io"
"os"
"regexp"
"sort"
t.Error(err)
}
- buf := bytes.NewBuffer(nil)
- _, err = v.BlockRead(context.Background(), TestHash, buf)
+ buf := &brbuffer{}
+ err = v.BlockRead(context.Background(), TestHash, buf)
if err != nil {
t.Error(err)
}
v := s.newVolume(t, factory)
defer v.Teardown()
- if _, err := v.BlockRead(context.Background(), barHash, io.Discard); err == nil {
+ if err := v.BlockRead(context.Background(), barHash, brdiscard); err == nil {
t.Errorf("Expected error while getting non-existing block %v", barHash)
}
}
v.BlockWrite(context.Background(), testHash, testDataA)
putErr := v.BlockWrite(context.Background(), testHash, testDataB)
- buf := bytes.NewBuffer(nil)
- _, getErr := v.BlockRead(context.Background(), testHash, buf)
+ buf := &brbuffer{}
+ getErr := v.BlockRead(context.Background(), testHash, buf)
if putErr == nil {
// Put must not return a nil error unless it has
// overwritten the existing data.
t.Errorf("Got err putting block %q: %q, expected nil", TestBlock3, err)
}
- buf := bytes.NewBuffer(nil)
- _, err = v.BlockRead(context.Background(), TestHash, buf)
+ buf := &brbuffer{}
+ err = v.BlockRead(context.Background(), TestHash, buf)
if err != nil {
t.Error(err)
} else {
}
buf.Reset()
- _, err = v.BlockRead(context.Background(), TestHash2, buf)
+ err = v.BlockRead(context.Background(), TestHash2, buf)
if err != nil {
t.Error(err)
} else {
}
buf.Reset()
- _, err = v.BlockRead(context.Background(), TestHash3, buf)
+ err = v.BlockRead(context.Background(), TestHash3, buf)
if err != nil {
t.Error(err)
} else {
if err := v.BlockTrash(TestHash); err != nil {
t.Error(err)
}
- buf := bytes.NewBuffer(nil)
- _, err := v.BlockRead(context.Background(), TestHash, buf)
+ buf := &brbuffer{}
+ err := v.BlockRead(context.Background(), TestHash, buf)
if err != nil {
t.Error(err)
} else if buf.String() != string(TestBlock) {
if err := v.BlockTrash(TestHash); err != nil {
t.Error(err)
}
- if _, err := v.BlockRead(context.Background(), TestHash, io.Discard); err == nil || !os.IsNotExist(err) {
+ if err := v.BlockRead(context.Background(), TestHash, brdiscard); err == nil || !os.IsNotExist(err) {
t.Errorf("os.IsNotExist(%v) should have been true", err)
}
v.BlockWrite(context.Background(), TestHash, TestBlock)
}
- _, err = v.BlockRead(context.Background(), TestHash, io.Discard)
+ err = v.BlockRead(context.Background(), TestHash, brdiscard)
if err != nil {
t.Error(err)
}
sem := make(chan int)
go func() {
- buf := bytes.NewBuffer(nil)
- _, err := v.BlockRead(context.Background(), TestHash, buf)
+ buf := &brbuffer{}
+ err := v.BlockRead(context.Background(), TestHash, buf)
if err != nil {
t.Errorf("err1: %v", err)
}
}()
go func() {
- buf := bytes.NewBuffer(nil)
- _, err := v.BlockRead(context.Background(), TestHash2, buf)
+ buf := &brbuffer{}
+ err := v.BlockRead(context.Background(), TestHash2, buf)
if err != nil {
t.Errorf("err2: %v", err)
}
}()
go func() {
- buf := bytes.NewBuffer(nil)
- _, err := v.BlockRead(context.Background(), TestHash3, buf)
+ buf := &brbuffer{}
+ err := v.BlockRead(context.Background(), TestHash3, buf)
if err != nil {
t.Errorf("err3: %v", err)
}
// Check that we actually wrote the blocks.
for _, blk := range blks {
- buf := bytes.NewBuffer(nil)
- _, err := v.BlockRead(context.Background(), blk.hash, buf)
+ buf := &brbuffer{}
+ err := v.BlockRead(context.Background(), blk.hash, buf)
if err != nil {
t.Errorf("get %s: %v", blk.hash, err)
} else if buf.String() != string(blk.data) {
t.Error(err)
}
- buf := bytes.NewBuffer(nil)
- _, err = v.BlockRead(context.Background(), hash, buf)
+ buf := &brbuffer{}
+ err = v.BlockRead(context.Background(), hash, buf)
if err != nil {
t.Error(err)
}
if buf.String() != string(wdata) {
- t.Error("buf %+q != wdata %+q", buf, wdata)
+ t.Errorf("buf (len %d) != wdata (len %d)", buf.Len(), len(wdata))
}
}
v.BlockWrite(context.Background(), TestHash, TestBlock)
v.TouchWithDate(TestHash, time.Now().Add(-2*s.cluster.Collections.BlobSigningTTL.Duration()))
- buf := bytes.NewBuffer(nil)
- _, err := v.BlockRead(context.Background(), TestHash, buf)
+ buf := &brbuffer{}
+ err := v.BlockRead(context.Background(), TestHash, buf)
if err != nil {
t.Error(err)
}
return
}
buf.Reset()
- _, err = v.BlockRead(context.Background(), TestHash, buf)
+ err = v.BlockRead(context.Background(), TestHash, buf)
if err == nil || !os.IsNotExist(err) {
t.Errorf("os.IsNotExist(%v) should have been true", err)
}
// Get the block - after trash and untrash sequence
buf.Reset()
- _, err = v.BlockRead(context.Background(), TestHash, buf)
+ err = v.BlockRead(context.Background(), TestHash, buf)
if err != nil {
t.Error(err)
}
defer v.Teardown()
checkGet := func() error {
- buf := bytes.NewBuffer(nil)
- _, err := v.BlockRead(context.Background(), TestHash, buf)
+ buf := &brbuffer{}
+ err := v.BlockRead(context.Background(), TestHash, buf)
if err != nil {
return err
}
package keepstore
import (
+ "sync"
"time"
)
// Clean up, delete temporary files.
Teardown()
}
+
+// brbuffer is like bytes.Buffer, but it implements io.WriterAt.
+// Convenient for testing (volume)BlockRead implementations.
+type brbuffer struct {
+ mtx sync.Mutex
+ buf []byte
+}
+
+func (b *brbuffer) WriteAt(p []byte, offset int64) (int, error) {
+ b.mtx.Lock()
+ defer b.mtx.Unlock()
+ if short := int(offset) + len(p) - len(b.buf); short > 0 {
+ b.buf = append(b.buf, make([]byte, short)...)
+ }
+ return copy(b.buf[offset:], p), nil
+}
+
+func (b *brbuffer) Bytes() []byte {
+ b.mtx.Lock()
+ defer b.mtx.Unlock()
+ return b.buf
+}
+
+func (b *brbuffer) String() string {
+ b.mtx.Lock()
+ defer b.mtx.Unlock()
+ return string(b.buf)
+}
+
+func (b *brbuffer) Len() int {
+ b.mtx.Lock()
+ defer b.mtx.Unlock()
+ return len(b.buf)
+}
+
+func (b *brbuffer) Reset() {
+ b.mtx.Lock()
+ defer b.mtx.Unlock()
+ b.buf = nil
+}
+
+// a brdiscarder is like io.Discard, but it implements
+// io.WriterAt. Convenient for testing (volume)BlockRead
+// implementations when the output is not checked.
+type brdiscarder struct{}
+
+func (brdiscarder) WriteAt(p []byte, offset int64) (int, error) { return len(p), nil }
+
+var brdiscard = brdiscarder{}