Merge branch '11100-cr-output-ttl'
[arvados.git] / sdk / go / keepclient / block_cache.go
1 package keepclient
2
3 import (
4         "io/ioutil"
5         "sort"
6         "sync"
7         "time"
8 )
9
10 var DefaultBlockCache = &BlockCache{}
11
12 type BlockCache struct {
13         // Maximum number of blocks to keep in the cache. If 0, a
14         // default size (currently 4) is used instead.
15         MaxBlocks int
16
17         cache     map[string]*cacheBlock
18         mtx       sync.Mutex
19         setupOnce sync.Once
20 }
21
22 const defaultMaxBlocks = 4
23
24 // Sweep deletes the least recently used blocks from the cache until
25 // there are no more than MaxBlocks left.
26 func (c *BlockCache) Sweep() {
27         max := c.MaxBlocks
28         if max < defaultMaxBlocks {
29                 max = defaultMaxBlocks
30         }
31         c.mtx.Lock()
32         defer c.mtx.Unlock()
33         if len(c.cache) <= max {
34                 return
35         }
36         lru := make([]time.Time, 0, len(c.cache))
37         for _, b := range c.cache {
38                 lru = append(lru, b.lastUse)
39         }
40         sort.Sort(sort.Reverse(timeSlice(lru)))
41         threshold := lru[max]
42         for loc, b := range c.cache {
43                 if !b.lastUse.After(threshold) {
44                         delete(c.cache, loc)
45                 }
46         }
47 }
48
49 // Get returns data from the cache, first retrieving it from Keep if
50 // necessary.
51 func (c *BlockCache) Get(kc *KeepClient, locator string) ([]byte, error) {
52         c.setupOnce.Do(c.setup)
53         cacheKey := locator[:32]
54         c.mtx.Lock()
55         b, ok := c.cache[cacheKey]
56         if !ok || b.err != nil {
57                 b = &cacheBlock{
58                         fetched: make(chan struct{}),
59                         lastUse: time.Now(),
60                 }
61                 c.cache[cacheKey] = b
62                 go func() {
63                         rdr, _, _, err := kc.Get(locator)
64                         var data []byte
65                         if err == nil {
66                                 data, err = ioutil.ReadAll(rdr)
67                         }
68                         c.mtx.Lock()
69                         b.data, b.err = data, err
70                         c.mtx.Unlock()
71                         close(b.fetched)
72                         go c.Sweep()
73                 }()
74         }
75         c.mtx.Unlock()
76
77         // Wait (with mtx unlocked) for the fetch goroutine to finish,
78         // in case it hasn't already.
79         <-b.fetched
80
81         c.mtx.Lock()
82         b.lastUse = time.Now()
83         c.mtx.Unlock()
84         return b.data, b.err
85 }
86
87 func (c *BlockCache) setup() {
88         c.cache = make(map[string]*cacheBlock)
89 }
90
91 type timeSlice []time.Time
92
93 func (ts timeSlice) Len() int { return len(ts) }
94
95 func (ts timeSlice) Less(i, j int) bool { return ts[i].Before(ts[j]) }
96
97 func (ts timeSlice) Swap(i, j int) { ts[i], ts[j] = ts[j], ts[i] }
98
99 type cacheBlock struct {
100         data    []byte
101         err     error
102         fetched chan struct{}
103         lastUse time.Time
104 }