Merge branch '17830-reqid-header-propagation-fix' into main. Closes #17830
[arvados.git] / sdk / go / keepclient / collectionreader.go
index 0d05b8a00ebd74c2f12538c55c89e9e3312e5045..8e4bb93bfa1f8eca6c37090cb930d636231e27d2 100644 (file)
@@ -1,22 +1,15 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: Apache-2.0
+
 package keepclient
 
 import (
        "errors"
-       "io"
        "os"
 
-       "git.curoverse.com/arvados.git/sdk/go/manifest"
-)
-
-const (
-       // After reading a data block from Keep, cfReader slices it up
-       // and sends the slices to a buffered channel to be consumed
-       // by the caller via Read().
-       //
-       // dataSliceSize is the maximum size of the slices, and
-       // therefore the maximum number of bytes that will be returned
-       // by a single call to Read().
-       dataSliceSize = 1 << 20
+       "git.arvados.org/arvados.git/sdk/go/arvados"
+       "git.arvados.org/arvados.git/sdk/go/manifest"
 )
 
 // ErrNoManifest indicates the given collection has no manifest
@@ -24,192 +17,25 @@ const (
 // parameter when retrieving the collection record).
 var ErrNoManifest = errors.New("Collection has no manifest")
 
-// CollectionFileReader returns an io.Reader that reads file content
-// from a collection. The filename must be given relative to the root
-// of the collection, without a leading "./".
-func (kc *KeepClient) CollectionFileReader(collection map[string]interface{}, filename string) (*cfReader, error) {
+// CollectionFileReader returns a Reader that reads content from a single file
+// in the collection. The filename must be relative to the root of the
+// collection.  A leading prefix of "/" or "./" in the filename is ignored.
+func (kc *KeepClient) CollectionFileReader(collection map[string]interface{}, filename string) (arvados.File, error) {
        mText, ok := collection["manifest_text"].(string)
        if !ok {
                return nil, ErrNoManifest
        }
-       m := manifest.Manifest{Text: mText}
-       rdrChan := make(chan *cfReader)
-       go func() {
-               // q is a queue of FileSegments that we have received but
-               // haven't yet been able to send to toGet.
-               var q []*manifest.FileSegment
-               var r *cfReader
-               for seg := range m.FileSegmentIterByName(filename) {
-                       if r == nil {
-                               // We've just discovered that the
-                               // requested filename does appear in
-                               // the manifest, so we can return a
-                               // real reader (not nil) from
-                               // CollectionFileReader().
-                               r = newCFReader(kc)
-                               rdrChan <- r
-                       }
-                       q = append(q, seg)
-                       r.totalSize += uint64(seg.Len)
-                       // Send toGet as many segments as we can until
-                       // it blocks.
-               Q:
-                       for len(q) > 0 {
-                               select {
-                               case r.toGet <- q[0]:
-                                       q = q[1:]
-                               default:
-                                       break Q
-                               }
-                       }
-               }
-               if r == nil {
-                       // File not found
-                       rdrChan <- nil
-                       return
-               }
-               close(r.countDone)
-               for _, seg := range q {
-                       r.toGet <- seg
-               }
-               close(r.toGet)
-       }()
-       // Before returning a reader, wait until we know whether the
-       // file exists here:
-       r := <-rdrChan
-       if r == nil {
-               return nil, os.ErrNotExist
-       }
-       return r, nil
-}
-
-type cfReader struct {
-       keepClient *KeepClient
-       // doGet() reads FileSegments from toGet, gets the data from
-       // Keep, and sends byte slices to toRead to be consumed by
-       // Read().
-       toGet chan *manifest.FileSegment
-       // toRead is a buffered channel, sized to fit one full Keep
-       // block. This lets us verify checksums without having a
-       // store-and-forward delay between blocks: by the time the
-       // caller starts receiving data from block N, cfReader is
-       // starting to fetch block N+1. A larger buffer would be
-       // useful for a caller whose read speed varies a lot.
-       toRead chan []byte
-       // bytes ready to send next time someone calls Read()
-       buf []byte
-       // Total size of the file being read. Not safe to read this
-       // until countDone is closed.
-       totalSize uint64
-       countDone chan struct{}
-       // First error encountered.
-       err error
-       // errNotNil is closed IFF err contains a non-nil error.
-       // Receiving from it will block until an error occurs.
-       errNotNil chan struct{}
-       // rdrClosed is closed IFF the reader's Close() method has
-       // been called. Any goroutines associated with the reader will
-       // stop and free up resources when they notice this channel is
-       // closed.
-       rdrClosed chan struct{}
-}
-
-func (r *cfReader) Read(outbuf []byte) (int, error) {
-       if r.Error() != nil {
-               return 0, r.Error()
-       }
-       for r.buf == nil || len(r.buf) == 0 {
-               var ok bool
-               r.buf, ok = <-r.toRead
-               if r.Error() != nil {
-                       return 0, r.Error()
-               } else if !ok {
-                       return 0, io.EOF
-               }
-       }
-       n := len(r.buf)
-       if len(r.buf) > len(outbuf) {
-               n = len(outbuf)
+       fs, err := (&arvados.Collection{ManifestText: mText}).FileSystem(nil, kc)
+       if err != nil {
+               return nil, err
        }
-       copy(outbuf[:n], r.buf[:n])
-       r.buf = r.buf[n:]
-       return n, nil
+       return fs.OpenFile(filename, os.O_RDONLY, 0)
 }
 
-func (r *cfReader) Close() error {
-       close(r.rdrClosed)
-       return r.Error()
-}
-
-func (r *cfReader) Error() error {
-       select {
-       case <-r.errNotNil:
-               return r.err
-       default:
-               return nil
-       }
-}
-
-func (r *cfReader) Len() uint64 {
-       // Wait for all segments to be counted
-       <-r.countDone
-       return r.totalSize
-}
-
-func (r *cfReader) doGet() {
-       defer close(r.toRead)
-GET:
-       for fs := range r.toGet {
-               rdr, _, _, err := r.keepClient.Get(fs.Locator)
-               if err != nil {
-                       r.err = err
-                       close(r.errNotNil)
-                       return
-               }
-               var buf = make([]byte, fs.Offset+fs.Len)
-               _, err = io.ReadFull(rdr, buf)
-               if err != nil {
-                       r.err = err
-                       close(r.errNotNil)
-                       return
-               }
-               for bOff, bLen := fs.Offset, dataSliceSize; bOff <= fs.Offset+fs.Len && bLen > 0; bOff += bLen {
-                       if bOff+bLen > fs.Offset+fs.Len {
-                               bLen = fs.Offset + fs.Len - bOff
-                       }
-                       select {
-                       case r.toRead <- buf[bOff : bOff+bLen]:
-                       case <-r.rdrClosed:
-                               // Reader is closed: no point sending
-                               // anything more to toRead.
-                               break GET
-                       }
-               }
-               // It is possible that r.rdrClosed is closed but we
-               // never noticed because r.toRead was also ready in
-               // every select{} above. Here we check before wasting
-               // a keepclient.Get() call.
-               select {
-               case <-r.rdrClosed:
-                       break GET
-               default:
-               }
-       }
-       // In case we exited the above loop early: before returning,
-       // drain the toGet channel so its sender doesn't sit around
-       // blocking forever.
-       for _ = range r.toGet {
+func (kc *KeepClient) ManifestFileReader(m manifest.Manifest, filename string) (arvados.File, error) {
+       fs, err := (&arvados.Collection{ManifestText: m.Text}).FileSystem(nil, kc)
+       if err != nil {
+               return nil, err
        }
-}
-
-func newCFReader(kc *KeepClient) (r *cfReader) {
-       r = new(cfReader)
-       r.keepClient = kc
-       r.rdrClosed = make(chan struct{})
-       r.errNotNil = make(chan struct{})
-       r.toGet = make(chan *manifest.FileSegment, 2)
-       r.toRead = make(chan []byte, (BLOCKSIZE+dataSliceSize-1)/dataSliceSize)
-       r.countDone = make(chan struct{})
-       go r.doGet()
-       return
+       return fs.OpenFile(filename, os.O_RDONLY, 0)
 }