10990: Use AsyncStream to minimize store-and-forward latency
[arvados.git] / sdk / go / streamer / streamer.go
1 /* AsyncStream pulls data in from a io.Reader source (such as a file or network
2 socket) and fans out to any number of StreamReader sinks.
3
4 Unlike io.TeeReader() or io.MultiWriter(), new StreamReaders can be created at
5 any point in the lifetime of the AsyncStream, and each StreamReader will read
6 the contents of the buffer up to the "frontier" of the buffer, at which point
7 the StreamReader blocks until new data is read from the source.
8
9 This is useful for minimizing readthrough latency as sinks can read and act on
10 data from the source without waiting for the source to be completely buffered.
11 It is also useful as a cache in situations where re-reading the original source
12 potentially is costly, since the buffer retains a copy of the source data.
13
14 Usage:
15
16 Begin reading into a buffer with maximum size 'buffersize' from 'source':
17   stream := AsyncStreamFromReader(buffersize, source)
18
19 To create a new reader (this can be called multiple times, each reader starts
20 at the beginning of the buffer):
21   reader := tr.MakeStreamReader()
22
23 Make sure to close the reader when you're done with it.
24   reader.Close()
25
26 When you're done with the stream:
27   stream.Close()
28
29 Alternately, if you already have a filled buffer and just want to read out from it:
30   stream := AsyncStreamFromSlice(buf)
31
32   r := tr.MakeStreamReader()
33
34 */
35
36 package streamer
37
38 import (
39         "errors"
40         "fmt"
41         "io"
42 )
43
44 var ErrAlreadyClosed = errors.New("cannot close a stream twice")
45
46 type AsyncStream struct {
47         buffer            []byte
48         requests          chan sliceRequest
49         add_reader        chan bool
50         subtract_reader   chan bool
51         wait_zero_readers chan bool
52         closed            bool
53 }
54
55 // Reads from the buffer managed by the Transfer()
56 type StreamReader struct {
57         offset    int
58         stream    *AsyncStream
59         responses chan sliceResult
60 }
61
62 func AsyncStreamFromReader(buffersize int, source io.Reader) *AsyncStream {
63         t := &AsyncStream{
64                 buffer:            make([]byte, buffersize),
65                 requests:          make(chan sliceRequest),
66                 add_reader:        make(chan bool),
67                 subtract_reader:   make(chan bool),
68                 wait_zero_readers: make(chan bool),
69         }
70
71         go t.transfer(source)
72         go t.readersMonitor()
73
74         return t
75 }
76
77 func AsyncStreamFromSlice(buf []byte) *AsyncStream {
78         t := &AsyncStream{
79                 buffer:            buf,
80                 requests:          make(chan sliceRequest),
81                 add_reader:        make(chan bool),
82                 subtract_reader:   make(chan bool),
83                 wait_zero_readers: make(chan bool),
84         }
85
86         go t.transfer(nil)
87         go t.readersMonitor()
88
89         return t
90 }
91
92 func (this *AsyncStream) MakeStreamReader() *StreamReader {
93         this.add_reader <- true
94         return &StreamReader{0, this, make(chan sliceResult)}
95 }
96
97 // Reads from the buffer managed by the Transfer()
98 func (this *StreamReader) Read(p []byte) (n int, err error) {
99         this.stream.requests <- sliceRequest{this.offset, len(p), this.responses}
100         rr, valid := <-this.responses
101         if valid {
102                 this.offset += len(rr.slice)
103                 return copy(p, rr.slice), rr.err
104         } else {
105                 return 0, io.ErrUnexpectedEOF
106         }
107 }
108
109 func (this *StreamReader) WriteTo(dest io.Writer) (written int64, err error) {
110         // Record starting offset in order to correctly report the number of bytes sent
111         starting_offset := this.offset
112         for {
113                 this.stream.requests <- sliceRequest{this.offset, 32 * 1024, this.responses}
114                 rr, valid := <-this.responses
115                 if valid {
116                         this.offset += len(rr.slice)
117                         if rr.err != nil {
118                                 if rr.err == io.EOF {
119                                         // EOF is not an error.
120                                         return int64(this.offset - starting_offset), nil
121                                 } else {
122                                         return int64(this.offset - starting_offset), rr.err
123                                 }
124                         } else {
125                                 dest.Write(rr.slice)
126                         }
127                 } else {
128                         return int64(this.offset), io.ErrUnexpectedEOF
129                 }
130         }
131 }
132
133 // Close the responses channel
134 func (this *StreamReader) Close() error {
135         if this.stream == nil {
136                 return ErrAlreadyClosed
137         }
138         this.stream.subtract_reader <- true
139         close(this.responses)
140         this.stream = nil
141         return nil
142 }
143
144 func (this *AsyncStream) Close() error {
145         if this.closed {
146                 return ErrAlreadyClosed
147         }
148         this.closed = true
149         this.wait_zero_readers <- true
150         close(this.requests)
151         close(this.add_reader)
152         close(this.subtract_reader)
153         close(this.wait_zero_readers)
154         return nil
155 }
156
157 func (this *StreamReader) Seek(offset int64, whence int) (int64, error) {
158         var want int64
159         switch whence {
160         case io.SeekStart:
161                 want = offset
162         case io.SeekCurrent:
163                 want = int64(this.offset) + offset
164         case io.SeekEnd:
165                 want = int64(this.Len()) + offset
166         default:
167                 return int64(this.offset), fmt.Errorf("invalid whence %d", whence)
168         }
169         if want < 0 {
170                 return int64(this.offset), fmt.Errorf("attempted seek to %d", want)
171         }
172         if want > int64(this.Len()) {
173                 want = int64(this.Len())
174         }
175         this.offset = int(want)
176         return want, nil
177 }
178
179 func (this *StreamReader) Len() uint64 {
180         return uint64(len(this.stream.buffer))
181 }