-/* Implements a buffer that supports concurrent incremental read and append.
-New readers start reading from the beginning of the buffer, block when reaching
-the end of the buffer, and are unblocked as new data is added.
+/* AsyncStream pulls data in from a io.Reader source (such as a file or network
+socket) and fans out to any number of StreamReader sinks.
+
+Unlike io.TeeReader() or io.MultiWriter(), new StreamReaders can be created at
+any point in the lifetime of the AsyncStream, and each StreamReader will read
+the contents of the buffer up to the "frontier" of the buffer, at which point
+the StreamReader blocks until new data is read from the source.
+
+This is useful for minimizing readthrough latency as sinks can read and act on
+data from the source without waiting for the source to be completely buffered.
+It is also useful as a cache in situations where re-reading the original source
+potentially is costly, since the buffer retains a copy of the source data.
Usage:
Begin reading into a buffer with maximum size 'buffersize' from 'source':
- tr := StartTransferFromReader(buffersize, source)
+ stream := AsyncStreamFromReader(buffersize, source)
-To create a new reader (this can be called multiple times):
- r := tr.MakeStreamReader()
+To create a new reader (this can be called multiple times, each reader starts
+at the beginning of the buffer):
+ reader := tr.MakeStreamReader()
-When you're done with the buffer:
- tr.Close()
+Make sure to close the reader when you're done with it.
+ reader.Close()
+When you're done with the stream:
+ stream.Close()
Alternately, if you already have a filled buffer and just want to read out from it:
- tr := StartTransferFromSlice(buf)
+ stream := AsyncStreamFromSlice(buf)
+
r := tr.MakeStreamReader()
- tr.Close()
*/
)
type AsyncStream struct {
- requests chan readRequest
+ buffer []byte
+ requests chan sliceRequest
add_reader chan bool
subtract_reader chan bool
wait_zero_readers chan bool
- Reader_status chan error
}
// Reads from the buffer managed by the Transfer()
type StreamReader struct {
offset int
stream *AsyncStream
- responses chan readResult
+ responses chan sliceResult
}
func AsyncStreamFromReader(buffersize int, source io.Reader) *AsyncStream {
- buf := make([]byte, buffersize)
-
- t := &AsyncStream{make(chan readRequest), make(chan bool), make(chan bool), make(chan bool), make(chan error)}
+ t := &AsyncStream{make([]byte, buffersize), make(chan sliceRequest), make(chan bool), make(chan bool), make(chan bool)}
- go transfer(buf, source, t.requests, t.Reader_status)
+ go t.transfer(source)
go t.readersMonitor()
return t
}
func AsyncStreamFromSlice(buf []byte) *AsyncStream {
- t := &AsyncStream{make(chan readRequest), make(chan bool), make(chan bool), make(chan bool), nil}
+ t := &AsyncStream{buf, make(chan sliceRequest), make(chan bool), make(chan bool), make(chan bool)}
- go transfer(buf, nil, t.requests, nil)
+ go t.transfer(nil)
go t.readersMonitor()
return t
func (this *AsyncStream) MakeStreamReader() *StreamReader {
this.add_reader <- true
- return &StreamReader{0, this, make(chan readResult)}
+ return &StreamReader{0, this, make(chan sliceResult)}
}
// Reads from the buffer managed by the Transfer()
func (this *StreamReader) Read(p []byte) (n int, err error) {
- this.stream.requests <- readRequest{this.offset, len(p), this.responses}
+ this.stream.requests <- sliceRequest{this.offset, len(p), this.responses}
rr, valid := <-this.responses
if valid {
this.offset += len(rr.slice)
// Record starting offset in order to correctly report the number of bytes sent
starting_offset := this.offset
for {
- this.stream.requests <- readRequest{this.offset, 32 * 1024, this.responses}
+ this.stream.requests <- sliceRequest{this.offset, 32 * 1024, this.responses}
rr, valid := <-this.responses
if valid {
this.offset += len(rr.slice)
func (this *StreamReader) Close() error {
this.stream.subtract_reader <- true
close(this.responses)
+ this.stream = nil
return nil
}
close(this.add_reader)
close(this.subtract_reader)
close(this.wait_zero_readers)
- if this.Reader_status != nil {
- close(this.Reader_status)
- }
}