12475: Faster asyncbuf.
authorTom Clegg <tclegg@veritasgenetics.com>
Thu, 23 Nov 2017 14:33:34 +0000 (09:33 -0500)
committerTom Clegg <tclegg@veritasgenetics.com>
Thu, 23 Nov 2017 15:13:00 +0000 (10:13 -0500)
Unlock earlier so multiple readers can copy bytes concurrently.

100 readers: improves from 4.5 GB/s to 17 GB/s
1 reader: unchanged 0.75 GB/s

Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tclegg@veritasgenetics.com>

sdk/go/asyncbuf/buf.go
sdk/go/asyncbuf/buf_test.go

index b3b9bf221a661450002a4a3c4bfc21e4063c5d0b..ebe0301c3452fb197aab02ad30dd716fc88a7d38 100644 (file)
@@ -86,14 +86,16 @@ type reader struct {
 
 func (r *reader) Read(p []byte) (int, error) {
        r.b.cond.L.Lock()
-       defer r.b.cond.L.Unlock()
        for {
                if r.b.data.Len() > r.read || len(p) == 0 {
-                       n := copy(p, r.b.data.Bytes()[r.read:])
+                       buf := r.b.data.Bytes()
+                       r.b.cond.L.Unlock()
+                       n := copy(p, buf[r.read:])
                        r.read += n
                        return n, nil
                }
                if r.b.err != nil {
+                       r.b.cond.L.Unlock()
                        return 0, r.b.err
                }
                r.b.cond.Wait()
index 845853bc21af68422b69c62b4ddbf8eae50d698a..198ebaf512fc71a1e42f4b7e0e64a4ae0b4581d3 100644 (file)
@@ -10,6 +10,8 @@ import (
        "io"
        "io/ioutil"
        "math/rand"
+       "sync"
+       "sync/atomic"
        "testing"
        "time"
 
@@ -144,6 +146,43 @@ func (s *Suite) TestManyReaders(c *check.C) {
        }
 }
 
+func (s *Suite) BenchmarkOneReader(c *check.C) {
+       s.benchmarkReaders(c, 1)
+}
+
+func (s *Suite) BenchmarkManyReaders(c *check.C) {
+       s.benchmarkReaders(c, 100)
+}
+
+func (s *Suite) benchmarkReaders(c *check.C, readers int) {
+       var n int64
+       t0 := time.Now()
+
+       buf := make([]byte, 10000)
+       rand.Read(buf)
+       for i := 0; i < 10; i++ {
+               b := NewBuffer(nil)
+               go func() {
+                       for i := 0; i < c.N; i++ {
+                               b.Write(buf)
+                       }
+                       b.Close()
+               }()
+
+               var wg sync.WaitGroup
+               for i := 0; i < readers; i++ {
+                       wg.Add(1)
+                       go func() {
+                               defer wg.Done()
+                               nn, _ := io.Copy(ioutil.Discard, b.NewReader())
+                               atomic.AddInt64(&n, int64(nn))
+                       }()
+               }
+               wg.Wait()
+       }
+       c.Logf("%d bytes, %.0f MB/s", n, float64(n)/time.Since(t0).Seconds()/1000000)
+}
+
 func (s *Suite) checkReader(c *check.C, r io.Reader, expectData []byte, expectError error, done chan bool) {
        buf, err := ioutil.ReadAll(r)
        c.Check(err, check.Equals, expectError)