1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: Apache-2.0
14 // Gocheck boilerplate
// Test is the `go test` entry point for this file; it simply hands t to
// TestingT. NOTE(review): TestingT is presumably gocheck's suite runner
// (the import block is not visible here) — confirm.
15 func Test(t *testing.T) { TestingT(t) }
// Pass a StandaloneSuite instance to Suite at package initialisation; the
// result is discarded, so the call is made only for its side effect
// (presumably registering the suite with the runner started by Test).
17 var _ = Suite(&StandaloneSuite{})
// StandaloneSuite is the receiver for the test methods in this file. It has
// no fields, so the tests share no state through it.
20 type StandaloneSuite struct{}
// TestReadIntoBuffer runs ReadIntoBufferHelper twice, with a 225-byte and a
// 224-byte buffer. The helper performs a 128-byte step followed by a 96-byte
// step (224 bytes in total), so the two sizes cover "one byte to spare" and
// "exactly full".
22 func (s *StandaloneSuite) TestReadIntoBuffer(c *C) {
23 ReadIntoBufferHelper(c, 225)
24 ReadIntoBufferHelper(c, 224)
// HelperWrite128andCheck prepares a 128-byte payload and validates the
// nextSlice value s1: it must be 128 bytes long, carry a nil reader_error and
// contain the byte values 0..127 in order. It then walks the whole of buffer,
// expecting byte(i) in one branch and zero in the other.
//
// Parameters:
//   - c:      gocheck context used for the assertions.
//   - buffer: the backing buffer that is inspected byte by byte.
//   - writer: destination for the payload.
//   - slices: channel of nextSlice values.
//
// NOTE(review): the lines that fill `out`, write it to `writer`, obtain s1
// (presumably by receiving from `slices`) and choose between the two buffer
// branches are not visible in this view — confirm against the full file.
27 func HelperWrite128andCheck(c *C, buffer []byte, writer io.Writer, slices chan nextSlice) {
28 out := make([]byte, 128)
29 for i := 0; i < 128; i += 1 {
// The reported slice must be exactly 128 bytes, error-free, and hold 0..127.
34 c.Check(len(s1.slice), Equals, 128)
35 c.Check(s1.reader_error, Equals, nil)
36 for i := 0; i < 128; i += 1 {
37 c.Check(s1.slice[i], Equals, byte(i))
// Cross-check every position of the backing buffer, not just the slice.
39 for i := 0; i < len(buffer); i += 1 {
41 c.Check(buffer[i], Equals, byte(i))
43 c.Check(buffer[i], Equals, byte(0))
// HelperWrite96andCheck prepares a 96-byte payload and validates the
// nextSlice value s1: it must be 96 bytes long, carry a nil reader_error and
// contain byte(i/2) at each index i (each value appears twice). It then walks
// the whole of buffer with three expectations: byte(i) in the first branch,
// byte((i-128)/2) for indexes below 128+96, and zero otherwise.
//
// Parameters:
//   - c:      gocheck context used for the assertions.
//   - buffer: the backing buffer that is inspected byte by byte.
//   - writer: destination for the payload.
//   - slices: channel of nextSlice values.
//
// NOTE(review): the lines that fill `out`, write it to `writer`, obtain s1
// (presumably by receiving from `slices`) and the condition of the first
// buffer branch are not visible in this view — confirm against the full file.
48 func HelperWrite96andCheck(c *C, buffer []byte, writer io.Writer, slices chan nextSlice) {
49 out := make([]byte, 96)
50 for i := 0; i < 96; i += 1 {
// The reported slice must be exactly 96 bytes, error-free, and hold i/2.
55 c.Check(len(s1.slice), Equals, 96)
56 c.Check(s1.reader_error, Equals, nil)
57 for i := 0; i < 96; i += 1 {
58 c.Check(s1.slice[i], Equals, byte(i/2))
// Cross-check every position of the backing buffer.
60 for i := 0; i < len(buffer); i += 1 {
62 c.Check(buffer[i], Equals, byte(i))
// Indexes up to 128+96 are offset by 128 before applying the i/2 pattern.
63 } else if i < (128 + 96) {
64 c.Check(buffer[i], Equals, byte((i-128)/2))
66 c.Check(buffer[i], Equals, byte(0))
// ReadIntoBufferHelper exercises readIntoBuffer with a buffer of bufsize
// bytes. It connects an io.Pipe to readIntoBuffer (run in its own goroutine),
// runs the 128-byte and 96-byte helper steps against it, and finally expects
// an empty slice whose reader_error is io.EOF.
//
// Parameters:
//   - c:       gocheck context used for the assertions.
//   - bufsize: length of the buffer handed to readIntoBuffer.
//
// NOTE(review): the lines that end the input (presumably closing the pipe
// writer) and obtain the final s1 are not visible in this view.
71 func ReadIntoBufferHelper(c *C, bufsize int) {
72 buffer := make([]byte, bufsize)
74 reader, writer := io.Pipe()
// Unbuffered channel: each send from readIntoBuffer pairs with one receive.
75 slices := make(chan nextSlice)
77 go readIntoBuffer(buffer, reader, slices)
79 HelperWrite128andCheck(c, buffer, writer, slices)
80 HelperWrite96andCheck(c, buffer, writer, slices)
// Last report: no data and io.EOF.
84 c.Check(len(s1.slice), Equals, 0)
85 c.Check(s1.reader_error, Equals, io.EOF)
// TestReadIntoShortBuffer runs readIntoBuffer with a 223-byte buffer, one
// byte fewer than the 128 + 96 bytes the test prepares. After the 128-byte
// step succeeds, the second slice is expected to hold only 95 bytes with no
// error, and the following report must be empty with io.ErrShortBuffer.
//
// NOTE(review): the lines that fill `out`, start the writing goroutine,
// obtain each s1 and the first buffer-branch condition are not visible in
// this view — confirm against the full file.
88 func (s *StandaloneSuite) TestReadIntoShortBuffer(c *C) {
89 buffer := make([]byte, 223)
90 reader, writer := io.Pipe()
91 slices := make(chan nextSlice)
93 go readIntoBuffer(buffer, reader, slices)
95 HelperWrite128andCheck(c, buffer, writer, slices)
97 out := make([]byte, 96)
98 for i := 0; i < 96; i += 1 {
102 // Write will deadlock because it can't write all the data, so
103 // spin it off to a goroutine
// Only 95 of the 96 bytes fit: 223 - 128 = 95.
107 c.Check(len(s1.slice), Equals, 95)
108 c.Check(s1.reader_error, Equals, nil)
109 for i := 0; i < 95; i += 1 {
110 c.Check(s1.slice[i], Equals, byte(i/2))
// Cross-check every position of the backing buffer.
112 for i := 0; i < len(buffer); i += 1 {
114 c.Check(buffer[i], Equals, byte(i))
115 } else if i < (128 + 95) {
116 c.Check(buffer[i], Equals, byte((i-128)/2))
118 c.Check(buffer[i], Equals, byte(0))
// With the buffer exhausted, the next report is empty and flags the overflow.
124 c.Check(len(s1.slice), Equals, 0)
125 c.Check(s1.reader_error, Equals, io.ErrShortBuffer)
// TestTransfer checks stream readers created from AsyncStreamFromReader(512,
// reader) on an io.Pipe, using a 128-byte payload `out`. The scenarios, in
// order: a read into a buffer smaller than the data written, a read that
// drains the remainder, a read issued before the matching write, a second
// reader that starts from the beginning, reads after the stream has ended,
// and a reader created after the stream has ended.
//
// NOTE(review): the payload fill loop, the scoping braces around each
// scenario, the goroutine/channel plumbing behind `got` and the calls that
// close the writer are not visible in this view — confirm against the full
// file.
128 func (s *StandaloneSuite) TestTransfer(c *C) {
129 reader, writer := io.Pipe()
131 tr := AsyncStreamFromReader(512, reader)
133 br1 := tr.MakeStreamReader()
134 out := make([]byte, 128)
137 // Write some data, and read into a buffer shorter than
// the data written: 100 bytes go in, the read buffer holds only 64.
139 for i := 0; i < 128; i += 1 {
143 writer.Write(out[:100])
145 in := make([]byte, 64)
146 n, err := br1.Read(in)
148 c.Check(n, Equals, 64)
149 c.Check(err, Equals, nil)
151 for i := 0; i < 64; i += 1 {
152 c.Check(in[i], Equals, out[i])
157 // Write some more data, and read into buffer longer than
// what is left: the 64-byte read buffer receives the remaining 36 bytes
// (out[64:100]).
159 in := make([]byte, 64)
160 n, err := br1.Read(in)
161 c.Check(n, Equals, 36)
162 c.Check(err, Equals, nil)
164 for i := 0; i < 36; i += 1 {
165 c.Check(in[i], Equals, out[64+i])
171 // Test read before write
177 in := make([]byte, 64)
180 n, err := br1.Read(in)
// Give the pending Read time to start before the last 28 bytes are written.
184 time.Sleep(100 * time.Millisecond)
185 writer.Write(out[100:])
189 c.Check(got.n, Equals, 28)
190 c.Check(got.err, Equals, nil)
192 for i := 0; i < 28; i += 1 {
193 c.Check(in[i], Equals, out[100+i])
197 br2 := tr.MakeStreamReader()
199 // Test 'catch up' reader
// br2 was created after all 128 bytes were written, yet reads them from
// offset 0.
200 in := make([]byte, 256)
201 n, err := br2.Read(in)
203 c.Check(n, Equals, 128)
204 c.Check(err, Equals, nil)
206 for i := 0; i < 128; i += 1 {
207 c.Check(in[i], Equals, out[i])
212 // Test closing the reader
// Both readers have consumed everything, so each must now report 0, io.EOF.
215 in := make([]byte, 256)
216 n1, err1 := br1.Read(in)
217 n2, err2 := br2.Read(in)
218 c.Check(n1, Equals, 0)
219 c.Check(err1, Equals, io.EOF)
220 c.Check(n2, Equals, 0)
221 c.Check(err2, Equals, io.EOF)
225 // Test 'catch up' reader after closing
// A reader created this late still gets the full 128 bytes, then io.EOF.
226 br3 := tr.MakeStreamReader()
227 in := make([]byte, 256)
228 n, err := br3.Read(in)
230 c.Check(n, Equals, 128)
231 c.Check(err, Equals, nil)
233 for i := 0; i < 128; i += 1 {
234 c.Check(in[i], Equals, out[i])
237 n, err = br3.Read(in)
239 c.Check(n, Equals, 0)
240 c.Check(err, Equals, io.EOF)
// TestTransferShortBuffer builds a stream with a 100-byte capacity
// (AsyncStreamFromReader(100, reader)) and reads through a 101-byte slice.
// The first Read must return 100 bytes; the second must return 0 bytes with
// io.ErrShortBuffer.
//
// NOTE(review): the write into the pipe (presumably of the 101-byte `out`,
// one byte more than the stream buffer) is not visible in this view.
244 func (s *StandaloneSuite) TestTransferShortBuffer(c *C) {
245 reader, writer := io.Pipe()
247 tr := AsyncStreamFromReader(100, reader)
250 sr := tr.MakeStreamReader()
253 out := make([]byte, 101)
// First read: everything that fitted in the 100-byte stream buffer.
256 n, err := sr.Read(out)
257 c.Check(n, Equals, 100)
// Second read: nothing more is available and the overflow is reported.
260 n, err = sr.Read(out)
261 c.Check(n, Equals, 0)
262 c.Check(err, Equals, io.ErrShortBuffer)
// TestTransferFromBuffer checks a stream built directly from an in-memory
// 100-byte slice via AsyncStreamFromSlice. Reading through a 64-byte buffer
// must yield 64 bytes, then the remaining 36, then 0 bytes with io.EOF, and
// the data must match the source slice at the corresponding offsets.
//
// NOTE(review): the body of the loop that fills `buffer` and the scoping
// braces around each read are not visible in this view.
265 func (s *StandaloneSuite) TestTransferFromBuffer(c *C) {
266 // Buffer for reads from 'r'
267 buffer := make([]byte, 100)
268 for i := 0; i < 100; i += 1 {
272 tr := AsyncStreamFromSlice(buffer)
274 br1 := tr.MakeStreamReader()
276 in := make([]byte, 64)
// Read 1: fills the whole 64-byte buffer with buffer[0:64].
278 n, err := br1.Read(in)
280 c.Check(n, Equals, 64)
281 c.Check(err, Equals, nil)
283 for i := 0; i < 64; i += 1 {
284 c.Check(in[i], Equals, buffer[i])
// Read 2: the remaining 36 bytes, buffer[64:100].
288 n, err := br1.Read(in)
290 c.Check(n, Equals, 36)
291 c.Check(err, Equals, nil)
293 for i := 0; i < 36; i += 1 {
294 c.Check(in[i], Equals, buffer[64+i])
// Read 3: the slice is exhausted.
298 n, err := br1.Read(in)
300 c.Check(n, Equals, 0)
301 c.Check(err, Equals, io.EOF)
// TestTransferIoCopy builds a stream from a 100-byte slice, creates a stream
// reader and an io.Pipe, and then expects a single Read from the pipe's
// reader to deliver all 100 bytes, with no error and contents equal to the
// source slice.
//
// NOTE(review): the code that moves data from br1 into `writer` (judging by
// the test name, an io.Copy in a goroutine) and the body of the fill loop are
// not visible in this view — confirm against the full file.
305 func (s *StandaloneSuite) TestTransferIoCopy(c *C) {
306 // Buffer for reads from 'r'
307 buffer := make([]byte, 100)
308 for i := 0; i < 100; i += 1 {
312 tr := AsyncStreamFromSlice(buffer)
315 br1 := tr.MakeStreamReader()
318 reader, writer := io.Pipe()
// The pipe's read side must see the complete, unmodified source data.
321 p := make([]byte, 100)
322 n, err := reader.Read(p)
323 c.Check(n, Equals, 100)
324 c.Check(err, Equals, nil)
325 c.Check(p, DeepEquals, buffer)
// TestManyReaders creates many stream readers on one stream built with
// AsyncStreamFromReader(512, reader): one reader `sr` up front, then one
// `br1` per iteration of a 200-iteration loop. The visible assertions expect
// three 3-byte chunks "foo", "bar" and "baz" in that order, followed by
// 0 bytes with io.EOF. The three chunks are written to the pipe as separate
// Write calls.
//
// NOTE(review): the goroutine structure, the allocation of `p`, the second to
// fourth Read calls and the closing of readers/writer are not visible in this
// view, so which reader each assertion group applies to cannot be confirmed
// from here.
331 func (s *StandaloneSuite) TestManyReaders(c *C) {
332 reader, writer := io.Pipe()
334 tr := AsyncStreamFromReader(512, reader)
337 sr := tr.MakeStreamReader()
339 time.Sleep(100 * time.Millisecond)
343 for i := 0; i < 200; i += 1 {
345 br1 := tr.MakeStreamReader()
// Expected sequence: "foo", "bar", "baz", then end of stream.
349 n, err := br1.Read(p)
350 c.Check(n, Equals, 3)
351 c.Check(p[0:3], DeepEquals, []byte("foo"))
354 c.Check(n, Equals, 3)
355 c.Check(p[0:3], DeepEquals, []byte("bar"))
358 c.Check(n, Equals, 3)
359 c.Check(p[0:3], DeepEquals, []byte("baz"))
362 c.Check(n, Equals, 0)
363 c.Check(err, Equals, io.EOF)
// Feed the stream with three separate writes.
367 writer.Write([]byte("foo"))
368 writer.Write([]byte("bar"))
369 writer.Write([]byte("baz"))
// TestMultipleCloseNoPanic verifies that Close may be called twice on both a
// stream reader and its stream (built from a 100-byte slice): the first call
// must return nil and the second must return ErrAlreadyClosed.
373 func (s *StandaloneSuite) TestMultipleCloseNoPanic(c *C) {
374 buffer := make([]byte, 100)
375 tr := AsyncStreamFromSlice(buffer)
376 sr := tr.MakeStreamReader()
// Reader first, then the stream itself; each is closed twice.
377 c.Check(sr.Close(), IsNil)
378 c.Check(sr.Close(), Equals, ErrAlreadyClosed)
379 c.Check(tr.Close(), IsNil)
380 c.Check(tr.Close(), Equals, ErrAlreadyClosed)