X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/a2273675f29a0f85cb80e62b3742e82d63c365e9..b646cec74484bf07a54f4be2de712f50dc387aa0:/sdk/go/src/arvados.org/streamer/transfer.go diff --git a/sdk/go/src/arvados.org/streamer/transfer.go b/sdk/go/src/arvados.org/streamer/transfer.go index ab8f941af7..a4a194f69b 100644 --- a/sdk/go/src/arvados.org/streamer/transfer.go +++ b/sdk/go/src/arvados.org/streamer/transfer.go @@ -1,25 +1,76 @@ +/* Internal implementation of AsyncStream. +Outline of operation: + +The kernel is the transfer() goroutine. It manages concurrent reads and +appends to the "body" slice. "body" is a slice of "source_buffer" that +represents the segment of the buffer that is already filled in and available +for reading. + +To fill in the buffer, transfer() starts the readIntoBuffer() goroutine to read +from the io.Reader source directly into source_buffer. Each read goes into a +slice of the buffer that spans the section immediately following the end of the +current "body". Each time a Read completes, a slice representing the +section just filled in (or any read errors/EOF) is sent over the "slices" +channel back to the transfer() function. + +Meanwhile, the transfer() function uses select on two channels, the "requests" +channel and the "slices" channel. + +When a message is received on the "slices" channel, this means a new +section of the buffer has data, or an error is signaled. Since the data has +been read directly into source_buffer, transfer() can simply increase the +size of the body slice to encompass the newly filled in section. Then any +pending reads are serviced with handleReadRequest (described below). + +When a message is received on the "requests" channel, it means a StreamReader +wants access to a slice of the buffer. This is passed to handleReadRequest(). + +The handleReadRequest() function takes a sliceRequest consisting of a buffer +offset, maximum size, and channel to send the response.
If the source reader reported an error +other than EOF, that error is returned. If the offset is less than +the size of the body, the request can proceed, and it sends a body slice +spanning the segment from offset to min(offset+maxsize, end of the body). If the +source reader status is EOF (done filling the buffer) and the read request +offset is at or beyond the end of the body, it responds with EOF. Otherwise, the read +request is for a slice beyond the current size of "body" but we expect the body +to expand as more data is added, so the request gets added to a wait list. + +The transfer() goroutine runs until the requests channel is closed by AsyncStream.Close(). + +To track readers, streamer uses the readersMonitor() goroutine. This goroutine +chooses which channels to receive from based on the number of outstanding +readers. When a new reader is created, it sends a message on the add_reader +channel. If the number of readers is already at MAX_READERS, this blocks the +sender until an existing reader is closed. When a reader is closed, it sends a +message on the subtract_reader channel. Finally, when AsyncStream.Close() is +called, it sends a message on the wait_zero_readers channel, which will block +the sender unless there are zero readers and it is safe to shut down the +AsyncStream.
+*/ + package streamer import ( "io" - "log" ) +const MAX_READERS = 100 + // A slice passed from readIntoBuffer() to transfer() -type readerSlice struct { +type nextSlice struct { slice []byte reader_error error } // A read request to the Transfer() function -type readRequest struct { +type sliceRequest struct { offset int maxsize int - result chan<- readResult + result chan<- sliceResult } // A read result from the Transfer() function -type readResult struct { +type sliceResult struct { slice []byte err error } @@ -40,16 +91,16 @@ func (this *bufferWriter) Write(p []byte) (n int, err error) { // Read repeatedly from the reader and write sequentially into the specified // buffer, and report each read to channel 'c'. Completes when Reader 'r' // reports on the error channel and closes channel 'c'. -func readIntoBuffer(buffer []byte, r io.Reader, slices chan<- readerSlice) { +func readIntoBuffer(buffer []byte, r io.Reader, slices chan<- nextSlice) { defer close(slices) if writeto, ok := r.(io.WriterTo); ok { n, err := writeto.WriteTo(&bufferWriter{buffer, 0}) if err != nil { - slices <- readerSlice{nil, err} + slices <- nextSlice{nil, err} } else { - slices <- readerSlice{buffer[:n], nil} - slices <- readerSlice{nil, io.EOF} + slices <- nextSlice{buffer[:n], nil} + slices <- nextSlice{nil, io.EOF} } return } else { @@ -74,23 +125,23 @@ func readIntoBuffer(buffer []byte, r io.Reader, slices chan<- readerSlice) { if n > 0 { // Reader has more data but we have nowhere to // put it, so we're stuffed - slices <- readerSlice{nil, io.ErrShortBuffer} + slices <- nextSlice{nil, io.ErrShortBuffer} } else { // Return some other error (hopefully EOF) - slices <- readerSlice{nil, err} + slices <- nextSlice{nil, err} } return } // End on error (includes EOF) if err != nil { - slices <- readerSlice{nil, err} + slices <- nextSlice{nil, err} return } if n > 0 { // Make a slice with the contents of the read - slices <- readerSlice{ptr[:n], nil} + slices <- nextSlice{ptr[:n], nil} // 
Adjust the scratch space slice ptr = ptr[n:] @@ -101,19 +152,21 @@ func readIntoBuffer(buffer []byte, r io.Reader, slices chan<- readerSlice) { // Handle a read request. Returns true if a response was sent, and false if // the request should be queued. -func handleReadRequest(req readRequest, body []byte, complete bool) bool { - log.Printf("HandlereadRequest %d %d %d", req.offset, req.maxsize, len(body)) - if req.offset < len(body) { +func handleReadRequest(req sliceRequest, body []byte, reader_status error) bool { + if (reader_status != nil) && (reader_status != io.EOF) { + req.result <- sliceResult{nil, reader_status} + return true + } else if req.offset < len(body) { var end int if req.offset+req.maxsize < len(body) { end = req.offset + req.maxsize } else { end = len(body) } - req.result <- readResult{body[req.offset:end], nil} + req.result <- sliceResult{body[req.offset:end], nil} return true - } else if complete && req.offset >= len(body) { - req.result <- readResult{nil, io.EOF} + } else if (reader_status == io.EOF) && (req.offset >= len(body)) { + req.result <- sliceResult{nil, io.EOF} return true } else { return false @@ -125,15 +178,18 @@ func handleReadRequest(req readRequest, body []byte, complete bool) bool { // in the provided buffer. Otherwise, use the contents of 'buffer' as is. // Accepts read requests on the buffer on the 'requests' channel. Completes // when 'requests' channel is closed. 
-func transfer(source_buffer []byte, source_reader io.Reader, requests <-chan readRequest, reader_error chan error) { +func (this *AsyncStream) transfer(source_reader io.Reader) { + source_buffer := this.buffer + requests := this.requests + // currently buffered data var body []byte // for receiving slices from readIntoBuffer - var slices chan readerSlice = nil + var slices chan nextSlice = nil - // indicates whether the buffered data is complete - var complete bool = false + // indicates the status of the underlying reader + var reader_status error = nil if source_reader != nil { // 'body' is the buffer slice representing the body content read so far @@ -141,7 +197,7 @@ func transfer(source_buffer []byte, source_reader io.Reader, requests <-chan rea // used to communicate slices of the buffer as they are // readIntoBuffer will close 'slices' when it is done with it - slices = make(chan readerSlice) + slices = make(chan nextSlice) // Spin it off go readIntoBuffer(source_buffer, source_reader, slices) @@ -150,17 +206,17 @@ func transfer(source_buffer []byte, source_reader io.Reader, requests <-chan rea body = source_buffer[:] // buffer is complete - complete = true + reader_status = io.EOF } - pending_requests := make([]readRequest, 0) + pending_requests := make([]sliceRequest, 0) for { select { case req, valid := <-requests: // Handle a buffer read request if valid { - if !handleReadRequest(req, body, complete) { + if !handleReadRequest(req, body, reader_status) { pending_requests = append(pending_requests, req) } } else { @@ -171,17 +227,7 @@ func transfer(source_buffer []byte, source_reader io.Reader, requests <-chan rea case bk, valid := <-slices: // Got a new slice from the reader if valid { - if bk.reader_error != nil { - reader_error <- bk.reader_error - if bk.reader_error == io.EOF { - // EOF indicates the reader is done - // sending, so our buffer is complete. 
- complete = true - } else { - // some other reader error - return - } - } + reader_status = bk.reader_error if bk.slice != nil { // adjust body bounds now that another slice has been read @@ -191,12 +237,9 @@ func transfer(source_buffer []byte, source_reader io.Reader, requests <-chan rea // handle pending reads n := 0 for n < len(pending_requests) { - if handleReadRequest(pending_requests[n], body, complete) { - - // move the element from the - // back of the slice to - // position 'n', then shorten - // the slice by one element + if handleReadRequest(pending_requests[n], body, reader_status) { + // move the element from the back of the slice to + // position 'n', then shorten the slice by one element pending_requests[n] = pending_requests[len(pending_requests)-1] pending_requests = pending_requests[0 : len(pending_requests)-1] } else { @@ -206,14 +249,59 @@ func transfer(source_buffer []byte, source_reader io.Reader, requests <-chan rea } } } else { - if complete { - // no more reads - slices = nil + if reader_status == io.EOF { + // no more reads expected, so this is ok + } else { + // slices channel closed without signaling EOF + reader_status = io.ErrUnexpectedEOF + } + slices = nil + } + } + } +} + +func (this *AsyncStream) readersMonitor() { + var readers int = 0 + + for { + if readers == 0 { + select { + case _, ok := <-this.wait_zero_readers: + if ok { + // nothing, just implicitly unblock the sender } else { - // reader channel closed without signaling EOF - reader_error <- io.ErrUnexpectedEOF return } + case _, ok := <-this.add_reader: + if ok { + readers += 1 + } else { + return + } + } + } else if readers > 0 && readers < MAX_READERS { + select { + case _, ok := <-this.add_reader: + if ok { + readers += 1 + } else { + return + } + + case _, ok := <-this.subtract_reader: + if ok { + readers -= 1 + } else { + return + } + } + } else if readers == MAX_READERS { + _, ok := <-this.subtract_reader + if ok { + readers -= 1 + } else { + return } } }