+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: Apache-2.0
+
package keepclient
import (
"errors"
+ "fmt"
"io"
"os"
+ "git.curoverse.com/arvados.git/sdk/go/arvados"
"git.curoverse.com/arvados.git/sdk/go/manifest"
)
// parameter when retrieving the collection record).
var ErrNoManifest = errors.New("Collection has no manifest")
-// CollectionFileReader returns an io.Reader that reads file content
-// from a collection. The filename must be given relative to the root
-// of the collection, without a leading "./".
-func (kc *KeepClient) CollectionFileReader(collection map[string]interface{}, filename string) (*cfReader, error) {
+// CollectionFileReader returns an arvados.File that reads content from a
+// single file in the collection. The filename must be relative to the root
+// of the collection. A leading prefix of "/" or "./" in the filename is
+// ignored.
+//
+// It returns ErrNoManifest if collection has no "manifest_text" entry or
+// that entry is not a string, and os.ErrNotExist if the manifest does not
+// contain the requested file.
+func (kc *KeepClient) CollectionFileReader(collection map[string]interface{}, filename string) (arvados.File, error) {
mText, ok := collection["manifest_text"].(string)
if !ok {
return nil, ErrNoManifest
}
m := manifest.Manifest{Text: mText}
- rdrChan := make(chan *cfReader)
- go func() {
- // q is a queue of FileSegments that we have received but
- // haven't yet been able to send to toGet.
- var q []*manifest.FileSegment
- var r *cfReader
- for seg := range m.FileSegmentIterByName(filename) {
- if r == nil {
- // We've just discovered that the
- // requested filename does appear in
- // the manifest, so we can return a
- // real reader (not nil) from
- // CollectionFileReader().
- r = newCFReader(kc)
- rdrChan <- r
- }
- q = append(q, seg)
- r.totalSize += uint64(seg.Len)
- // Send toGet as many segments as we can until
- // it blocks.
- Q:
- for len(q) > 0 {
- select {
- case r.toGet <- q[0]:
- q = q[1:]
- default:
- break Q
- }
- }
- }
- if r == nil {
- // File not found
- rdrChan <- nil
- return
- }
- close(r.countDone)
- for _, seg := range q {
- r.toGet <- seg
- }
- close(r.toGet)
- }()
- // Before returning a reader, wait until we know whether the
- // file exists here:
- r := <-rdrChan
- if r == nil {
- return nil, os.ErrNotExist
+ return kc.ManifestFileReader(m, filename)
+}
+
+// ManifestFileReader returns an arvados.File that reads content from the
+// file named filename in manifest m. The file's segments are collected
+// from m when ManifestFileReader is called; block data is fetched from
+// Keep later, by Read, as it is needed.
+//
+// It returns os.ErrNotExist if m contains no segments for filename.
+func (kc *KeepClient) ManifestFileReader(m manifest.Manifest, filename string) (arvados.File, error) {
+ f := &file{
+ kc: kc,
+ }
+ // load fills in f.segments and f.size, or reports that the file
+ // is missing from the manifest.
+ err := f.load(m, filename)
+ if err != nil {
+ return nil, err
}
- return r, nil
+ return f, nil
}
-type cfReader struct {
- keepClient *KeepClient
- // doGet() reads FileSegments from toGet, gets the data from
- // Keep, and sends byte slices to toRead to be consumed by
- // Read().
- toGet chan *manifest.FileSegment
- // toRead is a buffered channel, sized to fit one full Keep
- // block. This lets us verify checksums without having a
- // store-and-forward delay between blocks: by the time the
- // caller starts receiving data from block N, cfReader is
- // starting to fetch block N+1. A larger buffer would be
- // useful for a caller whose read speed varies a lot.
- toRead chan []byte
- // bytes ready to send next time someone calls Read()
- buf []byte
- // Total size of the file being read. Not safe to read this
- // until countDone is closed.
- totalSize uint64
- countDone chan struct{}
- // First error encountered.
- err error
- // errNotNil is closed IFF err contains a non-nil error.
- // Receiving from it will block until an error occurs.
- errNotNil chan struct{}
- // rdrClosed is closed IFF the reader's Close() method has
- // been called. Any goroutines associated with the reader will
- // stop and free up resources when they notice this channel is
- // closed.
- rdrClosed chan struct{}
+// file reads the content of a single file described by a manifest,
+// fetching one segment's block at a time through its KeepClient.
+// ManifestFileReader returns it as an arvados.File. It has no internal
+// locking.
+type file struct {
+	kc       *KeepClient
+	segments []*manifest.FileSegment
+	size     int64 // total file size
+	offset   int64 // current read offset
+
+	// current/latest segment accessed -- might or might not cover
+	// offset
+	seg      *manifest.FileSegment
+	segStart int64 // position of segment relative to file
+	segData  []byte
+	// segments following seg in file order (readahead candidates)
+	segNext       []*manifest.FileSegment
+	readaheadDone bool
+}
+
+// Close implements io.Closer. It drops the file's references to the
+// KeepClient, the segment list, and any cached segment state, including
+// the current segment. Clearing the current segment as well means a
+// later Read cannot take the "segment already located" path and use the
+// released KeepClient. Close always returns nil.
+func (f *file) Close() error {
+	f.kc = nil
+	f.segments = nil
+	f.seg = nil
+	f.segNext = nil
+	f.segData = nil
+	return nil
}
-func (r *cfReader) Read(outbuf []byte) (int, error) {
- if r.Error() != nil {
- return 0, r.Error()
+// Read implements io.Reader. It copies up to len(buf) bytes, starting at
+// the current offset, from the one segment that covers that offset. If
+// that segment's data is not loaded yet, it is fetched via f.kc.cache().
+// A read never spans two segments, so it may return fewer than len(buf)
+// bytes before the end of the file. Read returns io.EOF when no segment
+// covers the current offset, and an error if the file has been closed.
+func (f *file) Read(buf []byte) (int, error) {
+	if f.kc == nil {
+		// Close released the KeepClient; continuing would
+		// dereference a nil pointer below.
+		return 0, errors.New("read from closed file")
+	}
+	if f.seg == nil || f.offset < f.segStart || f.offset >= f.segStart+int64(f.seg.Len) {
+		// f.seg does not cover the current read offset
+		// (f.offset). Iterate over f.segments to find the one
+		// that does.
+		f.seg = nil
+		f.segStart = 0
+		f.segData = nil
+		f.segNext = f.segments
+		for len(f.segNext) > 0 {
+			seg := f.segNext[0]
+			f.segNext = f.segNext[1:]
+			segEnd := f.segStart + int64(seg.Len)
+			if segEnd > f.offset {
+				f.seg = seg
+				break
+			}
+			f.segStart = segEnd
+		}
+		f.readaheadDone = false
+	}
+	if f.seg == nil {
+		return 0, io.EOF
}
- for r.buf == nil || len(r.buf) == 0 {
- var ok bool
- r.buf, ok = <-r.toRead
- if r.Error() != nil {
- return 0, r.Error()
- } else if !ok {
- return 0, io.EOF
+	if f.segData == nil {
+		data, err := f.kc.cache().Get(f.kc, f.seg.Locator)
+		if err != nil {
+			return 0, err
+		}
+		if len(data) < f.seg.Offset+f.seg.Len {
+			return 0, fmt.Errorf("invalid segment (offset %d len %d) in %d-byte block %s", f.seg.Offset, f.seg.Len, len(data), f.seg.Locator)
+		}
+		f.segData = data[f.seg.Offset : f.seg.Offset+f.seg.Len]
}
- n := len(r.buf)
- if len(r.buf) > len(outbuf) {
- n = len(outbuf)
+	// dataOff and dataLen denote a portion of f.segData
+	// corresponding to a portion of the file at f.offset.
+	dataOff := int(f.offset - f.segStart)
+	dataLen := f.seg.Len - dataOff
+
+	if !f.readaheadDone && len(f.segNext) > 0 && f.offset >= 1048576 && dataOff > len(f.segData)/16 {
+		// If we have already read more than just the first
+		// few bytes of this file, and we have already
+		// consumed a noticeable portion (more than 1/16) of
+		// this segment, and there's more data for this file
+		// in the next segment ... then there's a good chance
+		// we are going to need the data for that next segment
+		// soon. Start getting it into the cache now.
+		//
+		// The receiver and arguments of the go statement are
+		// evaluated here, so the goroutine does not read f.
+		go f.kc.cache().Get(f.kc, f.segNext[0].Locator)
+		f.readaheadDone = true
}
- copy(outbuf[:n], r.buf[:n])
- r.buf = r.buf[n:]
- return n, nil
-}
-func (r *cfReader) Close() error {
- close(r.rdrClosed)
- return r.Error()
+	n := len(buf)
+	if n > dataLen {
+		n = dataLen
+	}
+	copy(buf[:n], f.segData[dataOff:dataOff+n])
+	f.offset += int64(n)
+	return n, nil
}
-func (r *cfReader) Error() error {
- select {
- case <-r.errNotNil:
- return r.err
+// Seek implements io.Seeker. whence selects the base that offset is
+// added to: the start of the file (io.SeekStart), the current read
+// offset (io.SeekCurrent), or the end of the file (io.SeekEnd). A
+// target beyond the end of the file is clamped to the file size. An
+// unknown whence or a negative target returns an error and leaves the
+// read offset unchanged. Seek only moves the offset; it fetches no data.
+func (f *file) Seek(offset int64, whence int) (int64, error) {
+ var want int64
+ switch whence {
+ case io.SeekStart:
+ want = offset
+ case io.SeekCurrent:
+ want = f.offset + offset
+ case io.SeekEnd:
+ want = f.size + offset
default:
- return nil
+ return f.offset, fmt.Errorf("invalid whence %d", whence)
}
+ if want < 0 {
+ return f.offset, fmt.Errorf("attempted seek to %d", want)
+ }
+ // Seeking past EOF is clamped to EOF rather than rejected.
+ if want > f.size {
+ want = f.size
+ }
+ f.offset = want
+ return f.offset, nil
}
-func (r *cfReader) Len() uint64 {
- // Wait for all segments to be counted
- <-r.countDone
- return r.totalSize
+// Size returns the file size in bytes: the total length of the file's
+// segments, as summed when the file was loaded from the manifest.
+func (f *file) Size() int64 {
+ return f.size
}
-func (r *cfReader) doGet() {
- defer close(r.toRead)
-GET:
- for fs := range r.toGet {
- rdr, _, _, err := r.keepClient.Get(fs.Locator)
- if err != nil {
- r.err = err
- close(r.errNotNil)
- return
- }
- var buf = make([]byte, fs.Offset+fs.Len)
- _, err = io.ReadFull(rdr, buf)
- if err != nil {
- r.err = err
- close(r.errNotNil)
- return
- }
- for bOff, bLen := fs.Offset, dataSliceSize; bOff <= fs.Offset+fs.Len && bLen > 0; bOff += bLen {
- if bOff+bLen > fs.Offset+fs.Len {
- bLen = fs.Offset + fs.Len - bOff
- }
- select {
- case r.toRead <- buf[bOff : bOff+bLen]:
- case <-r.rdrClosed:
- // Reader is closed: no point sending
- // anything more to toRead.
- break GET
- }
- }
- // It is possible that r.rdrClosed is closed but we
- // never noticed because r.toRead was also ready in
- // every select{} above. Here we check before wasting
- // a keepclient.Get() call.
- select {
- case <-r.rdrClosed:
- break GET
- default:
- }
+// load resets f.segments and f.size, then fills them in from the
+// segments that m.FileSegmentIterByName yields for path, in the order
+// they are yielded. It does not contact Keep. It returns os.ErrNotExist
+// if no segments were found for path, and nil otherwise.
+func (f *file) load(m manifest.Manifest, path string) error {
+ f.segments = nil
+ f.size = 0
+ for seg := range m.FileSegmentIterByName(path) {
+ f.segments = append(f.segments, seg)
+ f.size += int64(seg.Len)
}
- // In case we exited the above loop early: before returning,
- // drain the toGet channel so its sender doesn't sit around
- // blocking forever.
- for _ = range r.toGet {
+ // No segments at all means the manifest does not list path.
+ if f.segments == nil {
+ return os.ErrNotExist
}
-}
-
-func newCFReader(kc *KeepClient) (r *cfReader) {
- r = new(cfReader)
- r.keepClient = kc
- r.rdrClosed = make(chan struct{})
- r.errNotNil = make(chan struct{})
- r.toGet = make(chan *manifest.FileSegment, 2)
- r.toRead = make(chan []byte, (BLOCKSIZE+dataSliceSize-1)/dataSliceSize)
- r.countDone = make(chan struct{})
- go r.doGet()
- return
+ return nil
}