8 // A slice passed from readIntoBuffer() to transfer()
9 type readerSlice struct {
14 // A read request to the transfer() function
15 type readRequest struct {
18 result chan<- readResult
21 // A read result from the transfer() function
22 type readResult struct {
27 // Supports writing into a buffer
28 type bufferWriter struct {
33 // Copy p into this.buf starting at this.ptr, increment pointer and return number of bytes copied.
34 func (this *bufferWriter) Write(p []byte) (n int, err error) {
35 n = copy(this.buf[this.ptr:], p)
40 // Read repeatedly from the reader and write sequentially into the specified
41 // buffer, and report each read to channel 'slices'. Completes when Reader 'r'
42 // returns an error (including EOF), reports it on 'slices' and closes 'slices'.
43 func readIntoBuffer(buffer []byte, r io.Reader, slices chan<- readerSlice) {
46 if writeto, ok := r.(io.WriterTo); ok {
47 n, err := writeto.WriteTo(&bufferWriter{buffer, 0})
49 slices <- readerSlice{nil, err}
51 slices <- readerSlice{buffer[:n], nil}
52 slices <- readerSlice{nil, io.EOF}
56 // Initially entire buffer is available
62 const readblock = 64 * 1024
63 // Read 64KiB into the next part of the buffer
64 if len(ptr) > readblock {
65 n, err = r.Read(ptr[:readblock])
70 // Ran out of buffer space, try reading one more byte
75 // Reader has more data but we have nowhere to
76 // put it, so we're stuffed
77 slices <- readerSlice{nil, io.ErrShortBuffer}
79 // Return some other error (hopefully EOF)
80 slices <- readerSlice{nil, err}
85 // End on error (includes EOF)
87 slices <- readerSlice{nil, err}
92 // Make a slice with the contents of the read
93 slices <- readerSlice{ptr[:n], nil}
95 // Adjust the scratch space slice
102 // Handle a read request. Returns true if a response was sent, and false if
103 // the request should be queued.
104 func handleReadRequest(req readRequest, body []byte, complete bool) bool {
105 log.Printf("HandlereadRequest %d %d %d", req.offset, req.maxsize, len(body))
106 if req.offset < len(body) {
108 if req.offset+req.maxsize < len(body) {
109 end = req.offset + req.maxsize
113 req.result <- readResult{body[req.offset:end], nil}
115 } else if complete && req.offset >= len(body) {
116 req.result <- readResult{nil, io.EOF}
123 // Mediates between reads and appends.
124 // If 'source_reader' is not nil, reads data from 'source_reader' and stores it
125 // in 'source_buffer'. Otherwise, uses the contents of 'source_buffer' as is.
126 // Accepts read requests on the buffer on the 'requests' channel. Completes
127 // when 'requests' channel is closed.
128 func transfer(source_buffer []byte, source_reader io.Reader, requests <-chan readRequest, reader_error chan error) {
129 // currently buffered data
132 // for receiving slices from readIntoBuffer
133 var slices chan readerSlice = nil
135 // indicates whether the buffered data is complete
136 var complete bool = false
138 if source_reader != nil {
139 // 'body' is the buffer slice representing the body content read so far
140 body = source_buffer[:0]
142 // used to communicate slices of the buffer as they are read;
143 // readIntoBuffer will close 'slices' when it is done with it
144 slices = make(chan readerSlice)
147 go readIntoBuffer(source_buffer, source_reader, slices)
149 // use the whole buffer
150 body = source_buffer[:]
152 // buffer is complete
156 pending_requests := make([]readRequest, 0)
160 case req, valid := <-requests:
161 // Handle a buffer read request
163 if !handleReadRequest(req, body, complete) {
164 pending_requests = append(pending_requests, req)
167 // closed 'requests' channel indicates we're done
171 case bk, valid := <-slices:
172 // Got a new slice from the reader
174 if bk.reader_error != nil {
175 reader_error <- bk.reader_error
176 if bk.reader_error == io.EOF {
177 // EOF indicates the reader is done
178 // sending, so our buffer is complete.
181 // some other reader error
187 // adjust body bounds now that another slice has been read
188 body = source_buffer[0 : len(body)+len(bk.slice)]
191 // handle pending reads
193 for n < len(pending_requests) {
194 if handleReadRequest(pending_requests[n], body, complete) {
196 // move the element from the
197 // back of the slice to
198 // position 'n', then shorten
199 // the slice by one element
200 pending_requests[n] = pending_requests[len(pending_requests)-1]
201 pending_requests = pending_requests[0 : len(pending_requests)-1]
204 // Request wasn't handled, so keep it in the request slice
213 // reader channel closed without signaling EOF
214 reader_error <- io.ErrUnexpectedEOF