From: Peter Amstutz
Date: Wed, 21 May 2014 15:00:21 +0000 (-0400)
Subject: 2798: Renamed internal messaging structs in an attempt to use the word "reader"
X-Git-Tag: 1.1.0~2603^2~8^2~3
X-Git-Url: https://git.arvados.org/arvados.git/commitdiff_plain/38cef39fbdfeb8176e4c755d12e43a450e868439

2798: Renamed internal messaging structs in an attempt to use the word "reader"
slightly less. Refactored tests to reduce redundancy slightly. Added test with
a large number of concurrent readers. Rewrote "how to use" package comments
and wrote a small novel about the "theory of operation".
---

diff --git a/sdk/go/src/arvados.org/streamer/streamer.go b/sdk/go/src/arvados.org/streamer/streamer.go
index ba49fb341a..78ab027829 100644
--- a/sdk/go/src/arvados.org/streamer/streamer.go
+++ b/sdk/go/src/arvados.org/streamer/streamer.go
@@ -1,23 +1,35 @@
-/* Implements a buffer that supports concurrent incremental read and append.
-New readers start reading from the beginning of the buffer, block when reaching
-the end of the buffer, and are unblocked as new data is added.
+/* AsyncStream pulls data in from an io.Reader source (such as a file or network
+socket) and fans out to any number of StreamReader sinks.
+
+Unlike io.TeeReader() or io.MultiWriter(), new StreamReaders can be created at
+any point in the lifetime of the AsyncStream, and each StreamReader will read
+the contents of the buffer up to the "frontier" of the buffer, at which point
+the StreamReader blocks until new data is read from the source.
+
+This is useful for minimizing readthrough latency as sinks can read and act on
+data from the source without waiting for the source to be completely buffered.
+It is also useful as a cache in situations where re-reading the original source
+is potentially costly, since the buffer retains a copy of the source data.
 
 Usage:
 
 Begin reading into a buffer with maximum size 'buffersize' from 'source':
-	tr := StartTransferFromReader(buffersize, source)
+	stream := AsyncStreamFromReader(buffersize, source)
 
-To create a new reader (this can be called multiple times):
-	r := tr.MakeStreamReader()
+To create a new reader (this can be called multiple times; each reader starts
+at the beginning of the buffer):
+	reader := stream.MakeStreamReader()
 
-When you're done with the buffer:
-	tr.Close()
+Make sure to close the reader when you're done with it:
+	reader.Close()
+
+When you're done with the stream:
+	stream.Close()
 
 Alternately, if you already have a filled buffer and just want to read out from
 it:
-	tr := StartTransferFromSlice(buf)
+	stream := AsyncStreamFromSlice(buf)
+	r := stream.MakeStreamReader()
 
-	tr.Close()
 */
 
@@ -28,35 +40,33 @@ import (
 )
 
 type AsyncStream struct {
-	requests          chan readRequest
+	buffer            []byte
+	requests          chan sliceRequest
 	add_reader        chan bool
 	subtract_reader   chan bool
 	wait_zero_readers chan bool
-	Reader_status     chan error
 }
 
 // Reads from the buffer managed by the Transfer()
 type StreamReader struct {
 	offset    int
 	stream    *AsyncStream
-	responses chan readResult
+	responses chan sliceResult
 }
 
 func AsyncStreamFromReader(buffersize int, source io.Reader) *AsyncStream {
-	buf := make([]byte, buffersize)
-
-	t := &AsyncStream{make(chan readRequest), make(chan bool), make(chan bool), make(chan bool), make(chan error)}
+	t := &AsyncStream{make([]byte, buffersize), make(chan sliceRequest), make(chan bool), make(chan bool), make(chan bool)}
 
-	go transfer(buf, source, t.requests, t.Reader_status)
+	go t.transfer(source)
 	go t.readersMonitor()
 
 	return t
 }
 
 func AsyncStreamFromSlice(buf []byte) *AsyncStream {
-	t := &AsyncStream{make(chan readRequest), make(chan bool), make(chan bool), make(chan bool), nil}
+	t := &AsyncStream{buf, make(chan sliceRequest), make(chan bool), make(chan bool), make(chan bool)}
 
-	go transfer(buf, nil, t.requests, nil)
+	go t.transfer(nil)
 	go t.readersMonitor()
 
 	return t
@@ -64,12 +74,12 @@ func AsyncStreamFromSlice(buf []byte) *AsyncStream {
 
 func (this *AsyncStream) MakeStreamReader() *StreamReader {
 	this.add_reader <- true
-	return &StreamReader{0, this, make(chan readResult)}
+	return &StreamReader{0, this, make(chan sliceResult)}
 }
 
 // Reads from the buffer managed by the Transfer()
 func (this *StreamReader) Read(p []byte) (n int, err error) {
-	this.stream.requests <- readRequest{this.offset, len(p), this.responses}
+	this.stream.requests <- sliceRequest{this.offset, len(p), this.responses}
 	rr, valid := <-this.responses
 	if valid {
 		this.offset += len(rr.slice)
@@ -83,7 +93,7 @@ func (this *StreamReader) WriteTo(dest io.Writer) (written int64, err error) {
 	// Record starting offset in order to correctly report the number of bytes sent
 	starting_offset := this.offset
 	for {
-		this.stream.requests <- readRequest{this.offset, 32 * 1024, this.responses}
+		this.stream.requests <- sliceRequest{this.offset, 32 * 1024, this.responses}
 		rr, valid := <-this.responses
 		if valid {
 			this.offset += len(rr.slice)
@@ -116,7 +126,4 @@ func (this *AsyncStream) Close() {
 	close(this.add_reader)
 	close(this.subtract_reader)
 	close(this.wait_zero_readers)
-	if this.Reader_status != nil {
-		close(this.Reader_status)
-	}
 }
diff --git a/sdk/go/src/arvados.org/streamer/streamer_test.go b/sdk/go/src/arvados.org/streamer/streamer_test.go
index 33f84b8096..853d7d3035 100644
--- a/sdk/go/src/arvados.org/streamer/streamer_test.go
+++ b/sdk/go/src/arvados.org/streamer/streamer_test.go
@@ -15,130 +15,110 @@ var _ = Suite(&StandaloneSuite{})
 
 // Standalone tests
 type StandaloneSuite struct{}
 
+func (s *StandaloneSuite) TestReadIntoBuffer(c *C) {
+	ReadIntoBufferHelper(c, 225)
+	ReadIntoBufferHelper(c, 224)
+}
+
+func HelperWrite128andCheck(c *C, buffer []byte, writer io.Writer, slices chan nextSlice) {
+	out := make([]byte, 128)
+	for i := 0; i < 128; i += 1 {
+		out[i] = byte(i)
+	}
+	writer.Write(out)
+	s1 := <-slices
+	c.Check(len(s1.slice), Equals, 128)
+	c.Check(s1.reader_error, Equals, nil)
+	for i := 0; i < 128; i += 1 {
+		c.Check(s1.slice[i], Equals, byte(i))
+	}
+	for i := 0; i < len(buffer); i += 1 {
+		if i < 128 {
+			c.Check(buffer[i], Equals, byte(i))
+		} else {
+			c.Check(buffer[i], Equals, byte(0))
+		}
+	}
+}
+
+func HelperWrite96andCheck(c *C, buffer []byte, writer io.Writer, slices chan nextSlice) {
+	out := make([]byte, 96)
+	for i := 0; i < 96; i += 1 {
+		out[i] = byte(i / 2)
+	}
+	writer.Write(out)
+	s1 := <-slices
+	c.Check(len(s1.slice), Equals, 96)
+	c.Check(s1.reader_error, Equals, nil)
+	for i := 0; i < 96; i += 1 {
+		c.Check(s1.slice[i], Equals, byte(i/2))
+	}
+	for i := 0; i < len(buffer); i += 1 {
+		if i < 128 {
+			c.Check(buffer[i], Equals, byte(i))
+		} else if i < (128 + 96) {
+			c.Check(buffer[i], Equals, byte((i-128)/2))
+		} else {
+			c.Check(buffer[i], Equals, byte(0))
+		}
+	}
+}
+
 func ReadIntoBufferHelper(c *C, bufsize int) {
 	buffer := make([]byte, bufsize)
 
 	reader, writer := io.Pipe()
 
-	slices := make(chan readerSlice)
+	slices := make(chan nextSlice)
 
 	go readIntoBuffer(buffer, reader, slices)
 
-	{
-		out := make([]byte, 128)
-		for i := 0; i < 128; i += 1 {
-			out[i] = byte(i)
-		}
-		writer.Write(out)
-		s1 := <-slices
-		c.Check(len(s1.slice), Equals, 128)
-		c.Check(s1.reader_error, Equals, nil)
-		for i := 0; i < 128; i += 1 {
-			c.Check(s1.slice[i], Equals, byte(i))
-		}
-		for i := 0; i < len(buffer); i += 1 {
-			if i < 128 {
-				c.Check(buffer[i], Equals, byte(i))
-			} else {
-				c.Check(buffer[i], Equals, byte(0))
-			}
-		}
-	}
-	{
-		out := make([]byte, 96)
-		for i := 0; i < 96; i += 1 {
-			out[i] = byte(i / 2)
-		}
-		writer.Write(out)
-		s1 := <-slices
-		c.Check(len(s1.slice), Equals, 96)
-		c.Check(s1.reader_error, Equals, nil)
-		for i := 0; i < 96; i += 1 {
-			c.Check(s1.slice[i], Equals, byte(i/2))
-		}
-		for i := 0; i < len(buffer); i += 1 {
-			if i < 128 {
-				c.Check(buffer[i], Equals, byte(i))
-			} else if i < (128 + 96) {
-				c.Check(buffer[i], Equals, byte((i-128)/2))
-			} else {
-				c.Check(buffer[i], Equals, byte(0))
-			}
-		}
-	}
-	{
-		writer.Close()
-		s1 := <-slices
-		c.Check(len(s1.slice), Equals, 0)
-		c.Check(s1.reader_error, Equals, io.EOF)
-	}
-}
+	HelperWrite128andCheck(c, buffer, writer, slices)
+	HelperWrite96andCheck(c, buffer, writer, slices)
 
-func (s *StandaloneSuite) TestReadIntoBuffer(c *C) {
-	ReadIntoBufferHelper(c, 512)
-	ReadIntoBufferHelper(c, 225)
-	ReadIntoBufferHelper(c, 224)
+	writer.Close()
+	s1 := <-slices
+	c.Check(len(s1.slice), Equals, 0)
+	c.Check(s1.reader_error, Equals, io.EOF)
 }
 
 func (s *StandaloneSuite) TestReadIntoShortBuffer(c *C) {
 	buffer := make([]byte, 223)
 	reader, writer := io.Pipe()
-	slices := make(chan readerSlice)
+	slices := make(chan nextSlice)
 
 	go readIntoBuffer(buffer, reader, slices)
 
-	{
-		out := make([]byte, 128)
-		for i := 0; i < 128; i += 1 {
-			out[i] = byte(i)
-		}
-		writer.Write(out)
-		s1 := <-slices
-		c.Check(len(s1.slice), Equals, 128)
-		c.Check(s1.reader_error, Equals, nil)
-		for i := 0; i < 128; i += 1 {
-			c.Check(s1.slice[i], Equals, byte(i))
-		}
-		for i := 0; i < len(buffer); i += 1 {
-			if i < 128 {
-				c.Check(buffer[i], Equals, byte(i))
-			} else {
-				c.Check(buffer[i], Equals, byte(0))
-			}
-		}
+	HelperWrite128andCheck(c, buffer, writer, slices)
+
+	out := make([]byte, 96)
+	for i := 0; i < 96; i += 1 {
+		out[i] = byte(i / 2)
 	}
-	{
-		out := make([]byte, 96)
-		for i := 0; i < 96; i += 1 {
-			out[i] = byte(i / 2)
-		}
-		// Write will deadlock because it can't write all the data, so
-		// spin it off to a goroutine
-		go writer.Write(out)
-		s1 := <-slices
+	// Write will deadlock because it can't write all the data, so
+	// spin it off to a goroutine
+	go writer.Write(out)
+	s1 := <-slices
 
-		c.Check(len(s1.slice), Equals, 95)
-		c.Check(s1.reader_error, Equals, nil)
-		for i := 0; i < 95; i += 1 {
-			c.Check(s1.slice[i], Equals, byte(i/2))
-		}
-		for i := 0; i < len(buffer); i += 1 {
-			if i < 128 {
-				c.Check(buffer[i], Equals, byte(i))
-			} else if i < (128 + 95) {
-				c.Check(buffer[i], Equals, byte((i-128)/2))
-			} else {
-				c.Check(buffer[i], Equals, byte(0))
-			}
-		}
+	c.Check(len(s1.slice), Equals, 95)
+	c.Check(s1.reader_error, Equals, nil)
+	for i := 0; i < 95; i += 1 {
+		c.Check(s1.slice[i], Equals, byte(i/2))
 	}
-	{
-		writer.Close()
-		s1 := <-slices
-		c.Check(len(s1.slice), Equals, 0)
-		c.Check(s1.reader_error, Equals, io.ErrShortBuffer)
+	for i := 0; i < len(buffer); i += 1 {
+		if i < 128 {
+			c.Check(buffer[i], Equals, byte(i))
+		} else if i < (128 + 95) {
+			c.Check(buffer[i], Equals, byte((i-128)/2))
+		} else {
+			c.Check(buffer[i], Equals, byte(0))
+		}
 	}
+	writer.Close()
+	s1 = <-slices
+	c.Check(len(s1.slice), Equals, 0)
+	c.Check(s1.reader_error, Equals, io.ErrShortBuffer)
 }
 
 func (s *StandaloneSuite) TestTransfer(c *C) {
@@ -227,8 +207,6 @@ func (s *StandaloneSuite) TestTransfer(c *C) {
 	{ // Test closing the reader
 		writer.Close()
 
-		status := <-tr.Reader_status
-		c.Check(status, Equals, io.EOF)
 
 		in := make([]byte, 256)
 		n1, err1 := br1.Read(in)
@@ -262,23 +240,21 @@ func (s *StandaloneSuite) TestTransfer(c *C) {
 
 func (s *StandaloneSuite) TestTransferShortBuffer(c *C) {
 	reader, writer := io.Pipe()
 
-	// Buffer for reads from 'r'
-	buffer := make([]byte, 100)
-
-	// Read requests on Transfer() buffer
-	requests := make(chan readRequest)
-	defer close(requests)
-
-	// Reporting reader error states
-	reader_status := make(chan error)
+	tr := AsyncStreamFromReader(100, reader)
+	defer tr.Close()
 
-	go transfer(buffer, reader, requests, reader_status)
+	sr := tr.MakeStreamReader()
+	defer sr.Close()
 
 	out := make([]byte, 101)
 	go writer.Write(out)
 
-	status := <-reader_status
-	c.Check(status, Equals, io.ErrShortBuffer)
+	n, err := sr.Read(out)
+	c.Check(n, Equals, 100)
+
+	n, err = sr.Read(out)
+	c.Check(n, Equals, 0)
+	c.Check(err, Equals, io.ErrShortBuffer)
 }
 
 func (s *StandaloneSuite) TestTransferFromBuffer(c *C) {
@@ -346,3 +322,45 @@ func (s *StandaloneSuite) TestTransferIoCopy(c *C) {
 
 	io.Copy(writer, br1)
 }
+
+func (s *StandaloneSuite) TestManyReaders(c *C) {
+	reader, writer := io.Pipe()
+
+	tr := AsyncStreamFromReader(512, reader)
+	defer tr.Close()
+
+	sr := tr.MakeStreamReader()
+	go func() {
+		time.Sleep(100 * time.Millisecond)
+		sr.Close()
+	}()
+
+	for i := 0; i < 200; i += 1 {
+		go func() {
+			br1 := tr.MakeStreamReader()
+			defer br1.Close()
+
+			p := make([]byte, 3)
+			n, err := br1.Read(p)
+			c.Check(n, Equals, 3)
+			c.Check(p[0:3], DeepEquals, []byte("foo"))
+
+			n, err = br1.Read(p)
+			c.Check(n, Equals, 3)
+			c.Check(p[0:3], DeepEquals, []byte("bar"))
+
+			n, err = br1.Read(p)
+			c.Check(n, Equals, 3)
+			c.Check(p[0:3], DeepEquals, []byte("baz"))
+
+			n, err = br1.Read(p)
+			c.Check(n, Equals, 0)
+			c.Check(err, Equals, io.EOF)
+		}()
+	}
+
+	writer.Write([]byte("foo"))
+	writer.Write([]byte("bar"))
+	writer.Write([]byte("baz"))
+	writer.Close()
+}
diff --git a/sdk/go/src/arvados.org/streamer/transfer.go b/sdk/go/src/arvados.org/streamer/transfer.go
index 77242f13a6..a4a194f69b 100644
--- a/sdk/go/src/arvados.org/streamer/transfer.go
+++ b/sdk/go/src/arvados.org/streamer/transfer.go
@@ -1,3 +1,53 @@
+/* Internal implementation of AsyncStream.
+Outline of operation:
+
+The kernel is the transfer() goroutine. It manages concurrent reads and
+appends to the "body" slice. "body" is a slice of "source_buffer" that
+represents the segment of the buffer that is already filled in and available
+for reading.
+
+To fill in the buffer, transfer() starts the readIntoBuffer() goroutine to read
+from the io.Reader source directly into source_buffer. Each read goes into a
+slice of the buffer which spans the section immediately following the end of
+the current "body". Each time a Read completes, a slice representing the
+section just filled in (or any read errors/EOF) is sent over the "slices"
+channel back to the transfer() function.
+
+Meanwhile, the transfer() function selects() on two channels, the "requests"
+channel and the "slices" channel.
+
+When a message is received on the "slices" channel, it means a new section of
+the buffer has data, or an error is signaled. Since the data has been read
+directly into the source_buffer, transfer() simply increases the size of the
+body slice to encompass the newly filled in section. Then any pending reads
+are serviced with handleReadRequest() (described below).
+
+When a message is received on the "requests" channel, it means a StreamReader
+wants access to a slice of the buffer. This is passed to handleReadRequest().
+
+The handleReadRequest() function takes a sliceRequest consisting of a buffer
+offset, maximum size, and a channel on which to send the response. If there was
+an error reported from the source reader, it is returned. If the offset is less
+than the size of the body, the request can proceed, and it sends a body slice
+spanning the segment from offset to min(offset+maxsize, end of the body). If the
+source reader status is EOF (done filling the buffer) and the read request
+offset is beyond the end of the body, it responds with EOF. Otherwise, the read
+request is for a slice beyond the current size of "body", but we expect the body
+to expand as more data is added, so the request gets added to a wait list.
+
+transfer() runs until the requests channel is closed by AsyncStream.Close().
+
+To track readers, streamer uses the readersMonitor() goroutine. This goroutine
+chooses which channels to receive from based on the number of outstanding
+readers. When a new reader is created, it sends a message on the add_reader
+channel. If the number of readers is already at MAX_READERS, this blocks the
+sender until an existing reader is closed. When a reader is closed, it sends a
+message on the subtract_reader channel. Finally, when AsyncStream.Close() is
+called, it sends a message on the wait_zero_readers channel, which blocks the
+sender until there are zero readers and it is safe to shut down the
+AsyncStream.
+*/
+
 package streamer
 
 import (
@@ -7,20 +57,20 @@ import (
 
 const MAX_READERS = 100
 
 // A slice passed from readIntoBuffer() to transfer()
-type readerSlice struct {
+type nextSlice struct {
 	slice        []byte
 	reader_error error
 }
 
 // A read request to the Transfer() function
-type readRequest struct {
+type sliceRequest struct {
 	offset  int
 	maxsize int
-	result  chan<- readResult
+	result  chan<- sliceResult
 }
 
 // A read result from the Transfer() function
-type readResult struct {
+type sliceResult struct {
 	slice []byte
 	err   error
 }
@@ -41,16 +91,16 @@ func (this *bufferWriter) Write(p []byte) (n int, err error) {
 
 // Read repeatedly from the reader and write sequentially into the specified
 // buffer, and report each read to channel 'c'. Completes when Reader 'r'
 // reports on the error channel and closes channel 'c'.
-func readIntoBuffer(buffer []byte, r io.Reader, slices chan<- readerSlice) {
+func readIntoBuffer(buffer []byte, r io.Reader, slices chan<- nextSlice) {
 	defer close(slices)
 
 	if writeto, ok := r.(io.WriterTo); ok {
 		n, err := writeto.WriteTo(&bufferWriter{buffer, 0})
 		if err != nil {
-			slices <- readerSlice{nil, err}
+			slices <- nextSlice{nil, err}
 		} else {
-			slices <- readerSlice{buffer[:n], nil}
-			slices <- readerSlice{nil, io.EOF}
+			slices <- nextSlice{buffer[:n], nil}
+			slices <- nextSlice{nil, io.EOF}
 		}
 		return
 	} else {
@@ -75,23 +125,23 @@ func readIntoBuffer(buffer []byte, r io.Reader, slices chan<- readerSlice) {
 			if n > 0 {
 				// Reader has more data but we have nowhere to
 				// put it, so we're stuffed
-				slices <- readerSlice{nil, io.ErrShortBuffer}
+				slices <- nextSlice{nil, io.ErrShortBuffer}
 			} else {
 				// Return some other error (hopefully EOF)
-				slices <- readerSlice{nil, err}
+				slices <- nextSlice{nil, err}
 			}
 			return
 		}
 
 		// End on error (includes EOF)
 		if err != nil {
-			slices <- readerSlice{nil, err}
+			slices <- nextSlice{nil, err}
 			return
 		}
 
 		if n > 0 {
 			// Make a slice with the contents of the read
-			slices <- readerSlice{ptr[:n], nil}
+			slices <- nextSlice{ptr[:n], nil}
 
 			// Adjust the scratch space slice
 			ptr = ptr[n:]
@@ -102,18 +152,21 @@
 
 // Handle a read request. Returns true if a response was sent, and false if
 // the request should be queued.
-func handleReadRequest(req readRequest, body []byte, complete bool) bool {
-	if req.offset < len(body) {
+func handleReadRequest(req sliceRequest, body []byte, reader_status error) bool {
+	if (reader_status != nil) && (reader_status != io.EOF) {
+		req.result <- sliceResult{nil, reader_status}
+		return true
+	} else if req.offset < len(body) {
 		var end int
 		if req.offset+req.maxsize < len(body) {
 			end = req.offset + req.maxsize
 		} else {
 			end = len(body)
 		}
-		req.result <- readResult{body[req.offset:end], nil}
+		req.result <- sliceResult{body[req.offset:end], nil}
 		return true
-	} else if complete && req.offset >= len(body) {
-		req.result <- readResult{nil, io.EOF}
+	} else if (reader_status == io.EOF) && (req.offset >= len(body)) {
+		req.result <- sliceResult{nil, io.EOF}
 		return true
 	} else {
 		return false
@@ -125,15 +178,18 @@
 // in the provided buffer. Otherwise, use the contents of 'buffer' as is.
 // Accepts read requests on the buffer on the 'requests' channel. Completes
 // when 'requests' channel is closed.
-func transfer(source_buffer []byte, source_reader io.Reader, requests <-chan readRequest, reader_error chan error) {
+func (this *AsyncStream) transfer(source_reader io.Reader) {
+	source_buffer := this.buffer
+	requests := this.requests
+
 	// currently buffered data
 	var body []byte
 
 	// for receiving slices from readIntoBuffer
-	var slices chan readerSlice = nil
+	var slices chan nextSlice = nil
 
-	// indicates whether the buffered data is complete
-	var complete bool = false
+	// indicates the status of the underlying reader
+	var reader_status error = nil
 
 	if source_reader != nil {
 		// 'body' is the buffer slice representing the body content read so far
@@ -141,7 +197,7 @@
 		// used to communicate slices of the buffer as they are
 		// readIntoBuffer will close 'slices' when it is done with it
-		slices = make(chan readerSlice)
+		slices = make(chan nextSlice)
 
 		// Spin it off
 		go readIntoBuffer(source_buffer, source_reader, slices)
 
@@ -150,17 +206,17 @@
 		body = source_buffer[:]
 
 		// buffer is complete
-		complete = true
+		reader_status = io.EOF
 	}
 
-	pending_requests := make([]readRequest, 0)
+	pending_requests := make([]sliceRequest, 0)
 
 	for {
 		select {
 		case req, valid := <-requests:
 			// Handle a buffer read request
 			if valid {
-				if !handleReadRequest(req, body, complete) {
+				if !handleReadRequest(req, body, reader_status) {
 					pending_requests = append(pending_requests, req)
 				}
 			} else {
@@ -171,17 +227,7 @@
 		case bk, valid := <-slices:
 			// Got a new slice from the reader
 			if valid {
-				if bk.reader_error != nil {
-					reader_error <- bk.reader_error
-					if bk.reader_error == io.EOF {
-						// EOF indicates the reader is done
-						// sending, so our buffer is complete.
-						complete = true
-					} else {
-						// some other reader error
-						return
-					}
-				}
+				reader_status = bk.reader_error
 
 				if bk.slice != nil {
 					// adjust body bounds now that another slice has been read
@@ -191,12 +237,9 @@
 				// handle pending reads
 				n := 0
 				for n < len(pending_requests) {
-					if handleReadRequest(pending_requests[n], body, complete) {
-
-						// move the element from the
-						// back of the slice to
-						// position 'n', then shorten
-						// the slice by one element
+					if handleReadRequest(pending_requests[n], body, reader_status) {
+						// move the element from the back of the slice to
+						// position 'n', then shorten the slice by one element
 						pending_requests[n] = pending_requests[len(pending_requests)-1]
 						pending_requests = pending_requests[0 : len(pending_requests)-1]
 					} else {
@@ -206,14 +249,13 @@
 				}
 			}
 		} else {
-			if complete {
-				// no more reads
-				slices = nil
+			if reader_status == io.EOF {
+				// no more reads expected, so this is ok
 			} else {
-				// reader channel closed without signaling EOF
-				reader_error <- io.ErrUnexpectedEOF
-				return
+				// slices channel closed without signaling EOF
+				reader_status = io.ErrUnexpectedEOF
 			}
+			slices = nil
 		}
 	}
 }
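
For reference, the fan-out usage described in the new streamer.go package
comment looks like this end to end. This is a minimal sketch, not part of the
commit: the import path is inferred from this tree's sdk/go/src layout, and
the source and consumers are illustrative stand-ins (any io.Reader source and
any number of readers up to MAX_READERS would do):

	package main

	import (
		"bytes"
		"fmt"
		"io/ioutil"
		"sync"

		"arvados.org/streamer"
	)

	func main() {
		// Any io.Reader works as a source; a bytes.Buffer stands in here.
		source := bytes.NewBufferString("hello, streamer")

		// The buffer must be big enough to hold the whole source, or
		// readers will eventually see io.ErrShortBuffer.
		stream := streamer.AsyncStreamFromReader(64, source)

		// Fan out: every reader starts at the beginning of the buffer
		// and sees the same bytes, blocking at the frontier until
		// readIntoBuffer() delivers more data.
		var wg sync.WaitGroup
		for i := 0; i < 3; i++ {
			wg.Add(1)
			go func(id int) {
				defer wg.Done()
				reader := stream.MakeStreamReader()
				defer reader.Close()
				data, err := ioutil.ReadAll(reader)
				fmt.Printf("reader %d: %q (err=%v)\n", id, data, err)
			}(i)
		}
		wg.Wait()

		// Close() waits (via wait_zero_readers) until all readers are closed.
		stream.Close()
	}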
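The pre-filled buffer variant from the same comment, wrapped in a function for
context. Again a sketch: PrintCached is a hypothetical helper, and draining via
io.Copy assumes StreamReader.WriteTo() follows the usual convention of not
returning io.EOF as an error:

	// PrintCached fans a pre-filled buffer out to stdout.
	func PrintCached(buf []byte) error {
		stream := streamer.AsyncStreamFromSlice(buf)
		defer stream.Close()

		r := stream.MakeStreamReader()
		// Deferred calls run reader-first, so stream.Close() won't block
		// waiting for zero readers.
		defer r.Close()

		// StreamReader implements io.WriterTo, so io.Copy drains it in
		// 32 KiB slices without an intermediate buffer.
		_, err := io.Copy(os.Stdout, r)
		return err
	}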
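The four-way decision that the new transfer.go comment describes for
handleReadRequest() condenses to the following restatement. Illustrative only:
serveOrQueue and its signature are hypothetical, not part of the package:

	// serveOrQueue mirrors the handleReadRequest() cases: it reports whether
	// the request was answered; false means the request joins the wait list
	// until the body grows past the requested offset.
	func serveOrQueue(offset, maxsize int, body []byte, readerStatus error) (slice []byte, err error, served bool) {
		switch {
		case readerStatus != nil && readerStatus != io.EOF:
			// The source reader failed: propagate its error.
			return nil, readerStatus, true
		case offset < len(body):
			// Data is available: serve body[offset:min(offset+maxsize, len(body))].
			end := offset + maxsize
			if end > len(body) {
				end = len(body)
			}
			return body[offset:end], nil, true
		case readerStatus == io.EOF:
			// The buffer is complete and the offset is past the end: EOF.
			return nil, io.EOF, true
		default:
			// Beyond the frontier, but more data is expected: queue it.
			return nil, nil, false
		}
	}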