X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/b7b9ea44ada30b1251fb10c872cb1da1d7c29bd0..00f1f05789316936db75b4723b1c3d99196c252a:/sdk/go/keepclient/collectionreader.go?ds=sidebyside diff --git a/sdk/go/keepclient/collectionreader.go b/sdk/go/keepclient/collectionreader.go index 33bb58710e..8e4bb93bfa 100644 --- a/sdk/go/keepclient/collectionreader.go +++ b/sdk/go/keepclient/collectionreader.go @@ -1,29 +1,15 @@ +// Copyright (C) The Arvados Authors. All rights reserved. +// +// SPDX-License-Identifier: Apache-2.0 + package keepclient import ( "errors" - "io" "os" - "git.curoverse.com/arvados.git/sdk/go/manifest" -) - -// ReadCloserWithLen extends io.ReadCloser with a Len() method that -// returns the total number of bytes available to read. -type ReadCloserWithLen interface { - io.ReadCloser - Len() uint64 -} - -const ( - // After reading a data block from Keep, cfReader slices it up - // and sends the slices to a buffered channel to be consumed - // by the caller via Read(). - // - // dataSliceSize is the maximum size of the slices, and - // therefore the maximum number of bytes that will be returned - // by a single call to Read(). - dataSliceSize = 1 << 20 + "git.arvados.org/arvados.git/sdk/go/arvados" + "git.arvados.org/arvados.git/sdk/go/manifest" ) // ErrNoManifest indicates the given collection has no manifest @@ -31,230 +17,25 @@ const ( // parameter when retrieving the collection record). var ErrNoManifest = errors.New("Collection has no manifest") -// CollectionFileReader returns a ReadCloserWithLen that reads file -// content from a collection. The filename must be given relative to -// the root of the collection, without a leading "./". -func (kc *KeepClient) CollectionFileReader(collection map[string]interface{}, filename string) (ReadCloserWithLen, error) { +// CollectionFileReader returns a Reader that reads content from a single file +// in the collection. The filename must be relative to the root of the +// collection. A leading prefix of "/" or "./" in the filename is ignored. +func (kc *KeepClient) CollectionFileReader(collection map[string]interface{}, filename string) (arvados.File, error) { mText, ok := collection["manifest_text"].(string) if !ok { return nil, ErrNoManifest } - m := manifest.Manifest{Text: mText} - return kc.ManifestFileReader(m, filename) -} - -func (kc *KeepClient) ManifestFileReader(m manifest.Manifest, filename string) (ReadCloserWithLen, error) { - rdrChan := make(chan *cfReader) - go kc.queueSegmentsToGet(m, filename, rdrChan) - r, ok := <-rdrChan - if !ok { - return nil, os.ErrNotExist - } - return r, nil -} - -// Send segments for the specified file to r.toGet. Send a *cfReader -// to rdrChan if the specified file is found (even if it's empty). -// Then, close rdrChan. -func (kc *KeepClient) queueSegmentsToGet(m manifest.Manifest, filename string, rdrChan chan *cfReader) { - defer close(rdrChan) - - // q is a queue of FileSegments that we have received but - // haven't yet been able to send to toGet. - var q []*manifest.FileSegment - var r *cfReader - for seg := range m.FileSegmentIterByName(filename) { - if r == nil { - // We've just discovered that the requested - // filename does appear in the manifest, so we - // can return a real reader (not nil) from - // CollectionFileReader(). - r = newCFReader(kc) - rdrChan <- r - } - q = append(q, seg) - r.totalSize += uint64(seg.Len) - // Send toGet as many segments as we can until it - // blocks. - Q: - for len(q) > 0 { - select { - case r.toGet <- q[0]: - q = q[1:] - default: - break Q - } - } - } - if r == nil { - // File not found. - return - } - close(r.countDone) - for _, seg := range q { - r.toGet <- seg - } - close(r.toGet) -} - -type cfReader struct { - keepClient *KeepClient - - // doGet() reads FileSegments from toGet, gets the data from - // Keep, and sends byte slices to toRead to be consumed by - // Read(). - toGet chan *manifest.FileSegment - - // toRead is a buffered channel, sized to fit one full Keep - // block. This lets us verify checksums without having a - // store-and-forward delay between blocks: by the time the - // caller starts receiving data from block N, cfReader is - // starting to fetch block N+1. A larger buffer would be - // useful for a caller whose read speed varies a lot. - toRead chan []byte - - // bytes ready to send next time someone calls Read() - buf []byte - - // Total size of the file being read. Not safe to read this - // until countDone is closed. - totalSize uint64 - countDone chan struct{} - - // First error encountered. - err error - - // errNotNil is closed IFF err contains a non-nil error. - // Receiving from it will block until an error occurs. - errNotNil chan struct{} - - // rdrClosed is closed IFF the reader's Close() method has - // been called. Any goroutines associated with the reader will - // stop and free up resources when they notice this channel is - // closed. - rdrClosed chan struct{} -} - -func (r *cfReader) Read(outbuf []byte) (int, error) { - if r.Error() != nil { - // Short circuit: the caller might as well find out - // now that we hit an error, even if there's buffered - // data we could return. - return 0, r.Error() - } - for len(r.buf) == 0 { - // Private buffer was emptied out by the last Read() - // (or this is the first Read() and r.buf is nil). - // Read from r.toRead until we get a non-empty slice - // or hit an error. - var ok bool - r.buf, ok = <-r.toRead - if r.Error() != nil { - // Error encountered while waiting for bytes - return 0, r.Error() - } else if !ok { - // No more bytes to read, no error encountered - return 0, io.EOF - } - } - // Copy as much as possible from our private buffer to the - // caller's buffer - n := len(r.buf) - if len(r.buf) > len(outbuf) { - n = len(outbuf) - } - copy(outbuf[:n], r.buf[:n]) - - // Next call to Read() will continue where we left off - r.buf = r.buf[n:] - - return n, nil -} - -// Close releases resources. It returns a non-nil error if an error -// was encountered by the reader. -func (r *cfReader) Close() error { - close(r.rdrClosed) - return r.Error() -} - -// Error returns an error if one has been encountered, otherwise -// nil. It is safe to call from any goroutine. -func (r *cfReader) Error() error { - select { - case <-r.errNotNil: - return r.err - default: - return nil + fs, err := (&arvados.Collection{ManifestText: mText}).FileSystem(nil, kc) + if err != nil { + return nil, err } + return fs.OpenFile(filename, os.O_RDONLY, 0) } -// Len returns the total number of bytes in the file being read. If -// necessary, it waits for manifest parsing to finish. -func (r *cfReader) Len() uint64 { - // Wait for all segments to be counted - <-r.countDone - return r.totalSize -} - -func (r *cfReader) doGet() { - defer close(r.toRead) -GET: - for fs := range r.toGet { - rdr, _, _, err := r.keepClient.Get(fs.Locator) - if err != nil { - r.err = err - close(r.errNotNil) - return - } - var buf = make([]byte, fs.Offset+fs.Len) - _, err = io.ReadFull(rdr, buf) - errClosing := rdr.Close() - if err == nil { - err = errClosing - } - if err != nil { - r.err = err - close(r.errNotNil) - return - } - for bOff, bLen := fs.Offset, dataSliceSize; bOff < fs.Offset+fs.Len && bLen > 0; bOff += bLen { - if bOff+bLen > fs.Offset+fs.Len { - bLen = fs.Offset + fs.Len - bOff - } - select { - case r.toRead <- buf[bOff : bOff+bLen]: - case <-r.rdrClosed: - // Reader is closed: no point sending - // anything more to toRead. - break GET - } - } - // It is possible that r.rdrClosed is closed but we - // never noticed because r.toRead was also ready in - // every select{} above. Here we check before wasting - // a keepclient.Get() call. - select { - case <-r.rdrClosed: - break GET - default: - } - } - // In case we exited the above loop early: before returning, - // drain the toGet channel so its sender doesn't sit around - // blocking forever. - for range r.toGet { +func (kc *KeepClient) ManifestFileReader(m manifest.Manifest, filename string) (arvados.File, error) { + fs, err := (&arvados.Collection{ManifestText: m.Text}).FileSystem(nil, kc) + if err != nil { + return nil, err } -} - -func newCFReader(kc *KeepClient) (r *cfReader) { - r = new(cfReader) - r.keepClient = kc - r.rdrClosed = make(chan struct{}) - r.errNotNil = make(chan struct{}) - r.toGet = make(chan *manifest.FileSegment, 2) - r.toRead = make(chan []byte, (BLOCKSIZE+dataSliceSize-1)/dataSliceSize) - r.countDone = make(chan struct{}) - go r.doGet() - return + return fs.OpenFile(filename, os.O_RDONLY, 0) }