1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: Apache-2.0
18 "git.arvados.org/arvados.git/sdk/go/ctxlog"
19 check "gopkg.in/check.v1"
// Register keepCacheSuite with gocheck so its Test* methods run.
22 var _ = check.Suite(&keepCacheSuite{})
// keepCacheSuite exercises the keep block cache; any fields are
// declared on lines not shown in this view.
24 type keepCacheSuite struct {
// keepGatewayBlackHole is a stub KeepGateway whose read paths always
// fail and whose BlockWrite acknowledges the data without storing it.
// Useful for testing cache behavior when the backend cannot serve
// any block.
27 type keepGatewayBlackHole struct {
// ReadAt always reports the block as missing.
30 func (*keepGatewayBlackHole) ReadAt(locator string, dst []byte, offset int) (int, error) {
31 return 0, errors.New("block not found")
// BlockRead always reports the block as missing.
33 func (*keepGatewayBlackHole) BlockRead(ctx context.Context, opts BlockReadOptions) (int, error) {
34 return 0, errors.New("block not found")
// LocalLocator -- body not visible in this view; presumably echoes the
// locator back unchanged. TODO(review): confirm against full source.
36 func (*keepGatewayBlackHole) LocalLocator(locator string) (string, error) {
// BlockWrite consumes the input (from Data when no Reader is given,
// otherwise from Reader), feeds it through h (declared on an elided
// line; h.Sum/%x below indicate a hash.Hash), and returns a synthetic
// "<hex digest>+<size>" locator with one replica. The data itself is
// discarded -- this gateway stores nothing.
39 func (*keepGatewayBlackHole) BlockWrite(ctx context.Context, opts BlockWriteOptions) (BlockWriteResponse, error) {
42 if opts.Reader == nil {
// Copy errors are deliberately ignored in this test stub.
43 size, _ = io.Copy(h, bytes.NewReader(opts.Data))
45 size, _ = io.Copy(h, opts.Reader)
47 return BlockWriteResponse{Locator: fmt.Sprintf("%x+%d", h.Sum(nil), size), Replicas: 1}, nil
// keepGatewayMemoryBacked is a stub KeepGateway that keeps written
// blocks in an in-memory map keyed by locator, so tests can inspect
// and delete backend data directly (see TestMaxSize).
50 type keepGatewayMemoryBacked struct {
// data maps locator -> block content.
52 data map[string][]byte
// ReadAt copies the stored block's bytes starting at offset into dst,
// returning the number of bytes copied, or an error if the locator is
// not present in the map.
55 func (k *keepGatewayMemoryBacked) ReadAt(locator string, dst []byte, offset int) (int, error) {
57 data := k.data[locator]
60 return 0, errors.New("block not found: " + locator)
// Only copy when the offset is inside the block; otherwise n stays 0.
63 if len(data) > offset {
64 n = copy(dst, data[offset:])
// BlockRead writes the whole stored block to opts.WriteTo, or returns
// a not-found error if the locator is absent.
71 func (k *keepGatewayMemoryBacked) BlockRead(ctx context.Context, opts BlockReadOptions) (int, error) {
73 data := k.data[opts.Locator]
76 return 0, errors.New("block not found: " + opts.Locator)
78 return opts.WriteTo.Write(data)
// LocalLocator -- body not visible in this view; presumably echoes the
// locator back unchanged. TODO(review): confirm against full source.
80 func (k *keepGatewayMemoryBacked) LocalLocator(locator string) (string, error) {
// BlockWrite buffers the input (from Data or Reader -- the Data branch
// is on an elided line), hashes it via h, stores the bytes in k.data
// under a "<hex digest>+<size>" locator, and reports one replica.
83 func (k *keepGatewayMemoryBacked) BlockWrite(ctx context.Context, opts BlockWriteOptions) (BlockWriteResponse, error) {
85 data := bytes.NewBuffer(nil)
86 if opts.Reader == nil {
// Copy error deliberately ignored in this test stub; MultiWriter feeds
// the hash and the buffer in one pass.
90 io.Copy(io.MultiWriter(h, data), opts.Reader)
92 locator := fmt.Sprintf("%x+%d", h.Sum(nil), data.Len())
// Lazily initialize the map (guard condition is on an elided line).
95 k.data = map[string][]byte{}
97 k.data[locator] = data.Bytes()
99 return BlockWriteResponse{Locator: locator, Replicas: 1}, nil
// TestMaxSize checks that the cache evicts older/larger content to
// respect its size limit: after writing a 44 MB and then a 32 MB block
// and removing both from the backend, only the smaller (more recent)
// block should still be readable from cache. The cache's MaxSize is
// presumably set on an elided construction line -- confirm in full
// source.
102 func (s *keepCacheSuite) TestMaxSize(c *check.C) {
103 backend := &keepGatewayMemoryBacked{}
105 KeepGateway: backend,
108 Logger: ctxlog.TestLogger(c),
110 ctx := context.Background()
111 resp1, err := cache.BlockWrite(ctx, BlockWriteOptions{
112 Data: make([]byte, 44000000),
114 c.Check(err, check.IsNil)
// Ensure the two blocks get distinct timestamps for eviction ordering.
115 time.Sleep(time.Millisecond)
116 resp2, err := cache.BlockWrite(ctx, BlockWriteOptions{
117 Data: make([]byte, 32000000),
119 c.Check(err, check.IsNil)
// Remove both blocks from the backend so any successful read below
// must have been served from the cache alone.
120 delete(backend.data, resp1.Locator)
121 delete(backend.data, resp2.Locator)
// Clear the hold-off so the next tidy pass can run immediately.
122 cache.tidyHoldUntil = time.Time{}
// The 44 MB block should have been evicted: the read falls through to
// the backend, which no longer has it.
125 n, err := cache.ReadAt(resp1.Locator, make([]byte, 2), 0)
126 c.Check(n, check.Equals, 0)
127 c.Check(err, check.ErrorMatches, `block not found: .*\+44000000`)
// The 32 MB block should still be served from cache.
129 n, err = cache.ReadAt(resp2.Locator, make([]byte, 2), 0)
130 c.Check(n > 0, check.Equals, true)
131 c.Check(err, check.IsNil)
// TestConcurrentReaders writes one block (blksize is declared on an
// elided line), removes it from the backend so only the cache can
// serve it, then issues many overlapping ReadAt calls at staggered
// offsets -- presumably concurrently, given the WaitGroup; the
// goroutine launch is on elided lines. Each read must return exactly
// len(buf) bytes with no error.
134 func (s *keepCacheSuite) TestConcurrentReaders(c *check.C) {
136 backend := &keepGatewayMemoryBacked{}
138 KeepGateway: backend,
139 MaxSize: int64(blksize),
141 Logger: ctxlog.TestLogger(c),
143 ctx := context.Background()
144 resp, err := cache.BlockWrite(ctx, BlockWriteOptions{
145 Data: make([]byte, blksize),
147 c.Check(err, check.IsNil)
// Force all subsequent reads to be satisfied by the cache.
148 delete(backend.data, resp.Locator)
151 var wg sync.WaitGroup
// Offsets stride by 123456 with 654321-byte buffers, so successive
// reads overlap heavily and exercise concurrent access to the same
// cached block.
152 for offset := 0; offset < blksize; offset += 123456 {
157 buf := make([]byte, 654321)
// Clamp the final read so it doesn't run past the end of the block.
158 if offset+len(buf) > blksize {
159 buf = buf[:blksize-offset]
161 n, err := cache.ReadAt(resp.Locator, buf, offset)
163 // don't fill logs with subsequent errors
166 if !c.Check(err, check.IsNil, check.Commentf("offset=%d", offset)) {
169 c.Assert(n, check.Equals, len(buf))
// benchReadSize is the number of bytes read per ReadAt call in the
// filehandle benchmarks below.
175 const benchReadSize = 1000
177 // BenchmarkOpenClose and BenchmarkKeepOpen can be used to measure the
178 // potential performance improvement of caching filehandles rather
179 // than opening/closing the cache file for each read.
181 // Results from a development machine indicate a ~3x throughput
182 // improvement: ~636 MB/s when opening/closing the file for each
183 // 1000-byte read vs. ~2 GB/s when opening the file once and doing
184 // concurrent reads using the same file descriptor.
// BenchmarkOpenClose measures read throughput when the 64 MB test file
// is re-opened for every benchReadSize-byte read (see the comparison
// notes above this benchmark). The per-iteration Close, error checks,
// and wg.Wait are on elided lines -- confirm in full source.
185 func (s *keepCacheSuite) BenchmarkOpenClose(c *check.C) {
186 fnm := c.MkDir() + "/testfile"
// Setup (untimed work): create a 64 MB file to read from.
187 os.WriteFile(fnm, make([]byte, 64000000), 0700)
188 var wg sync.WaitGroup
189 for i := 0; i < c.N; i++ {
// Open a fresh descriptor for each read -- the cost being measured.
193 f, err := os.OpenFile(fnm, os.O_CREATE|os.O_RDWR, 0700)
// Pseudo-randomize the offset across the file with a modulus just
// under the file size.
198 _, err = f.ReadAt(make([]byte, benchReadSize), (int64(i)*1000000)%63123123)
// BenchmarkKeepOpen is the counterpart to BenchmarkOpenClose: it opens
// the 64 MB test file once and reuses the same descriptor for all
// benchReadSize-byte reads (ReadAt is safe for concurrent use on one
// *os.File). Error checks, goroutine launch, wg.Wait, and f.Close are
// on lines past this view -- confirm in full source.
209 func (s *keepCacheSuite) BenchmarkKeepOpen(c *check.C) {
210 fnm := c.MkDir() + "/testfile"
// Setup (untimed work): create a 64 MB file to read from.
211 os.WriteFile(fnm, make([]byte, 64000000), 0700)
// Single shared descriptor -- the variable being benchmarked.
212 f, err := os.OpenFile(fnm, os.O_CREATE|os.O_RDWR, 0700)
217 var wg sync.WaitGroup
218 for i := 0; i < c.N; i++ {
// Same offset pattern as BenchmarkOpenClose for a fair comparison.
223 _, err = f.ReadAt(make([]byte, benchReadSize), (int64(i)*1000000)%63123123)