16480: Merge branch 'master'
[arvados.git] / sdk / go / asyncbuf / buf_test.go
index 845853bc21af68422b69c62b4ddbf8eae50d698a..cc742a8cbe1fe028917addf245ce04198677e4e6 100644 (file)
@@ -10,6 +10,8 @@ import (
        "io"
        "io/ioutil"
        "math/rand"
+       "sync"
+       "sync/atomic"
        "testing"
        "time"
 
@@ -88,6 +90,53 @@ func (s *Suite) TestWriteReadCloseRead(c *check.C) {
        <-done
 }
 
+// TestReadAtEOF exercises reads issued by a reader that has already
+// consumed all buffered data. While the buffer is still open, a
+// zero-length read must return promptly with no error; after Close(),
+// a zero-length read must return io.EOF. A fresh reader created after
+// Close() must still deliver the 3 buffered bytes before reporting
+// io.EOF.
+func (s *Suite) TestReadAtEOF(c *check.C) {
+       buf := make([]byte, 8)
+
+       b := NewBuffer([]byte{1, 2, 3})
+
+       r := b.NewReader()
+       n, err := r.Read(buf)
+       c.Check(n, check.Equals, 3)
+       c.Check(err, check.IsNil)
+
+       // Reading zero bytes at EOF, but before Close(), doesn't
+       // block or error
+       done := make(chan bool)
+       go func() {
+               defer close(done)
+               // Keep the results local to this goroutine: if the
+               // timeout below fires, the test body carries on and
+               // assigns to the outer n and err while this Read may
+               // still be pending, which would be a data race.
+               n, err := r.Read(buf[:0])
+               c.Check(n, check.Equals, 0)
+               c.Check(err, check.IsNil)
+       }()
+       select {
+       case <-done:
+       case <-time.After(time.Second):
+               c.Error("timeout")
+       }
+
+       b.Close()
+
+       // Reading zero bytes after Close() returns EOF
+       n, err = r.Read(buf[:0])
+       c.Check(n, check.Equals, 0)
+       c.Check(err, check.Equals, io.EOF)
+
+       // Reading from start after Close() returns 3 bytes, then EOF
+       r = b.NewReader()
+       n, err = r.Read(buf)
+       c.Check(n, check.Equals, 3)
+       // The read that delivers the final bytes may report either
+       // nil or io.EOF; any other error is a failure.
+       if err != nil {
+               c.Check(err, check.Equals, io.EOF)
+       }
+       n, err = r.Read(buf[:0])
+       c.Check(n, check.Equals, 0)
+       c.Check(err, check.Equals, io.EOF)
+       n, err = r.Read(buf)
+       c.Check(n, check.Equals, 0)
+       c.Check(err, check.Equals, io.EOF)
+}
+
 func (s *Suite) TestCloseWithError(c *check.C) {
        errFake := errors.New("it's not even a real error")
 
@@ -144,6 +193,43 @@ func (s *Suite) TestManyReaders(c *check.C) {
        }
 }
 
+// BenchmarkOneReader runs the shared reader benchmark with a single
+// reader per buffer.
+func (s *Suite) BenchmarkOneReader(c *check.C) {
+       const readers = 1
+       s.benchmarkReaders(c, readers)
+}
+
+// BenchmarkManyReaders runs the shared reader benchmark with 100
+// concurrent readers per buffer.
+func (s *Suite) BenchmarkManyReaders(c *check.C) {
+       const readers = 100
+       s.benchmarkReaders(c, readers)
+}
+
+// benchmarkReaders measures read throughput with the given number of
+// concurrent readers attached to each buffer.
+//
+// It runs a fixed number of rounds. In each round one goroutine
+// writes a 10000-byte random payload c.N times and then closes the
+// buffer, while each reader copies everything it can read to
+// ioutil.Discard. The total byte count across all readers and rounds,
+// and the resulting rate in MB/s, are logged at the end.
+func (s *Suite) benchmarkReaders(c *check.C, readers int) {
+       const rounds = 10
+
+       // Total bytes read; updated atomically by the reader
+       // goroutines.
+       var total int64
+
+       payload := make([]byte, 10000)
+       // math/rand's Read is documented to always return a nil
+       // error, so its result is not checked.
+       rand.Read(payload)
+
+       // Start the clock only after the payload is prepared, so the
+       // reported rate does not include setup time.
+       t0 := time.Now()
+       for round := 0; round < rounds; round++ {
+               b := NewBuffer(nil)
+               go func() {
+                       for i := 0; i < c.N; i++ {
+                               b.Write(payload)
+                       }
+                       b.Close()
+               }()
+
+               var wg sync.WaitGroup
+               for i := 0; i < readers; i++ {
+                       wg.Add(1)
+                       go func() {
+                               defer wg.Done()
+                               nn, err := io.Copy(ioutil.Discard, b.NewReader())
+                               // Without this check a reader that
+                               // fails part way would only lower the
+                               // logged throughput, with no sign
+                               // that anything went wrong.
+                               c.Check(err, check.IsNil)
+                               atomic.AddInt64(&total, nn)
+                       }()
+               }
+               wg.Wait()
+       }
+       c.Logf("%d bytes, %.0f MB/s", total, float64(total)/time.Since(t0).Seconds()/1000000)
+}
+
 func (s *Suite) checkReader(c *check.C, r io.Reader, expectData []byte, expectError error, done chan bool) {
        buf, err := ioutil.ReadAll(r)
        c.Check(err, check.Equals, expectError)