8 "git.curoverse.com/arvados.git/sdk/go/manifest"
11 // ReadCloserWithLen extends io.ReadCloser with a Len() method that
12 // returns the total number of bytes available to read.
13 type ReadCloserWithLen interface {
19 // After reading a data block from Keep, cfReader slices it up
20 // and sends the slices to a buffered channel to be consumed
21 // by the caller via Read().
23 // dataSliceSize is the maximum size of the slices, and
24 // therefore the maximum number of bytes that will be returned
25 // by a single call to Read().
26 dataSliceSize = 1 << 20
29 // ErrNoManifest indicates the given collection has no manifest
30 // information (e.g., manifest_text was excluded by a "select"
31 // parameter when retrieving the collection record).
32 var ErrNoManifest = errors.New("Collection has no manifest")
34 // CollectionFileReader returns a ReadCloserWithLen that reads file
35 // content from a collection. The filename must be given relative to
36 // the root of the collection, without a leading "./".
37 func (kc *KeepClient) CollectionFileReader(collection map[string]interface{}, filename string) (ReadCloserWithLen, error) {
38 mText, ok := collection["manifest_text"].(string)
40 return nil, ErrNoManifest
42 m := manifest.Manifest{Text: mText}
43 rdrChan := make(chan *cfReader)
44 go kc.queueSegmentsToGet(m, filename, rdrChan)
47 return nil, os.ErrNotExist
52 // Send segments for the specified file to r.toGet. Send a *cfReader
53 // to rdrChan if the specified file is found (even if it's empty).
54 // Then, close rdrChan.
55 func (kc *KeepClient) queueSegmentsToGet(m manifest.Manifest, filename string, rdrChan chan *cfReader) {
58 // q is a queue of FileSegments that we have received but
59 // haven't yet been able to send to toGet.
60 var q []*manifest.FileSegment
62 for seg := range m.FileSegmentIterByName(filename) {
64 // We've just discovered that the requested
65 // filename does appear in the manifest, so we
66 // can return a real reader (not nil) from
67 // CollectionFileReader().
72 r.totalSize += uint64(seg.Len)
73 // Send toGet as many segments as we can until it would block.
90 for _, seg := range q {
96 type cfReader struct {
97 keepClient *KeepClient
99 // doGet() reads FileSegments from toGet, gets the data from
100 // Keep, and sends byte slices to toRead to be consumed by Read().
102 toGet chan *manifest.FileSegment
104 // toRead is a buffered channel, sized to fit one full Keep
105 // block. This lets us verify checksums without having a
106 // store-and-forward delay between blocks: by the time the
107 // caller starts receiving data from block N, cfReader is
108 // starting to fetch block N+1. A larger buffer would be
109 // useful for a caller whose read speed varies a lot.
112 // bytes ready to send next time someone calls Read()
115 // Total size of the file being read. Not safe to read this
116 // until countDone is closed.
118 countDone chan struct{}
120 // First error encountered.
123 // errNotNil is closed IFF err contains a non-nil error.
124 // Receiving from it will block until an error occurs.
125 errNotNil chan struct{}
127 // rdrClosed is closed IFF the reader's Close() method has
128 // been called. Any goroutines associated with the reader will
129 // stop and free up resources when they notice this channel is closed.
131 rdrClosed chan struct{}
134 func (r *cfReader) Read(outbuf []byte) (int, error) {
135 if r.Error() != nil {
136 // Short circuit: the caller might as well find out
137 // now that we hit an error, even if there's buffered
138 // data we could return.
141 for len(r.buf) == 0 {
142 // Private buffer was emptied out by the last Read()
143 // (or this is the first Read() and r.buf is nil).
144 // Read from r.toRead until we get a non-empty slice, hit an error, or run out of data.
147 r.buf, ok = <-r.toRead
148 if r.Error() != nil {
149 // Error encountered while waiting for bytes
152 // No more bytes to read, no error encountered
156 // Copy as much as possible from our private buffer to the caller's buffer.
159 if len(r.buf) > len(outbuf) {
162 copy(outbuf[:n], r.buf[:n])
164 // Next call to Read() will continue where we left off
170 // Close releases resources. It returns a non-nil error if an error
171 // was encountered by the reader.
172 func (r *cfReader) Close() error {
177 // Error returns an error if one has been encountered, otherwise
178 // nil. It is safe to call from any goroutine.
179 func (r *cfReader) Error() error {
188 // Len returns the total number of bytes in the file being read. If
189 // necessary, it waits for manifest parsing to finish.
190 func (r *cfReader) Len() uint64 {
191 // Wait for all segments to be counted
196 func (r *cfReader) doGet() {
197 defer close(r.toRead)
199 for fs := range r.toGet {
200 rdr, _, _, err := r.keepClient.Get(fs.Locator)
206 var buf = make([]byte, fs.Offset+fs.Len)
207 _, err = io.ReadFull(rdr, buf)
213 for bOff, bLen := fs.Offset, dataSliceSize; bOff < fs.Offset+fs.Len && bLen > 0; bOff += bLen {
214 if bOff+bLen > fs.Offset+fs.Len {
215 bLen = fs.Offset + fs.Len - bOff
218 case r.toRead <- buf[bOff : bOff+bLen]:
220 // Reader is closed: no point sending
221 // anything more to toRead.
225 // It is possible that r.rdrClosed is closed but we
226 // never noticed because r.toRead was also ready in
227 // every select{} above. Here we check before wasting
228 // a keepclient.Get() call.
235 // In case we exited the above loop early: before returning,
236 // drain the toGet channel so its sender doesn't sit around blocked.
238 for _ = range r.toGet {
242 func newCFReader(kc *KeepClient) (r *cfReader) {
245 r.rdrClosed = make(chan struct{})
246 r.errNotNil = make(chan struct{})
247 r.toGet = make(chan *manifest.FileSegment, 2)
248 r.toRead = make(chan []byte, (BLOCKSIZE+dataSliceSize-1)/dataSliceSize)
249 r.countDone = make(chan struct{})