From 08fe6b0770ad8b4aa5115052126f1e0d51dca1fa Mon Sep 17 00:00:00 2001 From: Tom Clegg Date: Thu, 23 Nov 2017 09:33:34 -0500 Subject: [PATCH] 12475: Faster asyncbuf. Unlock earlier so multiple readers can copy bytes concurrently. 100 readers: improves from 4.5 GB/s to 17 GB/s 1 reader: unchanged 0.75 GB/s Arvados-DCO-1.1-Signed-off-by: Tom Clegg --- sdk/go/asyncbuf/buf.go | 6 ++++-- sdk/go/asyncbuf/buf_test.go | 39 +++++++++++++++++++++++++++++++++++++ 2 files changed, 43 insertions(+), 2 deletions(-) diff --git a/sdk/go/asyncbuf/buf.go b/sdk/go/asyncbuf/buf.go index b3b9bf221a..ebe0301c34 100644 --- a/sdk/go/asyncbuf/buf.go +++ b/sdk/go/asyncbuf/buf.go @@ -86,14 +86,16 @@ type reader struct { func (r *reader) Read(p []byte) (int, error) { r.b.cond.L.Lock() - defer r.b.cond.L.Unlock() for { if r.b.data.Len() > r.read || len(p) == 0 { - n := copy(p, r.b.data.Bytes()[r.read:]) + buf := r.b.data.Bytes() + r.b.cond.L.Unlock() + n := copy(p, buf[r.read:]) r.read += n return n, nil } if r.b.err != nil { + r.b.cond.L.Unlock() return 0, r.b.err } r.b.cond.Wait() diff --git a/sdk/go/asyncbuf/buf_test.go b/sdk/go/asyncbuf/buf_test.go index 845853bc21..198ebaf512 100644 --- a/sdk/go/asyncbuf/buf_test.go +++ b/sdk/go/asyncbuf/buf_test.go @@ -10,6 +10,8 @@ import ( "io" "io/ioutil" "math/rand" + "sync" + "sync/atomic" "testing" "time" @@ -144,6 +146,43 @@ func (s *Suite) TestManyReaders(c *check.C) { } } +func (s *Suite) BenchmarkOneReader(c *check.C) { + s.benchmarkReaders(c, 1) +} + +func (s *Suite) BenchmarkManyReaders(c *check.C) { + s.benchmarkReaders(c, 100) +} + +func (s *Suite) benchmarkReaders(c *check.C, readers int) { + var n int64 + t0 := time.Now() + + buf := make([]byte, 10000) + rand.Read(buf) + for i := 0; i < 10; i++ { + b := NewBuffer(nil) + go func() { + for i := 0; i < c.N; i++ { + b.Write(buf) + } + b.Close() + }() + + var wg sync.WaitGroup + for i := 0; i < readers; i++ { + wg.Add(1) + go func() { + defer wg.Done() + nn, _ := io.Copy(ioutil.Discard, b.NewReader()) + atomic.AddInt64(&n, int64(nn)) + }() + } + wg.Wait() + } + c.Logf("%d bytes, %.0f MB/s", n, float64(n)/time.Since(t0).Seconds()/1000000) +} + func (s *Suite) checkReader(c *check.C, r io.Reader, expectData []byte, expectError error, done chan bool) { buf, err := ioutil.ReadAll(r) c.Check(err, check.Equals, expectError) -- 2.39.5