20318: Benchmark open/close vs. shared fd.
[arvados.git] / sdk / go / arvados / keep_cache_test.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: Apache-2.0
4
5 package arvados
6
7 import (
8         "bytes"
9         "context"
10         "crypto/md5"
11         "errors"
12         "fmt"
13         "io"
14         "os"
15         "sync"
16         "time"
17
18         "git.arvados.org/arvados.git/sdk/go/ctxlog"
19         check "gopkg.in/check.v1"
20 )
21
22 var _ = check.Suite(&keepCacheSuite{})
23
24 type keepCacheSuite struct {
25 }
26
27 type keepGatewayBlackHole struct {
28 }
29
30 func (*keepGatewayBlackHole) ReadAt(locator string, dst []byte, offset int) (int, error) {
31         return 0, errors.New("block not found")
32 }
33 func (*keepGatewayBlackHole) BlockRead(ctx context.Context, opts BlockReadOptions) (int, error) {
34         return 0, errors.New("block not found")
35 }
36 func (*keepGatewayBlackHole) LocalLocator(locator string) (string, error) {
37         return locator, nil
38 }
39 func (*keepGatewayBlackHole) BlockWrite(ctx context.Context, opts BlockWriteOptions) (BlockWriteResponse, error) {
40         h := md5.New()
41         var size int64
42         if opts.Reader == nil {
43                 size, _ = io.Copy(h, bytes.NewReader(opts.Data))
44         } else {
45                 size, _ = io.Copy(h, opts.Reader)
46         }
47         return BlockWriteResponse{Locator: fmt.Sprintf("%x+%d", h.Sum(nil), size), Replicas: 1}, nil
48 }
49
50 type keepGatewayMemoryBacked struct {
51         mtx  sync.RWMutex
52         data map[string][]byte
53 }
54
55 func (k *keepGatewayMemoryBacked) ReadAt(locator string, dst []byte, offset int) (int, error) {
56         k.mtx.RLock()
57         data := k.data[locator]
58         k.mtx.RUnlock()
59         if data == nil {
60                 return 0, errors.New("block not found: " + locator)
61         }
62         var n int
63         if len(data) > offset {
64                 n = copy(dst, data[offset:])
65         }
66         if n < len(dst) {
67                 return n, io.EOF
68         }
69         return n, nil
70 }
71 func (k *keepGatewayMemoryBacked) BlockRead(ctx context.Context, opts BlockReadOptions) (int, error) {
72         k.mtx.RLock()
73         data := k.data[opts.Locator]
74         k.mtx.RUnlock()
75         if data == nil {
76                 return 0, errors.New("block not found: " + opts.Locator)
77         }
78         return opts.WriteTo.Write(data)
79 }
80 func (k *keepGatewayMemoryBacked) LocalLocator(locator string) (string, error) {
81         return locator, nil
82 }
83 func (k *keepGatewayMemoryBacked) BlockWrite(ctx context.Context, opts BlockWriteOptions) (BlockWriteResponse, error) {
84         h := md5.New()
85         data := bytes.NewBuffer(nil)
86         if opts.Reader == nil {
87                 data.Write(opts.Data)
88                 h.Write(data.Bytes())
89         } else {
90                 io.Copy(io.MultiWriter(h, data), opts.Reader)
91         }
92         locator := fmt.Sprintf("%x+%d", h.Sum(nil), data.Len())
93         k.mtx.Lock()
94         if k.data == nil {
95                 k.data = map[string][]byte{}
96         }
97         k.data[locator] = data.Bytes()
98         k.mtx.Unlock()
99         return BlockWriteResponse{Locator: locator, Replicas: 1}, nil
100 }
101
102 func (s *keepCacheSuite) TestMaxSize(c *check.C) {
103         backend := &keepGatewayMemoryBacked{}
104         cache := DiskCache{
105                 KeepGateway: backend,
106                 MaxSize:     40000000,
107                 Dir:         c.MkDir(),
108                 Logger:      ctxlog.TestLogger(c),
109         }
110         ctx := context.Background()
111         resp1, err := cache.BlockWrite(ctx, BlockWriteOptions{
112                 Data: make([]byte, 44000000),
113         })
114         c.Check(err, check.IsNil)
115         time.Sleep(time.Millisecond)
116         resp2, err := cache.BlockWrite(ctx, BlockWriteOptions{
117                 Data: make([]byte, 32000000),
118         })
119         c.Check(err, check.IsNil)
120         delete(backend.data, resp1.Locator)
121         delete(backend.data, resp2.Locator)
122         cache.tidyHoldUntil = time.Time{}
123         cache.tidy()
124
125         n, err := cache.ReadAt(resp1.Locator, make([]byte, 2), 0)
126         c.Check(n, check.Equals, 0)
127         c.Check(err, check.ErrorMatches, `block not found: .*\+44000000`)
128
129         n, err = cache.ReadAt(resp2.Locator, make([]byte, 2), 0)
130         c.Check(n > 0, check.Equals, true)
131         c.Check(err, check.IsNil)
132 }
133
134 func (s *keepCacheSuite) TestConcurrentReaders(c *check.C) {
135         blksize := 64000000
136         backend := &keepGatewayMemoryBacked{}
137         cache := DiskCache{
138                 KeepGateway: backend,
139                 MaxSize:     int64(blksize),
140                 Dir:         c.MkDir(),
141                 Logger:      ctxlog.TestLogger(c),
142         }
143         ctx := context.Background()
144         resp, err := cache.BlockWrite(ctx, BlockWriteOptions{
145                 Data: make([]byte, blksize),
146         })
147         c.Check(err, check.IsNil)
148         delete(backend.data, resp.Locator)
149
150         failed := false
151         var wg sync.WaitGroup
152         for offset := 0; offset < blksize; offset += 123456 {
153                 offset := offset
154                 wg.Add(1)
155                 go func() {
156                         defer wg.Done()
157                         buf := make([]byte, 654321)
158                         if offset+len(buf) > blksize {
159                                 buf = buf[:blksize-offset]
160                         }
161                         n, err := cache.ReadAt(resp.Locator, buf, offset)
162                         if failed {
163                                 // don't fill logs with subsequent errors
164                                 return
165                         }
166                         if !c.Check(err, check.IsNil, check.Commentf("offset=%d", offset)) {
167                                 failed = true
168                         }
169                         c.Assert(n, check.Equals, len(buf))
170                 }()
171         }
172         wg.Wait()
173 }
174
175 const benchReadSize = 1000
176
177 // BenchmarkOpenClose and BenchmarkKeepOpen can be used to measure the
178 // potential performance improvement of caching filehandles rather
179 // than opening/closing the cache file for each read.
180 //
181 // Results from a development machine indicate a ~3x throughput
182 // improvement: ~636 MB/s when opening/closing the file for each
183 // 1000-byte read vs. ~2 GB/s when opening the file once and doing
184 // concurrent reads using the same file descriptor.
185 func (s *keepCacheSuite) BenchmarkOpenClose(c *check.C) {
186         fnm := c.MkDir() + "/testfile"
187         os.WriteFile(fnm, make([]byte, 64000000), 0700)
188         var wg sync.WaitGroup
189         for i := 0; i < c.N; i++ {
190                 wg.Add(1)
191                 go func() {
192                         defer wg.Done()
193                         f, err := os.OpenFile(fnm, os.O_CREATE|os.O_RDWR, 0700)
194                         if err != nil {
195                                 c.Fail()
196                                 return
197                         }
198                         _, err = f.ReadAt(make([]byte, benchReadSize), (int64(i)*1000000)%63123123)
199                         if err != nil {
200                                 c.Fail()
201                                 return
202                         }
203                         f.Close()
204                 }()
205         }
206         wg.Wait()
207 }
208
209 func (s *keepCacheSuite) BenchmarkKeepOpen(c *check.C) {
210         fnm := c.MkDir() + "/testfile"
211         os.WriteFile(fnm, make([]byte, 64000000), 0700)
212         f, err := os.OpenFile(fnm, os.O_CREATE|os.O_RDWR, 0700)
213         if err != nil {
214                 c.Fail()
215                 return
216         }
217         var wg sync.WaitGroup
218         for i := 0; i < c.N; i++ {
219                 i := i
220                 wg.Add(1)
221                 go func() {
222                         defer wg.Done()
223                         _, err = f.ReadAt(make([]byte, benchReadSize), (int64(i)*1000000)%63123123)
224                         if err != nil {
225                                 c.Fail()
226                                 return
227                         }
228                 }()
229         }
230         wg.Wait()
231         f.Close()
232 }