2798: Renamed internal messaging structs in an attempt to use the word "reader"
authorPeter Amstutz <peter.amstutz@curoverse.com>
Wed, 21 May 2014 15:00:21 +0000 (11:00 -0400)
committerPeter Amstutz <peter.amstutz@curoverse.com>
Wed, 21 May 2014 15:00:21 +0000 (11:00 -0400)
slightly less.  Refactored tests to reduce redundancy slightly.  Added test with large number of concurrent readers.  Rewrote "how to use" package comments and wrote a small novel about the "theory of operation".

sdk/go/src/arvados.org/streamer/streamer.go
sdk/go/src/arvados.org/streamer/streamer_test.go
sdk/go/src/arvados.org/streamer/transfer.go

index ba49fb341a9e0a923039a38f64d1a5ed32fc040d..78ab027829b59c99b3e1d539dc16c482927f5231 100644 (file)
@@ -1,23 +1,35 @@
-/* Implements a buffer that supports concurrent incremental read and append.
-New readers start reading from the beginning of the buffer, block when reaching
-the end of the buffer, and are unblocked as new data is added.
+/* AsyncStream pulls data in from a io.Reader source (such as a file or network
+socket) and fans out to any number of StreamReader sinks.
+
+Unlike io.TeeReader() or io.MultiWriter(), new StreamReaders can be created at
+any point in the lifetime of the AsyncStream, and each StreamReader will read
+the contents of the buffer up to the "frontier" of the buffer, at which point
+the StreamReader blocks until new data is read from the source.
+
+This is useful for minimizing readthrough latency as sinks can read and act on
+data from the source without waiting for the source to be completely buffered.
+It is also useful as a cache in situations where re-reading the original source
+potentially is costly, since the buffer retains a copy of the source data.
 
 Usage:
 
 Begin reading into a buffer with maximum size 'buffersize' from 'source':
-  tr := StartTransferFromReader(buffersize, source)
+  stream := AsyncStreamFromReader(buffersize, source)
 
-To create a new reader (this can be called multiple times):
-  r := tr.MakeStreamReader()
+To create a new reader (this can be called multiple times, each reader starts
+at the beginning of the buffer):
+  reader := tr.MakeStreamReader()
 
-When you're done with the buffer:
-  tr.Close()
+Make sure to close the reader when you're done with it.
+  reader.Close()
 
+When you're done with the stream:
+  stream.Close()
 
 Alternately, if you already have a filled buffer and just want to read out from it:
-  tr := StartTransferFromSlice(buf)
+  stream := AsyncStreamFromSlice(buf)
+
   r := tr.MakeStreamReader()
-  tr.Close()
 
 */
 
@@ -28,35 +40,33 @@ import (
 )
 
 type AsyncStream struct {
-       requests          chan readRequest
+       buffer            []byte
+       requests          chan sliceRequest
        add_reader        chan bool
        subtract_reader   chan bool
        wait_zero_readers chan bool
-       Reader_status     chan error
 }
 
 // Reads from the buffer managed by the Transfer()
 type StreamReader struct {
        offset    int
        stream    *AsyncStream
-       responses chan readResult
+       responses chan sliceResult
 }
 
 func AsyncStreamFromReader(buffersize int, source io.Reader) *AsyncStream {
-       buf := make([]byte, buffersize)
-
-       t := &AsyncStream{make(chan readRequest), make(chan bool), make(chan bool), make(chan bool), make(chan error)}
+       t := &AsyncStream{make([]byte, buffersize), make(chan sliceRequest), make(chan bool), make(chan bool), make(chan bool)}
 
-       go transfer(buf, source, t.requests, t.Reader_status)
+       go t.transfer(source)
        go t.readersMonitor()
 
        return t
 }
 
 func AsyncStreamFromSlice(buf []byte) *AsyncStream {
-       t := &AsyncStream{make(chan readRequest), make(chan bool), make(chan bool), make(chan bool), nil}
+       t := &AsyncStream{buf, make(chan sliceRequest), make(chan bool), make(chan bool), make(chan bool)}
 
-       go transfer(buf, nil, t.requests, nil)
+       go t.transfer(nil)
        go t.readersMonitor()
 
        return t
@@ -64,12 +74,12 @@ func AsyncStreamFromSlice(buf []byte) *AsyncStream {
 
 func (this *AsyncStream) MakeStreamReader() *StreamReader {
        this.add_reader <- true
-       return &StreamReader{0, this, make(chan readResult)}
+       return &StreamReader{0, this, make(chan sliceResult)}
 }
 
 // Reads from the buffer managed by the Transfer()
 func (this *StreamReader) Read(p []byte) (n int, err error) {
-       this.stream.requests <- readRequest{this.offset, len(p), this.responses}
+       this.stream.requests <- sliceRequest{this.offset, len(p), this.responses}
        rr, valid := <-this.responses
        if valid {
                this.offset += len(rr.slice)
@@ -83,7 +93,7 @@ func (this *StreamReader) WriteTo(dest io.Writer) (written int64, err error) {
        // Record starting offset in order to correctly report the number of bytes sent
        starting_offset := this.offset
        for {
-               this.stream.requests <- readRequest{this.offset, 32 * 1024, this.responses}
+               this.stream.requests <- sliceRequest{this.offset, 32 * 1024, this.responses}
                rr, valid := <-this.responses
                if valid {
                        this.offset += len(rr.slice)
@@ -116,7 +126,4 @@ func (this *AsyncStream) Close() {
        close(this.add_reader)
        close(this.subtract_reader)
        close(this.wait_zero_readers)
-       if this.Reader_status != nil {
-               close(this.Reader_status)
-       }
 }
index 33f84b809623f4e48c3f4144aa6dcc8b105af004..853d7d30354daddb86e602c4b580141fc18e1bdf 100644 (file)
@@ -15,130 +15,110 @@ var _ = Suite(&StandaloneSuite{})
 // Standalone tests
 type StandaloneSuite struct{}
 
+func (s *StandaloneSuite) TestReadIntoBuffer(c *C) {
+       ReadIntoBufferHelper(c, 225)
+       ReadIntoBufferHelper(c, 224)
+}
+
+func HelperWrite128andCheck(c *C, buffer []byte, writer io.Writer, slices chan nextSlice) {
+       out := make([]byte, 128)
+       for i := 0; i < 128; i += 1 {
+               out[i] = byte(i)
+       }
+       writer.Write(out)
+       s1 := <-slices
+       c.Check(len(s1.slice), Equals, 128)
+       c.Check(s1.reader_error, Equals, nil)
+       for i := 0; i < 128; i += 1 {
+               c.Check(s1.slice[i], Equals, byte(i))
+       }
+       for i := 0; i < len(buffer); i += 1 {
+               if i < 128 {
+                       c.Check(buffer[i], Equals, byte(i))
+               } else {
+                       c.Check(buffer[i], Equals, byte(0))
+               }
+       }
+}
+
+func HelperWrite96andCheck(c *C, buffer []byte, writer io.Writer, slices chan nextSlice) {
+       out := make([]byte, 96)
+       for i := 0; i < 96; i += 1 {
+               out[i] = byte(i / 2)
+       }
+       writer.Write(out)
+       s1 := <-slices
+       c.Check(len(s1.slice), Equals, 96)
+       c.Check(s1.reader_error, Equals, nil)
+       for i := 0; i < 96; i += 1 {
+               c.Check(s1.slice[i], Equals, byte(i/2))
+       }
+       for i := 0; i < len(buffer); i += 1 {
+               if i < 128 {
+                       c.Check(buffer[i], Equals, byte(i))
+               } else if i < (128 + 96) {
+                       c.Check(buffer[i], Equals, byte((i-128)/2))
+               } else {
+                       c.Check(buffer[i], Equals, byte(0))
+               }
+       }
+}
+
 func ReadIntoBufferHelper(c *C, bufsize int) {
        buffer := make([]byte, bufsize)
 
        reader, writer := io.Pipe()
-       slices := make(chan readerSlice)
+       slices := make(chan nextSlice)
 
        go readIntoBuffer(buffer, reader, slices)
 
-       {
-               out := make([]byte, 128)
-               for i := 0; i < 128; i += 1 {
-                       out[i] = byte(i)
-               }
-               writer.Write(out)
-               s1 := <-slices
-               c.Check(len(s1.slice), Equals, 128)
-               c.Check(s1.reader_error, Equals, nil)
-               for i := 0; i < 128; i += 1 {
-                       c.Check(s1.slice[i], Equals, byte(i))
-               }
-               for i := 0; i < len(buffer); i += 1 {
-                       if i < 128 {
-                               c.Check(buffer[i], Equals, byte(i))
-                       } else {
-                               c.Check(buffer[i], Equals, byte(0))
-                       }
-               }
-       }
-       {
-               out := make([]byte, 96)
-               for i := 0; i < 96; i += 1 {
-                       out[i] = byte(i / 2)
-               }
-               writer.Write(out)
-               s1 := <-slices
-               c.Check(len(s1.slice), Equals, 96)
-               c.Check(s1.reader_error, Equals, nil)
-               for i := 0; i < 96; i += 1 {
-                       c.Check(s1.slice[i], Equals, byte(i/2))
-               }
-               for i := 0; i < len(buffer); i += 1 {
-                       if i < 128 {
-                               c.Check(buffer[i], Equals, byte(i))
-                       } else if i < (128 + 96) {
-                               c.Check(buffer[i], Equals, byte((i-128)/2))
-                       } else {
-                               c.Check(buffer[i], Equals, byte(0))
-                       }
-               }
-       }
-       {
-               writer.Close()
-               s1 := <-slices
-               c.Check(len(s1.slice), Equals, 0)
-               c.Check(s1.reader_error, Equals, io.EOF)
-       }
-}
+       HelperWrite128andCheck(c, buffer, writer, slices)
+       HelperWrite96andCheck(c, buffer, writer, slices)
 
-func (s *StandaloneSuite) TestReadIntoBuffer(c *C) {
-       ReadIntoBufferHelper(c, 512)
-       ReadIntoBufferHelper(c, 225)
-       ReadIntoBufferHelper(c, 224)
+       writer.Close()
+       s1 := <-slices
+       c.Check(len(s1.slice), Equals, 0)
+       c.Check(s1.reader_error, Equals, io.EOF)
 }
 
 func (s *StandaloneSuite) TestReadIntoShortBuffer(c *C) {
        buffer := make([]byte, 223)
        reader, writer := io.Pipe()
-       slices := make(chan readerSlice)
+       slices := make(chan nextSlice)
 
        go readIntoBuffer(buffer, reader, slices)
 
-       {
-               out := make([]byte, 128)
-               for i := 0; i < 128; i += 1 {
-                       out[i] = byte(i)
-               }
-               writer.Write(out)
-               s1 := <-slices
-               c.Check(len(s1.slice), Equals, 128)
-               c.Check(s1.reader_error, Equals, nil)
-               for i := 0; i < 128; i += 1 {
-                       c.Check(s1.slice[i], Equals, byte(i))
-               }
-               for i := 0; i < len(buffer); i += 1 {
-                       if i < 128 {
-                               c.Check(buffer[i], Equals, byte(i))
-                       } else {
-                               c.Check(buffer[i], Equals, byte(0))
-                       }
-               }
+       HelperWrite128andCheck(c, buffer, writer, slices)
+
+       out := make([]byte, 96)
+       for i := 0; i < 96; i += 1 {
+               out[i] = byte(i / 2)
        }
-       {
-               out := make([]byte, 96)
-               for i := 0; i < 96; i += 1 {
-                       out[i] = byte(i / 2)
-               }
 
-               // Write will deadlock because it can't write all the data, so
-               // spin it off to a goroutine
-               go writer.Write(out)
-               s1 := <-slices
+       // Write will deadlock because it can't write all the data, so
+       // spin it off to a goroutine
+       go writer.Write(out)
+       s1 := <-slices
 
-               c.Check(len(s1.slice), Equals, 95)
-               c.Check(s1.reader_error, Equals, nil)
-               for i := 0; i < 95; i += 1 {
-                       c.Check(s1.slice[i], Equals, byte(i/2))
-               }
-               for i := 0; i < len(buffer); i += 1 {
-                       if i < 128 {
-                               c.Check(buffer[i], Equals, byte(i))
-                       } else if i < (128 + 95) {
-                               c.Check(buffer[i], Equals, byte((i-128)/2))
-                       } else {
-                               c.Check(buffer[i], Equals, byte(0))
-                       }
-               }
+       c.Check(len(s1.slice), Equals, 95)
+       c.Check(s1.reader_error, Equals, nil)
+       for i := 0; i < 95; i += 1 {
+               c.Check(s1.slice[i], Equals, byte(i/2))
        }
-       {
-               writer.Close()
-               s1 := <-slices
-               c.Check(len(s1.slice), Equals, 0)
-               c.Check(s1.reader_error, Equals, io.ErrShortBuffer)
+       for i := 0; i < len(buffer); i += 1 {
+               if i < 128 {
+                       c.Check(buffer[i], Equals, byte(i))
+               } else if i < (128 + 95) {
+                       c.Check(buffer[i], Equals, byte((i-128)/2))
+               } else {
+                       c.Check(buffer[i], Equals, byte(0))
+               }
        }
 
+       writer.Close()
+       s1 = <-slices
+       c.Check(len(s1.slice), Equals, 0)
+       c.Check(s1.reader_error, Equals, io.ErrShortBuffer)
 }
 
 func (s *StandaloneSuite) TestTransfer(c *C) {
@@ -227,8 +207,6 @@ func (s *StandaloneSuite) TestTransfer(c *C) {
        {
                // Test closing the reader
                writer.Close()
-               status := <-tr.Reader_status
-               c.Check(status, Equals, io.EOF)
 
                in := make([]byte, 256)
                n1, err1 := br1.Read(in)
@@ -262,23 +240,21 @@ func (s *StandaloneSuite) TestTransfer(c *C) {
 func (s *StandaloneSuite) TestTransferShortBuffer(c *C) {
        reader, writer := io.Pipe()
 
-       // Buffer for reads from 'r'
-       buffer := make([]byte, 100)
-
-       // Read requests on Transfer() buffer
-       requests := make(chan readRequest)
-       defer close(requests)
-
-       // Reporting reader error states
-       reader_status := make(chan error)
+       tr := AsyncStreamFromReader(100, reader)
+       defer tr.Close()
 
-       go transfer(buffer, reader, requests, reader_status)
+       sr := tr.MakeStreamReader()
+       defer sr.Close()
 
        out := make([]byte, 101)
        go writer.Write(out)
 
-       status := <-reader_status
-       c.Check(status, Equals, io.ErrShortBuffer)
+       n, err := sr.Read(out)
+       c.Check(n, Equals, 100)
+
+       n, err = sr.Read(out)
+       c.Check(n, Equals, 0)
+       c.Check(err, Equals, io.ErrShortBuffer)
 }
 
 func (s *StandaloneSuite) TestTransferFromBuffer(c *C) {
@@ -346,3 +322,45 @@ func (s *StandaloneSuite) TestTransferIoCopy(c *C) {
 
        io.Copy(writer, br1)
 }
+
+func (s *StandaloneSuite) TestManyReaders(c *C) {
+       reader, writer := io.Pipe()
+
+       tr := AsyncStreamFromReader(512, reader)
+       defer tr.Close()
+
+       sr := tr.MakeStreamReader()
+       go func() {
+               time.Sleep(100 * time.Millisecond)
+               sr.Close()
+       }()
+
+       for i := 0; i < 200; i += 1 {
+               go func() {
+                       br1 := tr.MakeStreamReader()
+                       defer br1.Close()
+
+                       p := make([]byte, 3)
+                       n, err := br1.Read(p)
+                       c.Check(n, Equals, 3)
+                       c.Check(p[0:3], DeepEquals, []byte("foo"))
+
+                       n, err = br1.Read(p)
+                       c.Check(n, Equals, 3)
+                       c.Check(p[0:3], DeepEquals, []byte("bar"))
+
+                       n, err = br1.Read(p)
+                       c.Check(n, Equals, 3)
+                       c.Check(p[0:3], DeepEquals, []byte("baz"))
+
+                       n, err = br1.Read(p)
+                       c.Check(n, Equals, 0)
+                       c.Check(err, Equals, io.EOF)
+               }()
+       }
+
+       writer.Write([]byte("foo"))
+       writer.Write([]byte("bar"))
+       writer.Write([]byte("baz"))
+       writer.Close()
+}
index 77242f13a6a7553de402c20741730c0674153686..a4a194f69bcc8fbdbf43650caa881325bde5dc24 100644 (file)
@@ -1,3 +1,53 @@
+/* Internal implementation of AsyncStream.
+Outline of operation:
+
+The kernel is the transfer() goroutine.  It manages concurrent reads and
+appends to the "body" slice.  "body" is a slice of "source_buffer" that
+represents the segment of the buffer that is already filled in and available
+for reading.
+
+To fill in the buffer, transfer() starts the readIntoBuffer() goroutine to read
+from the io.Reader source directly into source_buffer.  Each read goes into a
+slice of buffer which spans the section immediately following the end of the
+current "body".  Each time a Read completes, a slice representing the the
+section just filled in (or any read errors/EOF) is sent over the "slices"
+channel back to the transfer() function.
+
+Meanwhile, the transfer() function selects() on two channels, the "requests"
+channel and the "slices" channel.
+
+When a message is recieved on the "slices" channel, this means the a new
+section of the buffer has data, or an error is signaled.  Since the data has
+been read directly into the source_buffer, it is able to simply increases the
+size of the body slice to encompass the newly filled in section.  Then any
+pending reads are serviced with handleReadRequest (described below).
+
+When a message is recieved on the "requests" channel, it means a StreamReader
+wants access to a slice of the buffer.  This is passed to handleReadRequest().
+
+The handleReadRequest() function takes a sliceRequest consisting of a buffer
+offset, maximum size, and channel to send the response.  If there was an error
+reported from the source reader, it is returned.  If the offset is less than
+the size of the body, the request can proceed, and it sends a body slice
+spanning the segment from offset to min(offset+maxsize, end of the body).  If
+source reader status is EOF (done filling the buffer) and the read request
+offset is beyond end of the body, it responds with EOF.  Otherwise, the read
+request is for a slice beyond the current size of "body" but we expect the body
+to expand as more data is added, so the request gets added to a wait list.
+
+The transfer() runs until the requests channel is closed by AsyncStream.Close()
+
+To track readers, streamer uses the readersMonitor() goroutine.  This goroutine
+chooses which channels to receive from based on the number of outstanding
+readers.  When a new reader is created, it sends a message on the add_reader
+channel.  If the number of readers is already at MAX_READERS, this blocks the
+sender until an existing reader is closed.  When a reader is closed, it sends a
+message on the subtract_reader channel.  Finally, when AsyncStream.Close() is
+called, it sends a message on the wait_zero_readers channel, which will block
+the sender unless there are zero readers and it is safe to shut down the
+AsyncStream.
+*/
+
 package streamer
 
 import (
@@ -7,20 +57,20 @@ import (
 const MAX_READERS = 100
 
 // A slice passed from readIntoBuffer() to transfer()
-type readerSlice struct {
+type nextSlice struct {
        slice        []byte
        reader_error error
 }
 
 // A read request to the Transfer() function
-type readRequest struct {
+type sliceRequest struct {
        offset  int
        maxsize int
-       result  chan<- readResult
+       result  chan<- sliceResult
 }
 
 // A read result from the Transfer() function
-type readResult struct {
+type sliceResult struct {
        slice []byte
        err   error
 }
@@ -41,16 +91,16 @@ func (this *bufferWriter) Write(p []byte) (n int, err error) {
 // Read repeatedly from the reader and write sequentially into the specified
 // buffer, and report each read to channel 'c'.  Completes when Reader 'r'
 // reports on the error channel and closes channel 'c'.
-func readIntoBuffer(buffer []byte, r io.Reader, slices chan<- readerSlice) {
+func readIntoBuffer(buffer []byte, r io.Reader, slices chan<- nextSlice) {
        defer close(slices)
 
        if writeto, ok := r.(io.WriterTo); ok {
                n, err := writeto.WriteTo(&bufferWriter{buffer, 0})
                if err != nil {
-                       slices <- readerSlice{nil, err}
+                       slices <- nextSlice{nil, err}
                } else {
-                       slices <- readerSlice{buffer[:n], nil}
-                       slices <- readerSlice{nil, io.EOF}
+                       slices <- nextSlice{buffer[:n], nil}
+                       slices <- nextSlice{nil, io.EOF}
                }
                return
        } else {
@@ -75,23 +125,23 @@ func readIntoBuffer(buffer []byte, r io.Reader, slices chan<- readerSlice) {
                                if n > 0 {
                                        // Reader has more data but we have nowhere to
                                        // put it, so we're stuffed
-                                       slices <- readerSlice{nil, io.ErrShortBuffer}
+                                       slices <- nextSlice{nil, io.ErrShortBuffer}
                                } else {
                                        // Return some other error (hopefully EOF)
-                                       slices <- readerSlice{nil, err}
+                                       slices <- nextSlice{nil, err}
                                }
                                return
                        }
 
                        // End on error (includes EOF)
                        if err != nil {
-                               slices <- readerSlice{nil, err}
+                               slices <- nextSlice{nil, err}
                                return
                        }
 
                        if n > 0 {
                                // Make a slice with the contents of the read
-                               slices <- readerSlice{ptr[:n], nil}
+                               slices <- nextSlice{ptr[:n], nil}
 
                                // Adjust the scratch space slice
                                ptr = ptr[n:]
@@ -102,18 +152,21 @@ func readIntoBuffer(buffer []byte, r io.Reader, slices chan<- readerSlice) {
 
 // Handle a read request.  Returns true if a response was sent, and false if
 // the request should be queued.
-func handleReadRequest(req readRequest, body []byte, complete bool) bool {
-       if req.offset < len(body) {
+func handleReadRequest(req sliceRequest, body []byte, reader_status error) bool {
+       if (reader_status != nil) && (reader_status != io.EOF) {
+               req.result <- sliceResult{nil, reader_status}
+               return true
+       } else if req.offset < len(body) {
                var end int
                if req.offset+req.maxsize < len(body) {
                        end = req.offset + req.maxsize
                } else {
                        end = len(body)
                }
-               req.result <- readResult{body[req.offset:end], nil}
+               req.result <- sliceResult{body[req.offset:end], nil}
                return true
-       } else if complete && req.offset >= len(body) {
-               req.result <- readResult{nil, io.EOF}
+       } else if (reader_status == io.EOF) && (req.offset >= len(body)) {
+               req.result <- sliceResult{nil, io.EOF}
                return true
        } else {
                return false
@@ -125,15 +178,18 @@ func handleReadRequest(req readRequest, body []byte, complete bool) bool {
 // in the provided buffer.  Otherwise, use the contents of 'buffer' as is.
 // Accepts read requests on the buffer on the 'requests' channel.  Completes
 // when 'requests' channel is closed.
-func transfer(source_buffer []byte, source_reader io.Reader, requests <-chan readRequest, reader_error chan error) {
+func (this *AsyncStream) transfer(source_reader io.Reader) {
+       source_buffer := this.buffer
+       requests := this.requests
+
        // currently buffered data
        var body []byte
 
        // for receiving slices from readIntoBuffer
-       var slices chan readerSlice = nil
+       var slices chan nextSlice = nil
 
-       // indicates whether the buffered data is complete
-       var complete bool = false
+       // indicates the status of the underlying reader
+       var reader_status error = nil
 
        if source_reader != nil {
                // 'body' is the buffer slice representing the body content read so far
@@ -141,7 +197,7 @@ func transfer(source_buffer []byte, source_reader io.Reader, requests <-chan rea
 
                // used to communicate slices of the buffer as they are
                // readIntoBuffer will close 'slices' when it is done with it
-               slices = make(chan readerSlice)
+               slices = make(chan nextSlice)
 
                // Spin it off
                go readIntoBuffer(source_buffer, source_reader, slices)
@@ -150,17 +206,17 @@ func transfer(source_buffer []byte, source_reader io.Reader, requests <-chan rea
                body = source_buffer[:]
 
                // buffer is complete
-               complete = true
+               reader_status = io.EOF
        }
 
-       pending_requests := make([]readRequest, 0)
+       pending_requests := make([]sliceRequest, 0)
 
        for {
                select {
                case req, valid := <-requests:
                        // Handle a buffer read request
                        if valid {
-                               if !handleReadRequest(req, body, complete) {
+                               if !handleReadRequest(req, body, reader_status) {
                                        pending_requests = append(pending_requests, req)
                                }
                        } else {
@@ -171,17 +227,7 @@ func transfer(source_buffer []byte, source_reader io.Reader, requests <-chan rea
                case bk, valid := <-slices:
                        // Got a new slice from the reader
                        if valid {
-                               if bk.reader_error != nil {
-                                       reader_error <- bk.reader_error
-                                       if bk.reader_error == io.EOF {
-                                               // EOF indicates the reader is done
-                                               // sending, so our buffer is complete.
-                                               complete = true
-                                       } else {
-                                               // some other reader error
-                                               return
-                                       }
-                               }
+                               reader_status = bk.reader_error
 
                                if bk.slice != nil {
                                        // adjust body bounds now that another slice has been read
@@ -191,12 +237,9 @@ func transfer(source_buffer []byte, source_reader io.Reader, requests <-chan rea
                                // handle pending reads
                                n := 0
                                for n < len(pending_requests) {
-                                       if handleReadRequest(pending_requests[n], body, complete) {
-
-                                               // move the element from the
-                                               // back of the slice to
-                                               // position 'n', then shorten
-                                               // the slice by one element
+                                       if handleReadRequest(pending_requests[n], body, reader_status) {
+                                               // move the element from the back of the slice to
+                                               // position 'n', then shorten the slice by one element
                                                pending_requests[n] = pending_requests[len(pending_requests)-1]
                                                pending_requests = pending_requests[0 : len(pending_requests)-1]
                                        } else {
@@ -206,14 +249,13 @@ func transfer(source_buffer []byte, source_reader io.Reader, requests <-chan rea
                                        }
                                }
                        } else {
-                               if complete {
-                                       // no more reads
-                                       slices = nil
+                               if reader_status == io.EOF {
+                                       // no more reads expected, so this is ok
                                } else {
-                                       // reader channel closed without signaling EOF
-                                       reader_error <- io.ErrUnexpectedEOF
-                                       return
+                                       // slices channel closed without signaling EOF
+                                       reader_status = io.ErrUnexpectedEOF
                                }
+                               slices = nil
                        }
                }
        }