Merge branch '10312-nodemanager-quotas' refs #10312
[arvados.git] / sdk / go / keepclient / block_cache.go
1 package keepclient
2
3 import (
4         "io/ioutil"
5         "sort"
6         "sync"
7         "time"
8 )
9
10 var DefaultBlockCache = &BlockCache{}
11
12 type BlockCache struct {
13         // Maximum number of blocks to keep in the cache. If 0, a
14         // default size (currently 4) is used instead.
15         MaxBlocks int
16
17         cache     map[string]*cacheBlock
18         mtx       sync.Mutex
19         setupOnce sync.Once
20 }
21
22 const defaultMaxBlocks = 4
23
24 // Sweep deletes the least recently used blocks from the cache until
25 // there are no more than MaxBlocks left.
26 func (c *BlockCache) Sweep() {
27         max := c.MaxBlocks
28         if max < defaultMaxBlocks {
29                 max = defaultMaxBlocks
30         }
31         c.mtx.Lock()
32         defer c.mtx.Unlock()
33         if len(c.cache) <= max {
34                 return
35         }
36         lru := make([]time.Time, 0, len(c.cache))
37         for _, b := range c.cache {
38                 lru = append(lru, b.lastUse)
39         }
40         sort.Sort(sort.Reverse(timeSlice(lru)))
41         threshold := lru[max]
42         for loc, b := range c.cache {
43                 if !b.lastUse.After(threshold) {
44                         delete(c.cache, loc)
45                 }
46         }
47 }
48
49 // Get returns data from the cache, first retrieving it from Keep if
50 // necessary.
51 func (c *BlockCache) Get(kc *KeepClient, locator string) ([]byte, error) {
52         c.setupOnce.Do(c.setup)
53         cacheKey := locator[:32]
54         c.mtx.Lock()
55         b, ok := c.cache[cacheKey]
56         if !ok || b.err != nil {
57                 b = &cacheBlock{
58                         fetched: make(chan struct{}),
59                         lastUse: time.Now(),
60                 }
61                 c.cache[cacheKey] = b
62                 go func() {
63                         rdr, _, _, err := kc.Get(locator)
64                         var data []byte
65                         if err == nil {
66                                 data, err = ioutil.ReadAll(rdr)
67                                 err2 := rdr.Close()
68                                 if err == nil {
69                                         err = err2
70                                 }
71                         }
72                         c.mtx.Lock()
73                         b.data, b.err = data, err
74                         c.mtx.Unlock()
75                         close(b.fetched)
76                         go c.Sweep()
77                 }()
78         }
79         c.mtx.Unlock()
80
81         // Wait (with mtx unlocked) for the fetch goroutine to finish,
82         // in case it hasn't already.
83         <-b.fetched
84
85         c.mtx.Lock()
86         b.lastUse = time.Now()
87         c.mtx.Unlock()
88         return b.data, b.err
89 }
90
91 func (c *BlockCache) setup() {
92         c.cache = make(map[string]*cacheBlock)
93 }
94
95 type timeSlice []time.Time
96
97 func (ts timeSlice) Len() int { return len(ts) }
98
99 func (ts timeSlice) Less(i, j int) bool { return ts[i].Before(ts[j]) }
100
101 func (ts timeSlice) Swap(i, j int) { ts[i], ts[j] = ts[j], ts[i] }
102
103 type cacheBlock struct {
104         data    []byte
105         err     error
106         fetched chan struct{}
107         lastUse time.Time
108 }