8 type ReaderSlice struct {
13 // Read repeatedly from the reader into the specified buffer, and report each
14 // read to channel 'c'. Completes when Reader 'r' reports on the error channel
15 // and closes channel 'c'.
16 func ReadIntoBuffer(buffer []byte, r io.Reader, slices chan<- ReaderSlice) {
19 // Initially use entire buffer as scratch space
25 // Read into the scratch space
28 // Ran out of scratch space, try reading one more byte
33 // Reader has more data but we have nowhere to
34 // put it, so we're stuffed
35 slices <- ReaderSlice{nil, io.ErrShortBuffer}
37 // Return some other error (hopefully EOF)
38 slices <- ReaderSlice{nil, err}
43 // End on error (includes EOF)
45 slices <- ReaderSlice{nil, err}
50 // Make a slice with the contents of the read
51 slices <- ReaderSlice{ptr[:n], nil}
53 // Adjust the scratch space slice
59 // A read request to the Transfer() function
60 type ReadRequest struct {
63 result chan<- ReadResult
66 // A read result from the Transfer() function
67 type ReadResult struct {
72 // Reads from the buffer managed by the Transfer()
73 type BufferReader struct {
75 requests chan<- ReadRequest
76 responses chan ReadResult
79 func MakeBufferReader(requests chan<- ReadRequest) BufferReader {
80 return BufferReader{new(int), requests, make(chan ReadResult)}
83 // Reads from the buffer managed by the Transfer()
84 func (this BufferReader) Read(p []byte) (n int, err error) {
85 this.requests <- ReadRequest{*this.offset, len(p), this.responses}
86 rr, valid := <-this.responses
88 *this.offset += len(rr.slice)
89 return copy(p, rr.slice), rr.err
91 return 0, io.ErrUnexpectedEOF
95 func (this BufferReader) WriteTo(dest io.Writer) (written int64, err error) {
96 // Record starting offset in order to correctly report the number of bytes sent
97 starting_offset := *this.offset
99 this.requests <- ReadRequest{*this.offset, 32 * 1024, this.responses}
100 rr, valid := <-this.responses
102 log.Printf("WriteTo slice %v %d %v", *this.offset, len(rr.slice), rr.err)
103 *this.offset += len(rr.slice)
105 if rr.err == io.EOF {
106 // EOF is not an error.
107 return int64(*this.offset - starting_offset), nil
109 return int64(*this.offset - starting_offset), rr.err
115 return int64(*this.offset), io.ErrUnexpectedEOF
120 // Close the responses channel
121 func (this BufferReader) Close() error {
122 close(this.responses)
126 // Handle a read request. Returns true if a response was sent, and false if
127 // the request should be queued.
128 func HandleReadRequest(req ReadRequest, body []byte, complete bool) bool {
129 log.Printf("HandleReadRequest %d %d %d", req.offset, req.maxsize, len(body))
130 if req.offset < len(body) {
132 if req.offset+req.maxsize < len(body) {
133 end = req.offset + req.maxsize
137 req.result <- ReadResult{body[req.offset:end], nil}
139 } else if complete && req.offset >= len(body) {
140 req.result <- ReadResult{nil, io.EOF}
147 // If 'source_reader' is not nil, reads data from 'source_reader' and stores it
148 // in the provided buffer. Otherwise, use the contents of 'buffer' as is.
149 // Accepts read requests on the buffer on the 'requests' channel. Completes
150 // when 'requests' channel is closed.
151 func Transfer(source_buffer []byte, source_reader io.Reader, requests <-chan ReadRequest, reader_error chan error) {
152 // currently buffered data
155 // for receiving slices from ReadIntoBuffer
156 var slices chan ReaderSlice = nil
158 // indicates whether the buffered data is complete
159 var complete bool = false
161 if source_reader != nil {
162 // 'body' is the buffer slice representing the body content read so far
163 body = source_buffer[:0]
165 // used to communicate slices of the buffer as they are
166 // ReadIntoBuffer will close 'slices' when it is done with it
167 slices = make(chan ReaderSlice)
170 go ReadIntoBuffer(source_buffer, source_reader, slices)
172 // use the whole buffer
173 body = source_buffer[:]
175 // buffer is complete
179 pending_requests := make([]ReadRequest, 0)
183 case req, valid := <-requests:
184 // Handle a buffer read request
186 if !HandleReadRequest(req, body, complete) {
187 pending_requests = append(pending_requests, req)
190 // closed 'requests' channel indicates we're done
194 case bk, valid := <-slices:
195 // Got a new slice from the reader
197 if bk.reader_error != nil {
198 reader_error <- bk.reader_error
199 if bk.reader_error == io.EOF {
200 // EOF indicates the reader is done
201 // sending, so our buffer is complete.
204 // some other reader error
210 // adjust body bounds now that another slice has been read
211 body = source_buffer[0 : len(body)+len(bk.slice)]
214 // handle pending reads
216 for n < len(pending_requests) {
217 if HandleReadRequest(pending_requests[n], body, complete) {
219 // move the element from the
220 // back of the slice to
221 // position 'n', then shorten
222 // the slice by one element
223 pending_requests[n] = pending_requests[len(pending_requests)-1]
224 pending_requests = pending_requests[0 : len(pending_requests)-1]
227 // Request wasn't handled, so keep it in the request slice
236 // reader channel closed without signaling EOF
237 reader_error <- io.ErrUnexpectedEOF