9865: Executor exception improvement
[arvados.git] / sdk / go / keepclient / block_cache.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: Apache-2.0
4
5 package keepclient
6
7 import (
8         "io"
9         "sort"
10         "strconv"
11         "strings"
12         "sync"
13         "time"
14 )
15
16 var DefaultBlockCache = &BlockCache{}
17
18 type BlockCache struct {
19         // Maximum number of blocks to keep in the cache. If 0, a
20         // default size (currently 4) is used instead.
21         MaxBlocks int
22
23         cache map[string]*cacheBlock
24         mtx   sync.Mutex
25 }
26
27 const defaultMaxBlocks = 4
28
29 // Sweep deletes the least recently used blocks from the cache until
30 // there are no more than MaxBlocks left.
31 func (c *BlockCache) Sweep() {
32         max := c.MaxBlocks
33         if max == 0 {
34                 max = defaultMaxBlocks
35         }
36         c.mtx.Lock()
37         defer c.mtx.Unlock()
38         if len(c.cache) <= max {
39                 return
40         }
41         lru := make([]time.Time, 0, len(c.cache))
42         for _, b := range c.cache {
43                 lru = append(lru, b.lastUse)
44         }
45         sort.Sort(sort.Reverse(timeSlice(lru)))
46         threshold := lru[max]
47         for loc, b := range c.cache {
48                 if !b.lastUse.After(threshold) {
49                         delete(c.cache, loc)
50                 }
51         }
52 }
53
54 // ReadAt returns data from the cache, first retrieving it from Keep if
55 // necessary.
56 func (c *BlockCache) ReadAt(kc *KeepClient, locator string, p []byte, off int) (int, error) {
57         buf, err := c.Get(kc, locator)
58         if err != nil {
59                 return 0, err
60         }
61         if off > len(buf) {
62                 return 0, io.ErrUnexpectedEOF
63         }
64         return copy(p, buf[off:]), nil
65 }
66
67 // Get returns data from the cache, first retrieving it from Keep if
68 // necessary.
69 func (c *BlockCache) Get(kc *KeepClient, locator string) ([]byte, error) {
70         cacheKey := locator[:32]
71         bufsize := BLOCKSIZE
72         if parts := strings.SplitN(locator, "+", 3); len(parts) >= 2 {
73                 datasize, err := strconv.ParseInt(parts[1], 10, 32)
74                 if err == nil && datasize >= 0 {
75                         bufsize = int(datasize)
76                 }
77         }
78         c.mtx.Lock()
79         if c.cache == nil {
80                 c.cache = make(map[string]*cacheBlock)
81         }
82         b, ok := c.cache[cacheKey]
83         if !ok || b.err != nil {
84                 b = &cacheBlock{
85                         fetched: make(chan struct{}),
86                         lastUse: time.Now(),
87                 }
88                 c.cache[cacheKey] = b
89                 go func() {
90                         rdr, size, _, err := kc.Get(locator)
91                         var data []byte
92                         if err == nil {
93                                 data = make([]byte, size, bufsize)
94                                 _, err = io.ReadFull(rdr, data)
95                                 err2 := rdr.Close()
96                                 if err == nil {
97                                         err = err2
98                                 }
99                         }
100                         c.mtx.Lock()
101                         b.data, b.err = data, err
102                         c.mtx.Unlock()
103                         close(b.fetched)
104                         go c.Sweep()
105                 }()
106         }
107         c.mtx.Unlock()
108
109         // Wait (with mtx unlocked) for the fetch goroutine to finish,
110         // in case it hasn't already.
111         <-b.fetched
112
113         c.mtx.Lock()
114         b.lastUse = time.Now()
115         c.mtx.Unlock()
116         return b.data, b.err
117 }
118
119 func (c *BlockCache) Clear() {
120         c.mtx.Lock()
121         c.cache = nil
122         c.mtx.Unlock()
123 }
124
125 type timeSlice []time.Time
126
127 func (ts timeSlice) Len() int { return len(ts) }
128
129 func (ts timeSlice) Less(i, j int) bool { return ts[i].Before(ts[j]) }
130
131 func (ts timeSlice) Swap(i, j int) { ts[i], ts[j] = ts[j], ts[i] }
132
133 type cacheBlock struct {
134         data    []byte
135         err     error
136         fetched chan struct{}
137         lastUse time.Time
138 }