10211: Return an error instead of crashing if stream is closed twice.
[arvados.git] / sdk / go / streamer / streamer_test.go
1 package streamer
2
3 import (
4         . "gopkg.in/check.v1"
5         "io"
6         "testing"
7         "time"
8 )
9
10 // Gocheck boilerplate
11 func Test(t *testing.T) { TestingT(t) }
12
13 var _ = Suite(&StandaloneSuite{})
14
15 // Standalone tests
16 type StandaloneSuite struct{}
17
18 func (s *StandaloneSuite) TestReadIntoBuffer(c *C) {
19         ReadIntoBufferHelper(c, 225)
20         ReadIntoBufferHelper(c, 224)
21 }
22
23 func HelperWrite128andCheck(c *C, buffer []byte, writer io.Writer, slices chan nextSlice) {
24         out := make([]byte, 128)
25         for i := 0; i < 128; i += 1 {
26                 out[i] = byte(i)
27         }
28         writer.Write(out)
29         s1 := <-slices
30         c.Check(len(s1.slice), Equals, 128)
31         c.Check(s1.reader_error, Equals, nil)
32         for i := 0; i < 128; i += 1 {
33                 c.Check(s1.slice[i], Equals, byte(i))
34         }
35         for i := 0; i < len(buffer); i += 1 {
36                 if i < 128 {
37                         c.Check(buffer[i], Equals, byte(i))
38                 } else {
39                         c.Check(buffer[i], Equals, byte(0))
40                 }
41         }
42 }
43
44 func HelperWrite96andCheck(c *C, buffer []byte, writer io.Writer, slices chan nextSlice) {
45         out := make([]byte, 96)
46         for i := 0; i < 96; i += 1 {
47                 out[i] = byte(i / 2)
48         }
49         writer.Write(out)
50         s1 := <-slices
51         c.Check(len(s1.slice), Equals, 96)
52         c.Check(s1.reader_error, Equals, nil)
53         for i := 0; i < 96; i += 1 {
54                 c.Check(s1.slice[i], Equals, byte(i/2))
55         }
56         for i := 0; i < len(buffer); i += 1 {
57                 if i < 128 {
58                         c.Check(buffer[i], Equals, byte(i))
59                 } else if i < (128 + 96) {
60                         c.Check(buffer[i], Equals, byte((i-128)/2))
61                 } else {
62                         c.Check(buffer[i], Equals, byte(0))
63                 }
64         }
65 }
66
67 func ReadIntoBufferHelper(c *C, bufsize int) {
68         buffer := make([]byte, bufsize)
69
70         reader, writer := io.Pipe()
71         slices := make(chan nextSlice)
72
73         go readIntoBuffer(buffer, reader, slices)
74
75         HelperWrite128andCheck(c, buffer, writer, slices)
76         HelperWrite96andCheck(c, buffer, writer, slices)
77
78         writer.Close()
79         s1 := <-slices
80         c.Check(len(s1.slice), Equals, 0)
81         c.Check(s1.reader_error, Equals, io.EOF)
82 }
83
84 func (s *StandaloneSuite) TestReadIntoShortBuffer(c *C) {
85         buffer := make([]byte, 223)
86         reader, writer := io.Pipe()
87         slices := make(chan nextSlice)
88
89         go readIntoBuffer(buffer, reader, slices)
90
91         HelperWrite128andCheck(c, buffer, writer, slices)
92
93         out := make([]byte, 96)
94         for i := 0; i < 96; i += 1 {
95                 out[i] = byte(i / 2)
96         }
97
98         // Write will deadlock because it can't write all the data, so
99         // spin it off to a goroutine
100         go writer.Write(out)
101         s1 := <-slices
102
103         c.Check(len(s1.slice), Equals, 95)
104         c.Check(s1.reader_error, Equals, nil)
105         for i := 0; i < 95; i += 1 {
106                 c.Check(s1.slice[i], Equals, byte(i/2))
107         }
108         for i := 0; i < len(buffer); i += 1 {
109                 if i < 128 {
110                         c.Check(buffer[i], Equals, byte(i))
111                 } else if i < (128 + 95) {
112                         c.Check(buffer[i], Equals, byte((i-128)/2))
113                 } else {
114                         c.Check(buffer[i], Equals, byte(0))
115                 }
116         }
117
118         writer.Close()
119         s1 = <-slices
120         c.Check(len(s1.slice), Equals, 0)
121         c.Check(s1.reader_error, Equals, io.ErrShortBuffer)
122 }
123
124 func (s *StandaloneSuite) TestTransfer(c *C) {
125         reader, writer := io.Pipe()
126
127         tr := AsyncStreamFromReader(512, reader)
128
129         br1 := tr.MakeStreamReader()
130         out := make([]byte, 128)
131
132         {
133                 // Write some data, and read into a buffer shorter than
134                 // available data
135                 for i := 0; i < 128; i += 1 {
136                         out[i] = byte(i)
137                 }
138
139                 writer.Write(out[:100])
140
141                 in := make([]byte, 64)
142                 n, err := br1.Read(in)
143
144                 c.Check(n, Equals, 64)
145                 c.Check(err, Equals, nil)
146
147                 for i := 0; i < 64; i += 1 {
148                         c.Check(in[i], Equals, out[i])
149                 }
150         }
151
152         {
153                 // Write some more data, and read into buffer longer than
154                 // available data
155                 in := make([]byte, 64)
156                 n, err := br1.Read(in)
157                 c.Check(n, Equals, 36)
158                 c.Check(err, Equals, nil)
159
160                 for i := 0; i < 36; i += 1 {
161                         c.Check(in[i], Equals, out[64+i])
162                 }
163
164         }
165
166         {
167                 // Test read before write
168                 type Rd struct {
169                         n   int
170                         err error
171                 }
172                 rd := make(chan Rd)
173                 in := make([]byte, 64)
174
175                 go func() {
176                         n, err := br1.Read(in)
177                         rd <- Rd{n, err}
178                 }()
179
180                 time.Sleep(100 * time.Millisecond)
181                 writer.Write(out[100:])
182
183                 got := <-rd
184
185                 c.Check(got.n, Equals, 28)
186                 c.Check(got.err, Equals, nil)
187
188                 for i := 0; i < 28; i += 1 {
189                         c.Check(in[i], Equals, out[100+i])
190                 }
191         }
192
193         br2 := tr.MakeStreamReader()
194         {
195                 // Test 'catch up' reader
196                 in := make([]byte, 256)
197                 n, err := br2.Read(in)
198
199                 c.Check(n, Equals, 128)
200                 c.Check(err, Equals, nil)
201
202                 for i := 0; i < 128; i += 1 {
203                         c.Check(in[i], Equals, out[i])
204                 }
205         }
206
207         {
208                 // Test closing the reader
209                 writer.Close()
210
211                 in := make([]byte, 256)
212                 n1, err1 := br1.Read(in)
213                 n2, err2 := br2.Read(in)
214                 c.Check(n1, Equals, 0)
215                 c.Check(err1, Equals, io.EOF)
216                 c.Check(n2, Equals, 0)
217                 c.Check(err2, Equals, io.EOF)
218         }
219
220         {
221                 // Test 'catch up' reader after closing
222                 br3 := tr.MakeStreamReader()
223                 in := make([]byte, 256)
224                 n, err := br3.Read(in)
225
226                 c.Check(n, Equals, 128)
227                 c.Check(err, Equals, nil)
228
229                 for i := 0; i < 128; i += 1 {
230                         c.Check(in[i], Equals, out[i])
231                 }
232
233                 n, err = br3.Read(in)
234
235                 c.Check(n, Equals, 0)
236                 c.Check(err, Equals, io.EOF)
237         }
238 }
239
240 func (s *StandaloneSuite) TestTransferShortBuffer(c *C) {
241         reader, writer := io.Pipe()
242
243         tr := AsyncStreamFromReader(100, reader)
244         defer tr.Close()
245
246         sr := tr.MakeStreamReader()
247         defer sr.Close()
248
249         out := make([]byte, 101)
250         go writer.Write(out)
251
252         n, err := sr.Read(out)
253         c.Check(n, Equals, 100)
254         c.Check(err, IsNil)
255
256         n, err = sr.Read(out)
257         c.Check(n, Equals, 0)
258         c.Check(err, Equals, io.ErrShortBuffer)
259 }
260
261 func (s *StandaloneSuite) TestTransferFromBuffer(c *C) {
262         // Buffer for reads from 'r'
263         buffer := make([]byte, 100)
264         for i := 0; i < 100; i += 1 {
265                 buffer[i] = byte(i)
266         }
267
268         tr := AsyncStreamFromSlice(buffer)
269
270         br1 := tr.MakeStreamReader()
271
272         in := make([]byte, 64)
273         {
274                 n, err := br1.Read(in)
275
276                 c.Check(n, Equals, 64)
277                 c.Check(err, Equals, nil)
278
279                 for i := 0; i < 64; i += 1 {
280                         c.Check(in[i], Equals, buffer[i])
281                 }
282         }
283         {
284                 n, err := br1.Read(in)
285
286                 c.Check(n, Equals, 36)
287                 c.Check(err, Equals, nil)
288
289                 for i := 0; i < 36; i += 1 {
290                         c.Check(in[i], Equals, buffer[64+i])
291                 }
292         }
293         {
294                 n, err := br1.Read(in)
295
296                 c.Check(n, Equals, 0)
297                 c.Check(err, Equals, io.EOF)
298         }
299 }
300
301 func (s *StandaloneSuite) TestTransferIoCopy(c *C) {
302         // Buffer for reads from 'r'
303         buffer := make([]byte, 100)
304         for i := 0; i < 100; i += 1 {
305                 buffer[i] = byte(i)
306         }
307
308         tr := AsyncStreamFromSlice(buffer)
309         defer tr.Close()
310
311         br1 := tr.MakeStreamReader()
312         defer br1.Close()
313
314         reader, writer := io.Pipe()
315
316         go func() {
317                 p := make([]byte, 100)
318                 n, err := reader.Read(p)
319                 c.Check(n, Equals, 100)
320                 c.Check(err, Equals, nil)
321                 c.Check(p, DeepEquals, buffer)
322         }()
323
324         io.Copy(writer, br1)
325 }
326
327 func (s *StandaloneSuite) TestManyReaders(c *C) {
328         reader, writer := io.Pipe()
329
330         tr := AsyncStreamFromReader(512, reader)
331         defer tr.Close()
332
333         sr := tr.MakeStreamReader()
334         go func() {
335                 time.Sleep(100 * time.Millisecond)
336                 sr.Close()
337         }()
338
339         for i := 0; i < 200; i += 1 {
340                 go func() {
341                         br1 := tr.MakeStreamReader()
342                         defer br1.Close()
343
344                         p := make([]byte, 3)
345                         n, err := br1.Read(p)
346                         c.Check(n, Equals, 3)
347                         c.Check(p[0:3], DeepEquals, []byte("foo"))
348
349                         n, err = br1.Read(p)
350                         c.Check(n, Equals, 3)
351                         c.Check(p[0:3], DeepEquals, []byte("bar"))
352
353                         n, err = br1.Read(p)
354                         c.Check(n, Equals, 3)
355                         c.Check(p[0:3], DeepEquals, []byte("baz"))
356
357                         n, err = br1.Read(p)
358                         c.Check(n, Equals, 0)
359                         c.Check(err, Equals, io.EOF)
360                 }()
361         }
362
363         writer.Write([]byte("foo"))
364         writer.Write([]byte("bar"))
365         writer.Write([]byte("baz"))
366         writer.Close()
367 }
368
369 func (s *StandaloneSuite) TestMultipleClose(c *C) {
370         buffer := make([]byte, 100)
371         tr := AsyncStreamFromSlice(buffer)
372         sr := tr.MakeStreamReader()
373         sr.Close()
374         sr.Close()
375         tr.Close()
376         tr.Close()
377 }