8 "git.curoverse.com/arvados.git/sdk/go/manifest"
11 // ReadCloserWithLen extends io.ReadCloser with a Len() method that
12 // returns the total number of bytes available to read.
13 type ReadCloserWithLen interface {
19 // After reading a data block from Keep, cfReader slices it up
20 // and sends the slices to a buffered channel to be consumed
21 // by the caller via Read().
23 // dataSliceSize is the maximum size of the slices, and
24 // therefore the maximum number of bytes that will be returned
25 // by a single call to Read().
26 dataSliceSize = 1 << 20
29 // ErrNoManifest indicates the given collection has no manifest
30 // information (e.g., manifest_text was excluded by a "select"
31 // parameter when retrieving the collection record).
32 var ErrNoManifest = errors.New("Collection has no manifest")
34 // CollectionFileReader returns a ReadCloserWithLen that reads file
35 // content from a collection. The filename must be given relative to
36 // the root of the collection, without a leading "./".
37 func (kc *KeepClient) CollectionFileReader(collection map[string]interface{}, filename string) (ReadCloserWithLen, error) {
38 mText, ok := collection["manifest_text"].(string)
40 return nil, ErrNoManifest
42 m := manifest.Manifest{Text: mText}
43 return kc.ManifestFileReader(m, filename)
46 func (kc *KeepClient) ManifestFileReader(m manifest.Manifest, filename string) (ReadCloserWithLen, error) {
47 rdrChan := make(chan *cfReader)
48 go kc.queueSegmentsToGet(m, filename, rdrChan)
51 return nil, os.ErrNotExist
56 // Send segments for the specified file to r.toGet. Send a *cfReader
57 // to rdrChan if the specified file is found (even if it's empty).
58 // Then, close rdrChan.
59 func (kc *KeepClient) queueSegmentsToGet(m manifest.Manifest, filename string, rdrChan chan *cfReader) {
62 // q is a queue of FileSegments that we have received but
63 // haven't yet been able to send to toGet.
64 var q []*manifest.FileSegment
66 for seg := range m.FileSegmentIterByName(filename) {
68 // We've just discovered that the requested
69 // filename does appear in the manifest, so we
70 // can return a real reader (not nil) from
71 // CollectionFileReader().
76 r.totalSize += uint64(seg.Len)
77 // Send toGet as many segments as we can until it would block.
94 for _, seg := range q {
100 type cfReader struct {
101 keepClient *KeepClient
103 // doGet() reads FileSegments from toGet, gets the data from
104 // Keep, and sends byte slices to toRead to be consumed by Read().
106 toGet chan *manifest.FileSegment
108 // toRead is a buffered channel, sized to fit one full Keep
109 // block. This lets us verify checksums without having a
110 // store-and-forward delay between blocks: by the time the
111 // caller starts receiving data from block N, cfReader is
112 // starting to fetch block N+1. A larger buffer would be
113 // useful for a caller whose read speed varies a lot.
116 // bytes ready to send next time someone calls Read()
119 // Total size of the file being read. Not safe to read this
120 // until countDone is closed.
122 countDone chan struct{}
124 // First error encountered.
127 // errNotNil is closed IFF err contains a non-nil error.
128 // Receiving from it will block until an error occurs.
129 errNotNil chan struct{}
131 // rdrClosed is closed IFF the reader's Close() method has
132 // been called. Any goroutines associated with the reader will
133 // stop and free up resources when they notice this channel is closed.
135 rdrClosed chan struct{}
138 func (r *cfReader) Read(outbuf []byte) (int, error) {
139 if r.Error() != nil {
140 // Short circuit: the caller might as well find out
141 // now that we hit an error, even if there's buffered
142 // data we could return.
145 for len(r.buf) == 0 {
146 // Private buffer was emptied out by the last Read()
147 // (or this is the first Read() and r.buf is nil).
148 // Read from r.toRead until we get a non-empty slice, hit an error, or run out of data.
151 r.buf, ok = <-r.toRead
152 if r.Error() != nil {
153 // Error encountered while waiting for bytes
156 // No more bytes to read, no error encountered
160 // Copy as much as possible from our private buffer to the caller's buffer (outbuf).
163 if len(r.buf) > len(outbuf) {
166 copy(outbuf[:n], r.buf[:n])
168 // Next call to Read() will continue where we left off
174 // Close releases resources. It returns a non-nil error if an error
175 // was encountered by the reader.
176 func (r *cfReader) Close() error {
181 // Error returns an error if one has been encountered, otherwise
182 // nil. It is safe to call from any goroutine.
183 func (r *cfReader) Error() error {
192 // Len returns the total number of bytes in the file being read. If
193 // necessary, it waits for manifest parsing to finish.
194 func (r *cfReader) Len() uint64 {
195 // Wait for all segments to be counted
200 func (r *cfReader) doGet() {
201 defer close(r.toRead)
203 for fs := range r.toGet {
204 rdr, _, _, err := r.keepClient.Get(fs.Locator)
210 var buf = make([]byte, fs.Offset+fs.Len)
211 _, err = io.ReadFull(rdr, buf)
217 for bOff, bLen := fs.Offset, dataSliceSize; bOff < fs.Offset+fs.Len && bLen > 0; bOff += bLen {
218 if bOff+bLen > fs.Offset+fs.Len {
219 bLen = fs.Offset + fs.Len - bOff
222 case r.toRead <- buf[bOff : bOff+bLen]:
224 // Reader is closed: no point sending
225 // anything more to toRead.
229 // It is possible that r.rdrClosed is closed but we
230 // never noticed because r.toRead was also ready in
231 // every select{} above. Here we check before wasting
232 // a keepclient.Get() call.
239 // In case we exited the above loop early: before returning,
240 // drain the toGet channel so its sender doesn't sit around blocked on a send.
242 for _ = range r.toGet {
246 func newCFReader(kc *KeepClient) (r *cfReader) {
249 r.rdrClosed = make(chan struct{})
250 r.errNotNil = make(chan struct{})
251 r.toGet = make(chan *manifest.FileSegment, 2)
252 r.toRead = make(chan []byte, (BLOCKSIZE+dataSliceSize-1)/dataSliceSize)
253 r.countDone = make(chan struct{})