From 1b16fe46cccf4af94349ddad730b5c7f3bc03718 Mon Sep 17 00:00:00 2001 From: Peter Amstutz Date: Fri, 27 Jan 2017 14:32:19 -0500 Subject: [PATCH] 10990: Use AsyncStream to minimize store-and-forward latency --- sdk/go/keepclient/block_cache.go | 18 ++++++++--------- sdk/go/keepclient/collectionreader.go | 29 +++++++++++++++++---------- sdk/go/streamer/streamer.go | 27 +++++++++++++++++++++++++ 3 files changed, 54 insertions(+), 20 deletions(-) diff --git a/sdk/go/keepclient/block_cache.go b/sdk/go/keepclient/block_cache.go index 7d03b68b9c..3c1974e769 100644 --- a/sdk/go/keepclient/block_cache.go +++ b/sdk/go/keepclient/block_cache.go @@ -1,7 +1,7 @@ package keepclient import ( - "io/ioutil" + "git.curoverse.com/arvados.git/sdk/go/streamer" "sort" "sync" "time" @@ -48,7 +48,7 @@ func (c *BlockCache) Sweep() { // Get returns data from the cache, first retrieving it from Keep if // necessary. -func (c *BlockCache) Get(kc *KeepClient, locator string) ([]byte, error) { +func (c *BlockCache) Get(kc *KeepClient, locator string) (*streamer.StreamReader, error) { c.setupOnce.Do(c.setup) cacheKey := locator[:32] c.mtx.Lock() @@ -60,13 +60,12 @@ func (c *BlockCache) Get(kc *KeepClient, locator string) ([]byte, error) { } c.cache[cacheKey] = b go func() { - rdr, _, _, err := kc.Get(locator) - var data []byte + rdr, bufsize, _, err := kc.Get(locator) + c.mtx.Lock() if err == nil { - data, err = ioutil.ReadAll(rdr) + b.data = streamer.AsyncStreamFromReader(int(bufsize), rdr) } - c.mtx.Lock() - b.data, b.err = data, err + b.err = err c.mtx.Unlock() close(b.fetched) go c.Sweep() @@ -81,7 +80,8 @@ func (c *BlockCache) Get(kc *KeepClient, locator string) ([]byte, error) { c.mtx.Lock() b.lastUse = time.Now() c.mtx.Unlock() - return b.data, b.err + + return b.data.MakeStreamReader(), b.err } func (c *BlockCache) setup() { @@ -97,7 +97,7 @@ func (ts timeSlice) Less(i, j int) bool { return ts[i].Before(ts[j]) } func (ts timeSlice) Swap(i, j int) { ts[i], ts[j] = ts[j], ts[i] } type cacheBlock struct { - data []byte + data *streamer.AsyncStream err error fetched chan struct{} lastUse time.Time diff --git a/sdk/go/keepclient/collectionreader.go b/sdk/go/keepclient/collectionreader.go index 61fabefacd..a87041c655 100644 --- a/sdk/go/keepclient/collectionreader.go +++ b/sdk/go/keepclient/collectionreader.go @@ -3,10 +3,10 @@ package keepclient import ( "errors" "fmt" + "git.curoverse.com/arvados.git/sdk/go/manifest" + "git.curoverse.com/arvados.git/sdk/go/streamer" "io" "os" - - "git.curoverse.com/arvados.git/sdk/go/manifest" ) // A Reader implements, io.Reader, io.Seeker, and io.Closer, and has a @@ -67,7 +67,7 @@ type file struct { // current/latest segment accessed -- might or might not match pos seg *manifest.FileSegment segStart int64 // position of segment relative to file - segData []byte + segData *streamer.StreamReader segNext []*manifest.FileSegment readaheadDone bool } @@ -76,6 +76,9 @@ type file struct { func (f *file) Close() error { f.kc = nil f.segments = nil + if f.segData != nil { + f.segData.Close() + } f.segData = nil return nil } @@ -88,6 +91,9 @@ func (f *file) Read(buf []byte) (int, error) { // that does. f.seg = nil f.segStart = 0 + if f.segData != nil { + f.segData.Close() + } f.segData = nil f.segNext = f.segments for len(f.segNext) > 0 { @@ -110,17 +116,17 @@ func (f *file) Read(buf []byte) (int, error) { if err != nil { return 0, err } - if len(data) < f.seg.Offset+f.seg.Len { - return 0, fmt.Errorf("invalid segment (offset %d len %d) in %d-byte block %s", f.seg.Offset, f.seg.Len, len(data), f.seg.Locator) + if int(data.Len()) < f.seg.Offset+f.seg.Len { + return 0, fmt.Errorf("invalid segment (offset %d len %d) in %d-byte block %s", f.seg.Offset, f.seg.Len, data.Len(), f.seg.Locator) } - f.segData = data[f.seg.Offset : f.seg.Offset+f.seg.Len] + f.segData = data } // dataOff and dataLen denote a portion of f.segData // corresponding to a portion of the file at f.offset. dataOff := int(f.offset - f.segStart) - dataLen := f.seg.Len - dataOff + dataLen := f.seg.Len - (f.seg.Offset + dataOff) - if !f.readaheadDone && len(f.segNext) > 0 && f.offset >= 1048576 && dataOff+dataLen > len(f.segData)/16 { + if !f.readaheadDone && len(f.segNext) > 0 && f.offset >= 1048576 && uint64(dataOff+dataLen) > f.segData.Len()/16 { // If we have already read more than just the first // few bytes of this file, and we have already // consumed a noticeable portion of this segment, and @@ -136,9 +142,10 @@ func (f *file) Read(buf []byte) (int, error) { if n > dataLen { n = dataLen } - copy(buf[:n], f.segData[dataOff:dataOff+n]) - f.offset += int64(n) - return n, nil + f.segData.Seek(int64(f.seg.Offset+dataOff), io.SeekStart) + count, err := f.segData.Read(buf[:n]) + f.offset += int64(count) + return count, err } // Seek implements io.Seeker. diff --git a/sdk/go/streamer/streamer.go b/sdk/go/streamer/streamer.go index a46ca4cc55..c8d012f765 100644 --- a/sdk/go/streamer/streamer.go +++ b/sdk/go/streamer/streamer.go @@ -37,6 +37,7 @@ package streamer import ( "errors" + "fmt" "io" ) @@ -152,3 +153,29 @@ func (this *AsyncStream) Close() error { close(this.wait_zero_readers) return nil } + +func (this *StreamReader) Seek(offset int64, whence int) (int64, error) { + var want int64 + switch whence { + case io.SeekStart: + want = offset + case io.SeekCurrent: + want = int64(this.offset) + offset + case io.SeekEnd: + want = int64(this.Len()) + offset + default: + return int64(this.offset), fmt.Errorf("invalid whence %d", whence) + } + if want < 0 { + return int64(this.offset), fmt.Errorf("attempted seek to %d", want) + } + if want > int64(this.Len()) { + want = int64(this.Len()) + } + this.offset = int(want) + return want, nil +} + +func (this *StreamReader) Len() uint64 { + return uint64(len(this.stream.buffer)) +} -- 2.30.2