Changes to allow datamanager to run indefinitely:
[arvados.git] / sdk / go / streamer / streamer.go
1 /* AsyncStream pulls data in from a io.Reader source (such as a file or network
2 socket) and fans out to any number of StreamReader sinks.
3
4 Unlike io.TeeReader() or io.MultiWriter(), new StreamReaders can be created at
5 any point in the lifetime of the AsyncStream, and each StreamReader will read
6 the contents of the buffer up to the "frontier" of the buffer, at which point
7 the StreamReader blocks until new data is read from the source.
8
9 This is useful for minimizing readthrough latency as sinks can read and act on
10 data from the source without waiting for the source to be completely buffered.
11 It is also useful as a cache in situations where re-reading the original source
12 potentially is costly, since the buffer retains a copy of the source data.
13
14 Usage:
15
16 Begin reading into a buffer with maximum size 'buffersize' from 'source':
17   stream := AsyncStreamFromReader(buffersize, source)
18
19 To create a new reader (this can be called multiple times, each reader starts
20 at the beginning of the buffer):
21   reader := tr.MakeStreamReader()
22
23 Make sure to close the reader when you're done with it.
24   reader.Close()
25
26 When you're done with the stream:
27   stream.Close()
28
29 Alternately, if you already have a filled buffer and just want to read out from it:
30   stream := AsyncStreamFromSlice(buf)
31
32   r := tr.MakeStreamReader()
33
34 */
35
36 package streamer
37
38 import (
39         "io"
40 )
41
42 type AsyncStream struct {
43         buffer            []byte
44         requests          chan sliceRequest
45         add_reader        chan bool
46         subtract_reader   chan bool
47         wait_zero_readers chan bool
48 }
49
50 // Reads from the buffer managed by the Transfer()
51 type StreamReader struct {
52         offset    int
53         stream    *AsyncStream
54         responses chan sliceResult
55 }
56
57 func AsyncStreamFromReader(buffersize int, source io.Reader) *AsyncStream {
58         t := &AsyncStream{make([]byte, buffersize), make(chan sliceRequest), make(chan bool), make(chan bool), make(chan bool)}
59
60         go t.transfer(source)
61         go t.readersMonitor()
62
63         return t
64 }
65
66 func AsyncStreamFromSlice(buf []byte) *AsyncStream {
67         t := &AsyncStream{buf, make(chan sliceRequest), make(chan bool), make(chan bool), make(chan bool)}
68
69         go t.transfer(nil)
70         go t.readersMonitor()
71
72         return t
73 }
74
75 func (this *AsyncStream) MakeStreamReader() *StreamReader {
76         this.add_reader <- true
77         return &StreamReader{0, this, make(chan sliceResult)}
78 }
79
80 // Reads from the buffer managed by the Transfer()
81 func (this *StreamReader) Read(p []byte) (n int, err error) {
82         this.stream.requests <- sliceRequest{this.offset, len(p), this.responses}
83         rr, valid := <-this.responses
84         if valid {
85                 this.offset += len(rr.slice)
86                 return copy(p, rr.slice), rr.err
87         } else {
88                 return 0, io.ErrUnexpectedEOF
89         }
90 }
91
92 func (this *StreamReader) WriteTo(dest io.Writer) (written int64, err error) {
93         // Record starting offset in order to correctly report the number of bytes sent
94         starting_offset := this.offset
95         for {
96                 this.stream.requests <- sliceRequest{this.offset, 32 * 1024, this.responses}
97                 rr, valid := <-this.responses
98                 if valid {
99                         this.offset += len(rr.slice)
100                         if rr.err != nil {
101                                 if rr.err == io.EOF {
102                                         // EOF is not an error.
103                                         return int64(this.offset - starting_offset), nil
104                                 } else {
105                                         return int64(this.offset - starting_offset), rr.err
106                                 }
107                         } else {
108                                 dest.Write(rr.slice)
109                         }
110                 } else {
111                         return int64(this.offset), io.ErrUnexpectedEOF
112                 }
113         }
114 }
115
116 // Close the responses channel
117 func (this *StreamReader) Close() error {
118         this.stream.subtract_reader <- true
119         close(this.responses)
120         this.stream = nil
121         return nil
122 }
123
124 func (this *AsyncStream) Close() {
125         this.wait_zero_readers <- true
126         close(this.requests)
127         close(this.add_reader)
128         close(this.subtract_reader)
129         close(this.wait_zero_readers)
130 }