X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/827879be023e90d58eb681b3c930154739a0b27f..e20590d485505f58f7745d74a311ca539c9be940:/sdk/go/keepclient/block_cache.go diff --git a/sdk/go/keepclient/block_cache.go b/sdk/go/keepclient/block_cache.go index 7d03b68b9c..89eecc6e27 100644 --- a/sdk/go/keepclient/block_cache.go +++ b/sdk/go/keepclient/block_cache.go @@ -1,8 +1,15 @@ +// Copyright (C) The Arvados Authors. All rights reserved. +// +// SPDX-License-Identifier: Apache-2.0 + package keepclient import ( - "io/ioutil" + "fmt" + "io" "sort" + "strconv" + "strings" "sync" "time" ) @@ -14,9 +21,8 @@ type BlockCache struct { // default size (currently 4) is used instead. MaxBlocks int - cache map[string]*cacheBlock - mtx sync.Mutex - setupOnce sync.Once + cache map[string]*cacheBlock + mtx sync.Mutex } const defaultMaxBlocks = 4 @@ -25,7 +31,7 @@ const defaultMaxBlocks = 4 // there are no more than MaxBlocks left. func (c *BlockCache) Sweep() { max := c.MaxBlocks - if max < defaultMaxBlocks { + if max == 0 { max = defaultMaxBlocks } c.mtx.Lock() @@ -46,12 +52,34 @@ func (c *BlockCache) Sweep() { } } +// ReadAt returns data from the cache, first retrieving it from Keep if +// necessary. +func (c *BlockCache) ReadAt(kc *KeepClient, locator string, p []byte, off int) (int, error) { + buf, err := c.Get(kc, locator) + if err != nil { + return 0, err + } + if off > len(buf) { + return 0, io.ErrUnexpectedEOF + } + return copy(p, buf[off:]), nil +} + // Get returns data from the cache, first retrieving it from Keep if // necessary. func (c *BlockCache) Get(kc *KeepClient, locator string) ([]byte, error) { - c.setupOnce.Do(c.setup) cacheKey := locator[:32] + bufsize := BLOCKSIZE + if parts := strings.SplitN(locator, "+", 3); len(parts) >= 2 { + datasize, err := strconv.ParseInt(parts[1], 10, 32) + if err == nil && datasize >= 0 { + bufsize = int(datasize) + } + } c.mtx.Lock() + if c.cache == nil { + c.cache = make(map[string]*cacheBlock) + } b, ok := c.cache[cacheKey] if !ok || b.err != nil { b = &cacheBlock{ @@ -60,10 +88,18 @@ func (c *BlockCache) Get(kc *KeepClient, locator string) ([]byte, error) { } c.cache[cacheKey] = b go func() { - rdr, _, _, err := kc.Get(locator) + rdr, size, _, err := kc.Get(locator) var data []byte if err == nil { - data, err = ioutil.ReadAll(rdr) + data = make([]byte, size, bufsize) + _, err = io.ReadFull(rdr, data) + err2 := rdr.Close() + if err == nil && err2 != nil { + err = fmt.Errorf("close(): %w", err2) + } + if err != nil { + err = fmt.Errorf("Get %s: %w", locator, err) + } } c.mtx.Lock() b.data, b.err = data, err @@ -84,8 +120,10 @@ func (c *BlockCache) Get(kc *KeepClient, locator string) ([]byte, error) { return b.data, b.err } -func (c *BlockCache) setup() { - c.cache = make(map[string]*cacheBlock) +func (c *BlockCache) Clear() { + c.mtx.Lock() + c.cache = nil + c.mtx.Unlock() } type timeSlice []time.Time