2798: Renamed internal messaging structs in an attempt to use the word "reader"
[arvados.git] / sdk / go / src / arvados.org / streamer / transfer.go
index 77242f13a6a7553de402c20741730c0674153686..a4a194f69bcc8fbdbf43650caa881325bde5dc24 100644 (file)
@@ -1,3 +1,53 @@
+/* Internal implementation of AsyncStream.
+Outline of operation:
+
+The kernel is the transfer() goroutine.  It manages concurrent reads and
+appends to the "body" slice.  "body" is a slice of "source_buffer" that
+represents the segment of the buffer that is already filled in and available
+for reading.
+
+To fill in the buffer, transfer() starts the readIntoBuffer() goroutine to read
+from the io.Reader source directly into source_buffer.  Each read goes into a
+slice of buffer which spans the section immediately following the end of the
+current "body".  Each time a Read completes, a slice representing the the
+section just filled in (or any read errors/EOF) is sent over the "slices"
+channel back to the transfer() function.
+
+Meanwhile, the transfer() function selects() on two channels, the "requests"
+channel and the "slices" channel.
+
+When a message is recieved on the "slices" channel, this means the a new
+section of the buffer has data, or an error is signaled.  Since the data has
+been read directly into the source_buffer, it is able to simply increases the
+size of the body slice to encompass the newly filled in section.  Then any
+pending reads are serviced with handleReadRequest (described below).
+
+When a message is recieved on the "requests" channel, it means a StreamReader
+wants access to a slice of the buffer.  This is passed to handleReadRequest().
+
+The handleReadRequest() function takes a sliceRequest consisting of a buffer
+offset, maximum size, and channel to send the response.  If there was an error
+reported from the source reader, it is returned.  If the offset is less than
+the size of the body, the request can proceed, and it sends a body slice
+spanning the segment from offset to min(offset+maxsize, end of the body).  If
+source reader status is EOF (done filling the buffer) and the read request
+offset is beyond end of the body, it responds with EOF.  Otherwise, the read
+request is for a slice beyond the current size of "body" but we expect the body
+to expand as more data is added, so the request gets added to a wait list.
+
+The transfer() runs until the requests channel is closed by AsyncStream.Close()
+
+To track readers, streamer uses the readersMonitor() goroutine.  This goroutine
+chooses which channels to receive from based on the number of outstanding
+readers.  When a new reader is created, it sends a message on the add_reader
+channel.  If the number of readers is already at MAX_READERS, this blocks the
+sender until an existing reader is closed.  When a reader is closed, it sends a
+message on the subtract_reader channel.  Finally, when AsyncStream.Close() is
+called, it sends a message on the wait_zero_readers channel, which will block
+the sender unless there are zero readers and it is safe to shut down the
+AsyncStream.
+*/
+
 package streamer
 
 import (
@@ -7,20 +57,20 @@ import (
 const MAX_READERS = 100
 
 // A slice passed from readIntoBuffer() to transfer()
-type readerSlice struct {
+type nextSlice struct {
        slice        []byte
        reader_error error
 }
 
 // A read request to the Transfer() function
-type readRequest struct {
+type sliceRequest struct {
        offset  int
        maxsize int
-       result  chan<- readResult
+       result  chan<- sliceResult
 }
 
 // A read result from the Transfer() function
-type readResult struct {
+type sliceResult struct {
        slice []byte
        err   error
 }
@@ -41,16 +91,16 @@ func (this *bufferWriter) Write(p []byte) (n int, err error) {
 // Read repeatedly from the reader and write sequentially into the specified
 // buffer, and report each read to channel 'c'.  Completes when Reader 'r'
 // reports on the error channel and closes channel 'c'.
-func readIntoBuffer(buffer []byte, r io.Reader, slices chan<- readerSlice) {
+func readIntoBuffer(buffer []byte, r io.Reader, slices chan<- nextSlice) {
        defer close(slices)
 
        if writeto, ok := r.(io.WriterTo); ok {
                n, err := writeto.WriteTo(&bufferWriter{buffer, 0})
                if err != nil {
-                       slices <- readerSlice{nil, err}
+                       slices <- nextSlice{nil, err}
                } else {
-                       slices <- readerSlice{buffer[:n], nil}
-                       slices <- readerSlice{nil, io.EOF}
+                       slices <- nextSlice{buffer[:n], nil}
+                       slices <- nextSlice{nil, io.EOF}
                }
                return
        } else {
@@ -75,23 +125,23 @@ func readIntoBuffer(buffer []byte, r io.Reader, slices chan<- readerSlice) {
                                if n > 0 {
                                        // Reader has more data but we have nowhere to
                                        // put it, so we're stuffed
-                                       slices <- readerSlice{nil, io.ErrShortBuffer}
+                                       slices <- nextSlice{nil, io.ErrShortBuffer}
                                } else {
                                        // Return some other error (hopefully EOF)
-                                       slices <- readerSlice{nil, err}
+                                       slices <- nextSlice{nil, err}
                                }
                                return
                        }
 
                        // End on error (includes EOF)
                        if err != nil {
-                               slices <- readerSlice{nil, err}
+                               slices <- nextSlice{nil, err}
                                return
                        }
 
                        if n > 0 {
                                // Make a slice with the contents of the read
-                               slices <- readerSlice{ptr[:n], nil}
+                               slices <- nextSlice{ptr[:n], nil}
 
                                // Adjust the scratch space slice
                                ptr = ptr[n:]
@@ -102,18 +152,21 @@ func readIntoBuffer(buffer []byte, r io.Reader, slices chan<- readerSlice) {
 
 // Handle a read request.  Returns true if a response was sent, and false if
 // the request should be queued.
-func handleReadRequest(req readRequest, body []byte, complete bool) bool {
-       if req.offset < len(body) {
+func handleReadRequest(req sliceRequest, body []byte, reader_status error) bool {
+       if (reader_status != nil) && (reader_status != io.EOF) {
+               req.result <- sliceResult{nil, reader_status}
+               return true
+       } else if req.offset < len(body) {
                var end int
                if req.offset+req.maxsize < len(body) {
                        end = req.offset + req.maxsize
                } else {
                        end = len(body)
                }
-               req.result <- readResult{body[req.offset:end], nil}
+               req.result <- sliceResult{body[req.offset:end], nil}
                return true
-       } else if complete && req.offset >= len(body) {
-               req.result <- readResult{nil, io.EOF}
+       } else if (reader_status == io.EOF) && (req.offset >= len(body)) {
+               req.result <- sliceResult{nil, io.EOF}
                return true
        } else {
                return false
@@ -125,15 +178,18 @@ func handleReadRequest(req readRequest, body []byte, complete bool) bool {
 // in the provided buffer.  Otherwise, use the contents of 'buffer' as is.
 // Accepts read requests on the buffer on the 'requests' channel.  Completes
 // when 'requests' channel is closed.
-func transfer(source_buffer []byte, source_reader io.Reader, requests <-chan readRequest, reader_error chan error) {
+func (this *AsyncStream) transfer(source_reader io.Reader) {
+       source_buffer := this.buffer
+       requests := this.requests
+
        // currently buffered data
        var body []byte
 
        // for receiving slices from readIntoBuffer
-       var slices chan readerSlice = nil
+       var slices chan nextSlice = nil
 
-       // indicates whether the buffered data is complete
-       var complete bool = false
+       // indicates the status of the underlying reader
+       var reader_status error = nil
 
        if source_reader != nil {
                // 'body' is the buffer slice representing the body content read so far
@@ -141,7 +197,7 @@ func transfer(source_buffer []byte, source_reader io.Reader, requests <-chan rea
 
                // used to communicate slices of the buffer as they are
                // readIntoBuffer will close 'slices' when it is done with it
-               slices = make(chan readerSlice)
+               slices = make(chan nextSlice)
 
                // Spin it off
                go readIntoBuffer(source_buffer, source_reader, slices)
@@ -150,17 +206,17 @@ func transfer(source_buffer []byte, source_reader io.Reader, requests <-chan rea
                body = source_buffer[:]
 
                // buffer is complete
-               complete = true
+               reader_status = io.EOF
        }
 
-       pending_requests := make([]readRequest, 0)
+       pending_requests := make([]sliceRequest, 0)
 
        for {
                select {
                case req, valid := <-requests:
                        // Handle a buffer read request
                        if valid {
-                               if !handleReadRequest(req, body, complete) {
+                               if !handleReadRequest(req, body, reader_status) {
                                        pending_requests = append(pending_requests, req)
                                }
                        } else {
@@ -171,17 +227,7 @@ func transfer(source_buffer []byte, source_reader io.Reader, requests <-chan rea
                case bk, valid := <-slices:
                        // Got a new slice from the reader
                        if valid {
-                               if bk.reader_error != nil {
-                                       reader_error <- bk.reader_error
-                                       if bk.reader_error == io.EOF {
-                                               // EOF indicates the reader is done
-                                               // sending, so our buffer is complete.
-                                               complete = true
-                                       } else {
-                                               // some other reader error
-                                               return
-                                       }
-                               }
+                               reader_status = bk.reader_error
 
                                if bk.slice != nil {
                                        // adjust body bounds now that another slice has been read
@@ -191,12 +237,9 @@ func transfer(source_buffer []byte, source_reader io.Reader, requests <-chan rea
                                // handle pending reads
                                n := 0
                                for n < len(pending_requests) {
-                                       if handleReadRequest(pending_requests[n], body, complete) {
-
-                                               // move the element from the
-                                               // back of the slice to
-                                               // position 'n', then shorten
-                                               // the slice by one element
+                                       if handleReadRequest(pending_requests[n], body, reader_status) {
+                                               // move the element from the back of the slice to
+                                               // position 'n', then shorten the slice by one element
                                                pending_requests[n] = pending_requests[len(pending_requests)-1]
                                                pending_requests = pending_requests[0 : len(pending_requests)-1]
                                        } else {
@@ -206,14 +249,13 @@ func transfer(source_buffer []byte, source_reader io.Reader, requests <-chan rea
                                        }
                                }
                        } else {
-                               if complete {
-                                       // no more reads
-                                       slices = nil
+                               if reader_status == io.EOF {
+                                       // no more reads expected, so this is ok
                                } else {
-                                       // reader channel closed without signaling EOF
-                                       reader_error <- io.ErrUnexpectedEOF
-                                       return
+                                       // slices channel closed without signaling EOF
+                                       reader_status = io.ErrUnexpectedEOF
                                }
+                               slices = nil
                        }
                }
        }