// Copyright (C) The Arvados Authors. All rights reserved.
//
// SPDX-License-Identifier: Apache-2.0

package arvados

import (
	"bytes"
	"context"
	"crypto/md5"
	"errors"
	"fmt"
	"io"
	"math/rand"
	"os"
	"path/filepath"
	"sync"
	"sync/atomic"
	"time"

	"git.arvados.org/arvados.git/sdk/go/ctxlog"
	check "gopkg.in/check.v1"
)

var _ = check.Suite(&keepCacheSuite{})

type keepCacheSuite struct {
}

// keepGatewayBlackHole is a stub KeepGateway whose read methods
// always fail: any data it "stores" is unrecoverable.
type keepGatewayBlackHole struct {
}

func (*keepGatewayBlackHole) ReadAt(locator string, dst []byte, offset int) (int, error) {
	return 0, errors.New("block not found")
}
func (*keepGatewayBlackHole) BlockRead(ctx context.Context, opts BlockReadOptions) (int, error) {
	return 0, errors.New("block not found")
}
func (*keepGatewayBlackHole) LocalLocator(locator string) (string, error) {
	return locator, nil
}
func (*keepGatewayBlackHole) BlockWrite(ctx context.Context, opts BlockWriteOptions) (BlockWriteResponse, error) {
	h := md5.New()
	var size int64
	if opts.Reader == nil {
		size, _ = io.Copy(h, bytes.NewReader(opts.Data))
	} else {
		size, _ = io.Copy(h, opts.Reader)
	}
	return BlockWriteResponse{Locator: fmt.Sprintf("%x+%d", h.Sum(nil), size), Replicas: 1}, nil
}
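
// A minimal sketch (an addition, not part of the original suite)
// showing what the black-hole backend is good for: since every read
// from it fails, a successful ReadAt after BlockWrite proves the
// data was served from the DiskCache's local copy, not re-fetched
// from the backend. The test name and the 40000000 MaxSize are
// illustrative assumptions.
func (s *keepCacheSuite) TestReadServedFromCache(c *check.C) {
	cache := DiskCache{
		KeepGateway: &keepGatewayBlackHole{},
		MaxSize:     40000000,
		Dir:         c.MkDir(),
		Logger:      ctxlog.TestLogger(c),
	}
	resp, err := cache.BlockWrite(context.Background(), BlockWriteOptions{
		Data: []byte("foo"),
	})
	c.Assert(err, check.IsNil)
	buf := make([]byte, 3)
	n, err := cache.ReadAt(resp.Locator, buf, 0)
	c.Check(n, check.Equals, 3)
	c.Check(err, check.IsNil)
}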

// keepGatewayMemoryBacked is a fake in-memory backend. It can
// optionally pause partway through BlockRead to exercise the cache's
// streaming behavior.
type keepGatewayMemoryBacked struct {
	mtx                 sync.RWMutex // guards data; tests call these methods concurrently
	data                map[string][]byte
	pauseBlockReadAfter int
	pauseBlockReadUntil chan error
}

func (k *keepGatewayMemoryBacked) ReadAt(locator string, dst []byte, offset int) (int, error) {
	k.mtx.RLock()
	data := k.data[locator]
	k.mtx.RUnlock()
	if data == nil {
		return 0, errors.New("block not found: " + locator)
	}
	var n int
	if len(data) > offset {
		n = copy(dst, data[offset:])
	}
	if n < len(dst) {
		// Per the io.ReaderAt contract, a short read must
		// return a non-nil error.
		return n, io.EOF
	}
	return n, nil
}
func (k *keepGatewayMemoryBacked) BlockRead(ctx context.Context, opts BlockReadOptions) (int, error) {
	k.mtx.RLock()
	data := k.data[opts.Locator]
	k.mtx.RUnlock()
	if data == nil {
		return 0, errors.New("block not found: " + opts.Locator)
	}
	if k.pauseBlockReadUntil != nil {
		// Write the first pauseBlockReadAfter bytes, then
		// wait until the test releases pauseBlockReadUntil
		// before writing the rest.
		src := bytes.NewReader(data)
		n, err := io.CopyN(opts.WriteTo, src, int64(k.pauseBlockReadAfter))
		if err != nil {
			return int(n), err
		}
		<-k.pauseBlockReadUntil
		n2, err := io.Copy(opts.WriteTo, src)
		return int(n + n2), err
	}
	return opts.WriteTo.Write(data)
}
func (k *keepGatewayMemoryBacked) LocalLocator(locator string) (string, error) {
	return locator, nil
}
func (k *keepGatewayMemoryBacked) BlockWrite(ctx context.Context, opts BlockWriteOptions) (BlockWriteResponse, error) {
	h := md5.New()
	data := bytes.NewBuffer(nil)
	if opts.Reader == nil {
		data.Write(opts.Data)
		h.Write(data.Bytes())
	} else {
		io.Copy(io.MultiWriter(h, data), opts.Reader)
	}
	locator := fmt.Sprintf("%x+%d", h.Sum(nil), data.Len())
	k.mtx.Lock()
	defer k.mtx.Unlock()
	if k.data == nil {
		k.data = map[string][]byte{}
	}
	k.data[locator] = data.Bytes()
	return BlockWriteResponse{Locator: locator, Replicas: 1}, nil
}
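
// A hedged sanity check (an addition, not from the original suite):
// the fake gateways build locators as "<md5 hex>+<size>", so writing
// "foo" must yield md5("foo") = acbd18db4cc2f85cedef654fccc4a4d8
// followed by "+3".
func (s *keepCacheSuite) TestFakeGatewayLocatorFormat(c *check.C) {
	k := &keepGatewayMemoryBacked{}
	resp, err := k.BlockWrite(context.Background(), BlockWriteOptions{
		Data: []byte("foo"),
	})
	c.Check(err, check.IsNil)
	c.Check(resp.Locator, check.Equals, "acbd18db4cc2f85cedef654fccc4a4d8+3")
}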

func (s *keepCacheSuite) TestBlockWrite(c *check.C) {
	backend := &keepGatewayMemoryBacked{}
	cache := DiskCache{
		KeepGateway: backend,
		MaxSize:     40000000,
		Dir:         c.MkDir(),
		Logger:      ctxlog.TestLogger(c),
	}
	ctx := context.Background()
	real, err := cache.BlockWrite(ctx, BlockWriteOptions{
		Data: make([]byte, 100000),
	})
	c.Assert(err, check.IsNil)

	// Write different data but supply the same hash. Should be
	// rejected (even though our fake backend doesn't notice).
	_, err = cache.BlockWrite(ctx, BlockWriteOptions{
		Hash: real.Locator[:32],
		Data: make([]byte, 10),
	})
	c.Check(err, check.ErrorMatches, `block hash .+ did not match provided hash .+`)

	// Ensure the bogus write didn't overwrite (or delete) the
	// real cached data associated with that hash.
	delete(backend.data, real.Locator)
	n, err := cache.ReadAt(real.Locator, make([]byte, 100), 0)
	c.Check(n, check.Equals, 100)
	c.Check(err, check.IsNil)
}

func (s *keepCacheSuite) TestMaxSize(c *check.C) {
	backend := &keepGatewayMemoryBacked{}
	cache := DiskCache{
		KeepGateway: backend,
		// MaxSize is between the two block sizes used below,
		// so the tidy goroutine must evict the older block
		// but not the newer one.
		MaxSize: 40000000,
		Dir:     c.MkDir(),
		Logger:  ctxlog.TestLogger(c),
	}
	ctx := context.Background()
	resp1, err := cache.BlockWrite(ctx, BlockWriteOptions{
		Data: make([]byte, 44000000),
	})
	c.Check(err, check.IsNil)

	// Wait for tidy to finish, check that it doesn't delete the
	// only block.
	time.Sleep(time.Millisecond)
	for atomic.LoadInt32(&cache.tidying) > 0 {
		time.Sleep(time.Millisecond)
	}
	c.Check(atomic.LoadInt64(&cache.sizeMeasured), check.Equals, int64(44000000))

	resp2, err := cache.BlockWrite(ctx, BlockWriteOptions{
		Data: make([]byte, 32000000),
	})
	c.Check(err, check.IsNil)
	delete(backend.data, resp1.Locator)
	delete(backend.data, resp2.Locator)

	// Wait for tidy to finish, check that it deleted the older
	// block.
	time.Sleep(time.Millisecond)
	for atomic.LoadInt32(&cache.tidying) > 0 {
		time.Sleep(time.Millisecond)
	}
	c.Check(atomic.LoadInt64(&cache.sizeMeasured), check.Equals, int64(32000000))

	n, err := cache.ReadAt(resp1.Locator, make([]byte, 2), 0)
	c.Check(n, check.Equals, 0)
	c.Check(err, check.ErrorMatches, `block not found: .*\+44000000`)

	n, err = cache.ReadAt(resp2.Locator, make([]byte, 2), 0)
	c.Check(n > 0, check.Equals, true)
	c.Check(err, check.IsNil)
}

func (s *keepCacheSuite) TestConcurrentReadersNoRefresh(c *check.C) {
	s.testConcurrentReaders(c, true, false)
}
func (s *keepCacheSuite) TestConcurrentReadersMangleCache(c *check.C) {
	s.testConcurrentReaders(c, false, true)
}
func (s *keepCacheSuite) testConcurrentReaders(c *check.C, cannotRefresh, mangleCache bool) {
	blksize := 64000000
	backend := &keepGatewayMemoryBacked{}
	cache := DiskCache{
		KeepGateway: backend,
		MaxSize:     ByteSizeOrPercent(blksize),
		Dir:         c.MkDir(),
		Logger:      ctxlog.TestLogger(c),
	}
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	resp, err := cache.BlockWrite(ctx, BlockWriteOptions{
		Data: make([]byte, blksize),
	})
	c.Check(err, check.IsNil)
	if cannotRefresh {
		// Delete the block from the backing store, to ensure
		// the cache doesn't rely on re-reading a block that
		// it has just written.
		delete(backend.data, resp.Locator)
	}
	if mangleCache {
		// Replace cache files with truncated files (and
		// delete them outright) while the ReadAt loop is
		// running, to ensure the cache can re-fetch from the
		// backend as needed.
		var nRemove, nTrunc int
		defer func() {
			c.Logf("nRemove %d", nRemove)
			c.Logf("nTrunc %d", nTrunc)
		}()
		go func() {
			// Truncate/delete the cache file at various
			// intervals. Readers should re-fetch/recover
			// from this.
			fnm := cache.cacheFile(resp.Locator)
			for ctx.Err() == nil {
				trunclen := rand.Int63() % int64(blksize*2)
				if trunclen > int64(blksize) {
					err := os.Remove(fnm)
					if err == nil {
						nRemove++
					}
				} else if os.WriteFile(fnm+"#", make([]byte, trunclen), 0700) == nil {
					err := os.Rename(fnm+"#", fnm)
					if err == nil {
						nTrunc++
					}
				}
			}
		}()
	}
	var wg sync.WaitGroup
	var slots = make(chan bool, 100) // limit concurrency / memory usage
	for i := 0; i < 20000; i++ {
		offset := (i * 123456) % blksize
		wg.Add(1)
		slots <- true
		go func() {
			defer wg.Done()
			defer func() { <-slots }()
			buf := make([]byte, 654321)
			if offset+len(buf) > blksize {
				buf = buf[:blksize-offset]
			}
			n, err := cache.ReadAt(resp.Locator, buf, offset)
			if err != nil {
				// don't fill logs with subsequent errors
				cancel()
			}
			if !c.Check(err, check.IsNil, check.Commentf("offset=%d", offset)) {
				return
			}
			c.Assert(n, check.Equals, len(buf))
		}()
	}
	wg.Wait()
}

func (s *keepCacheSuite) TestStreaming(c *check.C) {
	blksize := 64000000
	backend := &keepGatewayMemoryBacked{
		pauseBlockReadUntil: make(chan error),
		pauseBlockReadAfter: blksize / 8,
	}
	cache := DiskCache{
		KeepGateway: backend,
		MaxSize:     ByteSizeOrPercent(blksize),
		Dir:         c.MkDir(),
		Logger:      ctxlog.TestLogger(c),
	}
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	resp, err := cache.BlockWrite(ctx, BlockWriteOptions{
		Data: make([]byte, blksize),
	})
	c.Check(err, check.IsNil)
	os.RemoveAll(filepath.Join(cache.Dir, resp.Locator[:3]))

	// Start a lot of concurrent requests for various ranges of
	// the same block. Our backend will return the first 8MB and
	// then pause. The requests that can be satisfied by the first
	// 8MB of data should return quickly. The rest should wait,
	// and return after we release pauseBlockReadUntil.
	var wgEarly, wgLate sync.WaitGroup
	var doneEarly, doneLate int32
	for i := 0; i < 10000; i++ {
		wgEarly.Add(1)
		go func() {
			offset := int(rand.Int63() % int64(blksize-benchReadSize))
			if offset+benchReadSize > backend.pauseBlockReadAfter {
				wgEarly.Done()
				wgLate.Add(1)
				defer wgLate.Done()
				defer atomic.AddInt32(&doneLate, 1)
			} else {
				defer wgEarly.Done()
				defer atomic.AddInt32(&doneEarly, 1)
			}
			buf := make([]byte, benchReadSize)
			n, err := cache.ReadAt(resp.Locator, buf, offset)
			c.Check(n, check.Equals, len(buf))
			c.Check(err, check.IsNil)
		}()
	}

	// Ensure all early ranges finish while backend request(s) are
	// paused. Use atomic loads here because the late goroutines
	// are still running.
	wgEarly.Wait()
	c.Logf("doneEarly = %d", atomic.LoadInt32(&doneEarly))
	c.Check(atomic.LoadInt32(&doneLate), check.Equals, int32(0))

	// Unpause backend request(s).
	close(backend.pauseBlockReadUntil)
	wgLate.Wait()
	c.Logf("doneLate = %d", atomic.LoadInt32(&doneLate))
}

var _ = check.Suite(&keepCacheBenchSuite{})

type keepCacheBenchSuite struct {
	blksize  int
	blkcount int
	backend  *keepGatewayMemoryBacked
	cache    *DiskCache
	locators []string
}

func (s *keepCacheBenchSuite) SetUpTest(c *check.C) {
	s.blksize = 64000000
	s.blkcount = 8
	s.backend = &keepGatewayMemoryBacked{}
	s.cache = &DiskCache{
		KeepGateway: s.backend,
		MaxSize:     ByteSizeOrPercent(s.blksize),
		Dir:         c.MkDir(),
		Logger:      ctxlog.TestLogger(c),
	}
	s.locators = make([]string, s.blkcount)
	data := make([]byte, s.blksize)
	for b := 0; b < s.blkcount; b++ {
		// Fill each block with a distinct byte value so each
		// block gets a distinct locator.
		for i := range data {
			data[i] = byte(b)
		}
		resp, err := s.cache.BlockWrite(context.Background(), BlockWriteOptions{
			Data: data,
		})
		c.Assert(err, check.IsNil)
		s.locators[b] = resp.Locator
	}
}

func (s *keepCacheBenchSuite) BenchmarkConcurrentReads(c *check.C) {
	var wg sync.WaitGroup
	for i := 0; i < c.N; i++ {
		i := i
		wg.Add(1)
		go func() {
			defer wg.Done()
			buf := make([]byte, benchReadSize)
			_, err := s.cache.ReadAt(s.locators[i%s.blkcount], buf, int((int64(i)*1234)%int64(s.blksize-benchReadSize)))
			if err != nil {
				c.Fail()
			}
		}()
	}
	wg.Wait()
}

func (s *keepCacheBenchSuite) BenchmarkSequentialReads(c *check.C) {
	buf := make([]byte, benchReadSize)
	for i := 0; i < c.N; i++ {
		_, err := s.cache.ReadAt(s.locators[i%s.blkcount], buf, int((int64(i)*1234)%int64(s.blksize-benchReadSize)))
		if err != nil {
			c.Fail()
		}
	}
}

const benchReadSize = 1000

var _ = check.Suite(&fileOpsSuite{})

type fileOpsSuite struct{}

// BenchmarkOpenClose and BenchmarkKeepOpen can be used to measure the
// potential performance improvement of caching filehandles rather
// than opening/closing the cache file for each read.
//
// Results from a development machine indicate a ~3x throughput
// improvement: ~636 MB/s when opening/closing the file for each
// 1000-byte read vs. ~2 GB/s when opening the file once and doing
// concurrent reads using the same file descriptor.
func (s *fileOpsSuite) BenchmarkOpenClose(c *check.C) {
	fnm := c.MkDir() + "/testfile"
	os.WriteFile(fnm, make([]byte, 64000000), 0700)
	var wg sync.WaitGroup
	for i := 0; i < c.N; i++ {
		i := i
		wg.Add(1)
		go func() {
			defer wg.Done()
			f, err := os.OpenFile(fnm, os.O_CREATE|os.O_RDWR, 0700)
			if err != nil {
				c.Fail()
				return
			}
			_, err = f.ReadAt(make([]byte, benchReadSize), (int64(i)*1000000)%63123123)
			if err != nil {
				c.Fail()
				return
			}
			f.Close()
		}()
	}
	wg.Wait()
}

func (s *fileOpsSuite) BenchmarkKeepOpen(c *check.C) {
	fnm := c.MkDir() + "/testfile"
	os.WriteFile(fnm, make([]byte, 64000000), 0700)
	f, err := os.OpenFile(fnm, os.O_CREATE|os.O_RDWR, 0700)
	if err != nil {
		c.Fail()
		return
	}
	var wg sync.WaitGroup
	for i := 0; i < c.N; i++ {
		i := i
		wg.Add(1)
		go func() {
			defer wg.Done()
			// Declare err locally so concurrent goroutines
			// don't race on a shared variable.
			_, err := f.ReadAt(make([]byte, benchReadSize), (int64(i)*1000000)%63123123)
			if err != nil {
				c.Fail()
				return
			}
		}()
	}
	wg.Wait()
	f.Close()
}