X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/b65d8b9008c4d0e6b5816d21bf6f1ae81167ee56..08793025fb951153ce374f8eb4f984ee21f6a2bc:/sdk/go/keepclient/collectionreader.go diff --git a/sdk/go/keepclient/collectionreader.go b/sdk/go/keepclient/collectionreader.go index 5db944cbc8..527318eb49 100644 --- a/sdk/go/keepclient/collectionreader.go +++ b/sdk/go/keepclient/collectionreader.go @@ -2,158 +2,173 @@ package keepclient import ( "errors" + "fmt" "io" "os" + "git.curoverse.com/arvados.git/sdk/go/arvados" "git.curoverse.com/arvados.git/sdk/go/manifest" ) +const ( + // After reading a data block from Keep, cfReader slices it up + // and sends the slices to a buffered channel to be consumed + // by the caller via Read(). + // + // dataSliceSize is the maximum size of the slices, and + // therefore the maximum number of bytes that will be returned + // by a single call to Read(). + dataSliceSize = 1 << 20 +) + // ErrNoManifest indicates the given collection has no manifest // information (e.g., manifest_text was excluded by a "select" // parameter when retrieving the collection record). var ErrNoManifest = errors.New("Collection has no manifest") -// CollectionFileReader returns an io.Reader that reads file content -// from a collection. The filename must be given relative to the root -// of the collection, without a leading "./". -func (kc *KeepClient) CollectionFileReader(collection map[string]interface{}, filename string) (*cfReader, error) { +// CollectionFileReader returns a Reader that reads content from a single file +// in the collection. The filename must be relative to the root of the +// collection. A leading prefix of "/" or "./" in the filename is ignored. +func (kc *KeepClient) CollectionFileReader(collection map[string]interface{}, filename string) (arvados.File, error) { mText, ok := collection["manifest_text"].(string) if !ok { return nil, ErrNoManifest } m := manifest.Manifest{Text: mText} - rdrChan := make(chan *cfReader) - go func() { - // q is a queue of FileSegments that we have received but - // haven't yet been able to send to toGet. - var q []*manifest.FileSegment - var r *cfReader - for seg := range m.FileSegmentIterByName(filename) { - if r == nil { - // We've just discovered that the - // requested filename does appear in - // the manifest, so we can return a - // real reader (not nil) from - // CollectionFileReader(). - r = newCFReader(kc) - rdrChan <- r - } - q = append(q, seg) - r.totalSize += uint64(seg.Len) - // Send toGet whatever it's ready to receive. - Q: for len(q) > 0 { - select { - case r.toGet <- q[0]: - q = q[1:] - default: - break Q - } - } - } - if r == nil { - // File not found - rdrChan <- nil - return - } - close(r.countDone) - for _, seg := range q { - r.toGet <- seg - } - close(r.toGet) - }() - // Before returning a reader, wait until we know whether the - // file exists here: - r := <-rdrChan - if r == nil { - return nil, os.ErrNotExist + return kc.ManifestFileReader(m, filename) +} + +func (kc *KeepClient) ManifestFileReader(m manifest.Manifest, filename string) (arvados.File, error) { + f := &file{ + kc: kc, + } + err := f.load(m, filename) + if err != nil { + return nil, err } - return r, nil + return f, nil +} + +type file struct { + kc *KeepClient + segments []*manifest.FileSegment + size int64 // total file size + offset int64 // current read offset + + // current/latest segment accessed -- might or might not match pos + seg *manifest.FileSegment + segStart int64 // position of segment relative to file + segData []byte + segNext []*manifest.FileSegment + readaheadDone bool } -type cfReader struct { - keepClient *KeepClient - // doGet() reads FileSegments from toGet, gets the data from - // Keep, and sends byte slices to toRead to be consumed by - // Read(). - toGet chan *manifest.FileSegment - toRead chan []byte - // bytes ready to send next time someone calls Read() - buf []byte - // Total size of the file being read. Not safe to read this - // until countDone is closed. - totalSize uint64 - countDone chan struct{} - // First error encountered. - err error +// Close implements io.Closer. +func (f *file) Close() error { + f.kc = nil + f.segments = nil + f.segData = nil + return nil } -func (r *cfReader) Read(outbuf []byte) (n int, err error) { - if r.err != nil { - return 0, r.err +// Read implements io.Reader. +func (f *file) Read(buf []byte) (int, error) { + if f.seg == nil || f.offset < f.segStart || f.offset >= f.segStart+int64(f.seg.Len) { + // f.seg does not cover the current read offset + // (f.pos). Iterate over f.segments to find the one + // that does. + f.seg = nil + f.segStart = 0 + f.segData = nil + f.segNext = f.segments + for len(f.segNext) > 0 { + seg := f.segNext[0] + f.segNext = f.segNext[1:] + segEnd := f.segStart + int64(seg.Len) + if segEnd > f.offset { + f.seg = seg + break + } + f.segStart = segEnd + } + f.readaheadDone = false + } + if f.seg == nil { + return 0, io.EOF } - for r.buf == nil || len(r.buf) == 0 { - var ok bool - r.buf, ok = <-r.toRead - if r.err != nil { - return 0, r.err - } else if !ok { - return 0, io.EOF + if f.segData == nil { + data, err := f.kc.cache().Get(f.kc, f.seg.Locator) + if err != nil { + return 0, err + } + if len(data) < f.seg.Offset+f.seg.Len { + return 0, fmt.Errorf("invalid segment (offset %d len %d) in %d-byte block %s", f.seg.Offset, f.seg.Len, len(data), f.seg.Locator) } + f.segData = data[f.seg.Offset : f.seg.Offset+f.seg.Len] + } + // dataOff and dataLen denote a portion of f.segData + // corresponding to a portion of the file at f.offset. + dataOff := int(f.offset - f.segStart) + dataLen := f.seg.Len - dataOff + + if !f.readaheadDone && len(f.segNext) > 0 && f.offset >= 1048576 && dataOff+dataLen > len(f.segData)/16 { + // If we have already read more than just the first + // few bytes of this file, and we have already + // consumed a noticeable portion of this segment, and + // there's more data for this file in the next segment + // ... then there's a good chance we are going to need + // the data for that next segment soon. Start getting + // it into the cache now. + go f.kc.cache().Get(f.kc, f.segNext[0].Locator) + f.readaheadDone = true } - if len(r.buf) > len(outbuf) { - n = len(outbuf) - } else { - n = len(r.buf) + + n := len(buf) + if n > dataLen { + n = dataLen } - copy(outbuf[:n], r.buf[:n]) - r.buf = r.buf[n:] - return + copy(buf[:n], f.segData[dataOff:dataOff+n]) + f.offset += int64(n) + return n, nil } -func (r *cfReader) Close() error { - _, _ = <-r.countDone - for _ = range r.toGet { +// Seek implements io.Seeker. +func (f *file) Seek(offset int64, whence int) (int64, error) { + var want int64 + switch whence { + case io.SeekStart: + want = offset + case io.SeekCurrent: + want = f.offset + offset + case io.SeekEnd: + want = f.size + offset + default: + return f.offset, fmt.Errorf("invalid whence %d", whence) + } + if want < 0 { + return f.offset, fmt.Errorf("attempted seek to %d", want) } - for _ = range r.toRead { + if want > f.size { + want = f.size } - return r.err + f.offset = want + return f.offset, nil } -func (r *cfReader) Len() uint64 { - // Wait for all segments to be counted - _, _ = <-r.countDone - return r.totalSize +// Size returns the file size in bytes. +func (f *file) Size() int64 { + return f.size } -func (r *cfReader) doGet() { - defer close(r.toRead) - for fs := range r.toGet { - rdr, _, _, err := r.keepClient.Get(fs.Locator) - if err != nil { - r.err = err - return - } - var buf = make([]byte, fs.Offset+fs.Len) - _, err = io.ReadFull(rdr, buf) - if err != nil { - r.err = err - return - } - for bOff, bLen := fs.Offset, 1<<20; bOff <= fs.Offset+fs.Len && bLen > 0; bOff += bLen { - if bOff+bLen > fs.Offset+fs.Len { - bLen = fs.Offset + fs.Len - bOff - } - r.toRead <- buf[bOff : bOff+bLen] - } +func (f *file) load(m manifest.Manifest, path string) error { + f.segments = nil + f.size = 0 + for seg := range m.FileSegmentIterByName(path) { + f.segments = append(f.segments, seg) + f.size += int64(seg.Len) } -} - -func newCFReader(kc *KeepClient) (r *cfReader) { - r = new(cfReader) - r.keepClient = kc - r.toGet = make(chan *manifest.FileSegment, 2) - r.toRead = make(chan []byte) - r.countDone = make(chan struct{}) - go r.doGet() - return + if f.segments == nil { + return os.ErrNotExist + } + return nil }