// Standalone tests
type StandaloneSuite struct{}
+func (s *StandaloneSuite) TestReadIntoBuffer(c *C) {
+ ReadIntoBufferHelper(c, 225)
+ ReadIntoBufferHelper(c, 224)
+}
+
+func HelperWrite128andCheck(c *C, buffer []byte, writer io.Writer, slices chan nextSlice) {
+ out := make([]byte, 128)
+ for i := 0; i < 128; i += 1 {
+ out[i] = byte(i)
+ }
+ writer.Write(out)
+ s1 := <-slices
+ c.Check(len(s1.slice), Equals, 128)
+ c.Check(s1.reader_error, Equals, nil)
+ for i := 0; i < 128; i += 1 {
+ c.Check(s1.slice[i], Equals, byte(i))
+ }
+ for i := 0; i < len(buffer); i += 1 {
+ if i < 128 {
+ c.Check(buffer[i], Equals, byte(i))
+ } else {
+ c.Check(buffer[i], Equals, byte(0))
+ }
+ }
+}
+
+func HelperWrite96andCheck(c *C, buffer []byte, writer io.Writer, slices chan nextSlice) {
+ out := make([]byte, 96)
+ for i := 0; i < 96; i += 1 {
+ out[i] = byte(i / 2)
+ }
+ writer.Write(out)
+ s1 := <-slices
+ c.Check(len(s1.slice), Equals, 96)
+ c.Check(s1.reader_error, Equals, nil)
+ for i := 0; i < 96; i += 1 {
+ c.Check(s1.slice[i], Equals, byte(i/2))
+ }
+ for i := 0; i < len(buffer); i += 1 {
+ if i < 128 {
+ c.Check(buffer[i], Equals, byte(i))
+ } else if i < (128 + 96) {
+ c.Check(buffer[i], Equals, byte((i-128)/2))
+ } else {
+ c.Check(buffer[i], Equals, byte(0))
+ }
+ }
+}
+
func ReadIntoBufferHelper(c *C, bufsize int) {
buffer := make([]byte, bufsize)
reader, writer := io.Pipe()
- slices := make(chan readerSlice)
+ slices := make(chan nextSlice)
go readIntoBuffer(buffer, reader, slices)
- {
- out := make([]byte, 128)
- for i := 0; i < 128; i += 1 {
- out[i] = byte(i)
- }
- writer.Write(out)
- s1 := <-slices
- c.Check(len(s1.slice), Equals, 128)
- c.Check(s1.reader_error, Equals, nil)
- for i := 0; i < 128; i += 1 {
- c.Check(s1.slice[i], Equals, byte(i))
- }
- for i := 0; i < len(buffer); i += 1 {
- if i < 128 {
- c.Check(buffer[i], Equals, byte(i))
- } else {
- c.Check(buffer[i], Equals, byte(0))
- }
- }
- }
- {
- out := make([]byte, 96)
- for i := 0; i < 96; i += 1 {
- out[i] = byte(i / 2)
- }
- writer.Write(out)
- s1 := <-slices
- c.Check(len(s1.slice), Equals, 96)
- c.Check(s1.reader_error, Equals, nil)
- for i := 0; i < 96; i += 1 {
- c.Check(s1.slice[i], Equals, byte(i/2))
- }
- for i := 0; i < len(buffer); i += 1 {
- if i < 128 {
- c.Check(buffer[i], Equals, byte(i))
- } else if i < (128 + 96) {
- c.Check(buffer[i], Equals, byte((i-128)/2))
- } else {
- c.Check(buffer[i], Equals, byte(0))
- }
- }
- }
- {
- writer.Close()
- s1 := <-slices
- c.Check(len(s1.slice), Equals, 0)
- c.Check(s1.reader_error, Equals, io.EOF)
- }
-}
+ HelperWrite128andCheck(c, buffer, writer, slices)
+ HelperWrite96andCheck(c, buffer, writer, slices)
-func (s *StandaloneSuite) TestReadIntoBuffer(c *C) {
- ReadIntoBufferHelper(c, 512)
- ReadIntoBufferHelper(c, 225)
- ReadIntoBufferHelper(c, 224)
+ writer.Close()
+ s1 := <-slices
+ c.Check(len(s1.slice), Equals, 0)
+ c.Check(s1.reader_error, Equals, io.EOF)
}
func (s *StandaloneSuite) TestReadIntoShortBuffer(c *C) {
buffer := make([]byte, 223)
reader, writer := io.Pipe()
- slices := make(chan readerSlice)
+ slices := make(chan nextSlice)
go readIntoBuffer(buffer, reader, slices)
- {
- out := make([]byte, 128)
- for i := 0; i < 128; i += 1 {
- out[i] = byte(i)
- }
- writer.Write(out)
- s1 := <-slices
- c.Check(len(s1.slice), Equals, 128)
- c.Check(s1.reader_error, Equals, nil)
- for i := 0; i < 128; i += 1 {
- c.Check(s1.slice[i], Equals, byte(i))
- }
- for i := 0; i < len(buffer); i += 1 {
- if i < 128 {
- c.Check(buffer[i], Equals, byte(i))
- } else {
- c.Check(buffer[i], Equals, byte(0))
- }
- }
+ HelperWrite128andCheck(c, buffer, writer, slices)
+
+ out := make([]byte, 96)
+ for i := 0; i < 96; i += 1 {
+ out[i] = byte(i / 2)
}
- {
- out := make([]byte, 96)
- for i := 0; i < 96; i += 1 {
- out[i] = byte(i / 2)
- }
- // Write will deadlock because it can't write all the data, so
- // spin it off to a goroutine
- go writer.Write(out)
- s1 := <-slices
+ // Write will deadlock because it can't write all the data, so
+ // spin it off to a goroutine
+ go writer.Write(out)
+ s1 := <-slices
- c.Check(len(s1.slice), Equals, 95)
- c.Check(s1.reader_error, Equals, nil)
- for i := 0; i < 95; i += 1 {
- c.Check(s1.slice[i], Equals, byte(i/2))
- }
- for i := 0; i < len(buffer); i += 1 {
- if i < 128 {
- c.Check(buffer[i], Equals, byte(i))
- } else if i < (128 + 95) {
- c.Check(buffer[i], Equals, byte((i-128)/2))
- } else {
- c.Check(buffer[i], Equals, byte(0))
- }
- }
+ c.Check(len(s1.slice), Equals, 95)
+ c.Check(s1.reader_error, Equals, nil)
+ for i := 0; i < 95; i += 1 {
+ c.Check(s1.slice[i], Equals, byte(i/2))
}
- {
- writer.Close()
- s1 := <-slices
- c.Check(len(s1.slice), Equals, 0)
- c.Check(s1.reader_error, Equals, io.ErrShortBuffer)
+ for i := 0; i < len(buffer); i += 1 {
+ if i < 128 {
+ c.Check(buffer[i], Equals, byte(i))
+ } else if i < (128 + 95) {
+ c.Check(buffer[i], Equals, byte((i-128)/2))
+ } else {
+ c.Check(buffer[i], Equals, byte(0))
+ }
}
+ writer.Close()
+ s1 = <-slices
+ c.Check(len(s1.slice), Equals, 0)
+ c.Check(s1.reader_error, Equals, io.ErrShortBuffer)
}
func (s *StandaloneSuite) TestTransfer(c *C) {
reader, writer := io.Pipe()
- tr := StartTransferFromReader(512, reader)
+ tr := AsyncStreamFromReader(512, reader)
- br1 := tr.MakeBufferReader()
+ br1 := tr.MakeStreamReader()
out := make([]byte, 128)
{
}
}
- br2 := tr.MakeBufferReader()
+ br2 := tr.MakeStreamReader()
{
// Test 'catch up' reader
in := make([]byte, 256)
{
// Test closing the reader
writer.Close()
- status := <-tr.Reader_status
- c.Check(status, Equals, io.EOF)
in := make([]byte, 256)
n1, err1 := br1.Read(in)
{
// Test 'catch up' reader after closing
- br3 := tr.MakeBufferReader()
+ br3 := tr.MakeStreamReader()
in := make([]byte, 256)
n, err := br3.Read(in)
func (s *StandaloneSuite) TestTransferShortBuffer(c *C) {
reader, writer := io.Pipe()
- // Buffer for reads from 'r'
- buffer := make([]byte, 100)
-
- // Read requests on Transfer() buffer
- requests := make(chan readRequest)
- defer close(requests)
+ tr := AsyncStreamFromReader(100, reader)
+ defer tr.Close()
- // Reporting reader error states
- reader_status := make(chan error)
-
- go transfer(buffer, reader, requests, reader_status)
+ sr := tr.MakeStreamReader()
+ defer sr.Close()
out := make([]byte, 101)
go writer.Write(out)
- status := <-reader_status
- c.Check(status, Equals, io.ErrShortBuffer)
+ n, err := sr.Read(out)
+ c.Check(n, Equals, 100)
+
+ n, err = sr.Read(out)
+ c.Check(n, Equals, 0)
+ c.Check(err, Equals, io.ErrShortBuffer)
}
func (s *StandaloneSuite) TestTransferFromBuffer(c *C) {
buffer[i] = byte(i)
}
- tr := StartTransferFromSlice(buffer)
+ tr := AsyncStreamFromSlice(buffer)
- br1 := tr.MakeBufferReader()
+ br1 := tr.MakeStreamReader()
in := make([]byte, 64)
{
buffer[i] = byte(i)
}
- tr := StartTransferFromSlice(buffer)
+ tr := AsyncStreamFromSlice(buffer)
+ defer tr.Close()
- br1 := tr.MakeBufferReader()
+ br1 := tr.MakeStreamReader()
+ defer br1.Close()
reader, writer := io.Pipe()
io.Copy(writer, br1)
}
+
+func (s *StandaloneSuite) TestManyReaders(c *C) {
+ reader, writer := io.Pipe()
+
+ tr := AsyncStreamFromReader(512, reader)
+ defer tr.Close()
+
+ sr := tr.MakeStreamReader()
+ go func() {
+ time.Sleep(100 * time.Millisecond)
+ sr.Close()
+ }()
+
+ for i := 0; i < 200; i += 1 {
+ go func() {
+ br1 := tr.MakeStreamReader()
+ defer br1.Close()
+
+ p := make([]byte, 3)
+ n, err := br1.Read(p)
+ c.Check(n, Equals, 3)
+ c.Check(p[0:3], DeepEquals, []byte("foo"))
+
+ n, err = br1.Read(p)
+ c.Check(n, Equals, 3)
+ c.Check(p[0:3], DeepEquals, []byte("bar"))
+
+ n, err = br1.Read(p)
+ c.Check(n, Equals, 3)
+ c.Check(p[0:3], DeepEquals, []byte("baz"))
+
+ n, err = br1.Read(p)
+ c.Check(n, Equals, 0)
+ c.Check(err, Equals, io.EOF)
+ }()
+ }
+
+ writer.Write([]byte("foo"))
+ writer.Write([]byte("bar"))
+ writer.Write([]byte("baz"))
+ writer.Close()
+}