1 /* Internal implementation of AsyncStream.
4 The kernel is the transfer() goroutine. It manages concurrent reads and
5 appends to the "body" slice. "body" is a slice of "source_buffer" that
6 represents the segment of the buffer that is already filled in and available
9 To fill in the buffer, transfer() starts the readIntoBuffer() goroutine to read
10 from the io.Reader source directly into source_buffer. Each read goes into a
11 slice of buffer which spans the section immediately following the end of the
12 current "body". Each time a Read completes, a slice representing the the
13 section just filled in (or any read errors/EOF) is sent over the "slices"
14 channel back to the transfer() function.
16 Meanwhile, the transfer() function selects() on two channels, the "requests"
17 channel and the "slices" channel.
19 When a message is recieved on the "slices" channel, this means the a new
20 section of the buffer has data, or an error is signaled. Since the data has
21 been read directly into the source_buffer, it is able to simply increases the
22 size of the body slice to encompass the newly filled in section. Then any
23 pending reads are serviced with handleReadRequest (described below).
25 When a message is recieved on the "requests" channel, it means a StreamReader
26 wants access to a slice of the buffer. This is passed to handleReadRequest().
28 The handleReadRequest() function takes a sliceRequest consisting of a buffer
29 offset, maximum size, and channel to send the response. If there was an error
30 reported from the source reader, it is returned. If the offset is less than
31 the size of the body, the request can proceed, and it sends a body slice
32 spanning the segment from offset to min(offset+maxsize, end of the body). If
33 source reader status is EOF (done filling the buffer) and the read request
34 offset is beyond end of the body, it responds with EOF. Otherwise, the read
35 request is for a slice beyond the current size of "body" but we expect the body
36 to expand as more data is added, so the request gets added to a wait list.
38 The transfer() runs until the requests channel is closed by AsyncStream.Close()
40 To track readers, streamer uses the readersMonitor() goroutine. This goroutine
41 chooses which channels to receive from based on the number of outstanding
42 readers. When a new reader is created, it sends a message on the add_reader
43 channel. If the number of readers is already at MAX_READERS, this blocks the
44 sender until an existing reader is closed. When a reader is closed, it sends a
45 message on the subtract_reader channel. Finally, when AsyncStream.Close() is
46 called, it sends a message on the wait_zero_readers channel, which will block
47 the sender unless there are zero readers and it is safe to shut down the
57 const MAX_READERS = 100
59 // A slice passed from readIntoBuffer() to transfer()
60 type nextSlice struct {
65 // A read request to the Transfer() function
66 type sliceRequest struct {
69 result chan<- sliceResult
72 // A read result from the Transfer() function
73 type sliceResult struct {
78 // Supports writing into a buffer
79 type bufferWriter struct {
84 // Copy p into this.buf, increment pointer and return number of bytes read.
85 func (this *bufferWriter) Write(p []byte) (n int, err error) {
86 n = copy(this.buf[this.ptr:], p)
91 // Read repeatedly from the reader and write sequentially into the specified
92 // buffer, and report each read to channel 'c'. Completes when Reader 'r'
93 // reports on the error channel and closes channel 'c'.
94 func readIntoBuffer(buffer []byte, r io.Reader, slices chan<- nextSlice) {
97 if writeto, ok := r.(io.WriterTo); ok {
98 n, err := writeto.WriteTo(&bufferWriter{buffer, 0})
100 slices <- nextSlice{nil, err}
102 slices <- nextSlice{buffer[:n], nil}
103 slices <- nextSlice{nil, io.EOF}
107 // Initially entire buffer is available
113 const readblock = 64 * 1024
114 // Read 64KiB into the next part of the buffer
115 if len(ptr) > readblock {
116 n, err = r.Read(ptr[:readblock])
121 // Ran out of buffer space, try reading one more byte
123 n, err = r.Read(b[:])
126 // Reader has more data but we have nowhere to
127 // put it, so we're stuffed
128 slices <- nextSlice{nil, io.ErrShortBuffer}
130 // Return some other error (hopefully EOF)
131 slices <- nextSlice{nil, err}
136 // End on error (includes EOF)
138 slices <- nextSlice{nil, err}
143 // Make a slice with the contents of the read
144 slices <- nextSlice{ptr[:n], nil}
146 // Adjust the scratch space slice
153 // Handle a read request. Returns true if a response was sent, and false if
154 // the request should be queued.
155 func handleReadRequest(req sliceRequest, body []byte, reader_status error) bool {
156 if (reader_status != nil) && (reader_status != io.EOF) {
157 req.result <- sliceResult{nil, reader_status}
159 } else if req.offset < len(body) {
161 if req.offset+req.maxsize < len(body) {
162 end = req.offset + req.maxsize
166 req.result <- sliceResult{body[req.offset:end], nil}
168 } else if (reader_status == io.EOF) && (req.offset >= len(body)) {
169 req.result <- sliceResult{nil, io.EOF}
176 // Mediates between reads and appends.
177 // If 'source_reader' is not nil, reads data from 'source_reader' and stores it
178 // in the provided buffer. Otherwise, use the contents of 'buffer' as is.
179 // Accepts read requests on the buffer on the 'requests' channel. Completes
180 // when 'requests' channel is closed.
181 func (this *AsyncStream) transfer(source_reader io.Reader) {
182 source_buffer := this.buffer
183 requests := this.requests
185 // currently buffered data
188 // for receiving slices from readIntoBuffer
189 var slices chan nextSlice = nil
191 // indicates the status of the underlying reader
192 var reader_status error = nil
194 if source_reader != nil {
195 // 'body' is the buffer slice representing the body content read so far
196 body = source_buffer[:0]
198 // used to communicate slices of the buffer as they are
199 // readIntoBuffer will close 'slices' when it is done with it
200 slices = make(chan nextSlice)
203 go readIntoBuffer(source_buffer, source_reader, slices)
205 // use the whole buffer
206 body = source_buffer[:]
208 // buffer is complete
209 reader_status = io.EOF
212 pending_requests := make([]sliceRequest, 0)
216 case req, valid := <-requests:
217 // Handle a buffer read request
219 if !handleReadRequest(req, body, reader_status) {
220 pending_requests = append(pending_requests, req)
223 // closed 'requests' channel indicates we're done
227 case bk, valid := <-slices:
228 // Got a new slice from the reader
230 reader_status = bk.reader_error
233 // adjust body bounds now that another slice has been read
234 body = source_buffer[0 : len(body)+len(bk.slice)]
237 // handle pending reads
239 for n < len(pending_requests) {
240 if handleReadRequest(pending_requests[n], body, reader_status) {
241 // move the element from the back of the slice to
242 // position 'n', then shorten the slice by one element
243 pending_requests[n] = pending_requests[len(pending_requests)-1]
244 pending_requests = pending_requests[0 : len(pending_requests)-1]
247 // Request wasn't handled, so keep it in the request slice
252 if reader_status == nil {
253 // slices channel closed without signaling EOF
254 reader_status = io.ErrUnexpectedEOF
262 func (this *AsyncStream) readersMonitor() {
268 case _, ok := <-this.wait_zero_readers:
270 // nothing, just implicitly unblock the sender
274 case _, ok := <-this.add_reader:
281 } else if readers > 0 && readers < MAX_READERS {
283 case _, ok := <-this.add_reader:
290 case _, ok := <-this.subtract_reader:
297 } else if readers == MAX_READERS {
298 _, ok := <-this.subtract_reader