7491: Add test that default replication is read from discovery document.
[arvados.git] / sdk / go / streamer / streamer_test.go
1 package streamer
2
3 import (
4         . "gopkg.in/check.v1"
5         "io"
6         "testing"
7         "time"
8 )
9
10 // Gocheck boilerplate
11 func Test(t *testing.T) { TestingT(t) }
12
13 var _ = Suite(&StandaloneSuite{})
14
15 // Standalone tests
16 type StandaloneSuite struct{}
17
18 func (s *StandaloneSuite) TestReadIntoBuffer(c *C) {
19         ReadIntoBufferHelper(c, 225)
20         ReadIntoBufferHelper(c, 224)
21 }
22
23 func HelperWrite128andCheck(c *C, buffer []byte, writer io.Writer, slices chan nextSlice) {
24         out := make([]byte, 128)
25         for i := 0; i < 128; i += 1 {
26                 out[i] = byte(i)
27         }
28         writer.Write(out)
29         s1 := <-slices
30         c.Check(len(s1.slice), Equals, 128)
31         c.Check(s1.reader_error, Equals, nil)
32         for i := 0; i < 128; i += 1 {
33                 c.Check(s1.slice[i], Equals, byte(i))
34         }
35         for i := 0; i < len(buffer); i += 1 {
36                 if i < 128 {
37                         c.Check(buffer[i], Equals, byte(i))
38                 } else {
39                         c.Check(buffer[i], Equals, byte(0))
40                 }
41         }
42 }
43
44 func HelperWrite96andCheck(c *C, buffer []byte, writer io.Writer, slices chan nextSlice) {
45         out := make([]byte, 96)
46         for i := 0; i < 96; i += 1 {
47                 out[i] = byte(i / 2)
48         }
49         writer.Write(out)
50         s1 := <-slices
51         c.Check(len(s1.slice), Equals, 96)
52         c.Check(s1.reader_error, Equals, nil)
53         for i := 0; i < 96; i += 1 {
54                 c.Check(s1.slice[i], Equals, byte(i/2))
55         }
56         for i := 0; i < len(buffer); i += 1 {
57                 if i < 128 {
58                         c.Check(buffer[i], Equals, byte(i))
59                 } else if i < (128 + 96) {
60                         c.Check(buffer[i], Equals, byte((i-128)/2))
61                 } else {
62                         c.Check(buffer[i], Equals, byte(0))
63                 }
64         }
65 }
66
67 func ReadIntoBufferHelper(c *C, bufsize int) {
68         buffer := make([]byte, bufsize)
69
70         reader, writer := io.Pipe()
71         slices := make(chan nextSlice)
72
73         go readIntoBuffer(buffer, reader, slices)
74
75         HelperWrite128andCheck(c, buffer, writer, slices)
76         HelperWrite96andCheck(c, buffer, writer, slices)
77
78         writer.Close()
79         s1 := <-slices
80         c.Check(len(s1.slice), Equals, 0)
81         c.Check(s1.reader_error, Equals, io.EOF)
82 }
83
84 func (s *StandaloneSuite) TestReadIntoShortBuffer(c *C) {
85         buffer := make([]byte, 223)
86         reader, writer := io.Pipe()
87         slices := make(chan nextSlice)
88
89         go readIntoBuffer(buffer, reader, slices)
90
91         HelperWrite128andCheck(c, buffer, writer, slices)
92
93         out := make([]byte, 96)
94         for i := 0; i < 96; i += 1 {
95                 out[i] = byte(i / 2)
96         }
97
98         // Write will deadlock because it can't write all the data, so
99         // spin it off to a goroutine
100         go writer.Write(out)
101         s1 := <-slices
102
103         c.Check(len(s1.slice), Equals, 95)
104         c.Check(s1.reader_error, Equals, nil)
105         for i := 0; i < 95; i += 1 {
106                 c.Check(s1.slice[i], Equals, byte(i/2))
107         }
108         for i := 0; i < len(buffer); i += 1 {
109                 if i < 128 {
110                         c.Check(buffer[i], Equals, byte(i))
111                 } else if i < (128 + 95) {
112                         c.Check(buffer[i], Equals, byte((i-128)/2))
113                 } else {
114                         c.Check(buffer[i], Equals, byte(0))
115                 }
116         }
117
118         writer.Close()
119         s1 = <-slices
120         c.Check(len(s1.slice), Equals, 0)
121         c.Check(s1.reader_error, Equals, io.ErrShortBuffer)
122 }
123
124 func (s *StandaloneSuite) TestTransfer(c *C) {
125         reader, writer := io.Pipe()
126
127         tr := AsyncStreamFromReader(512, reader)
128
129         br1 := tr.MakeStreamReader()
130         out := make([]byte, 128)
131
132         {
133                 // Write some data, and read into a buffer shorter than
134                 // available data
135                 for i := 0; i < 128; i += 1 {
136                         out[i] = byte(i)
137                 }
138
139                 writer.Write(out[:100])
140
141                 in := make([]byte, 64)
142                 n, err := br1.Read(in)
143
144                 c.Check(n, Equals, 64)
145                 c.Check(err, Equals, nil)
146
147                 for i := 0; i < 64; i += 1 {
148                         c.Check(in[i], Equals, out[i])
149                 }
150         }
151
152         {
153                 // Write some more data, and read into buffer longer than
154                 // available data
155                 in := make([]byte, 64)
156                 n, err := br1.Read(in)
157                 c.Check(n, Equals, 36)
158                 c.Check(err, Equals, nil)
159
160                 for i := 0; i < 36; i += 1 {
161                         c.Check(in[i], Equals, out[64+i])
162                 }
163
164         }
165
166         {
167                 // Test read before write
168                 type Rd struct {
169                         n   int
170                         err error
171                 }
172                 rd := make(chan Rd)
173                 in := make([]byte, 64)
174
175                 go func() {
176                         n, err := br1.Read(in)
177                         rd <- Rd{n, err}
178                 }()
179
180                 time.Sleep(100 * time.Millisecond)
181                 writer.Write(out[100:])
182
183                 got := <-rd
184
185                 c.Check(got.n, Equals, 28)
186                 c.Check(got.err, Equals, nil)
187
188                 for i := 0; i < 28; i += 1 {
189                         c.Check(in[i], Equals, out[100+i])
190                 }
191         }
192
193         br2 := tr.MakeStreamReader()
194         {
195                 // Test 'catch up' reader
196                 in := make([]byte, 256)
197                 n, err := br2.Read(in)
198
199                 c.Check(n, Equals, 128)
200                 c.Check(err, Equals, nil)
201
202                 for i := 0; i < 128; i += 1 {
203                         c.Check(in[i], Equals, out[i])
204                 }
205         }
206
207         {
208                 // Test closing the reader
209                 writer.Close()
210
211                 in := make([]byte, 256)
212                 n1, err1 := br1.Read(in)
213                 n2, err2 := br2.Read(in)
214                 c.Check(n1, Equals, 0)
215                 c.Check(err1, Equals, io.EOF)
216                 c.Check(n2, Equals, 0)
217                 c.Check(err2, Equals, io.EOF)
218         }
219
220         {
221                 // Test 'catch up' reader after closing
222                 br3 := tr.MakeStreamReader()
223                 in := make([]byte, 256)
224                 n, err := br3.Read(in)
225
226                 c.Check(n, Equals, 128)
227                 c.Check(err, Equals, nil)
228
229                 for i := 0; i < 128; i += 1 {
230                         c.Check(in[i], Equals, out[i])
231                 }
232
233                 n, err = br3.Read(in)
234
235                 c.Check(n, Equals, 0)
236                 c.Check(err, Equals, io.EOF)
237         }
238 }
239
240 func (s *StandaloneSuite) TestTransferShortBuffer(c *C) {
241         reader, writer := io.Pipe()
242
243         tr := AsyncStreamFromReader(100, reader)
244         defer tr.Close()
245
246         sr := tr.MakeStreamReader()
247         defer sr.Close()
248
249         out := make([]byte, 101)
250         go writer.Write(out)
251
252         n, err := sr.Read(out)
253         c.Check(n, Equals, 100)
254
255         n, err = sr.Read(out)
256         c.Check(n, Equals, 0)
257         c.Check(err, Equals, io.ErrShortBuffer)
258 }
259
260 func (s *StandaloneSuite) TestTransferFromBuffer(c *C) {
261         // Buffer for reads from 'r'
262         buffer := make([]byte, 100)
263         for i := 0; i < 100; i += 1 {
264                 buffer[i] = byte(i)
265         }
266
267         tr := AsyncStreamFromSlice(buffer)
268
269         br1 := tr.MakeStreamReader()
270
271         in := make([]byte, 64)
272         {
273                 n, err := br1.Read(in)
274
275                 c.Check(n, Equals, 64)
276                 c.Check(err, Equals, nil)
277
278                 for i := 0; i < 64; i += 1 {
279                         c.Check(in[i], Equals, buffer[i])
280                 }
281         }
282         {
283                 n, err := br1.Read(in)
284
285                 c.Check(n, Equals, 36)
286                 c.Check(err, Equals, nil)
287
288                 for i := 0; i < 36; i += 1 {
289                         c.Check(in[i], Equals, buffer[64+i])
290                 }
291         }
292         {
293                 n, err := br1.Read(in)
294
295                 c.Check(n, Equals, 0)
296                 c.Check(err, Equals, io.EOF)
297         }
298 }
299
300 func (s *StandaloneSuite) TestTransferIoCopy(c *C) {
301         // Buffer for reads from 'r'
302         buffer := make([]byte, 100)
303         for i := 0; i < 100; i += 1 {
304                 buffer[i] = byte(i)
305         }
306
307         tr := AsyncStreamFromSlice(buffer)
308         defer tr.Close()
309
310         br1 := tr.MakeStreamReader()
311         defer br1.Close()
312
313         reader, writer := io.Pipe()
314
315         go func() {
316                 p := make([]byte, 100)
317                 n, err := reader.Read(p)
318                 c.Check(n, Equals, 100)
319                 c.Check(err, Equals, nil)
320                 c.Check(p, DeepEquals, buffer)
321         }()
322
323         io.Copy(writer, br1)
324 }
325
326 func (s *StandaloneSuite) TestManyReaders(c *C) {
327         reader, writer := io.Pipe()
328
329         tr := AsyncStreamFromReader(512, reader)
330         defer tr.Close()
331
332         sr := tr.MakeStreamReader()
333         go func() {
334                 time.Sleep(100 * time.Millisecond)
335                 sr.Close()
336         }()
337
338         for i := 0; i < 200; i += 1 {
339                 go func() {
340                         br1 := tr.MakeStreamReader()
341                         defer br1.Close()
342
343                         p := make([]byte, 3)
344                         n, err := br1.Read(p)
345                         c.Check(n, Equals, 3)
346                         c.Check(p[0:3], DeepEquals, []byte("foo"))
347
348                         n, err = br1.Read(p)
349                         c.Check(n, Equals, 3)
350                         c.Check(p[0:3], DeepEquals, []byte("bar"))
351
352                         n, err = br1.Read(p)
353                         c.Check(n, Equals, 3)
354                         c.Check(p[0:3], DeepEquals, []byte("baz"))
355
356                         n, err = br1.Read(p)
357                         c.Check(n, Equals, 0)
358                         c.Check(err, Equals, io.EOF)
359                 }()
360         }
361
362         writer.Write([]byte("foo"))
363         writer.Write([]byte("bar"))
364         writer.Write([]byte("baz"))
365         writer.Close()
366 }