Merge branch '8784-dir-listings'
[arvados.git] / sdk / go / streamer / transfer.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: Apache-2.0
4
5 /* Internal implementation of AsyncStream.
6 Outline of operation:
7
8 The kernel is the transfer() goroutine.  It manages concurrent reads and
9 appends to the "body" slice.  "body" is a slice of "source_buffer" that
10 represents the segment of the buffer that is already filled in and available
11 for reading.
12
13 To fill in the buffer, transfer() starts the readIntoBuffer() goroutine to read
14 from the io.Reader source directly into source_buffer.  Each read goes into a
15 slice of buffer which spans the section immediately following the end of the
16 current "body".  Each time a Read completes, a slice representing the the
17 section just filled in (or any read errors/EOF) is sent over the "slices"
18 channel back to the transfer() function.
19
20 Meanwhile, the transfer() function selects() on two channels, the "requests"
21 channel and the "slices" channel.
22
23 When a message is received on the "slices" channel, this means the a new
24 section of the buffer has data, or an error is signaled.  Since the data has
25 been read directly into the source_buffer, it is able to simply increases the
26 size of the body slice to encompass the newly filled in section.  Then any
27 pending reads are serviced with handleReadRequest (described below).
28
29 When a message is received on the "requests" channel, it means a StreamReader
30 wants access to a slice of the buffer.  This is passed to handleReadRequest().
31
32 The handleReadRequest() function takes a sliceRequest consisting of a buffer
33 offset, maximum size, and channel to send the response.  If there was an error
34 reported from the source reader, it is returned.  If the offset is less than
35 the size of the body, the request can proceed, and it sends a body slice
36 spanning the segment from offset to min(offset+maxsize, end of the body).  If
37 source reader status is EOF (done filling the buffer) and the read request
38 offset is beyond end of the body, it responds with EOF.  Otherwise, the read
39 request is for a slice beyond the current size of "body" but we expect the body
40 to expand as more data is added, so the request gets added to a wait list.
41
42 The transfer() runs until the requests channel is closed by AsyncStream.Close()
43
44 To track readers, streamer uses the readersMonitor() goroutine.  This goroutine
45 chooses which channels to receive from based on the number of outstanding
46 readers.  When a new reader is created, it sends a message on the add_reader
47 channel.  If the number of readers is already at MAX_READERS, this blocks the
48 sender until an existing reader is closed.  When a reader is closed, it sends a
49 message on the subtract_reader channel.  Finally, when AsyncStream.Close() is
50 called, it sends a message on the wait_zero_readers channel, which will block
51 the sender unless there are zero readers and it is safe to shut down the
52 AsyncStream.
53 */
54
55 package streamer
56
57 import (
58         "io"
59 )
60
61 const MAX_READERS = 100
62
63 // A slice passed from readIntoBuffer() to transfer()
64 type nextSlice struct {
65         slice        []byte
66         reader_error error
67 }
68
69 // A read request to the Transfer() function
70 type sliceRequest struct {
71         offset  int
72         maxsize int
73         result  chan<- sliceResult
74 }
75
76 // A read result from the Transfer() function
77 type sliceResult struct {
78         slice []byte
79         err   error
80 }
81
82 // Supports writing into a buffer
83 type bufferWriter struct {
84         buf []byte
85         ptr int
86 }
87
88 // Copy p into this.buf, increment pointer and return number of bytes read.
89 func (this *bufferWriter) Write(p []byte) (n int, err error) {
90         n = copy(this.buf[this.ptr:], p)
91         this.ptr += n
92         return n, nil
93 }
94
95 // Read repeatedly from the reader and write sequentially into the specified
96 // buffer, and report each read to channel 'c'.  Completes when Reader 'r'
97 // reports on the error channel and closes channel 'c'.
98 func readIntoBuffer(buffer []byte, r io.Reader, slices chan<- nextSlice) {
99         defer close(slices)
100
101         if writeto, ok := r.(io.WriterTo); ok {
102                 n, err := writeto.WriteTo(&bufferWriter{buffer, 0})
103                 if err != nil {
104                         slices <- nextSlice{nil, err}
105                 } else {
106                         slices <- nextSlice{buffer[:n], nil}
107                         slices <- nextSlice{nil, io.EOF}
108                 }
109                 return
110         } else {
111                 // Initially entire buffer is available
112                 ptr := buffer[:]
113                 for {
114                         var n int
115                         var err error
116                         if len(ptr) > 0 {
117                                 const readblock = 64 * 1024
118                                 // Read 64KiB into the next part of the buffer
119                                 if len(ptr) > readblock {
120                                         n, err = r.Read(ptr[:readblock])
121                                 } else {
122                                         n, err = r.Read(ptr)
123                                 }
124                         } else {
125                                 // Ran out of buffer space, try reading one more byte
126                                 var b [1]byte
127                                 n, err = r.Read(b[:])
128
129                                 if n > 0 {
130                                         // Reader has more data but we have nowhere to
131                                         // put it, so we're stuffed
132                                         slices <- nextSlice{nil, io.ErrShortBuffer}
133                                 } else {
134                                         // Return some other error (hopefully EOF)
135                                         slices <- nextSlice{nil, err}
136                                 }
137                                 return
138                         }
139
140                         // End on error (includes EOF)
141                         if err != nil {
142                                 slices <- nextSlice{nil, err}
143                                 return
144                         }
145
146                         if n > 0 {
147                                 // Make a slice with the contents of the read
148                                 slices <- nextSlice{ptr[:n], nil}
149
150                                 // Adjust the scratch space slice
151                                 ptr = ptr[n:]
152                         }
153                 }
154         }
155 }
156
157 // Handle a read request.  Returns true if a response was sent, and false if
158 // the request should be queued.
159 func handleReadRequest(req sliceRequest, body []byte, reader_status error) bool {
160         if (reader_status != nil) && (reader_status != io.EOF) {
161                 req.result <- sliceResult{nil, reader_status}
162                 return true
163         } else if req.offset < len(body) {
164                 var end int
165                 if req.offset+req.maxsize < len(body) {
166                         end = req.offset + req.maxsize
167                 } else {
168                         end = len(body)
169                 }
170                 req.result <- sliceResult{body[req.offset:end], nil}
171                 return true
172         } else if (reader_status == io.EOF) && (req.offset >= len(body)) {
173                 req.result <- sliceResult{nil, io.EOF}
174                 return true
175         } else {
176                 return false
177         }
178 }
179
180 // Mediates between reads and appends.
181 // If 'source_reader' is not nil, reads data from 'source_reader' and stores it
182 // in the provided buffer.  Otherwise, use the contents of 'buffer' as is.
183 // Accepts read requests on the buffer on the 'requests' channel.  Completes
184 // when 'requests' channel is closed.
185 func (this *AsyncStream) transfer(source_reader io.Reader) {
186         source_buffer := this.buffer
187         requests := this.requests
188
189         // currently buffered data
190         var body []byte
191
192         // for receiving slices from readIntoBuffer
193         var slices chan nextSlice = nil
194
195         // indicates the status of the underlying reader
196         var reader_status error = nil
197
198         if source_reader != nil {
199                 // 'body' is the buffer slice representing the body content read so far
200                 body = source_buffer[:0]
201
202                 // used to communicate slices of the buffer as they are
203                 // readIntoBuffer will close 'slices' when it is done with it
204                 slices = make(chan nextSlice)
205
206                 // Spin it off
207                 go readIntoBuffer(source_buffer, source_reader, slices)
208         } else {
209                 // use the whole buffer
210                 body = source_buffer[:]
211
212                 // buffer is complete
213                 reader_status = io.EOF
214         }
215
216         pending_requests := make([]sliceRequest, 0)
217
218         for {
219                 select {
220                 case req, valid := <-requests:
221                         // Handle a buffer read request
222                         if valid {
223                                 if !handleReadRequest(req, body, reader_status) {
224                                         pending_requests = append(pending_requests, req)
225                                 }
226                         } else {
227                                 // closed 'requests' channel indicates we're done
228                                 return
229                         }
230
231                 case bk, valid := <-slices:
232                         // Got a new slice from the reader
233                         if valid {
234                                 reader_status = bk.reader_error
235
236                                 if bk.slice != nil {
237                                         // adjust body bounds now that another slice has been read
238                                         body = source_buffer[0 : len(body)+len(bk.slice)]
239                                 }
240
241                                 // handle pending reads
242                                 n := 0
243                                 for n < len(pending_requests) {
244                                         if handleReadRequest(pending_requests[n], body, reader_status) {
245                                                 // move the element from the back of the slice to
246                                                 // position 'n', then shorten the slice by one element
247                                                 pending_requests[n] = pending_requests[len(pending_requests)-1]
248                                                 pending_requests = pending_requests[0 : len(pending_requests)-1]
249                                         } else {
250
251                                                 // Request wasn't handled, so keep it in the request slice
252                                                 n += 1
253                                         }
254                                 }
255                         } else {
256                                 if reader_status == nil {
257                                         // slices channel closed without signaling EOF
258                                         reader_status = io.ErrUnexpectedEOF
259                                 }
260                                 slices = nil
261                         }
262                 }
263         }
264 }
265
266 func (this *AsyncStream) readersMonitor() {
267         var readers int = 0
268
269         for {
270                 if readers == 0 {
271                         select {
272                         case _, ok := <-this.wait_zero_readers:
273                                 if ok {
274                                         // nothing, just implicitly unblock the sender
275                                 } else {
276                                         return
277                                 }
278                         case _, ok := <-this.add_reader:
279                                 if ok {
280                                         readers += 1
281                                 } else {
282                                         return
283                                 }
284                         }
285                 } else if readers > 0 && readers < MAX_READERS {
286                         select {
287                         case _, ok := <-this.add_reader:
288                                 if ok {
289                                         readers += 1
290                                 } else {
291                                         return
292                                 }
293
294                         case _, ok := <-this.subtract_reader:
295                                 if ok {
296                                         readers -= 1
297                                 } else {
298                                         return
299                                 }
300                         }
301                 } else if readers == MAX_READERS {
302                         _, ok := <-this.subtract_reader
303                         if ok {
304                                 readers -= 1
305                         } else {
306                                 return
307                         }
308                 }
309         }
310 }