-/* Implements a buffer that supports concurrent incremental read and append.
-New readers start reading from the beginning of the buffer, block when reaching
-the end of the buffer, and are unblocked as new data is added.
+/* AsyncStream pulls data in from an io.Reader source (such as a file or network
+socket) and fans it out to any number of StreamReader sinks.
+
+Unlike io.TeeReader() or io.MultiWriter(), new StreamReaders can be created at
+any point in the lifetime of the AsyncStream, and each StreamReader will read
+the contents of the buffer up to the "frontier" of the buffer, at which point
+the StreamReader blocks until new data is read from the source.
+
+This is useful for minimizing readthrough latency as sinks can read and act on
+data from the source without waiting for the source to be completely buffered.
+It is also useful as a cache in situations where re-reading the original source
+is potentially costly, since the buffer retains a copy of the source data.
Usage:
Begin reading into a buffer with maximum size 'buffersize' from 'source':
- tr := StartTransferFromReader(buffersize, source)
+ stream := AsyncStreamFromReader(buffersize, source)
-To create a new reader (this can be called multiple times):
- r := tr.MakeBufferReader()
+To create a new reader (this can be called multiple times, each reader starts
+at the beginning of the buffer):
+ reader := stream.MakeStreamReader()
-When you're done with the buffer:
- tr.Close()
+Make sure to close the reader when you're done with it:
+ reader.Close()
+When you're done with the stream:
+ stream.Close()
Alternately, if you already have a filled buffer and just want to read out from it:
- tr := StartTransferFromSlice(buf)
- r := tr.MakeBufferReader()
- tr.Close()
+ stream := AsyncStreamFromSlice(buf)
+
+ r := stream.MakeStreamReader()
*/
import (
"io"
- "log"
)
// AsyncStream bundles the shared byte buffer with the channels that readers
// and Close use to communicate with the stream's goroutines. It supersedes
// TransferBuffer: the exported Reader_status channel is gone and the buffer is
// now a field of the type.
-type TransferBuffer struct {
- requests chan readRequest
- Reader_status chan error
+type AsyncStream struct {
// Backing storage: allocated with the requested size by
// AsyncStreamFromReader, or the caller's slice for AsyncStreamFromSlice.
+ buffer []byte
// StreamReader.Read and StreamReader.WriteTo send their sliceRequest values
// on this channel.
+ requests chan sliceRequest
// MakeStreamReader sends on add_reader; StreamReader.Close sends on
// subtract_reader.
+ add_reader chan bool
+ subtract_reader chan bool
// AsyncStream.Close sends on this channel before closing all channels.
// NOTE(review): presumably readersMonitor (not visible here) only receives
// from it once the reader count has dropped to zero — confirm.
+ wait_zero_readers chan bool
}
// StreamReader is one reader's cursor into the buffer of an AsyncStream. It
// supersedes BufferReader; its methods take a pointer receiver, so the offset
// is stored as a plain int rather than a *int.
-type BufferReader struct {
- offset *int
- requests chan<- readRequest
- responses chan readResult
+type StreamReader struct {
// Number of bytes this reader has consumed so far; MakeStreamReader starts
// it at 0.
+ offset int
// Stream this reader belongs to; Close sets it to nil.
+ stream *AsyncStream
// Per-reader channel on which the reply to each sliceRequest is received.
+ responses chan sliceResult
}
// AsyncStreamFromReader allocates a buffer of buffersize bytes, wraps it in a
// new AsyncStream with freshly made unbuffered channels, and starts two
// goroutines on it: transfer(source) and readersMonitor(). It returns a
// pointer to the stream (the old StartTransferFromReader returned a value).
//
// NOTE(review): transfer and readersMonitor are defined outside this view.
// Presumably transfer fills the buffer from source while answering read
// requests, and readersMonitor counts live readers — confirm.
-func StartTransferFromReader(buffersize int, source io.Reader) TransferBuffer {
- buf := make([]byte, buffersize)
+func AsyncStreamFromReader(buffersize int, source io.Reader) *AsyncStream {
// Positional literal: buffer, requests, add_reader, subtract_reader,
// wait_zero_readers — must stay in step with the struct's field order.
+ t := &AsyncStream{make([]byte, buffersize), make(chan sliceRequest), make(chan bool), make(chan bool), make(chan bool)}
- t := TransferBuffer{make(chan readRequest), make(chan error)}
-
- go transfer(buf, source, t.requests, t.Reader_status)
+ go t.transfer(source)
+ go t.readersMonitor()
return t
}
// AsyncStreamFromSlice builds an AsyncStream directly over buf (the slice is
// stored as-is, not copied) and starts the same two goroutines as
// AsyncStreamFromReader, passing a nil source to transfer.
//
// NOTE(review): transfer is not visible here; presumably a nil source means
// buf is treated as already complete — confirm.
-func StartTransferFromSlice(buf []byte) TransferBuffer {
- t := TransferBuffer{make(chan readRequest), nil}
+func AsyncStreamFromSlice(buf []byte) *AsyncStream {
// Positional literal: buffer, requests, add_reader, subtract_reader,
// wait_zero_readers — must stay in step with the struct's field order.
+ t := &AsyncStream{buf, make(chan sliceRequest), make(chan bool), make(chan bool), make(chan bool)}
- go transfer(buf, nil, t.requests, nil)
+ go t.transfer(nil)
+ go t.readersMonitor()
return t
}
// MakeStreamReader registers a new reader with the stream and returns a
// StreamReader positioned at offset 0 with its own responses channel. It may
// be called multiple times.
//
// The registration is a send on the unbuffered add_reader channel, so this
// call blocks until some goroutine receives it.
// NOTE(review): AsyncStream.Close closes add_reader, so calling this after
// Close panics with a send on a closed channel.
-func (this TransferBuffer) MakeBufferReader() BufferReader {
- return BufferReader{new(int), this.requests, make(chan readResult)}
+func (this *AsyncStream) MakeStreamReader() *StreamReader {
+ this.add_reader <- true
+ return &StreamReader{0, this, make(chan sliceResult)}
}
// Read requests up to len(p) bytes starting at this reader's offset, waits
// for the reply on the reader's responses channel, advances the offset by the
// length of the returned slice, and copies that slice into p. It returns the
// number of bytes copied and the error carried by the reply.
//
// If the responses channel has been closed, Read returns 0 and
// io.ErrUnexpectedEOF.
//
// NOTE(review): the offset advances by len(rr.slice) while copy may move
// fewer bytes if the slice is longer than p; presumably the responder never
// returns more than the requested length — confirm.
// NOTE(review): Close sets stream to nil, so calling Read after Close
// dereferences a nil pointer on the first line.
-func (this BufferReader) Read(p []byte) (n int, err error) {
- this.requests <- readRequest{*this.offset, len(p), this.responses}
+func (this *StreamReader) Read(p []byte) (n int, err error) {
+ this.stream.requests <- sliceRequest{this.offset, len(p), this.responses}
rr, valid := <-this.responses
if valid {
- *this.offset += len(rr.slice)
+ this.offset += len(rr.slice)
return copy(p, rr.slice), rr.err
} else {
return 0, io.ErrUnexpectedEOF
}
}
// WriteTo repeatedly requests chunks of up to 32 KiB from the stream, starting
// at this reader's offset, and writes each chunk to dest until a reply carries
// an error. io.EOF ends the loop with a nil error; any other error is returned
// as-is. The reported count is the distance the offset moved during the call.
// The per-chunk debug logging of the old implementation is removed.
//
// NOTE(review): the result of dest.Write is discarded, so a short or failed
// write is not reported and the offset still advances.
// NOTE(review): when a reply carries an error, its slice is added to the
// offset but never written to dest; this is only harmless if the responder
// never sends data together with an error — confirm.
-func (this BufferReader) WriteTo(dest io.Writer) (written int64, err error) {
+func (this *StreamReader) WriteTo(dest io.Writer) (written int64, err error) {
// Record starting offset in order to correctly report the number of bytes sent
- starting_offset := *this.offset
+ starting_offset := this.offset
for {
- this.requests <- readRequest{*this.offset, 32 * 1024, this.responses}
+ this.stream.requests <- sliceRequest{this.offset, 32 * 1024, this.responses}
rr, valid := <-this.responses
if valid {
- log.Printf("WriteTo slice %v %d %v", *this.offset, len(rr.slice), rr.err)
- *this.offset += len(rr.slice)
+ this.offset += len(rr.slice)
if rr.err != nil {
if rr.err == io.EOF {
// EOF is not an error.
- return int64(*this.offset - starting_offset), nil
+ return int64(this.offset - starting_offset), nil
} else {
- return int64(*this.offset - starting_offset), rr.err
+ return int64(this.offset - starting_offset), rr.err
}
} else {
dest.Write(rr.slice)
}
} else {
// The responses channel was closed while waiting for a reply.
// NOTE(review): this returns the absolute offset, not the delta from
// starting_offset used by the other return paths — looks inconsistent.
- return int64(*this.offset), io.ErrUnexpectedEOF
+ return int64(this.offset), io.ErrUnexpectedEOF
}
}
}
// Close deregisters this reader from its stream by sending on the stream's
// subtract_reader channel, closes the reader's responses channel, and clears
// the stream pointer. It always returns nil.
//
// The send is on an unbuffered channel, so Close blocks until it is received.
// NOTE(review): a second Close dereferences the now-nil stream and panics.
// NOTE(review): responses is closed here by its receiver; if the responder
// (not visible here) still has a reply pending for this reader, its send would
// panic — confirm that cannot happen.
-func (this BufferReader) Close() error {
+func (this *StreamReader) Close() error {
+ this.stream.subtract_reader <- true
close(this.responses)
+ this.stream = nil
return nil
}
// Close shuts the stream down: it first sends on wait_zero_readers, then
// closes the requests, add_reader, subtract_reader and wait_zero_readers
// channels. The old Reader_status handling is removed along with that field.
//
// The send is on an unbuffered channel, so Close blocks until it is received.
// NOTE(review): presumably readersMonitor (not visible here) accepts it only
// when no readers remain, making Close wait for every StreamReader to be
// closed — confirm.
// NOTE(review): calling Close twice panics, since the first call closes
// wait_zero_readers and the second sends on it.
-func (this TransferBuffer) Close() {
+func (this *AsyncStream) Close() {
+ this.wait_zero_readers <- true
close(this.requests)
- if this.Reader_status != nil {
- close(this.Reader_status)
- }
+ close(this.add_reader)
+ close(this.subtract_reader)
+ close(this.wait_zero_readers)
}