8460: Add Log type.
[arvados.git] / sdk / go / streamer / transfer.go
1 /* Internal implementation of AsyncStream.
2 Outline of operation:
3
4 The kernel is the transfer() goroutine.  It manages concurrent reads and
5 appends to the "body" slice.  "body" is a slice of "source_buffer" that
6 represents the segment of the buffer that is already filled in and available
7 for reading.
8
9 To fill in the buffer, transfer() starts the readIntoBuffer() goroutine to read
10 from the io.Reader source directly into source_buffer.  Each read goes into a
11 slice of buffer which spans the section immediately following the end of the
12 current "body".  Each time a Read completes, a slice representing the the
13 section just filled in (or any read errors/EOF) is sent over the "slices"
14 channel back to the transfer() function.
15
16 Meanwhile, the transfer() function selects() on two channels, the "requests"
17 channel and the "slices" channel.
18
19 When a message is received on the "slices" channel, this means the a new
20 section of the buffer has data, or an error is signaled.  Since the data has
21 been read directly into the source_buffer, it is able to simply increases the
22 size of the body slice to encompass the newly filled in section.  Then any
23 pending reads are serviced with handleReadRequest (described below).
24
25 When a message is received on the "requests" channel, it means a StreamReader
26 wants access to a slice of the buffer.  This is passed to handleReadRequest().
27
28 The handleReadRequest() function takes a sliceRequest consisting of a buffer
29 offset, maximum size, and channel to send the response.  If there was an error
30 reported from the source reader, it is returned.  If the offset is less than
31 the size of the body, the request can proceed, and it sends a body slice
32 spanning the segment from offset to min(offset+maxsize, end of the body).  If
33 source reader status is EOF (done filling the buffer) and the read request
34 offset is beyond end of the body, it responds with EOF.  Otherwise, the read
35 request is for a slice beyond the current size of "body" but we expect the body
36 to expand as more data is added, so the request gets added to a wait list.
37
38 The transfer() runs until the requests channel is closed by AsyncStream.Close()
39
40 To track readers, streamer uses the readersMonitor() goroutine.  This goroutine
41 chooses which channels to receive from based on the number of outstanding
42 readers.  When a new reader is created, it sends a message on the add_reader
43 channel.  If the number of readers is already at MAX_READERS, this blocks the
44 sender until an existing reader is closed.  When a reader is closed, it sends a
45 message on the subtract_reader channel.  Finally, when AsyncStream.Close() is
46 called, it sends a message on the wait_zero_readers channel, which will block
47 the sender unless there are zero readers and it is safe to shut down the
48 AsyncStream.
49 */
50
51 package streamer
52
53 import (
54         "io"
55 )
56
57 const MAX_READERS = 100
58
59 // A slice passed from readIntoBuffer() to transfer()
60 type nextSlice struct {
61         slice        []byte
62         reader_error error
63 }
64
65 // A read request to the Transfer() function
66 type sliceRequest struct {
67         offset  int
68         maxsize int
69         result  chan<- sliceResult
70 }
71
72 // A read result from the Transfer() function
73 type sliceResult struct {
74         slice []byte
75         err   error
76 }
77
78 // Supports writing into a buffer
79 type bufferWriter struct {
80         buf []byte
81         ptr int
82 }
83
84 // Copy p into this.buf, increment pointer and return number of bytes read.
85 func (this *bufferWriter) Write(p []byte) (n int, err error) {
86         n = copy(this.buf[this.ptr:], p)
87         this.ptr += n
88         return n, nil
89 }
90
91 // Read repeatedly from the reader and write sequentially into the specified
92 // buffer, and report each read to channel 'c'.  Completes when Reader 'r'
93 // reports on the error channel and closes channel 'c'.
94 func readIntoBuffer(buffer []byte, r io.Reader, slices chan<- nextSlice) {
95         defer close(slices)
96
97         if writeto, ok := r.(io.WriterTo); ok {
98                 n, err := writeto.WriteTo(&bufferWriter{buffer, 0})
99                 if err != nil {
100                         slices <- nextSlice{nil, err}
101                 } else {
102                         slices <- nextSlice{buffer[:n], nil}
103                         slices <- nextSlice{nil, io.EOF}
104                 }
105                 return
106         } else {
107                 // Initially entire buffer is available
108                 ptr := buffer[:]
109                 for {
110                         var n int
111                         var err error
112                         if len(ptr) > 0 {
113                                 const readblock = 64 * 1024
114                                 // Read 64KiB into the next part of the buffer
115                                 if len(ptr) > readblock {
116                                         n, err = r.Read(ptr[:readblock])
117                                 } else {
118                                         n, err = r.Read(ptr)
119                                 }
120                         } else {
121                                 // Ran out of buffer space, try reading one more byte
122                                 var b [1]byte
123                                 n, err = r.Read(b[:])
124
125                                 if n > 0 {
126                                         // Reader has more data but we have nowhere to
127                                         // put it, so we're stuffed
128                                         slices <- nextSlice{nil, io.ErrShortBuffer}
129                                 } else {
130                                         // Return some other error (hopefully EOF)
131                                         slices <- nextSlice{nil, err}
132                                 }
133                                 return
134                         }
135
136                         // End on error (includes EOF)
137                         if err != nil {
138                                 slices <- nextSlice{nil, err}
139                                 return
140                         }
141
142                         if n > 0 {
143                                 // Make a slice with the contents of the read
144                                 slices <- nextSlice{ptr[:n], nil}
145
146                                 // Adjust the scratch space slice
147                                 ptr = ptr[n:]
148                         }
149                 }
150         }
151 }
152
153 // Handle a read request.  Returns true if a response was sent, and false if
154 // the request should be queued.
155 func handleReadRequest(req sliceRequest, body []byte, reader_status error) bool {
156         if (reader_status != nil) && (reader_status != io.EOF) {
157                 req.result <- sliceResult{nil, reader_status}
158                 return true
159         } else if req.offset < len(body) {
160                 var end int
161                 if req.offset+req.maxsize < len(body) {
162                         end = req.offset + req.maxsize
163                 } else {
164                         end = len(body)
165                 }
166                 req.result <- sliceResult{body[req.offset:end], nil}
167                 return true
168         } else if (reader_status == io.EOF) && (req.offset >= len(body)) {
169                 req.result <- sliceResult{nil, io.EOF}
170                 return true
171         } else {
172                 return false
173         }
174 }
175
176 // Mediates between reads and appends.
177 // If 'source_reader' is not nil, reads data from 'source_reader' and stores it
178 // in the provided buffer.  Otherwise, use the contents of 'buffer' as is.
179 // Accepts read requests on the buffer on the 'requests' channel.  Completes
180 // when 'requests' channel is closed.
181 func (this *AsyncStream) transfer(source_reader io.Reader) {
182         source_buffer := this.buffer
183         requests := this.requests
184
185         // currently buffered data
186         var body []byte
187
188         // for receiving slices from readIntoBuffer
189         var slices chan nextSlice = nil
190
191         // indicates the status of the underlying reader
192         var reader_status error = nil
193
194         if source_reader != nil {
195                 // 'body' is the buffer slice representing the body content read so far
196                 body = source_buffer[:0]
197
198                 // used to communicate slices of the buffer as they are
199                 // readIntoBuffer will close 'slices' when it is done with it
200                 slices = make(chan nextSlice)
201
202                 // Spin it off
203                 go readIntoBuffer(source_buffer, source_reader, slices)
204         } else {
205                 // use the whole buffer
206                 body = source_buffer[:]
207
208                 // buffer is complete
209                 reader_status = io.EOF
210         }
211
212         pending_requests := make([]sliceRequest, 0)
213
214         for {
215                 select {
216                 case req, valid := <-requests:
217                         // Handle a buffer read request
218                         if valid {
219                                 if !handleReadRequest(req, body, reader_status) {
220                                         pending_requests = append(pending_requests, req)
221                                 }
222                         } else {
223                                 // closed 'requests' channel indicates we're done
224                                 return
225                         }
226
227                 case bk, valid := <-slices:
228                         // Got a new slice from the reader
229                         if valid {
230                                 reader_status = bk.reader_error
231
232                                 if bk.slice != nil {
233                                         // adjust body bounds now that another slice has been read
234                                         body = source_buffer[0 : len(body)+len(bk.slice)]
235                                 }
236
237                                 // handle pending reads
238                                 n := 0
239                                 for n < len(pending_requests) {
240                                         if handleReadRequest(pending_requests[n], body, reader_status) {
241                                                 // move the element from the back of the slice to
242                                                 // position 'n', then shorten the slice by one element
243                                                 pending_requests[n] = pending_requests[len(pending_requests)-1]
244                                                 pending_requests = pending_requests[0 : len(pending_requests)-1]
245                                         } else {
246
247                                                 // Request wasn't handled, so keep it in the request slice
248                                                 n += 1
249                                         }
250                                 }
251                         } else {
252                                 if reader_status == nil {
253                                         // slices channel closed without signaling EOF
254                                         reader_status = io.ErrUnexpectedEOF
255                                 }
256                                 slices = nil
257                         }
258                 }
259         }
260 }
261
262 func (this *AsyncStream) readersMonitor() {
263         var readers int = 0
264
265         for {
266                 if readers == 0 {
267                         select {
268                         case _, ok := <-this.wait_zero_readers:
269                                 if ok {
270                                         // nothing, just implicitly unblock the sender
271                                 } else {
272                                         return
273                                 }
274                         case _, ok := <-this.add_reader:
275                                 if ok {
276                                         readers += 1
277                                 } else {
278                                         return
279                                 }
280                         }
281                 } else if readers > 0 && readers < MAX_READERS {
282                         select {
283                         case _, ok := <-this.add_reader:
284                                 if ok {
285                                         readers += 1
286                                 } else {
287                                         return
288                                 }
289
290                         case _, ok := <-this.subtract_reader:
291                                 if ok {
292                                         readers -= 1
293                                 } else {
294                                         return
295                                 }
296                         }
297                 } else if readers == MAX_READERS {
298                         _, ok := <-this.subtract_reader
299                         if ok {
300                                 readers -= 1
301                         } else {
302                                 return
303                         }
304                 }
305         }
306 }