X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/6ee6e654bc873db10037c735a63697d295ec40cb..8a27fe370239ecb8e50d53f46b45ed61203a35ca:/sdk/go/keepclient/block_cache.go diff --git a/sdk/go/keepclient/block_cache.go b/sdk/go/keepclient/block_cache.go index 18f7de99e5..bac4a24fd5 100644 --- a/sdk/go/keepclient/block_cache.go +++ b/sdk/go/keepclient/block_cache.go @@ -5,9 +5,10 @@ package keepclient import ( - "bytes" "io" "sort" + "strconv" + "strings" "sync" "time" ) @@ -19,9 +20,8 @@ type BlockCache struct { // default size (currently 4) is used instead. MaxBlocks int - cache map[string]*cacheBlock - mtx sync.Mutex - setupOnce sync.Once + cache map[string]*cacheBlock + mtx sync.Mutex } const defaultMaxBlocks = 4 @@ -30,7 +30,7 @@ const defaultMaxBlocks = 4 // there are no more than MaxBlocks left. func (c *BlockCache) Sweep() { max := c.MaxBlocks - if max < defaultMaxBlocks { + if max == 0 { max = defaultMaxBlocks } c.mtx.Lock() @@ -51,12 +51,34 @@ func (c *BlockCache) Sweep() { } } +// ReadAt returns data from the cache, first retrieving it from Keep if +// necessary. +func (c *BlockCache) ReadAt(kc *KeepClient, locator string, p []byte, off int) (int, error) { + buf, err := c.Get(kc, locator) + if err != nil { + return 0, err + } + if off > len(buf) { + return 0, io.ErrUnexpectedEOF + } + return copy(p, buf[off:]), nil +} + // Get returns data from the cache, first retrieving it from Keep if // necessary. func (c *BlockCache) Get(kc *KeepClient, locator string) ([]byte, error) { - c.setupOnce.Do(c.setup) cacheKey := locator[:32] + bufsize := BLOCKSIZE + if parts := strings.SplitN(locator, "+", 3); len(parts) >= 2 { + datasize, err := strconv.ParseInt(parts[1], 10, 32) + if err == nil && datasize >= 0 { + bufsize = int(datasize) + } + } c.mtx.Lock() + if c.cache == nil { + c.cache = make(map[string]*cacheBlock) + } b, ok := c.cache[cacheKey] if !ok || b.err != nil { b = &cacheBlock{ @@ -65,17 +87,18 @@ func (c *BlockCache) Get(kc *KeepClient, locator string) ([]byte, error) { } c.cache[cacheKey] = b go func() { - rdr, _, _, err := kc.Get(locator) - data := bytes.NewBuffer(make([]byte, 0, BLOCKSIZE)) + rdr, size, _, err := kc.Get(locator) + var data []byte if err == nil { - _, err = io.Copy(data, rdr) + data = make([]byte, size, bufsize) + _, err = io.ReadFull(rdr, data) err2 := rdr.Close() if err == nil { err = err2 } } c.mtx.Lock() - b.data, b.err = data.Bytes(), err + b.data, b.err = data, err c.mtx.Unlock() close(b.fetched) go c.Sweep() @@ -93,13 +116,9 @@ func (c *BlockCache) Get(kc *KeepClient, locator string) ([]byte, error) { return b.data, b.err } -func (c *BlockCache) setup() { - c.cache = make(map[string]*cacheBlock) -} - func (c *BlockCache) Clear() { c.mtx.Lock() - c.setup() + c.cache = nil c.mtx.Unlock() }