"errors"
"fmt"
"io"
+ "math/rand"
"os"
"sync"
"time"
c.Check(err, check.IsNil)
}
-func (s *keepCacheSuite) TestConcurrentReaders(c *check.C) {
+func (s *keepCacheSuite) TestConcurrentReadersNoRefresh(c *check.C) {
+ s.testConcurrentReaders(c, true, false)
+}
+func (s *keepCacheSuite) TestConcurrentReadersMangleCache(c *check.C) {
+ s.testConcurrentReaders(c, false, true)
+}
+func (s *keepCacheSuite) testConcurrentReaders(c *check.C, cannotRefresh, mangleCache bool) {
blksize := 64000000
backend := &keepGatewayMemoryBacked{}
cache := DiskCache{
Dir: c.MkDir(),
Logger: ctxlog.TestLogger(c),
}
- ctx := context.Background()
+ ctx, cancel := context.WithCancel(context.Background())
+ defer cancel()
+
resp, err := cache.BlockWrite(ctx, BlockWriteOptions{
Data: make([]byte, blksize),
})
c.Check(err, check.IsNil)
- delete(backend.data, resp.Locator)
+ if cannotRefresh {
+ // Delete the block from the backing store, to ensure
+ // the cache doesn't rely on re-reading a block that
+ // it has just written.
+ delete(backend.data, resp.Locator)
+ }
+ if mangleCache {
+ // Replace cache files with truncated files (and
+ // delete them outright) while the ReadAt loop is
+ // running, to ensure the cache can re-fetch from the
+ // backend as needed.
+ var nRemove, nTrunc int
+ defer func() {
+ c.Logf("nRemove %d", nRemove)
+ c.Logf("nTrunc %d", nTrunc)
+ }()
+ go func() {
+ // Truncate/delete the cache file at various
+ // intervals. Readers should re-fetch/recover from
+ // this.
+ fnm := cache.cacheFile(resp.Locator)
+ for ctx.Err() == nil {
+ trunclen := rand.Int63() % int64(blksize*2)
+ if trunclen > int64(blksize) {
+ err := os.Remove(fnm)
+ if err == nil {
+ nRemove++
+ }
+ } else if os.WriteFile(fnm+"#", make([]byte, trunclen), 0700) == nil {
+ err := os.Rename(fnm+"#", fnm)
+ if err == nil {
+ nTrunc++
+ }
+ }
+ }
+ }()
+ }
failed := false
var wg sync.WaitGroup
- for offset := 0; offset < blksize; offset += 123456 {
- offset := offset
+ var slots = make(chan bool, 100) // limit concurrency / memory usage
+ for i := 0; i < 20000; i++ {
+ offset := (i * 123456) % blksize
+ slots <- true
wg.Add(1)
go func() {
defer wg.Done()
+ defer func() { <-slots }()
buf := make([]byte, 654321)
if offset+len(buf) > blksize {
buf = buf[:blksize-offset]