X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/4f5a6df52559b90d2c9412624f3c4c7fbe467579..9b90fe97f8e92794856e3d730811953e1c13ea25:/sdk/go/keepclient/collectionreader.go?ds=sidebyside diff --git a/sdk/go/keepclient/collectionreader.go b/sdk/go/keepclient/collectionreader.go index 0d05b8a00e..bed60f4995 100644 --- a/sdk/go/keepclient/collectionreader.go +++ b/sdk/go/keepclient/collectionreader.go @@ -8,6 +8,13 @@ import ( "git.curoverse.com/arvados.git/sdk/go/manifest" ) +// ReadCloserWithLen extends io.ReadCloser with a Len() method that +// returns the total number of bytes available to read. +type ReadCloserWithLen interface { + io.ReadCloser + Len() uint64 +} + const ( // After reading a data block from Keep, cfReader slices it up // and sends the slices to a buffered channel to be consumed @@ -24,71 +31,80 @@ const ( // parameter when retrieving the collection record). var ErrNoManifest = errors.New("Collection has no manifest") -// CollectionFileReader returns an io.Reader that reads file content -// from a collection. The filename must be given relative to the root -// of the collection, without a leading "./". -func (kc *KeepClient) CollectionFileReader(collection map[string]interface{}, filename string) (*cfReader, error) { +// CollectionFileReader returns a ReadCloserWithLen that reads file +// content from a collection. The filename must be given relative to +// the root of the collection, without a leading "./". +func (kc *KeepClient) CollectionFileReader(collection map[string]interface{}, filename string) (ReadCloserWithLen, error) { mText, ok := collection["manifest_text"].(string) if !ok { return nil, ErrNoManifest } m := manifest.Manifest{Text: mText} + return kc.ManifestFileReader(m, filename) +} + +func (kc *KeepClient) ManifestFileReader(m manifest.Manifest, filename string) (ReadCloserWithLen, error) { rdrChan := make(chan *cfReader) - go func() { - // q is a queue of FileSegments that we have received but - // haven't yet been able to send to toGet. - var q []*manifest.FileSegment - var r *cfReader - for seg := range m.FileSegmentIterByName(filename) { - if r == nil { - // We've just discovered that the - // requested filename does appear in - // the manifest, so we can return a - // real reader (not nil) from - // CollectionFileReader(). - r = newCFReader(kc) - rdrChan <- r - } - q = append(q, seg) - r.totalSize += uint64(seg.Len) - // Send toGet as many segments as we can until - // it blocks. - Q: - for len(q) > 0 { - select { - case r.toGet <- q[0]: - q = q[1:] - default: - break Q - } - } - } + go kc.queueSegmentsToGet(m, filename, rdrChan) + r, ok := <-rdrChan + if !ok { + return nil, os.ErrNotExist + } + return r, nil +} + +// Send segments for the specified file to r.toGet. Send a *cfReader +// to rdrChan if the specified file is found (even if it's empty). +// Then, close rdrChan. +func (kc *KeepClient) queueSegmentsToGet(m manifest.Manifest, filename string, rdrChan chan *cfReader) { + defer close(rdrChan) + + // q is a queue of FileSegments that we have received but + // haven't yet been able to send to toGet. + var q []*manifest.FileSegment + var r *cfReader + for seg := range m.FileSegmentIterByName(filename) { if r == nil { - // File not found - rdrChan <- nil - return + // We've just discovered that the requested + // filename does appear in the manifest, so we + // can return a real reader (not nil) from + // CollectionFileReader(). + r = newCFReader(kc) + rdrChan <- r } - close(r.countDone) - for _, seg := range q { - r.toGet <- seg + q = append(q, seg) + r.totalSize += uint64(seg.Len) + // Send toGet as many segments as we can until it + // blocks. + Q: + for len(q) > 0 { + select { + case r.toGet <- q[0]: + q = q[1:] + default: + break Q + } } - close(r.toGet) - }() - // Before returning a reader, wait until we know whether the - // file exists here: - r := <-rdrChan + } if r == nil { - return nil, os.ErrNotExist + // File not found. + return } - return r, nil + close(r.countDone) + for _, seg := range q { + r.toGet <- seg + } + close(r.toGet) } type cfReader struct { keepClient *KeepClient + // doGet() reads FileSegments from toGet, gets the data from // Keep, and sends byte slices to toRead to be consumed by // Read(). toGet chan *manifest.FileSegment + // toRead is a buffered channel, sized to fit one full Keep // block. This lets us verify checksums without having a // store-and-forward delay between blocks: by the time the @@ -96,17 +112,22 @@ type cfReader struct { // starting to fetch block N+1. A larger buffer would be // useful for a caller whose read speed varies a lot. toRead chan []byte + // bytes ready to send next time someone calls Read() buf []byte + // Total size of the file being read. Not safe to read this // until countDone is closed. totalSize uint64 countDone chan struct{} + // First error encountered. err error + // errNotNil is closed IFF err contains a non-nil error. // Receiving from it will block until an error occurs. errNotNil chan struct{} + // rdrClosed is closed IFF the reader's Close() method has // been called. Any goroutines associated with the reader will // stop and free up resources when they notice this channel is @@ -116,31 +137,49 @@ type cfReader struct { func (r *cfReader) Read(outbuf []byte) (int, error) { if r.Error() != nil { + // Short circuit: the caller might as well find out + // now that we hit an error, even if there's buffered + // data we could return. return 0, r.Error() } - for r.buf == nil || len(r.buf) == 0 { + for len(r.buf) == 0 { + // Private buffer was emptied out by the last Read() + // (or this is the first Read() and r.buf is nil). + // Read from r.toRead until we get a non-empty slice + // or hit an error. var ok bool r.buf, ok = <-r.toRead if r.Error() != nil { + // Error encountered while waiting for bytes return 0, r.Error() } else if !ok { + // No more bytes to read, no error encountered return 0, io.EOF } } + // Copy as much as possible from our private buffer to the + // caller's buffer n := len(r.buf) if len(r.buf) > len(outbuf) { n = len(outbuf) } copy(outbuf[:n], r.buf[:n]) + + // Next call to Read() will continue where we left off r.buf = r.buf[n:] + return n, nil } +// Close releases resources. It returns a non-nil error if an error +// was encountered by the reader. func (r *cfReader) Close() error { close(r.rdrClosed) return r.Error() } +// Error returns an error if one has been encountered, otherwise +// nil. It is safe to call from any goroutine. func (r *cfReader) Error() error { select { case <-r.errNotNil: @@ -150,6 +189,8 @@ func (r *cfReader) Error() error { } } +// Len returns the total number of bytes in the file being read. If +// necessary, it waits for manifest parsing to finish. func (r *cfReader) Len() uint64 { // Wait for all segments to be counted <-r.countDone @@ -168,12 +209,16 @@ GET: } var buf = make([]byte, fs.Offset+fs.Len) _, err = io.ReadFull(rdr, buf) + errClosing := rdr.Close() + if err == nil { + err = errClosing + } if err != nil { r.err = err close(r.errNotNil) return } - for bOff, bLen := fs.Offset, dataSliceSize; bOff <= fs.Offset+fs.Len && bLen > 0; bOff += bLen { + for bOff, bLen := fs.Offset, dataSliceSize; bOff < fs.Offset+fs.Len && bLen > 0; bOff += bLen { if bOff+bLen > fs.Offset+fs.Len { bLen = fs.Offset + fs.Len - bOff }