Merge branch 'master' into 5538-arvadosclient-retry
[arvados.git] / sdk / go / keepclient / collectionreader.go
1 package keepclient
2
3 import (
4         "errors"
5         "io"
6         "os"
7
8         "git.curoverse.com/arvados.git/sdk/go/manifest"
9 )
10
11 // ReadCloserWithLen extends io.ReadCloser with a Len() method that
12 // returns the total number of bytes available to read.
13 type ReadCloserWithLen interface {
14         io.ReadCloser
15         Len() uint64
16 }
17
18 const (
19         // After reading a data block from Keep, cfReader slices it up
20         // and sends the slices to a buffered channel to be consumed
21         // by the caller via Read().
22         //
23         // dataSliceSize is the maximum size of the slices, and
24         // therefore the maximum number of bytes that will be returned
25         // by a single call to Read().
26         dataSliceSize = 1 << 20
27 )
28
29 // ErrNoManifest indicates the given collection has no manifest
30 // information (e.g., manifest_text was excluded by a "select"
31 // parameter when retrieving the collection record).
32 var ErrNoManifest = errors.New("Collection has no manifest")
33
34 // CollectionFileReader returns a ReadCloserWithLen that reads file
35 // content from a collection. The filename must be given relative to
36 // the root of the collection, without a leading "./".
37 func (kc *KeepClient) CollectionFileReader(collection map[string]interface{}, filename string) (ReadCloserWithLen, error) {
38         mText, ok := collection["manifest_text"].(string)
39         if !ok {
40                 return nil, ErrNoManifest
41         }
42         m := manifest.Manifest{Text: mText}
43         rdrChan := make(chan *cfReader)
44         go kc.queueSegmentsToGet(m, filename, rdrChan)
45         r, ok := <-rdrChan
46         if !ok {
47                 return nil, os.ErrNotExist
48         }
49         return r, nil
50 }
51
52 // Send segments for the specified file to r.toGet. Send a *cfReader
53 // to rdrChan if the specified file is found (even if it's empty).
54 // Then, close rdrChan.
55 func (kc *KeepClient) queueSegmentsToGet(m manifest.Manifest, filename string, rdrChan chan *cfReader) {
56         defer close(rdrChan)
57
58         // q is a queue of FileSegments that we have received but
59         // haven't yet been able to send to toGet.
60         var q []*manifest.FileSegment
61         var r *cfReader
62         for seg := range m.FileSegmentIterByName(filename) {
63                 if r == nil {
64                         // We've just discovered that the requested
65                         // filename does appear in the manifest, so we
66                         // can return a real reader (not nil) from
67                         // CollectionFileReader().
68                         r = newCFReader(kc)
69                         rdrChan <- r
70                 }
71                 q = append(q, seg)
72                 r.totalSize += uint64(seg.Len)
73                 // Send toGet as many segments as we can until it
74                 // blocks.
75         Q:
76                 for len(q) > 0 {
77                         select {
78                         case r.toGet <- q[0]:
79                                 q = q[1:]
80                         default:
81                                 break Q
82                         }
83                 }
84         }
85         if r == nil {
86                 // File not found.
87                 return
88         }
89         close(r.countDone)
90         for _, seg := range q {
91                 r.toGet <- seg
92         }
93         close(r.toGet)
94 }
95
96 type cfReader struct {
97         keepClient *KeepClient
98
99         // doGet() reads FileSegments from toGet, gets the data from
100         // Keep, and sends byte slices to toRead to be consumed by
101         // Read().
102         toGet chan *manifest.FileSegment
103
104         // toRead is a buffered channel, sized to fit one full Keep
105         // block. This lets us verify checksums without having a
106         // store-and-forward delay between blocks: by the time the
107         // caller starts receiving data from block N, cfReader is
108         // starting to fetch block N+1. A larger buffer would be
109         // useful for a caller whose read speed varies a lot.
110         toRead chan []byte
111
112         // bytes ready to send next time someone calls Read()
113         buf []byte
114
115         // Total size of the file being read. Not safe to read this
116         // until countDone is closed.
117         totalSize uint64
118         countDone chan struct{}
119
120         // First error encountered.
121         err error
122
123         // errNotNil is closed IFF err contains a non-nil error.
124         // Receiving from it will block until an error occurs.
125         errNotNil chan struct{}
126
127         // rdrClosed is closed IFF the reader's Close() method has
128         // been called. Any goroutines associated with the reader will
129         // stop and free up resources when they notice this channel is
130         // closed.
131         rdrClosed chan struct{}
132 }
133
134 func (r *cfReader) Read(outbuf []byte) (int, error) {
135         if r.Error() != nil {
136                 // Short circuit: the caller might as well find out
137                 // now that we hit an error, even if there's buffered
138                 // data we could return.
139                 return 0, r.Error()
140         }
141         for len(r.buf) == 0 {
142                 // Private buffer was emptied out by the last Read()
143                 // (or this is the first Read() and r.buf is nil).
144                 // Read from r.toRead until we get a non-empty slice
145                 // or hit an error.
146                 var ok bool
147                 r.buf, ok = <-r.toRead
148                 if r.Error() != nil {
149                         // Error encountered while waiting for bytes
150                         return 0, r.Error()
151                 } else if !ok {
152                         // No more bytes to read, no error encountered
153                         return 0, io.EOF
154                 }
155         }
156         // Copy as much as possible from our private buffer to the
157         // caller's buffer
158         n := len(r.buf)
159         if len(r.buf) > len(outbuf) {
160                 n = len(outbuf)
161         }
162         copy(outbuf[:n], r.buf[:n])
163
164         // Next call to Read() will continue where we left off
165         r.buf = r.buf[n:]
166
167         return n, nil
168 }
169
170 // Close releases resources. It returns a non-nil error if an error
171 // was encountered by the reader.
172 func (r *cfReader) Close() error {
173         close(r.rdrClosed)
174         return r.Error()
175 }
176
177 // Error returns an error if one has been encountered, otherwise
178 // nil. It is safe to call from any goroutine.
179 func (r *cfReader) Error() error {
180         select {
181         case <-r.errNotNil:
182                 return r.err
183         default:
184                 return nil
185         }
186 }
187
188 // Len returns the total number of bytes in the file being read. If
189 // necessary, it waits for manifest parsing to finish.
190 func (r *cfReader) Len() uint64 {
191         // Wait for all segments to be counted
192         <-r.countDone
193         return r.totalSize
194 }
195
196 func (r *cfReader) doGet() {
197         defer close(r.toRead)
198 GET:
199         for fs := range r.toGet {
200                 rdr, _, _, err := r.keepClient.Get(fs.Locator)
201                 if err != nil {
202                         r.err = err
203                         close(r.errNotNil)
204                         return
205                 }
206                 var buf = make([]byte, fs.Offset+fs.Len)
207                 _, err = io.ReadFull(rdr, buf)
208                 if err != nil {
209                         r.err = err
210                         close(r.errNotNil)
211                         return
212                 }
213                 for bOff, bLen := fs.Offset, dataSliceSize; bOff < fs.Offset+fs.Len && bLen > 0; bOff += bLen {
214                         if bOff+bLen > fs.Offset+fs.Len {
215                                 bLen = fs.Offset + fs.Len - bOff
216                         }
217                         select {
218                         case r.toRead <- buf[bOff : bOff+bLen]:
219                         case <-r.rdrClosed:
220                                 // Reader is closed: no point sending
221                                 // anything more to toRead.
222                                 break GET
223                         }
224                 }
225                 // It is possible that r.rdrClosed is closed but we
226                 // never noticed because r.toRead was also ready in
227                 // every select{} above. Here we check before wasting
228                 // a keepclient.Get() call.
229                 select {
230                 case <-r.rdrClosed:
231                         break GET
232                 default:
233                 }
234         }
235         // In case we exited the above loop early: before returning,
236         // drain the toGet channel so its sender doesn't sit around
237         // blocking forever.
238         for _ = range r.toGet {
239         }
240 }
241
242 func newCFReader(kc *KeepClient) (r *cfReader) {
243         r = new(cfReader)
244         r.keepClient = kc
245         r.rdrClosed = make(chan struct{})
246         r.errNotNil = make(chan struct{})
247         r.toGet = make(chan *manifest.FileSegment, 2)
248         r.toRead = make(chan []byte, (BLOCKSIZE+dataSliceSize-1)/dataSliceSize)
249         r.countDone = make(chan struct{})
250         go r.doGet()
251         return
252 }