11809: Rename FooEntries -> MaxFooEntries in cache config.
[arvados.git] / services / keep-web / cache.go
1 package main
2
3 import (
4         "fmt"
5         "sync"
6         "sync/atomic"
7         "time"
8
9         "git.curoverse.com/arvados.git/sdk/go/arvados"
10         "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
11         "github.com/hashicorp/golang-lru"
12 )
13
// cache holds keep-web's caches of collection records, collection
// identifier -> portable data hash (PDH) mappings, and per-token
// permission results. The exported fields are configuration; the
// caches themselves are created lazily by setup (through setupOnce)
// when Get or Stats is first called.
type cache struct {
	// TTL is how long a cached permission, identifier->PDH mapping,
	// or collection record is used before it counts as expired.
	TTL                  arvados.Duration
	// MaxCollectionEntries is the capacity of the collection cache.
	// Get also divides MaxCollectionBytes by it. setup panics if
	// lru.New2Q rejects it (presumably any value <= 0 -- confirm).
	MaxCollectionEntries int
	// MaxCollectionBytes is the total manifest_text size that
	// pruneCollections trims the collection cache down to.
	MaxCollectionBytes   int64
	// MaxPermissionEntries is the capacity of the permission cache.
	MaxPermissionEntries int
	// MaxUUIDEntries is the capacity of the identifier->PDH cache.
	MaxUUIDEntries       int

	// stats holds counters updated with sync/atomic. Its
	// CollectionBytes and CollectionEntries fields are not updated
	// here; Stats computes those from the cache contents.
	stats       cacheStats
	// pdhs maps a non-PDH targetID to a *cachedPDH.
	pdhs        *lru.TwoQueueCache
	// collections maps a PDH to a *cachedCollection.
	collections *lru.TwoQueueCache
	// permissions maps token+"\000"+targetID to a *cachedPermission.
	permissions *lru.TwoQueueCache
	// setupOnce ensures setup runs only once.
	setupOnce   sync.Once
}
27
// cacheStats holds cache activity counters and sizes. Stats returns a
// snapshot of this type; its JSON keys all carry a "Cache." prefix.
type cacheStats struct {
	// Requests counts calls to Get.
	Requests          uint64 `json:"Cache.Requests"`
	// CollectionBytes is the total length of all cached manifest_text
	// strings, as computed by collectionBytes.
	CollectionBytes   uint64 `json:"Cache.CollectionBytes"`
	// CollectionEntries is the number of entries in the collection cache.
	CollectionEntries int    `json:"Cache.CollectionEntries"`
	// CollectionHits counts unexpired hits in lookupCollection.
	CollectionHits    uint64 `json:"Cache.CollectionHits"`
	// PDHHits counts Get calls whose non-PDH targetID was resolved
	// to a PDH from the pdhs cache.
	// NOTE(review): reported under the JSON key "Cache.UUIDHits", not
	// "Cache.PDHHits" -- presumably kept for compatibility with
	// existing stats consumers; confirm before renaming.
	PDHHits           uint64 `json:"Cache.UUIDHits"`
	// PermissionHits counts Get calls that found an unexpired
	// permission entry for the caller's token and targetID.
	PermissionHits    uint64 `json:"Cache.PermissionHits"`
	// APICalls counts API requests issued by Get.
	APICalls          uint64 `json:"Cache.APICalls"`
}
37
// Cache entry types. Every entry records the time at which it stops
// being valid; lookups discard entries whose expire time has passed.
type (
	// cachedPDH is a pdhs cache entry: the portable data hash that a
	// non-PDH targetID (such as a collection UUID) resolved to.
	cachedPDH struct {
		expire time.Time
		pdh    string
	}

	// cachedCollection is a collections cache entry: the collection
	// record as decoded from the API, keyed by its PDH.
	cachedCollection struct {
		expire     time.Time
		collection map[string]interface{}
	}

	// cachedPermission is a permissions cache entry. Until expire,
	// its presence means the token was recently able to read the
	// target.
	cachedPermission struct {
		expire time.Time
	}
)
51
52 func (c *cache) setup() {
53         var err error
54         c.pdhs, err = lru.New2Q(c.MaxUUIDEntries)
55         if err != nil {
56                 panic(err)
57         }
58         c.collections, err = lru.New2Q(c.MaxCollectionEntries)
59         if err != nil {
60                 panic(err)
61         }
62         c.permissions, err = lru.New2Q(c.MaxPermissionEntries)
63         if err != nil {
64                 panic(err)
65         }
66 }
67
// selectPDH is the parameter set passed to arv.Get when only a
// collection's current portable_data_hash is needed.
var selectPDH = map[string]interface{}{"select": []string{"portable_data_hash"}}
71
72 func (c *cache) Stats() cacheStats {
73         c.setupOnce.Do(c.setup)
74         return cacheStats{
75                 Requests:          atomic.LoadUint64(&c.stats.Requests),
76                 CollectionBytes:   c.collectionBytes(),
77                 CollectionEntries: c.collections.Len(),
78                 CollectionHits:    atomic.LoadUint64(&c.stats.CollectionHits),
79                 PDHHits:           atomic.LoadUint64(&c.stats.PDHHits),
80                 PermissionHits:    atomic.LoadUint64(&c.stats.PermissionHits),
81                 APICalls:          atomic.LoadUint64(&c.stats.APICalls),
82         }
83 }
84
85 func (c *cache) Get(arv *arvadosclient.ArvadosClient, targetID string, forceReload bool) (map[string]interface{}, error) {
86         c.setupOnce.Do(c.setup)
87
88         atomic.AddUint64(&c.stats.Requests, 1)
89
90         permOK := false
91         permKey := arv.ApiToken + "\000" + targetID
92         if ent, cached := c.permissions.Get(permKey); cached {
93                 ent := ent.(*cachedPermission)
94                 if ent.expire.Before(time.Now()) {
95                         c.permissions.Remove(permKey)
96                 } else {
97                         permOK = true
98                         atomic.AddUint64(&c.stats.PermissionHits, 1)
99                 }
100         }
101
102         var pdh string
103         if arvadosclient.PDHMatch(targetID) {
104                 pdh = targetID
105         } else if ent, cached := c.pdhs.Get(targetID); cached {
106                 ent := ent.(*cachedPDH)
107                 if ent.expire.Before(time.Now()) {
108                         c.pdhs.Remove(targetID)
109                 } else {
110                         pdh = ent.pdh
111                         atomic.AddUint64(&c.stats.PDHHits, 1)
112                 }
113         }
114
115         collection := c.lookupCollection(pdh)
116
117         if collection != nil && permOK && !forceReload {
118                 return collection, nil
119         }
120
121         if collection != nil {
122                 // Ask API for current PDH for this targetID. Most
123                 // likely, the cached PDH is still correct; if so,
124                 // _and_ the current token has permission, we can
125                 // use our cached manifest.
126                 atomic.AddUint64(&c.stats.APICalls, 1)
127                 var current map[string]interface{}
128                 err := arv.Get("collections", targetID, selectPDH, &current)
129                 if err != nil {
130                         return nil, err
131                 }
132                 if checkPDH, ok := current["portable_data_hash"].(string); !ok {
133                         return nil, fmt.Errorf("API response for %q had no PDH", targetID)
134                 } else if checkPDH == pdh {
135                         exp := time.Now().Add(time.Duration(c.TTL))
136                         c.permissions.Add(permKey, &cachedPermission{
137                                 expire: exp,
138                         })
139                         if pdh != targetID {
140                                 c.pdhs.Add(targetID, &cachedPDH{
141                                         expire: exp,
142                                         pdh:    pdh,
143                                 })
144                         }
145                         return collection, err
146                 } else {
147                         // PDH changed, but now we know we have
148                         // permission -- and maybe we already have the
149                         // new PDH in the cache.
150                         if coll := c.lookupCollection(checkPDH); coll != nil {
151                                 return coll, nil
152                         }
153                 }
154         }
155
156         // Collection manifest is not cached.
157         atomic.AddUint64(&c.stats.APICalls, 1)
158         err := arv.Get("collections", targetID, nil, &collection)
159         if err != nil {
160                 return nil, err
161         }
162         pdh, ok := collection["portable_data_hash"].(string)
163         if !ok {
164                 return nil, fmt.Errorf("API response for %q had no PDH", targetID)
165         }
166         exp := time.Now().Add(time.Duration(c.TTL))
167         c.permissions.Add(permKey, &cachedPermission{
168                 expire: exp,
169         })
170         c.pdhs.Add(targetID, &cachedPDH{
171                 expire: exp,
172                 pdh:    pdh,
173         })
174         c.collections.Add(pdh, &cachedCollection{
175                 expire:     exp,
176                 collection: collection,
177         })
178         if int64(len(collection["manifest_text"].(string))) > c.MaxCollectionBytes/int64(c.MaxCollectionEntries) {
179                 c.pruneCollections()
180         }
181         return collection, nil
182 }
183
184 // pruneCollections checks the total bytes occupied by manifest_text
185 // in the collection cache and removes old entries as needed to bring
186 // the total size down to CollectionBytes. It also deletes all expired
187 // entries.
188 //
189 // pruneCollections does not aim to be perfectly correct when there is
190 // concurrent cache activity.
191 func (c *cache) pruneCollections() {
192         var size int64
193         now := time.Now()
194         keys := c.collections.Keys()
195         entsize := make([]int, len(keys))
196         expired := make([]bool, len(keys))
197         for i, k := range keys {
198                 v, ok := c.collections.Peek(k)
199                 if !ok {
200                         continue
201                 }
202                 ent := v.(*cachedCollection)
203                 n := len(ent.collection["manifest_text"].(string))
204                 size += int64(n)
205                 entsize[i] = n
206                 expired[i] = ent.expire.Before(now)
207         }
208         for i, k := range keys {
209                 if expired[i] {
210                         c.collections.Remove(k)
211                         size -= int64(entsize[i])
212                 }
213         }
214         for i, k := range keys {
215                 if size <= c.MaxCollectionBytes {
216                         break
217                 }
218                 if expired[i] {
219                         // already removed this entry in the previous loop
220                         continue
221                 }
222                 c.collections.Remove(k)
223                 size -= int64(entsize[i])
224         }
225 }
226
227 // collectionBytes returns the approximate memory size of the
228 // collection cache.
229 func (c *cache) collectionBytes() uint64 {
230         var size uint64
231         for _, k := range c.collections.Keys() {
232                 v, ok := c.collections.Peek(k)
233                 if !ok {
234                         continue
235                 }
236                 size += uint64(len(v.(*cachedCollection).collection["manifest_text"].(string)))
237         }
238         return size
239 }
240
241 func (c *cache) lookupCollection(pdh string) map[string]interface{} {
242         if pdh == "" {
243                 return nil
244         } else if ent, cached := c.collections.Get(pdh); !cached {
245                 return nil
246         } else {
247                 ent := ent.(*cachedCollection)
248                 if ent.expire.Before(time.Now()) {
249                         c.collections.Remove(pdh)
250                         return nil
251                 } else {
252                         atomic.AddUint64(&c.stats.CollectionHits, 1)
253                         return ent.collection
254                 }
255         }
256 }