package streamer
import (
+ "errors"
+ "fmt"
"io"
)
+var ErrAlreadyClosed = errors.New("cannot close a stream twice") // returned by StreamReader.Close and AsyncStream.Close when the receiver was already closed
+
type AsyncStream struct {
buffer []byte
requests chan sliceRequest
add_reader chan bool
subtract_reader chan bool
wait_zero_readers chan bool
+ closed bool // set to true by the first Close; a later Close sees it and returns ErrAlreadyClosed
}
// Reads from the buffer managed by the Transfer()
}
func AsyncStreamFromReader(buffersize int, source io.Reader) *AsyncStream {
- t := &AsyncStream{make([]byte, buffersize), make(chan sliceRequest), make(chan bool), make(chan bool), make(chan bool)}
+ t := &AsyncStream{ // keyed fields: an unkeyed literal must list every field, so it stops compiling once closed is added to the struct
+ buffer: make([]byte, buffersize),
+ requests: make(chan sliceRequest),
+ add_reader: make(chan bool),
+ subtract_reader: make(chan bool),
+ wait_zero_readers: make(chan bool),
+ } // closed is not listed and therefore starts at its zero value, false
go t.transfer(source)
go t.readersMonitor()
}
func AsyncStreamFromSlice(buf []byte) *AsyncStream {
- t := &AsyncStream{buf, make(chan sliceRequest), make(chan bool), make(chan bool), make(chan bool)}
+ t := &AsyncStream{ // keyed fields: an unkeyed literal must list every field, so it stops compiling once closed is added to the struct
+ buffer: buf, // the caller's slice is stored as-is, not copied
+ requests: make(chan sliceRequest),
+ add_reader: make(chan bool),
+ subtract_reader: make(chan bool),
+ wait_zero_readers: make(chan bool),
+ } // closed is not listed and therefore starts at its zero value, false
go t.transfer(nil)
go t.readersMonitor()
// Close the responses channel
func (this *StreamReader) Close() error {
+ if this.stream == nil { // stream is set to nil at the end of a successful Close, so nil here means a repeated call
+ return ErrAlreadyClosed // return before dereferencing the nil stream or closing responses a second time
+ }
this.stream.subtract_reader <- true
close(this.responses)
this.stream = nil
return nil
}
-func (this *AsyncStream) Close() {
+func (this *AsyncStream) Close() error { // returns an error so that a repeated Close can be reported to the caller
+ if this.closed { // closed is set just below on the first call
+ return ErrAlreadyClosed // return before sending on, or closing again, the channels that the first call closed
+ }
+ this.closed = true // NOTE(review): closed is read and written without synchronization — confirm Close is never called concurrently
this.wait_zero_readers <- true
close(this.requests)
close(this.add_reader)
close(this.subtract_reader)
close(this.wait_zero_readers)
+ return nil
+}
+
+// Seek moves this reader's offset to the position described by offset and
+// whence (io.SeekStart, io.SeekCurrent or io.SeekEnd) and returns the new
+// absolute offset.
+//
+// A target past the end of the stream's buffer is clamped to the buffer
+// length. A negative target or an unknown whence leaves the offset unchanged
+// and returns the current offset together with an error. Seeking on a reader
+// that has already been closed also returns an error, because Close sets the
+// stream to nil and measuring its buffer would otherwise panic.
+func (this *StreamReader) Seek(offset int64, whence int) (int64, error) {
+	if this.stream == nil {
+		return int64(this.offset), errors.New("seek on closed stream reader")
+	}
+	// Measure the buffer once: it is both the base for io.SeekEnd and the
+	// upper bound used for clamping.
+	size := int64(this.Len())
+	var want int64
+	switch whence {
+	case io.SeekStart:
+		want = offset
+	case io.SeekCurrent:
+		want = int64(this.offset) + offset
+	case io.SeekEnd:
+		want = size + offset
+	default:
+		return int64(this.offset), fmt.Errorf("invalid whence %d", whence)
+	}
+	if want < 0 {
+		return int64(this.offset), fmt.Errorf("attempted seek to %d", want)
+	}
+	if want > size {
+		want = size
+	}
+	this.offset = int(want)
+	return want, nil
+}
+
+// Len returns the length of the underlying stream's buffer, or 0 when this
+// reader has been closed (Close sets the stream to nil, which would otherwise
+// make this method panic).
+// NOTE(review): this is the length of the buffer slice itself; whether all of
+// it already holds transferred data is not visible here — confirm with transfer.
+func (this *StreamReader) Len() uint64 {
+	if this.stream == nil {
+		return 0
+	}
+	return uint64(len(this.stream.buffer))
}