- m := manifest.Manifest{Text: mText}
- return kc.ManifestFileReader(m, filename)
-}
-
-// ManifestFileReader returns a reader for the file called filename in
-// manifest m. It returns os.ErrNotExist if the manifest yields no
-// segments for that filename.
-func (kc *KeepClient) ManifestFileReader(m manifest.Manifest, filename string) (ReadCloserWithLen, error) {
-	found := make(chan *cfReader)
-	go kc.queueSegmentsToGet(m, filename, found)
-	// The queueing goroutine either sends one reader on the channel
-	// (file found) or closes it without sending (file not found).
-	if rdr, ok := <-found; ok {
-		return rdr, nil
-	}
-	return nil, os.ErrNotExist
-}
-
-// queueSegmentsToGet walks the segments of the named file in m and
-// feeds them to a cfReader's toGet channel. As soon as the first
-// segment shows up (i.e., the file exists, even if it is empty) a new
-// *cfReader is sent to rdrChan. rdrChan is always closed before
-// returning, whether or not a reader was sent.
-func (kc *KeepClient) queueSegmentsToGet(m manifest.Manifest, filename string, rdrChan chan *cfReader) {
-	defer close(rdrChan)
-
-	var (
-		rdr *cfReader
-		// pending holds segments received from the manifest
-		// iterator that toGet has not accepted yet.
-		pending []*manifest.FileSegment
-	)
-	for seg := range m.FileSegmentIterByName(filename) {
-		if rdr == nil {
-			// First segment seen: the file is present in the
-			// manifest, so hand a real reader to whoever is
-			// waiting on rdrChan.
-			rdr = newCFReader(kc)
-			rdrChan <- rdr
-		}
-		pending = append(pending, seg)
-		rdr.totalSize += uint64(seg.Len)
-		// Hand over queued segments without blocking; stop at
-		// the first send that toGet cannot accept right now.
-		for stalled := false; len(pending) > 0 && !stalled; {
-			select {
-			case rdr.toGet <- pending[0]:
-				pending = pending[1:]
-			default:
-				stalled = true
-			}
-		}
-	}
-	if rdr == nil {
-		// No segments at all: file not found.
-		return
-	}
-	// Every segment has been counted, so totalSize is now final.
-	close(rdr.countDone)
-	// Deliver whatever is still queued, blocking as needed, then
-	// signal that no more segments are coming.
-	for _, seg := range pending {
-		rdr.toGet <- seg
-	}
-	close(rdr.toGet)
-}
-
-// cfReader is the reader created by queueSegmentsToGet and handed
-// back to callers of ManifestFileReader. Its methods include Read,
-// Close and Error; coordination between the reader and its helper
-// goroutines is done through the channels below.
-type cfReader struct {
- // NOTE(review): presumably the client doGet() uses to fetch
- // data from Keep -- confirm against doGet().
- keepClient *KeepClient
-
- // doGet() reads FileSegments from toGet, gets the data from
- // Keep, and sends byte slices to toRead to be consumed by
- // Read(). queueSegmentsToGet() is the sender, and closes
- // toGet after the last segment has been sent.
- toGet chan *manifest.FileSegment
-
- // toRead is a buffered channel, sized to fit one full Keep
- // block. This lets us verify checksums without having a
- // store-and-forward delay between blocks: by the time the
- // caller starts receiving data from block N, cfReader is
- // starting to fetch block N+1. A larger buffer would be
- // useful for a caller whose read speed varies a lot.
- toRead chan []byte
-
- // bytes ready to send next time someone calls Read()
- buf []byte
-
- // Total size of the file being read. Not safe to read this
- // until countDone is closed: queueSegmentsToGet() keeps adding
- // segment lengths to it and closes countDone once the last
- // segment has been counted.
- totalSize uint64
- countDone chan struct{}
-
- // First error encountered.
- err error
-
- // errNotNil is closed IFF err contains a non-nil error.
- // Receiving from it will block until an error occurs.
- errNotNil chan struct{}
-
- // rdrClosed is closed IFF the reader's Close() method has
- // been called. Any goroutines associated with the reader will
- // stop and free up resources when they notice this channel is
- // closed.
- rdrClosed chan struct{}
-}
-
-func (r *cfReader) Read(outbuf []byte) (int, error) {
- if r.Error() != nil {
- // Short circuit: the caller might as well find out
- // now that we hit an error, even if there's buffered
- // data we could return.
- return 0, r.Error()
- }
- for len(r.buf) == 0 {
- // Private buffer was emptied out by the last Read()
- // (or this is the first Read() and r.buf is nil).
- // Read from r.toRead until we get a non-empty slice
- // or hit an error.
- var ok bool
- r.buf, ok = <-r.toRead
- if r.Error() != nil {
- // Error encountered while waiting for bytes
- return 0, r.Error()
- } else if !ok {
- // No more bytes to read, no error encountered
- return 0, io.EOF
- }
- }
- // Copy as much as possible from our private buffer to the
- // caller's buffer
- n := len(r.buf)
- if len(r.buf) > len(outbuf) {
- n = len(outbuf)
- }
- copy(outbuf[:n], r.buf[:n])
-
- // Next call to Read() will continue where we left off
- r.buf = r.buf[n:]
-
- return n, nil
-}
-
-// Close releases resources by closing rdrClosed, which tells the
-// goroutines associated with the reader to stop. It returns a
-// non-nil error if an error was encountered by the reader.
-//
-// Calling Close again after it has returned is harmless: the channel
-// is only closed once, so a repeated call (e.g., a deferred Close
-// following an explicit one) no longer panics with "close of closed
-// channel". Simultaneous calls from different goroutines are still
-// not synchronized.
-func (r *cfReader) Close() error {
-	select {
-	case <-r.rdrClosed:
-		// Already closed by an earlier call; nothing to do.
-	default:
-		close(r.rdrClosed)
-	}
-	return r.Error()
-}
-
-// Error returns an error if one has been encountered, otherwise
-// nil. It is safe to call from any goroutine.
-func (r *cfReader) Error() error {
- select {
- case <-r.errNotNil:
- return r.err
- default:
- return nil