1 /* AsyncStream pulls data in from a io.Reader source (such as a file or network
2 socket) and fans out to any number of StreamReader sinks.
4 Unlike io.TeeReader() or io.MultiWriter(), new StreamReaders can be created at
5 any point in the lifetime of the AsyncStream, and each StreamReader will read
6 the contents of the buffer up to the "frontier" of the buffer, at which point
7 the StreamReader blocks until new data is read from the source.
9 This is useful for minimizing readthrough latency as sinks can read and act on
10 data from the source without waiting for the source to be completely buffered.
11 It is also useful as a cache in situations where re-reading the original source
12 is potentially costly, since the buffer retains a copy of the source data.
16 Begin reading into a buffer with maximum size 'buffersize' from 'source':
17 stream := AsyncStreamFromReader(buffersize, source)
19 To create a new reader (this can be called multiple times, each reader starts
20 at the beginning of the buffer):
21 reader := stream.MakeStreamReader()
23 Make sure to close the reader when you're done with it.
26 When you're done with the stream:
29 Alternatively, if you already have a filled buffer and just want to read out from it:
30 stream := AsyncStreamFromSlice(buf)
32 r := stream.MakeStreamReader()
// ErrAlreadyClosed is the sentinel error returned when Close is called a
// second time on an AsyncStream or StreamReader.
44 var ErrAlreadyClosed = errors.New("cannot close a stream twice")
// AsyncStream fans data pulled from an io.Reader out to any number of
// StreamReaders (see the package comment).
// NOTE(review): field names use underscores; Go convention is mixedCaps.
46 type AsyncStream struct {
// requests carries read requests (offset, length, reply channel) from
// StreamReaders to the goroutine that manages the buffer.
48 requests chan sliceRequest
// subtract_reader is signaled when a StreamReader is closed.
50 subtract_reader chan bool
// wait_zero_readers blocks the sender until every reader has been closed —
// presumably serviced by the buffer-managing goroutine (not visible here);
// confirm.
51 wait_zero_readers chan bool
55 // StreamReader reads from the buffer managed by the Transfer() goroutine.
// Each reader keeps its own offset and receives results on its own channel.
56 type StreamReader struct {
// responses receives one sliceResult per request sent on stream.requests;
// it is closed by Close().
59 responses chan sliceResult
// AsyncStreamFromReader creates an AsyncStream with a buffer of at most
// 'buffersize' bytes and begins filling it from 'source'.
// NOTE(review): the struct-literal opener, the goroutine launch, and the
// return statement fall on elided lines — confirm there.
62 func AsyncStreamFromReader(buffersize int, source io.Reader) *AsyncStream {
64 buffer: make([]byte, buffersize),
65 requests: make(chan sliceRequest),
66 add_reader: make(chan bool),
67 subtract_reader: make(chan bool),
68 wait_zero_readers: make(chan bool),
// AsyncStreamFromSlice creates an AsyncStream whose buffer is the
// already-filled slice 'buf'; readers can read the whole contents
// immediately without waiting on a source.
77 func AsyncStreamFromSlice(buf []byte) *AsyncStream {
80 requests: make(chan sliceRequest),
81 add_reader: make(chan bool),
82 subtract_reader: make(chan bool),
83 wait_zero_readers: make(chan bool),
// MakeStreamReader registers a new reader with the stream and returns it.
// Each reader starts at offset 0 and may be created at any point in the
// stream's lifetime; the caller must Close() it when done.
92 func (this *AsyncStream) MakeStreamReader() *StreamReader {
// Tell the buffer goroutine one more reader exists before handing it out.
93 this.add_reader <- true
// Fields are: starting offset, owning stream, fresh response channel.
94 return &StreamReader{0, this, make(chan sliceResult)}
97 // Read implements io.Reader: it requests up to len(p) bytes starting at
// this reader's offset and blocks until the AsyncStream responds (with
// data, an error, or EOF).
98 func (this *StreamReader) Read(p []byte) (n int, err error) {
99 this.stream.requests <- sliceRequest{this.offset, len(p), this.responses}
100 rr, valid := <-this.responses
// Successful receive: advance past the bytes just handed to us.
// NOTE(review): assumes rr.slice is never longer than len(p) (that is what
// was requested); otherwise n and offset would disagree — confirm in the
// buffer goroutine.
102 this.offset += len(rr.slice)
103 return copy(p, rr.slice), rr.err
// The responses channel was closed (reader was Close()d mid-read).
105 return 0, io.ErrUnexpectedEOF
// WriteTo implements io.WriterTo: it streams the buffer contents to dest
// in 32 KiB requests until EOF, returning the number of bytes transferred.
// NOTE(review): the loop structure and the dest.Write call fall on elided
// lines — confirm there.
109 func (this *StreamReader) WriteTo(dest io.Writer) (written int64, err error) {
110 // Record starting offset in order to correctly report the number of bytes sent
111 starting_offset := this.offset
113 this.stream.requests <- sliceRequest{this.offset, 32 * 1024, this.responses}
114 rr, valid := <-this.responses
116 this.offset += len(rr.slice)
118 if rr.err == io.EOF {
119 // EOF is not an error.
120 return int64(this.offset - starting_offset), nil
// Any other stream error is propagated to the caller.
122 return int64(this.offset - starting_offset), rr.err
// Responses channel closed mid-transfer.
// NOTE(review): this returns this.offset instead of
// this.offset - starting_offset like the branches above — looks like a
// wrong byte count on this path; confirm and fix.
128 return int64(this.offset), io.ErrUnexpectedEOF
133 // Close unregisters the reader from its stream and closes the responses
// channel.  Calling Close twice returns ErrAlreadyClosed.
134 func (this *StreamReader) Close() error {
135 if this.stream == nil {
136 return ErrAlreadyClosed
// NOTE(review): nothing visible here sets this.stream = nil, which the
// guard above depends on — confirm it happens on an elided line.
138 this.stream.subtract_reader <- true
139 close(this.responses)
// Close shuts down the stream: it waits until every StreamReader has been
// closed, then closes the control channels.  Closing twice returns
// ErrAlreadyClosed.  NOTE(review): the double-close guard's condition is on
// an elided line — confirm.
144 func (this *AsyncStream) Close() error {
146 return ErrAlreadyClosed
// Block until the buffer goroutine reports zero live readers.
149 this.wait_zero_readers <- true
151 close(this.add_reader)
152 close(this.subtract_reader)
153 close(this.wait_zero_readers)
// Seek implements io.Seeker.  Targets past the end of the buffer are
// clamped to Len(); negative targets are an error.  NOTE(review): the
// switch/case keywords and guard conditions are on elided lines — the
// branch comments below are inferred; confirm.
157 func (this *StreamReader) Seek(offset int64, whence int) (int64, error) {
// Presumably io.SeekCurrent: relative to the current offset.
163 want = int64(this.offset) + offset
// Presumably io.SeekEnd: relative to the end of the buffer.
165 want = int64(this.Len()) + offset
167 return int64(this.offset), fmt.Errorf("invalid whence %d", whence)
// Presumably rejects want < 0.
170 return int64(this.offset), fmt.Errorf("attempted seek to %d", want)
// Clamp seeks past the end of the buffer instead of failing.
172 if want > int64(this.Len()) {
173 want = int64(this.Len())
175 this.offset = int(want)
// Len returns the total size of the stream's buffer.
// NOTE(review): this is the allocated buffer size, not the number of bytes
// read from the source so far — confirm that is the intended semantic
// (Seek's clamping relies on it).
179 func (this *StreamReader) Len() uint64 {
180 return uint64(len(this.stream.buffer))