1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: Apache-2.0
13 "git.curoverse.com/arvados.git/sdk/go/arvados"
14 "git.curoverse.com/arvados.git/sdk/go/manifest"
18 // dataSliceSize (1 MiB) is documented as the maximum size of the slices a
19 // data block read from Keep is cut into before being handed to the caller
20 // of Read() through a buffered channel, by a cfReader type.
22 // NOTE(review): no cfReader or channel is defined in this file, and
23 // file.Read copies directly from cached segment data without visibly
24 // referencing dataSliceSize — confirm whether this constant (and the
// "maximum bytes per Read()" guarantee) is still used anywhere.
25 dataSliceSize = 1 << 20
// ErrNoManifest is the error CollectionFileReader reports when the
// collection record it was given carries no "manifest_text" string — for
// instance because a "select" parameter left that field out when the
// record was retrieved.
var (
	ErrNoManifest = errors.New("Collection has no manifest")
)
33 // CollectionFileReader returns an arvados.File that reads content from a
34 // single file in the collection. The filename must be relative to the root of the
35 // collection. A leading prefix of "/" or "./" in the filename is ignored.
//
// collection is a collection record held in a generic map; its
// "manifest_text" entry must be a string. If that entry is missing or is
// not a string (e.g. it was excluded by a "select" parameter),
// ErrNoManifest is returned. Otherwise the manifest text is wrapped in a
// manifest.Manifest and the call is delegated to ManifestFileReader.
36 func (kc *KeepClient) CollectionFileReader(collection map[string]interface{}, filename string) (arvados.File, error) {
// The comma-ok assertion treats a missing key and a non-string value alike.
37 mText, ok := collection["manifest_text"].(string)
39 return nil, ErrNoManifest
// NOTE(review): no "/" or "./" stripping happens in this function;
// presumably the manifest package handles it when looking up filename —
// confirm.
41 m := manifest.Manifest{Text: mText}
42 return kc.ManifestFileReader(m, filename)
// ManifestFileReader returns an arvados.File that reads the content of
// filename from the segments manifest m lists for it. The segments are
// looked up by load, which reports os.ErrNotExist when m has none for
// filename.
45 func (kc *KeepClient) ManifestFileReader(m manifest.Manifest, filename string) (arvados.File, error) {
// Resolve filename to its list of segments and its total size.
49 err := f.load(m, filename)
// segments lists the manifest segments that make up the file, in the order
// load collected them.
58 segments []*manifest.FileSegment
59 size int64 // total file size
60 offset int64 // current read offset
62 // current/latest segment accessed -- might or might not cover offset
63 seg *manifest.FileSegment
64 segStart int64 // file offset at which seg begins
// segNext holds the segments following the one most recently examined by
// Read; segNext[0] is the target of Read's background readahead.
66 segNext []*manifest.FileSegment
70 // Close implements io.Closer.
// NOTE(review): how Close affects a later Read, and whether it has any
// effect on a readahead goroutine already started by Read, is not
// documented — confirm before relying on either.
71 func (f *file) Close() error {
78 // Read implements io.Reader. It copies file content starting at the
// current read offset into buf. Each call draws from a single segment (the
// final copy slices f.segData, which holds only the current segment's
// bytes), so a call may return fewer than len(buf) bytes even when more of
// the file remains. Segment blocks are fetched through f.kc.cache(); once
// the read offset is at least 1 MiB, a background fetch of the next
// segment may be started to warm that cache.
79 func (f *file) Read(buf []byte) (int, error) {
80 if f.seg == nil || f.offset < f.segStart || f.offset >= f.segStart+int64(f.seg.Len) {
81 // f.seg does not cover the current read offset
82 // (f.offset). Iterate over f.segments to find the one that does.
// Scan from the first segment; segNext always holds the segments after
// the candidate currently being examined.
87 f.segNext = f.segments
88 for len(f.segNext) > 0 {
90 f.segNext = f.segNext[1:]
// segEnd is the file offset just past this candidate segment.
91 segEnd := f.segStart + int64(seg.Len)
92 if segEnd > f.offset {
// Reset after re-selecting the current segment, so one readahead is
// allowed per selection.
98 f.readaheadDone = false
// Fetch the current segment's block on first use and keep only this
// segment's byte range of it.
103 if f.segData == nil {
104 data, err := f.kc.cache().Get(f.kc, f.seg.Locator)
// The block must be long enough to contain the byte range the manifest
// claims for this segment; otherwise the slice below would be out of range.
108 if len(data) < f.seg.Offset+f.seg.Len {
109 return 0, fmt.Errorf("invalid segment (offset %d len %d) in %d-byte block %s", f.seg.Offset, f.seg.Len, len(data), f.seg.Locator)
111 f.segData = data[f.seg.Offset : f.seg.Offset+f.seg.Len]
113 // dataOff and dataLen denote a portion of f.segData
114 // corresponding to a portion of the file at f.offset.
115 dataOff := int(f.offset - f.segStart)
116 dataLen := f.seg.Len - dataOff
// NOTE(review): since dataLen == f.seg.Len-dataOff, the last clause below
// evaluates f.seg.Len > len(f.segData)/16. Because f.segData is sliced to
// exactly f.seg.Len bytes above, that holds for every non-empty segment.
// The "consumed a noticeable portion of this segment" test described in the
// comment is therefore never actually applied. Presumably
// `dataOff > len(f.segData)/16` was intended — confirm before changing.
118 if !f.readaheadDone && len(f.segNext) > 0 && f.offset >= 1048576 && dataOff+dataLen > len(f.segData)/16 {
119 // If we have already read more than just the first
120 // few bytes of this file, and we have already
121 // consumed a noticeable portion of this segment, and
122 // there's more data for this file in the next segment
123 // ... then there's a good chance we are going to need
124 // the data for that next segment soon. Start getting
125 // it into the cache now.
// The fetched data and any error are discarded (the call only warms the
// cache); nothing in this method waits for or cancels the goroutine.
126 go f.kc.cache().Get(f.kc, f.segNext[0].Locator)
127 f.readaheadDone = true
134 copy(buf[:n], f.segData[dataOff:dataOff+n])
139 // Seek implements io.Seeker. It computes a target position from offset
// and whence and, presumably, makes it the new read offset. For an
// unrecognized whence, or a target it rejects, it returns the unchanged
// current offset together with an error.
140 func (f *file) Seek(offset int64, whence int) (int64, error) {
// Relative to the current read offset (presumably the io.SeekCurrent case).
146 want = f.offset + offset
// Relative to the end of the file (presumably the io.SeekEnd case).
148 want = f.size + offset
150 return f.offset, fmt.Errorf("invalid whence %d", whence)
// The computed target is rejected here (presumably because it is
// negative); the read offset is left unchanged.
153 return f.offset, fmt.Errorf("attempted seek to %d", want)
162 // Size returns the file size in bytes.
// NOTE(review): presumably this returns f.size, the sum of segment lengths
// accumulated by load — confirm.
163 func (f *file) Size() int64 {
// load collects every segment that m lists for path, in the order
// m.FileSegmentIterByName yields them. It appends each segment to
// f.segments and adds its length to f.size. It returns os.ErrNotExist if
// f.segments is still nil after the loop, i.e. m yielded no segments for
// path.
167 func (f *file) load(m manifest.Manifest, path string) error {
170 for seg := range m.FileSegmentIterByName(path) {
171 f.segments = append(f.segments, seg)
172 f.size += int64(seg.Len)
174 if f.segments == nil {
175 return os.ErrNotExist