12475: Rewrite streamer -> asyncbuf.
[arvados.git] / sdk / go / streamer / streamer_test.go
diff --git a/sdk/go/streamer/streamer_test.go b/sdk/go/streamer/streamer_test.go
deleted file mode 100644 (file)
index f8ddbf5..0000000
+++ /dev/null
@@ -1,381 +0,0 @@
-// Copyright (C) The Arvados Authors. All rights reserved.
-//
-// SPDX-License-Identifier: Apache-2.0
-
-package streamer
-
-import (
-       . "gopkg.in/check.v1"
-       "io"
-       "testing"
-       "time"
-)
-
-// Gocheck boilerplate
-func Test(t *testing.T) { TestingT(t) }
-
-var _ = Suite(&StandaloneSuite{})
-
-// Standalone tests
-type StandaloneSuite struct{}
-
-func (s *StandaloneSuite) TestReadIntoBuffer(c *C) {
-       ReadIntoBufferHelper(c, 225)
-       ReadIntoBufferHelper(c, 224)
-}
-
-func HelperWrite128andCheck(c *C, buffer []byte, writer io.Writer, slices chan nextSlice) {
-       out := make([]byte, 128)
-       for i := 0; i < 128; i += 1 {
-               out[i] = byte(i)
-       }
-       writer.Write(out)
-       s1 := <-slices
-       c.Check(len(s1.slice), Equals, 128)
-       c.Check(s1.reader_error, Equals, nil)
-       for i := 0; i < 128; i += 1 {
-               c.Check(s1.slice[i], Equals, byte(i))
-       }
-       for i := 0; i < len(buffer); i += 1 {
-               if i < 128 {
-                       c.Check(buffer[i], Equals, byte(i))
-               } else {
-                       c.Check(buffer[i], Equals, byte(0))
-               }
-       }
-}
-
-func HelperWrite96andCheck(c *C, buffer []byte, writer io.Writer, slices chan nextSlice) {
-       out := make([]byte, 96)
-       for i := 0; i < 96; i += 1 {
-               out[i] = byte(i / 2)
-       }
-       writer.Write(out)
-       s1 := <-slices
-       c.Check(len(s1.slice), Equals, 96)
-       c.Check(s1.reader_error, Equals, nil)
-       for i := 0; i < 96; i += 1 {
-               c.Check(s1.slice[i], Equals, byte(i/2))
-       }
-       for i := 0; i < len(buffer); i += 1 {
-               if i < 128 {
-                       c.Check(buffer[i], Equals, byte(i))
-               } else if i < (128 + 96) {
-                       c.Check(buffer[i], Equals, byte((i-128)/2))
-               } else {
-                       c.Check(buffer[i], Equals, byte(0))
-               }
-       }
-}
-
-func ReadIntoBufferHelper(c *C, bufsize int) {
-       buffer := make([]byte, bufsize)
-
-       reader, writer := io.Pipe()
-       slices := make(chan nextSlice)
-
-       go readIntoBuffer(buffer, reader, slices)
-
-       HelperWrite128andCheck(c, buffer, writer, slices)
-       HelperWrite96andCheck(c, buffer, writer, slices)
-
-       writer.Close()
-       s1 := <-slices
-       c.Check(len(s1.slice), Equals, 0)
-       c.Check(s1.reader_error, Equals, io.EOF)
-}
-
-func (s *StandaloneSuite) TestReadIntoShortBuffer(c *C) {
-       buffer := make([]byte, 223)
-       reader, writer := io.Pipe()
-       slices := make(chan nextSlice)
-
-       go readIntoBuffer(buffer, reader, slices)
-
-       HelperWrite128andCheck(c, buffer, writer, slices)
-
-       out := make([]byte, 96)
-       for i := 0; i < 96; i += 1 {
-               out[i] = byte(i / 2)
-       }
-
-       // Write will deadlock because it can't write all the data, so
-       // spin it off to a goroutine
-       go writer.Write(out)
-       s1 := <-slices
-
-       c.Check(len(s1.slice), Equals, 95)
-       c.Check(s1.reader_error, Equals, nil)
-       for i := 0; i < 95; i += 1 {
-               c.Check(s1.slice[i], Equals, byte(i/2))
-       }
-       for i := 0; i < len(buffer); i += 1 {
-               if i < 128 {
-                       c.Check(buffer[i], Equals, byte(i))
-               } else if i < (128 + 95) {
-                       c.Check(buffer[i], Equals, byte((i-128)/2))
-               } else {
-                       c.Check(buffer[i], Equals, byte(0))
-               }
-       }
-
-       writer.Close()
-       s1 = <-slices
-       c.Check(len(s1.slice), Equals, 0)
-       c.Check(s1.reader_error, Equals, io.ErrShortBuffer)
-}
-
-func (s *StandaloneSuite) TestTransfer(c *C) {
-       reader, writer := io.Pipe()
-
-       tr := AsyncStreamFromReader(512, reader)
-
-       br1 := tr.MakeStreamReader()
-       out := make([]byte, 128)
-
-       {
-               // Write some data, and read into a buffer shorter than
-               // available data
-               for i := 0; i < 128; i += 1 {
-                       out[i] = byte(i)
-               }
-
-               writer.Write(out[:100])
-
-               in := make([]byte, 64)
-               n, err := br1.Read(in)
-
-               c.Check(n, Equals, 64)
-               c.Check(err, Equals, nil)
-
-               for i := 0; i < 64; i += 1 {
-                       c.Check(in[i], Equals, out[i])
-               }
-       }
-
-       {
-               // Write some more data, and read into buffer longer than
-               // available data
-               in := make([]byte, 64)
-               n, err := br1.Read(in)
-               c.Check(n, Equals, 36)
-               c.Check(err, Equals, nil)
-
-               for i := 0; i < 36; i += 1 {
-                       c.Check(in[i], Equals, out[64+i])
-               }
-
-       }
-
-       {
-               // Test read before write
-               type Rd struct {
-                       n   int
-                       err error
-               }
-               rd := make(chan Rd)
-               in := make([]byte, 64)
-
-               go func() {
-                       n, err := br1.Read(in)
-                       rd <- Rd{n, err}
-               }()
-
-               time.Sleep(100 * time.Millisecond)
-               writer.Write(out[100:])
-
-               got := <-rd
-
-               c.Check(got.n, Equals, 28)
-               c.Check(got.err, Equals, nil)
-
-               for i := 0; i < 28; i += 1 {
-                       c.Check(in[i], Equals, out[100+i])
-               }
-       }
-
-       br2 := tr.MakeStreamReader()
-       {
-               // Test 'catch up' reader
-               in := make([]byte, 256)
-               n, err := br2.Read(in)
-
-               c.Check(n, Equals, 128)
-               c.Check(err, Equals, nil)
-
-               for i := 0; i < 128; i += 1 {
-                       c.Check(in[i], Equals, out[i])
-               }
-       }
-
-       {
-               // Test closing the reader
-               writer.Close()
-
-               in := make([]byte, 256)
-               n1, err1 := br1.Read(in)
-               n2, err2 := br2.Read(in)
-               c.Check(n1, Equals, 0)
-               c.Check(err1, Equals, io.EOF)
-               c.Check(n2, Equals, 0)
-               c.Check(err2, Equals, io.EOF)
-       }
-
-       {
-               // Test 'catch up' reader after closing
-               br3 := tr.MakeStreamReader()
-               in := make([]byte, 256)
-               n, err := br3.Read(in)
-
-               c.Check(n, Equals, 128)
-               c.Check(err, Equals, nil)
-
-               for i := 0; i < 128; i += 1 {
-                       c.Check(in[i], Equals, out[i])
-               }
-
-               n, err = br3.Read(in)
-
-               c.Check(n, Equals, 0)
-               c.Check(err, Equals, io.EOF)
-       }
-}
-
-func (s *StandaloneSuite) TestTransferShortBuffer(c *C) {
-       reader, writer := io.Pipe()
-
-       tr := AsyncStreamFromReader(100, reader)
-       defer tr.Close()
-
-       sr := tr.MakeStreamReader()
-       defer sr.Close()
-
-       out := make([]byte, 101)
-       go writer.Write(out)
-
-       n, err := sr.Read(out)
-       c.Check(n, Equals, 100)
-       c.Check(err, IsNil)
-
-       n, err = sr.Read(out)
-       c.Check(n, Equals, 0)
-       c.Check(err, Equals, io.ErrShortBuffer)
-}
-
-func (s *StandaloneSuite) TestTransferFromBuffer(c *C) {
-       // Buffer for reads from 'r'
-       buffer := make([]byte, 100)
-       for i := 0; i < 100; i += 1 {
-               buffer[i] = byte(i)
-       }
-
-       tr := AsyncStreamFromSlice(buffer)
-
-       br1 := tr.MakeStreamReader()
-
-       in := make([]byte, 64)
-       {
-               n, err := br1.Read(in)
-
-               c.Check(n, Equals, 64)
-               c.Check(err, Equals, nil)
-
-               for i := 0; i < 64; i += 1 {
-                       c.Check(in[i], Equals, buffer[i])
-               }
-       }
-       {
-               n, err := br1.Read(in)
-
-               c.Check(n, Equals, 36)
-               c.Check(err, Equals, nil)
-
-               for i := 0; i < 36; i += 1 {
-                       c.Check(in[i], Equals, buffer[64+i])
-               }
-       }
-       {
-               n, err := br1.Read(in)
-
-               c.Check(n, Equals, 0)
-               c.Check(err, Equals, io.EOF)
-       }
-}
-
-func (s *StandaloneSuite) TestTransferIoCopy(c *C) {
-       // Buffer for reads from 'r'
-       buffer := make([]byte, 100)
-       for i := 0; i < 100; i += 1 {
-               buffer[i] = byte(i)
-       }
-
-       tr := AsyncStreamFromSlice(buffer)
-       defer tr.Close()
-
-       br1 := tr.MakeStreamReader()
-       defer br1.Close()
-
-       reader, writer := io.Pipe()
-
-       go func() {
-               p := make([]byte, 100)
-               n, err := reader.Read(p)
-               c.Check(n, Equals, 100)
-               c.Check(err, Equals, nil)
-               c.Check(p, DeepEquals, buffer)
-       }()
-
-       io.Copy(writer, br1)
-}
-
-func (s *StandaloneSuite) TestManyReaders(c *C) {
-       reader, writer := io.Pipe()
-
-       tr := AsyncStreamFromReader(512, reader)
-       defer tr.Close()
-
-       sr := tr.MakeStreamReader()
-       go func() {
-               time.Sleep(100 * time.Millisecond)
-               sr.Close()
-       }()
-
-       for i := 0; i < 200; i += 1 {
-               go func() {
-                       br1 := tr.MakeStreamReader()
-                       defer br1.Close()
-
-                       p := make([]byte, 3)
-                       n, err := br1.Read(p)
-                       c.Check(n, Equals, 3)
-                       c.Check(p[0:3], DeepEquals, []byte("foo"))
-
-                       n, err = br1.Read(p)
-                       c.Check(n, Equals, 3)
-                       c.Check(p[0:3], DeepEquals, []byte("bar"))
-
-                       n, err = br1.Read(p)
-                       c.Check(n, Equals, 3)
-                       c.Check(p[0:3], DeepEquals, []byte("baz"))
-
-                       n, err = br1.Read(p)
-                       c.Check(n, Equals, 0)
-                       c.Check(err, Equals, io.EOF)
-               }()
-       }
-
-       writer.Write([]byte("foo"))
-       writer.Write([]byte("bar"))
-       writer.Write([]byte("baz"))
-       writer.Close()
-}
-
-func (s *StandaloneSuite) TestMultipleCloseNoPanic(c *C) {
-       buffer := make([]byte, 100)
-       tr := AsyncStreamFromSlice(buffer)
-       sr := tr.MakeStreamReader()
-       c.Check(sr.Close(), IsNil)
-       c.Check(sr.Close(), Equals, ErrAlreadyClosed)
-       c.Check(tr.Close(), IsNil)
-       c.Check(tr.Close(), Equals, ErrAlreadyClosed)
-}