package keepclient
import (
- "io/ioutil"
+ "git.curoverse.com/arvados.git/sdk/go/streamer"
"sort"
"sync"
"time"
// Get returns data from the cache, first retrieving it from Keep if
// necessary.
-func (c *BlockCache) Get(kc *KeepClient, locator string) ([]byte, error) {
+func (c *BlockCache) Get(kc *KeepClient, locator string) (*streamer.StreamReader, error) {
c.setupOnce.Do(c.setup)
cacheKey := locator[:32]
c.mtx.Lock()
}
c.cache[cacheKey] = b
go func() {
- rdr, _, _, err := kc.Get(locator)
- var data []byte
+ rdr, bufsize, _, err := kc.Get(locator)
+ c.mtx.Lock()
if err == nil {
- data, err = ioutil.ReadAll(rdr)
+ b.data = streamer.AsyncStreamFromReader(int(bufsize), rdr)
}
- c.mtx.Lock()
- b.data, b.err = data, err
+ b.err = err
c.mtx.Unlock()
close(b.fetched)
go c.Sweep()
c.mtx.Lock()
b.lastUse = time.Now()
c.mtx.Unlock()
- return b.data, b.err
+
+ return b.data.MakeStreamReader(), b.err
}
func (c *BlockCache) setup() {
func (ts timeSlice) Swap(i, j int) { ts[i], ts[j] = ts[j], ts[i] }
type cacheBlock struct {
- data []byte
+ data *streamer.AsyncStream
err error
fetched chan struct{}
lastUse time.Time