20318: Test mangling cache files while reading.
authorTom Clegg <tom@curii.com>
Tue, 19 Dec 2023 01:19:35 +0000 (20:19 -0500)
committerTom Clegg <tom@curii.com>
Tue, 19 Dec 2023 01:19:35 +0000 (20:19 -0500)
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom@curii.com>

sdk/go/arvados/keep_cache_test.go

index 23e7dfbd9ffd73cadf566bc2ac5648cdcf526dfe..6d1141047079f1854e5aac2a6b820180707995ab 100644 (file)
@@ -11,6 +11,7 @@ import (
        "errors"
        "fmt"
        "io"
+       "math/rand"
        "os"
        "sync"
        "time"
@@ -131,7 +132,13 @@ func (s *keepCacheSuite) TestMaxSize(c *check.C) {
        c.Check(err, check.IsNil)
 }
 
-func (s *keepCacheSuite) TestConcurrentReaders(c *check.C) {
+func (s *keepCacheSuite) TestConcurrentReadersNoRefresh(c *check.C) {
+       s.testConcurrentReaders(c, true, false)
+}
+func (s *keepCacheSuite) TestConcurrentReadersMangleCache(c *check.C) {
+       s.testConcurrentReaders(c, false, true)
+}
+func (s *keepCacheSuite) testConcurrentReaders(c *check.C, cannotRefresh, mangleCache bool) {
        blksize := 64000000
        backend := &keepGatewayMemoryBacked{}
        cache := DiskCache{
@@ -140,20 +147,61 @@ func (s *keepCacheSuite) TestConcurrentReaders(c *check.C) {
                Dir:         c.MkDir(),
                Logger:      ctxlog.TestLogger(c),
        }
-       ctx := context.Background()
+       ctx, cancel := context.WithCancel(context.Background())
+       defer cancel()
+
        resp, err := cache.BlockWrite(ctx, BlockWriteOptions{
                Data: make([]byte, blksize),
        })
        c.Check(err, check.IsNil)
-       delete(backend.data, resp.Locator)
+       if cannotRefresh {
+               // Delete the block from the backing store, to ensure
+               // the cache doesn't rely on re-reading a block that
+               // it has just written.
+               delete(backend.data, resp.Locator)
+       }
+       if mangleCache {
+               // Replace cache files with truncated files (and
+               // delete them outright) while the ReadAt loop is
+               // running, to ensure the cache can re-fetch from the
+               // backend as needed.
+               var nRemove, nTrunc int
+               defer func() {
+                       c.Logf("nRemove %d", nRemove)
+                       c.Logf("nTrunc %d", nTrunc)
+               }()
+               go func() {
+                       // Truncate/delete the cache file at various
+                       // intervals. Readers should re-fetch/recover from
+                       // this.
+                       fnm := cache.cacheFile(resp.Locator)
+                       for ctx.Err() == nil {
+                               trunclen := rand.Int63() % int64(blksize*2)
+                               if trunclen > int64(blksize) {
+                                       err := os.Remove(fnm)
+                                       if err == nil {
+                                               nRemove++
+                                       }
+                               } else if os.WriteFile(fnm+"#", make([]byte, trunclen), 0700) == nil {
+                                       err := os.Rename(fnm+"#", fnm)
+                                       if err == nil {
+                                               nTrunc++
+                                       }
+                               }
+                       }
+               }()
+       }
 
        failed := false
        var wg sync.WaitGroup
-       for offset := 0; offset < blksize; offset += 123456 {
-               offset := offset
+       var slots = make(chan bool, 100) // limit concurrency / memory usage
+       for i := 0; i < 20000; i++ {
+               offset := (i * 123456) % blksize
+               slots <- true
                wg.Add(1)
                go func() {
                        defer wg.Done()
+                       defer func() { <-slots }()
                        buf := make([]byte, 654321)
                        if offset+len(buf) > blksize {
                                buf = buf[:blksize-offset]