11729: Avoid refetching manifest in forceReload case
[arvados.git] / services / keep-web / cache.go
1 package main
2
3 import (
4         "fmt"
5         "sync"
6         "sync/atomic"
7         "time"
8
9         "git.curoverse.com/arvados.git/sdk/go/arvados"
10         "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
11         "github.com/hashicorp/golang-lru"
12 )
13
14 type cache struct {
15         TTL                  arvados.Duration
16         MaxCollectionEntries int
17         MaxCollectionBytes   int64
18         MaxPermissionEntries int
19         MaxUUIDEntries       int
20
21         stats       cacheStats
22         pdhs        *lru.TwoQueueCache
23         collections *lru.TwoQueueCache
24         permissions *lru.TwoQueueCache
25         setupOnce   sync.Once
26 }
27
// cacheStats is a snapshot of the cache's activity counters. The
// struct tags fix the key under which each value appears when the
// snapshot is encoded as JSON.
type cacheStats struct {
	// Requests counts calls to Get.
	Requests uint64 `json:"Cache.Requests"`
	// CollectionBytes is the summed manifest_text length of the
	// cached collections.
	CollectionBytes uint64 `json:"Cache.CollectionBytes"`
	// CollectionEntries is the number of entries in the
	// collection cache.
	CollectionEntries int `json:"Cache.CollectionEntries"`
	// CollectionHits counts unexpired collection cache hits.
	CollectionHits uint64 `json:"Cache.CollectionHits"`
	// PDHHits counts unexpired targetID-to-PDH cache hits. Note
	// that it is published under the key "Cache.UUIDHits".
	PDHHits uint64 `json:"Cache.UUIDHits"`
	// PermissionHits counts unexpired permission cache hits.
	PermissionHits uint64 `json:"Cache.PermissionHits"`
	// APICalls counts API requests issued by Get.
	APICalls uint64 `json:"Cache.APICalls"`
}
37
// cachedPDH is the value stored in the targetID-to-PDH cache.
type cachedPDH struct {
	// expire is the time after which the entry is discarded
	// instead of being used.
	expire time.Time
	// pdh is the portable data hash recorded for the targetID.
	pdh string
}
42
// cachedCollection is the value stored in the collection cache,
// keyed by portable data hash.
type cachedCollection struct {
	// expire is the time after which the entry is discarded
	// instead of being used.
	expire time.Time
	// collection is the collection record as decoded from the
	// API response.
	collection map[string]interface{}
}
47
// cachedPermission is the value stored in the permission cache. Its
// presence under a token+targetID key records that the token was
// able to read the target; expire is the time after which the entry
// is discarded instead of being used.
type cachedPermission struct {
	expire time.Time
}
51
52 func (c *cache) setup() {
53         var err error
54         c.pdhs, err = lru.New2Q(c.MaxUUIDEntries)
55         if err != nil {
56                 panic(err)
57         }
58         c.collections, err = lru.New2Q(c.MaxCollectionEntries)
59         if err != nil {
60                 panic(err)
61         }
62         c.permissions, err = lru.New2Q(c.MaxPermissionEntries)
63         if err != nil {
64                 panic(err)
65         }
66 }
67
// selectPDH is the request parameter set Get passes to the API when
// it only needs a collection's portable_data_hash attribute.
var selectPDH = map[string]interface{}{"select": []string{"portable_data_hash"}}
71
72 func (c *cache) Stats() cacheStats {
73         c.setupOnce.Do(c.setup)
74         return cacheStats{
75                 Requests:          atomic.LoadUint64(&c.stats.Requests),
76                 CollectionBytes:   c.collectionBytes(),
77                 CollectionEntries: c.collections.Len(),
78                 CollectionHits:    atomic.LoadUint64(&c.stats.CollectionHits),
79                 PDHHits:           atomic.LoadUint64(&c.stats.PDHHits),
80                 PermissionHits:    atomic.LoadUint64(&c.stats.PermissionHits),
81                 APICalls:          atomic.LoadUint64(&c.stats.APICalls),
82         }
83 }
84
85 func (c *cache) Get(arv *arvadosclient.ArvadosClient, targetID string, forceReload bool) (map[string]interface{}, error) {
86         c.setupOnce.Do(c.setup)
87
88         atomic.AddUint64(&c.stats.Requests, 1)
89
90         permOK := false
91         permKey := arv.ApiToken + "\000" + targetID
92         if forceReload {
93         } else if ent, cached := c.permissions.Get(permKey); cached {
94                 ent := ent.(*cachedPermission)
95                 if ent.expire.Before(time.Now()) {
96                         c.permissions.Remove(permKey)
97                 } else {
98                         permOK = true
99                         atomic.AddUint64(&c.stats.PermissionHits, 1)
100                 }
101         }
102
103         var pdh string
104         if arvadosclient.PDHMatch(targetID) {
105                 pdh = targetID
106         } else if ent, cached := c.pdhs.Get(targetID); cached {
107                 ent := ent.(*cachedPDH)
108                 if ent.expire.Before(time.Now()) {
109                         c.pdhs.Remove(targetID)
110                 } else {
111                         pdh = ent.pdh
112                         atomic.AddUint64(&c.stats.PDHHits, 1)
113                 }
114         }
115
116         var collection map[string]interface{}
117         if pdh != "" {
118                 collection = c.lookupCollection(pdh)
119         }
120
121         if collection != nil && permOK {
122                 return collection, nil
123         } else if collection != nil {
124                 // Ask API for current PDH for this targetID. Most
125                 // likely, the cached PDH is still correct; if so,
126                 // _and_ the current token has permission, we can
127                 // use our cached manifest.
128                 atomic.AddUint64(&c.stats.APICalls, 1)
129                 var current map[string]interface{}
130                 err := arv.Get("collections", targetID, selectPDH, &current)
131                 if err != nil {
132                         return nil, err
133                 }
134                 if checkPDH, ok := current["portable_data_hash"].(string); !ok {
135                         return nil, fmt.Errorf("API response for %q had no PDH", targetID)
136                 } else if checkPDH == pdh {
137                         exp := time.Now().Add(time.Duration(c.TTL))
138                         c.permissions.Add(permKey, &cachedPermission{
139                                 expire: exp,
140                         })
141                         if pdh != targetID {
142                                 c.pdhs.Add(targetID, &cachedPDH{
143                                         expire: exp,
144                                         pdh:    pdh,
145                                 })
146                         }
147                         return collection, err
148                 } else {
149                         // PDH changed, but now we know we have
150                         // permission -- and maybe we already have the
151                         // new PDH in the cache.
152                         if coll := c.lookupCollection(checkPDH); coll != nil {
153                                 return coll, nil
154                         }
155                 }
156         }
157
158         // Collection manifest is not cached.
159         atomic.AddUint64(&c.stats.APICalls, 1)
160         err := arv.Get("collections", targetID, nil, &collection)
161         if err != nil {
162                 return nil, err
163         }
164         pdh, ok := collection["portable_data_hash"].(string)
165         if !ok {
166                 return nil, fmt.Errorf("API response for %q had no PDH", targetID)
167         }
168         exp := time.Now().Add(time.Duration(c.TTL))
169         c.permissions.Add(permKey, &cachedPermission{
170                 expire: exp,
171         })
172         c.pdhs.Add(targetID, &cachedPDH{
173                 expire: exp,
174                 pdh:    pdh,
175         })
176         c.collections.Add(pdh, &cachedCollection{
177                 expire:     exp,
178                 collection: collection,
179         })
180         if int64(len(collection["manifest_text"].(string))) > c.MaxCollectionBytes/int64(c.MaxCollectionEntries) {
181                 go c.pruneCollections()
182         }
183         return collection, nil
184 }
185
186 // pruneCollections checks the total bytes occupied by manifest_text
187 // in the collection cache and removes old entries as needed to bring
188 // the total size down to CollectionBytes. It also deletes all expired
189 // entries.
190 //
191 // pruneCollections does not aim to be perfectly correct when there is
192 // concurrent cache activity.
193 func (c *cache) pruneCollections() {
194         var size int64
195         now := time.Now()
196         keys := c.collections.Keys()
197         entsize := make([]int, len(keys))
198         expired := make([]bool, len(keys))
199         for i, k := range keys {
200                 v, ok := c.collections.Peek(k)
201                 if !ok {
202                         continue
203                 }
204                 ent := v.(*cachedCollection)
205                 n := len(ent.collection["manifest_text"].(string))
206                 size += int64(n)
207                 entsize[i] = n
208                 expired[i] = ent.expire.Before(now)
209         }
210         for i, k := range keys {
211                 if expired[i] {
212                         c.collections.Remove(k)
213                         size -= int64(entsize[i])
214                 }
215         }
216         for i, k := range keys {
217                 if size <= c.MaxCollectionBytes {
218                         break
219                 }
220                 if expired[i] {
221                         // already removed this entry in the previous loop
222                         continue
223                 }
224                 c.collections.Remove(k)
225                 size -= int64(entsize[i])
226         }
227 }
228
229 // collectionBytes returns the approximate memory size of the
230 // collection cache.
231 func (c *cache) collectionBytes() uint64 {
232         var size uint64
233         for _, k := range c.collections.Keys() {
234                 v, ok := c.collections.Peek(k)
235                 if !ok {
236                         continue
237                 }
238                 size += uint64(len(v.(*cachedCollection).collection["manifest_text"].(string)))
239         }
240         return size
241 }
242
243 func (c *cache) lookupCollection(pdh string) map[string]interface{} {
244         if pdh == "" {
245                 return nil
246         } else if ent, cached := c.collections.Get(pdh); !cached {
247                 return nil
248         } else {
249                 ent := ent.(*cachedCollection)
250                 if ent.expire.Before(time.Now()) {
251                         c.collections.Remove(pdh)
252                         return nil
253                 } else {
254                         atomic.AddUint64(&c.stats.CollectionHits, 1)
255                         return ent.collection
256                 }
257         }
258 }