1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: Apache-2.0
5 /* AsyncStream pulls data in from an io.Reader source (such as a file or network
6 socket) and fans out to any number of StreamReader sinks.
8 Unlike io.TeeReader() or io.MultiWriter(), new StreamReaders can be created at
9 any point in the lifetime of the AsyncStream, and each StreamReader will read
10 the contents of the buffer up to the "frontier" of the buffer, at which point
11 the StreamReader blocks until new data is read from the source.
13 This is useful for minimizing readthrough latency as sinks can read and act on
14 data from the source without waiting for the source to be completely buffered.
15 It is also useful as a cache in situations where re-reading the original source
16 potentially is costly, since the buffer retains a copy of the source data.
20 Begin reading into a buffer with maximum size 'buffersize' from 'source':
21 stream := AsyncStreamFromReader(buffersize, source)
23 To create a new reader (this can be called multiple times, each reader starts
24 at the beginning of the buffer):
25 reader := stream.MakeStreamReader()
27 Make sure to close the reader when you're done with it.
30 When you're done with the stream:
33 Alternatively, if you already have a filled buffer and just want to read out from it:
34 stream := AsyncStreamFromSlice(buf)
36 r := stream.MakeStreamReader()
47 var ErrAlreadyClosed = errors.New("cannot close a stream twice")
49 type AsyncStream struct {
51 requests chan sliceRequest
53 subtract_reader chan bool
54 wait_zero_readers chan bool
58 // Reads from the buffer managed by the Transfer()
59 type StreamReader struct {
62 responses chan sliceResult
// AsyncStreamFromReader returns an AsyncStream whose buffer (capacity
// 'buffersize' bytes) is filled in the background from 'source' (see the
// package comment). All control channels are unbuffered, so reader
// operations rendezvous with the stream's service goroutines.
// NOTE(review): this listing looks truncated — the composite-literal
// wrapper around these field initializers and the statements that start
// the background transfer are not visible here; confirm against the
// full file before relying on this fragment.
65 func AsyncStreamFromReader(buffersize int, source io.Reader) *AsyncStream {
// The buffer is pre-allocated to its maximum size; data read from
// 'source' advances a frontier within it.
67 buffer: make([]byte, buffersize),
68 requests: make(chan sliceRequest),
69 add_reader: make(chan bool),
70 subtract_reader: make(chan bool),
71 wait_zero_readers: make(chan bool),
// AsyncStreamFromSlice returns an AsyncStream that serves reads out of
// an already-filled buffer 'buf' (the second usage pattern in the
// package comment); no io.Reader source is involved.
// NOTE(review): truncated listing — the 'buffer: buf' initializer, the
// composite-literal wrapper, and the function tail are not visible here;
// confirm against the full file.
80 func AsyncStreamFromSlice(buf []byte) *AsyncStream {
// Same unbuffered control channels as AsyncStreamFromReader.
83 requests: make(chan sliceRequest),
84 add_reader: make(chan bool),
85 subtract_reader: make(chan bool),
86 wait_zero_readers: make(chan bool),
95 func (this *AsyncStream) MakeStreamReader() *StreamReader {
96 this.add_reader <- true
97 return &StreamReader{0, this, make(chan sliceResult)}
100 // Reads from the buffer managed by the Transfer()
101 func (this *StreamReader) Read(p []byte) (n int, err error) {
102 this.stream.requests <- sliceRequest{this.offset, len(p), this.responses}
103 rr, valid := <-this.responses
105 this.offset += len(rr.slice)
106 return copy(p, rr.slice), rr.err
108 return 0, io.ErrUnexpectedEOF
112 func (this *StreamReader) WriteTo(dest io.Writer) (written int64, err error) {
113 // Record starting offset in order to correctly report the number of bytes sent
114 starting_offset := this.offset
116 this.stream.requests <- sliceRequest{this.offset, 32 * 1024, this.responses}
117 rr, valid := <-this.responses
119 this.offset += len(rr.slice)
121 if rr.err == io.EOF {
122 // EOF is not an error.
123 return int64(this.offset - starting_offset), nil
125 return int64(this.offset - starting_offset), rr.err
131 return int64(this.offset), io.ErrUnexpectedEOF
136 // Close the responses channel
137 func (this *StreamReader) Close() error {
138 if this.stream == nil {
139 return ErrAlreadyClosed
141 this.stream.subtract_reader <- true
142 close(this.responses)
// Close shuts down the stream. It first rendezvouses on
// wait_zero_readers — from the channel name, presumably blocking until
// every StreamReader has been closed (confirm against the accounting
// goroutine) — then closes the accounting channels. A repeat Close
// returns ErrAlreadyClosed.
// NOTE(review): truncated listing — the already-closed guard condition
// that precedes the ErrAlreadyClosed return, and the function tail
// (including any close/nil-out of this.requests and the success return),
// are not visible here; confirm against the full file.
147 func (this *AsyncStream) Close() error {
149 return ErrAlreadyClosed
// Blocks until the reader-accounting side acknowledges on this channel.
152 this.wait_zero_readers <- true
// Closing the accounting channels signals their receivers to exit.
154 close(this.add_reader)
155 close(this.subtract_reader)
156 close(this.wait_zero_readers)