package keepclient
import (
- "io/ioutil"
+ "git.curoverse.com/arvados.git/sdk/go/streamer"
"sort"
"sync"
"time"
// Get returns data from the cache, first retrieving it from Keep if
// necessary.
-func (c *BlockCache) Get(kc *KeepClient, locator string) ([]byte, error) {
+func (c *BlockCache) Get(kc *KeepClient, locator string) (*streamer.StreamReader, error) {
c.setupOnce.Do(c.setup)
cacheKey := locator[:32]
c.mtx.Lock()
}
c.cache[cacheKey] = b
go func() {
- rdr, _, _, err := kc.Get(locator)
- var data []byte
+ rdr, bufsize, _, err := kc.Get(locator)
+ c.mtx.Lock()
if err == nil {
- data, err = ioutil.ReadAll(rdr)
+ b.data = streamer.AsyncStreamFromReader(int(bufsize), rdr)
}
- c.mtx.Lock()
- b.data, b.err = data, err
+ b.err = err
c.mtx.Unlock()
close(b.fetched)
go c.Sweep()
c.mtx.Lock()
b.lastUse = time.Now()
c.mtx.Unlock()
- return b.data, b.err
+
+ return b.data.MakeStreamReader(), b.err
}
func (c *BlockCache) setup() {
// Swap exchanges the elements of ts at indexes i and j.
// NOTE(review): presumably one of the methods that lets timeSlice be
// sorted via the "sort" package imported by this file -- confirm
// against the Len/Less methods, which are not visible here.
func (ts timeSlice) Swap(i, j int) { ts[i], ts[j] = ts[j], ts[i] }
type cacheBlock struct {
- data []byte
+ data *streamer.AsyncStream
err error
fetched chan struct{}
lastUse time.Time
import (
"errors"
"fmt"
+ "git.curoverse.com/arvados.git/sdk/go/manifest"
+ "git.curoverse.com/arvados.git/sdk/go/streamer"
"io"
"os"
-
- "git.curoverse.com/arvados.git/sdk/go/manifest"
)
// A Reader implements io.Reader, io.Seeker, and io.Closer, and has a
// current/latest segment accessed -- might or might not match pos
seg *manifest.FileSegment
segStart int64 // position of segment relative to file
- segData []byte
+ segData *streamer.StreamReader
segNext []*manifest.FileSegment
readaheadDone bool
}
// Close drops the state held by f: it clears f.kc and f.segments,
// closes the current segment reader (f.segData) if there is one, and
// then clears f.segData as well.
//
// Whatever f.segData.Close() returns is not inspected; Close itself
// always returns nil.
func (f *file) Close() error {
f.kc = nil
f.segments = nil
+ if f.segData != nil {
+ f.segData.Close()
+ }
f.segData = nil
return nil
}
// that does.
f.seg = nil
f.segStart = 0
+ if f.segData != nil {
+ f.segData.Close()
+ }
f.segData = nil
f.segNext = f.segments
for len(f.segNext) > 0 {
if err != nil {
return 0, err
}
- if len(data) < f.seg.Offset+f.seg.Len {
- return 0, fmt.Errorf("invalid segment (offset %d len %d) in %d-byte block %s", f.seg.Offset, f.seg.Len, len(data), f.seg.Locator)
+ if int(data.Len()) < f.seg.Offset+f.seg.Len {
+ return 0, fmt.Errorf("invalid segment (offset %d len %d) in %d-byte block %s", f.seg.Offset, f.seg.Len, data.Len(), f.seg.Locator)
}
- f.segData = data[f.seg.Offset : f.seg.Offset+f.seg.Len]
+ f.segData = data
}
// dataOff and dataLen denote a portion of f.segData
// corresponding to a portion of the file at f.offset.
dataOff := int(f.offset - f.segStart)
- dataLen := f.seg.Len - dataOff
+ dataLen := f.seg.Len - (f.seg.Offset + dataOff)
- if !f.readaheadDone && len(f.segNext) > 0 && f.offset >= 1048576 && dataOff+dataLen > len(f.segData)/16 {
+ if !f.readaheadDone && len(f.segNext) > 0 && f.offset >= 1048576 && uint64(dataOff+dataLen) > f.segData.Len()/16 {
// If we have already read more than just the first
// few bytes of this file, and we have already
// consumed a noticeable portion of this segment, and
if n > dataLen {
n = dataLen
}
- copy(buf[:n], f.segData[dataOff:dataOff+n])
- f.offset += int64(n)
- return n, nil
+ f.segData.Seek(int64(f.seg.Offset+dataOff), io.SeekStart)
+ count, err := f.segData.Read(buf[:n])
+ f.offset += int64(count)
+ return count, err
}
// Seek implements io.Seeker.
import (
"errors"
+ "fmt"
"io"
)
close(this.wait_zero_readers)
return nil
}
+
// Seek sets this.offset, the reader's offset, and returns the new
// offset.
//
// whence selects the base the target is computed from: io.SeekStart
// (offset is absolute), io.SeekCurrent (relative to the current
// offset) or io.SeekEnd (relative to Len()). Any other whence value is
// rejected with an "invalid whence" error.
//
// A target before the start (negative) is rejected with an error. In
// both error cases the reader's offset is left unchanged and the
// current offset is returned alongside the error.
//
// A target beyond Len() is not an error: it is silently clamped to
// Len(), so the returned offset may be smaller than the one requested.
+func (this *StreamReader) Seek(offset int64, whence int) (int64, error) {
+ var want int64
+ switch whence {
+ case io.SeekStart:
+ want = offset
+ case io.SeekCurrent:
+ want = int64(this.offset) + offset
+ case io.SeekEnd:
+ want = int64(this.Len()) + offset
+ default:
+ return int64(this.offset), fmt.Errorf("invalid whence %d", whence)
+ }
+ if want < 0 {
+ return int64(this.offset), fmt.Errorf("attempted seek to %d", want)
+ }
+ if want > int64(this.Len()) {
+ want = int64(this.Len())
+ }
+ this.offset = int(want)
+ return want, nil
+}
+
// Len returns len(this.stream.buffer), the length of the buffer
// belonging to the stream this reader is attached to.
// NOTE(review): presumably this is the buffer's full size rather than
// the number of bytes received so far -- confirm against the stream
// implementation. Also assumes this.stream is non-nil when called;
// verify that callers never invoke Len on a reader without a stream.
+func (this *StreamReader) Len() uint64 {
+ return uint64(len(this.stream.buffer))
+}