1 /* Implements a buffer that supports concurrent incremental read and append.
2 New readers start reading from the beginning of the buffer, block when reaching
3 the end of the buffer, and are unblocked as new data is added.
7 Begin reading into a buffer with maximum size 'buffersize' from 'source':
8 tr := AsyncReaderStream(buffersize, source)
10 To create a new reader (this can be called multiple times):
11 r := tr.MakeStreamReader()
13 When you're done with the buffer:
17 Alternately, if you already have a filled buffer and just want to read out from it:
18 tr := AsyncSliceStream(buf)
19 r := tr.MakeStreamReader()
// AsyncStream is the shared handle for a buffer that is served by a transfer
// goroutine. It is created by AsyncReaderStream or AsyncSliceStream, and
// MakeStreamReader hands out readers that talk to the same goroutine.
30 type AsyncStream struct {
// requests carries read requests from every StreamReader made from this
// stream; the constructors pass it to the transfer goroutine.
31 requests chan readRequest
// Reader_status is the error channel given to transfer when the stream is fed
// from an io.Reader. It is nil for streams built by AsyncSliceStream.
// NOTE(review): presumably reports the outcome of reading the source; confirm in transfer.
32 Reader_status chan error
// StreamReader reads from the buffer managed by the transfer goroutine.
// Each StreamReader keeps its own read offset and its own response channel;
// obtain one from AsyncStream.MakeStreamReader.
36 type StreamReader struct {
// requests is the send-only end of the owning AsyncStream's request channel.
38 requests chan<- readRequest
// responses receives the reply to each request this reader sends.
39 responses chan readResult
// AsyncReaderStream allocates a buffer of buffersize bytes, creates an
// AsyncStream with fresh request and status channels, and starts a transfer
// goroutine over the buffer, source and those two channels.
42 func AsyncReaderStream(buffersize int, source io.Reader) *AsyncStream {
43 buf := make([]byte, buffersize)
// Both channels are unbuffered.
45 t := &AsyncStream{make(chan readRequest), make(chan error)}
// NOTE(review): transfer presumably fills buf from source while answering
// read requests; confirm against its definition.
47 go transfer(buf, source, t.requests, t.Reader_status)
// AsyncSliceStream creates an AsyncStream over an existing slice. No source
// reader and no status channel are supplied, so Reader_status is nil on the
// AsyncStream built here.
52 func AsyncSliceStream(buf []byte) *AsyncStream {
53 t := &AsyncStream{make(chan readRequest), nil}
// nil source and nil status channel.
// NOTE(review): presumably transfer then treats buf as already filled; confirm.
55 go transfer(buf, nil, t.requests, nil)
// MakeStreamReader returns a new StreamReader that shares this stream's
// request channel and has its own unbuffered response channel. The leading 0
// in the literal is presumably the reader's initial offset (the field
// declaration is not shown alongside this method).
60 func (this *AsyncStream) MakeStreamReader() *StreamReader {
61 return &StreamReader{0, this.requests, make(chan readResult)}
// Read reads from the buffer managed by the transfer goroutine. It requests
// up to len(p) bytes at the reader's current offset, waits for the reply on
// the reader's response channel, advances the offset by the length of the
// returned slice, copies that slice into p and returns the reply's error.
65 func (this *StreamReader) Read(p []byte) (n int, err error) {
// Ask for len(p) bytes starting at the current offset; the reply is delivered
// on this.responses.
66 this.requests <- readRequest{this.offset, len(p), this.responses}
// valid is false once this.responses has been closed.
67 rr, valid := <-this.responses
// Advance by what was actually returned.
69 this.offset += len(rr.slice)
70 return copy(p, rr.slice), rr.err
// NOTE(review): presumably the valid == false path (response channel
// closed); confirm against the surrounding branch.
72 return 0, io.ErrUnexpectedEOF
// WriteTo requests chunks of up to 32 KiB at the reader's current offset,
// advancing the offset by the length of each returned slice. The returned
// count is the number of bytes the offset advanced during this call, not the
// absolute offset. An io.EOF reply is reported as success (nil error); any
// other reply error is returned as-is, and io.ErrUnexpectedEOF is returned
// on the remaining path.
// NOTE(review): the write of each chunk to dest is not shown alongside these
// lines; confirm its error handling separately.
76 func (this *StreamReader) WriteTo(dest io.Writer) (written int64, err error) {
77 // Record starting offset in order to correctly report the number of bytes sent
78 starting_offset := this.offset
// Ask for the next chunk; the reply is delivered on this.responses.
80 this.requests <- readRequest{this.offset, 32 * 1024, this.responses}
// valid is false once this.responses has been closed.
81 rr, valid := <-this.responses
83 this.offset += len(rr.slice)
86 // EOF is not an error.
87 return int64(this.offset - starting_offset), nil
89 return int64(this.offset - starting_offset), rr.err
// Report the bytes handled by this call, like the other return paths, rather
// than the absolute offset (which over-counts if the reader had already been
// advanced before WriteTo was called).
95 return int64(this.offset - starting_offset), io.ErrUnexpectedEOF
// Close closes this reader's response channel. Read and WriteTo receive from
// that channel with the two-value form, so they observe the close as
// valid == false.
101 func (this *StreamReader) Close() error {
// NOTE(review): the channel is closed by its receiver; confirm that transfer
// never sends on it afterwards, since a send on a closed channel panics.
102 close(this.responses)
// Close closes Reader_status when the stream has one. Streams built by
// AsyncSliceStream leave Reader_status nil, and closing a nil channel would
// panic, hence the guard.
106 func (this *AsyncStream) Close() {
108 if this.Reader_status != nil {
109 close(this.Reader_status)