Merge branch '8784-dir-listings'
[arvados.git] / sdk / go / keepclient / block_cache.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: Apache-2.0
4
5 package keepclient
6
7 import (
8         "io/ioutil"
9         "sort"
10         "sync"
11         "time"
12 )
13
14 var DefaultBlockCache = &BlockCache{}
15
16 type BlockCache struct {
17         // Maximum number of blocks to keep in the cache. If 0, a
18         // default size (currently 4) is used instead.
19         MaxBlocks int
20
21         cache     map[string]*cacheBlock
22         mtx       sync.Mutex
23         setupOnce sync.Once
24 }
25
26 const defaultMaxBlocks = 4
27
28 // Sweep deletes the least recently used blocks from the cache until
29 // there are no more than MaxBlocks left.
30 func (c *BlockCache) Sweep() {
31         max := c.MaxBlocks
32         if max < defaultMaxBlocks {
33                 max = defaultMaxBlocks
34         }
35         c.mtx.Lock()
36         defer c.mtx.Unlock()
37         if len(c.cache) <= max {
38                 return
39         }
40         lru := make([]time.Time, 0, len(c.cache))
41         for _, b := range c.cache {
42                 lru = append(lru, b.lastUse)
43         }
44         sort.Sort(sort.Reverse(timeSlice(lru)))
45         threshold := lru[max]
46         for loc, b := range c.cache {
47                 if !b.lastUse.After(threshold) {
48                         delete(c.cache, loc)
49                 }
50         }
51 }
52
53 // Get returns data from the cache, first retrieving it from Keep if
54 // necessary.
55 func (c *BlockCache) Get(kc *KeepClient, locator string) ([]byte, error) {
56         c.setupOnce.Do(c.setup)
57         cacheKey := locator[:32]
58         c.mtx.Lock()
59         b, ok := c.cache[cacheKey]
60         if !ok || b.err != nil {
61                 b = &cacheBlock{
62                         fetched: make(chan struct{}),
63                         lastUse: time.Now(),
64                 }
65                 c.cache[cacheKey] = b
66                 go func() {
67                         rdr, _, _, err := kc.Get(locator)
68                         var data []byte
69                         if err == nil {
70                                 data, err = ioutil.ReadAll(rdr)
71                                 err2 := rdr.Close()
72                                 if err == nil {
73                                         err = err2
74                                 }
75                         }
76                         c.mtx.Lock()
77                         b.data, b.err = data, err
78                         c.mtx.Unlock()
79                         close(b.fetched)
80                         go c.Sweep()
81                 }()
82         }
83         c.mtx.Unlock()
84
85         // Wait (with mtx unlocked) for the fetch goroutine to finish,
86         // in case it hasn't already.
87         <-b.fetched
88
89         c.mtx.Lock()
90         b.lastUse = time.Now()
91         c.mtx.Unlock()
92         return b.data, b.err
93 }
94
95 func (c *BlockCache) setup() {
96         c.cache = make(map[string]*cacheBlock)
97 }
98
99 type timeSlice []time.Time
100
101 func (ts timeSlice) Len() int { return len(ts) }
102
103 func (ts timeSlice) Less(i, j int) bool { return ts[i].Before(ts[j]) }
104
105 func (ts timeSlice) Swap(i, j int) { ts[i], ts[j] = ts[j], ts[i] }
106
107 type cacheBlock struct {
108         data    []byte
109         err     error
110         fetched chan struct{}
111         lastUse time.Time
112 }