+/* Internal implementation of AsyncStream.
+Outline of operation:
+
+The kernel is the transfer() goroutine. It manages concurrent reads and
+appends to the "body" slice. "body" is a slice of "source_buffer" that
+represents the segment of the buffer that is already filled in and available
+for reading.
+
+To fill in the buffer, transfer() starts the readIntoBuffer() goroutine to read
+from the io.Reader source directly into source_buffer. Each read goes into a
+slice of buffer which spans the section immediately following the end of the
+current "body". Each time a Read completes, a slice representing the the
+section just filled in (or any read errors/EOF) is sent over the "slices"
+channel back to the transfer() function.
+
+Meanwhile, the transfer() function selects() on two channels, the "requests"
+channel and the "slices" channel.
+
+When a message is recieved on the "slices" channel, this means the a new
+section of the buffer has data, or an error is signaled. Since the data has
+been read directly into the source_buffer, it is able to simply increases the
+size of the body slice to encompass the newly filled in section. Then any
+pending reads are serviced with handleReadRequest (described below).
+
+When a message is recieved on the "requests" channel, it means a StreamReader
+wants access to a slice of the buffer. This is passed to handleReadRequest().
+
+The handleReadRequest() function takes a sliceRequest consisting of a buffer
+offset, maximum size, and channel to send the response. If there was an error
+reported from the source reader, it is returned. If the offset is less than
+the size of the body, the request can proceed, and it sends a body slice
+spanning the segment from offset to min(offset+maxsize, end of the body). If
+source reader status is EOF (done filling the buffer) and the read request
+offset is beyond end of the body, it responds with EOF. Otherwise, the read
+request is for a slice beyond the current size of "body" but we expect the body
+to expand as more data is added, so the request gets added to a wait list.
+
+The transfer() runs until the requests channel is closed by AsyncStream.Close()
+
+To track readers, streamer uses the readersMonitor() goroutine. This goroutine
+chooses which channels to receive from based on the number of outstanding
+readers. When a new reader is created, it sends a message on the add_reader
+channel. If the number of readers is already at MAX_READERS, this blocks the
+sender until an existing reader is closed. When a reader is closed, it sends a
+message on the subtract_reader channel. Finally, when AsyncStream.Close() is
+called, it sends a message on the wait_zero_readers channel, which will block
+the sender unless there are zero readers and it is safe to shut down the
+AsyncStream.
+*/
+
package streamer
import (
const MAX_READERS = 100
// A slice passed from readIntoBuffer() to transfer()
-type readerSlice struct {
+type nextSlice struct {
slice []byte
reader_error error
}
// A read request to the Transfer() function
-type readRequest struct {
+type sliceRequest struct {
offset int
maxsize int
- result chan<- readResult
+ result chan<- sliceResult
}
// A read result from the Transfer() function
-type readResult struct {
+type sliceResult struct {
slice []byte
err error
}
// Read repeatedly from the reader and write sequentially into the specified
// buffer, and report each read to channel 'c'. Completes when Reader 'r'
// reports on the error channel and closes channel 'c'.
-func readIntoBuffer(buffer []byte, r io.Reader, slices chan<- readerSlice) {
+func readIntoBuffer(buffer []byte, r io.Reader, slices chan<- nextSlice) {
defer close(slices)
if writeto, ok := r.(io.WriterTo); ok {
n, err := writeto.WriteTo(&bufferWriter{buffer, 0})
if err != nil {
- slices <- readerSlice{nil, err}
+ slices <- nextSlice{nil, err}
} else {
- slices <- readerSlice{buffer[:n], nil}
- slices <- readerSlice{nil, io.EOF}
+ slices <- nextSlice{buffer[:n], nil}
+ slices <- nextSlice{nil, io.EOF}
}
return
} else {
if n > 0 {
// Reader has more data but we have nowhere to
// put it, so we're stuffed
- slices <- readerSlice{nil, io.ErrShortBuffer}
+ slices <- nextSlice{nil, io.ErrShortBuffer}
} else {
// Return some other error (hopefully EOF)
- slices <- readerSlice{nil, err}
+ slices <- nextSlice{nil, err}
}
return
}
// End on error (includes EOF)
if err != nil {
- slices <- readerSlice{nil, err}
+ slices <- nextSlice{nil, err}
return
}
if n > 0 {
// Make a slice with the contents of the read
- slices <- readerSlice{ptr[:n], nil}
+ slices <- nextSlice{ptr[:n], nil}
// Adjust the scratch space slice
ptr = ptr[n:]
// Handle a read request. Returns true if a response was sent, and false if
// the request should be queued.
-func handleReadRequest(req readRequest, body []byte, complete bool) bool {
- if req.offset < len(body) {
+func handleReadRequest(req sliceRequest, body []byte, reader_status error) bool {
+ if (reader_status != nil) && (reader_status != io.EOF) {
+ req.result <- sliceResult{nil, reader_status}
+ return true
+ } else if req.offset < len(body) {
var end int
if req.offset+req.maxsize < len(body) {
end = req.offset + req.maxsize
} else {
end = len(body)
}
- req.result <- readResult{body[req.offset:end], nil}
+ req.result <- sliceResult{body[req.offset:end], nil}
return true
- } else if complete && req.offset >= len(body) {
- req.result <- readResult{nil, io.EOF}
+ } else if (reader_status == io.EOF) && (req.offset >= len(body)) {
+ req.result <- sliceResult{nil, io.EOF}
return true
} else {
return false
// in the provided buffer. Otherwise, use the contents of 'buffer' as is.
// Accepts read requests on the buffer on the 'requests' channel. Completes
// when 'requests' channel is closed.
-func transfer(source_buffer []byte, source_reader io.Reader, requests <-chan readRequest, reader_error chan error) {
+func (this *AsyncStream) transfer(source_reader io.Reader) {
+ source_buffer := this.buffer
+ requests := this.requests
+
// currently buffered data
var body []byte
// for receiving slices from readIntoBuffer
- var slices chan readerSlice = nil
+ var slices chan nextSlice = nil
- // indicates whether the buffered data is complete
- var complete bool = false
+ // indicates the status of the underlying reader
+ var reader_status error = nil
if source_reader != nil {
// 'body' is the buffer slice representing the body content read so far
// used to communicate slices of the buffer as they are
// readIntoBuffer will close 'slices' when it is done with it
- slices = make(chan readerSlice)
+ slices = make(chan nextSlice)
// Spin it off
go readIntoBuffer(source_buffer, source_reader, slices)
body = source_buffer[:]
// buffer is complete
- complete = true
+ reader_status = io.EOF
}
- pending_requests := make([]readRequest, 0)
+ pending_requests := make([]sliceRequest, 0)
for {
select {
case req, valid := <-requests:
// Handle a buffer read request
if valid {
- if !handleReadRequest(req, body, complete) {
+ if !handleReadRequest(req, body, reader_status) {
pending_requests = append(pending_requests, req)
}
} else {
case bk, valid := <-slices:
// Got a new slice from the reader
if valid {
- if bk.reader_error != nil {
- reader_error <- bk.reader_error
- if bk.reader_error == io.EOF {
- // EOF indicates the reader is done
- // sending, so our buffer is complete.
- complete = true
- } else {
- // some other reader error
- return
- }
- }
+ reader_status = bk.reader_error
if bk.slice != nil {
// adjust body bounds now that another slice has been read
// handle pending reads
n := 0
for n < len(pending_requests) {
- if handleReadRequest(pending_requests[n], body, complete) {
-
- // move the element from the
- // back of the slice to
- // position 'n', then shorten
- // the slice by one element
+ if handleReadRequest(pending_requests[n], body, reader_status) {
+ // move the element from the back of the slice to
+ // position 'n', then shorten the slice by one element
pending_requests[n] = pending_requests[len(pending_requests)-1]
pending_requests = pending_requests[0 : len(pending_requests)-1]
} else {
}
}
} else {
- if complete {
- // no more reads
- slices = nil
+ if reader_status == io.EOF {
+ // no more reads expected, so this is ok
} else {
- // reader channel closed without signaling EOF
- reader_error <- io.ErrUnexpectedEOF
- return
+ // slices channel closed without signaling EOF
+ reader_status = io.ErrUnexpectedEOF
}
+ slices = nil
}
}
}