func (r *reader) Read(p []byte) (int, error) {
r.b.cond.L.Lock()
- defer r.b.cond.L.Unlock()
for {
if r.b.data.Len() > r.read || len(p) == 0 {
- n := copy(p, r.b.data.Bytes()[r.read:])
+ buf := r.b.data.Bytes()
+ r.b.cond.L.Unlock()
+ n := copy(p, buf[r.read:])
r.read += n
return n, nil
}
if r.b.err != nil {
+ r.b.cond.L.Unlock()
return 0, r.b.err
}
r.b.cond.Wait()
"io"
"io/ioutil"
"math/rand"
+ "sync"
+ "sync/atomic"
"testing"
"time"
}
}
+func (s *Suite) BenchmarkOneReader(c *check.C) {
+ s.benchmarkReaders(c, 1)
+}
+
+func (s *Suite) BenchmarkManyReaders(c *check.C) {
+ s.benchmarkReaders(c, 100)
+}
+
+// benchmarkReaders measures throughput when the given number of readers
+// consume one Buffer concurrently.
+//
+// It runs 10 rounds. In each round a fresh Buffer is created with
+// NewBuffer(nil); one goroutine writes the same 10000-byte random block to
+// it c.N times and then closes it, while `readers` goroutines each copy
+// everything from their own b.NewReader() into ioutil.Discard. The byte
+// counts reported by io.Copy are summed across all readers and rounds, and
+// the total plus the resulting MB/s rate is logged via c.Logf.
+func (s *Suite) benchmarkReaders(c *check.C, readers int) {
+	const (
+		rounds    = 10
+		blockSize = 10000
+	)
+
+	var total int64
+	start := time.Now()
+
+	block := make([]byte, blockSize)
+	rand.Read(block)
+
+	for round := 0; round < rounds; round++ {
+		b := NewBuffer(nil)
+
+		// Writer: feeds c.N copies of block into b, then closes it. Nothing
+		// waits on this goroutine directly; only the readers are tracked.
+		go func() {
+			for written := 0; written < c.N; written++ {
+				b.Write(block)
+			}
+			b.Close()
+		}()
+
+		var wg sync.WaitGroup
+		drain := func() {
+			defer wg.Done()
+			copied, _ := io.Copy(ioutil.Discard, b.NewReader())
+			// Readers run concurrently, so the shared total is updated
+			// atomically.
+			atomic.AddInt64(&total, copied)
+		}
+		for started := 0; started < readers; started++ {
+			wg.Add(1)
+			go drain()
+		}
+		wg.Wait()
+	}
+
+	elapsed := time.Since(start).Seconds()
+	c.Logf("%d bytes, %.0f MB/s", total, float64(total)/elapsed/1000000)
+}
+
func (s *Suite) checkReader(c *check.C, r io.Reader, expectData []byte, expectError error, done chan bool) {
buf, err := ioutil.ReadAll(r)
c.Check(err, check.Equals, expectError)