15370: Fix flaky test.
[arvados.git] / sdk / go / asyncbuf / buf_test.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: Apache-2.0
4
5 package asyncbuf
6
7 import (
8         "crypto/md5"
9         "errors"
10         "io"
11         "io/ioutil"
12         "math/rand"
13         "sync"
14         "sync/atomic"
15         "testing"
16         "time"
17
18         check "gopkg.in/check.v1"
19 )
20
21 var _ = check.Suite(&Suite{})
22
23 type Suite struct{}
24
25 func (s *Suite) TestNoWrites(c *check.C) {
26         b := NewBuffer(nil)
27         r1 := b.NewReader()
28         r2 := b.NewReader()
29         b.Close()
30         s.checkReader(c, r1, []byte{}, nil, nil)
31         s.checkReader(c, r2, []byte{}, nil, nil)
32 }
33
34 func (s *Suite) TestNoReaders(c *check.C) {
35         b := NewBuffer(nil)
36         n, err := b.Write([]byte("foobar"))
37         err2 := b.Close()
38         c.Check(n, check.Equals, 6)
39         c.Check(err, check.IsNil)
40         c.Check(err2, check.IsNil)
41 }
42
43 func (s *Suite) TestWriteReadClose(c *check.C) {
44         done := make(chan bool, 2)
45         b := NewBuffer(nil)
46         n, err := b.Write([]byte("foobar"))
47         c.Check(n, check.Equals, 6)
48         c.Check(err, check.IsNil)
49         r1 := b.NewReader()
50         r2 := b.NewReader()
51         go s.checkReader(c, r1, []byte("foobar"), nil, done)
52         go s.checkReader(c, r2, []byte("foobar"), nil, done)
53         time.Sleep(time.Millisecond)
54         c.Check(len(done), check.Equals, 0)
55         b.Close()
56         <-done
57         <-done
58 }
59
60 func (s *Suite) TestPrefillWriteCloseRead(c *check.C) {
61         done := make(chan bool, 2)
62         b := NewBuffer([]byte("baz"))
63         n, err := b.Write([]byte("waz"))
64         c.Check(n, check.Equals, 3)
65         c.Check(err, check.IsNil)
66         b.Close()
67         r1 := b.NewReader()
68         go s.checkReader(c, r1, []byte("bazwaz"), nil, done)
69         r2 := b.NewReader()
70         go s.checkReader(c, r2, []byte("bazwaz"), nil, done)
71         <-done
72         <-done
73 }
74
75 func (s *Suite) TestWriteReadCloseRead(c *check.C) {
76         done := make(chan bool, 1)
77         b := NewBuffer(nil)
78         r1 := b.NewReader()
79         go s.checkReader(c, r1, []byte("bazwazqux"), nil, done)
80
81         b.Write([]byte("bazwaz"))
82
83         r2 := b.NewReader()
84         r2.Read(make([]byte, 3))
85
86         b.Write([]byte("qux"))
87         b.Close()
88
89         s.checkReader(c, r2, []byte("wazqux"), nil, nil)
90         <-done
91 }
92
93 func (s *Suite) TestReadAtEOF(c *check.C) {
94         buf := make([]byte, 8)
95
96         b := NewBuffer([]byte{1, 2, 3})
97
98         r := b.NewReader()
99         n, err := r.Read(buf)
100         c.Check(n, check.Equals, 3)
101         c.Check(err, check.IsNil)
102
103         // Reading zero bytes at EOF, but before Close(), doesn't
104         // block or error
105         done := make(chan bool)
106         go func() {
107                 defer close(done)
108                 n, err = r.Read(buf[:0])
109                 c.Check(n, check.Equals, 0)
110                 c.Check(err, check.IsNil)
111         }()
112         select {
113         case <-done:
114         case <-time.After(time.Second):
115                 c.Error("timeout")
116         }
117
118         b.Close()
119
120         // Reading zero bytes after Close() returns EOF
121         n, err = r.Read(buf[:0])
122         c.Check(n, check.Equals, 0)
123         c.Check(err, check.Equals, io.EOF)
124
125         // Reading from start after Close() returns 3 bytes, then EOF
126         r = b.NewReader()
127         n, err = r.Read(buf)
128         c.Check(n, check.Equals, 3)
129         if err != nil {
130                 c.Check(err, check.Equals, io.EOF)
131         }
132         n, err = r.Read(buf[:0])
133         c.Check(n, check.Equals, 0)
134         c.Check(err, check.Equals, io.EOF)
135         n, err = r.Read(buf)
136         c.Check(n, check.Equals, 0)
137         c.Check(err, check.Equals, io.EOF)
138 }
139
140 func (s *Suite) TestCloseWithError(c *check.C) {
141         errFake := errors.New("it's not even a real error")
142
143         done := make(chan bool, 1)
144         b := NewBuffer(nil)
145         r1 := b.NewReader()
146         go s.checkReader(c, r1, []byte("bazwazqux"), errFake, done)
147
148         b.Write([]byte("bazwaz"))
149
150         r2 := b.NewReader()
151         r2.Read(make([]byte, 3))
152
153         b.Write([]byte("qux"))
154         b.CloseWithError(errFake)
155
156         s.checkReader(c, r2, []byte("wazqux"), errFake, nil)
157         <-done
158 }
159
160 // Write n*n bytes, n at a time; read them into n goroutines using
161 // varying buffer sizes; compare checksums.
162 func (s *Suite) TestManyReaders(c *check.C) {
163         const n = 256
164
165         b := NewBuffer(nil)
166
167         expectSum := make(chan []byte)
168         go func() {
169                 hash := md5.New()
170                 buf := make([]byte, n)
171                 for i := 0; i < n; i++ {
172                         time.Sleep(10 * time.Nanosecond)
173                         rand.Read(buf)
174                         b.Write(buf)
175                         hash.Write(buf)
176                 }
177                 expectSum <- hash.Sum(nil)
178                 b.Close()
179         }()
180
181         gotSum := make(chan []byte)
182         for i := 0; i < n; i++ {
183                 go func(bufSize int) {
184                         got := md5.New()
185                         io.CopyBuffer(got, b.NewReader(), make([]byte, bufSize))
186                         gotSum <- got.Sum(nil)
187                 }(i + n/2)
188         }
189
190         expect := <-expectSum
191         for i := 0; i < n; i++ {
192                 c.Check(expect, check.DeepEquals, <-gotSum)
193         }
194 }
195
196 func (s *Suite) BenchmarkOneReader(c *check.C) {
197         s.benchmarkReaders(c, 1)
198 }
199
200 func (s *Suite) BenchmarkManyReaders(c *check.C) {
201         s.benchmarkReaders(c, 100)
202 }
203
204 func (s *Suite) benchmarkReaders(c *check.C, readers int) {
205         var n int64
206         t0 := time.Now()
207
208         buf := make([]byte, 10000)
209         rand.Read(buf)
210         for i := 0; i < 10; i++ {
211                 b := NewBuffer(nil)
212                 go func() {
213                         for i := 0; i < c.N; i++ {
214                                 b.Write(buf)
215                         }
216                         b.Close()
217                 }()
218
219                 var wg sync.WaitGroup
220                 for i := 0; i < readers; i++ {
221                         wg.Add(1)
222                         go func() {
223                                 defer wg.Done()
224                                 nn, _ := io.Copy(ioutil.Discard, b.NewReader())
225                                 atomic.AddInt64(&n, int64(nn))
226                         }()
227                 }
228                 wg.Wait()
229         }
230         c.Logf("%d bytes, %.0f MB/s", n, float64(n)/time.Since(t0).Seconds()/1000000)
231 }
232
233 func (s *Suite) checkReader(c *check.C, r io.Reader, expectData []byte, expectError error, done chan bool) {
234         buf, err := ioutil.ReadAll(r)
235         c.Check(err, check.Equals, expectError)
236         c.Check(buf, check.DeepEquals, expectData)
237         if done != nil {
238                 done <- true
239         }
240 }
241
242 // Gocheck boilerplate
243 func Test(t *testing.T) {
244         check.TestingT(t)
245 }