X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/3986815ae5e7e61c48f3ed979c32358710ef7e20..b28565c8aa08cbf70762fa69e49c5067fcb57e96:/sdk/go/src/arvados.org/streamer/streamer.go?ds=sidebyside diff --git a/sdk/go/src/arvados.org/streamer/streamer.go b/sdk/go/src/arvados.org/streamer/streamer.go index 0b8755fe40..2217dd3352 100644 --- a/sdk/go/src/arvados.org/streamer/streamer.go +++ b/sdk/go/src/arvados.org/streamer/streamer.go @@ -1,23 +1,35 @@ -/* Implements a buffer that supports concurrent incremental read and append. -New readers start reading from the beginning of the buffer, block when reaching -the end of the buffer, and are unblocked as new data is added. +/* AsyncStream pulls data in from a io.Reader source (such as a file or network +socket) and fans out to any number of StreamReader sinks. + +Unlike io.TeeReader() or io.MultiWriter(), new StreamReaders can be created at +any point in the lifetime of the AsyncStream, and each StreamReader will read +the contents of the buffer up to the "frontier" of the buffer, at which point +the StreamReader blocks until new data is read from the source. + +This is useful for minimizing readthrough latency as sinks can read and act on +data from the source without waiting for the source to be completely buffered. +It is also useful as a cache in situations where re-reading the original source +potentially is costly, since the buffer retains a copy of the source data. Usage: Begin reading into a buffer with maximum size 'buffersize' from 'source': - tr := StartTransferFromReader(buffersize, source) + stream := AsyncStreamFromReader(buffersize, source) -To create a new reader (this can be called multiple times): - r := tr.MakeStreamReader() +To create a new reader (this can be called multiple times, each reader starts +at the beginning of the buffer): + reader := tr.MakeStreamReader() -When you're done with the buffer: - tr.Close() +Make sure to close the reader when you're done with it. + reader.Close() +When you're done with the stream: + stream.Close() Alternately, if you already have a filled buffer and just want to read out from it: - tr := StartTransferFromSlice(buf) + stream := AsyncStreamFromSlice(buf) + r := tr.MakeStreamReader() - tr.Close() */ @@ -28,42 +40,46 @@ import ( ) type AsyncStream struct { - requests chan readRequest - Reader_status chan error + buffer []byte + requests chan sliceRequest + add_reader chan bool + subtract_reader chan bool + wait_zero_readers chan bool } // Reads from the buffer managed by the Transfer() type StreamReader struct { offset int - requests chan<- readRequest - responses chan readResult + stream *AsyncStream + responses chan sliceResult } func AsyncStreamFromReader(buffersize int, source io.Reader) *AsyncStream { - buf := make([]byte, buffersize) - - t := &AsyncStream{make(chan readRequest), make(chan error)} + t := &AsyncStream{make([]byte, buffersize), make(chan sliceRequest), make(chan bool), make(chan bool), make(chan bool)} - go transfer(buf, source, t.requests, t.Reader_status) + go t.transfer(source) + go t.readersMonitor() return t } func AsyncStreamFromSlice(buf []byte) *AsyncStream { - t := &AsyncStream{make(chan readRequest), nil} + t := &AsyncStream{buf, make(chan sliceRequest), make(chan bool), make(chan bool), make(chan bool)} - go transfer(buf, nil, t.requests, nil) + go t.transfer(nil) + go t.readersMonitor() return t } func (this *AsyncStream) MakeStreamReader() *StreamReader { - return &StreamReader{0, this.requests, make(chan readResult)} + this.add_reader <- true + return &StreamReader{0, this, make(chan sliceResult)} } // Reads from the buffer managed by the Transfer() func (this *StreamReader) Read(p []byte) (n int, err error) { - this.requests <- readRequest{this.offset, len(p), this.responses} + this.stream.requests <- sliceRequest{this.offset, len(p), this.responses} rr, valid := <-this.responses if valid { this.offset += len(rr.slice) @@ -77,7 +93,7 @@ func (this *StreamReader) WriteTo(dest io.Writer) (written int64, err error) { // Record starting offset in order to correctly report the number of bytes sent starting_offset := this.offset for { - this.requests <- readRequest{this.offset, 32 * 1024, this.responses} + this.stream.requests <- sliceRequest{this.offset, 32 * 1024, this.responses} rr, valid := <-this.responses if valid { this.offset += len(rr.slice) @@ -99,13 +115,16 @@ func (this *StreamReader) WriteTo(dest io.Writer) (written int64, err error) { // Close the responses channel func (this *StreamReader) Close() error { + this.stream.subtract_reader <- true close(this.responses) + this.stream = nil return nil } func (this *AsyncStream) Close() { + this.wait_zero_readers <- true close(this.requests) - if this.Reader_status != nil { - close(this.Reader_status) - } + close(this.add_reader) + close(this.subtract_reader) + close(this.wait_zero_readers) }