1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: Apache-2.0
13 // A Buffer is an io.Writer that distributes written data
14 // asynchronously to multiple concurrent readers.
16 // NewReader() can be called at any time. In all cases, every returned
17 // io.Reader reads all data written to the Buffer.
19 // Behavior is undefined if Write is called after Close or CloseWithError.
21 type Buffer interface {
24 // NewReader() returns an io.Reader that reads all data
25 // written to the Buffer.
28 // CloseWithError is like Close, but returns the given error (instead of
29 // io.EOF) to all readers when they reach the end of the buffer.
31 // CloseWithError(nil) is equivalent to
32 // CloseWithError(io.EOF).
33 CloseWithError(error) error
39 err error // nil if there might be more writes
42 // NewBuffer creates a new Buffer using buf as its initial
43 // contents. The new Buffer takes ownership of buf, and the caller
44 // should not use buf after this call.
45 func NewBuffer(buf []byte) Buffer {
47 data: bytes.NewBuffer(buf),
48 cond: sync.Cond{L: &sync.Mutex{}},
52 func (b *buffer) Write(p []byte) (int, error) {
53 defer b.cond.Broadcast()
55 defer b.cond.L.Unlock()
59 return b.data.Write(p)
62 func (b *buffer) Close() error {
63 return b.CloseWithError(nil)
66 func (b *buffer) CloseWithError(err error) error {
67 defer b.cond.Broadcast()
69 defer b.cond.L.Unlock()
78 func (b *buffer) NewReader() io.Reader {
84 read int // # bytes already read
87 func (r *reader) Read(p []byte) (int, error) {
91 case r.read < r.b.data.Len():
92 buf := r.b.data.Bytes()
94 n := copy(p, buf[r.read:])
97 case r.b.err != nil || len(p) == 0:
98 // r.b.err != nil means we reached EOF. And
99 // even if we're not at EOF, there's no need
100 // to block if len(p)==0.