Merge branch '22153-workflow-picker' refs #22153
[arvados.git] / sdk / go / arvados / keep_cache_test.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: Apache-2.0
4
5 package arvados
6
7 import (
8         "bytes"
9         "context"
10         "crypto/md5"
11         "errors"
12         "fmt"
13         "io"
14         "math/rand"
15         "os"
16         "path/filepath"
17         "sync"
18         "sync/atomic"
19         "time"
20
21         "git.arvados.org/arvados.git/sdk/go/ctxlog"
22         check "gopkg.in/check.v1"
23 )
24
25 var _ = check.Suite(&keepCacheSuite{})
26
27 type keepCacheSuite struct {
28 }
29
// keepGatewayBlackHole is a stateless stub backend. Its methods in
// this file fail every read with "block not found" and report success
// for every write without keeping the data.
type keepGatewayBlackHole struct{}
32
33 func (*keepGatewayBlackHole) ReadAt(locator string, dst []byte, offset int) (int, error) {
34         return 0, errors.New("block not found")
35 }
36 func (*keepGatewayBlackHole) BlockRead(ctx context.Context, opts BlockReadOptions) (int, error) {
37         return 0, errors.New("block not found")
38 }
39 func (*keepGatewayBlackHole) LocalLocator(locator string) (string, error) {
40         return locator, nil
41 }
42 func (*keepGatewayBlackHole) BlockWrite(ctx context.Context, opts BlockWriteOptions) (BlockWriteResponse, error) {
43         h := md5.New()
44         var size int64
45         if opts.Reader == nil {
46                 size, _ = io.Copy(h, bytes.NewReader(opts.Data))
47         } else {
48                 size, _ = io.Copy(h, opts.Reader)
49         }
50         return BlockWriteResponse{Locator: fmt.Sprintf("%x+%d", h.Sum(nil), size), Replicas: 1}, nil
51 }
52
// keepGatewayMemoryBacked is a test backend that keeps written blocks
// in an in-memory map keyed by locator.
type keepGatewayMemoryBacked struct {
	// mtx is held by this type's methods while they access data.
	mtx sync.RWMutex
	// data maps a locator to the block content written under it.
	data map[string][]byte

	// When pauseBlockReadUntil is non-nil, BlockRead writes the
	// first pauseBlockReadAfter bytes of a block, then blocks on
	// a receive from pauseBlockReadUntil (a send or a close both
	// release it) before writing the rest.
	pauseBlockReadAfter int
	pauseBlockReadUntil chan error
}
59
60 func (k *keepGatewayMemoryBacked) ReadAt(locator string, dst []byte, offset int) (int, error) {
61         k.mtx.RLock()
62         data := k.data[locator]
63         k.mtx.RUnlock()
64         if data == nil {
65                 return 0, errors.New("block not found: " + locator)
66         }
67         var n int
68         if len(data) > offset {
69                 n = copy(dst, data[offset:])
70         }
71         if n < len(dst) {
72                 return n, io.EOF
73         }
74         return n, nil
75 }
76 func (k *keepGatewayMemoryBacked) BlockRead(ctx context.Context, opts BlockReadOptions) (int, error) {
77         k.mtx.RLock()
78         data := k.data[opts.Locator]
79         k.mtx.RUnlock()
80         if data == nil {
81                 return 0, errors.New("block not found: " + opts.Locator)
82         }
83         if k.pauseBlockReadUntil != nil {
84                 src := bytes.NewReader(data)
85                 n, err := io.CopyN(opts.WriteTo, src, int64(k.pauseBlockReadAfter))
86                 if err != nil {
87                         return int(n), err
88                 }
89                 <-k.pauseBlockReadUntil
90                 n2, err := io.Copy(opts.WriteTo, src)
91                 return int(n + n2), err
92         }
93         return opts.WriteTo.Write(data)
94 }
95 func (k *keepGatewayMemoryBacked) LocalLocator(locator string) (string, error) {
96         return locator, nil
97 }
98 func (k *keepGatewayMemoryBacked) BlockWrite(ctx context.Context, opts BlockWriteOptions) (BlockWriteResponse, error) {
99         h := md5.New()
100         data := bytes.NewBuffer(nil)
101         if opts.Reader == nil {
102                 data.Write(opts.Data)
103                 h.Write(data.Bytes())
104         } else {
105                 io.Copy(io.MultiWriter(h, data), opts.Reader)
106         }
107         locator := fmt.Sprintf("%x+%d", h.Sum(nil), data.Len())
108         k.mtx.Lock()
109         if k.data == nil {
110                 k.data = map[string][]byte{}
111         }
112         k.data[locator] = data.Bytes()
113         k.mtx.Unlock()
114         return BlockWriteResponse{Locator: locator, Replicas: 1}, nil
115 }
116
117 func (s *keepCacheSuite) TestBlockWrite(c *check.C) {
118         backend := &keepGatewayMemoryBacked{}
119         cache := DiskCache{
120                 KeepGateway: backend,
121                 MaxSize:     40000000,
122                 Dir:         c.MkDir(),
123                 Logger:      ctxlog.TestLogger(c),
124         }
125         ctx := context.Background()
126         real, err := cache.BlockWrite(ctx, BlockWriteOptions{
127                 Data: make([]byte, 100000),
128         })
129         c.Assert(err, check.IsNil)
130
131         // Write different data but supply the same hash. Should be
132         // rejected (even though our fake backend doesn't notice).
133         _, err = cache.BlockWrite(ctx, BlockWriteOptions{
134                 Hash: real.Locator[:32],
135                 Data: make([]byte, 10),
136         })
137         c.Check(err, check.ErrorMatches, `block hash .+ did not match provided hash .+`)
138
139         // Ensure the bogus write didn't overwrite (or delete) the
140         // real cached data associated with that hash.
141         delete(backend.data, real.Locator)
142         n, err := cache.ReadAt(real.Locator, make([]byte, 100), 0)
143         c.Check(n, check.Equals, 100)
144         c.Check(err, check.IsNil)
145 }
146
147 func (s *keepCacheSuite) TestMaxSize(c *check.C) {
148         backend := &keepGatewayMemoryBacked{}
149         cache := DiskCache{
150                 KeepGateway: backend,
151                 MaxSize:     40000000,
152                 Dir:         c.MkDir(),
153                 Logger:      ctxlog.TestLogger(c),
154         }
155         ctx := context.Background()
156         resp1, err := cache.BlockWrite(ctx, BlockWriteOptions{
157                 Data: make([]byte, 44000000),
158         })
159         c.Check(err, check.IsNil)
160
161         // Wait for tidy to finish, check that it doesn't delete the
162         // only block.
163         time.Sleep(time.Millisecond)
164         for atomic.LoadInt32(&cache.tidying) > 0 {
165                 time.Sleep(time.Millisecond)
166         }
167         c.Check(atomic.LoadInt64(&cache.sizeMeasured), check.Equals, int64(44000000))
168
169         resp2, err := cache.BlockWrite(ctx, BlockWriteOptions{
170                 Data: make([]byte, 32000000),
171         })
172         c.Check(err, check.IsNil)
173         delete(backend.data, resp1.Locator)
174         delete(backend.data, resp2.Locator)
175
176         // Wait for tidy to finish, check that it deleted the older
177         // block.
178         time.Sleep(time.Millisecond)
179         for atomic.LoadInt32(&cache.tidying) > 0 {
180                 time.Sleep(time.Millisecond)
181         }
182         c.Check(atomic.LoadInt64(&cache.sizeMeasured), check.Equals, int64(32000000))
183
184         n, err := cache.ReadAt(resp1.Locator, make([]byte, 2), 0)
185         c.Check(n, check.Equals, 0)
186         c.Check(err, check.ErrorMatches, `block not found: .*\+44000000`)
187
188         n, err = cache.ReadAt(resp2.Locator, make([]byte, 2), 0)
189         c.Check(n > 0, check.Equals, true)
190         c.Check(err, check.IsNil)
191 }
192
193 func (s *keepCacheSuite) TestConcurrentReadersNoRefresh(c *check.C) {
194         s.testConcurrentReaders(c, true, false)
195 }
196 func (s *keepCacheSuite) TestConcurrentReadersMangleCache(c *check.C) {
197         s.testConcurrentReaders(c, false, true)
198 }
199 func (s *keepCacheSuite) testConcurrentReaders(c *check.C, cannotRefresh, mangleCache bool) {
200         blksize := 64000000
201         backend := &keepGatewayMemoryBacked{}
202         cache := DiskCache{
203                 KeepGateway: backend,
204                 MaxSize:     ByteSizeOrPercent(blksize),
205                 Dir:         c.MkDir(),
206                 Logger:      ctxlog.TestLogger(c),
207         }
208         ctx, cancel := context.WithCancel(context.Background())
209         defer cancel()
210
211         resp, err := cache.BlockWrite(ctx, BlockWriteOptions{
212                 Data: make([]byte, blksize),
213         })
214         c.Check(err, check.IsNil)
215         if cannotRefresh {
216                 // Delete the block from the backing store, to ensure
217                 // the cache doesn't rely on re-reading a block that
218                 // it has just written.
219                 delete(backend.data, resp.Locator)
220         }
221         if mangleCache {
222                 // Replace cache files with truncated files (and
223                 // delete them outright) while the ReadAt loop is
224                 // running, to ensure the cache can re-fetch from the
225                 // backend as needed.
226                 var nRemove, nTrunc int
227                 defer func() {
228                         c.Logf("nRemove %d", nRemove)
229                         c.Logf("nTrunc %d", nTrunc)
230                 }()
231                 go func() {
232                         // Truncate/delete the cache file at various
233                         // intervals. Readers should re-fetch/recover from
234                         // this.
235                         fnm := cache.cacheFile(resp.Locator)
236                         for ctx.Err() == nil {
237                                 trunclen := rand.Int63() % int64(blksize*2)
238                                 if trunclen > int64(blksize) {
239                                         err := os.Remove(fnm)
240                                         if err == nil {
241                                                 nRemove++
242                                         }
243                                 } else if os.WriteFile(fnm+"#", make([]byte, trunclen), 0700) == nil {
244                                         err := os.Rename(fnm+"#", fnm)
245                                         if err == nil {
246                                                 nTrunc++
247                                         }
248                                 }
249                         }
250                 }()
251         }
252
253         failed := false
254         var wg sync.WaitGroup
255         var slots = make(chan bool, 100) // limit concurrency / memory usage
256         for i := 0; i < 20000; i++ {
257                 offset := (i * 123456) % blksize
258                 slots <- true
259                 wg.Add(1)
260                 go func() {
261                         defer wg.Done()
262                         defer func() { <-slots }()
263                         buf := make([]byte, 654321)
264                         if offset+len(buf) > blksize {
265                                 buf = buf[:blksize-offset]
266                         }
267                         n, err := cache.ReadAt(resp.Locator, buf, offset)
268                         if failed {
269                                 // don't fill logs with subsequent errors
270                                 return
271                         }
272                         if !c.Check(err, check.IsNil, check.Commentf("offset=%d", offset)) {
273                                 failed = true
274                         }
275                         c.Assert(n, check.Equals, len(buf))
276                 }()
277         }
278         wg.Wait()
279 }
280
281 func (s *keepCacheSuite) TestStreaming(c *check.C) {
282         blksize := 64000000
283         backend := &keepGatewayMemoryBacked{
284                 pauseBlockReadUntil: make(chan error),
285                 pauseBlockReadAfter: blksize / 8,
286         }
287         cache := DiskCache{
288                 KeepGateway: backend,
289                 MaxSize:     ByteSizeOrPercent(blksize),
290                 Dir:         c.MkDir(),
291                 Logger:      ctxlog.TestLogger(c),
292         }
293         ctx, cancel := context.WithCancel(context.Background())
294         defer cancel()
295
296         resp, err := cache.BlockWrite(ctx, BlockWriteOptions{
297                 Data: make([]byte, blksize),
298         })
299         c.Check(err, check.IsNil)
300         os.RemoveAll(filepath.Join(cache.Dir, resp.Locator[:3]))
301
302         // Start a lot of concurrent requests for various ranges of
303         // the same block. Our backend will return the first 8MB and
304         // then pause. The requests that can be satisfied by the first
305         // 8MB of data should return quickly. The rest should wait,
306         // and return after we release pauseBlockReadUntil.
307         var wgEarly, wgLate sync.WaitGroup
308         var doneEarly, doneLate int32
309         for i := 0; i < 10000; i++ {
310                 wgEarly.Add(1)
311                 go func() {
312                         offset := int(rand.Int63() % int64(blksize-benchReadSize))
313                         if offset+benchReadSize > backend.pauseBlockReadAfter {
314                                 wgLate.Add(1)
315                                 defer wgLate.Done()
316                                 wgEarly.Done()
317                                 defer atomic.AddInt32(&doneLate, 1)
318                         } else {
319                                 defer wgEarly.Done()
320                                 defer atomic.AddInt32(&doneEarly, 1)
321                         }
322                         buf := make([]byte, benchReadSize)
323                         n, err := cache.ReadAt(resp.Locator, buf, offset)
324                         c.Check(n, check.Equals, len(buf))
325                         c.Check(err, check.IsNil)
326                 }()
327         }
328
329         // Ensure all early ranges finish while backend request(s) are
330         // paused.
331         wgEarly.Wait()
332         c.Logf("doneEarly = %d", doneEarly)
333         c.Check(doneLate, check.Equals, int32(0))
334
335         // Unpause backend request(s).
336         close(backend.pauseBlockReadUntil)
337         wgLate.Wait()
338         c.Logf("doneLate = %d", doneLate)
339 }
340
341 var _ = check.Suite(&keepCacheBenchSuite{})
342
343 type keepCacheBenchSuite struct {
344         blksize  int
345         blkcount int
346         backend  *keepGatewayMemoryBacked
347         cache    *DiskCache
348         locators []string
349 }
350
351 func (s *keepCacheBenchSuite) SetUpTest(c *check.C) {
352         s.blksize = 64000000
353         s.blkcount = 8
354         s.backend = &keepGatewayMemoryBacked{}
355         s.cache = &DiskCache{
356                 KeepGateway: s.backend,
357                 MaxSize:     ByteSizeOrPercent(s.blksize),
358                 Dir:         c.MkDir(),
359                 Logger:      ctxlog.TestLogger(c),
360         }
361         s.locators = make([]string, s.blkcount)
362         data := make([]byte, s.blksize)
363         for b := 0; b < s.blkcount; b++ {
364                 for i := range data {
365                         data[i] = byte(b)
366                 }
367                 resp, err := s.cache.BlockWrite(context.Background(), BlockWriteOptions{
368                         Data: data,
369                 })
370                 c.Assert(err, check.IsNil)
371                 s.locators[b] = resp.Locator
372         }
373 }
374
375 func (s *keepCacheBenchSuite) BenchmarkConcurrentReads(c *check.C) {
376         var wg sync.WaitGroup
377         for i := 0; i < c.N; i++ {
378                 i := i
379                 wg.Add(1)
380                 go func() {
381                         defer wg.Done()
382                         buf := make([]byte, benchReadSize)
383                         _, err := s.cache.ReadAt(s.locators[i%s.blkcount], buf, int((int64(i)*1234)%int64(s.blksize-benchReadSize)))
384                         if err != nil {
385                                 c.Fail()
386                         }
387                 }()
388         }
389         wg.Wait()
390 }
391
392 func (s *keepCacheBenchSuite) BenchmarkSequentialReads(c *check.C) {
393         buf := make([]byte, benchReadSize)
394         for i := 0; i < c.N; i++ {
395                 _, err := s.cache.ReadAt(s.locators[i%s.blkcount], buf, int((int64(i)*1234)%int64(s.blksize-benchReadSize)))
396                 if err != nil {
397                         c.Fail()
398                 }
399         }
400 }
401
// benchReadSize is the number of bytes requested by each read in the
// benchmarks in this file and in TestStreaming.
const benchReadSize = 1000
403
404 var _ = check.Suite(&fileOpsSuite{})
405
406 type fileOpsSuite struct{}
407
408 // BenchmarkOpenClose and BenchmarkKeepOpen can be used to measure the
409 // potential performance improvement of caching filehandles rather
410 // than opening/closing the cache file for each read.
411 //
412 // Results from a development machine indicate a ~3x throughput
413 // improvement: ~636 MB/s when opening/closing the file for each
414 // 1000-byte read vs. ~2 GB/s when opening the file once and doing
415 // concurrent reads using the same file descriptor.
416 func (s *fileOpsSuite) BenchmarkOpenClose(c *check.C) {
417         fnm := c.MkDir() + "/testfile"
418         os.WriteFile(fnm, make([]byte, 64000000), 0700)
419         var wg sync.WaitGroup
420         for i := 0; i < c.N; i++ {
421                 i := i
422                 wg.Add(1)
423                 go func() {
424                         defer wg.Done()
425                         f, err := os.OpenFile(fnm, os.O_CREATE|os.O_RDWR, 0700)
426                         if err != nil {
427                                 c.Fail()
428                                 return
429                         }
430                         _, err = f.ReadAt(make([]byte, benchReadSize), (int64(i)*1000000)%63123123)
431                         if err != nil {
432                                 c.Fail()
433                                 return
434                         }
435                         f.Close()
436                 }()
437         }
438         wg.Wait()
439 }
440
441 func (s *fileOpsSuite) BenchmarkKeepOpen(c *check.C) {
442         fnm := c.MkDir() + "/testfile"
443         os.WriteFile(fnm, make([]byte, 64000000), 0700)
444         f, err := os.OpenFile(fnm, os.O_CREATE|os.O_RDWR, 0700)
445         if err != nil {
446                 c.Fail()
447                 return
448         }
449         var wg sync.WaitGroup
450         for i := 0; i < c.N; i++ {
451                 i := i
452                 wg.Add(1)
453                 go func() {
454                         defer wg.Done()
455                         _, err = f.ReadAt(make([]byte, benchReadSize), (int64(i)*1000000)%63123123)
456                         if err != nil {
457                                 c.Fail()
458                                 return
459                         }
460                 }()
461         }
462         wg.Wait()
463         f.Close()
464 }