X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/b65d8b9008c4d0e6b5816d21bf6f1ae81167ee56..5af28d3da4e6414984ddcae7796495fe149a7988:/sdk/go/keepclient/collectionreader.go diff --git a/sdk/go/keepclient/collectionreader.go b/sdk/go/keepclient/collectionreader.go index 5db944cbc8..8e4bb93bfa 100644 --- a/sdk/go/keepclient/collectionreader.go +++ b/sdk/go/keepclient/collectionreader.go @@ -1,11 +1,15 @@ +// Copyright (C) The Arvados Authors. All rights reserved. +// +// SPDX-License-Identifier: Apache-2.0 + package keepclient import ( "errors" - "io" "os" - "git.curoverse.com/arvados.git/sdk/go/manifest" + "git.arvados.org/arvados.git/sdk/go/arvados" + "git.arvados.org/arvados.git/sdk/go/manifest" ) // ErrNoManifest indicates the given collection has no manifest @@ -13,147 +17,25 @@ import ( // parameter when retrieving the collection record). var ErrNoManifest = errors.New("Collection has no manifest") -// CollectionFileReader returns an io.Reader that reads file content -// from a collection. The filename must be given relative to the root -// of the collection, without a leading "./". -func (kc *KeepClient) CollectionFileReader(collection map[string]interface{}, filename string) (*cfReader, error) { +// CollectionFileReader returns a Reader that reads content from a single file +// in the collection. The filename must be relative to the root of the +// collection. A leading prefix of "/" or "./" in the filename is ignored. +func (kc *KeepClient) CollectionFileReader(collection map[string]interface{}, filename string) (arvados.File, error) { mText, ok := collection["manifest_text"].(string) if !ok { return nil, ErrNoManifest } - m := manifest.Manifest{Text: mText} - rdrChan := make(chan *cfReader) - go func() { - // q is a queue of FileSegments that we have received but - // haven't yet been able to send to toGet. - var q []*manifest.FileSegment - var r *cfReader - for seg := range m.FileSegmentIterByName(filename) { - if r == nil { - // We've just discovered that the - // requested filename does appear in - // the manifest, so we can return a - // real reader (not nil) from - // CollectionFileReader(). - r = newCFReader(kc) - rdrChan <- r - } - q = append(q, seg) - r.totalSize += uint64(seg.Len) - // Send toGet whatever it's ready to receive. - Q: for len(q) > 0 { - select { - case r.toGet <- q[0]: - q = q[1:] - default: - break Q - } - } - } - if r == nil { - // File not found - rdrChan <- nil - return - } - close(r.countDone) - for _, seg := range q { - r.toGet <- seg - } - close(r.toGet) - }() - // Before returning a reader, wait until we know whether the - // file exists here: - r := <-rdrChan - if r == nil { - return nil, os.ErrNotExist - } - return r, nil -} - -type cfReader struct { - keepClient *KeepClient - // doGet() reads FileSegments from toGet, gets the data from - // Keep, and sends byte slices to toRead to be consumed by - // Read(). - toGet chan *manifest.FileSegment - toRead chan []byte - // bytes ready to send next time someone calls Read() - buf []byte - // Total size of the file being read. Not safe to read this - // until countDone is closed. - totalSize uint64 - countDone chan struct{} - // First error encountered. - err error -} - -func (r *cfReader) Read(outbuf []byte) (n int, err error) { - if r.err != nil { - return 0, r.err - } - for r.buf == nil || len(r.buf) == 0 { - var ok bool - r.buf, ok = <-r.toRead - if r.err != nil { - return 0, r.err - } else if !ok { - return 0, io.EOF - } - } - if len(r.buf) > len(outbuf) { - n = len(outbuf) - } else { - n = len(r.buf) + fs, err := (&arvados.Collection{ManifestText: mText}).FileSystem(nil, kc) + if err != nil { + return nil, err } - copy(outbuf[:n], r.buf[:n]) - r.buf = r.buf[n:] - return + return fs.OpenFile(filename, os.O_RDONLY, 0) } -func (r *cfReader) Close() error { - _, _ = <-r.countDone - for _ = range r.toGet { +func (kc *KeepClient) ManifestFileReader(m manifest.Manifest, filename string) (arvados.File, error) { + fs, err := (&arvados.Collection{ManifestText: m.Text}).FileSystem(nil, kc) + if err != nil { + return nil, err } - for _ = range r.toRead { - } - return r.err -} - -func (r *cfReader) Len() uint64 { - // Wait for all segments to be counted - _, _ = <-r.countDone - return r.totalSize -} - -func (r *cfReader) doGet() { - defer close(r.toRead) - for fs := range r.toGet { - rdr, _, _, err := r.keepClient.Get(fs.Locator) - if err != nil { - r.err = err - return - } - var buf = make([]byte, fs.Offset+fs.Len) - _, err = io.ReadFull(rdr, buf) - if err != nil { - r.err = err - return - } - for bOff, bLen := fs.Offset, 1<<20; bOff <= fs.Offset+fs.Len && bLen > 0; bOff += bLen { - if bOff+bLen > fs.Offset+fs.Len { - bLen = fs.Offset + fs.Len - bOff - } - r.toRead <- buf[bOff : bOff+bLen] - } - } -} - -func newCFReader(kc *KeepClient) (r *cfReader) { - r = new(cfReader) - r.keepClient = kc - r.toGet = make(chan *manifest.FileSegment, 2) - r.toRead = make(chan []byte) - r.countDone = make(chan struct{}) - go r.doGet() - return + return fs.OpenFile(filename, os.O_RDONLY, 0) }