Merge branch '19889-live-log-webdav'
[arvados.git] / sdk / go / keepclient / block_cache.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: Apache-2.0
4
5 package keepclient
6
7 import (
8         "fmt"
9         "io"
10         "sort"
11         "strconv"
12         "strings"
13         "sync"
14         "time"
15 )
16
17 var DefaultBlockCache = &BlockCache{}
18
19 type BlockCache struct {
20         // Maximum number of blocks to keep in the cache. If 0, a
21         // default size (currently 4) is used instead.
22         MaxBlocks int
23
24         cache map[string]*cacheBlock
25         mtx   sync.Mutex
26 }
27
28 const defaultMaxBlocks = 4
29
30 // Sweep deletes the least recently used blocks from the cache until
31 // there are no more than MaxBlocks left.
32 func (c *BlockCache) Sweep() {
33         max := c.MaxBlocks
34         if max == 0 {
35                 max = defaultMaxBlocks
36         }
37         c.mtx.Lock()
38         defer c.mtx.Unlock()
39         if len(c.cache) <= max {
40                 return
41         }
42         lru := make([]time.Time, 0, len(c.cache))
43         for _, b := range c.cache {
44                 lru = append(lru, b.lastUse)
45         }
46         sort.Sort(sort.Reverse(timeSlice(lru)))
47         threshold := lru[max]
48         for loc, b := range c.cache {
49                 if !b.lastUse.After(threshold) {
50                         delete(c.cache, loc)
51                 }
52         }
53 }
54
55 // ReadAt returns data from the cache, first retrieving it from Keep if
56 // necessary.
57 func (c *BlockCache) ReadAt(kc *KeepClient, locator string, p []byte, off int) (int, error) {
58         buf, err := c.Get(kc, locator)
59         if err != nil {
60                 return 0, err
61         }
62         if off > len(buf) {
63                 return 0, io.ErrUnexpectedEOF
64         }
65         return copy(p, buf[off:]), nil
66 }
67
68 // Get returns data from the cache, first retrieving it from Keep if
69 // necessary.
70 func (c *BlockCache) Get(kc *KeepClient, locator string) ([]byte, error) {
71         cacheKey := locator[:32]
72         bufsize := BLOCKSIZE
73         if parts := strings.SplitN(locator, "+", 3); len(parts) >= 2 {
74                 datasize, err := strconv.ParseInt(parts[1], 10, 32)
75                 if err == nil && datasize >= 0 {
76                         bufsize = int(datasize)
77                 }
78         }
79         c.mtx.Lock()
80         if c.cache == nil {
81                 c.cache = make(map[string]*cacheBlock)
82         }
83         b, ok := c.cache[cacheKey]
84         if !ok || b.err != nil {
85                 b = &cacheBlock{
86                         fetched: make(chan struct{}),
87                         lastUse: time.Now(),
88                 }
89                 c.cache[cacheKey] = b
90                 go func() {
91                         rdr, size, _, err := kc.Get(locator)
92                         var data []byte
93                         if err == nil {
94                                 data = make([]byte, size, bufsize)
95                                 _, err = io.ReadFull(rdr, data)
96                                 err2 := rdr.Close()
97                                 if err == nil && err2 != nil {
98                                         err = fmt.Errorf("close(): %w", err2)
99                                 }
100                                 if err != nil {
101                                         err = fmt.Errorf("Get %s: %w", locator, err)
102                                 }
103                         }
104                         c.mtx.Lock()
105                         b.data, b.err = data, err
106                         c.mtx.Unlock()
107                         close(b.fetched)
108                         go c.Sweep()
109                 }()
110         }
111         c.mtx.Unlock()
112
113         // Wait (with mtx unlocked) for the fetch goroutine to finish,
114         // in case it hasn't already.
115         <-b.fetched
116
117         c.mtx.Lock()
118         b.lastUse = time.Now()
119         c.mtx.Unlock()
120         return b.data, b.err
121 }
122
123 func (c *BlockCache) Clear() {
124         c.mtx.Lock()
125         c.cache = nil
126         c.mtx.Unlock()
127 }
128
129 type timeSlice []time.Time
130
131 func (ts timeSlice) Len() int { return len(ts) }
132
133 func (ts timeSlice) Less(i, j int) bool { return ts[i].Before(ts[j]) }
134
135 func (ts timeSlice) Swap(i, j int) { ts[i], ts[j] = ts[j], ts[i] }
136
137 type cacheBlock struct {
138         data    []byte
139         err     error
140         fetched chan struct{}
141         lastUse time.Time
142 }