/* AsyncStream pulls data in from a io.Reader source (such as a file or network
socket) and fans out to any number of StreamReader sinks.

Unlike io.TeeReader() or io.MultiWriter(), new StreamReaders can be created at
any point in the lifetime of the AsyncStream, and each StreamReader will read
the contents of the buffer up to the "frontier" of the buffer, at which point
the StreamReader blocks until new data is read from the source.

This is useful for minimizing readthrough latency as sinks can read and act on
data from the source without waiting for the source to be completely buffered.
It is also useful as a cache in situations where re-reading the original source
is potentially costly, since the buffer retains a copy of the source data.

Usage:

Begin reading into a buffer with maximum size 'buffersize' from 'source':
  stream := AsyncStreamFromReader(buffersize, source)

To create a new reader (this can be called multiple times, each reader starts
at the beginning of the buffer):
  reader := stream.MakeStreamReader()

Make sure to close the reader when you're done with it.
  reader.Close()

When you're done with the stream:
  stream.Close()

Alternately, if you already have a filled buffer and just want to read out from it:
  stream := AsyncStreamFromSlice(buf)

  r := stream.MakeStreamReader()

*/

package streamer

import (
	"io"
)

// AsyncStream reads data from a source into an in-memory buffer and fans it
// out to any number of StreamReaders. Buffer access and reader bookkeeping
// are coordinated through the channels below, which are serviced by the
// transfer and readersMonitor goroutines started by the constructors.
type AsyncStream struct {
	buffer            []byte            // retained copy of the source data
	requests          chan sliceRequest // slice requests from StreamReader.Read/WriteTo
	add_reader        chan bool         // signaled by MakeStreamReader when a reader is created
	subtract_reader   chan bool         // signaled by StreamReader.Close when a reader goes away
	wait_zero_readers chan bool         // AsyncStream.Close sends here to wait for all readers to finish
}

// StreamReader reads the contents of the stream's buffer from the beginning,
// blocking at the buffer frontier until more data is available.
type StreamReader struct {
	offset    int              // next buffer position this reader will request
	stream    *AsyncStream     // owning stream; set to nil by Close
	responses chan sliceResult // per-reader reply channel for slice requests
}

// AsyncStreamFromReader starts reading from source into a buffer of at most
// buffersize bytes and returns the stream serving that buffer. Readers
// created with MakeStreamReader see data as soon as it arrives from source.
func AsyncStreamFromReader(buffersize int, source io.Reader) *AsyncStream {
	stream := &AsyncStream{
		buffer:            make([]byte, buffersize),
		requests:          make(chan sliceRequest),
		add_reader:        make(chan bool),
		subtract_reader:   make(chan bool),
		wait_zero_readers: make(chan bool),
	}

	// Background goroutines: one pulls from source, one tracks reader count.
	go stream.transfer(source)
	go stream.readersMonitor()

	return stream
}

// AsyncStreamFromSlice returns a stream serving an already-filled buffer.
// No source is read; readers simply consume the contents of buf.
func AsyncStreamFromSlice(buf []byte) *AsyncStream {
	stream := &AsyncStream{
		buffer:            buf,
		requests:          make(chan sliceRequest),
		add_reader:        make(chan bool),
		subtract_reader:   make(chan bool),
		wait_zero_readers: make(chan bool),
	}

	// transfer(nil) serves the pre-filled buffer without reading a source.
	go stream.transfer(nil)
	go stream.readersMonitor()

	return stream
}

// MakeStreamReader registers and returns a new reader positioned at the
// start of the buffer. It may be called at any point in the stream's
// lifetime; close the reader with StreamReader.Close when done.
func (s *AsyncStream) MakeStreamReader() *StreamReader {
	s.add_reader <- true
	return &StreamReader{
		offset:    0,
		stream:    s,
		responses: make(chan sliceResult),
	}
}

// Read requests up to len(p) bytes from the stream's buffer at this
// reader's current offset, blocking until data (or an error) is available.
// It returns io.ErrUnexpectedEOF if the stream shuts down the reader's
// response channel.
func (r *StreamReader) Read(p []byte) (n int, err error) {
	r.stream.requests <- sliceRequest{r.offset, len(p), r.responses}
	result, ok := <-r.responses
	if !ok {
		// Channel closed: the stream is gone.
		return 0, io.ErrUnexpectedEOF
	}
	r.offset += len(result.slice)
	return copy(p, result.slice), result.err
}

// WriteTo copies the stream contents from this reader's current offset to
// dest, in chunks of up to 32 KiB, until EOF or an error. It implements
// io.WriterTo. The returned count is the number of bytes written to dest
// during this call; EOF terminates the copy but is not reported as an error.
func (this *StreamReader) WriteTo(dest io.Writer) (written int64, err error) {
	// Record starting offset in order to correctly report the number of bytes sent
	starting_offset := this.offset
	for {
		this.stream.requests <- sliceRequest{this.offset, 32 * 1024, this.responses}
		rr, valid := <-this.responses
		if !valid {
			// Responses channel closed: the stream has shut down.
			// (The original returned int64(this.offset) here, which
			// wrongly included bytes consumed before this call.)
			return int64(this.offset - starting_offset), io.ErrUnexpectedEOF
		}
		this.offset += len(rr.slice)
		if rr.err != nil {
			if rr.err == io.EOF {
				// EOF is not an error.
				return int64(this.offset - starting_offset), nil
			}
			return int64(this.offset - starting_offset), rr.err
		}
		// Propagate destination write failures (including short writes),
		// which were previously discarded. Count only bytes actually
		// accepted by dest in this iteration.
		if n, werr := dest.Write(rr.slice); werr != nil {
			return int64(this.offset-starting_offset-len(rr.slice)) + int64(n), werr
		}
	}
}

// Close deregisters this reader from the stream and closes its response
// channel. The reader must not be used again after Close returns.
func (r *StreamReader) Close() error {
	// Announce departure before tearing down, so the stream stops
	// targeting this reader's response channel.
	r.stream.subtract_reader <- true
	close(r.responses)
	r.stream = nil
	return nil
}

// Close waits until every StreamReader has been closed, then shuts down
// the stream's coordination channels, terminating its goroutines.
func (s *AsyncStream) Close() {
	// Blocks until the reader count reaches zero.
	s.wait_zero_readers <- true
	close(s.requests)
	close(s.add_reader)
	close(s.subtract_reader)
	close(s.wait_zero_readers)
}