10990: Use AsyncStream to minimize store-and-forward latency 10990-keep-web-ranges-pa
authorPeter Amstutz <peter.amstutz@curoverse.com>
Fri, 27 Jan 2017 19:32:19 +0000 (14:32 -0500)
committerPeter Amstutz <peter.amstutz@curoverse.com>
Fri, 27 Jan 2017 19:32:19 +0000 (14:32 -0500)
sdk/go/keepclient/block_cache.go
sdk/go/keepclient/collectionreader.go
sdk/go/streamer/streamer.go

index 7d03b68b9cac47a7fff8583851dc0c8100714615..3c1974e7698d2b5ca729e3e547f950687b9cb423 100644 (file)
@@ -1,7 +1,7 @@
 package keepclient
 
 import (
-       "io/ioutil"
+       "git.curoverse.com/arvados.git/sdk/go/streamer"
        "sort"
        "sync"
        "time"
@@ -48,7 +48,7 @@ func (c *BlockCache) Sweep() {
 
 // Get returns data from the cache, first retrieving it from Keep if
 // necessary.
-func (c *BlockCache) Get(kc *KeepClient, locator string) ([]byte, error) {
+func (c *BlockCache) Get(kc *KeepClient, locator string) (*streamer.StreamReader, error) {
        c.setupOnce.Do(c.setup)
        cacheKey := locator[:32]
        c.mtx.Lock()
@@ -60,13 +60,12 @@ func (c *BlockCache) Get(kc *KeepClient, locator string) ([]byte, error) {
                }
                c.cache[cacheKey] = b
                go func() {
-                       rdr, _, _, err := kc.Get(locator)
-                       var data []byte
+                       rdr, bufsize, _, err := kc.Get(locator)
+                       c.mtx.Lock()
                        if err == nil {
-                               data, err = ioutil.ReadAll(rdr)
+                               b.data = streamer.AsyncStreamFromReader(int(bufsize), rdr)
                        }
-                       c.mtx.Lock()
-                       b.data, b.err = data, err
+                       b.err = err
                        c.mtx.Unlock()
                        close(b.fetched)
                        go c.Sweep()
@@ -81,7 +80,8 @@ func (c *BlockCache) Get(kc *KeepClient, locator string) ([]byte, error) {
        c.mtx.Lock()
        b.lastUse = time.Now()
        c.mtx.Unlock()
-       return b.data, b.err
+
+       return b.data.MakeStreamReader(), b.err
 }
 
 func (c *BlockCache) setup() {
@@ -97,7 +97,7 @@ func (ts timeSlice) Less(i, j int) bool { return ts[i].Before(ts[j]) }
 func (ts timeSlice) Swap(i, j int) { ts[i], ts[j] = ts[j], ts[i] }
 
 type cacheBlock struct {
-       data    []byte
+       data    *streamer.AsyncStream
        err     error
        fetched chan struct{}
        lastUse time.Time
index 61fabefacdd5ee937faab81c792370bd8af2c675..a87041c655ab817ae2344104e252b43ef11f4fff 100644 (file)
@@ -3,10 +3,10 @@ package keepclient
 import (
        "errors"
        "fmt"
+       "git.curoverse.com/arvados.git/sdk/go/manifest"
+       "git.curoverse.com/arvados.git/sdk/go/streamer"
        "io"
        "os"
-
-       "git.curoverse.com/arvados.git/sdk/go/manifest"
 )
 
 // A Reader implements, io.Reader, io.Seeker, and io.Closer, and has a
@@ -67,7 +67,7 @@ type file struct {
        // current/latest segment accessed -- might or might not match pos
        seg           *manifest.FileSegment
        segStart      int64 // position of segment relative to file
-       segData       []byte
+       segData       *streamer.StreamReader
        segNext       []*manifest.FileSegment
        readaheadDone bool
 }
@@ -76,6 +76,9 @@ type file struct {
 func (f *file) Close() error {
        f.kc = nil
        f.segments = nil
+       if f.segData != nil {
+               f.segData.Close()
+       }
        f.segData = nil
        return nil
 }
@@ -88,6 +91,9 @@ func (f *file) Read(buf []byte) (int, error) {
                // that does.
                f.seg = nil
                f.segStart = 0
+               if f.segData != nil {
+                       f.segData.Close()
+               }
                f.segData = nil
                f.segNext = f.segments
                for len(f.segNext) > 0 {
@@ -110,17 +116,17 @@ func (f *file) Read(buf []byte) (int, error) {
                if err != nil {
                        return 0, err
                }
-               if len(data) < f.seg.Offset+f.seg.Len {
-                       return 0, fmt.Errorf("invalid segment (offset %d len %d) in %d-byte block %s", f.seg.Offset, f.seg.Len, len(data), f.seg.Locator)
+               if int(data.Len()) < f.seg.Offset+f.seg.Len {
+                       return 0, fmt.Errorf("invalid segment (offset %d len %d) in %d-byte block %s", f.seg.Offset, f.seg.Len, data.Len(), f.seg.Locator)
                }
-               f.segData = data[f.seg.Offset : f.seg.Offset+f.seg.Len]
+               f.segData = data
        }
        // dataOff and dataLen denote a portion of f.segData
        // corresponding to a portion of the file at f.offset.
        dataOff := int(f.offset - f.segStart)
-       dataLen := f.seg.Len - dataOff
+       dataLen := f.seg.Len - (f.seg.Offset + dataOff)
 
-       if !f.readaheadDone && len(f.segNext) > 0 && f.offset >= 1048576 && dataOff+dataLen > len(f.segData)/16 {
+       if !f.readaheadDone && len(f.segNext) > 0 && f.offset >= 1048576 && uint64(dataOff+dataLen) > f.segData.Len()/16 {
                // If we have already read more than just the first
                // few bytes of this file, and we have already
                // consumed a noticeable portion of this segment, and
@@ -136,9 +142,10 @@ func (f *file) Read(buf []byte) (int, error) {
        if n > dataLen {
                n = dataLen
        }
-       copy(buf[:n], f.segData[dataOff:dataOff+n])
-       f.offset += int64(n)
-       return n, nil
+       f.segData.Seek(int64(f.seg.Offset+dataOff), io.SeekStart)
+       count, err := f.segData.Read(buf[:n])
+       f.offset += int64(count)
+       return count, err
 }
 
 // Seek implements io.Seeker.
index a46ca4cc55aa5c3faa8ccdbe5b73339aacfa50ef..c8d012f76591470551e51dcb8faea87eea2aecea 100644 (file)
@@ -37,6 +37,7 @@ package streamer
 
 import (
        "errors"
+       "fmt"
        "io"
 )
 
@@ -152,3 +153,29 @@ func (this *AsyncStream) Close() error {
        close(this.wait_zero_readers)
        return nil
 }
+
+func (this *StreamReader) Seek(offset int64, whence int) (int64, error) {
+       var want int64
+       switch whence {
+       case io.SeekStart:
+               want = offset
+       case io.SeekCurrent:
+               want = int64(this.offset) + offset
+       case io.SeekEnd:
+               want = int64(this.Len()) + offset
+       default:
+               return int64(this.offset), fmt.Errorf("invalid whence %d", whence)
+       }
+       if want < 0 {
+               return int64(this.offset), fmt.Errorf("attempted seek to %d", want)
+       }
+       if want > int64(this.Len()) {
+               want = int64(this.Len())
+       }
+       this.offset = int(want)
+       return want, nil
+}
+
+func (this *StreamReader) Len() uint64 {
+       return uint64(len(this.stream.buffer))
+}