Merge branch '8784-dir-listings'
[arvados.git] / sdk / go / keepclient / collectionreader.go
index bed60f499562a36c4585018932860fe35df34701..57829aadebb0f3c4aff32c604db08e6e481a1a3b 100644 (file)
@@ -1,20 +1,19 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: Apache-2.0
+
 package keepclient
 
 import (
        "errors"
+       "fmt"
        "io"
        "os"
 
+       "git.curoverse.com/arvados.git/sdk/go/arvados"
        "git.curoverse.com/arvados.git/sdk/go/manifest"
 )
 
-// ReadCloserWithLen extends io.ReadCloser with a Len() method that
-// returns the total number of bytes available to read.
-type ReadCloserWithLen interface {
-       io.ReadCloser
-       Len() uint64
-}
-
 const (
        // After reading a data block from Keep, cfReader slices it up
        // and sends the slices to a buffered channel to be consumed
@@ -31,10 +30,10 @@ const (
 // parameter when retrieving the collection record).
 var ErrNoManifest = errors.New("Collection has no manifest")
 
-// CollectionFileReader returns a ReadCloserWithLen that reads file
-// content from a collection. The filename must be given relative to
-// the root of the collection, without a leading "./".
-func (kc *KeepClient) CollectionFileReader(collection map[string]interface{}, filename string) (ReadCloserWithLen, error) {
+// CollectionFileReader returns a Reader that reads content from a single file
+// in the collection. The filename must be relative to the root of the
+// collection.  A leading prefix of "/" or "./" in the filename is ignored.
+func (kc *KeepClient) CollectionFileReader(collection map[string]interface{}, filename string) (arvados.File, error) {
        mText, ok := collection["manifest_text"].(string)
        if !ok {
                return nil, ErrNoManifest
@@ -43,218 +42,137 @@ func (kc *KeepClient) CollectionFileReader(collection map[string]interface{}, fi
        return kc.ManifestFileReader(m, filename)
 }
 
-func (kc *KeepClient) ManifestFileReader(m manifest.Manifest, filename string) (ReadCloserWithLen, error) {
-       rdrChan := make(chan *cfReader)
-       go kc.queueSegmentsToGet(m, filename, rdrChan)
-       r, ok := <-rdrChan
-       if !ok {
-               return nil, os.ErrNotExist
-       }
-       return r, nil
-}
-
-// Send segments for the specified file to r.toGet. Send a *cfReader
-// to rdrChan if the specified file is found (even if it's empty).
-// Then, close rdrChan.
-func (kc *KeepClient) queueSegmentsToGet(m manifest.Manifest, filename string, rdrChan chan *cfReader) {
-       defer close(rdrChan)
-
-       // q is a queue of FileSegments that we have received but
-       // haven't yet been able to send to toGet.
-       var q []*manifest.FileSegment
-       var r *cfReader
-       for seg := range m.FileSegmentIterByName(filename) {
-               if r == nil {
-                       // We've just discovered that the requested
-                       // filename does appear in the manifest, so we
-                       // can return a real reader (not nil) from
-                       // CollectionFileReader().
-                       r = newCFReader(kc)
-                       rdrChan <- r
-               }
-               q = append(q, seg)
-               r.totalSize += uint64(seg.Len)
-               // Send toGet as many segments as we can until it
-               // blocks.
-       Q:
-               for len(q) > 0 {
-                       select {
-                       case r.toGet <- q[0]:
-                               q = q[1:]
-                       default:
-                               break Q
-                       }
-               }
+func (kc *KeepClient) ManifestFileReader(m manifest.Manifest, filename string) (arvados.File, error) {
+       f := &file{
+               kc: kc,
        }
-       if r == nil {
-               // File not found.
-               return
+       err := f.load(m, filename)
+       if err != nil {
+               return nil, err
        }
-       close(r.countDone)
-       for _, seg := range q {
-               r.toGet <- seg
-       }
-       close(r.toGet)
+       return f, nil
 }
 
-type cfReader struct {
-       keepClient *KeepClient
-
-       // doGet() reads FileSegments from toGet, gets the data from
-       // Keep, and sends byte slices to toRead to be consumed by
-       // Read().
-       toGet chan *manifest.FileSegment
-
-       // toRead is a buffered channel, sized to fit one full Keep
-       // block. This lets us verify checksums without having a
-       // store-and-forward delay between blocks: by the time the
-       // caller starts receiving data from block N, cfReader is
-       // starting to fetch block N+1. A larger buffer would be
-       // useful for a caller whose read speed varies a lot.
-       toRead chan []byte
-
-       // bytes ready to send next time someone calls Read()
-       buf []byte
-
-       // Total size of the file being read. Not safe to read this
-       // until countDone is closed.
-       totalSize uint64
-       countDone chan struct{}
-
-       // First error encountered.
-       err error
-
-       // errNotNil is closed IFF err contains a non-nil error.
-       // Receiving from it will block until an error occurs.
-       errNotNil chan struct{}
+type file struct {
+       kc       *KeepClient
+       segments []*manifest.FileSegment
+       size     int64 // total file size
+       offset   int64 // current read offset
+
+       // current/latest segment accessed -- might or might not match pos
+       seg           *manifest.FileSegment
+       segStart      int64 // position of segment relative to file
+       segData       []byte
+       segNext       []*manifest.FileSegment
+       readaheadDone bool
+}
 
-       // rdrClosed is closed IFF the reader's Close() method has
-       // been called. Any goroutines associated with the reader will
-       // stop and free up resources when they notice this channel is
-       // closed.
-       rdrClosed chan struct{}
+// Close implements io.Closer.
+func (f *file) Close() error {
+       f.kc = nil
+       f.segments = nil
+       f.segData = nil
+       return nil
 }
 
-func (r *cfReader) Read(outbuf []byte) (int, error) {
-       if r.Error() != nil {
-               // Short circuit: the caller might as well find out
-               // now that we hit an error, even if there's buffered
-               // data we could return.
-               return 0, r.Error()
+// Read implements io.Reader.
+func (f *file) Read(buf []byte) (int, error) {
+       if f.seg == nil || f.offset < f.segStart || f.offset >= f.segStart+int64(f.seg.Len) {
+               // f.seg does not cover the current read offset
+               // (f.pos).  Iterate over f.segments to find the one
+               // that does.
+               f.seg = nil
+               f.segStart = 0
+               f.segData = nil
+               f.segNext = f.segments
+               for len(f.segNext) > 0 {
+                       seg := f.segNext[0]
+                       f.segNext = f.segNext[1:]
+                       segEnd := f.segStart + int64(seg.Len)
+                       if segEnd > f.offset {
+                               f.seg = seg
+                               break
+                       }
+                       f.segStart = segEnd
+               }
+               f.readaheadDone = false
        }
-       for len(r.buf) == 0 {
-               // Private buffer was emptied out by the last Read()
-               // (or this is the first Read() and r.buf is nil).
-               // Read from r.toRead until we get a non-empty slice
-               // or hit an error.
-               var ok bool
-               r.buf, ok = <-r.toRead
-               if r.Error() != nil {
-                       // Error encountered while waiting for bytes
-                       return 0, r.Error()
-               } else if !ok {
-                       // No more bytes to read, no error encountered
-                       return 0, io.EOF
+       if f.seg == nil {
+               return 0, io.EOF
+       }
+       if f.segData == nil {
+               data, err := f.kc.cache().Get(f.kc, f.seg.Locator)
+               if err != nil {
+                       return 0, err
+               }
+               if len(data) < f.seg.Offset+f.seg.Len {
+                       return 0, fmt.Errorf("invalid segment (offset %d len %d) in %d-byte block %s", f.seg.Offset, f.seg.Len, len(data), f.seg.Locator)
                }
+               f.segData = data[f.seg.Offset : f.seg.Offset+f.seg.Len]
        }
-       // Copy as much as possible from our private buffer to the
-       // caller's buffer
-       n := len(r.buf)
-       if len(r.buf) > len(outbuf) {
-               n = len(outbuf)
+       // dataOff and dataLen denote a portion of f.segData
+       // corresponding to a portion of the file at f.offset.
+       dataOff := int(f.offset - f.segStart)
+       dataLen := f.seg.Len - dataOff
+
+       if !f.readaheadDone && len(f.segNext) > 0 && f.offset >= 1048576 && dataOff+dataLen > len(f.segData)/16 {
+               // If we have already read more than just the first
+               // few bytes of this file, and we have already
+               // consumed a noticeable portion of this segment, and
+               // there's more data for this file in the next segment
+               // ... then there's a good chance we are going to need
+               // the data for that next segment soon. Start getting
+               // it into the cache now.
+               go f.kc.cache().Get(f.kc, f.segNext[0].Locator)
+               f.readaheadDone = true
        }
-       copy(outbuf[:n], r.buf[:n])
-
-       // Next call to Read() will continue where we left off
-       r.buf = r.buf[n:]
 
+       n := len(buf)
+       if n > dataLen {
+               n = dataLen
+       }
+       copy(buf[:n], f.segData[dataOff:dataOff+n])
+       f.offset += int64(n)
        return n, nil
 }
 
-// Close releases resources. It returns a non-nil error if an error
-// was encountered by the reader.
-func (r *cfReader) Close() error {
-       close(r.rdrClosed)
-       return r.Error()
-}
-
-// Error returns an error if one has been encountered, otherwise
-// nil. It is safe to call from any goroutine.
-func (r *cfReader) Error() error {
-       select {
-       case <-r.errNotNil:
-               return r.err
+// Seek implements io.Seeker.
+func (f *file) Seek(offset int64, whence int) (int64, error) {
+       var want int64
+       switch whence {
+       case io.SeekStart:
+               want = offset
+       case io.SeekCurrent:
+               want = f.offset + offset
+       case io.SeekEnd:
+               want = f.size + offset
        default:
-               return nil
+               return f.offset, fmt.Errorf("invalid whence %d", whence)
+       }
+       if want < 0 {
+               return f.offset, fmt.Errorf("attempted seek to %d", want)
        }
+       if want > f.size {
+               want = f.size
+       }
+       f.offset = want
+       return f.offset, nil
 }
 
-// Len returns the total number of bytes in the file being read. If
-// necessary, it waits for manifest parsing to finish.
-func (r *cfReader) Len() uint64 {
-       // Wait for all segments to be counted
-       <-r.countDone
-       return r.totalSize
+// Size returns the file size in bytes.
+func (f *file) Size() int64 {
+       return f.size
 }
 
-func (r *cfReader) doGet() {
-       defer close(r.toRead)
-GET:
-       for fs := range r.toGet {
-               rdr, _, _, err := r.keepClient.Get(fs.Locator)
-               if err != nil {
-                       r.err = err
-                       close(r.errNotNil)
-                       return
-               }
-               var buf = make([]byte, fs.Offset+fs.Len)
-               _, err = io.ReadFull(rdr, buf)
-               errClosing := rdr.Close()
-               if err == nil {
-                       err = errClosing
-               }
-               if err != nil {
-                       r.err = err
-                       close(r.errNotNil)
-                       return
-               }
-               for bOff, bLen := fs.Offset, dataSliceSize; bOff < fs.Offset+fs.Len && bLen > 0; bOff += bLen {
-                       if bOff+bLen > fs.Offset+fs.Len {
-                               bLen = fs.Offset + fs.Len - bOff
-                       }
-                       select {
-                       case r.toRead <- buf[bOff : bOff+bLen]:
-                       case <-r.rdrClosed:
-                               // Reader is closed: no point sending
-                               // anything more to toRead.
-                               break GET
-                       }
-               }
-               // It is possible that r.rdrClosed is closed but we
-               // never noticed because r.toRead was also ready in
-               // every select{} above. Here we check before wasting
-               // a keepclient.Get() call.
-               select {
-               case <-r.rdrClosed:
-                       break GET
-               default:
-               }
+func (f *file) load(m manifest.Manifest, path string) error {
+       f.segments = nil
+       f.size = 0
+       for seg := range m.FileSegmentIterByName(path) {
+               f.segments = append(f.segments, seg)
+               f.size += int64(seg.Len)
        }
-       // In case we exited the above loop early: before returning,
-       // drain the toGet channel so its sender doesn't sit around
-       // blocking forever.
-       for _ = range r.toGet {
+       if f.segments == nil {
+               return os.ErrNotExist
        }
-}
-
-func newCFReader(kc *KeepClient) (r *cfReader) {
-       r = new(cfReader)
-       r.keepClient = kc
-       r.rdrClosed = make(chan struct{})
-       r.errNotNil = make(chan struct{})
-       r.toGet = make(chan *manifest.FileSegment, 2)
-       r.toRead = make(chan []byte, (BLOCKSIZE+dataSliceSize-1)/dataSliceSize)
-       r.countDone = make(chan struct{})
-       go r.doGet()
-       return
+       return nil
 }