Merge branch '10025-arvbox-layers' closes #10025
[arvados.git] / sdk / go / streamer / streamer.go
1 /* AsyncStream pulls data in from a io.Reader source (such as a file or network
2 socket) and fans out to any number of StreamReader sinks.
3
4 Unlike io.TeeReader() or io.MultiWriter(), new StreamReaders can be created at
5 any point in the lifetime of the AsyncStream, and each StreamReader will read
6 the contents of the buffer up to the "frontier" of the buffer, at which point
7 the StreamReader blocks until new data is read from the source.
8
9 This is useful for minimizing readthrough latency as sinks can read and act on
10 data from the source without waiting for the source to be completely buffered.
11 It is also useful as a cache in situations where re-reading the original source
12 potentially is costly, since the buffer retains a copy of the source data.
13
14 Usage:
15
16 Begin reading into a buffer with maximum size 'buffersize' from 'source':
17   stream := AsyncStreamFromReader(buffersize, source)
18
19 To create a new reader (this can be called multiple times, each reader starts
20 at the beginning of the buffer):
21   reader := tr.MakeStreamReader()
22
23 Make sure to close the reader when you're done with it.
24   reader.Close()
25
26 When you're done with the stream:
27   stream.Close()
28
29 Alternately, if you already have a filled buffer and just want to read out from it:
30   stream := AsyncStreamFromSlice(buf)
31
32   r := tr.MakeStreamReader()
33
34 */
35
36 package streamer
37
38 import (
39         "errors"
40         "io"
41 )
42
43 var ErrAlreadyClosed = errors.New("cannot close a stream twice")
44
45 type AsyncStream struct {
46         buffer            []byte
47         requests          chan sliceRequest
48         add_reader        chan bool
49         subtract_reader   chan bool
50         wait_zero_readers chan bool
51         closed            bool
52 }
53
54 // Reads from the buffer managed by the Transfer()
55 type StreamReader struct {
56         offset    int
57         stream    *AsyncStream
58         responses chan sliceResult
59 }
60
61 func AsyncStreamFromReader(buffersize int, source io.Reader) *AsyncStream {
62         t := &AsyncStream{
63                 buffer:            make([]byte, buffersize),
64                 requests:          make(chan sliceRequest),
65                 add_reader:        make(chan bool),
66                 subtract_reader:   make(chan bool),
67                 wait_zero_readers: make(chan bool),
68         }
69
70         go t.transfer(source)
71         go t.readersMonitor()
72
73         return t
74 }
75
76 func AsyncStreamFromSlice(buf []byte) *AsyncStream {
77         t := &AsyncStream{
78                 buffer:            buf,
79                 requests:          make(chan sliceRequest),
80                 add_reader:        make(chan bool),
81                 subtract_reader:   make(chan bool),
82                 wait_zero_readers: make(chan bool),
83         }
84
85         go t.transfer(nil)
86         go t.readersMonitor()
87
88         return t
89 }
90
91 func (this *AsyncStream) MakeStreamReader() *StreamReader {
92         this.add_reader <- true
93         return &StreamReader{0, this, make(chan sliceResult)}
94 }
95
96 // Reads from the buffer managed by the Transfer()
97 func (this *StreamReader) Read(p []byte) (n int, err error) {
98         this.stream.requests <- sliceRequest{this.offset, len(p), this.responses}
99         rr, valid := <-this.responses
100         if valid {
101                 this.offset += len(rr.slice)
102                 return copy(p, rr.slice), rr.err
103         } else {
104                 return 0, io.ErrUnexpectedEOF
105         }
106 }
107
108 func (this *StreamReader) WriteTo(dest io.Writer) (written int64, err error) {
109         // Record starting offset in order to correctly report the number of bytes sent
110         starting_offset := this.offset
111         for {
112                 this.stream.requests <- sliceRequest{this.offset, 32 * 1024, this.responses}
113                 rr, valid := <-this.responses
114                 if valid {
115                         this.offset += len(rr.slice)
116                         if rr.err != nil {
117                                 if rr.err == io.EOF {
118                                         // EOF is not an error.
119                                         return int64(this.offset - starting_offset), nil
120                                 } else {
121                                         return int64(this.offset - starting_offset), rr.err
122                                 }
123                         } else {
124                                 dest.Write(rr.slice)
125                         }
126                 } else {
127                         return int64(this.offset), io.ErrUnexpectedEOF
128                 }
129         }
130 }
131
132 // Close the responses channel
133 func (this *StreamReader) Close() error {
134         if this.stream == nil {
135                 return ErrAlreadyClosed
136         }
137         this.stream.subtract_reader <- true
138         close(this.responses)
139         this.stream = nil
140         return nil
141 }
142
143 func (this *AsyncStream) Close() error {
144         if this.closed {
145                 return ErrAlreadyClosed
146         }
147         this.closed = true
148         this.wait_zero_readers <- true
149         close(this.requests)
150         close(this.add_reader)
151         close(this.subtract_reader)
152         close(this.wait_zero_readers)
153         return nil
154 }