Merge branch '8953-no-double-count' refs #8953
[arvados.git] / sdk / go / keepclient / collectionreader.go
1 package keepclient
2
3 import (
4         "errors"
5         "io"
6         "os"
7
8         "git.curoverse.com/arvados.git/sdk/go/manifest"
9 )
10
11 // ReadCloserWithLen extends io.ReadCloser with a Len() method that
12 // returns the total number of bytes available to read.
13 type ReadCloserWithLen interface {
14         io.ReadCloser
15         Len() uint64
16 }
17
18 const (
19         // After reading a data block from Keep, cfReader slices it up
20         // and sends the slices to a buffered channel to be consumed
21         // by the caller via Read().
22         //
23         // dataSliceSize is the maximum size of the slices, and
24         // therefore the maximum number of bytes that will be returned
25         // by a single call to Read().
26         dataSliceSize = 1 << 20
27 )
28
29 // ErrNoManifest indicates the given collection has no manifest
30 // information (e.g., manifest_text was excluded by a "select"
31 // parameter when retrieving the collection record).
32 var ErrNoManifest = errors.New("Collection has no manifest")
33
34 // CollectionFileReader returns a ReadCloserWithLen that reads file
35 // content from a collection. The filename must be given relative to
36 // the root of the collection, without a leading "./".
37 func (kc *KeepClient) CollectionFileReader(collection map[string]interface{}, filename string) (ReadCloserWithLen, error) {
38         mText, ok := collection["manifest_text"].(string)
39         if !ok {
40                 return nil, ErrNoManifest
41         }
42         m := manifest.Manifest{Text: mText}
43         return kc.ManifestFileReader(m, filename)
44 }
45
46 func (kc *KeepClient) ManifestFileReader(m manifest.Manifest, filename string) (ReadCloserWithLen, error) {
47         rdrChan := make(chan *cfReader)
48         go kc.queueSegmentsToGet(m, filename, rdrChan)
49         r, ok := <-rdrChan
50         if !ok {
51                 return nil, os.ErrNotExist
52         }
53         return r, nil
54 }
55
56 // Send segments for the specified file to r.toGet. Send a *cfReader
57 // to rdrChan if the specified file is found (even if it's empty).
58 // Then, close rdrChan.
59 func (kc *KeepClient) queueSegmentsToGet(m manifest.Manifest, filename string, rdrChan chan *cfReader) {
60         defer close(rdrChan)
61
62         // q is a queue of FileSegments that we have received but
63         // haven't yet been able to send to toGet.
64         var q []*manifest.FileSegment
65         var r *cfReader
66         for seg := range m.FileSegmentIterByName(filename) {
67                 if r == nil {
68                         // We've just discovered that the requested
69                         // filename does appear in the manifest, so we
70                         // can return a real reader (not nil) from
71                         // CollectionFileReader().
72                         r = newCFReader(kc)
73                         rdrChan <- r
74                 }
75                 q = append(q, seg)
76                 r.totalSize += uint64(seg.Len)
77                 // Send toGet as many segments as we can until it
78                 // blocks.
79         Q:
80                 for len(q) > 0 {
81                         select {
82                         case r.toGet <- q[0]:
83                                 q = q[1:]
84                         default:
85                                 break Q
86                         }
87                 }
88         }
89         if r == nil {
90                 // File not found.
91                 return
92         }
93         close(r.countDone)
94         for _, seg := range q {
95                 r.toGet <- seg
96         }
97         close(r.toGet)
98 }
99
100 type cfReader struct {
101         keepClient *KeepClient
102
103         // doGet() reads FileSegments from toGet, gets the data from
104         // Keep, and sends byte slices to toRead to be consumed by
105         // Read().
106         toGet chan *manifest.FileSegment
107
108         // toRead is a buffered channel, sized to fit one full Keep
109         // block. This lets us verify checksums without having a
110         // store-and-forward delay between blocks: by the time the
111         // caller starts receiving data from block N, cfReader is
112         // starting to fetch block N+1. A larger buffer would be
113         // useful for a caller whose read speed varies a lot.
114         toRead chan []byte
115
116         // bytes ready to send next time someone calls Read()
117         buf []byte
118
119         // Total size of the file being read. Not safe to read this
120         // until countDone is closed.
121         totalSize uint64
122         countDone chan struct{}
123
124         // First error encountered.
125         err error
126
127         // errNotNil is closed IFF err contains a non-nil error.
128         // Receiving from it will block until an error occurs.
129         errNotNil chan struct{}
130
131         // rdrClosed is closed IFF the reader's Close() method has
132         // been called. Any goroutines associated with the reader will
133         // stop and free up resources when they notice this channel is
134         // closed.
135         rdrClosed chan struct{}
136 }
137
138 func (r *cfReader) Read(outbuf []byte) (int, error) {
139         if r.Error() != nil {
140                 // Short circuit: the caller might as well find out
141                 // now that we hit an error, even if there's buffered
142                 // data we could return.
143                 return 0, r.Error()
144         }
145         for len(r.buf) == 0 {
146                 // Private buffer was emptied out by the last Read()
147                 // (or this is the first Read() and r.buf is nil).
148                 // Read from r.toRead until we get a non-empty slice
149                 // or hit an error.
150                 var ok bool
151                 r.buf, ok = <-r.toRead
152                 if r.Error() != nil {
153                         // Error encountered while waiting for bytes
154                         return 0, r.Error()
155                 } else if !ok {
156                         // No more bytes to read, no error encountered
157                         return 0, io.EOF
158                 }
159         }
160         // Copy as much as possible from our private buffer to the
161         // caller's buffer
162         n := len(r.buf)
163         if len(r.buf) > len(outbuf) {
164                 n = len(outbuf)
165         }
166         copy(outbuf[:n], r.buf[:n])
167
168         // Next call to Read() will continue where we left off
169         r.buf = r.buf[n:]
170
171         return n, nil
172 }
173
174 // Close releases resources. It returns a non-nil error if an error
175 // was encountered by the reader.
176 func (r *cfReader) Close() error {
177         close(r.rdrClosed)
178         return r.Error()
179 }
180
181 // Error returns an error if one has been encountered, otherwise
182 // nil. It is safe to call from any goroutine.
183 func (r *cfReader) Error() error {
184         select {
185         case <-r.errNotNil:
186                 return r.err
187         default:
188                 return nil
189         }
190 }
191
192 // Len returns the total number of bytes in the file being read. If
193 // necessary, it waits for manifest parsing to finish.
194 func (r *cfReader) Len() uint64 {
195         // Wait for all segments to be counted
196         <-r.countDone
197         return r.totalSize
198 }
199
200 func (r *cfReader) doGet() {
201         defer close(r.toRead)
202 GET:
203         for fs := range r.toGet {
204                 rdr, _, _, err := r.keepClient.Get(fs.Locator)
205                 if err != nil {
206                         r.err = err
207                         close(r.errNotNil)
208                         return
209                 }
210                 var buf = make([]byte, fs.Offset+fs.Len)
211                 _, err = io.ReadFull(rdr, buf)
212                 errClosing := rdr.Close()
213                 if err == nil {
214                         err = errClosing
215                 }
216                 if err != nil {
217                         r.err = err
218                         close(r.errNotNil)
219                         return
220                 }
221                 for bOff, bLen := fs.Offset, dataSliceSize; bOff < fs.Offset+fs.Len && bLen > 0; bOff += bLen {
222                         if bOff+bLen > fs.Offset+fs.Len {
223                                 bLen = fs.Offset + fs.Len - bOff
224                         }
225                         select {
226                         case r.toRead <- buf[bOff : bOff+bLen]:
227                         case <-r.rdrClosed:
228                                 // Reader is closed: no point sending
229                                 // anything more to toRead.
230                                 break GET
231                         }
232                 }
233                 // It is possible that r.rdrClosed is closed but we
234                 // never noticed because r.toRead was also ready in
235                 // every select{} above. Here we check before wasting
236                 // a keepclient.Get() call.
237                 select {
238                 case <-r.rdrClosed:
239                         break GET
240                 default:
241                 }
242         }
243         // In case we exited the above loop early: before returning,
244         // drain the toGet channel so its sender doesn't sit around
245         // blocking forever.
246         for _ = range r.toGet {
247         }
248 }
249
250 func newCFReader(kc *KeepClient) (r *cfReader) {
251         r = new(cfReader)
252         r.keepClient = kc
253         r.rdrClosed = make(chan struct{})
254         r.errNotNil = make(chan struct{})
255         r.toGet = make(chan *manifest.FileSegment, 2)
256         r.toRead = make(chan []byte, (BLOCKSIZE+dataSliceSize-1)/dataSliceSize)
257         r.countDone = make(chan struct{})
258         go r.doGet()
259         return
260 }