Merge branch '8784-dir-listings'
[arvados.git] / sdk / go / streamer / streamer_test.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: Apache-2.0
4
5 package streamer
6
7 import (
8         . "gopkg.in/check.v1"
9         "io"
10         "testing"
11         "time"
12 )
13
14 // Gocheck boilerplate
15 func Test(t *testing.T) { TestingT(t) }
16
17 var _ = Suite(&StandaloneSuite{})
18
19 // Standalone tests
20 type StandaloneSuite struct{}
21
22 func (s *StandaloneSuite) TestReadIntoBuffer(c *C) {
23         ReadIntoBufferHelper(c, 225)
24         ReadIntoBufferHelper(c, 224)
25 }
26
27 func HelperWrite128andCheck(c *C, buffer []byte, writer io.Writer, slices chan nextSlice) {
28         out := make([]byte, 128)
29         for i := 0; i < 128; i += 1 {
30                 out[i] = byte(i)
31         }
32         writer.Write(out)
33         s1 := <-slices
34         c.Check(len(s1.slice), Equals, 128)
35         c.Check(s1.reader_error, Equals, nil)
36         for i := 0; i < 128; i += 1 {
37                 c.Check(s1.slice[i], Equals, byte(i))
38         }
39         for i := 0; i < len(buffer); i += 1 {
40                 if i < 128 {
41                         c.Check(buffer[i], Equals, byte(i))
42                 } else {
43                         c.Check(buffer[i], Equals, byte(0))
44                 }
45         }
46 }
47
48 func HelperWrite96andCheck(c *C, buffer []byte, writer io.Writer, slices chan nextSlice) {
49         out := make([]byte, 96)
50         for i := 0; i < 96; i += 1 {
51                 out[i] = byte(i / 2)
52         }
53         writer.Write(out)
54         s1 := <-slices
55         c.Check(len(s1.slice), Equals, 96)
56         c.Check(s1.reader_error, Equals, nil)
57         for i := 0; i < 96; i += 1 {
58                 c.Check(s1.slice[i], Equals, byte(i/2))
59         }
60         for i := 0; i < len(buffer); i += 1 {
61                 if i < 128 {
62                         c.Check(buffer[i], Equals, byte(i))
63                 } else if i < (128 + 96) {
64                         c.Check(buffer[i], Equals, byte((i-128)/2))
65                 } else {
66                         c.Check(buffer[i], Equals, byte(0))
67                 }
68         }
69 }
70
71 func ReadIntoBufferHelper(c *C, bufsize int) {
72         buffer := make([]byte, bufsize)
73
74         reader, writer := io.Pipe()
75         slices := make(chan nextSlice)
76
77         go readIntoBuffer(buffer, reader, slices)
78
79         HelperWrite128andCheck(c, buffer, writer, slices)
80         HelperWrite96andCheck(c, buffer, writer, slices)
81
82         writer.Close()
83         s1 := <-slices
84         c.Check(len(s1.slice), Equals, 0)
85         c.Check(s1.reader_error, Equals, io.EOF)
86 }
87
88 func (s *StandaloneSuite) TestReadIntoShortBuffer(c *C) {
89         buffer := make([]byte, 223)
90         reader, writer := io.Pipe()
91         slices := make(chan nextSlice)
92
93         go readIntoBuffer(buffer, reader, slices)
94
95         HelperWrite128andCheck(c, buffer, writer, slices)
96
97         out := make([]byte, 96)
98         for i := 0; i < 96; i += 1 {
99                 out[i] = byte(i / 2)
100         }
101
102         // Write will deadlock because it can't write all the data, so
103         // spin it off to a goroutine
104         go writer.Write(out)
105         s1 := <-slices
106
107         c.Check(len(s1.slice), Equals, 95)
108         c.Check(s1.reader_error, Equals, nil)
109         for i := 0; i < 95; i += 1 {
110                 c.Check(s1.slice[i], Equals, byte(i/2))
111         }
112         for i := 0; i < len(buffer); i += 1 {
113                 if i < 128 {
114                         c.Check(buffer[i], Equals, byte(i))
115                 } else if i < (128 + 95) {
116                         c.Check(buffer[i], Equals, byte((i-128)/2))
117                 } else {
118                         c.Check(buffer[i], Equals, byte(0))
119                 }
120         }
121
122         writer.Close()
123         s1 = <-slices
124         c.Check(len(s1.slice), Equals, 0)
125         c.Check(s1.reader_error, Equals, io.ErrShortBuffer)
126 }
127
128 func (s *StandaloneSuite) TestTransfer(c *C) {
129         reader, writer := io.Pipe()
130
131         tr := AsyncStreamFromReader(512, reader)
132
133         br1 := tr.MakeStreamReader()
134         out := make([]byte, 128)
135
136         {
137                 // Write some data, and read into a buffer shorter than
138                 // available data
139                 for i := 0; i < 128; i += 1 {
140                         out[i] = byte(i)
141                 }
142
143                 writer.Write(out[:100])
144
145                 in := make([]byte, 64)
146                 n, err := br1.Read(in)
147
148                 c.Check(n, Equals, 64)
149                 c.Check(err, Equals, nil)
150
151                 for i := 0; i < 64; i += 1 {
152                         c.Check(in[i], Equals, out[i])
153                 }
154         }
155
156         {
157                 // Write some more data, and read into buffer longer than
158                 // available data
159                 in := make([]byte, 64)
160                 n, err := br1.Read(in)
161                 c.Check(n, Equals, 36)
162                 c.Check(err, Equals, nil)
163
164                 for i := 0; i < 36; i += 1 {
165                         c.Check(in[i], Equals, out[64+i])
166                 }
167
168         }
169
170         {
171                 // Test read before write
172                 type Rd struct {
173                         n   int
174                         err error
175                 }
176                 rd := make(chan Rd)
177                 in := make([]byte, 64)
178
179                 go func() {
180                         n, err := br1.Read(in)
181                         rd <- Rd{n, err}
182                 }()
183
184                 time.Sleep(100 * time.Millisecond)
185                 writer.Write(out[100:])
186
187                 got := <-rd
188
189                 c.Check(got.n, Equals, 28)
190                 c.Check(got.err, Equals, nil)
191
192                 for i := 0; i < 28; i += 1 {
193                         c.Check(in[i], Equals, out[100+i])
194                 }
195         }
196
197         br2 := tr.MakeStreamReader()
198         {
199                 // Test 'catch up' reader
200                 in := make([]byte, 256)
201                 n, err := br2.Read(in)
202
203                 c.Check(n, Equals, 128)
204                 c.Check(err, Equals, nil)
205
206                 for i := 0; i < 128; i += 1 {
207                         c.Check(in[i], Equals, out[i])
208                 }
209         }
210
211         {
212                 // Test closing the reader
213                 writer.Close()
214
215                 in := make([]byte, 256)
216                 n1, err1 := br1.Read(in)
217                 n2, err2 := br2.Read(in)
218                 c.Check(n1, Equals, 0)
219                 c.Check(err1, Equals, io.EOF)
220                 c.Check(n2, Equals, 0)
221                 c.Check(err2, Equals, io.EOF)
222         }
223
224         {
225                 // Test 'catch up' reader after closing
226                 br3 := tr.MakeStreamReader()
227                 in := make([]byte, 256)
228                 n, err := br3.Read(in)
229
230                 c.Check(n, Equals, 128)
231                 c.Check(err, Equals, nil)
232
233                 for i := 0; i < 128; i += 1 {
234                         c.Check(in[i], Equals, out[i])
235                 }
236
237                 n, err = br3.Read(in)
238
239                 c.Check(n, Equals, 0)
240                 c.Check(err, Equals, io.EOF)
241         }
242 }
243
244 func (s *StandaloneSuite) TestTransferShortBuffer(c *C) {
245         reader, writer := io.Pipe()
246
247         tr := AsyncStreamFromReader(100, reader)
248         defer tr.Close()
249
250         sr := tr.MakeStreamReader()
251         defer sr.Close()
252
253         out := make([]byte, 101)
254         go writer.Write(out)
255
256         n, err := sr.Read(out)
257         c.Check(n, Equals, 100)
258         c.Check(err, IsNil)
259
260         n, err = sr.Read(out)
261         c.Check(n, Equals, 0)
262         c.Check(err, Equals, io.ErrShortBuffer)
263 }
264
265 func (s *StandaloneSuite) TestTransferFromBuffer(c *C) {
266         // Buffer for reads from 'r'
267         buffer := make([]byte, 100)
268         for i := 0; i < 100; i += 1 {
269                 buffer[i] = byte(i)
270         }
271
272         tr := AsyncStreamFromSlice(buffer)
273
274         br1 := tr.MakeStreamReader()
275
276         in := make([]byte, 64)
277         {
278                 n, err := br1.Read(in)
279
280                 c.Check(n, Equals, 64)
281                 c.Check(err, Equals, nil)
282
283                 for i := 0; i < 64; i += 1 {
284                         c.Check(in[i], Equals, buffer[i])
285                 }
286         }
287         {
288                 n, err := br1.Read(in)
289
290                 c.Check(n, Equals, 36)
291                 c.Check(err, Equals, nil)
292
293                 for i := 0; i < 36; i += 1 {
294                         c.Check(in[i], Equals, buffer[64+i])
295                 }
296         }
297         {
298                 n, err := br1.Read(in)
299
300                 c.Check(n, Equals, 0)
301                 c.Check(err, Equals, io.EOF)
302         }
303 }
304
305 func (s *StandaloneSuite) TestTransferIoCopy(c *C) {
306         // Buffer for reads from 'r'
307         buffer := make([]byte, 100)
308         for i := 0; i < 100; i += 1 {
309                 buffer[i] = byte(i)
310         }
311
312         tr := AsyncStreamFromSlice(buffer)
313         defer tr.Close()
314
315         br1 := tr.MakeStreamReader()
316         defer br1.Close()
317
318         reader, writer := io.Pipe()
319
320         go func() {
321                 p := make([]byte, 100)
322                 n, err := reader.Read(p)
323                 c.Check(n, Equals, 100)
324                 c.Check(err, Equals, nil)
325                 c.Check(p, DeepEquals, buffer)
326         }()
327
328         io.Copy(writer, br1)
329 }
330
331 func (s *StandaloneSuite) TestManyReaders(c *C) {
332         reader, writer := io.Pipe()
333
334         tr := AsyncStreamFromReader(512, reader)
335         defer tr.Close()
336
337         sr := tr.MakeStreamReader()
338         go func() {
339                 time.Sleep(100 * time.Millisecond)
340                 sr.Close()
341         }()
342
343         for i := 0; i < 200; i += 1 {
344                 go func() {
345                         br1 := tr.MakeStreamReader()
346                         defer br1.Close()
347
348                         p := make([]byte, 3)
349                         n, err := br1.Read(p)
350                         c.Check(n, Equals, 3)
351                         c.Check(p[0:3], DeepEquals, []byte("foo"))
352
353                         n, err = br1.Read(p)
354                         c.Check(n, Equals, 3)
355                         c.Check(p[0:3], DeepEquals, []byte("bar"))
356
357                         n, err = br1.Read(p)
358                         c.Check(n, Equals, 3)
359                         c.Check(p[0:3], DeepEquals, []byte("baz"))
360
361                         n, err = br1.Read(p)
362                         c.Check(n, Equals, 0)
363                         c.Check(err, Equals, io.EOF)
364                 }()
365         }
366
367         writer.Write([]byte("foo"))
368         writer.Write([]byte("bar"))
369         writer.Write([]byte("baz"))
370         writer.Close()
371 }
372
373 func (s *StandaloneSuite) TestMultipleCloseNoPanic(c *C) {
374         buffer := make([]byte, 100)
375         tr := AsyncStreamFromSlice(buffer)
376         sr := tr.MakeStreamReader()
377         c.Check(sr.Close(), IsNil)
378         c.Check(sr.Close(), Equals, ErrAlreadyClosed)
379         c.Check(tr.Close(), IsNil)
380         c.Check(tr.Close(), Equals, ErrAlreadyClosed)
381 }