// Copyright (C) The Arvados Authors. All rights reserved. // // SPDX-License-Identifier: Apache-2.0 package streamer import ( . "gopkg.in/check.v1" "io" "testing" "time" ) // Gocheck boilerplate func Test(t *testing.T) { TestingT(t) } var _ = Suite(&StandaloneSuite{}) // Standalone tests type StandaloneSuite struct{} func (s *StandaloneSuite) TestReadIntoBuffer(c *C) { ReadIntoBufferHelper(c, 225) ReadIntoBufferHelper(c, 224) } func HelperWrite128andCheck(c *C, buffer []byte, writer io.Writer, slices chan nextSlice) { out := make([]byte, 128) for i := 0; i < 128; i += 1 { out[i] = byte(i) } writer.Write(out) s1 := <-slices c.Check(len(s1.slice), Equals, 128) c.Check(s1.reader_error, Equals, nil) for i := 0; i < 128; i += 1 { c.Check(s1.slice[i], Equals, byte(i)) } for i := 0; i < len(buffer); i += 1 { if i < 128 { c.Check(buffer[i], Equals, byte(i)) } else { c.Check(buffer[i], Equals, byte(0)) } } } func HelperWrite96andCheck(c *C, buffer []byte, writer io.Writer, slices chan nextSlice) { out := make([]byte, 96) for i := 0; i < 96; i += 1 { out[i] = byte(i / 2) } writer.Write(out) s1 := <-slices c.Check(len(s1.slice), Equals, 96) c.Check(s1.reader_error, Equals, nil) for i := 0; i < 96; i += 1 { c.Check(s1.slice[i], Equals, byte(i/2)) } for i := 0; i < len(buffer); i += 1 { if i < 128 { c.Check(buffer[i], Equals, byte(i)) } else if i < (128 + 96) { c.Check(buffer[i], Equals, byte((i-128)/2)) } else { c.Check(buffer[i], Equals, byte(0)) } } } func ReadIntoBufferHelper(c *C, bufsize int) { buffer := make([]byte, bufsize) reader, writer := io.Pipe() slices := make(chan nextSlice) go readIntoBuffer(buffer, reader, slices) HelperWrite128andCheck(c, buffer, writer, slices) HelperWrite96andCheck(c, buffer, writer, slices) writer.Close() s1 := <-slices c.Check(len(s1.slice), Equals, 0) c.Check(s1.reader_error, Equals, io.EOF) } func (s *StandaloneSuite) TestReadIntoShortBuffer(c *C) { buffer := make([]byte, 223) reader, writer := io.Pipe() slices := make(chan nextSlice) go readIntoBuffer(buffer, reader, slices) HelperWrite128andCheck(c, buffer, writer, slices) out := make([]byte, 96) for i := 0; i < 96; i += 1 { out[i] = byte(i / 2) } // Write will deadlock because it can't write all the data, so // spin it off to a goroutine go writer.Write(out) s1 := <-slices c.Check(len(s1.slice), Equals, 95) c.Check(s1.reader_error, Equals, nil) for i := 0; i < 95; i += 1 { c.Check(s1.slice[i], Equals, byte(i/2)) } for i := 0; i < len(buffer); i += 1 { if i < 128 { c.Check(buffer[i], Equals, byte(i)) } else if i < (128 + 95) { c.Check(buffer[i], Equals, byte((i-128)/2)) } else { c.Check(buffer[i], Equals, byte(0)) } } writer.Close() s1 = <-slices c.Check(len(s1.slice), Equals, 0) c.Check(s1.reader_error, Equals, io.ErrShortBuffer) } func (s *StandaloneSuite) TestTransfer(c *C) { reader, writer := io.Pipe() tr := AsyncStreamFromReader(512, reader) br1 := tr.MakeStreamReader() out := make([]byte, 128) { // Write some data, and read into a buffer shorter than // available data for i := 0; i < 128; i += 1 { out[i] = byte(i) } writer.Write(out[:100]) in := make([]byte, 64) n, err := br1.Read(in) c.Check(n, Equals, 64) c.Check(err, Equals, nil) for i := 0; i < 64; i += 1 { c.Check(in[i], Equals, out[i]) } } { // Write some more data, and read into buffer longer than // available data in := make([]byte, 64) n, err := br1.Read(in) c.Check(n, Equals, 36) c.Check(err, Equals, nil) for i := 0; i < 36; i += 1 { c.Check(in[i], Equals, out[64+i]) } } { // Test read before write type Rd struct { n int err error } rd := make(chan Rd) in := make([]byte, 64) go func() { n, err := br1.Read(in) rd <- Rd{n, err} }() time.Sleep(100 * time.Millisecond) writer.Write(out[100:]) got := <-rd c.Check(got.n, Equals, 28) c.Check(got.err, Equals, nil) for i := 0; i < 28; i += 1 { c.Check(in[i], Equals, out[100+i]) } } br2 := tr.MakeStreamReader() { // Test 'catch up' reader in := make([]byte, 256) n, err := br2.Read(in) c.Check(n, Equals, 128) c.Check(err, Equals, nil) for i := 0; i < 128; i += 1 { c.Check(in[i], Equals, out[i]) } } { // Test closing the reader writer.Close() in := make([]byte, 256) n1, err1 := br1.Read(in) n2, err2 := br2.Read(in) c.Check(n1, Equals, 0) c.Check(err1, Equals, io.EOF) c.Check(n2, Equals, 0) c.Check(err2, Equals, io.EOF) } { // Test 'catch up' reader after closing br3 := tr.MakeStreamReader() in := make([]byte, 256) n, err := br3.Read(in) c.Check(n, Equals, 128) c.Check(err, Equals, nil) for i := 0; i < 128; i += 1 { c.Check(in[i], Equals, out[i]) } n, err = br3.Read(in) c.Check(n, Equals, 0) c.Check(err, Equals, io.EOF) } } func (s *StandaloneSuite) TestTransferShortBuffer(c *C) { reader, writer := io.Pipe() tr := AsyncStreamFromReader(100, reader) defer tr.Close() sr := tr.MakeStreamReader() defer sr.Close() out := make([]byte, 101) go writer.Write(out) n, err := sr.Read(out) c.Check(n, Equals, 100) c.Check(err, IsNil) n, err = sr.Read(out) c.Check(n, Equals, 0) c.Check(err, Equals, io.ErrShortBuffer) } func (s *StandaloneSuite) TestTransferFromBuffer(c *C) { // Buffer for reads from 'r' buffer := make([]byte, 100) for i := 0; i < 100; i += 1 { buffer[i] = byte(i) } tr := AsyncStreamFromSlice(buffer) br1 := tr.MakeStreamReader() in := make([]byte, 64) { n, err := br1.Read(in) c.Check(n, Equals, 64) c.Check(err, Equals, nil) for i := 0; i < 64; i += 1 { c.Check(in[i], Equals, buffer[i]) } } { n, err := br1.Read(in) c.Check(n, Equals, 36) c.Check(err, Equals, nil) for i := 0; i < 36; i += 1 { c.Check(in[i], Equals, buffer[64+i]) } } { n, err := br1.Read(in) c.Check(n, Equals, 0) c.Check(err, Equals, io.EOF) } } func (s *StandaloneSuite) TestTransferIoCopy(c *C) { // Buffer for reads from 'r' buffer := make([]byte, 100) for i := 0; i < 100; i += 1 { buffer[i] = byte(i) } tr := AsyncStreamFromSlice(buffer) defer tr.Close() br1 := tr.MakeStreamReader() defer br1.Close() reader, writer := io.Pipe() go func() { p := make([]byte, 100) n, err := reader.Read(p) c.Check(n, Equals, 100) c.Check(err, Equals, nil) c.Check(p, DeepEquals, buffer) }() io.Copy(writer, br1) } func (s *StandaloneSuite) TestManyReaders(c *C) { reader, writer := io.Pipe() tr := AsyncStreamFromReader(512, reader) defer tr.Close() sr := tr.MakeStreamReader() go func() { time.Sleep(100 * time.Millisecond) sr.Close() }() for i := 0; i < 200; i += 1 { go func() { br1 := tr.MakeStreamReader() defer br1.Close() p := make([]byte, 3) n, err := br1.Read(p) c.Check(n, Equals, 3) c.Check(p[0:3], DeepEquals, []byte("foo")) n, err = br1.Read(p) c.Check(n, Equals, 3) c.Check(p[0:3], DeepEquals, []byte("bar")) n, err = br1.Read(p) c.Check(n, Equals, 3) c.Check(p[0:3], DeepEquals, []byte("baz")) n, err = br1.Read(p) c.Check(n, Equals, 0) c.Check(err, Equals, io.EOF) }() } writer.Write([]byte("foo")) writer.Write([]byte("bar")) writer.Write([]byte("baz")) writer.Close() } func (s *StandaloneSuite) TestMultipleCloseNoPanic(c *C) { buffer := make([]byte, 100) tr := AsyncStreamFromSlice(buffer) sr := tr.MakeStreamReader() c.Check(sr.Close(), IsNil) c.Check(sr.Close(), Equals, ErrAlreadyClosed) c.Check(tr.Close(), IsNil) c.Check(tr.Close(), Equals, ErrAlreadyClosed) }