X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/64aac6153e1819738d9d80e156572aeb9bf07f97..381926c94a3b41d04101f68ec6a33e88fc795254:/sdk/go/src/arvados.org/streamer/streamer_test.go?ds=sidebyside diff --git a/sdk/go/src/arvados.org/streamer/streamer_test.go b/sdk/go/src/arvados.org/streamer/streamer_test.go index 54f1bc7070..853d7d3035 100644 --- a/sdk/go/src/arvados.org/streamer/streamer_test.go +++ b/sdk/go/src/arvados.org/streamer/streamer_test.go @@ -15,138 +15,118 @@ var _ = Suite(&StandaloneSuite{}) // Standalone tests type StandaloneSuite struct{} +func (s *StandaloneSuite) TestReadIntoBuffer(c *C) { + ReadIntoBufferHelper(c, 225) + ReadIntoBufferHelper(c, 224) +} + +func HelperWrite128andCheck(c *C, buffer []byte, writer io.Writer, slices chan nextSlice) { + out := make([]byte, 128) + for i := 0; i < 128; i += 1 { + out[i] = byte(i) + } + writer.Write(out) + s1 := <-slices + c.Check(len(s1.slice), Equals, 128) + c.Check(s1.reader_error, Equals, nil) + for i := 0; i < 128; i += 1 { + c.Check(s1.slice[i], Equals, byte(i)) + } + for i := 0; i < len(buffer); i += 1 { + if i < 128 { + c.Check(buffer[i], Equals, byte(i)) + } else { + c.Check(buffer[i], Equals, byte(0)) + } + } +} + +func HelperWrite96andCheck(c *C, buffer []byte, writer io.Writer, slices chan nextSlice) { + out := make([]byte, 96) + for i := 0; i < 96; i += 1 { + out[i] = byte(i / 2) + } + writer.Write(out) + s1 := <-slices + c.Check(len(s1.slice), Equals, 96) + c.Check(s1.reader_error, Equals, nil) + for i := 0; i < 96; i += 1 { + c.Check(s1.slice[i], Equals, byte(i/2)) + } + for i := 0; i < len(buffer); i += 1 { + if i < 128 { + c.Check(buffer[i], Equals, byte(i)) + } else if i < (128 + 96) { + c.Check(buffer[i], Equals, byte((i-128)/2)) + } else { + c.Check(buffer[i], Equals, byte(0)) + } + } +} + func ReadIntoBufferHelper(c *C, bufsize int) { buffer := make([]byte, bufsize) reader, writer := io.Pipe() - slices := make(chan readerSlice) + slices := make(chan nextSlice) go readIntoBuffer(buffer, reader, slices) - { - out := make([]byte, 128) - for i := 0; i < 128; i += 1 { - out[i] = byte(i) - } - writer.Write(out) - s1 := <-slices - c.Check(len(s1.slice), Equals, 128) - c.Check(s1.reader_error, Equals, nil) - for i := 0; i < 128; i += 1 { - c.Check(s1.slice[i], Equals, byte(i)) - } - for i := 0; i < len(buffer); i += 1 { - if i < 128 { - c.Check(buffer[i], Equals, byte(i)) - } else { - c.Check(buffer[i], Equals, byte(0)) - } - } - } - { - out := make([]byte, 96) - for i := 0; i < 96; i += 1 { - out[i] = byte(i / 2) - } - writer.Write(out) - s1 := <-slices - c.Check(len(s1.slice), Equals, 96) - c.Check(s1.reader_error, Equals, nil) - for i := 0; i < 96; i += 1 { - c.Check(s1.slice[i], Equals, byte(i/2)) - } - for i := 0; i < len(buffer); i += 1 { - if i < 128 { - c.Check(buffer[i], Equals, byte(i)) - } else if i < (128 + 96) { - c.Check(buffer[i], Equals, byte((i-128)/2)) - } else { - c.Check(buffer[i], Equals, byte(0)) - } - } - } - { - writer.Close() - s1 := <-slices - c.Check(len(s1.slice), Equals, 0) - c.Check(s1.reader_error, Equals, io.EOF) - } -} + HelperWrite128andCheck(c, buffer, writer, slices) + HelperWrite96andCheck(c, buffer, writer, slices) -func (s *StandaloneSuite) TestReadIntoBuffer(c *C) { - ReadIntoBufferHelper(c, 512) - ReadIntoBufferHelper(c, 225) - ReadIntoBufferHelper(c, 224) + writer.Close() + s1 := <-slices + c.Check(len(s1.slice), Equals, 0) + c.Check(s1.reader_error, Equals, io.EOF) } func (s *StandaloneSuite) TestReadIntoShortBuffer(c *C) { buffer := make([]byte, 223) reader, writer := io.Pipe() - slices := make(chan readerSlice) + slices := make(chan nextSlice) go readIntoBuffer(buffer, reader, slices) - { - out := make([]byte, 128) - for i := 0; i < 128; i += 1 { - out[i] = byte(i) - } - writer.Write(out) - s1 := <-slices - c.Check(len(s1.slice), Equals, 128) - c.Check(s1.reader_error, Equals, nil) - for i := 0; i < 128; i += 1 { - c.Check(s1.slice[i], Equals, byte(i)) - } - for i := 0; i < len(buffer); i += 1 { - if i < 128 { - c.Check(buffer[i], Equals, byte(i)) - } else { - c.Check(buffer[i], Equals, byte(0)) - } - } + HelperWrite128andCheck(c, buffer, writer, slices) + + out := make([]byte, 96) + for i := 0; i < 96; i += 1 { + out[i] = byte(i / 2) } - { - out := make([]byte, 96) - for i := 0; i < 96; i += 1 { - out[i] = byte(i / 2) - } - // Write will deadlock because it can't write all the data, so - // spin it off to a goroutine - go writer.Write(out) - s1 := <-slices + // Write will deadlock because it can't write all the data, so + // spin it off to a goroutine + go writer.Write(out) + s1 := <-slices - c.Check(len(s1.slice), Equals, 95) - c.Check(s1.reader_error, Equals, nil) - for i := 0; i < 95; i += 1 { - c.Check(s1.slice[i], Equals, byte(i/2)) - } - for i := 0; i < len(buffer); i += 1 { - if i < 128 { - c.Check(buffer[i], Equals, byte(i)) - } else if i < (128 + 95) { - c.Check(buffer[i], Equals, byte((i-128)/2)) - } else { - c.Check(buffer[i], Equals, byte(0)) - } - } + c.Check(len(s1.slice), Equals, 95) + c.Check(s1.reader_error, Equals, nil) + for i := 0; i < 95; i += 1 { + c.Check(s1.slice[i], Equals, byte(i/2)) } - { - writer.Close() - s1 := <-slices - c.Check(len(s1.slice), Equals, 0) - c.Check(s1.reader_error, Equals, io.ErrShortBuffer) + for i := 0; i < len(buffer); i += 1 { + if i < 128 { + c.Check(buffer[i], Equals, byte(i)) + } else if i < (128 + 95) { + c.Check(buffer[i], Equals, byte((i-128)/2)) + } else { + c.Check(buffer[i], Equals, byte(0)) + } } + writer.Close() + s1 = <-slices + c.Check(len(s1.slice), Equals, 0) + c.Check(s1.reader_error, Equals, io.ErrShortBuffer) } func (s *StandaloneSuite) TestTransfer(c *C) { reader, writer := io.Pipe() - tr := StartTransferFromReader(512, reader) + tr := AsyncStreamFromReader(512, reader) - br1 := tr.MakeBufferReader() + br1 := tr.MakeStreamReader() out := make([]byte, 128) { @@ -210,7 +190,7 @@ func (s *StandaloneSuite) TestTransfer(c *C) { } } - br2 := tr.MakeBufferReader() + br2 := tr.MakeStreamReader() { // Test 'catch up' reader in := make([]byte, 256) @@ -227,8 +207,6 @@ func (s *StandaloneSuite) TestTransfer(c *C) { { // Test closing the reader writer.Close() - status := <-tr.Reader_status - c.Check(status, Equals, io.EOF) in := make([]byte, 256) n1, err1 := br1.Read(in) @@ -241,7 +219,7 @@ func (s *StandaloneSuite) TestTransfer(c *C) { { // Test 'catch up' reader after closing - br3 := tr.MakeBufferReader() + br3 := tr.MakeStreamReader() in := make([]byte, 256) n, err := br3.Read(in) @@ -262,23 +240,21 @@ func (s *StandaloneSuite) TestTransfer(c *C) { func (s *StandaloneSuite) TestTransferShortBuffer(c *C) { reader, writer := io.Pipe() - // Buffer for reads from 'r' - buffer := make([]byte, 100) - - // Read requests on Transfer() buffer - requests := make(chan readRequest) - defer close(requests) + tr := AsyncStreamFromReader(100, reader) + defer tr.Close() - // Reporting reader error states - reader_status := make(chan error) - - go transfer(buffer, reader, requests, reader_status) + sr := tr.MakeStreamReader() + defer sr.Close() out := make([]byte, 101) go writer.Write(out) - status := <-reader_status - c.Check(status, Equals, io.ErrShortBuffer) + n, err := sr.Read(out) + c.Check(n, Equals, 100) + + n, err = sr.Read(out) + c.Check(n, Equals, 0) + c.Check(err, Equals, io.ErrShortBuffer) } func (s *StandaloneSuite) TestTransferFromBuffer(c *C) { @@ -288,9 +264,9 @@ func (s *StandaloneSuite) TestTransferFromBuffer(c *C) { buffer[i] = byte(i) } - tr := StartTransferFromSlice(buffer) + tr := AsyncStreamFromSlice(buffer) - br1 := tr.MakeBufferReader() + br1 := tr.MakeStreamReader() in := make([]byte, 64) { @@ -328,9 +304,11 @@ func (s *StandaloneSuite) TestTransferIoCopy(c *C) { buffer[i] = byte(i) } - tr := StartTransferFromSlice(buffer) + tr := AsyncStreamFromSlice(buffer) + defer tr.Close() - br1 := tr.MakeBufferReader() + br1 := tr.MakeStreamReader() + defer br1.Close() reader, writer := io.Pipe() @@ -344,3 +322,45 @@ func (s *StandaloneSuite) TestTransferIoCopy(c *C) { io.Copy(writer, br1) } + +func (s *StandaloneSuite) TestManyReaders(c *C) { + reader, writer := io.Pipe() + + tr := AsyncStreamFromReader(512, reader) + defer tr.Close() + + sr := tr.MakeStreamReader() + go func() { + time.Sleep(100 * time.Millisecond) + sr.Close() + }() + + for i := 0; i < 200; i += 1 { + go func() { + br1 := tr.MakeStreamReader() + defer br1.Close() + + p := make([]byte, 3) + n, err := br1.Read(p) + c.Check(n, Equals, 3) + c.Check(p[0:3], DeepEquals, []byte("foo")) + + n, err = br1.Read(p) + c.Check(n, Equals, 3) + c.Check(p[0:3], DeepEquals, []byte("bar")) + + n, err = br1.Read(p) + c.Check(n, Equals, 3) + c.Check(p[0:3], DeepEquals, []byte("baz")) + + n, err = br1.Read(p) + c.Check(n, Equals, 0) + c.Check(err, Equals, io.EOF) + }() + } + + writer.Write([]byte("foo")) + writer.Write([]byte("bar")) + writer.Write([]byte("baz")) + writer.Close() +}