1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: Apache-2.0
19 "git.arvados.org/arvados.git/sdk/go/ctxlog"
20 check "gopkg.in/check.v1"
// Register keepCacheSuite with gocheck so its Test* methods run under
// "go test".
var _ = check.Suite(&keepCacheSuite{})
// keepCacheSuite exercises the disk-backed keep cache: size limits,
// eviction, and concurrent-read recovery.
type keepCacheSuite struct {
// keepGatewayBlackHole is a stub KeepGateway whose read methods always
// fail and whose write method discards the data after hashing it.  It
// lets tests exercise cache behavior when the backend has no blocks.
type keepGatewayBlackHole struct {

// ReadAt always reports the block as missing.
func (*keepGatewayBlackHole) ReadAt(locator string, dst []byte, offset int) (int, error) {
	return 0, errors.New("block not found")

// BlockRead always reports the block as missing, matching ReadAt.
func (*keepGatewayBlackHole) BlockRead(ctx context.Context, opts BlockReadOptions) (int, error) {
	return 0, errors.New("block not found")

func (*keepGatewayBlackHole) LocalLocator(locator string) (string, error) {

// BlockWrite hashes the input (opts.Data when opts.Reader is nil,
// otherwise opts.Reader) and returns a synthetic "<hexdigest>+<size>"
// locator with Replicas: 1.  The data itself is not stored anywhere.
// NOTE(review): the hash h is initialized on a line elided from this
// view — confirm its construction against the full file.
func (*keepGatewayBlackHole) BlockWrite(ctx context.Context, opts BlockWriteOptions) (BlockWriteResponse, error) {
	if opts.Reader == nil {
		// Data path: hash the in-memory buffer.
		size, _ = io.Copy(h, bytes.NewReader(opts.Data))
		// Reader path (else branch, brace elided in this view): hash the stream.
		size, _ = io.Copy(h, opts.Reader)
	return BlockWriteResponse{Locator: fmt.Sprintf("%x+%d", h.Sum(nil), size), Replicas: 1}, nil
// keepGatewayMemoryBacked is a stub KeepGateway that keeps written
// blocks in an in-memory map keyed by locator, so tests can inspect
// (and delete) backend copies directly via the data field.
type keepGatewayMemoryBacked struct {
	data map[string][]byte

// ReadAt copies the stored block's bytes starting at offset into dst.
func (k *keepGatewayMemoryBacked) ReadAt(locator string, dst []byte, offset int) (int, error) {
	data := k.data[locator]
		return 0, errors.New("block not found: " + locator)
	// Copy whatever lies beyond offset; a block shorter than offset
	// contributes nothing.
	if len(data) > offset {
		n = copy(dst, data[offset:])

// BlockRead writes the entire stored block to opts.WriteTo.
func (k *keepGatewayMemoryBacked) BlockRead(ctx context.Context, opts BlockReadOptions) (int, error) {
	data := k.data[opts.Locator]
		return 0, errors.New("block not found: " + opts.Locator)
	return opts.WriteTo.Write(data)

func (k *keepGatewayMemoryBacked) LocalLocator(locator string) (string, error) {

// BlockWrite hashes and buffers the input (opts.Data or opts.Reader),
// stores it under the computed "<hexdigest>+<size>" locator, and
// reports one replica.
func (k *keepGatewayMemoryBacked) BlockWrite(ctx context.Context, opts BlockWriteOptions) (BlockWriteResponse, error) {
	data := bytes.NewBuffer(nil)
	if opts.Reader == nil {
		io.Copy(io.MultiWriter(h, data), opts.Reader)
	locator := fmt.Sprintf("%x+%d", h.Sum(nil), data.Len())
	// Lazily initialize the map (the guarding condition is elided in
	// this view — presumably "if k.data == nil").
	k.data = map[string][]byte{}
	k.data[locator] = data.Bytes()
	return BlockWriteResponse{Locator: locator, Replicas: 1}, nil
// TestMaxSize checks that the cache enforces its size limit: after a
// 44 MB write followed by a 32 MB write, with the backend copies
// deleted so only the cache can satisfy reads, the first (older,
// larger) block has been evicted while the second is still readable.
func (s *keepCacheSuite) TestMaxSize(c *check.C) {
	backend := &keepGatewayMemoryBacked{}
		KeepGateway: backend,
		Logger: ctxlog.TestLogger(c),
	ctx := context.Background()
	resp1, err := cache.BlockWrite(ctx, BlockWriteOptions{
		Data: make([]byte, 44000000),
	c.Check(err, check.IsNil)
	// Ensure resp1's cache file is strictly older than resp2's so the
	// eviction order is deterministic.
	time.Sleep(time.Millisecond)
	resp2, err := cache.BlockWrite(ctx, BlockWriteOptions{
		Data: make([]byte, 32000000),
	c.Check(err, check.IsNil)
	// Remove the backend copies: from here on, only the cache itself
	// can satisfy a read.
	delete(backend.data, resp1.Locator)
	delete(backend.data, resp2.Locator)
	// Clear the hold timestamp so the next tidy/eviction pass can run
	// immediately instead of waiting out its usual delay.
	cache.tidyHoldUntil = time.Time{}
	// resp1 should have been evicted; the read falls through to the
	// backend, which no longer has the block.
	n, err := cache.ReadAt(resp1.Locator, make([]byte, 2), 0)
	c.Check(n, check.Equals, 0)
	c.Check(err, check.ErrorMatches, `block not found: .*\+44000000`)
	// resp2 should still be served from the cache.
	n, err = cache.ReadAt(resp2.Locator, make([]byte, 2), 0)
	c.Check(n > 0, check.Equals, true)
	c.Check(err, check.IsNil)
// TestConcurrentReadersNoRefresh runs the concurrent-reader stress
// test with cannotRefresh=true (backend copy deleted, so the cache
// must never need to re-fetch) and mangleCache=false.
func (s *keepCacheSuite) TestConcurrentReadersNoRefresh(c *check.C) {
	s.testConcurrentReaders(c, true, false)
// TestConcurrentReadersMangleCache runs the concurrent-reader stress
// test with mangleCache=true: cache files are truncated/deleted while
// readers run, so the cache must recover by re-fetching from the
// backend (cannotRefresh=false keeps the backend copy available).
func (s *keepCacheSuite) TestConcurrentReadersMangleCache(c *check.C) {
	s.testConcurrentReaders(c, false, true)
// testConcurrentReaders hammers one cached block with many concurrent
// ReadAt calls.  cannotRefresh deletes the backend copy so the cache
// must serve every read itself; mangleCache concurrently truncates or
// removes the cache file so readers must detect damage and re-fetch.
func (s *keepCacheSuite) testConcurrentReaders(c *check.C, cannotRefresh, mangleCache bool) {
	backend := &keepGatewayMemoryBacked{}
		KeepGateway: backend,
		MaxSize: int64(blksize),
		Logger: ctxlog.TestLogger(c),
	ctx, cancel := context.WithCancel(context.Background())
	resp, err := cache.BlockWrite(ctx, BlockWriteOptions{
		Data: make([]byte, blksize),
	c.Check(err, check.IsNil)
	// Delete the block from the backing store, to ensure
	// the cache doesn't rely on re-reading a block that
	// it has just written.
	delete(backend.data, resp.Locator)
	// Replace cache files with truncated files (and
	// delete them outright) while the ReadAt loop is
	// running, to ensure the cache can re-fetch from the
	// backend as needed.
	var nRemove, nTrunc int
	c.Logf("nRemove %d", nRemove)
	c.Logf("nTrunc %d", nTrunc)
	// Truncate/delete the cache file at various
	// intervals. Readers should re-fetch/recover from
	fnm := cache.cacheFile(resp.Locator)
	for ctx.Err() == nil {
		// Pick a random length in [0, 2*blksize): lengths beyond
		// blksize mean "remove the file", shorter ones mean
		// "truncate it".
		trunclen := rand.Int63() % int64(blksize*2)
		if trunclen > int64(blksize) {
			err := os.Remove(fnm)
		// Write a truncated replacement to a temp name, then rename
		// over the cache file so readers never see a partial write.
		} else if os.WriteFile(fnm+"#", make([]byte, trunclen), 0700) == nil {
			err := os.Rename(fnm+"#", fnm)
	var wg sync.WaitGroup
	var slots = make(chan bool, 100) // limit concurrency / memory usage
	for i := 0; i < 20000; i++ {
		// Spread read offsets across the whole block.
		offset := (i * 123456) % blksize
			defer func() { <-slots }()
			buf := make([]byte, 654321)
			// Clamp reads that would run past the end of the block.
			if offset+len(buf) > blksize {
				buf = buf[:blksize-offset]
			n, err := cache.ReadAt(resp.Locator, buf, offset)
			// don't fill logs with subsequent errors
			if !c.Check(err, check.IsNil, check.Commentf("offset=%d", offset)) {
			c.Assert(n, check.Equals, len(buf))
// Register keepCacheBenchSuite so its Benchmark* methods run with
// "go test -check.b".
var _ = check.Suite(&keepCacheBenchSuite{})
// keepCacheBenchSuite benchmarks cache reads against an in-memory
// backend pre-populated by SetUpTest.
type keepCacheBenchSuite struct {
	backend *keepGatewayMemoryBacked
// SetUpTest builds a fresh cache over a fresh in-memory backend, then
// writes s.blkcount blocks of s.blksize bytes each, saving their
// locators for the benchmarks to read back.
func (s *keepCacheBenchSuite) SetUpTest(c *check.C) {
	s.backend = &keepGatewayMemoryBacked{}
	s.cache = &DiskCache{
		KeepGateway: s.backend,
		MaxSize: int64(s.blksize),
		Logger: ctxlog.TestLogger(c),
	s.locators = make([]string, s.blkcount)
	data := make([]byte, s.blksize)
	for b := 0; b < s.blkcount; b++ {
		// Fill data with per-block content (fill expression elided in
		// this view) so each block hashes to a distinct locator.
		for i := range data {
		resp, err := s.cache.BlockWrite(context.Background(), BlockWriteOptions{
		c.Assert(err, check.IsNil)
		s.locators[b] = resp.Locator
// BenchmarkConcurrentReads measures many goroutines reading
// benchReadSize bytes at varying offsets across the cached blocks.
func (s *keepCacheBenchSuite) BenchmarkConcurrentReads(c *check.C) {
	var wg sync.WaitGroup
	for i := 0; i < c.N; i++ {
			// Each goroutine gets its own buffer; offset cycles
			// pseudo-randomly through the block.
			buf := make([]byte, benchReadSize)
			_, err := s.cache.ReadAt(s.locators[i%s.blkcount], buf, int((int64(i)*1234)%int64(s.blksize-benchReadSize)))
// BenchmarkSequentialReads measures single-goroutine reads, reusing
// one buffer, at the same offset pattern as BenchmarkConcurrentReads.
func (s *keepCacheBenchSuite) BenchmarkSequentialReads(c *check.C) {
	buf := make([]byte, benchReadSize)
	for i := 0; i < c.N; i++ {
		_, err := s.cache.ReadAt(s.locators[i%s.blkcount], buf, int((int64(i)*1234)%int64(s.blksize-benchReadSize)))
// benchReadSize is the number of bytes read per ReadAt call in the
// benchmarks above and below.
const benchReadSize = 1000
// Register fileOpsSuite so its Benchmark* methods run with
// "go test -check.b".
var _ = check.Suite(&fileOpsSuite{})
// fileOpsSuite holds filehandle-strategy micro-benchmarks; it needs no
// per-test state.
type fileOpsSuite struct{}
// BenchmarkOpenClose and BenchmarkKeepOpen can be used to measure the
// potential performance improvement of caching filehandles rather
// than opening/closing the cache file for each read.
//
// Results from a development machine indicate a ~3x throughput
// improvement: ~636 MB/s when opening/closing the file for each
// 1000-byte read vs. ~2 GB/s when opening the file once and doing
// concurrent reads using the same file descriptor.
func (s *fileOpsSuite) BenchmarkOpenClose(c *check.C) {
	fnm := c.MkDir() + "/testfile"
	os.WriteFile(fnm, make([]byte, 64000000), 0700)
	var wg sync.WaitGroup
	for i := 0; i < c.N; i++ {
			// Open, read one benchReadSize chunk, close — every
			// iteration pays the open/close cost.
			f, err := os.OpenFile(fnm, os.O_CREATE|os.O_RDWR, 0700)
			_, err = f.ReadAt(make([]byte, benchReadSize), (int64(i)*1000000)%63123123)
// BenchmarkKeepOpen opens the test file once and issues concurrent
// ReadAt calls through the single shared descriptor; compare its
// throughput with BenchmarkOpenClose.
func (s *fileOpsSuite) BenchmarkKeepOpen(c *check.C) {
	fnm := c.MkDir() + "/testfile"
	os.WriteFile(fnm, make([]byte, 64000000), 0700)
	f, err := os.OpenFile(fnm, os.O_CREATE|os.O_RDWR, 0700)
	var wg sync.WaitGroup
	for i := 0; i < c.N; i++ {
			// ReadAt is safe for concurrent use on one *os.File.
			_, err = f.ReadAt(make([]byte, benchReadSize), (int64(i)*1000000)%63123123)