/* Internal implementation of AsyncStream. Outline of operation: The kernel is the transfer() goroutine. It manages concurrent reads and appends to the "body" slice. "body" is a slice of "source_buffer" that represents the segment of the buffer that is already filled in and available for reading. To fill in the buffer, transfer() starts the readIntoBuffer() goroutine to read from the io.Reader source directly into source_buffer. Each read goes into a slice of buffer which spans the section immediately following the end of the current "body". Each time a Read completes, a slice representing the the section just filled in (or any read errors/EOF) is sent over the "slices" channel back to the transfer() function. Meanwhile, the transfer() function selects() on two channels, the "requests" channel and the "slices" channel. When a message is recieved on the "slices" channel, this means the a new section of the buffer has data, or an error is signaled. Since the data has been read directly into the source_buffer, it is able to simply increases the size of the body slice to encompass the newly filled in section. Then any pending reads are serviced with handleReadRequest (described below). When a message is recieved on the "requests" channel, it means a StreamReader wants access to a slice of the buffer. This is passed to handleReadRequest(). The handleReadRequest() function takes a sliceRequest consisting of a buffer offset, maximum size, and channel to send the response. If there was an error reported from the source reader, it is returned. If the offset is less than the size of the body, the request can proceed, and it sends a body slice spanning the segment from offset to min(offset+maxsize, end of the body). If source reader status is EOF (done filling the buffer) and the read request offset is beyond end of the body, it responds with EOF. Otherwise, the read request is for a slice beyond the current size of "body" but we expect the body to expand as more data is added, so the request gets added to a wait list. The transfer() runs until the requests channel is closed by AsyncStream.Close() To track readers, streamer uses the readersMonitor() goroutine. This goroutine chooses which channels to receive from based on the number of outstanding readers. When a new reader is created, it sends a message on the add_reader channel. If the number of readers is already at MAX_READERS, this blocks the sender until an existing reader is closed. When a reader is closed, it sends a message on the subtract_reader channel. Finally, when AsyncStream.Close() is called, it sends a message on the wait_zero_readers channel, which will block the sender unless there are zero readers and it is safe to shut down the AsyncStream. */ package streamer import ( "io" ) const MAX_READERS = 100 // A slice passed from readIntoBuffer() to transfer() type nextSlice struct { slice []byte reader_error error } // A read request to the Transfer() function type sliceRequest struct { offset int maxsize int result chan<- sliceResult } // A read result from the Transfer() function type sliceResult struct { slice []byte err error } // Supports writing into a buffer type bufferWriter struct { buf []byte ptr int } // Copy p into this.buf, increment pointer and return number of bytes read. func (this *bufferWriter) Write(p []byte) (n int, err error) { n = copy(this.buf[this.ptr:], p) this.ptr += n return n, nil } // Read repeatedly from the reader and write sequentially into the specified // buffer, and report each read to channel 'c'. Completes when Reader 'r' // reports on the error channel and closes channel 'c'. func readIntoBuffer(buffer []byte, r io.Reader, slices chan<- nextSlice) { defer close(slices) if writeto, ok := r.(io.WriterTo); ok { n, err := writeto.WriteTo(&bufferWriter{buffer, 0}) if err != nil { slices <- nextSlice{nil, err} } else { slices <- nextSlice{buffer[:n], nil} slices <- nextSlice{nil, io.EOF} } return } else { // Initially entire buffer is available ptr := buffer[:] for { var n int var err error if len(ptr) > 0 { const readblock = 64 * 1024 // Read 64KiB into the next part of the buffer if len(ptr) > readblock { n, err = r.Read(ptr[:readblock]) } else { n, err = r.Read(ptr) } } else { // Ran out of buffer space, try reading one more byte var b [1]byte n, err = r.Read(b[:]) if n > 0 { // Reader has more data but we have nowhere to // put it, so we're stuffed slices <- nextSlice{nil, io.ErrShortBuffer} } else { // Return some other error (hopefully EOF) slices <- nextSlice{nil, err} } return } // End on error (includes EOF) if err != nil { slices <- nextSlice{nil, err} return } if n > 0 { // Make a slice with the contents of the read slices <- nextSlice{ptr[:n], nil} // Adjust the scratch space slice ptr = ptr[n:] } } } } // Handle a read request. Returns true if a response was sent, and false if // the request should be queued. func handleReadRequest(req sliceRequest, body []byte, reader_status error) bool { if (reader_status != nil) && (reader_status != io.EOF) { req.result <- sliceResult{nil, reader_status} return true } else if req.offset < len(body) { var end int if req.offset+req.maxsize < len(body) { end = req.offset + req.maxsize } else { end = len(body) } req.result <- sliceResult{body[req.offset:end], nil} return true } else if (reader_status == io.EOF) && (req.offset >= len(body)) { req.result <- sliceResult{nil, io.EOF} return true } else { return false } } // Mediates between reads and appends. // If 'source_reader' is not nil, reads data from 'source_reader' and stores it // in the provided buffer. Otherwise, use the contents of 'buffer' as is. // Accepts read requests on the buffer on the 'requests' channel. Completes // when 'requests' channel is closed. func (this *AsyncStream) transfer(source_reader io.Reader) { source_buffer := this.buffer requests := this.requests // currently buffered data var body []byte // for receiving slices from readIntoBuffer var slices chan nextSlice = nil // indicates the status of the underlying reader var reader_status error = nil if source_reader != nil { // 'body' is the buffer slice representing the body content read so far body = source_buffer[:0] // used to communicate slices of the buffer as they are // readIntoBuffer will close 'slices' when it is done with it slices = make(chan nextSlice) // Spin it off go readIntoBuffer(source_buffer, source_reader, slices) } else { // use the whole buffer body = source_buffer[:] // buffer is complete reader_status = io.EOF } pending_requests := make([]sliceRequest, 0) for { select { case req, valid := <-requests: // Handle a buffer read request if valid { if !handleReadRequest(req, body, reader_status) { pending_requests = append(pending_requests, req) } } else { // closed 'requests' channel indicates we're done return } case bk, valid := <-slices: // Got a new slice from the reader if valid { reader_status = bk.reader_error if bk.slice != nil { // adjust body bounds now that another slice has been read body = source_buffer[0 : len(body)+len(bk.slice)] } // handle pending reads n := 0 for n < len(pending_requests) { if handleReadRequest(pending_requests[n], body, reader_status) { // move the element from the back of the slice to // position 'n', then shorten the slice by one element pending_requests[n] = pending_requests[len(pending_requests)-1] pending_requests = pending_requests[0 : len(pending_requests)-1] } else { // Request wasn't handled, so keep it in the request slice n += 1 } } } else { if reader_status == io.EOF { // no more reads expected, so this is ok } else { // slices channel closed without signaling EOF reader_status = io.ErrUnexpectedEOF } slices = nil } } } } func (this *AsyncStream) readersMonitor() { var readers int = 0 for { if readers == 0 { select { case _, ok := <-this.wait_zero_readers: if ok { // nothing, just implicitly unblock the sender } else { return } case _, ok := <-this.add_reader: if ok { readers += 1 } else { return } } } else if readers > 0 && readers < MAX_READERS { select { case _, ok := <-this.add_reader: if ok { readers += 1 } else { return } case _, ok := <-this.subtract_reader: if ok { readers -= 1 } else { return } } } else if readers == MAX_READERS { _, ok := <-this.subtract_reader if ok { readers -= 1 } else { return } } } }