slightly less. Refactored tests to reduce redundancy slightly. Added test with large number of concurrent readers. Rewrote "how to use" package comments and wrote a small novel about the "theory of operation".
-/* Implements a buffer that supports concurrent incremental read and append.
-New readers start reading from the beginning of the buffer, block when reaching
-the end of the buffer, and are unblocked as new data is added.
+/* AsyncStream pulls data in from a io.Reader source (such as a file or network
+socket) and fans out to any number of StreamReader sinks.
+
+Unlike io.TeeReader() or io.MultiWriter(), new StreamReaders can be created at
+any point in the lifetime of the AsyncStream, and each StreamReader will read
+the contents of the buffer up to the "frontier" of the buffer, at which point
+the StreamReader blocks until new data is read from the source.
+
+This is useful for minimizing readthrough latency as sinks can read and act on
+data from the source without waiting for the source to be completely buffered.
+It is also useful as a cache in situations where re-reading the original source
+is potentially costly, since the buffer retains a copy of the source data.
Usage:
Begin reading into a buffer with maximum size 'buffersize' from 'source':
- tr := StartTransferFromReader(buffersize, source)
+ stream := AsyncStreamFromReader(buffersize, source)
-To create a new reader (this can be called multiple times):
- r := tr.MakeStreamReader()
+To create a new reader (this can be called multiple times, each reader starts
+at the beginning of the buffer):
+ reader := stream.MakeStreamReader()
-When you're done with the buffer:
- tr.Close()
+Make sure to close the reader when you're done with it.
+ reader.Close()
+When you're done with the stream:
+ stream.Close()
Alternately, if you already have a filled buffer and just want to read out from it:
- tr := StartTransferFromSlice(buf)
+ stream := AsyncStreamFromSlice(buf)
+
r := tr.MakeStreamReader()
- tr.Close()
*/
)
type AsyncStream struct {
- requests chan readRequest
+ buffer []byte
+ requests chan sliceRequest
add_reader chan bool
subtract_reader chan bool
wait_zero_readers chan bool
- Reader_status chan error
}
// Reads from the buffer managed by the Transfer()
type StreamReader struct {
offset int
stream *AsyncStream
- responses chan readResult
+ responses chan sliceResult
}
func AsyncStreamFromReader(buffersize int, source io.Reader) *AsyncStream {
- buf := make([]byte, buffersize)
-
- t := &AsyncStream{make(chan readRequest), make(chan bool), make(chan bool), make(chan bool), make(chan error)}
+ t := &AsyncStream{make([]byte, buffersize), make(chan sliceRequest), make(chan bool), make(chan bool), make(chan bool)}
- go transfer(buf, source, t.requests, t.Reader_status)
+ go t.transfer(source)
go t.readersMonitor()
return t
}
func AsyncStreamFromSlice(buf []byte) *AsyncStream {
- t := &AsyncStream{make(chan readRequest), make(chan bool), make(chan bool), make(chan bool), nil}
+ t := &AsyncStream{buf, make(chan sliceRequest), make(chan bool), make(chan bool), make(chan bool)}
- go transfer(buf, nil, t.requests, nil)
+ go t.transfer(nil)
go t.readersMonitor()
return t
func (this *AsyncStream) MakeStreamReader() *StreamReader {
this.add_reader <- true
- return &StreamReader{0, this, make(chan readResult)}
+ return &StreamReader{0, this, make(chan sliceResult)}
}
// Reads from the buffer managed by the Transfer()
func (this *StreamReader) Read(p []byte) (n int, err error) {
- this.stream.requests <- readRequest{this.offset, len(p), this.responses}
+ this.stream.requests <- sliceRequest{this.offset, len(p), this.responses}
rr, valid := <-this.responses
if valid {
this.offset += len(rr.slice)
// Record starting offset in order to correctly report the number of bytes sent
starting_offset := this.offset
for {
- this.stream.requests <- readRequest{this.offset, 32 * 1024, this.responses}
+ this.stream.requests <- sliceRequest{this.offset, 32 * 1024, this.responses}
rr, valid := <-this.responses
if valid {
this.offset += len(rr.slice)
close(this.add_reader)
close(this.subtract_reader)
close(this.wait_zero_readers)
- if this.Reader_status != nil {
- close(this.Reader_status)
- }
}
// Standalone tests
type StandaloneSuite struct{}
+func (s *StandaloneSuite) TestReadIntoBuffer(c *C) {
+ ReadIntoBufferHelper(c, 225)
+ ReadIntoBufferHelper(c, 224)
+}
+
+func HelperWrite128andCheck(c *C, buffer []byte, writer io.Writer, slices chan nextSlice) {
+ out := make([]byte, 128)
+ for i := 0; i < 128; i += 1 {
+ out[i] = byte(i)
+ }
+ writer.Write(out)
+ s1 := <-slices
+ c.Check(len(s1.slice), Equals, 128)
+ c.Check(s1.reader_error, Equals, nil)
+ for i := 0; i < 128; i += 1 {
+ c.Check(s1.slice[i], Equals, byte(i))
+ }
+ for i := 0; i < len(buffer); i += 1 {
+ if i < 128 {
+ c.Check(buffer[i], Equals, byte(i))
+ } else {
+ c.Check(buffer[i], Equals, byte(0))
+ }
+ }
+}
+
+func HelperWrite96andCheck(c *C, buffer []byte, writer io.Writer, slices chan nextSlice) {
+ out := make([]byte, 96)
+ for i := 0; i < 96; i += 1 {
+ out[i] = byte(i / 2)
+ }
+ writer.Write(out)
+ s1 := <-slices
+ c.Check(len(s1.slice), Equals, 96)
+ c.Check(s1.reader_error, Equals, nil)
+ for i := 0; i < 96; i += 1 {
+ c.Check(s1.slice[i], Equals, byte(i/2))
+ }
+ for i := 0; i < len(buffer); i += 1 {
+ if i < 128 {
+ c.Check(buffer[i], Equals, byte(i))
+ } else if i < (128 + 96) {
+ c.Check(buffer[i], Equals, byte((i-128)/2))
+ } else {
+ c.Check(buffer[i], Equals, byte(0))
+ }
+ }
+}
+
func ReadIntoBufferHelper(c *C, bufsize int) {
buffer := make([]byte, bufsize)
reader, writer := io.Pipe()
- slices := make(chan readerSlice)
+ slices := make(chan nextSlice)
go readIntoBuffer(buffer, reader, slices)
- {
- out := make([]byte, 128)
- for i := 0; i < 128; i += 1 {
- out[i] = byte(i)
- }
- writer.Write(out)
- s1 := <-slices
- c.Check(len(s1.slice), Equals, 128)
- c.Check(s1.reader_error, Equals, nil)
- for i := 0; i < 128; i += 1 {
- c.Check(s1.slice[i], Equals, byte(i))
- }
- for i := 0; i < len(buffer); i += 1 {
- if i < 128 {
- c.Check(buffer[i], Equals, byte(i))
- } else {
- c.Check(buffer[i], Equals, byte(0))
- }
- }
- }
- {
- out := make([]byte, 96)
- for i := 0; i < 96; i += 1 {
- out[i] = byte(i / 2)
- }
- writer.Write(out)
- s1 := <-slices
- c.Check(len(s1.slice), Equals, 96)
- c.Check(s1.reader_error, Equals, nil)
- for i := 0; i < 96; i += 1 {
- c.Check(s1.slice[i], Equals, byte(i/2))
- }
- for i := 0; i < len(buffer); i += 1 {
- if i < 128 {
- c.Check(buffer[i], Equals, byte(i))
- } else if i < (128 + 96) {
- c.Check(buffer[i], Equals, byte((i-128)/2))
- } else {
- c.Check(buffer[i], Equals, byte(0))
- }
- }
- }
- {
- writer.Close()
- s1 := <-slices
- c.Check(len(s1.slice), Equals, 0)
- c.Check(s1.reader_error, Equals, io.EOF)
- }
-}
+ HelperWrite128andCheck(c, buffer, writer, slices)
+ HelperWrite96andCheck(c, buffer, writer, slices)
-func (s *StandaloneSuite) TestReadIntoBuffer(c *C) {
- ReadIntoBufferHelper(c, 512)
- ReadIntoBufferHelper(c, 225)
- ReadIntoBufferHelper(c, 224)
+ writer.Close()
+ s1 := <-slices
+ c.Check(len(s1.slice), Equals, 0)
+ c.Check(s1.reader_error, Equals, io.EOF)
}
func (s *StandaloneSuite) TestReadIntoShortBuffer(c *C) {
buffer := make([]byte, 223)
reader, writer := io.Pipe()
- slices := make(chan readerSlice)
+ slices := make(chan nextSlice)
go readIntoBuffer(buffer, reader, slices)
- {
- out := make([]byte, 128)
- for i := 0; i < 128; i += 1 {
- out[i] = byte(i)
- }
- writer.Write(out)
- s1 := <-slices
- c.Check(len(s1.slice), Equals, 128)
- c.Check(s1.reader_error, Equals, nil)
- for i := 0; i < 128; i += 1 {
- c.Check(s1.slice[i], Equals, byte(i))
- }
- for i := 0; i < len(buffer); i += 1 {
- if i < 128 {
- c.Check(buffer[i], Equals, byte(i))
- } else {
- c.Check(buffer[i], Equals, byte(0))
- }
- }
+ HelperWrite128andCheck(c, buffer, writer, slices)
+
+ out := make([]byte, 96)
+ for i := 0; i < 96; i += 1 {
+ out[i] = byte(i / 2)
}
- {
- out := make([]byte, 96)
- for i := 0; i < 96; i += 1 {
- out[i] = byte(i / 2)
- }
- // Write will deadlock because it can't write all the data, so
- // spin it off to a goroutine
- go writer.Write(out)
- s1 := <-slices
+ // Write will deadlock because it can't write all the data, so
+ // spin it off to a goroutine
+ go writer.Write(out)
+ s1 := <-slices
- c.Check(len(s1.slice), Equals, 95)
- c.Check(s1.reader_error, Equals, nil)
- for i := 0; i < 95; i += 1 {
- c.Check(s1.slice[i], Equals, byte(i/2))
- }
- for i := 0; i < len(buffer); i += 1 {
- if i < 128 {
- c.Check(buffer[i], Equals, byte(i))
- } else if i < (128 + 95) {
- c.Check(buffer[i], Equals, byte((i-128)/2))
- } else {
- c.Check(buffer[i], Equals, byte(0))
- }
- }
+ c.Check(len(s1.slice), Equals, 95)
+ c.Check(s1.reader_error, Equals, nil)
+ for i := 0; i < 95; i += 1 {
+ c.Check(s1.slice[i], Equals, byte(i/2))
}
- {
- writer.Close()
- s1 := <-slices
- c.Check(len(s1.slice), Equals, 0)
- c.Check(s1.reader_error, Equals, io.ErrShortBuffer)
+ for i := 0; i < len(buffer); i += 1 {
+ if i < 128 {
+ c.Check(buffer[i], Equals, byte(i))
+ } else if i < (128 + 95) {
+ c.Check(buffer[i], Equals, byte((i-128)/2))
+ } else {
+ c.Check(buffer[i], Equals, byte(0))
+ }
}
+ writer.Close()
+ s1 = <-slices
+ c.Check(len(s1.slice), Equals, 0)
+ c.Check(s1.reader_error, Equals, io.ErrShortBuffer)
}
func (s *StandaloneSuite) TestTransfer(c *C) {
{
// Test closing the reader
writer.Close()
- status := <-tr.Reader_status
- c.Check(status, Equals, io.EOF)
in := make([]byte, 256)
n1, err1 := br1.Read(in)
func (s *StandaloneSuite) TestTransferShortBuffer(c *C) {
reader, writer := io.Pipe()
- // Buffer for reads from 'r'
- buffer := make([]byte, 100)
-
- // Read requests on Transfer() buffer
- requests := make(chan readRequest)
- defer close(requests)
-
- // Reporting reader error states
- reader_status := make(chan error)
+ tr := AsyncStreamFromReader(100, reader)
+ defer tr.Close()
- go transfer(buffer, reader, requests, reader_status)
+ sr := tr.MakeStreamReader()
+ defer sr.Close()
out := make([]byte, 101)
go writer.Write(out)
- status := <-reader_status
- c.Check(status, Equals, io.ErrShortBuffer)
+ n, err := sr.Read(out)
+ c.Check(n, Equals, 100)
+
+ n, err = sr.Read(out)
+ c.Check(n, Equals, 0)
+ c.Check(err, Equals, io.ErrShortBuffer)
}
func (s *StandaloneSuite) TestTransferFromBuffer(c *C) {
io.Copy(writer, br1)
}
+
+func (s *StandaloneSuite) TestManyReaders(c *C) {
+ reader, writer := io.Pipe()
+
+ tr := AsyncStreamFromReader(512, reader)
+ defer tr.Close()
+
+ sr := tr.MakeStreamReader()
+ go func() {
+ time.Sleep(100 * time.Millisecond)
+ sr.Close()
+ }()
+
+ for i := 0; i < 200; i += 1 {
+ go func() {
+ br1 := tr.MakeStreamReader()
+ defer br1.Close()
+
+ p := make([]byte, 3)
+ n, err := br1.Read(p)
+ c.Check(n, Equals, 3)
+ c.Check(p[0:3], DeepEquals, []byte("foo"))
+
+ n, err = br1.Read(p)
+ c.Check(n, Equals, 3)
+ c.Check(p[0:3], DeepEquals, []byte("bar"))
+
+ n, err = br1.Read(p)
+ c.Check(n, Equals, 3)
+ c.Check(p[0:3], DeepEquals, []byte("baz"))
+
+ n, err = br1.Read(p)
+ c.Check(n, Equals, 0)
+ c.Check(err, Equals, io.EOF)
+ }()
+ }
+
+ writer.Write([]byte("foo"))
+ writer.Write([]byte("bar"))
+ writer.Write([]byte("baz"))
+ writer.Close()
+}
+/* Internal implementation of AsyncStream.
+Outline of operation:
+
+The kernel is the transfer() goroutine. It manages concurrent reads and
+appends to the "body" slice. "body" is a slice of "source_buffer" that
+represents the segment of the buffer that is already filled in and available
+for reading.
+
+To fill in the buffer, transfer() starts the readIntoBuffer() goroutine to read
+from the io.Reader source directly into source_buffer. Each read goes into a
+slice of buffer which spans the section immediately following the end of the
+current "body". Each time a Read completes, a slice representing the
+section just filled in (or any read errors/EOF) is sent over the "slices"
+channel back to the transfer() function.
+
+Meanwhile, the transfer() function selects() on two channels, the "requests"
+channel and the "slices" channel.
+
+When a message is received on the "slices" channel, this means a new
+section of the buffer has data, or an error is signaled. Since the data has
+been read directly into the source_buffer, transfer() simply increases the
+size of the body slice to encompass the newly filled in section. Then any
+pending reads are serviced with handleReadRequest (described below).
+
+When a message is received on the "requests" channel, it means a StreamReader
+wants access to a slice of the buffer. This is passed to handleReadRequest().
+
+The handleReadRequest() function takes a sliceRequest consisting of a buffer
+offset, maximum size, and channel to send the response. If there was an error
+reported from the source reader, it is returned. If the offset is less than
+the size of the body, the request can proceed, and it sends a body slice
+spanning the segment from offset to min(offset+maxsize, end of the body). If
+source reader status is EOF (done filling the buffer) and the read request
+offset is beyond end of the body, it responds with EOF. Otherwise, the read
+request is for a slice beyond the current size of "body" but we expect the body
+to expand as more data is added, so the request gets added to a wait list.
+
+The transfer() goroutine runs until the requests channel is closed by AsyncStream.Close().
+
+To track readers, streamer uses the readersMonitor() goroutine. This goroutine
+chooses which channels to receive from based on the number of outstanding
+readers. When a new reader is created, it sends a message on the add_reader
+channel. If the number of readers is already at MAX_READERS, this blocks the
+sender until an existing reader is closed. When a reader is closed, it sends a
+message on the subtract_reader channel. Finally, when AsyncStream.Close() is
+called, it sends a message on the wait_zero_readers channel, which will block
+the sender unless there are zero readers and it is safe to shut down the
+AsyncStream.
+*/
+
package streamer
import (
const MAX_READERS = 100
// A slice passed from readIntoBuffer() to transfer()
-type readerSlice struct {
+type nextSlice struct {
slice []byte
reader_error error
}
// A read request to the Transfer() function
-type readRequest struct {
+type sliceRequest struct {
offset int
maxsize int
- result chan<- readResult
+ result chan<- sliceResult
}
// A read result from the Transfer() function
-type readResult struct {
+type sliceResult struct {
slice []byte
err error
}
// Read repeatedly from the reader and write sequentially into the specified
// buffer, and report each read to channel 'c'. Completes when Reader 'r'
// reports on the error channel and closes channel 'c'.
-func readIntoBuffer(buffer []byte, r io.Reader, slices chan<- readerSlice) {
+func readIntoBuffer(buffer []byte, r io.Reader, slices chan<- nextSlice) {
defer close(slices)
if writeto, ok := r.(io.WriterTo); ok {
n, err := writeto.WriteTo(&bufferWriter{buffer, 0})
if err != nil {
- slices <- readerSlice{nil, err}
+ slices <- nextSlice{nil, err}
} else {
- slices <- readerSlice{buffer[:n], nil}
- slices <- readerSlice{nil, io.EOF}
+ slices <- nextSlice{buffer[:n], nil}
+ slices <- nextSlice{nil, io.EOF}
}
return
} else {
if n > 0 {
// Reader has more data but we have nowhere to
// put it, so we're stuffed
- slices <- readerSlice{nil, io.ErrShortBuffer}
+ slices <- nextSlice{nil, io.ErrShortBuffer}
} else {
// Return some other error (hopefully EOF)
- slices <- readerSlice{nil, err}
+ slices <- nextSlice{nil, err}
}
return
}
// End on error (includes EOF)
if err != nil {
- slices <- readerSlice{nil, err}
+ slices <- nextSlice{nil, err}
return
}
if n > 0 {
// Make a slice with the contents of the read
- slices <- readerSlice{ptr[:n], nil}
+ slices <- nextSlice{ptr[:n], nil}
// Adjust the scratch space slice
ptr = ptr[n:]
// Handle a read request. Returns true if a response was sent, and false if
// the request should be queued.
-func handleReadRequest(req readRequest, body []byte, complete bool) bool {
- if req.offset < len(body) {
+func handleReadRequest(req sliceRequest, body []byte, reader_status error) bool {
+ if (reader_status != nil) && (reader_status != io.EOF) {
+ req.result <- sliceResult{nil, reader_status}
+ return true
+ } else if req.offset < len(body) {
var end int
if req.offset+req.maxsize < len(body) {
end = req.offset + req.maxsize
} else {
end = len(body)
}
- req.result <- readResult{body[req.offset:end], nil}
+ req.result <- sliceResult{body[req.offset:end], nil}
return true
- } else if complete && req.offset >= len(body) {
- req.result <- readResult{nil, io.EOF}
+ } else if (reader_status == io.EOF) && (req.offset >= len(body)) {
+ req.result <- sliceResult{nil, io.EOF}
return true
} else {
return false
// in the provided buffer. Otherwise, use the contents of 'buffer' as is.
// Accepts read requests on the buffer on the 'requests' channel. Completes
// when 'requests' channel is closed.
-func transfer(source_buffer []byte, source_reader io.Reader, requests <-chan readRequest, reader_error chan error) {
+func (this *AsyncStream) transfer(source_reader io.Reader) {
+ source_buffer := this.buffer
+ requests := this.requests
+
// currently buffered data
var body []byte
// for receiving slices from readIntoBuffer
- var slices chan readerSlice = nil
+ var slices chan nextSlice = nil
- // indicates whether the buffered data is complete
- var complete bool = false
+ // indicates the status of the underlying reader
+ var reader_status error = nil
if source_reader != nil {
// 'body' is the buffer slice representing the body content read so far
// used to communicate slices of the buffer as they are
// readIntoBuffer will close 'slices' when it is done with it
- slices = make(chan readerSlice)
+ slices = make(chan nextSlice)
// Spin it off
go readIntoBuffer(source_buffer, source_reader, slices)
body = source_buffer[:]
// buffer is complete
- complete = true
+ reader_status = io.EOF
}
- pending_requests := make([]readRequest, 0)
+ pending_requests := make([]sliceRequest, 0)
for {
select {
case req, valid := <-requests:
// Handle a buffer read request
if valid {
- if !handleReadRequest(req, body, complete) {
+ if !handleReadRequest(req, body, reader_status) {
pending_requests = append(pending_requests, req)
}
} else {
case bk, valid := <-slices:
// Got a new slice from the reader
if valid {
- if bk.reader_error != nil {
- reader_error <- bk.reader_error
- if bk.reader_error == io.EOF {
- // EOF indicates the reader is done
- // sending, so our buffer is complete.
- complete = true
- } else {
- // some other reader error
- return
- }
- }
+ reader_status = bk.reader_error
if bk.slice != nil {
// adjust body bounds now that another slice has been read
// handle pending reads
n := 0
for n < len(pending_requests) {
- if handleReadRequest(pending_requests[n], body, complete) {
-
- // move the element from the
- // back of the slice to
- // position 'n', then shorten
- // the slice by one element
+ if handleReadRequest(pending_requests[n], body, reader_status) {
+ // move the element from the back of the slice to
+ // position 'n', then shorten the slice by one element
pending_requests[n] = pending_requests[len(pending_requests)-1]
pending_requests = pending_requests[0 : len(pending_requests)-1]
} else {
}
}
} else {
- if complete {
- // no more reads
- slices = nil
+ if reader_status == io.EOF {
+ // no more reads expected, so this is ok
} else {
- // reader channel closed without signaling EOF
- reader_error <- io.ErrUnexpectedEOF
- return
+ // slices channel closed without signaling EOF
+ reader_status = io.ErrUnexpectedEOF
}
+ slices = nil
}
}
}