X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/abc241fb83523ae5ae5905ae47210f15d7e0671c..617d783980943ac7cda84d94a5a43e06adeb838e:/sdk/go/asyncbuf/buf_test.go diff --git a/sdk/go/asyncbuf/buf_test.go b/sdk/go/asyncbuf/buf_test.go index 845853bc21..cc742a8cbe 100644 --- a/sdk/go/asyncbuf/buf_test.go +++ b/sdk/go/asyncbuf/buf_test.go @@ -10,6 +10,8 @@ import ( "io" "io/ioutil" "math/rand" + "sync" + "sync/atomic" "testing" "time" @@ -88,6 +90,53 @@ func (s *Suite) TestWriteReadCloseRead(c *check.C) { <-done } +func (s *Suite) TestReadAtEOF(c *check.C) { + buf := make([]byte, 8) + + b := NewBuffer([]byte{1, 2, 3}) + + r := b.NewReader() + n, err := r.Read(buf) + c.Check(n, check.Equals, 3) + c.Check(err, check.IsNil) + + // Reading zero bytes at EOF, but before Close(), doesn't + // block or error + done := make(chan bool) + go func() { + defer close(done) + n, err = r.Read(buf[:0]) + c.Check(n, check.Equals, 0) + c.Check(err, check.IsNil) + }() + select { + case <-done: + case <-time.After(time.Second): + c.Error("timeout") + } + + b.Close() + + // Reading zero bytes after Close() returns EOF + n, err = r.Read(buf[:0]) + c.Check(n, check.Equals, 0) + c.Check(err, check.Equals, io.EOF) + + // Reading from start after Close() returns 3 bytes, then EOF + r = b.NewReader() + n, err = r.Read(buf) + c.Check(n, check.Equals, 3) + if err != nil { + c.Check(err, check.Equals, io.EOF) + } + n, err = r.Read(buf[:0]) + c.Check(n, check.Equals, 0) + c.Check(err, check.Equals, io.EOF) + n, err = r.Read(buf) + c.Check(n, check.Equals, 0) + c.Check(err, check.Equals, io.EOF) +} + func (s *Suite) TestCloseWithError(c *check.C) { errFake := errors.New("it's not even a real error") @@ -144,6 +193,43 @@ func (s *Suite) TestManyReaders(c *check.C) { } } +func (s *Suite) BenchmarkOneReader(c *check.C) { + s.benchmarkReaders(c, 1) +} + +func (s *Suite) BenchmarkManyReaders(c *check.C) { + s.benchmarkReaders(c, 100) +} + +func (s *Suite) benchmarkReaders(c *check.C, readers int) { + var n int64 + t0 := time.Now() + + buf := make([]byte, 10000) + rand.Read(buf) + for i := 0; i < 10; i++ { + b := NewBuffer(nil) + go func() { + for i := 0; i < c.N; i++ { + b.Write(buf) + } + b.Close() + }() + + var wg sync.WaitGroup + for i := 0; i < readers; i++ { + wg.Add(1) + go func() { + defer wg.Done() + nn, _ := io.Copy(ioutil.Discard, b.NewReader()) + atomic.AddInt64(&n, int64(nn)) + }() + } + wg.Wait() + } + c.Logf("%d bytes, %.0f MB/s", n, float64(n)/time.Since(t0).Seconds()/1000000) +} + func (s *Suite) checkReader(c *check.C, r io.Reader, expectData []byte, expectError error, done chan bool) { buf, err := ioutil.ReadAll(r) c.Check(err, check.Equals, expectError)