Merge branch '8784-dir-listings'
[arvados.git] / sdk / go / streamer / streamer.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: Apache-2.0
4
5 /* AsyncStream pulls data in from a io.Reader source (such as a file or network
6 socket) and fans out to any number of StreamReader sinks.
7
8 Unlike io.TeeReader() or io.MultiWriter(), new StreamReaders can be created at
9 any point in the lifetime of the AsyncStream, and each StreamReader will read
10 the contents of the buffer up to the "frontier" of the buffer, at which point
11 the StreamReader blocks until new data is read from the source.
12
13 This is useful for minimizing readthrough latency as sinks can read and act on
14 data from the source without waiting for the source to be completely buffered.
15 It is also useful as a cache in situations where re-reading the original source
16 potentially is costly, since the buffer retains a copy of the source data.
17
18 Usage:
19
20 Begin reading into a buffer with maximum size 'buffersize' from 'source':
21   stream := AsyncStreamFromReader(buffersize, source)
22
23 To create a new reader (this can be called multiple times, each reader starts
24 at the beginning of the buffer):
25   reader := tr.MakeStreamReader()
26
27 Make sure to close the reader when you're done with it.
28   reader.Close()
29
30 When you're done with the stream:
31   stream.Close()
32
33 Alternately, if you already have a filled buffer and just want to read out from it:
34   stream := AsyncStreamFromSlice(buf)
35
36   r := tr.MakeStreamReader()
37
38 */
39
40 package streamer
41
42 import (
43         "errors"
44         "io"
45 )
46
47 var ErrAlreadyClosed = errors.New("cannot close a stream twice")
48
49 type AsyncStream struct {
50         buffer            []byte
51         requests          chan sliceRequest
52         add_reader        chan bool
53         subtract_reader   chan bool
54         wait_zero_readers chan bool
55         closed            bool
56 }
57
58 // Reads from the buffer managed by the Transfer()
59 type StreamReader struct {
60         offset    int
61         stream    *AsyncStream
62         responses chan sliceResult
63 }
64
65 func AsyncStreamFromReader(buffersize int, source io.Reader) *AsyncStream {
66         t := &AsyncStream{
67                 buffer:            make([]byte, buffersize),
68                 requests:          make(chan sliceRequest),
69                 add_reader:        make(chan bool),
70                 subtract_reader:   make(chan bool),
71                 wait_zero_readers: make(chan bool),
72         }
73
74         go t.transfer(source)
75         go t.readersMonitor()
76
77         return t
78 }
79
80 func AsyncStreamFromSlice(buf []byte) *AsyncStream {
81         t := &AsyncStream{
82                 buffer:            buf,
83                 requests:          make(chan sliceRequest),
84                 add_reader:        make(chan bool),
85                 subtract_reader:   make(chan bool),
86                 wait_zero_readers: make(chan bool),
87         }
88
89         go t.transfer(nil)
90         go t.readersMonitor()
91
92         return t
93 }
94
95 func (this *AsyncStream) MakeStreamReader() *StreamReader {
96         this.add_reader <- true
97         return &StreamReader{0, this, make(chan sliceResult)}
98 }
99
100 // Reads from the buffer managed by the Transfer()
101 func (this *StreamReader) Read(p []byte) (n int, err error) {
102         this.stream.requests <- sliceRequest{this.offset, len(p), this.responses}
103         rr, valid := <-this.responses
104         if valid {
105                 this.offset += len(rr.slice)
106                 return copy(p, rr.slice), rr.err
107         } else {
108                 return 0, io.ErrUnexpectedEOF
109         }
110 }
111
112 func (this *StreamReader) WriteTo(dest io.Writer) (written int64, err error) {
113         // Record starting offset in order to correctly report the number of bytes sent
114         starting_offset := this.offset
115         for {
116                 this.stream.requests <- sliceRequest{this.offset, 32 * 1024, this.responses}
117                 rr, valid := <-this.responses
118                 if valid {
119                         this.offset += len(rr.slice)
120                         if rr.err != nil {
121                                 if rr.err == io.EOF {
122                                         // EOF is not an error.
123                                         return int64(this.offset - starting_offset), nil
124                                 } else {
125                                         return int64(this.offset - starting_offset), rr.err
126                                 }
127                         } else {
128                                 dest.Write(rr.slice)
129                         }
130                 } else {
131                         return int64(this.offset), io.ErrUnexpectedEOF
132                 }
133         }
134 }
135
136 // Close the responses channel
137 func (this *StreamReader) Close() error {
138         if this.stream == nil {
139                 return ErrAlreadyClosed
140         }
141         this.stream.subtract_reader <- true
142         close(this.responses)
143         this.stream = nil
144         return nil
145 }
146
147 func (this *AsyncStream) Close() error {
148         if this.closed {
149                 return ErrAlreadyClosed
150         }
151         this.closed = true
152         this.wait_zero_readers <- true
153         close(this.requests)
154         close(this.add_reader)
155         close(this.subtract_reader)
156         close(this.wait_zero_readers)
157         return nil
158 }