"errors"
"fmt"
"io"
+ "math/rand"
+ "os"
+ "path/filepath"
"sync"
+ "sync/atomic"
"time"
"git.arvados.org/arvados.git/sdk/go/ctxlog"
check "gopkg.in/check.v1"
)
type keepGatewayMemoryBacked struct {
- mtx sync.RWMutex
- data map[string][]byte
+ mtx sync.RWMutex
+ data map[string][]byte
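+ // pauseBlockReadAfter/pauseBlockReadUntil let a test pause a
+ // BlockRead mid-block: after pauseBlockReadAfter bytes have been
+ // written, BlockRead waits until pauseBlockReadUntil is closed
+ // before writing the rest.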
+ pauseBlockReadAfter int
+ pauseBlockReadUntil chan error
}
func (k *keepGatewayMemoryBacked) BlockRead(ctx context.Context, opts BlockReadOptions) (int, error) {
k.mtx.RLock()
data := k.data[opts.Locator]
k.mtx.RUnlock()
if data == nil {
return 0, errors.New("block not found: " + opts.Locator)
}
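+ // Simulate a streaming backend: write the first
+ // pauseBlockReadAfter bytes, then wait for the test to close
+ // pauseBlockReadUntil before writing the remainder.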
+ if k.pauseBlockReadUntil != nil {
+ src := bytes.NewReader(data)
+ n, err := io.CopyN(opts.WriteTo, src, int64(k.pauseBlockReadAfter))
+ if err != nil {
+ return int(n), err
+ }
+ <-k.pauseBlockReadUntil
+ n2, err := io.Copy(opts.WriteTo, src)
+ return int(n + n2), err
+ }
return opts.WriteTo.Write(data)
}
func (k *keepGatewayMemoryBacked) LocalLocator(locator string) (string, error) {
return locator, nil
}
func (k *keepGatewayMemoryBacked) BlockWrite(ctx context.Context, opts BlockWriteOptions) (BlockWriteResponse, error) {
k.mtx.Lock()
defer k.mtx.Unlock()
if k.data == nil {
k.data = map[string][]byte{}
}
locator := fmt.Sprintf("%x+%d", md5.Sum(opts.Data), len(opts.Data))
k.data[locator] = opts.Data
return BlockWriteResponse{Locator: locator, Replicas: 1}, nil
}
+func (s *keepCacheSuite) TestBlockWrite(c *check.C) {
+ backend := &keepGatewayMemoryBacked{}
+ cache := DiskCache{
+ KeepGateway: backend,
+ MaxSize: 40000000,
+ Dir: c.MkDir(),
+ Logger: ctxlog.TestLogger(c),
+ }
+ ctx := context.Background()
+ real, err := cache.BlockWrite(ctx, BlockWriteOptions{
+ Data: make([]byte, 100000),
+ })
+ c.Assert(err, check.IsNil)
+
+ // Write different data but supply the same hash. Should be
+ // rejected (even though our fake backend doesn't notice).
+ _, err = cache.BlockWrite(ctx, BlockWriteOptions{
+ Hash: real.Locator[:32],
+ Data: make([]byte, 10),
+ })
+ c.Check(err, check.ErrorMatches, `block hash .+ did not match provided hash .+`)
+
+ // Ensure the bogus write didn't overwrite (or delete) the
+ // real cached data associated with that hash.
+ delete(backend.data, real.Locator)
+ n, err := cache.ReadAt(real.Locator, make([]byte, 100), 0)
+ c.Check(n, check.Equals, 100)
+ c.Check(err, check.IsNil)
+}
+
func (s *keepCacheSuite) TestMaxSize(c *check.C) {
backend := &keepGatewayMemoryBacked{}
cache := DiskCache{
KeepGateway: backend,
MaxSize: 40000000,
Dir: c.MkDir(),
Logger: ctxlog.TestLogger(c),
}
ctx := context.Background()
resp1, err := cache.BlockWrite(ctx, BlockWriteOptions{
Data: make([]byte, 44000000),
})
c.Check(err, check.IsNil)
+
+ // Wait for tidy to finish, check that it doesn't delete the
+ // only block.
time.Sleep(time.Millisecond)
+ for atomic.LoadInt32(&cache.tidying) > 0 {
+ time.Sleep(time.Millisecond)
+ }
+ c.Check(atomic.LoadInt64(&cache.sizeMeasured), check.Equals, int64(44000000))
+
resp2, err := cache.BlockWrite(ctx, BlockWriteOptions{
Data: make([]byte, 32000000),
})
c.Check(err, check.IsNil)
delete(backend.data, resp1.Locator)
delete(backend.data, resp2.Locator)
- cache.tidyHoldUntil = time.Time{}
- cache.tidy()
+
+ // Wait for tidy to finish, check that it deleted the older
+ // block.
+ time.Sleep(time.Millisecond)
+ for atomic.LoadInt32(&cache.tidying) > 0 {
+ time.Sleep(time.Millisecond)
+ }
+ c.Check(atomic.LoadInt64(&cache.sizeMeasured), check.Equals, int64(32000000))
n, err := cache.ReadAt(resp1.Locator, make([]byte, 2), 0)
c.Check(n, check.Equals, 0)
c.Check(err, check.NotNil)
}
-func (s *keepCacheSuite) TestConcurrentReaders(c *check.C) {
+func (s *keepCacheSuite) TestConcurrentReadersNoRefresh(c *check.C) {
+ s.testConcurrentReaders(c, true, false)
+}
+func (s *keepCacheSuite) TestConcurrentReadersMangleCache(c *check.C) {
+ s.testConcurrentReaders(c, false, true)
+}
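+// testConcurrentReaders runs many overlapping reads of one cached
+// block. With cannotRefresh, the block is removed from the backend,
+// so the cache must satisfy every read itself; with mangleCache,
+// cache files are truncated or deleted while reads are in flight, so
+// readers must recover by re-fetching from the backend.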
+func (s *keepCacheSuite) testConcurrentReaders(c *check.C, cannotRefresh, mangleCache bool) {
blksize := 64000000
backend := &keepGatewayMemoryBacked{}
cache := DiskCache{
KeepGateway: backend,
- MaxSize: int64(blksize),
+ MaxSize: ByteSizeOrPercent(blksize),
Dir: c.MkDir(),
Logger: ctxlog.TestLogger(c),
}
- ctx := context.Background()
+ ctx, cancel := context.WithCancel(context.Background())
+ defer cancel()
+
resp, err := cache.BlockWrite(ctx, BlockWriteOptions{
Data: make([]byte, blksize),
})
c.Check(err, check.IsNil)
- delete(backend.data, resp.Locator)
+ if cannotRefresh {
+ // Delete the block from the backing store, to ensure
+ // the cache doesn't rely on re-reading a block that
+ // it has just written.
+ delete(backend.data, resp.Locator)
+ }
+ if mangleCache {
+ // While the ReadAt loop is running, overwrite the cache
+ // file with truncated versions, or delete it outright,
+ // to ensure the cache re-fetches from the backend as
+ // needed.
+ var nRemove, nTrunc int
+ defer func() {
+ c.Logf("nRemove %d", nRemove)
+ c.Logf("nTrunc %d", nTrunc)
+ }()
+ go func() {
+ // Truncate/delete the cache file at various
+ // intervals. Readers should re-fetch/recover from
+ // this.
+ fnm := cache.cacheFile(resp.Locator)
+ for ctx.Err() == nil {
+ trunclen := rand.Int63() % int64(blksize*2)
+ if trunclen > int64(blksize) {
+ err := os.Remove(fnm)
+ if err == nil {
+ nRemove++
+ }
+ } else if os.WriteFile(fnm+"#", make([]byte, trunclen), 0700) == nil {
+ err := os.Rename(fnm+"#", fnm)
+ if err == nil {
+ nTrunc++
+ }
+ }
+ }
+ }()
+ }
failed := false
var wg sync.WaitGroup
- for offset := 0; offset < blksize; offset += 123456 {
- offset := offset
+ var slots = make(chan bool, 100) // limit concurrency / memory usage
+ for i := 0; i < 20000; i++ {
+ offset := (i * 123456) % blksize
+ slots <- true
wg.Add(1)
go func() {
defer wg.Done()
+ defer func() { <-slots }()
buf := make([]byte, 654321)
if offset+len(buf) > blksize {
buf = buf[:blksize-offset]
}
n, err := cache.ReadAt(resp.Locator, buf, offset)
if failed {
// don't fill logs with subsequent errors
return
}
if !c.Check(err, check.IsNil, check.Commentf("offset=%d", offset)) {
failed = true
}
c.Check(n, check.Equals, len(buf))
}()
}
wg.Wait()
}
+
+func (s *keepCacheSuite) TestStreaming(c *check.C) {
+ blksize := 64000000
+ backend := &keepGatewayMemoryBacked{
+ pauseBlockReadUntil: make(chan error),
+ pauseBlockReadAfter: blksize / 8,
+ }
+ cache := DiskCache{
+ KeepGateway: backend,
+ MaxSize: ByteSizeOrPercent(blksize),
+ Dir: c.MkDir(),
+ Logger: ctxlog.TestLogger(c),
+ }
+ ctx, cancel := context.WithCancel(context.Background())
+ defer cancel()
+
+ resp, err := cache.BlockWrite(ctx, BlockWriteOptions{
+ Data: make([]byte, blksize),
+ })
+ c.Check(err, check.IsNil)
+ os.RemoveAll(filepath.Join(cache.Dir, resp.Locator[:3]))
+
+ // Start a lot of concurrent requests for various ranges of
+ // the same block. Our backend will return the first 8MB and
+ // then pause. The requests that can be satisfied by the first
+ // 8MB of data should return quickly. The rest should wait,
+ // and return after we release pauseBlockReadUntil.
+ var wgEarly, wgLate sync.WaitGroup
+ var doneEarly, doneLate int32
+ for i := 0; i < 10000; i++ {
+ wgEarly.Add(1)
+ go func() {
+ offset := int(rand.Int63() % int64(blksize-benchReadSize))
+ if offset+benchReadSize > backend.pauseBlockReadAfter {
+ wgLate.Add(1)
+ defer wgLate.Done()
+ wgEarly.Done()
+ defer atomic.AddInt32(&doneLate, 1)
+ } else {
+ defer wgEarly.Done()
+ defer atomic.AddInt32(&doneEarly, 1)
+ }
+ buf := make([]byte, benchReadSize)
+ n, err := cache.ReadAt(resp.Locator, buf, offset)
+ c.Check(n, check.Equals, len(buf))
+ c.Check(err, check.IsNil)
+ }()
+ }
+
+ // Ensure all early ranges finish while backend request(s) are
+ // paused.
+ wgEarly.Wait()
+ c.Logf("doneEarly = %d", doneEarly)
+ c.Check(doneLate, check.Equals, int32(0))
+
+ // Unpause backend request(s).
+ close(backend.pauseBlockReadUntil)
+ wgLate.Wait()
+ c.Logf("doneLate = %d", doneLate)
+}
+
+var _ = check.Suite(&keepCacheBenchSuite{})
+
+type keepCacheBenchSuite struct {
+ blksize int
+ blkcount int
+ backend *keepGatewayMemoryBacked
+ cache *DiskCache
+ locators []string
+}
+
+func (s *keepCacheBenchSuite) SetUpTest(c *check.C) {
+ s.blksize = 64000000
+ s.blkcount = 8
+ s.backend = &keepGatewayMemoryBacked{}
+ s.cache = &DiskCache{
+ KeepGateway: s.backend,
+ MaxSize: ByteSizeOrPercent(s.blksize),
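+ // MaxSize is a single block, so the cache stays under
+ // size pressure: benchmark reads are expected to re-fetch
+ // evicted blocks from the backend while tidying runs.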
+ Dir: c.MkDir(),
+ Logger: ctxlog.TestLogger(c),
+ }
+ s.locators = make([]string, s.blkcount)
+ data := make([]byte, s.blksize)
+ for b := 0; b < s.blkcount; b++ {
+ for i := range data {
+ data[i] = byte(b)
+ }
+ resp, err := s.cache.BlockWrite(context.Background(), BlockWriteOptions{
+ Data: data,
+ })
+ c.Assert(err, check.IsNil)
+ s.locators[b] = resp.Locator
+ }
+}
+
+func (s *keepCacheBenchSuite) BenchmarkConcurrentReads(c *check.C) {
+ var wg sync.WaitGroup
+ for i := 0; i < c.N; i++ {
+ i := i
+ wg.Add(1)
+ go func() {
+ defer wg.Done()
+ buf := make([]byte, benchReadSize)
+ _, err := s.cache.ReadAt(s.locators[i%s.blkcount], buf, int((int64(i)*1234)%int64(s.blksize-benchReadSize)))
+ if err != nil {
+ c.Fail()
+ }
+ }()
+ }
+ wg.Wait()
+}
+
+func (s *keepCacheBenchSuite) BenchmarkSequentialReads(c *check.C) {
+ buf := make([]byte, benchReadSize)
+ for i := 0; i < c.N; i++ {
+ _, err := s.cache.ReadAt(s.locators[i%s.blkcount], buf, int((int64(i)*1234)%int64(s.blksize-benchReadSize)))
+ if err != nil {
+ c.Fail()
+ }
+ }
+}
+
+const benchReadSize = 1000
+
+var _ = check.Suite(&fileOpsSuite{})
+
+type fileOpsSuite struct{}
+
+// BenchmarkOpenClose and BenchmarkKeepOpen can be used to measure the
+// potential performance improvement of caching filehandles rather
+// than opening/closing the cache file for each read.
+//
+// Results from a development machine indicate a ~3x throughput
+// improvement: ~636 MB/s when opening/closing the file for each
+// 1000-byte read vs. ~2 GB/s when opening the file once and doing
+// concurrent reads using the same file descriptor.
+func (s *fileOpsSuite) BenchmarkOpenClose(c *check.C) {
+ fnm := c.MkDir() + "/testfile"
+ os.WriteFile(fnm, make([]byte, 64000000), 0700)
+ var wg sync.WaitGroup
+ for i := 0; i < c.N; i++ {
+ i := i
+ wg.Add(1)
+ go func() {
+ defer wg.Done()
+ f, err := os.OpenFile(fnm, os.O_CREATE|os.O_RDWR, 0700)
+ if err != nil {
+ c.Fail()
+ return
+ }
+ _, err = f.ReadAt(make([]byte, benchReadSize), (int64(i)*1000000)%63123123)
+ if err != nil {
+ c.Fail()
+ return
+ }
+ f.Close()
+ }()
+ }
+ wg.Wait()
+}
+
+func (s *fileOpsSuite) BenchmarkKeepOpen(c *check.C) {
+ fnm := c.MkDir() + "/testfile"
+ os.WriteFile(fnm, make([]byte, 64000000), 0700)
+ f, err := os.OpenFile(fnm, os.O_CREATE|os.O_RDWR, 0700)
+ if err != nil {
+ c.Fail()
+ return
+ }
+ var wg sync.WaitGroup
+ for i := 0; i < c.N; i++ {
+ i := i
+ wg.Add(1)
+ go func() {
+ defer wg.Done()
+ _, err := f.ReadAt(make([]byte, benchReadSize), (int64(i)*1000000)%63123123)
+ if err != nil {
+ c.Fail()
+ return
+ }
+ }()
+ }
+ wg.Wait()
+ f.Close()
+}