From f9a0922e50904365e40b99372ed66f3a6f992cd7 Mon Sep 17 00:00:00 2001 From: Tom Clegg Date: Mon, 18 Dec 2023 20:19:35 -0500 Subject: [PATCH] 20318: Test mangling cache files while reading. Arvados-DCO-1.1-Signed-off-by: Tom Clegg --- sdk/go/arvados/keep_cache_test.go | 58 ++++++++++++++++++++++++++++--- 1 file changed, 53 insertions(+), 5 deletions(-) diff --git a/sdk/go/arvados/keep_cache_test.go b/sdk/go/arvados/keep_cache_test.go index 23e7dfbd9f..6d11410470 100644 --- a/sdk/go/arvados/keep_cache_test.go +++ b/sdk/go/arvados/keep_cache_test.go @@ -11,6 +11,7 @@ import ( "errors" "fmt" "io" + "math/rand" "os" "sync" "time" @@ -131,7 +132,13 @@ func (s *keepCacheSuite) TestMaxSize(c *check.C) { c.Check(err, check.IsNil) } -func (s *keepCacheSuite) TestConcurrentReaders(c *check.C) { +func (s *keepCacheSuite) TestConcurrentReadersNoRefresh(c *check.C) { + s.testConcurrentReaders(c, true, false) +} +func (s *keepCacheSuite) TestConcurrentReadersMangleCache(c *check.C) { + s.testConcurrentReaders(c, false, true) +} +func (s *keepCacheSuite) testConcurrentReaders(c *check.C, cannotRefresh, mangleCache bool) { blksize := 64000000 backend := &keepGatewayMemoryBacked{} cache := DiskCache{ @@ -140,20 +147,61 @@ func (s *keepCacheSuite) TestConcurrentReaders(c *check.C) { Dir: c.MkDir(), Logger: ctxlog.TestLogger(c), } - ctx := context.Background() + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + resp, err := cache.BlockWrite(ctx, BlockWriteOptions{ Data: make([]byte, blksize), }) c.Check(err, check.IsNil) - delete(backend.data, resp.Locator) + if cannotRefresh { + // Delete the block from the backing store, to ensure + // the cache doesn't rely on re-reading a block that + // it has just written. + delete(backend.data, resp.Locator) + } + if mangleCache { + // Replace cache files with truncated files (and + // delete them outright) while the ReadAt loop is + // running, to ensure the cache can re-fetch from the + // backend as needed. + var nRemove, nTrunc int + defer func() { + c.Logf("nRemove %d", nRemove) + c.Logf("nTrunc %d", nTrunc) + }() + go func() { + // Truncate/delete the cache file at various + // intervals. Readers should re-fetch/recover from + // this. + fnm := cache.cacheFile(resp.Locator) + for ctx.Err() == nil { + trunclen := rand.Int63() % int64(blksize*2) + if trunclen > int64(blksize) { + err := os.Remove(fnm) + if err == nil { + nRemove++ + } + } else if os.WriteFile(fnm+"#", make([]byte, trunclen), 0700) == nil { + err := os.Rename(fnm+"#", fnm) + if err == nil { + nTrunc++ + } + } + } + }() + } failed := false var wg sync.WaitGroup - for offset := 0; offset < blksize; offset += 123456 { - offset := offset + var slots = make(chan bool, 100) // limit concurrency / memory usage + for i := 0; i < 20000; i++ { + offset := (i * 123456) % blksize + slots <- true wg.Add(1) go func() { defer wg.Done() + defer func() { <-slots }() buf := make([]byte, 654321) if offset+len(buf) > blksize { buf = buf[:blksize-offset] -- 2.30.2