X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/4f5a6df52559b90d2c9412624f3c4c7fbe467579..7109f4f33c919b07f8e87412c3bc2cc28725296a:/sdk/go/keepclient/collectionreader.go diff --git a/sdk/go/keepclient/collectionreader.go b/sdk/go/keepclient/collectionreader.go index 0d05b8a00e..8e4bb93bfa 100644 --- a/sdk/go/keepclient/collectionreader.go +++ b/sdk/go/keepclient/collectionreader.go @@ -1,22 +1,15 @@ +// Copyright (C) The Arvados Authors. All rights reserved. +// +// SPDX-License-Identifier: Apache-2.0 + package keepclient import ( "errors" - "io" "os" - "git.curoverse.com/arvados.git/sdk/go/manifest" -) - -const ( - // After reading a data block from Keep, cfReader slices it up - // and sends the slices to a buffered channel to be consumed - // by the caller via Read(). - // - // dataSliceSize is the maximum size of the slices, and - // therefore the maximum number of bytes that will be returned - // by a single call to Read(). - dataSliceSize = 1 << 20 + "git.arvados.org/arvados.git/sdk/go/arvados" + "git.arvados.org/arvados.git/sdk/go/manifest" ) // ErrNoManifest indicates the given collection has no manifest @@ -24,192 +17,25 @@ const ( // parameter when retrieving the collection record). var ErrNoManifest = errors.New("Collection has no manifest") -// CollectionFileReader returns an io.Reader that reads file content -// from a collection. The filename must be given relative to the root -// of the collection, without a leading "./". -func (kc *KeepClient) CollectionFileReader(collection map[string]interface{}, filename string) (*cfReader, error) { +// CollectionFileReader returns a Reader that reads content from a single file +// in the collection. The filename must be relative to the root of the +// collection. A leading prefix of "/" or "./" in the filename is ignored. +func (kc *KeepClient) CollectionFileReader(collection map[string]interface{}, filename string) (arvados.File, error) { mText, ok := collection["manifest_text"].(string) if !ok { return nil, ErrNoManifest } - m := manifest.Manifest{Text: mText} - rdrChan := make(chan *cfReader) - go func() { - // q is a queue of FileSegments that we have received but - // haven't yet been able to send to toGet. - var q []*manifest.FileSegment - var r *cfReader - for seg := range m.FileSegmentIterByName(filename) { - if r == nil { - // We've just discovered that the - // requested filename does appear in - // the manifest, so we can return a - // real reader (not nil) from - // CollectionFileReader(). - r = newCFReader(kc) - rdrChan <- r - } - q = append(q, seg) - r.totalSize += uint64(seg.Len) - // Send toGet as many segments as we can until - // it blocks. - Q: - for len(q) > 0 { - select { - case r.toGet <- q[0]: - q = q[1:] - default: - break Q - } - } - } - if r == nil { - // File not found - rdrChan <- nil - return - } - close(r.countDone) - for _, seg := range q { - r.toGet <- seg - } - close(r.toGet) - }() - // Before returning a reader, wait until we know whether the - // file exists here: - r := <-rdrChan - if r == nil { - return nil, os.ErrNotExist - } - return r, nil -} - -type cfReader struct { - keepClient *KeepClient - // doGet() reads FileSegments from toGet, gets the data from - // Keep, and sends byte slices to toRead to be consumed by - // Read(). - toGet chan *manifest.FileSegment - // toRead is a buffered channel, sized to fit one full Keep - // block. This lets us verify checksums without having a - // store-and-forward delay between blocks: by the time the - // caller starts receiving data from block N, cfReader is - // starting to fetch block N+1. A larger buffer would be - // useful for a caller whose read speed varies a lot. - toRead chan []byte - // bytes ready to send next time someone calls Read() - buf []byte - // Total size of the file being read. Not safe to read this - // until countDone is closed. - totalSize uint64 - countDone chan struct{} - // First error encountered. - err error - // errNotNil is closed IFF err contains a non-nil error. - // Receiving from it will block until an error occurs. - errNotNil chan struct{} - // rdrClosed is closed IFF the reader's Close() method has - // been called. Any goroutines associated with the reader will - // stop and free up resources when they notice this channel is - // closed. - rdrClosed chan struct{} -} - -func (r *cfReader) Read(outbuf []byte) (int, error) { - if r.Error() != nil { - return 0, r.Error() - } - for r.buf == nil || len(r.buf) == 0 { - var ok bool - r.buf, ok = <-r.toRead - if r.Error() != nil { - return 0, r.Error() - } else if !ok { - return 0, io.EOF - } - } - n := len(r.buf) - if len(r.buf) > len(outbuf) { - n = len(outbuf) + fs, err := (&arvados.Collection{ManifestText: mText}).FileSystem(nil, kc) + if err != nil { + return nil, err } - copy(outbuf[:n], r.buf[:n]) - r.buf = r.buf[n:] - return n, nil + return fs.OpenFile(filename, os.O_RDONLY, 0) } -func (r *cfReader) Close() error { - close(r.rdrClosed) - return r.Error() -} - -func (r *cfReader) Error() error { - select { - case <-r.errNotNil: - return r.err - default: - return nil - } -} - -func (r *cfReader) Len() uint64 { - // Wait for all segments to be counted - <-r.countDone - return r.totalSize -} - -func (r *cfReader) doGet() { - defer close(r.toRead) -GET: - for fs := range r.toGet { - rdr, _, _, err := r.keepClient.Get(fs.Locator) - if err != nil { - r.err = err - close(r.errNotNil) - return - } - var buf = make([]byte, fs.Offset+fs.Len) - _, err = io.ReadFull(rdr, buf) - if err != nil { - r.err = err - close(r.errNotNil) - return - } - for bOff, bLen := fs.Offset, dataSliceSize; bOff <= fs.Offset+fs.Len && bLen > 0; bOff += bLen { - if bOff+bLen > fs.Offset+fs.Len { - bLen = fs.Offset + fs.Len - bOff - } - select { - case r.toRead <- buf[bOff : bOff+bLen]: - case <-r.rdrClosed: - // Reader is closed: no point sending - // anything more to toRead. - break GET - } - } - // It is possible that r.rdrClosed is closed but we - // never noticed because r.toRead was also ready in - // every select{} above. Here we check before wasting - // a keepclient.Get() call. - select { - case <-r.rdrClosed: - break GET - default: - } - } - // In case we exited the above loop early: before returning, - // drain the toGet channel so its sender doesn't sit around - // blocking forever. - for _ = range r.toGet { +func (kc *KeepClient) ManifestFileReader(m manifest.Manifest, filename string) (arvados.File, error) { + fs, err := (&arvados.Collection{ManifestText: m.Text}).FileSystem(nil, kc) + if err != nil { + return nil, err } -} - -func newCFReader(kc *KeepClient) (r *cfReader) { - r = new(cfReader) - r.keepClient = kc - r.rdrClosed = make(chan struct{}) - r.errNotNil = make(chan struct{}) - r.toGet = make(chan *manifest.FileSegment, 2) - r.toRead = make(chan []byte, (BLOCKSIZE+dataSliceSize-1)/dataSliceSize) - r.countDone = make(chan struct{}) - go r.doGet() - return + return fs.OpenFile(filename, os.O_RDONLY, 0) }