1885: Added logging of invalid requests. Added logging when the server list is
[arvados.git] / sdk / go / src / arvados.org / streamer / streamer.go
index ba49fb341a9e0a923039a38f64d1a5ed32fc040d..2217dd3352eae69255b74b4faa5a74425efca0ee 100644 (file)
@@ -1,23 +1,35 @@
-/* Implements a buffer that supports concurrent incremental read and append.
-New readers start reading from the beginning of the buffer, block when reaching
-the end of the buffer, and are unblocked as new data is added.
+/* AsyncStream pulls data in from a io.Reader source (such as a file or network
+socket) and fans out to any number of StreamReader sinks.
+
+Unlike io.TeeReader() or io.MultiWriter(), new StreamReaders can be created at
+any point in the lifetime of the AsyncStream, and each StreamReader will read
+the contents of the buffer up to the "frontier" of the buffer, at which point
+the StreamReader blocks until new data is read from the source.
+
+This is useful for minimizing readthrough latency as sinks can read and act on
+data from the source without waiting for the source to be completely buffered.
+It is also useful as a cache in situations where re-reading the original source
+potentially is costly, since the buffer retains a copy of the source data.
 
 Usage:
 
 Begin reading into a buffer with maximum size 'buffersize' from 'source':
-  tr := StartTransferFromReader(buffersize, source)
+  stream := AsyncStreamFromReader(buffersize, source)
 
-To create a new reader (this can be called multiple times):
-  r := tr.MakeStreamReader()
+To create a new reader (this can be called multiple times, each reader starts
+at the beginning of the buffer):
+  reader := tr.MakeStreamReader()
 
-When you're done with the buffer:
-  tr.Close()
+Make sure to close the reader when you're done with it.
+  reader.Close()
 
+When you're done with the stream:
+  stream.Close()
 
 Alternately, if you already have a filled buffer and just want to read out from it:
-  tr := StartTransferFromSlice(buf)
+  stream := AsyncStreamFromSlice(buf)
+
   r := tr.MakeStreamReader()
-  tr.Close()
 
 */
 
@@ -28,35 +40,33 @@ import (
 )
 
 type AsyncStream struct {
-       requests          chan readRequest
+       buffer            []byte
+       requests          chan sliceRequest
        add_reader        chan bool
        subtract_reader   chan bool
        wait_zero_readers chan bool
-       Reader_status     chan error
 }
 
 // Reads from the buffer managed by the Transfer()
 type StreamReader struct {
        offset    int
        stream    *AsyncStream
-       responses chan readResult
+       responses chan sliceResult
 }
 
 func AsyncStreamFromReader(buffersize int, source io.Reader) *AsyncStream {
-       buf := make([]byte, buffersize)
-
-       t := &AsyncStream{make(chan readRequest), make(chan bool), make(chan bool), make(chan bool), make(chan error)}
+       t := &AsyncStream{make([]byte, buffersize), make(chan sliceRequest), make(chan bool), make(chan bool), make(chan bool)}
 
-       go transfer(buf, source, t.requests, t.Reader_status)
+       go t.transfer(source)
        go t.readersMonitor()
 
        return t
 }
 
 func AsyncStreamFromSlice(buf []byte) *AsyncStream {
-       t := &AsyncStream{make(chan readRequest), make(chan bool), make(chan bool), make(chan bool), nil}
+       t := &AsyncStream{buf, make(chan sliceRequest), make(chan bool), make(chan bool), make(chan bool)}
 
-       go transfer(buf, nil, t.requests, nil)
+       go t.transfer(nil)
        go t.readersMonitor()
 
        return t
@@ -64,12 +74,12 @@ func AsyncStreamFromSlice(buf []byte) *AsyncStream {
 
 func (this *AsyncStream) MakeStreamReader() *StreamReader {
        this.add_reader <- true
-       return &StreamReader{0, this, make(chan readResult)}
+       return &StreamReader{0, this, make(chan sliceResult)}
 }
 
 // Reads from the buffer managed by the Transfer()
 func (this *StreamReader) Read(p []byte) (n int, err error) {
-       this.stream.requests <- readRequest{this.offset, len(p), this.responses}
+       this.stream.requests <- sliceRequest{this.offset, len(p), this.responses}
        rr, valid := <-this.responses
        if valid {
                this.offset += len(rr.slice)
@@ -83,7 +93,7 @@ func (this *StreamReader) WriteTo(dest io.Writer) (written int64, err error) {
        // Record starting offset in order to correctly report the number of bytes sent
        starting_offset := this.offset
        for {
-               this.stream.requests <- readRequest{this.offset, 32 * 1024, this.responses}
+               this.stream.requests <- sliceRequest{this.offset, 32 * 1024, this.responses}
                rr, valid := <-this.responses
                if valid {
                        this.offset += len(rr.slice)
@@ -107,6 +117,7 @@ func (this *StreamReader) WriteTo(dest io.Writer) (written int64, err error) {
 func (this *StreamReader) Close() error {
        this.stream.subtract_reader <- true
        close(this.responses)
+       this.stream = nil
        return nil
 }
 
@@ -116,7 +127,4 @@ func (this *AsyncStream) Close() {
        close(this.add_reader)
        close(this.subtract_reader)
        close(this.wait_zero_readers)
-       if this.Reader_status != nil {
-               close(this.Reader_status)
-       }
 }