+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: Apache-2.0
+
package keepclient
import (
"io"
"os"
+ "git.curoverse.com/arvados.git/sdk/go/arvados"
"git.curoverse.com/arvados.git/sdk/go/manifest"
)
-// A Reader implements, io.Reader, io.Seeker, and io.Closer, and has a
-// Len() method that returns the total number of bytes available to
-// read.
-type Reader interface {
- io.Reader
- io.Seeker
- io.Closer
- Len() uint64
-}
-
const (
// After reading a data block from Keep, cfReader slices it up
// and sends the slices to a buffered channel to be consumed
// parameter when retrieving the collection record).
var ErrNoManifest = errors.New("Collection has no manifest")
-// CollectionFileReader returns a Reader that reads file content from
-// a collection. The filename must be given relative to the root of
-// the collection, without a leading "./".
-func (kc *KeepClient) CollectionFileReader(collection map[string]interface{}, filename string) (Reader, error) {
+// CollectionFileReader returns a Reader that reads content from a single file
+// in the collection. The filename must be relative to the root of the
+// collection. A leading prefix of "/" or "./" in the filename is ignored.
+func (kc *KeepClient) CollectionFileReader(collection map[string]interface{}, filename string) (arvados.File, error) {
mText, ok := collection["manifest_text"].(string)
if !ok {
return nil, ErrNoManifest
return kc.ManifestFileReader(m, filename)
}
// ManifestFileReader returns a reader for the file named filename in
// manifest m. The returned reader keeps a reference to kc, which it
// uses when it needs to fetch block data.
-func (kc *KeepClient) ManifestFileReader(m manifest.Manifest, filename string) (Reader, error) {
+func (kc *KeepClient) ManifestFileReader(m manifest.Manifest, filename string) (arvados.File, error) {
f := &file{
kc: kc,
}
- return f, f.load(m, filename)
+ err := f.load(m, filename)
+ if err != nil {
+ return nil, err
+ }
+ return f, nil
}
type file struct {
readaheadDone bool
}
+// Close implements io.Closer.
// It drops the file's references to its KeepClient and to its segment
// list. It never fails: the returned error is always nil.
func (f *file) Close() error {
	f.kc, f.segments = nil, nil
	return nil
}
+// Read implements io.Reader.
func (f *file) Read(buf []byte) (int, error) {
if f.seg == nil || f.offset < f.segStart || f.offset >= f.segStart+int64(f.seg.Len) {
// f.seg does not cover the current read offset
}
f.segData = data[f.seg.Offset : f.seg.Offset+f.seg.Len]
}
+ // dataOff and dataLen denote a portion of f.segData
+ // corresponding to a portion of the file at f.offset.
dataOff := int(f.offset - f.segStart)
dataLen := f.seg.Len - dataOff
// NOTE(review): dataLen is f.seg.Len - dataOff, so dataOff+dataLen
// always equals f.seg.Len. f.segData is also sliced to exactly
// f.seg.Len bytes when the segment is loaded. The second-to-last clause
// of the readahead condition below therefore does not depend on the
// read position:
//   - the old form is true whenever the segment is at least 1 MiB;
//   - the new form ("> len(f.segData)/16") is true for every non-empty
//     segment.
// Neither matches the comment "we have already consumed a noticeable
// portion of this segment". Presumably dataOff > len(f.segData)/16 was
// intended -- TODO confirm.
- if !f.readaheadDone && len(f.segNext) > 0 && f.offset >= 1048576 && dataOff+dataLen >= 1048576 {
+ if !f.readaheadDone && len(f.segNext) > 0 && f.offset >= 1048576 && dataOff+dataLen > len(f.segData)/16 {
// If we have already read more than just the first
- // few bytes of this file, and more than just a few
- // bytes of this block, and there's more data for this
- // file in the next segment/block ... then there's a
- // good chance we are going to want the next block
- // soon. Start getting it into the cache now.
+ // few bytes of this file, and we have already
+ // consumed a noticeable portion of this segment, and
+ // there's more data for this file in the next segment
+ // ... then there's a good chance we are going to need
+ // the data for that next segment soon. Start getting
+ // it into the cache now.
go f.kc.cache().Get(f.kc, f.segNext[0].Locator)
f.readaheadDone = true
}
return n, nil
}
+// Seek implements io.Seeker.
func (f *file) Seek(offset int64, whence int) (int64, error) {
var want int64
switch whence {
return f.offset, nil
}
// NOTE(review): this change replaces the Len() uint64 method with
// Size() int64. The same patch also removes the exported Reader
// interface that required Len. Any caller that used Len, for example
// through the Reader interface, must switch to Size. Confirm that no
// remaining caller still depends on the old API.
-func (f *file) Len() uint64 {
- return uint64(f.size)
+// Size returns the file size in bytes.
+func (f *file) Size() int64 {
+ return f.size
}
func (f *file) load(m manifest.Manifest, path string) error {