1885: Added logging of invalid requests. Added logging when the server list is
[arvados.git] / sdk / go / src / arvados.org / streamer / streamer.go
index 12475aa370e713e6e7ad1671598fd1c3bb4cab06..2217dd3352eae69255b74b4faa5a74425efca0ee 100644 (file)
@@ -1,23 +1,35 @@
-/* Implements a buffer that supports concurrent incremental read and append.
-New readers start reading from the beginning of the buffer, block when reaching
-the end of the buffer, and are unblocked as new data is added.
+/* AsyncStream pulls data in from a io.Reader source (such as a file or network
+socket) and fans out to any number of StreamReader sinks.
+
+Unlike io.TeeReader() or io.MultiWriter(), new StreamReaders can be created at
+any point in the lifetime of the AsyncStream, and each StreamReader will read
+the contents of the buffer up to the "frontier" of the buffer, at which point
+the StreamReader blocks until new data is read from the source.
+
+This is useful for minimizing readthrough latency as sinks can read and act on
+data from the source without waiting for the source to be completely buffered.
+It is also useful as a cache in situations where re-reading the original source
+potentially is costly, since the buffer retains a copy of the source data.
 
 Usage:
 
 Begin reading into a buffer with maximum size 'buffersize' from 'source':
-  tr := StartTransferFromReader(buffersize, source)
+  stream := AsyncStreamFromReader(buffersize, source)
 
-To create a new reader (this can be called multiple times):
-  r := tr.MakeBufferReader()
+To create a new reader (this can be called multiple times, each reader starts
+at the beginning of the buffer):
+  reader := tr.MakeStreamReader()
 
-When you're done with the buffer:
-  tr.Close()
+Make sure to close the reader when you're done with it.
+  reader.Close()
 
+When you're done with the stream:
+  stream.Close()
 
 Alternately, if you already have a filled buffer and just want to read out from it:
-  tr := StartTransferFromSlice(buf)
-  r := tr.MakeBufferReader()
-  tr.Close()
+  stream := AsyncStreamFromSlice(buf)
+
+  r := tr.MakeStreamReader()
 
 */
 
@@ -25,89 +37,94 @@ package streamer
 
 import (
        "io"
-       "log"
 )
 
-type TransferBuffer struct {
-       requests      chan readRequest
-       Reader_status chan error
+type AsyncStream struct {
+       buffer            []byte
+       requests          chan sliceRequest
+       add_reader        chan bool
+       subtract_reader   chan bool
+       wait_zero_readers chan bool
 }
 
 // Reads from the buffer managed by the Transfer()
-type BufferReader struct {
-       offset    *int
-       requests  chan<- readRequest
-       responses chan readResult
+type StreamReader struct {
+       offset    int
+       stream    *AsyncStream
+       responses chan sliceResult
 }
 
-func StartTransferFromReader(buffersize int, source io.Reader) TransferBuffer {
-       buf := make([]byte, buffersize)
+func AsyncStreamFromReader(buffersize int, source io.Reader) *AsyncStream {
+       t := &AsyncStream{make([]byte, buffersize), make(chan sliceRequest), make(chan bool), make(chan bool), make(chan bool)}
 
-       t := TransferBuffer{make(chan readRequest), make(chan error)}
-
-       go transfer(buf, source, t.requests, t.Reader_status)
+       go t.transfer(source)
+       go t.readersMonitor()
 
        return t
 }
 
-func StartTransferFromSlice(buf []byte) TransferBuffer {
-       t := TransferBuffer{make(chan readRequest), nil}
+func AsyncStreamFromSlice(buf []byte) *AsyncStream {
+       t := &AsyncStream{buf, make(chan sliceRequest), make(chan bool), make(chan bool), make(chan bool)}
 
-       go transfer(buf, nil, t.requests, nil)
+       go t.transfer(nil)
+       go t.readersMonitor()
 
        return t
 }
 
-func (this TransferBuffer) MakeBufferReader() BufferReader {
-       return BufferReader{new(int), this.requests, make(chan readResult)}
+func (this *AsyncStream) MakeStreamReader() *StreamReader {
+       this.add_reader <- true
+       return &StreamReader{0, this, make(chan sliceResult)}
 }
 
 // Reads from the buffer managed by the Transfer()
-func (this BufferReader) Read(p []byte) (n int, err error) {
-       this.requests <- readRequest{*this.offset, len(p), this.responses}
+func (this *StreamReader) Read(p []byte) (n int, err error) {
+       this.stream.requests <- sliceRequest{this.offset, len(p), this.responses}
        rr, valid := <-this.responses
        if valid {
-               *this.offset += len(rr.slice)
+               this.offset += len(rr.slice)
                return copy(p, rr.slice), rr.err
        } else {
                return 0, io.ErrUnexpectedEOF
        }
 }
 
-func (this BufferReader) WriteTo(dest io.Writer) (written int64, err error) {
+func (this *StreamReader) WriteTo(dest io.Writer) (written int64, err error) {
        // Record starting offset in order to correctly report the number of bytes sent
-       starting_offset := *this.offset
+       starting_offset := this.offset
        for {
-               this.requests <- readRequest{*this.offset, 32 * 1024, this.responses}
+               this.stream.requests <- sliceRequest{this.offset, 32 * 1024, this.responses}
                rr, valid := <-this.responses
                if valid {
-                       log.Printf("WriteTo slice %v %d %v", *this.offset, len(rr.slice), rr.err)
-                       *this.offset += len(rr.slice)
+                       this.offset += len(rr.slice)
                        if rr.err != nil {
                                if rr.err == io.EOF {
                                        // EOF is not an error.
-                                       return int64(*this.offset - starting_offset), nil
+                                       return int64(this.offset - starting_offset), nil
                                } else {
-                                       return int64(*this.offset - starting_offset), rr.err
+                                       return int64(this.offset - starting_offset), rr.err
                                }
                        } else {
                                dest.Write(rr.slice)
                        }
                } else {
-                       return int64(*this.offset), io.ErrUnexpectedEOF
+                       return int64(this.offset), io.ErrUnexpectedEOF
                }
        }
 }
 
 // Close the responses channel
-func (this BufferReader) Close() error {
+func (this *StreamReader) Close() error {
+       this.stream.subtract_reader <- true
        close(this.responses)
+       this.stream = nil
        return nil
 }
 
-func (this TransferBuffer) Close() {
+func (this *AsyncStream) Close() {
+       this.wait_zero_readers <- true
        close(this.requests)
-       if this.Reader_status != nil {
-               close(this.Reader_status)
-       }
+       close(this.add_reader)
+       close(this.subtract_reader)
+       close(this.wait_zero_readers)
 }