)
type AsyncStream struct {
- requests chan readRequest
- Reader_status chan error
+ requests chan readRequest
+ add_reader chan bool
+ subtract_reader chan bool
+ wait_zero_readers chan bool
+ Reader_status chan error
}
// Reads from the buffer managed by the Transfer()
type StreamReader struct {
offset int
- requests chan<- readRequest
+ stream *AsyncStream
responses chan readResult
}
func AsyncStreamFromReader(buffersize int, source io.Reader) *AsyncStream {
buf := make([]byte, buffersize)
- t := &AsyncStream{make(chan readRequest), make(chan error)}
+ t := &AsyncStream{make(chan readRequest), make(chan bool), make(chan bool), make(chan bool), make(chan error)}
go transfer(buf, source, t.requests, t.Reader_status)
+ go t.readersMonitor()
return t
}
func AsyncStreamFromSlice(buf []byte) *AsyncStream {
- t := &AsyncStream{make(chan readRequest), nil}
+ t := &AsyncStream{make(chan readRequest), make(chan bool), make(chan bool), make(chan bool), nil}
go transfer(buf, nil, t.requests, nil)
+ go t.readersMonitor()
return t
}
func (this *AsyncStream) MakeStreamReader() *StreamReader {
- return &StreamReader{0, this.requests, make(chan readResult)}
+ this.add_reader <- true
+ return &StreamReader{0, this, make(chan readResult)}
}
// Reads from the buffer managed by the Transfer()
func (this *StreamReader) Read(p []byte) (n int, err error) {
- this.requests <- readRequest{this.offset, len(p), this.responses}
+ this.stream.requests <- readRequest{this.offset, len(p), this.responses}
rr, valid := <-this.responses
if valid {
this.offset += len(rr.slice)
// Record starting offset in order to correctly report the number of bytes sent
starting_offset := this.offset
for {
- this.requests <- readRequest{this.offset, 32 * 1024, this.responses}
+ this.stream.requests <- readRequest{this.offset, 32 * 1024, this.responses}
rr, valid := <-this.responses
if valid {
this.offset += len(rr.slice)
// Close the responses channel
func (this *StreamReader) Close() error {
+ this.stream.subtract_reader <- true
close(this.responses)
return nil
}
func (this *AsyncStream) Close() {
+ this.wait_zero_readers <- true
close(this.requests)
+ close(this.add_reader)
+ close(this.subtract_reader)
+ close(this.wait_zero_readers)
if this.Reader_status != nil {
close(this.Reader_status)
}
import (
"io"
- "log"
)
+const MAX_READERS = 100
+
// A slice passed from readIntoBuffer() to transfer()
type readerSlice struct {
slice []byte
// Handle a read request. Returns true if a response was sent, and false if
// the request should be queued.
func handleReadRequest(req readRequest, body []byte, complete bool) bool {
- log.Printf("HandlereadRequest %d %d %d", req.offset, req.maxsize, len(body))
if req.offset < len(body) {
var end int
if req.offset+req.maxsize < len(body) {
}
}
}
+
+ // readersMonitor keeps the count of open readers and is meant to run as its
+ // own goroutine. The count is bounded by choosing which channels are served
+ // rather than by rejecting requests:
+ //
+ //   - add_reader is served only while readers < MAX_READERS, so a send on
+ //     it blocks once the limit has been reached;
+ //   - subtract_reader is served only while readers > 0;
+ //   - wait_zero_readers is served only while readers == 0, so a send on it
+ //     blocks until every reader has been subtracted.
+ //
+ // The goroutine returns as soon as a channel it is currently serving is
+ // found to be closed.
+ func (this *AsyncStream) readersMonitor() {
+ 	readers := 0
+
+ 	for {
+ 		// Receiving from a nil channel blocks forever, so a variable left
+ 		// nil removes its case from the select below.
+ 		var addReader, subtractReader, waitZeroReaders chan bool
+ 		if readers < MAX_READERS {
+ 			addReader = this.add_reader
+ 		}
+ 		if readers > 0 {
+ 			subtractReader = this.subtract_reader
+ 		}
+ 		if readers == 0 {
+ 			waitZeroReaders = this.wait_zero_readers
+ 		}
+
+ 		select {
+ 		case _, ok := <-waitZeroReaders:
+ 			if !ok {
+ 				return
+ 			}
+ 			// Nothing to record: the receive itself unblocks the sender.
+ 		case _, ok := <-addReader:
+ 			if !ok {
+ 				return
+ 			}
+ 			readers++
+ 		case _, ok := <-subtractReader:
+ 			if !ok {
+ 				return
+ 			}
+ 			readers--
+ 		}
+ 	}
+ }