// Copyright (C) The Arvados Authors. All rights reserved.
//
// SPDX-License-Identifier: Apache-2.0

/* Internal implementation of AsyncStream.

The kernel is the transfer() goroutine. It manages concurrent reads and
appends to the "body" slice. "body" is a slice of "source_buffer" that
represents the segment of the buffer that is already filled in and available
for reads.

To fill in the buffer, transfer() starts the readIntoBuffer() goroutine to read
from the io.Reader source directly into source_buffer. Each read goes into a
slice of the buffer which spans the section immediately following the end of
the current "body". Each time a Read completes, a slice representing the
section just filled in (or any read errors/EOF) is sent over the "slices"
channel back to the transfer() function.

Meanwhile, the transfer() function runs a select() loop over two channels: the
"requests" channel and the "slices" channel.

When a message is received on the "slices" channel, it means a new section of
the buffer has data, or an error is signaled. Since the data has been read
directly into source_buffer, transfer() can simply increase the size of the
body slice to encompass the newly filled-in section. Then any pending reads
are serviced with handleReadRequest() (described below).

When a message is received on the "requests" channel, it means a StreamReader
wants access to a slice of the buffer. This is passed to handleReadRequest().

The handleReadRequest() function takes a sliceRequest consisting of a buffer
offset, a maximum size, and a channel on which to send the response. If there
was an error reported from the source reader, it is returned. If the offset is
less than the size of the body, the request can proceed, and it sends a body
slice spanning the segment from offset to min(offset+maxsize, end of the body).
If the source reader status is io.EOF (done filling the buffer) and the read
request offset is at or beyond the end of the body, it responds with EOF.
Otherwise, the read request is for a slice beyond the current size of "body",
but we expect the body to expand as more data is added, so the request gets
added to a wait list.

The transfer() goroutine runs until the requests channel is closed by
AsyncStream.Close().

To track readers, the streamer uses the readersMonitor() goroutine. This
goroutine chooses which channels to receive from based on the number of
outstanding readers. When a new reader is created, it sends a message on the
add_reader channel. If the number of readers is already at MAX_READERS, this
blocks the sender until an existing reader is closed. When a reader is closed,
it sends a message on the subtract_reader channel. Finally, when
AsyncStream.Close() is called, it sends a message on the wait_zero_readers
channel, which will block the sender unless there are zero readers and it is
safe to shut down the AsyncStream.
*/

package streamer

import (
	"bytes"
	"fmt"
	"io"
)

const MAX_READERS = 100

// A slice passed from readIntoBuffer() to transfer()
type nextSlice struct {
	slice        []byte
	reader_error error
}

// A read request to the transfer() function
type sliceRequest struct {
	offset  int
	maxsize int
	result  chan<- sliceResult
}

// A read result from the transfer() function
type sliceResult struct {
	slice []byte
	err   error
}

// Supports writing into a fixed-size buffer
type bufferWriter struct {
	buf []byte
	ptr int
}

// Copy p into this.buf, advance the write pointer, and return the number of
// bytes written.
func (this *bufferWriter) Write(p []byte) (n int, err error) {
	n = copy(this.buf[this.ptr:], p)
	this.ptr += n
	if n < len(p) {
		// Honor the io.Writer contract: report an error when p could
		// not be written in full because the buffer is out of space.
		err = io.ErrShortBuffer
	}
	return n, err
}
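
// Illustrative sketch (not part of the package proper; the example* function
// names in this file are ours): bufferWriter is the io.Writer that
// readIntoBuffer() below hands to an io.WriterTo source. bytes.Reader
// implements io.WriterTo, so WriteTo copies straight into the target buffer.
func exampleBufferWriter() {
	buf := make([]byte, 16)
	bw := &bufferWriter{buf, 0}
	n, err := bytes.NewReader([]byte("hello")).WriteTo(bw)
	fmt.Printf("wrote %d bytes, err=%v, buf=%q\n", n, err, buf[:bw.ptr])
	// wrote 5 bytes, err=<nil>, buf="hello"
}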

// Read repeatedly from the reader and write sequentially into the specified
// buffer, reporting each read on the 'slices' channel. Completes when the
// reader 'r' returns an error (including io.EOF), then closes 'slices'.
func readIntoBuffer(buffer []byte, r io.Reader, slices chan<- nextSlice) {
	defer close(slices)

	if writeto, ok := r.(io.WriterTo); ok {
		// Fast path: the reader can write into the buffer directly
		n, err := writeto.WriteTo(&bufferWriter{buffer, 0})
		if err != nil {
			slices <- nextSlice{nil, err}
		} else {
			slices <- nextSlice{buffer[:n], nil}
			slices <- nextSlice{nil, io.EOF}
		}
		return
	}

	// Initially the entire buffer is available as scratch space
	ptr := buffer[:]
	for {
		var n int
		var err error

		if len(ptr) > 0 {
			const readblock = 64 * 1024
			// Read at most 64 KiB into the next part of the buffer
			if len(ptr) > readblock {
				n, err = r.Read(ptr[:readblock])
			} else {
				n, err = r.Read(ptr)
			}
		} else {
			// Ran out of buffer space, try reading one more byte
			var b [1]byte
			n, err = r.Read(b[:])

			if n > 0 {
				// Reader has more data but we have nowhere to
				// put it, so we're stuffed
				slices <- nextSlice{nil, io.ErrShortBuffer}
			} else {
				// Return some other error (hopefully EOF)
				slices <- nextSlice{nil, err}
			}
			return
		}

		// End on error (includes EOF)
		if err != nil {
			slices <- nextSlice{nil, err}
			return
		}

		if n > 0 {
			// Make a slice with the contents of the read
			slices <- nextSlice{ptr[:n], nil}

			// Adjust the scratch space slice
			ptr = ptr[n:]
		}
	}
}

// Handle a read request. Returns true if a response was sent, and false if
// the request should be queued.
func handleReadRequest(req sliceRequest, body []byte, reader_status error) bool {
	if (reader_status != nil) && (reader_status != io.EOF) {
		// The source reader failed; propagate the error
		req.result <- sliceResult{nil, reader_status}
		return true
	} else if req.offset < len(body) {
		// Part of the requested range is available now; send the
		// slice from offset to min(offset+maxsize, len(body))
		var end int
		if req.offset+req.maxsize < len(body) {
			end = req.offset + req.maxsize
		} else {
			end = len(body)
		}
		req.result <- sliceResult{body[req.offset:end], nil}
		return true
	} else if (reader_status == io.EOF) && (req.offset >= len(body)) {
		// The buffer is complete and the request starts past its end
		req.result <- sliceResult{nil, io.EOF}
		return true
	} else {
		// Data not available yet; the caller should queue the request
		return false
	}
}
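
// Illustrative sketch (not part of the package proper): the three
// dispositions of handleReadRequest(). The result channel is buffered so
// the sends inside handleReadRequest() do not block this single goroutine.
func exampleHandleReadRequest() {
	body := []byte("abcdef")
	result := make(chan sliceResult, 1)

	// Requested range overlaps the body: answered immediately with
	// body[2:min(2+3, len(body))].
	handleReadRequest(sliceRequest{2, 3, result}, body, nil)
	fmt.Printf("%q\n", (<-result).slice) // "cde"

	// Past the end while the reader is still running: not answered,
	// returns false so the caller queues the request.
	fmt.Println(handleReadRequest(sliceRequest{10, 3, result}, body, nil))

	// Past the end after the reader hit EOF: answered with io.EOF.
	handleReadRequest(sliceRequest{10, 3, result}, body, io.EOF)
	fmt.Println((<-result).err) // EOF
}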

// Mediates between reads and appends.
// If 'source_reader' is not nil, reads data from 'source_reader' and stores it
// in the provided buffer. Otherwise, uses the contents of 'buffer' as-is.
// Accepts read requests on the buffer on the 'requests' channel. Completes
// when the 'requests' channel is closed.
func (this *AsyncStream) transfer(source_reader io.Reader) {
	source_buffer := this.buffer
	requests := this.requests

	// currently buffered data
	var body []byte

	// for receiving slices from readIntoBuffer
	var slices chan nextSlice = nil

	// indicates the status of the underlying reader
	var reader_status error = nil

	if source_reader != nil {
		// 'body' is the buffer slice representing the body content read so far
		body = source_buffer[:0]

		// used to communicate slices of the buffer as they are read;
		// readIntoBuffer will close 'slices' when it is done with it
		slices = make(chan nextSlice)

		go readIntoBuffer(source_buffer, source_reader, slices)
	} else {
		// use the whole buffer
		body = source_buffer[:]

		// buffer is complete
		reader_status = io.EOF
	}

	pending_requests := make([]sliceRequest, 0)

	for {
		select {
		case req, valid := <-requests:
			// Handle a buffer read request
			if valid {
				if !handleReadRequest(req, body, reader_status) {
					pending_requests = append(pending_requests, req)
				}
			} else {
				// closed 'requests' channel indicates we're done
				return
			}

		case bk, valid := <-slices:
			// Got a new slice from the reader
			if valid {
				reader_status = bk.reader_error

				// adjust body bounds now that another slice has been read
				body = source_buffer[0 : len(body)+len(bk.slice)]

				// handle pending reads
				n := 0
				for n < len(pending_requests) {
					if handleReadRequest(pending_requests[n], body, reader_status) {
						// move the element from the back of the slice to
						// position 'n', then shorten the slice by one element
						pending_requests[n] = pending_requests[len(pending_requests)-1]
						pending_requests = pending_requests[0 : len(pending_requests)-1]
					} else {
						// Request wasn't handled, so keep it in the request slice
						n += 1
					}
				}
			} else {
				if reader_status == nil {
					// slices channel closed without signaling EOF
					reader_status = io.ErrUnexpectedEOF
				}
				// Stop selecting on the closed channel
				slices = nil
			}
		}
	}
}
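
// Illustrative sketch (not part of the package proper): the round trip that
// transfer() services. Only the 'buffer' and 'requests' fields that
// transfer() itself uses are populated here; the real AsyncStream is
// constructed elsewhere in the package.
func exampleTransfer() {
	stream := &AsyncStream{
		buffer:   make([]byte, 64),
		requests: make(chan sliceRequest),
	}
	go stream.transfer(bytes.NewReader([]byte("hello world")))

	// Ask for up to 5 bytes starting at offset 6. If that data had not
	// arrived yet, transfer() would park the request on its wait list and
	// answer once the body grows.
	result := make(chan sliceResult)
	stream.requests <- sliceRequest{6, 5, result}
	r := <-result
	fmt.Printf("%q %v\n", r.slice, r.err) // "world" <nil>

	// Closing 'requests' terminates the transfer() goroutine.
	close(stream.requests)
}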

func (this *AsyncStream) readersMonitor() {
	var readers int = 0

	for {
		if readers == 0 {
			select {
			case _, ok := <-this.wait_zero_readers:
				if ok {
					// nothing, just implicitly unblock the sender
				} else {
					return
				}
			case _, ok := <-this.add_reader:
				if ok {
					readers += 1
				} else {
					return
				}
			}
		} else if readers > 0 && readers < MAX_READERS {
			select {
			case _, ok := <-this.add_reader:
				if ok {
					readers += 1
				} else {
					return
				}
			case _, ok := <-this.subtract_reader:
				if ok {
					readers -= 1
				} else {
					return
				}
			}
		} else if readers == MAX_READERS {
			_, ok := <-this.subtract_reader
			if ok {
				readers -= 1
			} else {
				return
			}
		}
	}
}
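
// Illustrative sketch (not part of the package proper): the bookkeeping
// protocol readersMonitor() implements. The channels' element type is
// assumed to be bool here; readersMonitor() only cares about sends and
// channel closure, not the values sent.
func exampleReadersMonitor() {
	stream := &AsyncStream{
		add_reader:        make(chan bool),
		subtract_reader:   make(chan bool),
		wait_zero_readers: make(chan bool),
	}
	go stream.readersMonitor()

	stream.add_reader <- true      // new reader: count 0 -> 1
	stream.subtract_reader <- true // reader closed: count 1 -> 0

	// With zero readers this send is accepted immediately; this is how
	// AsyncStream.Close() waits until shutdown is safe.
	stream.wait_zero_readers <- true

	// Closing any monitored channel stops the monitor goroutine.
	close(stream.add_reader)
}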