10990: Use AsyncStream to minimize store-and-forward latency
[arvados.git] / sdk / go / keepclient / block_cache.go
index 7d03b68b9cac47a7fff8583851dc0c8100714615..3c1974e7698d2b5ca729e3e547f950687b9cb423 100644 (file)
@@ -1,7 +1,7 @@
 package keepclient
 
 import (
-       "io/ioutil"
+       "git.curoverse.com/arvados.git/sdk/go/streamer"
        "sort"
        "sync"
        "time"
@@ -48,7 +48,7 @@ func (c *BlockCache) Sweep() {
 
 // Get returns data from the cache, first retrieving it from Keep if
 // necessary.
-func (c *BlockCache) Get(kc *KeepClient, locator string) ([]byte, error) {
+func (c *BlockCache) Get(kc *KeepClient, locator string) (*streamer.StreamReader, error) {
        c.setupOnce.Do(c.setup)
        cacheKey := locator[:32]
        c.mtx.Lock()
@@ -60,13 +60,12 @@ func (c *BlockCache) Get(kc *KeepClient, locator string) ([]byte, error) {
                }
                c.cache[cacheKey] = b
                go func() {
-                       rdr, _, _, err := kc.Get(locator)
-                       var data []byte
+                       rdr, bufsize, _, err := kc.Get(locator)
+                       c.mtx.Lock()
                        if err == nil {
-                               data, err = ioutil.ReadAll(rdr)
+                               b.data = streamer.AsyncStreamFromReader(int(bufsize), rdr)
                        }
-                       c.mtx.Lock()
-                       b.data, b.err = data, err
+                       b.err = err
                        c.mtx.Unlock()
                        close(b.fetched)
                        go c.Sweep()
@@ -81,7 +80,8 @@ func (c *BlockCache) Get(kc *KeepClient, locator string) ([]byte, error) {
        c.mtx.Lock()
        b.lastUse = time.Now()
        c.mtx.Unlock()
-       return b.data, b.err
+
+       return b.data.MakeStreamReader(), b.err
 }
 
 func (c *BlockCache) setup() {
@@ -97,7 +97,7 @@ func (ts timeSlice) Less(i, j int) bool { return ts[i].Before(ts[j]) }
 func (ts timeSlice) Swap(i, j int) { ts[i], ts[j] = ts[j], ts[i] }
 
 type cacheBlock struct {
-       data    []byte
+       data    *streamer.AsyncStream
        err     error
        fetched chan struct{}
        lastUse time.Time