X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/d77bf0c67422a259afacc17660698729328a1ed3..08793025fb951153ce374f8eb4f984ee21f6a2bc:/sdk/go/keepclient/collectionreader.go diff --git a/sdk/go/keepclient/collectionreader.go b/sdk/go/keepclient/collectionreader.go index d2c171d961..527318eb49 100644 --- a/sdk/go/keepclient/collectionreader.go +++ b/sdk/go/keepclient/collectionreader.go @@ -2,19 +2,14 @@ package keepclient import ( "errors" + "fmt" "io" "os" + "git.curoverse.com/arvados.git/sdk/go/arvados" "git.curoverse.com/arvados.git/sdk/go/manifest" ) -// ReadCloserWithLen extends io.ReadCloser with a Len() method that -// returns the total number of bytes available to read. -type ReadCloserWithLen interface { - io.ReadCloser - Len() uint64 -} - const ( // After reading a data block from Keep, cfReader slices it up // and sends the slices to a buffered channel to be consumed @@ -31,10 +26,10 @@ const ( // parameter when retrieving the collection record). var ErrNoManifest = errors.New("Collection has no manifest") -// CollectionFileReader returns a ReadCloserWithLen that reads file -// content from a collection. The filename must be given relative to -// the root of the collection, without a leading "./". -func (kc *KeepClient) CollectionFileReader(collection map[string]interface{}, filename string) (ReadCloserWithLen, error) { +// CollectionFileReader returns a Reader that reads content from a single file +// in the collection. The filename must be relative to the root of the +// collection. A leading prefix of "/" or "./" in the filename is ignored. +func (kc *KeepClient) CollectionFileReader(collection map[string]interface{}, filename string) (arvados.File, error) { mText, ok := collection["manifest_text"].(string) if !ok { return nil, ErrNoManifest @@ -43,214 +38,137 @@ func (kc *KeepClient) CollectionFileReader(collection map[string]interface{}, fi return kc.ManifestFileReader(m, filename) } -func (kc *KeepClient) ManifestFileReader(m manifest.Manifest, filename string) (ReadCloserWithLen, error) { - rdrChan := make(chan *cfReader) - go kc.queueSegmentsToGet(m, filename, rdrChan) - r, ok := <-rdrChan - if !ok { - return nil, os.ErrNotExist - } - return r, nil -} - -// Send segments for the specified file to r.toGet. Send a *cfReader -// to rdrChan if the specified file is found (even if it's empty). -// Then, close rdrChan. -func (kc *KeepClient) queueSegmentsToGet(m manifest.Manifest, filename string, rdrChan chan *cfReader) { - defer close(rdrChan) - - // q is a queue of FileSegments that we have received but - // haven't yet been able to send to toGet. - var q []*manifest.FileSegment - var r *cfReader - for seg := range m.FileSegmentIterByName(filename) { - if r == nil { - // We've just discovered that the requested - // filename does appear in the manifest, so we - // can return a real reader (not nil) from - // CollectionFileReader(). - r = newCFReader(kc) - rdrChan <- r - } - q = append(q, seg) - r.totalSize += uint64(seg.Len) - // Send toGet as many segments as we can until it - // blocks. - Q: - for len(q) > 0 { - select { - case r.toGet <- q[0]: - q = q[1:] - default: - break Q - } - } +func (kc *KeepClient) ManifestFileReader(m manifest.Manifest, filename string) (arvados.File, error) { + f := &file{ + kc: kc, } - if r == nil { - // File not found. - return + err := f.load(m, filename) + if err != nil { + return nil, err } - close(r.countDone) - for _, seg := range q { - r.toGet <- seg - } - close(r.toGet) + return f, nil } -type cfReader struct { - keepClient *KeepClient - - // doGet() reads FileSegments from toGet, gets the data from - // Keep, and sends byte slices to toRead to be consumed by - // Read(). - toGet chan *manifest.FileSegment - - // toRead is a buffered channel, sized to fit one full Keep - // block. This lets us verify checksums without having a - // store-and-forward delay between blocks: by the time the - // caller starts receiving data from block N, cfReader is - // starting to fetch block N+1. A larger buffer would be - // useful for a caller whose read speed varies a lot. - toRead chan []byte - - // bytes ready to send next time someone calls Read() - buf []byte - - // Total size of the file being read. Not safe to read this - // until countDone is closed. - totalSize uint64 - countDone chan struct{} - - // First error encountered. - err error - - // errNotNil is closed IFF err contains a non-nil error. - // Receiving from it will block until an error occurs. - errNotNil chan struct{} +type file struct { + kc *KeepClient + segments []*manifest.FileSegment + size int64 // total file size + offset int64 // current read offset + + // current/latest segment accessed -- might or might not match pos + seg *manifest.FileSegment + segStart int64 // position of segment relative to file + segData []byte + segNext []*manifest.FileSegment + readaheadDone bool +} - // rdrClosed is closed IFF the reader's Close() method has - // been called. Any goroutines associated with the reader will - // stop and free up resources when they notice this channel is - // closed. - rdrClosed chan struct{} +// Close implements io.Closer. +func (f *file) Close() error { + f.kc = nil + f.segments = nil + f.segData = nil + return nil } -func (r *cfReader) Read(outbuf []byte) (int, error) { - if r.Error() != nil { - // Short circuit: the caller might as well find out - // now that we hit an error, even if there's buffered - // data we could return. - return 0, r.Error() +// Read implements io.Reader. +func (f *file) Read(buf []byte) (int, error) { + if f.seg == nil || f.offset < f.segStart || f.offset >= f.segStart+int64(f.seg.Len) { + // f.seg does not cover the current read offset + // (f.pos). Iterate over f.segments to find the one + // that does. + f.seg = nil + f.segStart = 0 + f.segData = nil + f.segNext = f.segments + for len(f.segNext) > 0 { + seg := f.segNext[0] + f.segNext = f.segNext[1:] + segEnd := f.segStart + int64(seg.Len) + if segEnd > f.offset { + f.seg = seg + break + } + f.segStart = segEnd + } + f.readaheadDone = false } - for len(r.buf) == 0 { - // Private buffer was emptied out by the last Read() - // (or this is the first Read() and r.buf is nil). - // Read from r.toRead until we get a non-empty slice - // or hit an error. - var ok bool - r.buf, ok = <-r.toRead - if r.Error() != nil { - // Error encountered while waiting for bytes - return 0, r.Error() - } else if !ok { - // No more bytes to read, no error encountered - return 0, io.EOF + if f.seg == nil { + return 0, io.EOF + } + if f.segData == nil { + data, err := f.kc.cache().Get(f.kc, f.seg.Locator) + if err != nil { + return 0, err + } + if len(data) < f.seg.Offset+f.seg.Len { + return 0, fmt.Errorf("invalid segment (offset %d len %d) in %d-byte block %s", f.seg.Offset, f.seg.Len, len(data), f.seg.Locator) } + f.segData = data[f.seg.Offset : f.seg.Offset+f.seg.Len] } - // Copy as much as possible from our private buffer to the - // caller's buffer - n := len(r.buf) - if len(r.buf) > len(outbuf) { - n = len(outbuf) + // dataOff and dataLen denote a portion of f.segData + // corresponding to a portion of the file at f.offset. + dataOff := int(f.offset - f.segStart) + dataLen := f.seg.Len - dataOff + + if !f.readaheadDone && len(f.segNext) > 0 && f.offset >= 1048576 && dataOff+dataLen > len(f.segData)/16 { + // If we have already read more than just the first + // few bytes of this file, and we have already + // consumed a noticeable portion of this segment, and + // there's more data for this file in the next segment + // ... then there's a good chance we are going to need + // the data for that next segment soon. Start getting + // it into the cache now. + go f.kc.cache().Get(f.kc, f.segNext[0].Locator) + f.readaheadDone = true } - copy(outbuf[:n], r.buf[:n]) - - // Next call to Read() will continue where we left off - r.buf = r.buf[n:] + n := len(buf) + if n > dataLen { + n = dataLen + } + copy(buf[:n], f.segData[dataOff:dataOff+n]) + f.offset += int64(n) return n, nil } -// Close releases resources. It returns a non-nil error if an error -// was encountered by the reader. -func (r *cfReader) Close() error { - close(r.rdrClosed) - return r.Error() -} - -// Error returns an error if one has been encountered, otherwise -// nil. It is safe to call from any goroutine. -func (r *cfReader) Error() error { - select { - case <-r.errNotNil: - return r.err +// Seek implements io.Seeker. +func (f *file) Seek(offset int64, whence int) (int64, error) { + var want int64 + switch whence { + case io.SeekStart: + want = offset + case io.SeekCurrent: + want = f.offset + offset + case io.SeekEnd: + want = f.size + offset default: - return nil + return f.offset, fmt.Errorf("invalid whence %d", whence) + } + if want < 0 { + return f.offset, fmt.Errorf("attempted seek to %d", want) } + if want > f.size { + want = f.size + } + f.offset = want + return f.offset, nil } -// Len returns the total number of bytes in the file being read. If -// necessary, it waits for manifest parsing to finish. -func (r *cfReader) Len() uint64 { - // Wait for all segments to be counted - <-r.countDone - return r.totalSize +// Size returns the file size in bytes. +func (f *file) Size() int64 { + return f.size } -func (r *cfReader) doGet() { - defer close(r.toRead) -GET: - for fs := range r.toGet { - rdr, _, _, err := r.keepClient.Get(fs.Locator) - if err != nil { - r.err = err - close(r.errNotNil) - return - } - var buf = make([]byte, fs.Offset+fs.Len) - _, err = io.ReadFull(rdr, buf) - if err != nil { - r.err = err - close(r.errNotNil) - return - } - for bOff, bLen := fs.Offset, dataSliceSize; bOff < fs.Offset+fs.Len && bLen > 0; bOff += bLen { - if bOff+bLen > fs.Offset+fs.Len { - bLen = fs.Offset + fs.Len - bOff - } - select { - case r.toRead <- buf[bOff : bOff+bLen]: - case <-r.rdrClosed: - // Reader is closed: no point sending - // anything more to toRead. - break GET - } - } - // It is possible that r.rdrClosed is closed but we - // never noticed because r.toRead was also ready in - // every select{} above. Here we check before wasting - // a keepclient.Get() call. - select { - case <-r.rdrClosed: - break GET - default: - } +func (f *file) load(m manifest.Manifest, path string) error { + f.segments = nil + f.size = 0 + for seg := range m.FileSegmentIterByName(path) { + f.segments = append(f.segments, seg) + f.size += int64(seg.Len) } - // In case we exited the above loop early: before returning, - // drain the toGet channel so its sender doesn't sit around - // blocking forever. - for _ = range r.toGet { + if f.segments == nil { + return os.ErrNotExist } -} - -func newCFReader(kc *KeepClient) (r *cfReader) { - r = new(cfReader) - r.keepClient = kc - r.rdrClosed = make(chan struct{}) - r.errNotNil = make(chan struct{}) - r.toGet = make(chan *manifest.FileSegment, 2) - r.toRead = make(chan []byte, (BLOCKSIZE+dataSliceSize-1)/dataSliceSize) - r.countDone = make(chan struct{}) - go r.doGet() - return + return nil }