Fix repositories.get_all_permissions, add tests. closes #3546
[arvados.git] / sdk / go / src / arvados.org / streamer / streamer_test.go
index 54f1bc7070de58eb53b16d3c3b4b969a5ada1457..853d7d30354daddb86e602c4b580141fc18e1bdf 100644 (file)
@@ -15,138 +15,118 @@ var _ = Suite(&StandaloneSuite{})
 // Standalone tests
 type StandaloneSuite struct{}
 
+func (s *StandaloneSuite) TestReadIntoBuffer(c *C) {
+       ReadIntoBufferHelper(c, 225)
+       ReadIntoBufferHelper(c, 224)
+}
+
+func HelperWrite128andCheck(c *C, buffer []byte, writer io.Writer, slices chan nextSlice) {
+       out := make([]byte, 128)
+       for i := 0; i < 128; i += 1 {
+               out[i] = byte(i)
+       }
+       writer.Write(out)
+       s1 := <-slices
+       c.Check(len(s1.slice), Equals, 128)
+       c.Check(s1.reader_error, Equals, nil)
+       for i := 0; i < 128; i += 1 {
+               c.Check(s1.slice[i], Equals, byte(i))
+       }
+       for i := 0; i < len(buffer); i += 1 {
+               if i < 128 {
+                       c.Check(buffer[i], Equals, byte(i))
+               } else {
+                       c.Check(buffer[i], Equals, byte(0))
+               }
+       }
+}
+
+func HelperWrite96andCheck(c *C, buffer []byte, writer io.Writer, slices chan nextSlice) {
+       out := make([]byte, 96)
+       for i := 0; i < 96; i += 1 {
+               out[i] = byte(i / 2)
+       }
+       writer.Write(out)
+       s1 := <-slices
+       c.Check(len(s1.slice), Equals, 96)
+       c.Check(s1.reader_error, Equals, nil)
+       for i := 0; i < 96; i += 1 {
+               c.Check(s1.slice[i], Equals, byte(i/2))
+       }
+       for i := 0; i < len(buffer); i += 1 {
+               if i < 128 {
+                       c.Check(buffer[i], Equals, byte(i))
+               } else if i < (128 + 96) {
+                       c.Check(buffer[i], Equals, byte((i-128)/2))
+               } else {
+                       c.Check(buffer[i], Equals, byte(0))
+               }
+       }
+}
+
 func ReadIntoBufferHelper(c *C, bufsize int) {
        buffer := make([]byte, bufsize)
 
        reader, writer := io.Pipe()
-       slices := make(chan readerSlice)
+       slices := make(chan nextSlice)
 
        go readIntoBuffer(buffer, reader, slices)
 
-       {
-               out := make([]byte, 128)
-               for i := 0; i < 128; i += 1 {
-                       out[i] = byte(i)
-               }
-               writer.Write(out)
-               s1 := <-slices
-               c.Check(len(s1.slice), Equals, 128)
-               c.Check(s1.reader_error, Equals, nil)
-               for i := 0; i < 128; i += 1 {
-                       c.Check(s1.slice[i], Equals, byte(i))
-               }
-               for i := 0; i < len(buffer); i += 1 {
-                       if i < 128 {
-                               c.Check(buffer[i], Equals, byte(i))
-                       } else {
-                               c.Check(buffer[i], Equals, byte(0))
-                       }
-               }
-       }
-       {
-               out := make([]byte, 96)
-               for i := 0; i < 96; i += 1 {
-                       out[i] = byte(i / 2)
-               }
-               writer.Write(out)
-               s1 := <-slices
-               c.Check(len(s1.slice), Equals, 96)
-               c.Check(s1.reader_error, Equals, nil)
-               for i := 0; i < 96; i += 1 {
-                       c.Check(s1.slice[i], Equals, byte(i/2))
-               }
-               for i := 0; i < len(buffer); i += 1 {
-                       if i < 128 {
-                               c.Check(buffer[i], Equals, byte(i))
-                       } else if i < (128 + 96) {
-                               c.Check(buffer[i], Equals, byte((i-128)/2))
-                       } else {
-                               c.Check(buffer[i], Equals, byte(0))
-                       }
-               }
-       }
-       {
-               writer.Close()
-               s1 := <-slices
-               c.Check(len(s1.slice), Equals, 0)
-               c.Check(s1.reader_error, Equals, io.EOF)
-       }
-}
+       HelperWrite128andCheck(c, buffer, writer, slices)
+       HelperWrite96andCheck(c, buffer, writer, slices)
 
-func (s *StandaloneSuite) TestReadIntoBuffer(c *C) {
-       ReadIntoBufferHelper(c, 512)
-       ReadIntoBufferHelper(c, 225)
-       ReadIntoBufferHelper(c, 224)
+       writer.Close()
+       s1 := <-slices
+       c.Check(len(s1.slice), Equals, 0)
+       c.Check(s1.reader_error, Equals, io.EOF)
 }
 
 func (s *StandaloneSuite) TestReadIntoShortBuffer(c *C) {
        buffer := make([]byte, 223)
        reader, writer := io.Pipe()
-       slices := make(chan readerSlice)
+       slices := make(chan nextSlice)
 
        go readIntoBuffer(buffer, reader, slices)
 
-       {
-               out := make([]byte, 128)
-               for i := 0; i < 128; i += 1 {
-                       out[i] = byte(i)
-               }
-               writer.Write(out)
-               s1 := <-slices
-               c.Check(len(s1.slice), Equals, 128)
-               c.Check(s1.reader_error, Equals, nil)
-               for i := 0; i < 128; i += 1 {
-                       c.Check(s1.slice[i], Equals, byte(i))
-               }
-               for i := 0; i < len(buffer); i += 1 {
-                       if i < 128 {
-                               c.Check(buffer[i], Equals, byte(i))
-                       } else {
-                               c.Check(buffer[i], Equals, byte(0))
-                       }
-               }
+       HelperWrite128andCheck(c, buffer, writer, slices)
+
+       out := make([]byte, 96)
+       for i := 0; i < 96; i += 1 {
+               out[i] = byte(i / 2)
        }
-       {
-               out := make([]byte, 96)
-               for i := 0; i < 96; i += 1 {
-                       out[i] = byte(i / 2)
-               }
 
-               // Write will deadlock because it can't write all the data, so
-               // spin it off to a goroutine
-               go writer.Write(out)
-               s1 := <-slices
+       // Write will deadlock because it can't write all the data, so
+       // spin it off to a goroutine
+       go writer.Write(out)
+       s1 := <-slices
 
-               c.Check(len(s1.slice), Equals, 95)
-               c.Check(s1.reader_error, Equals, nil)
-               for i := 0; i < 95; i += 1 {
-                       c.Check(s1.slice[i], Equals, byte(i/2))
-               }
-               for i := 0; i < len(buffer); i += 1 {
-                       if i < 128 {
-                               c.Check(buffer[i], Equals, byte(i))
-                       } else if i < (128 + 95) {
-                               c.Check(buffer[i], Equals, byte((i-128)/2))
-                       } else {
-                               c.Check(buffer[i], Equals, byte(0))
-                       }
-               }
+       c.Check(len(s1.slice), Equals, 95)
+       c.Check(s1.reader_error, Equals, nil)
+       for i := 0; i < 95; i += 1 {
+               c.Check(s1.slice[i], Equals, byte(i/2))
        }
-       {
-               writer.Close()
-               s1 := <-slices
-               c.Check(len(s1.slice), Equals, 0)
-               c.Check(s1.reader_error, Equals, io.ErrShortBuffer)
+       for i := 0; i < len(buffer); i += 1 {
+               if i < 128 {
+                       c.Check(buffer[i], Equals, byte(i))
+               } else if i < (128 + 95) {
+                       c.Check(buffer[i], Equals, byte((i-128)/2))
+               } else {
+                       c.Check(buffer[i], Equals, byte(0))
+               }
        }
 
+       writer.Close()
+       s1 = <-slices
+       c.Check(len(s1.slice), Equals, 0)
+       c.Check(s1.reader_error, Equals, io.ErrShortBuffer)
 }
 
 func (s *StandaloneSuite) TestTransfer(c *C) {
        reader, writer := io.Pipe()
 
-       tr := StartTransferFromReader(512, reader)
+       tr := AsyncStreamFromReader(512, reader)
 
-       br1 := tr.MakeBufferReader()
+       br1 := tr.MakeStreamReader()
        out := make([]byte, 128)
 
        {
@@ -210,7 +190,7 @@ func (s *StandaloneSuite) TestTransfer(c *C) {
                }
        }
 
-       br2 := tr.MakeBufferReader()
+       br2 := tr.MakeStreamReader()
        {
                // Test 'catch up' reader
                in := make([]byte, 256)
@@ -227,8 +207,6 @@ func (s *StandaloneSuite) TestTransfer(c *C) {
        {
                // Test closing the reader
                writer.Close()
-               status := <-tr.Reader_status
-               c.Check(status, Equals, io.EOF)
 
                in := make([]byte, 256)
                n1, err1 := br1.Read(in)
@@ -241,7 +219,7 @@ func (s *StandaloneSuite) TestTransfer(c *C) {
 
        {
                // Test 'catch up' reader after closing
-               br3 := tr.MakeBufferReader()
+               br3 := tr.MakeStreamReader()
                in := make([]byte, 256)
                n, err := br3.Read(in)
 
@@ -262,23 +240,21 @@ func (s *StandaloneSuite) TestTransfer(c *C) {
 func (s *StandaloneSuite) TestTransferShortBuffer(c *C) {
        reader, writer := io.Pipe()
 
-       // Buffer for reads from 'r'
-       buffer := make([]byte, 100)
-
-       // Read requests on Transfer() buffer
-       requests := make(chan readRequest)
-       defer close(requests)
+       tr := AsyncStreamFromReader(100, reader)
+       defer tr.Close()
 
-       // Reporting reader error states
-       reader_status := make(chan error)
-
-       go transfer(buffer, reader, requests, reader_status)
+       sr := tr.MakeStreamReader()
+       defer sr.Close()
 
        out := make([]byte, 101)
        go writer.Write(out)
 
-       status := <-reader_status
-       c.Check(status, Equals, io.ErrShortBuffer)
+       n, err := sr.Read(out)
+       c.Check(n, Equals, 100)
+
+       n, err = sr.Read(out)
+       c.Check(n, Equals, 0)
+       c.Check(err, Equals, io.ErrShortBuffer)
 }
 
 func (s *StandaloneSuite) TestTransferFromBuffer(c *C) {
@@ -288,9 +264,9 @@ func (s *StandaloneSuite) TestTransferFromBuffer(c *C) {
                buffer[i] = byte(i)
        }
 
-       tr := StartTransferFromSlice(buffer)
+       tr := AsyncStreamFromSlice(buffer)
 
-       br1 := tr.MakeBufferReader()
+       br1 := tr.MakeStreamReader()
 
        in := make([]byte, 64)
        {
@@ -328,9 +304,11 @@ func (s *StandaloneSuite) TestTransferIoCopy(c *C) {
                buffer[i] = byte(i)
        }
 
-       tr := StartTransferFromSlice(buffer)
+       tr := AsyncStreamFromSlice(buffer)
+       defer tr.Close()
 
-       br1 := tr.MakeBufferReader()
+       br1 := tr.MakeStreamReader()
+       defer br1.Close()
 
        reader, writer := io.Pipe()
 
@@ -344,3 +322,45 @@ func (s *StandaloneSuite) TestTransferIoCopy(c *C) {
 
        io.Copy(writer, br1)
 }
+
+func (s *StandaloneSuite) TestManyReaders(c *C) {
+       reader, writer := io.Pipe()
+
+       tr := AsyncStreamFromReader(512, reader)
+       defer tr.Close()
+
+       sr := tr.MakeStreamReader()
+       go func() {
+               time.Sleep(100 * time.Millisecond)
+               sr.Close()
+       }()
+
+       for i := 0; i < 200; i += 1 {
+               go func() {
+                       br1 := tr.MakeStreamReader()
+                       defer br1.Close()
+
+                       p := make([]byte, 3)
+                       n, err := br1.Read(p)
+                       c.Check(n, Equals, 3)
+                       c.Check(p[0:3], DeepEquals, []byte("foo"))
+
+                       n, err = br1.Read(p)
+                       c.Check(n, Equals, 3)
+                       c.Check(p[0:3], DeepEquals, []byte("bar"))
+
+                       n, err = br1.Read(p)
+                       c.Check(n, Equals, 3)
+                       c.Check(p[0:3], DeepEquals, []byte("baz"))
+
+                       n, err = br1.Read(p)
+                       c.Check(n, Equals, 0)
+                       c.Check(err, Equals, io.EOF)
+               }()
+       }
+
+       writer.Write([]byte("foo"))
+       writer.Write([]byte("bar"))
+       writer.Write([]byte("baz"))
+       writer.Close()
+}