mend
[arvados.git] / sdk / go / src / arvados.org / streamer / streamer_test.go
1 package streamer
2
3 import (
4         . "gopkg.in/check.v1"
5         "io"
6         "testing"
7         "time"
8 )
9
10 // Gocheck boilerplate
11 func Test(t *testing.T) { TestingT(t) }
12
13 var _ = Suite(&StandaloneSuite{})
14
15 // Standalone tests
16 type StandaloneSuite struct{}
17
18 func ReadIntoBufferHelper(c *C, bufsize int) {
19         buffer := make([]byte, bufsize)
20
21         reader, writer := io.Pipe()
22         slices := make(chan readerSlice)
23
24         go readIntoBuffer(buffer, reader, slices)
25
26         {
27                 out := make([]byte, 128)
28                 for i := 0; i < 128; i += 1 {
29                         out[i] = byte(i)
30                 }
31                 writer.Write(out)
32                 s1 := <-slices
33                 c.Check(len(s1.slice), Equals, 128)
34                 c.Check(s1.reader_error, Equals, nil)
35                 for i := 0; i < 128; i += 1 {
36                         c.Check(s1.slice[i], Equals, byte(i))
37                 }
38                 for i := 0; i < len(buffer); i += 1 {
39                         if i < 128 {
40                                 c.Check(buffer[i], Equals, byte(i))
41                         } else {
42                                 c.Check(buffer[i], Equals, byte(0))
43                         }
44                 }
45         }
46         {
47                 out := make([]byte, 96)
48                 for i := 0; i < 96; i += 1 {
49                         out[i] = byte(i / 2)
50                 }
51                 writer.Write(out)
52                 s1 := <-slices
53                 c.Check(len(s1.slice), Equals, 96)
54                 c.Check(s1.reader_error, Equals, nil)
55                 for i := 0; i < 96; i += 1 {
56                         c.Check(s1.slice[i], Equals, byte(i/2))
57                 }
58                 for i := 0; i < len(buffer); i += 1 {
59                         if i < 128 {
60                                 c.Check(buffer[i], Equals, byte(i))
61                         } else if i < (128 + 96) {
62                                 c.Check(buffer[i], Equals, byte((i-128)/2))
63                         } else {
64                                 c.Check(buffer[i], Equals, byte(0))
65                         }
66                 }
67         }
68         {
69                 writer.Close()
70                 s1 := <-slices
71                 c.Check(len(s1.slice), Equals, 0)
72                 c.Check(s1.reader_error, Equals, io.EOF)
73         }
74 }
75
76 func (s *StandaloneSuite) TestReadIntoBuffer(c *C) {
77         ReadIntoBufferHelper(c, 512)
78         ReadIntoBufferHelper(c, 225)
79         ReadIntoBufferHelper(c, 224)
80 }
81
82 func (s *StandaloneSuite) TestReadIntoShortBuffer(c *C) {
83         buffer := make([]byte, 223)
84         reader, writer := io.Pipe()
85         slices := make(chan readerSlice)
86
87         go readIntoBuffer(buffer, reader, slices)
88
89         {
90                 out := make([]byte, 128)
91                 for i := 0; i < 128; i += 1 {
92                         out[i] = byte(i)
93                 }
94                 writer.Write(out)
95                 s1 := <-slices
96                 c.Check(len(s1.slice), Equals, 128)
97                 c.Check(s1.reader_error, Equals, nil)
98                 for i := 0; i < 128; i += 1 {
99                         c.Check(s1.slice[i], Equals, byte(i))
100                 }
101                 for i := 0; i < len(buffer); i += 1 {
102                         if i < 128 {
103                                 c.Check(buffer[i], Equals, byte(i))
104                         } else {
105                                 c.Check(buffer[i], Equals, byte(0))
106                         }
107                 }
108         }
109         {
110                 out := make([]byte, 96)
111                 for i := 0; i < 96; i += 1 {
112                         out[i] = byte(i / 2)
113                 }
114
115                 // Write will deadlock because it can't write all the data, so
116                 // spin it off to a goroutine
117                 go writer.Write(out)
118                 s1 := <-slices
119
120                 c.Check(len(s1.slice), Equals, 95)
121                 c.Check(s1.reader_error, Equals, nil)
122                 for i := 0; i < 95; i += 1 {
123                         c.Check(s1.slice[i], Equals, byte(i/2))
124                 }
125                 for i := 0; i < len(buffer); i += 1 {
126                         if i < 128 {
127                                 c.Check(buffer[i], Equals, byte(i))
128                         } else if i < (128 + 95) {
129                                 c.Check(buffer[i], Equals, byte((i-128)/2))
130                         } else {
131                                 c.Check(buffer[i], Equals, byte(0))
132                         }
133                 }
134         }
135         {
136                 writer.Close()
137                 s1 := <-slices
138                 c.Check(len(s1.slice), Equals, 0)
139                 c.Check(s1.reader_error, Equals, io.ErrShortBuffer)
140         }
141
142 }
143
144 func (s *StandaloneSuite) TestTransfer(c *C) {
145         reader, writer := io.Pipe()
146
147         tr := AsyncStreamFromReader(512, reader)
148
149         br1 := tr.MakeStreamReader()
150         out := make([]byte, 128)
151
152         {
153                 // Write some data, and read into a buffer shorter than
154                 // available data
155                 for i := 0; i < 128; i += 1 {
156                         out[i] = byte(i)
157                 }
158
159                 writer.Write(out[:100])
160
161                 in := make([]byte, 64)
162                 n, err := br1.Read(in)
163
164                 c.Check(n, Equals, 64)
165                 c.Check(err, Equals, nil)
166
167                 for i := 0; i < 64; i += 1 {
168                         c.Check(in[i], Equals, out[i])
169                 }
170         }
171
172         {
173                 // Write some more data, and read into buffer longer than
174                 // available data
175                 in := make([]byte, 64)
176                 n, err := br1.Read(in)
177                 c.Check(n, Equals, 36)
178                 c.Check(err, Equals, nil)
179
180                 for i := 0; i < 36; i += 1 {
181                         c.Check(in[i], Equals, out[64+i])
182                 }
183
184         }
185
186         {
187                 // Test read before write
188                 type Rd struct {
189                         n   int
190                         err error
191                 }
192                 rd := make(chan Rd)
193                 in := make([]byte, 64)
194
195                 go func() {
196                         n, err := br1.Read(in)
197                         rd <- Rd{n, err}
198                 }()
199
200                 time.Sleep(100 * time.Millisecond)
201                 writer.Write(out[100:])
202
203                 got := <-rd
204
205                 c.Check(got.n, Equals, 28)
206                 c.Check(got.err, Equals, nil)
207
208                 for i := 0; i < 28; i += 1 {
209                         c.Check(in[i], Equals, out[100+i])
210                 }
211         }
212
213         br2 := tr.MakeStreamReader()
214         {
215                 // Test 'catch up' reader
216                 in := make([]byte, 256)
217                 n, err := br2.Read(in)
218
219                 c.Check(n, Equals, 128)
220                 c.Check(err, Equals, nil)
221
222                 for i := 0; i < 128; i += 1 {
223                         c.Check(in[i], Equals, out[i])
224                 }
225         }
226
227         {
228                 // Test closing the reader
229                 writer.Close()
230                 status := <-tr.Reader_status
231                 c.Check(status, Equals, io.EOF)
232
233                 in := make([]byte, 256)
234                 n1, err1 := br1.Read(in)
235                 n2, err2 := br2.Read(in)
236                 c.Check(n1, Equals, 0)
237                 c.Check(err1, Equals, io.EOF)
238                 c.Check(n2, Equals, 0)
239                 c.Check(err2, Equals, io.EOF)
240         }
241
242         {
243                 // Test 'catch up' reader after closing
244                 br3 := tr.MakeStreamReader()
245                 in := make([]byte, 256)
246                 n, err := br3.Read(in)
247
248                 c.Check(n, Equals, 128)
249                 c.Check(err, Equals, nil)
250
251                 for i := 0; i < 128; i += 1 {
252                         c.Check(in[i], Equals, out[i])
253                 }
254
255                 n, err = br3.Read(in)
256
257                 c.Check(n, Equals, 0)
258                 c.Check(err, Equals, io.EOF)
259         }
260 }
261
262 func (s *StandaloneSuite) TestTransferShortBuffer(c *C) {
263         reader, writer := io.Pipe()
264
265         // Buffer for reads from 'r'
266         buffer := make([]byte, 100)
267
268         // Read requests on Transfer() buffer
269         requests := make(chan readRequest)
270         defer close(requests)
271
272         // Reporting reader error states
273         reader_status := make(chan error)
274
275         go transfer(buffer, reader, requests, reader_status)
276
277         out := make([]byte, 101)
278         go writer.Write(out)
279
280         status := <-reader_status
281         c.Check(status, Equals, io.ErrShortBuffer)
282 }
283
284 func (s *StandaloneSuite) TestTransferFromBuffer(c *C) {
285         // Buffer for reads from 'r'
286         buffer := make([]byte, 100)
287         for i := 0; i < 100; i += 1 {
288                 buffer[i] = byte(i)
289         }
290
291         tr := AsyncStreamFromSlice(buffer)
292
293         br1 := tr.MakeStreamReader()
294
295         in := make([]byte, 64)
296         {
297                 n, err := br1.Read(in)
298
299                 c.Check(n, Equals, 64)
300                 c.Check(err, Equals, nil)
301
302                 for i := 0; i < 64; i += 1 {
303                         c.Check(in[i], Equals, buffer[i])
304                 }
305         }
306         {
307                 n, err := br1.Read(in)
308
309                 c.Check(n, Equals, 36)
310                 c.Check(err, Equals, nil)
311
312                 for i := 0; i < 36; i += 1 {
313                         c.Check(in[i], Equals, buffer[64+i])
314                 }
315         }
316         {
317                 n, err := br1.Read(in)
318
319                 c.Check(n, Equals, 0)
320                 c.Check(err, Equals, io.EOF)
321         }
322 }
323
324 func (s *StandaloneSuite) TestTransferIoCopy(c *C) {
325         // Buffer for reads from 'r'
326         buffer := make([]byte, 100)
327         for i := 0; i < 100; i += 1 {
328                 buffer[i] = byte(i)
329         }
330
331         tr := AsyncStreamFromSlice(buffer)
332
333         br1 := tr.MakeStreamReader()
334
335         reader, writer := io.Pipe()
336
337         go func() {
338                 p := make([]byte, 100)
339                 n, err := reader.Read(p)
340                 c.Check(n, Equals, 100)
341                 c.Check(err, Equals, nil)
342                 c.Check(p, DeepEquals, buffer)
343         }()
344
345         io.Copy(writer, br1)
346 }