2754: Merge branch '2754-easy-run-pipeline' refs #2754
[arvados.git] / sdk / go / src / arvados.org / streamer / streamer.go
index 0b8755fe4042070e1b3cac6b74f9adba272a6979..2217dd3352eae69255b74b4faa5a74425efca0ee 100644 (file)
@@ -1,23 +1,35 @@
-/* Implements a buffer that supports concurrent incremental read and append.
-New readers start reading from the beginning of the buffer, block when reaching
-the end of the buffer, and are unblocked as new data is added.
+/* AsyncStream pulls data in from a io.Reader source (such as a file or network
+socket) and fans out to any number of StreamReader sinks.
+
+Unlike io.TeeReader() or io.MultiWriter(), new StreamReaders can be created at
+any point in the lifetime of the AsyncStream, and each StreamReader will read
+the contents of the buffer up to the "frontier" of the buffer, at which point
+the StreamReader blocks until new data is read from the source.
+
+This is useful for minimizing readthrough latency as sinks can read and act on
+data from the source without waiting for the source to be completely buffered.
+It is also useful as a cache in situations where re-reading the original source
+potentially is costly, since the buffer retains a copy of the source data.
 
 Usage:
 
 Begin reading into a buffer with maximum size 'buffersize' from 'source':
-  tr := StartTransferFromReader(buffersize, source)
+  stream := AsyncStreamFromReader(buffersize, source)
 
-To create a new reader (this can be called multiple times):
-  r := tr.MakeStreamReader()
+To create a new reader (this can be called multiple times, each reader starts
+at the beginning of the buffer):
+  reader := tr.MakeStreamReader()
 
-When you're done with the buffer:
-  tr.Close()
+Make sure to close the reader when you're done with it.
+  reader.Close()
 
+When you're done with the stream:
+  stream.Close()
 
 Alternately, if you already have a filled buffer and just want to read out from it:
-  tr := StartTransferFromSlice(buf)
+  stream := AsyncStreamFromSlice(buf)
+
   r := tr.MakeStreamReader()
-  tr.Close()
 
 */
 
@@ -28,42 +40,46 @@ import (
 )
 
 type AsyncStream struct {
-       requests      chan readRequest
-       Reader_status chan error
+       buffer            []byte
+       requests          chan sliceRequest
+       add_reader        chan bool
+       subtract_reader   chan bool
+       wait_zero_readers chan bool
 }
 
 // Reads from the buffer managed by the Transfer()
 type StreamReader struct {
        offset    int
-       requests  chan<- readRequest
-       responses chan readResult
+       stream    *AsyncStream
+       responses chan sliceResult
 }
 
 func AsyncStreamFromReader(buffersize int, source io.Reader) *AsyncStream {
-       buf := make([]byte, buffersize)
-
-       t := &AsyncStream{make(chan readRequest), make(chan error)}
+       t := &AsyncStream{make([]byte, buffersize), make(chan sliceRequest), make(chan bool), make(chan bool), make(chan bool)}
 
-       go transfer(buf, source, t.requests, t.Reader_status)
+       go t.transfer(source)
+       go t.readersMonitor()
 
        return t
 }
 
 func AsyncStreamFromSlice(buf []byte) *AsyncStream {
-       t := &AsyncStream{make(chan readRequest), nil}
+       t := &AsyncStream{buf, make(chan sliceRequest), make(chan bool), make(chan bool), make(chan bool)}
 
-       go transfer(buf, nil, t.requests, nil)
+       go t.transfer(nil)
+       go t.readersMonitor()
 
        return t
 }
 
 func (this *AsyncStream) MakeStreamReader() *StreamReader {
-       return &StreamReader{0, this.requests, make(chan readResult)}
+       this.add_reader <- true
+       return &StreamReader{0, this, make(chan sliceResult)}
 }
 
 // Reads from the buffer managed by the Transfer()
 func (this *StreamReader) Read(p []byte) (n int, err error) {
-       this.requests <- readRequest{this.offset, len(p), this.responses}
+       this.stream.requests <- sliceRequest{this.offset, len(p), this.responses}
        rr, valid := <-this.responses
        if valid {
                this.offset += len(rr.slice)
@@ -77,7 +93,7 @@ func (this *StreamReader) WriteTo(dest io.Writer) (written int64, err error) {
        // Record starting offset in order to correctly report the number of bytes sent
        starting_offset := this.offset
        for {
-               this.requests <- readRequest{this.offset, 32 * 1024, this.responses}
+               this.stream.requests <- sliceRequest{this.offset, 32 * 1024, this.responses}
                rr, valid := <-this.responses
                if valid {
                        this.offset += len(rr.slice)
@@ -99,13 +115,16 @@ func (this *StreamReader) WriteTo(dest io.Writer) (written int64, err error) {
 
 // Close the responses channel
 func (this *StreamReader) Close() error {
+       this.stream.subtract_reader <- true
        close(this.responses)
+       this.stream = nil
        return nil
 }
 
 func (this *AsyncStream) Close() {
+       this.wait_zero_readers <- true
        close(this.requests)
-       if this.Reader_status != nil {
-               close(this.Reader_status)
-       }
+       close(this.add_reader)
+       close(this.subtract_reader)
+       close(this.wait_zero_readers)
 }