18f7de99e5f90855c13526c92eebc6c52743fa34
[arvados.git] / sdk / go / keepclient / block_cache.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: Apache-2.0
4
5 package keepclient
6
7 import (
8         "bytes"
9         "io"
10         "sort"
11         "sync"
12         "time"
13 )
14
15 var DefaultBlockCache = &BlockCache{}
16
17 type BlockCache struct {
18         // Maximum number of blocks to keep in the cache. If 0, a
19         // default size (currently 4) is used instead.
20         MaxBlocks int
21
22         cache     map[string]*cacheBlock
23         mtx       sync.Mutex
24         setupOnce sync.Once
25 }
26
27 const defaultMaxBlocks = 4
28
29 // Sweep deletes the least recently used blocks from the cache until
30 // there are no more than MaxBlocks left.
31 func (c *BlockCache) Sweep() {
32         max := c.MaxBlocks
33         if max < defaultMaxBlocks {
34                 max = defaultMaxBlocks
35         }
36         c.mtx.Lock()
37         defer c.mtx.Unlock()
38         if len(c.cache) <= max {
39                 return
40         }
41         lru := make([]time.Time, 0, len(c.cache))
42         for _, b := range c.cache {
43                 lru = append(lru, b.lastUse)
44         }
45         sort.Sort(sort.Reverse(timeSlice(lru)))
46         threshold := lru[max]
47         for loc, b := range c.cache {
48                 if !b.lastUse.After(threshold) {
49                         delete(c.cache, loc)
50                 }
51         }
52 }
53
54 // Get returns data from the cache, first retrieving it from Keep if
55 // necessary.
56 func (c *BlockCache) Get(kc *KeepClient, locator string) ([]byte, error) {
57         c.setupOnce.Do(c.setup)
58         cacheKey := locator[:32]
59         c.mtx.Lock()
60         b, ok := c.cache[cacheKey]
61         if !ok || b.err != nil {
62                 b = &cacheBlock{
63                         fetched: make(chan struct{}),
64                         lastUse: time.Now(),
65                 }
66                 c.cache[cacheKey] = b
67                 go func() {
68                         rdr, _, _, err := kc.Get(locator)
69                         data := bytes.NewBuffer(make([]byte, 0, BLOCKSIZE))
70                         if err == nil {
71                                 _, err = io.Copy(data, rdr)
72                                 err2 := rdr.Close()
73                                 if err == nil {
74                                         err = err2
75                                 }
76                         }
77                         c.mtx.Lock()
78                         b.data, b.err = data.Bytes(), err
79                         c.mtx.Unlock()
80                         close(b.fetched)
81                         go c.Sweep()
82                 }()
83         }
84         c.mtx.Unlock()
85
86         // Wait (with mtx unlocked) for the fetch goroutine to finish,
87         // in case it hasn't already.
88         <-b.fetched
89
90         c.mtx.Lock()
91         b.lastUse = time.Now()
92         c.mtx.Unlock()
93         return b.data, b.err
94 }
95
96 func (c *BlockCache) setup() {
97         c.cache = make(map[string]*cacheBlock)
98 }
99
100 func (c *BlockCache) Clear() {
101         c.mtx.Lock()
102         c.setup()
103         c.mtx.Unlock()
104 }
105
106 type timeSlice []time.Time
107
108 func (ts timeSlice) Len() int { return len(ts) }
109
110 func (ts timeSlice) Less(i, j int) bool { return ts[i].Before(ts[j]) }
111
112 func (ts timeSlice) Swap(i, j int) { ts[i], ts[j] = ts[j], ts[i] }
113
114 type cacheBlock struct {
115         data    []byte
116         err     error
117         fetched chan struct{}
118         lastUse time.Time
119 }