"git.curoverse.com/arvados.git/sdk/go/manifest"
)
+// ReadCloserWithLen is an io.ReadCloser that can additionally report,
+// via Len(), how many bytes are available to read in total.
+type ReadCloserWithLen interface {
+ io.Reader
+ io.Closer
+
+ // Len reports the total number of bytes available to read.
+ Len() uint64
+}
+
+// dataSliceSize caps the size of the pieces that cfReader cuts each
+// data block fetched from Keep into, before queueing those pieces on
+// a buffered channel for Read() to consume. As a consequence, no single
+// call to Read() returns more than dataSliceSize bytes.
+const dataSliceSize = 1 << 20
+
// ErrNoManifest indicates the given collection has no manifest
// information (e.g., manifest_text was excluded by a "select"
// parameter when retrieving the collection record).
+// CollectionFileReader returns it when the collection's
+// "manifest_text" value is missing or is not a string.
var ErrNoManifest = errors.New("Collection has no manifest")
-// CollectionFileReader returns an io.Reader that reads file content
-// from a collection. The filename must be given relative to the root
-// of the collection, without a leading "./".
-func (kc *KeepClient) CollectionFileReader(collection map[string]interface{}, filename string) (*cfReader, error) {
+// CollectionFileReader returns a ReadCloserWithLen that reads file
+// content from a collection. The filename must be given relative to
+// the root of the collection, without a leading "./".
+//
+// It returns ErrNoManifest if collection["manifest_text"] is missing
+// or is not a string, and os.ErrNotExist if filename does not appear
+// in the manifest. File data is fetched from Keep by a background
+// goroutine; call Close on the returned reader when done with it (in
+// particular when stopping before EOF) so that goroutine can stop.
+func (kc *KeepClient) CollectionFileReader(collection map[string]interface{}, filename string) (ReadCloserWithLen, error) {
mText, ok := collection["manifest_text"].(string)
if !ok {
return nil, ErrNoManifest
}
m := manifest.Manifest{Text: mText}
rdrChan := make(chan *cfReader)
- go func() {
- // q is a queue of FileSegments that we have received but
- // haven't yet been able to send to toGet.
- var q []*manifest.FileSegment
- var r *cfReader
- for seg := range m.FileSegmentIterByName(filename) {
- if r == nil {
- // We've just discovered that the
- // requested filename does appear in
- // the manifest, so we can return a
- // real reader (not nil) from
- // CollectionFileReader().
- r = newCFReader(kc)
- rdrChan <- r
- }
- q = append(q, seg)
- r.totalSize += uint64(seg.Len)
- // Send toGet whatever it's ready to receive.
- Q: for len(q) > 0 {
- select {
- case r.toGet <- q[0]:
- q = q[1:]
- default:
- break Q
- }
- }
- }
+ go kc.queueSegmentsToGet(m, filename, rdrChan)
+ // Wait until we know whether the file exists: the scanner either
+ // sends us a reader or closes rdrChan without sending one.
+ r, ok := <-rdrChan
+ if !ok {
+ return nil, os.ErrNotExist
+ }
+ return r, nil
+}
+
+// queueSegmentsToGet runs in its own goroutine. It scans m for the
+// segments of filename. On the first segment it creates a cfReader and
+// sends it to rdrChan (even if the file is empty), then feeds the
+// segments to that reader's toGet channel. Once the scan is done it
+// closes countDone, sends any segments still queued, and closes toGet.
+// rdrChan is always closed on return, so a receiver that gets no
+// reader knows filename was not found.
+func (kc *KeepClient) queueSegmentsToGet(m manifest.Manifest, filename string, rdrChan chan *cfReader) {
+ defer close(rdrChan)
+
+ // q is a queue of FileSegments that we have received but
+ // haven't yet been able to send to toGet.
+ var q []*manifest.FileSegment
+ var r *cfReader
+ for seg := range m.FileSegmentIterByName(filename) {
if r == nil {
- // File not found
- rdrChan <- nil
- return
+ // We've just discovered that the requested
+ // filename does appear in the manifest, so we
+ // can return a real reader (not nil) from
+ // CollectionFileReader().
+ r = newCFReader(kc)
+ rdrChan <- r
}
- close(r.countDone)
- for _, seg := range q {
- r.toGet <- seg
+ q = append(q, seg)
+ r.totalSize += uint64(seg.Len)
+ // Send toGet as many segments as we can until it
+ // blocks.
+ // The sends are non-blocking so that scanning the
+ // manifest (and therefore Len(), which waits for
+ // countDone) is not held up by doGet()'s progress.
+ Q:
+ for len(q) > 0 {
+ select {
+ case r.toGet <- q[0]:
+ q = q[1:]
+ default:
+ break Q
+ }
}
- close(r.toGet)
- }()
- // Before returning a reader, wait until we know whether the
- // file exists here:
- r := <-rdrChan
+ }
if r == nil {
- return nil, os.ErrNotExist
+ // File not found.
+ return
}
- return r, nil
+ // Every segment has now been counted, so Len() may return.
+ // Hand doGet() the remaining segments, blocking as needed.
+ close(r.countDone)
+ for _, seg := range q {
+ r.toGet <- seg
+ }
+ close(r.toGet)
}
+// cfReader reads the content of a single file from a collection; it
+// is the ReadCloserWithLen returned by CollectionFileReader.
type cfReader struct {
+ // keepClient is used by doGet() to fetch data blocks from Keep.
keepClient *KeepClient
+
// doGet() reads FileSegments from toGet, gets the data from
// Keep, and sends byte slices to toRead to be consumed by
// Read().
- toGet chan *manifest.FileSegment
- toRead chan []byte
+ toGet chan *manifest.FileSegment
+
+ // toRead is a buffered channel, sized to fit one full Keep
+ // block. This lets us verify checksums without having a
+ // store-and-forward delay between blocks: by the time the
+ // caller starts receiving data from block N, cfReader is
+ // starting to fetch block N+1. A larger buffer would be
+ // useful for a caller whose read speed varies a lot.
+ toRead chan []byte
+
// bytes ready to send next time someone calls Read()
- buf []byte
+ buf []byte
+
// Total size of the file being read. Not safe to read this
// until countDone is closed.
- totalSize uint64
- countDone chan struct{}
+ totalSize uint64
+ countDone chan struct{}
+
// First error encountered.
- err error
+ // Only doGet() sets it; use Error() to read it safely.
+ err error
+
+ // errNotNil is closed IFF err contains a non-nil error.
+ // Receiving from it will block until an error occurs.
+ errNotNil chan struct{}
+
+ // rdrClosed is closed IFF the reader's Close() method has
+ // been called. Any goroutines associated with the reader will
+ // stop and free up resources when they notice this channel is
+ // closed.
+ rdrClosed chan struct{}
}
-func (r *cfReader) Read(outbuf []byte) (n int, err error) {
- if r.err != nil {
- return 0, r.err
+// Read implements io.Reader. Each call returns data from at most one
+// of the slices produced by doGet() (so at most dataSliceSize bytes).
+// Once an error has been recorded, Read returns it immediately, even
+// if data is still buffered. When no more data will arrive and no
+// error was recorded, it returns io.EOF.
+func (r *cfReader) Read(outbuf []byte) (int, error) {
+ if r.Error() != nil {
+ // Short circuit: the caller might as well find out
+ // now that we hit an error, even if there's buffered
+ // data we could return.
+ return 0, r.Error()
}
- for r.buf == nil || len(r.buf) == 0 {
+ for len(r.buf) == 0 {
+ // Private buffer was emptied out by the last Read()
+ // (or this is the first Read() and r.buf is nil).
+ // Read from r.toRead until we get a non-empty slice
+ // or hit an error.
var ok bool
r.buf, ok = <-r.toRead
- if r.err != nil {
- return 0, r.err
+ if r.Error() != nil {
+ // Error encountered while waiting for bytes
+ return 0, r.Error()
} else if !ok {
+ // No more bytes to read, no error encountered
return 0, io.EOF
}
}
+ // Copy as much as possible from our private buffer to the
+ // caller's buffer
+ n := len(r.buf)
if len(r.buf) > len(outbuf) {
n = len(outbuf)
- } else {
- n = len(r.buf)
}
copy(outbuf[:n], r.buf[:n])
+
+ // Next call to Read() will continue where we left off
r.buf = r.buf[n:]
- return
+
+ return n, nil
}
+// Close releases resources. It returns a non-nil error if an error
+// was encountered by the reader. Calling Close again after it has
+// returned is harmless: later calls only report the error. Close must
+// not be called concurrently from several goroutines.
func (r *cfReader) Close() error {
- _, _ = <-r.countDone
- for _ = range r.toGet {
- }
- for _ = range r.toRead {
+ select {
+ case <-r.rdrClosed:
+ // Already closed: closing a closed channel would panic.
+ default:
+ close(r.rdrClosed)
+ }
+ return r.Error()
+}
+
+// Error returns an error if one has been encountered, otherwise
+// nil. It is safe to call from any goroutine.
+func (r *cfReader) Error() error {
+ select {
+ case <-r.errNotNil:
+ return r.err
+ default:
+ return nil
}
- return r.err
}
+// Len returns the total number of bytes in the file being read. It
+// blocks until every segment of the file has been counted, i.e. until
+// queueSegmentsToGet has finished scanning the manifest and closed
+// countDone.
func (r *cfReader) Len() uint64 {
// Wait for all segments to be counted
- _, _ = <-r.countDone
+ <-r.countDone
return r.totalSize
}
func (r *cfReader) doGet() {
+ // Drain toGet on every exit path -- including the early returns
+ // on Keep errors below -- so that queueSegmentsToGet, which does
+ // blocking sends on toGet after scanning the manifest, can finish
+ // instead of blocking forever. This defer is registered before
+ // close(r.toRead), so it runs after it: Read() learns that no more
+ // data is coming without waiting for the drain to complete.
+ defer func() {
+ for _ = range r.toGet {
+ }
+ }()
defer close(r.toRead)
+GET:
for fs := range r.toGet {
+ // NOTE(review): rdr is never closed here; if Get returns an
+ // io.ReadCloser this may leak the underlying connection --
+ // confirm and close it once the data has been read.
rdr, _, _, err := r.keepClient.Get(fs.Locator)
if err != nil {
r.err = err
+ close(r.errNotNil)
return
}
var buf = make([]byte, fs.Offset+fs.Len)
_, err = io.ReadFull(rdr, buf)
if err != nil {
r.err = err
+ close(r.errNotNil)
return
}
- for bOff, bLen := fs.Offset, 1<<20; bOff <= fs.Offset+fs.Len && bLen > 0; bOff += bLen {
+ for bOff, bLen := fs.Offset, dataSliceSize; bOff < fs.Offset+fs.Len && bLen > 0; bOff += bLen {
if bOff+bLen > fs.Offset+fs.Len {
bLen = fs.Offset + fs.Len - bOff
}
- r.toRead <- buf[bOff : bOff+bLen]
+ select {
+ case r.toRead <- buf[bOff : bOff+bLen]:
+ case <-r.rdrClosed:
+ // Reader is closed: no point sending
+ // anything more to toRead.
+ break GET
+ }
+ }
+ // It is possible that r.rdrClosed is closed but we
+ // never noticed because r.toRead was also ready in
+ // every select{} above. Here we check before wasting
+ // a keepclient.Get() call.
+ select {
+ case <-r.rdrClosed:
+ break GET
+ default:
}
}
}
func newCFReader(kc *KeepClient) (r *cfReader) {
r = new(cfReader)
r.keepClient = kc
+ r.rdrClosed = make(chan struct{})
+ r.errNotNil = make(chan struct{})
r.toGet = make(chan *manifest.FileSegment, 2)
- r.toRead = make(chan []byte)
+ r.toRead = make(chan []byte, (BLOCKSIZE+dataSliceSize-1)/dataSliceSize)
r.countDone = make(chan struct{})
go r.doGet()
return