6af1dd0fa73711377bee65d01220cc3a59cb8cb2
[arvados.git] / sdk / go / src / arvados.org / buffer / buffer.go
1 package buffer
2
3 import (
4         "io"
5         "log"
6 )
7
8 type ReaderSlice struct {
9         slice        []byte
10         reader_error error
11 }
12
13 // Read repeatedly from the reader into the specified buffer, and report each
14 // read to channel 'c'.  Completes when Reader 'r' reports on the error channel
15 // and closes channel 'c'.
16 func ReadIntoBuffer(buffer []byte, r io.Reader, slices chan<- ReaderSlice) {
17         defer close(slices)
18
19         // Initially use entire buffer as scratch space
20         ptr := buffer[:]
21         for {
22                 var n int
23                 var err error
24                 if len(ptr) > 0 {
25                         // Read into the scratch space
26                         n, err = r.Read(ptr)
27                 } else {
28                         // Ran out of scratch space, try reading one more byte
29                         var b [1]byte
30                         n, err = r.Read(b[:])
31
32                         if n > 0 {
33                                 // Reader has more data but we have nowhere to
34                                 // put it, so we're stuffed
35                                 slices <- ReaderSlice{nil, io.ErrShortBuffer}
36                         } else {
37                                 // Return some other error (hopefully EOF)
38                                 slices <- ReaderSlice{nil, err}
39                         }
40                         return
41                 }
42
43                 // End on error (includes EOF)
44                 if err != nil {
45                         slices <- ReaderSlice{nil, err}
46                         return
47                 }
48
49                 if n > 0 {
50                         // Make a slice with the contents of the read
51                         slices <- ReaderSlice{ptr[:n], nil}
52
53                         // Adjust the scratch space slice
54                         ptr = ptr[n:]
55                 }
56         }
57 }
58
59 // A read request to the Transfer() function
60 type ReadRequest struct {
61         offset  int
62         maxsize int
63         result  chan<- ReadResult
64 }
65
66 // A read result from the Transfer() function
67 type ReadResult struct {
68         slice []byte
69         err   error
70 }
71
72 // Reads from the buffer managed by the Transfer()
73 type BufferReader struct {
74         offset    *int
75         requests  chan<- ReadRequest
76         responses chan ReadResult
77 }
78
79 func MakeBufferReader(requests chan<- ReadRequest) BufferReader {
80         return BufferReader{new(int), requests, make(chan ReadResult)}
81 }
82
83 // Reads from the buffer managed by the Transfer()
84 func (this BufferReader) Read(p []byte) (n int, err error) {
85         this.requests <- ReadRequest{*this.offset, len(p), this.responses}
86         rr, valid := <-this.responses
87         if valid {
88                 *this.offset += len(rr.slice)
89                 return copy(p, rr.slice), rr.err
90         } else {
91                 return 0, io.ErrUnexpectedEOF
92         }
93 }
94
95 func (this BufferReader) WriteTo(dest io.Writer) (written int64, err error) {
96         // Record starting offset in order to correctly report the number of bytes sent
97         starting_offset := *this.offset
98         for {
99                 this.requests <- ReadRequest{*this.offset, 32 * 1024, this.responses}
100                 rr, valid := <-this.responses
101                 if valid {
102                         log.Printf("WriteTo slice %v %d %v", *this.offset, len(rr.slice), rr.err)
103                         *this.offset += len(rr.slice)
104                         if rr.err != nil {
105                                 if rr.err == io.EOF {
106                                         // EOF is not an error.
107                                         return int64(*this.offset - starting_offset), nil
108                                 } else {
109                                         return int64(*this.offset - starting_offset), rr.err
110                                 }
111                         } else {
112                                 dest.Write(rr.slice)
113                         }
114                 } else {
115                         return int64(*this.offset), io.ErrUnexpectedEOF
116                 }
117         }
118 }
119
120 // Close the responses channel
121 func (this BufferReader) Close() error {
122         close(this.responses)
123         return nil
124 }
125
126 // Handle a read request.  Returns true if a response was sent, and false if
127 // the request should be queued.
128 func HandleReadRequest(req ReadRequest, body []byte, complete bool) bool {
129         log.Printf("HandleReadRequest %d %d %d", req.offset, req.maxsize, len(body))
130         if req.offset < len(body) {
131                 var end int
132                 if req.offset+req.maxsize < len(body) {
133                         end = req.offset + req.maxsize
134                 } else {
135                         end = len(body)
136                 }
137                 req.result <- ReadResult{body[req.offset:end], nil}
138                 return true
139         } else if complete && req.offset >= len(body) {
140                 req.result <- ReadResult{nil, io.EOF}
141                 return true
142         } else {
143                 return false
144         }
145 }
146
147 // If 'source_reader' is not nil, reads data from 'source_reader' and stores it
148 // in the provided buffer.  Otherwise, use the contents of 'buffer' as is.
149 // Accepts read requests on the buffer on the 'requests' channel.  Completes
150 // when 'requests' channel is closed.
151 func Transfer(source_buffer []byte, source_reader io.Reader, requests <-chan ReadRequest, reader_error chan error) {
152         // currently buffered data
153         var body []byte
154
155         // for receiving slices from ReadIntoBuffer
156         var slices chan ReaderSlice = nil
157
158         // indicates whether the buffered data is complete
159         var complete bool = false
160
161         if source_reader != nil {
162                 // 'body' is the buffer slice representing the body content read so far
163                 body = source_buffer[:0]
164
165                 // used to communicate slices of the buffer as they are
166                 // ReadIntoBuffer will close 'slices' when it is done with it
167                 slices = make(chan ReaderSlice)
168
169                 // Spin it off
170                 go ReadIntoBuffer(source_buffer, source_reader, slices)
171         } else {
172                 // use the whole buffer
173                 body = source_buffer[:]
174
175                 // buffer is complete
176                 complete = true
177         }
178
179         pending_requests := make([]ReadRequest, 0)
180
181         for {
182                 select {
183                 case req, valid := <-requests:
184                         // Handle a buffer read request
185                         if valid {
186                                 if !HandleReadRequest(req, body, complete) {
187                                         pending_requests = append(pending_requests, req)
188                                 }
189                         } else {
190                                 // closed 'requests' channel indicates we're done
191                                 return
192                         }
193
194                 case bk, valid := <-slices:
195                         // Got a new slice from the reader
196                         if valid {
197                                 if bk.reader_error != nil {
198                                         reader_error <- bk.reader_error
199                                         if bk.reader_error == io.EOF {
200                                                 // EOF indicates the reader is done
201                                                 // sending, so our buffer is complete.
202                                                 complete = true
203                                         } else {
204                                                 // some other reader error
205                                                 return
206                                         }
207                                 }
208
209                                 if bk.slice != nil {
210                                         // adjust body bounds now that another slice has been read
211                                         body = source_buffer[0 : len(body)+len(bk.slice)]
212                                 }
213
214                                 // handle pending reads
215                                 n := 0
216                                 for n < len(pending_requests) {
217                                         if HandleReadRequest(pending_requests[n], body, complete) {
218
219                                                 // move the element from the
220                                                 // back of the slice to
221                                                 // position 'n', then shorten
222                                                 // the slice by one element
223                                                 pending_requests[n] = pending_requests[len(pending_requests)-1]
224                                                 pending_requests = pending_requests[0 : len(pending_requests)-1]
225                                         } else {
226
227                                                 // Request wasn't handled, so keep it in the request slice
228                                                 n += 1
229                                         }
230                                 }
231                         } else {
232                                 if complete {
233                                         // no more reads
234                                         slices = nil
235                                 } else {
236                                         // reader channel closed without signaling EOF
237                                         reader_error <- io.ErrUnexpectedEOF
238                                         return
239                                 }
240                         }
241                 }
242         }
243 }