20318: Route KeepClient block writes through disk cache.
// Copyright (C) The Arvados Authors. All rights reserved.
//
// SPDX-License-Identifier: Apache-2.0

package arvados

import (
	"bytes"
	"context"
	"crypto/md5"
	"errors"
	"fmt"
	"io"
	"math/rand"
	"os"
	"sync"
	"time"

	"git.arvados.org/arvados.git/sdk/go/ctxlog"
	check "gopkg.in/check.v1"
)

var _ = check.Suite(&keepCacheSuite{})

type keepCacheSuite struct {
}

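// keepGatewayBlackHole is a stub KeepGateway: BlockWrite computes a
// locator without storing any data, and reads always return "block
// not found".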
type keepGatewayBlackHole struct {
}

func (*keepGatewayBlackHole) ReadAt(locator string, dst []byte, offset int) (int, error) {
	return 0, errors.New("block not found")
}
func (*keepGatewayBlackHole) BlockRead(ctx context.Context, opts BlockReadOptions) (int, error) {
	return 0, errors.New("block not found")
}
func (*keepGatewayBlackHole) LocalLocator(locator string) (string, error) {
	return locator, nil
}
func (*keepGatewayBlackHole) BlockWrite(ctx context.Context, opts BlockWriteOptions) (BlockWriteResponse, error) {
	h := md5.New()
	var size int64
	if opts.Reader == nil {
		size, _ = io.Copy(h, bytes.NewReader(opts.Data))
	} else {
		size, _ = io.Copy(h, opts.Reader)
	}
	return BlockWriteResponse{Locator: fmt.Sprintf("%x+%d", h.Sum(nil), size), Replicas: 1}, nil
}

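// keepGatewayMemoryBacked is a stub KeepGateway that stores written
// blocks in a map, so tests can read them back and delete them from
// the "backend" at will.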
type keepGatewayMemoryBacked struct {
	mtx  sync.RWMutex
	data map[string][]byte
}

func (k *keepGatewayMemoryBacked) ReadAt(locator string, dst []byte, offset int) (int, error) {
	k.mtx.RLock()
	data := k.data[locator]
	k.mtx.RUnlock()
	if data == nil {
		return 0, errors.New("block not found: " + locator)
	}
	var n int
	if len(data) > offset {
		n = copy(dst, data[offset:])
	}
	if n < len(dst) {
		return n, io.EOF
	}
	return n, nil
}
func (k *keepGatewayMemoryBacked) BlockRead(ctx context.Context, opts BlockReadOptions) (int, error) {
	k.mtx.RLock()
	data := k.data[opts.Locator]
	k.mtx.RUnlock()
	if data == nil {
		return 0, errors.New("block not found: " + opts.Locator)
	}
	return opts.WriteTo.Write(data)
}
func (k *keepGatewayMemoryBacked) LocalLocator(locator string) (string, error) {
	return locator, nil
}
func (k *keepGatewayMemoryBacked) BlockWrite(ctx context.Context, opts BlockWriteOptions) (BlockWriteResponse, error) {
	h := md5.New()
	data := bytes.NewBuffer(nil)
	if opts.Reader == nil {
		data.Write(opts.Data)
		h.Write(data.Bytes())
	} else {
		io.Copy(io.MultiWriter(h, data), opts.Reader)
	}
	locator := fmt.Sprintf("%x+%d", h.Sum(nil), data.Len())
	k.mtx.Lock()
	if k.data == nil {
		k.data = map[string][]byte{}
	}
	k.data[locator] = data.Bytes()
	k.mtx.Unlock()
	return BlockWriteResponse{Locator: locator, Replicas: 1}, nil
}

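// TestMaxSize checks that, once cached data exceeds MaxSize, tidy()
// evicts the older block while keeping the more recently written one.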
func (s *keepCacheSuite) TestMaxSize(c *check.C) {
	backend := &keepGatewayMemoryBacked{}
	cache := DiskCache{
		KeepGateway: backend,
		MaxSize:     40000000,
		Dir:         c.MkDir(),
		Logger:      ctxlog.TestLogger(c),
	}
	ctx := context.Background()
	resp1, err := cache.BlockWrite(ctx, BlockWriteOptions{
		Data: make([]byte, 44000000),
	})
	c.Check(err, check.IsNil)
	time.Sleep(time.Millisecond)
	resp2, err := cache.BlockWrite(ctx, BlockWriteOptions{
		Data: make([]byte, 32000000),
	})
	c.Check(err, check.IsNil)
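	// Remove both blocks from the backend so any subsequent read
	// can only be satisfied by the cache, then force an immediate
	// tidy: the older 44 MB block should be evicted to get under
	// MaxSize, and the newer 32 MB block should survive.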
	delete(backend.data, resp1.Locator)
	delete(backend.data, resp2.Locator)
	cache.tidyHoldUntil = time.Time{}
	cache.tidy()

	n, err := cache.ReadAt(resp1.Locator, make([]byte, 2), 0)
	c.Check(n, check.Equals, 0)
	c.Check(err, check.ErrorMatches, `block not found: .*\+44000000`)

	n, err = cache.ReadAt(resp2.Locator, make([]byte, 2), 0)
	c.Check(n > 0, check.Equals, true)
	c.Check(err, check.IsNil)
}

func (s *keepCacheSuite) TestConcurrentReadersNoRefresh(c *check.C) {
	s.testConcurrentReaders(c, true, false)
}
func (s *keepCacheSuite) TestConcurrentReadersMangleCache(c *check.C) {
	s.testConcurrentReaders(c, false, true)
}
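// testConcurrentReaders hammers a single cached block with many
// concurrent ReadAt calls. With cannotRefresh, the block is removed
// from the backend so every read must be served by the cache; with
// mangleCache, the cache file is truncated or deleted while reads
// are in flight, so the cache must recover by re-fetching from the
// backend.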
func (s *keepCacheSuite) testConcurrentReaders(c *check.C, cannotRefresh, mangleCache bool) {
	blksize := 64000000
	backend := &keepGatewayMemoryBacked{}
	cache := DiskCache{
		KeepGateway: backend,
		MaxSize:     int64(blksize),
		Dir:         c.MkDir(),
		Logger:      ctxlog.TestLogger(c),
	}
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	resp, err := cache.BlockWrite(ctx, BlockWriteOptions{
		Data: make([]byte, blksize),
	})
	c.Check(err, check.IsNil)
	if cannotRefresh {
		// Delete the block from the backing store, to ensure
		// the cache doesn't rely on re-reading a block that
		// it has just written.
		delete(backend.data, resp.Locator)
	}
	if mangleCache {
		// Replace cache files with truncated files (and
		// delete them outright) while the ReadAt loop is
		// running, to ensure the cache can re-fetch from the
		// backend as needed.
		var nRemove, nTrunc int
		defer func() {
			c.Logf("nRemove %d", nRemove)
			c.Logf("nTrunc %d", nTrunc)
		}()
		go func() {
			// Truncate/delete the cache file at various
			// intervals. Readers should re-fetch/recover from
			// this.
			fnm := cache.cacheFile(resp.Locator)
			for ctx.Err() == nil {
				trunclen := rand.Int63() % int64(blksize*2)
				if trunclen > int64(blksize) {
					err := os.Remove(fnm)
					if err == nil {
						nRemove++
					}
				} else if os.WriteFile(fnm+"#", make([]byte, trunclen), 0700) == nil {
					err := os.Rename(fnm+"#", fnm)
					if err == nil {
						nTrunc++
					}
				}
			}
		}()
	}

	failed := false
	var wg sync.WaitGroup
	var slots = make(chan bool, 100) // limit concurrency / memory usage
	for i := 0; i < 20000; i++ {
		offset := (i * 123456) % blksize
		slots <- true
		wg.Add(1)
		go func() {
			defer wg.Done()
			defer func() { <-slots }()
			buf := make([]byte, 654321)
			if offset+len(buf) > blksize {
				buf = buf[:blksize-offset]
			}
			n, err := cache.ReadAt(resp.Locator, buf, offset)
			if failed {
				// don't fill logs with subsequent errors
				return
			}
			if !c.Check(err, check.IsNil, check.Commentf("offset=%d", offset)) {
				failed = true
			}
			c.Assert(n, check.Equals, len(buf))
		}()
	}
	wg.Wait()
}

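// keepCacheBenchSuite benchmarks DiskCache reads against a
// memory-backed gateway. MaxSize only covers one of the eight test
// blocks, so the reads also exercise eviction and re-fetching.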
var _ = check.Suite(&keepCacheBenchSuite{})

type keepCacheBenchSuite struct {
	blksize  int
	blkcount int
	backend  *keepGatewayMemoryBacked
	cache    *DiskCache
	locators []string
}

func (s *keepCacheBenchSuite) SetUpTest(c *check.C) {
	s.blksize = 64000000
	s.blkcount = 8
	s.backend = &keepGatewayMemoryBacked{}
	s.cache = &DiskCache{
		KeepGateway: s.backend,
		MaxSize:     int64(s.blksize),
		Dir:         c.MkDir(),
		Logger:      ctxlog.TestLogger(c),
	}
	s.locators = make([]string, s.blkcount)
	data := make([]byte, s.blksize)
	for b := 0; b < s.blkcount; b++ {
		for i := range data {
			data[i] = byte(b)
		}
		resp, err := s.cache.BlockWrite(context.Background(), BlockWriteOptions{
			Data: data,
		})
		c.Assert(err, check.IsNil)
		s.locators[b] = resp.Locator
	}
}

func (s *keepCacheBenchSuite) BenchmarkConcurrentReads(c *check.C) {
	var wg sync.WaitGroup
	for i := 0; i < c.N; i++ {
		i := i
		wg.Add(1)
		go func() {
			defer wg.Done()
			buf := make([]byte, benchReadSize)
			_, err := s.cache.ReadAt(s.locators[i%s.blkcount], buf, int((int64(i)*1234)%int64(s.blksize-benchReadSize)))
			if err != nil {
				c.Fail()
			}
		}()
	}
	wg.Wait()
}

func (s *keepCacheBenchSuite) BenchmarkSequentialReads(c *check.C) {
	buf := make([]byte, benchReadSize)
	for i := 0; i < c.N; i++ {
		_, err := s.cache.ReadAt(s.locators[i%s.blkcount], buf, int((int64(i)*1234)%int64(s.blksize-benchReadSize)))
		if err != nil {
			c.Fail()
		}
	}
}

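// benchReadSize is the size, in bytes, of each read issued by the
// benchmarks in this file.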
const benchReadSize = 1000

var _ = check.Suite(&fileOpsSuite{})

type fileOpsSuite struct{}

// BenchmarkOpenClose and BenchmarkKeepOpen can be used to measure the
// potential performance improvement of caching filehandles rather
// than opening/closing the cache file for each read.
//
// Results from a development machine indicate a ~3x throughput
// improvement: ~636 MB/s when opening/closing the file for each
// 1000-byte read vs. ~2 GB/s when opening the file once and doing
// concurrent reads using the same file descriptor.
func (s *fileOpsSuite) BenchmarkOpenClose(c *check.C) {
	fnm := c.MkDir() + "/testfile"
	os.WriteFile(fnm, make([]byte, 64000000), 0700)
	var wg sync.WaitGroup
	for i := 0; i < c.N; i++ {
		i := i
		wg.Add(1)
		go func() {
			defer wg.Done()
			f, err := os.OpenFile(fnm, os.O_CREATE|os.O_RDWR, 0700)
			if err != nil {
				c.Fail()
				return
			}
			_, err = f.ReadAt(make([]byte, benchReadSize), (int64(i)*1000000)%63123123)
			if err != nil {
				c.Fail()
				return
			}
			f.Close()
		}()
	}
	wg.Wait()
}

func (s *fileOpsSuite) BenchmarkKeepOpen(c *check.C) {
	fnm := c.MkDir() + "/testfile"
	os.WriteFile(fnm, make([]byte, 64000000), 0700)
	f, err := os.OpenFile(fnm, os.O_CREATE|os.O_RDWR, 0700)
	if err != nil {
		c.Fail()
		return
	}
	var wg sync.WaitGroup
	for i := 0; i < c.N; i++ {
		i := i
		wg.Add(1)
		go func() {
			defer wg.Done()
			_, err := f.ReadAt(make([]byte, benchReadSize), (int64(i)*1000000)%63123123)
			if err != nil {
				c.Fail()
				return
			}
		}()
	}
	wg.Wait()
	f.Close()
}