12167: Merge branch 'master' into 12167-keep-request-id
[arvados.git] / sdk / go / keepclient / block_cache.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: Apache-2.0
4
5 package keepclient
6
7 import (
8         "io"
9         "sort"
10         "sync"
11         "time"
12 )
13
14 var DefaultBlockCache = &BlockCache{}
15
16 type BlockCache struct {
17         // Maximum number of blocks to keep in the cache. If 0, a
18         // default size (currently 4) is used instead.
19         MaxBlocks int
20
21         cache map[string]*cacheBlock
22         mtx   sync.Mutex
23 }
24
25 const defaultMaxBlocks = 4
26
27 // Sweep deletes the least recently used blocks from the cache until
28 // there are no more than MaxBlocks left.
29 func (c *BlockCache) Sweep() {
30         max := c.MaxBlocks
31         if max == 0 {
32                 max = defaultMaxBlocks
33         }
34         c.mtx.Lock()
35         defer c.mtx.Unlock()
36         if len(c.cache) <= max {
37                 return
38         }
39         lru := make([]time.Time, 0, len(c.cache))
40         for _, b := range c.cache {
41                 lru = append(lru, b.lastUse)
42         }
43         sort.Sort(sort.Reverse(timeSlice(lru)))
44         threshold := lru[max]
45         for loc, b := range c.cache {
46                 if !b.lastUse.After(threshold) {
47                         delete(c.cache, loc)
48                 }
49         }
50 }
51
52 // Get returns data from the cache, first retrieving it from Keep if
53 // necessary.
54 func (c *BlockCache) Get(kc *KeepClient, locator string) ([]byte, error) {
55         cacheKey := locator[:32]
56         c.mtx.Lock()
57         if c.cache == nil {
58                 c.cache = make(map[string]*cacheBlock)
59         }
60         b, ok := c.cache[cacheKey]
61         if !ok || b.err != nil {
62                 b = &cacheBlock{
63                         fetched: make(chan struct{}),
64                         lastUse: time.Now(),
65                 }
66                 c.cache[cacheKey] = b
67                 go func() {
68                         rdr, size, _, err := kc.Get(locator)
69                         var data []byte
70                         if err == nil {
71                                 data = make([]byte, size, BLOCKSIZE)
72                                 _, err = io.ReadFull(rdr, data)
73                                 err2 := rdr.Close()
74                                 if err == nil {
75                                         err = err2
76                                 }
77                         }
78                         c.mtx.Lock()
79                         b.data, b.err = data, err
80                         c.mtx.Unlock()
81                         close(b.fetched)
82                         go c.Sweep()
83                 }()
84         }
85         c.mtx.Unlock()
86
87         // Wait (with mtx unlocked) for the fetch goroutine to finish,
88         // in case it hasn't already.
89         <-b.fetched
90
91         c.mtx.Lock()
92         b.lastUse = time.Now()
93         c.mtx.Unlock()
94         return b.data, b.err
95 }
96
97 func (c *BlockCache) Clear() {
98         c.mtx.Lock()
99         c.cache = nil
100         c.mtx.Unlock()
101 }
102
103 type timeSlice []time.Time
104
105 func (ts timeSlice) Len() int { return len(ts) }
106
107 func (ts timeSlice) Less(i, j int) bool { return ts[i].Before(ts[j]) }
108
109 func (ts timeSlice) Swap(i, j int) { ts[i], ts[j] = ts[j], ts[i] }
110
111 type cacheBlock struct {
112         data    []byte
113         err     error
114         fetched chan struct{}
115         lastUse time.Time
116 }