12483: Simplify extent packing, reduce type casting.
[arvados.git] / sdk / go / keepclient / block_cache.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: Apache-2.0
4
5 package keepclient
6
7 import (
8         "io"
9         "sort"
10         "sync"
11         "time"
12 )
13
14 var DefaultBlockCache = &BlockCache{}
15
16 type BlockCache struct {
17         // Maximum number of blocks to keep in the cache. If 0, a
18         // default size (currently 4) is used instead.
19         MaxBlocks int
20
21         cache map[string]*cacheBlock
22         mtx   sync.Mutex
23 }
24
25 const defaultMaxBlocks = 4
26
27 // Sweep deletes the least recently used blocks from the cache until
28 // there are no more than MaxBlocks left.
29 func (c *BlockCache) Sweep() {
30         max := c.MaxBlocks
31         if max == 0 {
32                 max = defaultMaxBlocks
33         }
34         c.mtx.Lock()
35         defer c.mtx.Unlock()
36         if len(c.cache) <= max {
37                 return
38         }
39         lru := make([]time.Time, 0, len(c.cache))
40         for _, b := range c.cache {
41                 lru = append(lru, b.lastUse)
42         }
43         sort.Sort(sort.Reverse(timeSlice(lru)))
44         threshold := lru[max]
45         for loc, b := range c.cache {
46                 if !b.lastUse.After(threshold) {
47                         delete(c.cache, loc)
48                 }
49         }
50 }
51
52 // ReadAt returns data from the cache, first retrieving it from Keep if
53 // necessary.
54 func (c *BlockCache) ReadAt(kc *KeepClient, locator string, p []byte, off int) (int, error) {
55         buf, err := c.Get(kc, locator)
56         if err != nil {
57                 return 0, err
58         }
59         if off > len(buf) {
60                 return 0, io.ErrUnexpectedEOF
61         }
62         return copy(p, buf[off:]), nil
63 }
64
65 // Get returns data from the cache, first retrieving it from Keep if
66 // necessary.
67 func (c *BlockCache) Get(kc *KeepClient, locator string) ([]byte, error) {
68         cacheKey := locator[:32]
69         c.mtx.Lock()
70         if c.cache == nil {
71                 c.cache = make(map[string]*cacheBlock)
72         }
73         b, ok := c.cache[cacheKey]
74         if !ok || b.err != nil {
75                 b = &cacheBlock{
76                         fetched: make(chan struct{}),
77                         lastUse: time.Now(),
78                 }
79                 c.cache[cacheKey] = b
80                 go func() {
81                         rdr, size, _, err := kc.Get(locator)
82                         var data []byte
83                         if err == nil {
84                                 data = make([]byte, size, BLOCKSIZE)
85                                 _, err = io.ReadFull(rdr, data)
86                                 err2 := rdr.Close()
87                                 if err == nil {
88                                         err = err2
89                                 }
90                         }
91                         c.mtx.Lock()
92                         b.data, b.err = data, err
93                         c.mtx.Unlock()
94                         close(b.fetched)
95                         go c.Sweep()
96                 }()
97         }
98         c.mtx.Unlock()
99
100         // Wait (with mtx unlocked) for the fetch goroutine to finish,
101         // in case it hasn't already.
102         <-b.fetched
103
104         c.mtx.Lock()
105         b.lastUse = time.Now()
106         c.mtx.Unlock()
107         return b.data, b.err
108 }
109
110 func (c *BlockCache) Clear() {
111         c.mtx.Lock()
112         c.cache = nil
113         c.mtx.Unlock()
114 }
115
116 type timeSlice []time.Time
117
118 func (ts timeSlice) Len() int { return len(ts) }
119
120 func (ts timeSlice) Less(i, j int) bool { return ts[i].Before(ts[j]) }
121
122 func (ts timeSlice) Swap(i, j int) { ts[i], ts[j] = ts[j], ts[i] }
123
124 type cacheBlock struct {
125         data    []byte
126         err     error
127         fetched chan struct{}
128         lastUse time.Time
129 }