Merge branch '21504-arv-mount-reference'
[arvados.git] / sdk / go / arvados / keep_cache_test.go
index 6cc5a2b8dd5c13d293943313d8929e607ebf262b..776d9bb6528f9c5a34a8095f1393efe45db5888c 100644 (file)
@@ -11,7 +11,11 @@ import (
        "errors"
        "fmt"
        "io"
+       "math/rand"
+       "os"
+       "path/filepath"
        "sync"
+       "sync/atomic"
        "time"
 
        "git.arvados.org/arvados.git/sdk/go/ctxlog"
@@ -47,8 +51,10 @@ func (*keepGatewayBlackHole) BlockWrite(ctx context.Context, opts BlockWriteOpti
 }
 
 type keepGatewayMemoryBacked struct {
-       mtx  sync.RWMutex
-       data map[string][]byte
+       mtx                 sync.RWMutex
+       data                map[string][]byte
+       pauseBlockReadAfter int
+       pauseBlockReadUntil chan error
 }
 
 func (k *keepGatewayMemoryBacked) ReadAt(locator string, dst []byte, offset int) (int, error) {
@@ -74,6 +80,16 @@ func (k *keepGatewayMemoryBacked) BlockRead(ctx context.Context, opts BlockReadO
        if data == nil {
                return 0, errors.New("block not found: " + opts.Locator)
        }
+       if k.pauseBlockReadUntil != nil {
+               src := bytes.NewReader(data)
+               n, err := io.CopyN(opts.WriteTo, src, int64(k.pauseBlockReadAfter))
+               if err != nil {
+                       return int(n), err
+               }
+               <-k.pauseBlockReadUntil
+               n2, err := io.Copy(opts.WriteTo, src)
+               return int(n + n2), err
+       }
        return opts.WriteTo.Write(data)
 }
 func (k *keepGatewayMemoryBacked) LocalLocator(locator string) (string, error) {
@@ -98,6 +114,36 @@ func (k *keepGatewayMemoryBacked) BlockWrite(ctx context.Context, opts BlockWrit
        return BlockWriteResponse{Locator: locator, Replicas: 1}, nil
 }
 
+func (s *keepCacheSuite) TestBlockWrite(c *check.C) {
+       backend := &keepGatewayMemoryBacked{}
+       cache := DiskCache{
+               KeepGateway: backend,
+               MaxSize:     40000000,
+               Dir:         c.MkDir(),
+               Logger:      ctxlog.TestLogger(c),
+       }
+       ctx := context.Background()
+       real, err := cache.BlockWrite(ctx, BlockWriteOptions{
+               Data: make([]byte, 100000),
+       })
+       c.Assert(err, check.IsNil)
+
+       // Write different data but supply the same hash. Should be
+       // rejected (even though our fake backend doesn't notice).
+       _, err = cache.BlockWrite(ctx, BlockWriteOptions{
+               Hash: real.Locator[:32],
+               Data: make([]byte, 10),
+       })
+       c.Check(err, check.ErrorMatches, `block hash .+ did not match provided hash .+`)
+
+       // Ensure the bogus write didn't overwrite (or delete) the
+       // real cached data associated with that hash.
+       delete(backend.data, real.Locator)
+       n, err := cache.ReadAt(real.Locator, make([]byte, 100), 0)
+       c.Check(n, check.Equals, 100)
+       c.Check(err, check.IsNil)
+}
+
 func (s *keepCacheSuite) TestMaxSize(c *check.C) {
        backend := &keepGatewayMemoryBacked{}
        cache := DiskCache{
@@ -111,15 +157,29 @@ func (s *keepCacheSuite) TestMaxSize(c *check.C) {
                Data: make([]byte, 44000000),
        })
        c.Check(err, check.IsNil)
+
+       // Wait for tidy to finish, check that it doesn't delete the
+       // only block.
        time.Sleep(time.Millisecond)
+       for atomic.LoadInt32(&cache.tidying) > 0 {
+               time.Sleep(time.Millisecond)
+       }
+       c.Check(atomic.LoadInt64(&cache.sizeMeasured), check.Equals, int64(44000000))
+
        resp2, err := cache.BlockWrite(ctx, BlockWriteOptions{
                Data: make([]byte, 32000000),
        })
        c.Check(err, check.IsNil)
        delete(backend.data, resp1.Locator)
        delete(backend.data, resp2.Locator)
-       cache.tidyHoldUntil = time.Time{}
-       cache.tidy()
+
+       // Wait for tidy to finish, check that it deleted the older
+       // block.
+       time.Sleep(time.Millisecond)
+       for atomic.LoadInt32(&cache.tidying) > 0 {
+               time.Sleep(time.Millisecond)
+       }
+       c.Check(atomic.LoadInt64(&cache.sizeMeasured), check.Equals, int64(32000000))
 
        n, err := cache.ReadAt(resp1.Locator, make([]byte, 2), 0)
        c.Check(n, check.Equals, 0)
@@ -130,29 +190,76 @@ func (s *keepCacheSuite) TestMaxSize(c *check.C) {
        c.Check(err, check.IsNil)
 }
 
-func (s *keepCacheSuite) TestConcurrentReaders(c *check.C) {
// TestConcurrentReadersNoRefresh runs the concurrent-reader test with
// the backend copy of the block deleted, so the cache must serve all
// reads without re-fetching.
func (s *keepCacheSuite) TestConcurrentReadersNoRefresh(c *check.C) {
	s.testConcurrentReaders(c, true, false)
}
// TestConcurrentReadersMangleCache runs the concurrent-reader test
// while cache files are being truncated/deleted out from under the
// readers, so the cache must recover by re-fetching from the backend.
func (s *keepCacheSuite) TestConcurrentReadersMangleCache(c *check.C) {
	s.testConcurrentReaders(c, false, true)
}
+func (s *keepCacheSuite) testConcurrentReaders(c *check.C, cannotRefresh, mangleCache bool) {
        blksize := 64000000
        backend := &keepGatewayMemoryBacked{}
        cache := DiskCache{
                KeepGateway: backend,
-               MaxSize:     int64(blksize),
+               MaxSize:     ByteSizeOrPercent(blksize),
                Dir:         c.MkDir(),
                Logger:      ctxlog.TestLogger(c),
        }
-       ctx := context.Background()
+       ctx, cancel := context.WithCancel(context.Background())
+       defer cancel()
+
        resp, err := cache.BlockWrite(ctx, BlockWriteOptions{
                Data: make([]byte, blksize),
        })
        c.Check(err, check.IsNil)
-       delete(backend.data, resp.Locator)
+       if cannotRefresh {
+               // Delete the block from the backing store, to ensure
+               // the cache doesn't rely on re-reading a block that
+               // it has just written.
+               delete(backend.data, resp.Locator)
+       }
+       if mangleCache {
+               // Replace cache files with truncated files (and
+               // delete them outright) while the ReadAt loop is
+               // running, to ensure the cache can re-fetch from the
+               // backend as needed.
+               var nRemove, nTrunc int
+               defer func() {
+                       c.Logf("nRemove %d", nRemove)
+                       c.Logf("nTrunc %d", nTrunc)
+               }()
+               go func() {
+                       // Truncate/delete the cache file at various
+                       // intervals. Readers should re-fetch/recover from
+                       // this.
+                       fnm := cache.cacheFile(resp.Locator)
+                       for ctx.Err() == nil {
+                               trunclen := rand.Int63() % int64(blksize*2)
+                               if trunclen > int64(blksize) {
+                                       err := os.Remove(fnm)
+                                       if err == nil {
+                                               nRemove++
+                                       }
+                               } else if os.WriteFile(fnm+"#", make([]byte, trunclen), 0700) == nil {
+                                       err := os.Rename(fnm+"#", fnm)
+                                       if err == nil {
+                                               nTrunc++
+                                       }
+                               }
+                       }
+               }()
+       }
 
        failed := false
        var wg sync.WaitGroup
-       for offset := 0; offset < blksize; offset += 123456 {
-               offset := offset
+       var slots = make(chan bool, 100) // limit concurrency / memory usage
+       for i := 0; i < 20000; i++ {
+               offset := (i * 123456) % blksize
+               slots <- true
                wg.Add(1)
                go func() {
                        defer wg.Done()
+                       defer func() { <-slots }()
                        buf := make([]byte, 654321)
                        if offset+len(buf) > blksize {
                                buf = buf[:blksize-offset]
@@ -170,3 +277,188 @@ func (s *keepCacheSuite) TestConcurrentReaders(c *check.C) {
        }
        wg.Wait()
 }
+
+func (s *keepCacheSuite) TestStreaming(c *check.C) {
+       blksize := 64000000
+       backend := &keepGatewayMemoryBacked{
+               pauseBlockReadUntil: make(chan error),
+               pauseBlockReadAfter: blksize / 8,
+       }
+       cache := DiskCache{
+               KeepGateway: backend,
+               MaxSize:     ByteSizeOrPercent(blksize),
+               Dir:         c.MkDir(),
+               Logger:      ctxlog.TestLogger(c),
+       }
+       ctx, cancel := context.WithCancel(context.Background())
+       defer cancel()
+
+       resp, err := cache.BlockWrite(ctx, BlockWriteOptions{
+               Data: make([]byte, blksize),
+       })
+       c.Check(err, check.IsNil)
+       os.RemoveAll(filepath.Join(cache.Dir, resp.Locator[:3]))
+
+       // Start a lot of concurrent requests for various ranges of
+       // the same block. Our backend will return the first 8MB and
+       // then pause. The requests that can be satisfied by the first
+       // 8MB of data should return quickly. The rest should wait,
+       // and return after we release pauseBlockReadUntil.
+       var wgEarly, wgLate sync.WaitGroup
+       var doneEarly, doneLate int32
+       for i := 0; i < 10000; i++ {
+               wgEarly.Add(1)
+               go func() {
+                       offset := int(rand.Int63() % int64(blksize-benchReadSize))
+                       if offset+benchReadSize > backend.pauseBlockReadAfter {
+                               wgLate.Add(1)
+                               defer wgLate.Done()
+                               wgEarly.Done()
+                               defer atomic.AddInt32(&doneLate, 1)
+                       } else {
+                               defer wgEarly.Done()
+                               defer atomic.AddInt32(&doneEarly, 1)
+                       }
+                       buf := make([]byte, benchReadSize)
+                       n, err := cache.ReadAt(resp.Locator, buf, offset)
+                       c.Check(n, check.Equals, len(buf))
+                       c.Check(err, check.IsNil)
+               }()
+       }
+
+       // Ensure all early ranges finish while backend request(s) are
+       // paused.
+       wgEarly.Wait()
+       c.Logf("doneEarly = %d", doneEarly)
+       c.Check(doneLate, check.Equals, int32(0))
+
+       // Unpause backend request(s).
+       close(backend.pauseBlockReadUntil)
+       wgLate.Wait()
+       c.Logf("doneLate = %d", doneLate)
+}
+
var _ = check.Suite(&keepCacheBenchSuite{})

// keepCacheBenchSuite benchmarks DiskCache reads against an in-memory
// backend that SetUpTest pre-populates with blkcount blocks of
// blksize bytes each.
type keepCacheBenchSuite struct {
	blksize  int // size of each test block, in bytes
	blkcount int // number of blocks written by SetUpTest
	backend  *keepGatewayMemoryBacked
	cache    *DiskCache
	locators []string // locators of the blocks written by SetUpTest
}
+
+func (s *keepCacheBenchSuite) SetUpTest(c *check.C) {
+       s.blksize = 64000000
+       s.blkcount = 8
+       s.backend = &keepGatewayMemoryBacked{}
+       s.cache = &DiskCache{
+               KeepGateway: s.backend,
+               MaxSize:     ByteSizeOrPercent(s.blksize),
+               Dir:         c.MkDir(),
+               Logger:      ctxlog.TestLogger(c),
+       }
+       s.locators = make([]string, s.blkcount)
+       data := make([]byte, s.blksize)
+       for b := 0; b < s.blkcount; b++ {
+               for i := range data {
+                       data[i] = byte(b)
+               }
+               resp, err := s.cache.BlockWrite(context.Background(), BlockWriteOptions{
+                       Data: data,
+               })
+               c.Assert(err, check.IsNil)
+               s.locators[b] = resp.Locator
+       }
+}
+
+func (s *keepCacheBenchSuite) BenchmarkConcurrentReads(c *check.C) {
+       var wg sync.WaitGroup
+       for i := 0; i < c.N; i++ {
+               i := i
+               wg.Add(1)
+               go func() {
+                       defer wg.Done()
+                       buf := make([]byte, benchReadSize)
+                       _, err := s.cache.ReadAt(s.locators[i%s.blkcount], buf, int((int64(i)*1234)%int64(s.blksize-benchReadSize)))
+                       if err != nil {
+                               c.Fail()
+                       }
+               }()
+       }
+       wg.Wait()
+}
+
+func (s *keepCacheBenchSuite) BenchmarkSequentialReads(c *check.C) {
+       buf := make([]byte, benchReadSize)
+       for i := 0; i < c.N; i++ {
+               _, err := s.cache.ReadAt(s.locators[i%s.blkcount], buf, int((int64(i)*1234)%int64(s.blksize-benchReadSize)))
+               if err != nil {
+                       c.Fail()
+               }
+       }
+}
+
// benchReadSize is the size, in bytes, of each read issued by the
// read benchmarks and by the TestStreaming goroutines.
const benchReadSize = 1000

var _ = check.Suite(&fileOpsSuite{})

// fileOpsSuite benchmarks raw file operations, independent of DiskCache.
type fileOpsSuite struct{}
+
+// BenchmarkOpenClose and BenchmarkKeepOpen can be used to measure the
+// potential performance improvement of caching filehandles rather
+// than opening/closing the cache file for each read.
+//
+// Results from a development machine indicate a ~3x throughput
+// improvement: ~636 MB/s when opening/closing the file for each
+// 1000-byte read vs. ~2 GB/s when opening the file once and doing
+// concurrent reads using the same file descriptor.
+func (s *fileOpsSuite) BenchmarkOpenClose(c *check.C) {
+       fnm := c.MkDir() + "/testfile"
+       os.WriteFile(fnm, make([]byte, 64000000), 0700)
+       var wg sync.WaitGroup
+       for i := 0; i < c.N; i++ {
+               i := i
+               wg.Add(1)
+               go func() {
+                       defer wg.Done()
+                       f, err := os.OpenFile(fnm, os.O_CREATE|os.O_RDWR, 0700)
+                       if err != nil {
+                               c.Fail()
+                               return
+                       }
+                       _, err = f.ReadAt(make([]byte, benchReadSize), (int64(i)*1000000)%63123123)
+                       if err != nil {
+                               c.Fail()
+                               return
+                       }
+                       f.Close()
+               }()
+       }
+       wg.Wait()
+}
+
+func (s *fileOpsSuite) BenchmarkKeepOpen(c *check.C) {
+       fnm := c.MkDir() + "/testfile"
+       os.WriteFile(fnm, make([]byte, 64000000), 0700)
+       f, err := os.OpenFile(fnm, os.O_CREATE|os.O_RDWR, 0700)
+       if err != nil {
+               c.Fail()
+               return
+       }
+       var wg sync.WaitGroup
+       for i := 0; i < c.N; i++ {
+               i := i
+               wg.Add(1)
+               go func() {
+                       defer wg.Done()
+                       _, err = f.ReadAt(make([]byte, benchReadSize), (int64(i)*1000000)%63123123)
+                       if err != nil {
+                               c.Fail()
+                               return
+                       }
+               }()
+       }
+       wg.Wait()
+       f.Close()
+}