ab8f941af77760620dcf25e7bd6cacb09e1dd1f3
[arvados.git] / sdk / go / src / arvados.org / streamer / transfer.go
1 package streamer
2
3 import (
4         "io"
5         "log"
6 )
7
8 // A slice passed from readIntoBuffer() to transfer()
9 type readerSlice struct {
10         slice        []byte
11         reader_error error
12 }
13
14 // A read request to the Transfer() function
15 type readRequest struct {
16         offset  int
17         maxsize int
18         result  chan<- readResult
19 }
20
21 // A read result from the Transfer() function
22 type readResult struct {
23         slice []byte
24         err   error
25 }
26
27 // Supports writing into a buffer
28 type bufferWriter struct {
29         buf []byte
30         ptr int
31 }
32
33 // Copy p into this.buf, increment pointer and return number of bytes read.
34 func (this *bufferWriter) Write(p []byte) (n int, err error) {
35         n = copy(this.buf[this.ptr:], p)
36         this.ptr += n
37         return n, nil
38 }
39
40 // Read repeatedly from the reader and write sequentially into the specified
41 // buffer, and report each read to channel 'c'.  Completes when Reader 'r'
42 // reports on the error channel and closes channel 'c'.
43 func readIntoBuffer(buffer []byte, r io.Reader, slices chan<- readerSlice) {
44         defer close(slices)
45
46         if writeto, ok := r.(io.WriterTo); ok {
47                 n, err := writeto.WriteTo(&bufferWriter{buffer, 0})
48                 if err != nil {
49                         slices <- readerSlice{nil, err}
50                 } else {
51                         slices <- readerSlice{buffer[:n], nil}
52                         slices <- readerSlice{nil, io.EOF}
53                 }
54                 return
55         } else {
56                 // Initially entire buffer is available
57                 ptr := buffer[:]
58                 for {
59                         var n int
60                         var err error
61                         if len(ptr) > 0 {
62                                 const readblock = 64 * 1024
63                                 // Read 64KiB into the next part of the buffer
64                                 if len(ptr) > readblock {
65                                         n, err = r.Read(ptr[:readblock])
66                                 } else {
67                                         n, err = r.Read(ptr)
68                                 }
69                         } else {
70                                 // Ran out of buffer space, try reading one more byte
71                                 var b [1]byte
72                                 n, err = r.Read(b[:])
73
74                                 if n > 0 {
75                                         // Reader has more data but we have nowhere to
76                                         // put it, so we're stuffed
77                                         slices <- readerSlice{nil, io.ErrShortBuffer}
78                                 } else {
79                                         // Return some other error (hopefully EOF)
80                                         slices <- readerSlice{nil, err}
81                                 }
82                                 return
83                         }
84
85                         // End on error (includes EOF)
86                         if err != nil {
87                                 slices <- readerSlice{nil, err}
88                                 return
89                         }
90
91                         if n > 0 {
92                                 // Make a slice with the contents of the read
93                                 slices <- readerSlice{ptr[:n], nil}
94
95                                 // Adjust the scratch space slice
96                                 ptr = ptr[n:]
97                         }
98                 }
99         }
100 }
101
102 // Handle a read request.  Returns true if a response was sent, and false if
103 // the request should be queued.
104 func handleReadRequest(req readRequest, body []byte, complete bool) bool {
105         log.Printf("HandlereadRequest %d %d %d", req.offset, req.maxsize, len(body))
106         if req.offset < len(body) {
107                 var end int
108                 if req.offset+req.maxsize < len(body) {
109                         end = req.offset + req.maxsize
110                 } else {
111                         end = len(body)
112                 }
113                 req.result <- readResult{body[req.offset:end], nil}
114                 return true
115         } else if complete && req.offset >= len(body) {
116                 req.result <- readResult{nil, io.EOF}
117                 return true
118         } else {
119                 return false
120         }
121 }
122
123 // Mediates between reads and appends.
124 // If 'source_reader' is not nil, reads data from 'source_reader' and stores it
125 // in the provided buffer.  Otherwise, use the contents of 'buffer' as is.
126 // Accepts read requests on the buffer on the 'requests' channel.  Completes
127 // when 'requests' channel is closed.
128 func transfer(source_buffer []byte, source_reader io.Reader, requests <-chan readRequest, reader_error chan error) {
129         // currently buffered data
130         var body []byte
131
132         // for receiving slices from readIntoBuffer
133         var slices chan readerSlice = nil
134
135         // indicates whether the buffered data is complete
136         var complete bool = false
137
138         if source_reader != nil {
139                 // 'body' is the buffer slice representing the body content read so far
140                 body = source_buffer[:0]
141
142                 // used to communicate slices of the buffer as they are
143                 // readIntoBuffer will close 'slices' when it is done with it
144                 slices = make(chan readerSlice)
145
146                 // Spin it off
147                 go readIntoBuffer(source_buffer, source_reader, slices)
148         } else {
149                 // use the whole buffer
150                 body = source_buffer[:]
151
152                 // buffer is complete
153                 complete = true
154         }
155
156         pending_requests := make([]readRequest, 0)
157
158         for {
159                 select {
160                 case req, valid := <-requests:
161                         // Handle a buffer read request
162                         if valid {
163                                 if !handleReadRequest(req, body, complete) {
164                                         pending_requests = append(pending_requests, req)
165                                 }
166                         } else {
167                                 // closed 'requests' channel indicates we're done
168                                 return
169                         }
170
171                 case bk, valid := <-slices:
172                         // Got a new slice from the reader
173                         if valid {
174                                 if bk.reader_error != nil {
175                                         reader_error <- bk.reader_error
176                                         if bk.reader_error == io.EOF {
177                                                 // EOF indicates the reader is done
178                                                 // sending, so our buffer is complete.
179                                                 complete = true
180                                         } else {
181                                                 // some other reader error
182                                                 return
183                                         }
184                                 }
185
186                                 if bk.slice != nil {
187                                         // adjust body bounds now that another slice has been read
188                                         body = source_buffer[0 : len(body)+len(bk.slice)]
189                                 }
190
191                                 // handle pending reads
192                                 n := 0
193                                 for n < len(pending_requests) {
194                                         if handleReadRequest(pending_requests[n], body, complete) {
195
196                                                 // move the element from the
197                                                 // back of the slice to
198                                                 // position 'n', then shorten
199                                                 // the slice by one element
200                                                 pending_requests[n] = pending_requests[len(pending_requests)-1]
201                                                 pending_requests = pending_requests[0 : len(pending_requests)-1]
202                                         } else {
203
204                                                 // Request wasn't handled, so keep it in the request slice
205                                                 n += 1
206                                         }
207                                 }
208                         } else {
209                                 if complete {
210                                         // no more reads
211                                         slices = nil
212                                 } else {
213                                         // reader channel closed without signaling EOF
214                                         reader_error <- io.ErrUnexpectedEOF
215                                         return
216                                 }
217                         }
218                 }
219         }
220 }