28e1e66dfc8c65ea955e0c0e79c025fa7e26736e
[arvados.git] / sdk / go / src / arvados.org / buffer / buffer_test.go
1 package buffer
2
3 import (
4         . "gopkg.in/check.v1"
5         "io"
6         "testing"
7         "time"
8 )
9
10 // Gocheck boilerplate
11 func Test(t *testing.T) { TestingT(t) }
12
13 var _ = Suite(&StandaloneSuite{})
14
15 // Standalone tests
16 type StandaloneSuite struct{}
17
18 func ReadIntoBufferHelper(c *C, bufsize int) {
19         buffer := make([]byte, bufsize)
20
21         reader, writer := io.Pipe()
22         slices := make(chan ReaderSlice)
23
24         go ReadIntoBuffer(buffer, reader, slices)
25
26         {
27                 out := make([]byte, 128)
28                 for i := 0; i < 128; i += 1 {
29                         out[i] = byte(i)
30                 }
31                 writer.Write(out)
32                 s1 := <-slices
33                 c.Check(len(s1.slice), Equals, 128)
34                 c.Check(s1.reader_error, Equals, nil)
35                 for i := 0; i < 128; i += 1 {
36                         c.Check(s1.slice[i], Equals, byte(i))
37                 }
38                 for i := 0; i < len(buffer); i += 1 {
39                         if i < 128 {
40                                 c.Check(buffer[i], Equals, byte(i))
41                         } else {
42                                 c.Check(buffer[i], Equals, byte(0))
43                         }
44                 }
45         }
46         {
47                 out := make([]byte, 96)
48                 for i := 0; i < 96; i += 1 {
49                         out[i] = byte(i / 2)
50                 }
51                 writer.Write(out)
52                 s1 := <-slices
53                 c.Check(len(s1.slice), Equals, 96)
54                 c.Check(s1.reader_error, Equals, nil)
55                 for i := 0; i < 96; i += 1 {
56                         c.Check(s1.slice[i], Equals, byte(i/2))
57                 }
58                 for i := 0; i < len(buffer); i += 1 {
59                         if i < 128 {
60                                 c.Check(buffer[i], Equals, byte(i))
61                         } else if i < (128 + 96) {
62                                 c.Check(buffer[i], Equals, byte((i-128)/2))
63                         } else {
64                                 c.Check(buffer[i], Equals, byte(0))
65                         }
66                 }
67         }
68         {
69                 writer.Close()
70                 s1 := <-slices
71                 c.Check(len(s1.slice), Equals, 0)
72                 c.Check(s1.reader_error, Equals, io.EOF)
73         }
74 }
75
76 func (s *StandaloneSuite) TestReadIntoBuffer(c *C) {
77         ReadIntoBufferHelper(c, 512)
78         ReadIntoBufferHelper(c, 225)
79         ReadIntoBufferHelper(c, 224)
80 }
81
82 func (s *StandaloneSuite) TestReadIntoShortBuffer(c *C) {
83         buffer := make([]byte, 223)
84         reader, writer := io.Pipe()
85         slices := make(chan ReaderSlice)
86
87         go ReadIntoBuffer(buffer, reader, slices)
88
89         {
90                 out := make([]byte, 128)
91                 for i := 0; i < 128; i += 1 {
92                         out[i] = byte(i)
93                 }
94                 writer.Write(out)
95                 s1 := <-slices
96                 c.Check(len(s1.slice), Equals, 128)
97                 c.Check(s1.reader_error, Equals, nil)
98                 for i := 0; i < 128; i += 1 {
99                         c.Check(s1.slice[i], Equals, byte(i))
100                 }
101                 for i := 0; i < len(buffer); i += 1 {
102                         if i < 128 {
103                                 c.Check(buffer[i], Equals, byte(i))
104                         } else {
105                                 c.Check(buffer[i], Equals, byte(0))
106                         }
107                 }
108         }
109         {
110                 out := make([]byte, 96)
111                 for i := 0; i < 96; i += 1 {
112                         out[i] = byte(i / 2)
113                 }
114
115                 // Write will deadlock because it can't write all the data, so
116                 // spin it off to a goroutine
117                 go writer.Write(out)
118                 s1 := <-slices
119
120                 c.Check(len(s1.slice), Equals, 95)
121                 c.Check(s1.reader_error, Equals, nil)
122                 for i := 0; i < 95; i += 1 {
123                         c.Check(s1.slice[i], Equals, byte(i/2))
124                 }
125                 for i := 0; i < len(buffer); i += 1 {
126                         if i < 128 {
127                                 c.Check(buffer[i], Equals, byte(i))
128                         } else if i < (128 + 95) {
129                                 c.Check(buffer[i], Equals, byte((i-128)/2))
130                         } else {
131                                 c.Check(buffer[i], Equals, byte(0))
132                         }
133                 }
134         }
135         {
136                 writer.Close()
137                 s1 := <-slices
138                 c.Check(len(s1.slice), Equals, 0)
139                 c.Check(s1.reader_error, Equals, io.ErrShortBuffer)
140         }
141
142 }
143
144 func (s *StandaloneSuite) TestTransfer(c *C) {
145         reader, writer := io.Pipe()
146
147         // Buffer for reads from 'r'
148         buffer := make([]byte, 512)
149
150         // Read requests on Transfer() buffer
151         requests := make(chan ReadRequest)
152         defer close(requests)
153
154         // Reporting reader error states
155         reader_status := make(chan error)
156
157         go Transfer(buffer, reader, requests, reader_status)
158
159         br1 := MakeBufferReader(requests)
160         out := make([]byte, 128)
161
162         {
163                 // Write some data, and read into a buffer shorter than
164                 // available data
165                 for i := 0; i < 128; i += 1 {
166                         out[i] = byte(i)
167                 }
168
169                 writer.Write(out[:100])
170
171                 in := make([]byte, 64)
172                 n, err := br1.Read(in)
173
174                 c.Check(n, Equals, 64)
175                 c.Check(err, Equals, nil)
176
177                 for i := 0; i < 64; i += 1 {
178                         c.Check(in[i], Equals, out[i])
179                 }
180         }
181
182         {
183                 // Write some more data, and read into buffer longer than
184                 // available data
185                 in := make([]byte, 64)
186                 n, err := br1.Read(in)
187                 c.Check(n, Equals, 36)
188                 c.Check(err, Equals, nil)
189
190                 for i := 0; i < 36; i += 1 {
191                         c.Check(in[i], Equals, out[64+i])
192                 }
193
194         }
195
196         {
197                 // Test read before write
198                 type Rd struct {
199                         n   int
200                         err error
201                 }
202                 rd := make(chan Rd)
203                 in := make([]byte, 64)
204
205                 go func() {
206                         n, err := br1.Read(in)
207                         rd <- Rd{n, err}
208                 }()
209
210                 time.Sleep(100 * time.Millisecond)
211                 writer.Write(out[100:])
212
213                 got := <-rd
214
215                 c.Check(got.n, Equals, 28)
216                 c.Check(got.err, Equals, nil)
217
218                 for i := 0; i < 28; i += 1 {
219                         c.Check(in[i], Equals, out[100+i])
220                 }
221         }
222
223         br2 := MakeBufferReader(requests)
224         {
225                 // Test 'catch up' reader
226                 in := make([]byte, 256)
227                 n, err := br2.Read(in)
228
229                 c.Check(n, Equals, 128)
230                 c.Check(err, Equals, nil)
231
232                 for i := 0; i < 128; i += 1 {
233                         c.Check(in[i], Equals, out[i])
234                 }
235         }
236
237         {
238                 // Test closing the reader
239                 writer.Close()
240                 status := <-reader_status
241                 c.Check(status, Equals, io.EOF)
242
243                 in := make([]byte, 256)
244                 n1, err1 := br1.Read(in)
245                 n2, err2 := br2.Read(in)
246                 c.Check(n1, Equals, 0)
247                 c.Check(err1, Equals, io.EOF)
248                 c.Check(n2, Equals, 0)
249                 c.Check(err2, Equals, io.EOF)
250         }
251
252         {
253                 // Test 'catch up' reader after closing
254                 br3 := MakeBufferReader(requests)
255                 in := make([]byte, 256)
256                 n, err := br3.Read(in)
257
258                 c.Check(n, Equals, 128)
259                 c.Check(err, Equals, nil)
260
261                 for i := 0; i < 128; i += 1 {
262                         c.Check(in[i], Equals, out[i])
263                 }
264
265                 n, err = br3.Read(in)
266
267                 c.Check(n, Equals, 0)
268                 c.Check(err, Equals, io.EOF)
269         }
270 }
271
272 func (s *StandaloneSuite) TestTransferShortBuffer(c *C) {
273         reader, writer := io.Pipe()
274
275         // Buffer for reads from 'r'
276         buffer := make([]byte, 100)
277
278         // Read requests on Transfer() buffer
279         requests := make(chan ReadRequest)
280         defer close(requests)
281
282         // Reporting reader error states
283         reader_status := make(chan error)
284
285         go Transfer(buffer, reader, requests, reader_status)
286
287         out := make([]byte, 101)
288         go writer.Write(out)
289
290         status := <-reader_status
291         c.Check(status, Equals, io.ErrShortBuffer)
292 }
293
294 func (s *StandaloneSuite) TestTransferFromBuffer(c *C) {
295         // Buffer for reads from 'r'
296         buffer := make([]byte, 100)
297         for i := 0; i < 100; i += 1 {
298                 buffer[i] = byte(i)
299         }
300
301         // Read requests on Transfer() buffer
302         requests := make(chan ReadRequest)
303         defer close(requests)
304
305         go Transfer(buffer, nil, requests, nil)
306
307         br1 := MakeBufferReader(requests)
308
309         in := make([]byte, 64)
310         {
311                 n, err := br1.Read(in)
312
313                 c.Check(n, Equals, 64)
314                 c.Check(err, Equals, nil)
315
316                 for i := 0; i < 64; i += 1 {
317                         c.Check(in[i], Equals, buffer[i])
318                 }
319         }
320         {
321                 n, err := br1.Read(in)
322
323                 c.Check(n, Equals, 36)
324                 c.Check(err, Equals, nil)
325
326                 for i := 0; i < 36; i += 1 {
327                         c.Check(in[i], Equals, buffer[64+i])
328                 }
329         }
330         {
331                 n, err := br1.Read(in)
332
333                 c.Check(n, Equals, 0)
334                 c.Check(err, Equals, io.EOF)
335         }
336 }
337
338 func (s *StandaloneSuite) TestTransferIoCopy(c *C) {
339         // Buffer for reads from 'r'
340         buffer := make([]byte, 100)
341         for i := 0; i < 100; i += 1 {
342                 buffer[i] = byte(i)
343         }
344
345         // Read requests on Transfer() buffer
346         requests := make(chan ReadRequest)
347         defer close(requests)
348
349         go Transfer(buffer, nil, requests, nil)
350
351         br1 := MakeBufferReader(requests)
352
353         reader, writer := io.Pipe()
354
355         go func() {
356                 p := make([]byte, 100)
357                 n, err := reader.Read(p)
358                 c.Check(n, Equals, 100)
359                 c.Check(err, Equals, nil)
360                 c.Check(p, DeepEquals, buffer)
361         }()
362
363         io.Copy(writer, br1)
364 }