Merge branch '11809-keep-web-cache'
[arvados.git] / services / keep-web / cache.go
1 package main
2
3 import (
4         "fmt"
5         "sync"
6         "sync/atomic"
7         "time"
8
9         "git.curoverse.com/arvados.git/sdk/go/arvados"
10         "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
11         "github.com/hashicorp/golang-lru"
12 )
13
// cache remembers the results of collection lookups so that Get can
// answer repeated requests with fewer API calls.
//
// It holds three 2Q LRU caches, created on first use by setup:
// pdhs (keyed by the targetID given to Get), collections (keyed by
// portable data hash) and permissions (keyed by API token plus
// targetID).
//
// The Max*Entries fields are read once, when setup creates the LRU
// caches; lru.New2Q errors are turned into panics there.
type cache struct {
        // TTL is the lifetime given to every entry Get adds to any
        // of the three caches.
        TTL                  arvados.Duration
        // MaxCollectionEntries is the size of the collections LRU.
        MaxCollectionEntries int
        // MaxCollectionBytes is the total manifest_text size that
        // pruneCollections tries to keep the collections LRU under.
        MaxCollectionBytes   int64
        // MaxPermissionEntries is the size of the permissions LRU.
        MaxPermissionEntries int
        // MaxUUIDEntries is the size of the pdhs LRU.
        MaxUUIDEntries       int

        // stats holds the live counters; they are updated and read
        // with sync/atomic.
        stats       cacheStats
        // pdhs maps targetID -> *cachedPDH.
        pdhs        *lru.TwoQueueCache
        // collections maps portable data hash -> *cachedCollection.
        collections *lru.TwoQueueCache
        // permissions maps token+"\000"+targetID -> *cachedPermission.
        permissions *lru.TwoQueueCache
        // setupOnce guards the one-time call to setup.
        setupOnce   sync.Once
}
27
// cacheStats is both the live counter storage inside cache and the
// snapshot type returned by Stats. The JSON tags give the key names
// used when a snapshot is serialized.
//
// NOTE(review): the uint64 counters are updated with sync/atomic;
// on 32-bit platforms that requires 64-bit alignment of each field
// -- confirm whether 32-bit builds matter here.
type cacheStats struct {
        // Requests counts calls to Get.
        Requests          uint64 `json:"Cache.Requests"`
        // CollectionBytes is the summed manifest_text length of the
        // cached collections; it is computed when Stats is called.
        CollectionBytes   uint64 `json:"Cache.CollectionBytes"`
        // CollectionEntries is the number of entries in the
        // collections LRU; it is computed when Stats is called.
        CollectionEntries int    `json:"Cache.CollectionEntries"`
        // CollectionHits counts unexpired entries found by
        // lookupCollection.
        CollectionHits    uint64 `json:"Cache.CollectionHits"`
        // PDHHits counts unexpired targetID -> PDH entries found by
        // Get. Note that it is serialized as "Cache.UUIDHits".
        PDHHits           uint64 `json:"Cache.UUIDHits"`
        // PermissionHits counts unexpired permission entries found
        // by Get.
        PermissionHits    uint64 `json:"Cache.PermissionHits"`
        // APICalls counts the API requests issued by Get.
        APICalls          uint64 `json:"Cache.APICalls"`
}
37
// cachedPDH is the value type stored in cache.pdhs. It records the
// portable data hash that the API most recently reported for a
// targetID.
type cachedPDH struct {
        // expire is the time after which Get discards this entry.
        expire time.Time
        // pdh is the portable data hash recorded for the targetID.
        pdh    string
}
42
// cachedCollection is the value type stored in cache.collections,
// keyed by portable data hash.
type cachedCollection struct {
        // expire is the time after which the entry is treated as
        // absent and removed.
        expire     time.Time
        // collection is the decoded API response for the collection.
        // Other code in this file assumes it has a string
        // "manifest_text" entry.
        collection map[string]interface{}
}
47
// cachedPermission is the value type stored in cache.permissions.
// Get adds an entry after an API request made with a given token
// for a given targetID succeeds, so an unexpired entry means that
// token was recently able to read that targetID.
type cachedPermission struct {
        // expire is the time after which Get discards this entry.
        expire time.Time
}
51
52 func (c *cache) setup() {
53         var err error
54         c.pdhs, err = lru.New2Q(c.MaxUUIDEntries)
55         if err != nil {
56                 panic(err)
57         }
58         c.collections, err = lru.New2Q(c.MaxCollectionEntries)
59         if err != nil {
60                 panic(err)
61         }
62         c.permissions, err = lru.New2Q(c.MaxPermissionEntries)
63         if err != nil {
64                 panic(err)
65         }
66 }
67
// selectPDH is the set of API request parameters Get uses when it
// only needs to learn a collection's current portable_data_hash
// (presumably limiting the response to that one attribute -- verify
// against the API's "select" semantics).
var selectPDH = map[string]interface{}{
        "select": []string{"portable_data_hash"},
}
71
72 func (c *cache) Stats() cacheStats {
73         c.setupOnce.Do(c.setup)
74         return cacheStats{
75                 Requests:          atomic.LoadUint64(&c.stats.Requests),
76                 CollectionBytes:   c.collectionBytes(),
77                 CollectionEntries: c.collections.Len(),
78                 CollectionHits:    atomic.LoadUint64(&c.stats.CollectionHits),
79                 PDHHits:           atomic.LoadUint64(&c.stats.PDHHits),
80                 PermissionHits:    atomic.LoadUint64(&c.stats.PermissionHits),
81                 APICalls:          atomic.LoadUint64(&c.stats.APICalls),
82         }
83 }
84
85 func (c *cache) Get(arv *arvadosclient.ArvadosClient, targetID string, forceReload bool) (map[string]interface{}, error) {
86         c.setupOnce.Do(c.setup)
87
88         atomic.AddUint64(&c.stats.Requests, 1)
89
90         permOK := false
91         permKey := arv.ApiToken + "\000" + targetID
92         if forceReload {
93         } else if ent, cached := c.permissions.Get(permKey); cached {
94                 ent := ent.(*cachedPermission)
95                 if ent.expire.Before(time.Now()) {
96                         c.permissions.Remove(permKey)
97                 } else {
98                         permOK = true
99                         atomic.AddUint64(&c.stats.PermissionHits, 1)
100                 }
101         }
102
103         var pdh string
104         if arvadosclient.PDHMatch(targetID) {
105                 pdh = targetID
106         } else if forceReload {
107         } else if ent, cached := c.pdhs.Get(targetID); cached {
108                 ent := ent.(*cachedPDH)
109                 if ent.expire.Before(time.Now()) {
110                         c.pdhs.Remove(targetID)
111                 } else {
112                         pdh = ent.pdh
113                         atomic.AddUint64(&c.stats.PDHHits, 1)
114                 }
115         }
116
117         var collection map[string]interface{}
118         if pdh != "" {
119                 collection = c.lookupCollection(pdh)
120         }
121
122         if collection != nil && permOK {
123                 return collection, nil
124         } else if collection != nil {
125                 // Ask API for current PDH for this targetID. Most
126                 // likely, the cached PDH is still correct; if so,
127                 // _and_ the current token has permission, we can
128                 // use our cached manifest.
129                 atomic.AddUint64(&c.stats.APICalls, 1)
130                 var current map[string]interface{}
131                 err := arv.Get("collections", targetID, selectPDH, &current)
132                 if err != nil {
133                         return nil, err
134                 }
135                 if checkPDH, ok := current["portable_data_hash"].(string); !ok {
136                         return nil, fmt.Errorf("API response for %q had no PDH", targetID)
137                 } else if checkPDH == pdh {
138                         exp := time.Now().Add(time.Duration(c.TTL))
139                         c.permissions.Add(permKey, &cachedPermission{
140                                 expire: exp,
141                         })
142                         if pdh != targetID {
143                                 c.pdhs.Add(targetID, &cachedPDH{
144                                         expire: exp,
145                                         pdh:    pdh,
146                                 })
147                         }
148                         return collection, err
149                 } else {
150                         // PDH changed, but now we know we have
151                         // permission -- and maybe we already have the
152                         // new PDH in the cache.
153                         if coll := c.lookupCollection(checkPDH); coll != nil {
154                                 return coll, nil
155                         }
156                 }
157         }
158
159         // Collection manifest is not cached.
160         atomic.AddUint64(&c.stats.APICalls, 1)
161         err := arv.Get("collections", targetID, nil, &collection)
162         if err != nil {
163                 return nil, err
164         }
165         pdh, ok := collection["portable_data_hash"].(string)
166         if !ok {
167                 return nil, fmt.Errorf("API response for %q had no PDH", targetID)
168         }
169         exp := time.Now().Add(time.Duration(c.TTL))
170         c.permissions.Add(permKey, &cachedPermission{
171                 expire: exp,
172         })
173         c.pdhs.Add(targetID, &cachedPDH{
174                 expire: exp,
175                 pdh:    pdh,
176         })
177         c.collections.Add(pdh, &cachedCollection{
178                 expire:     exp,
179                 collection: collection,
180         })
181         if int64(len(collection["manifest_text"].(string))) > c.MaxCollectionBytes/int64(c.MaxCollectionEntries) {
182                 go c.pruneCollections()
183         }
184         return collection, nil
185 }
186
187 // pruneCollections checks the total bytes occupied by manifest_text
188 // in the collection cache and removes old entries as needed to bring
189 // the total size down to CollectionBytes. It also deletes all expired
190 // entries.
191 //
192 // pruneCollections does not aim to be perfectly correct when there is
193 // concurrent cache activity.
194 func (c *cache) pruneCollections() {
195         var size int64
196         now := time.Now()
197         keys := c.collections.Keys()
198         entsize := make([]int, len(keys))
199         expired := make([]bool, len(keys))
200         for i, k := range keys {
201                 v, ok := c.collections.Peek(k)
202                 if !ok {
203                         continue
204                 }
205                 ent := v.(*cachedCollection)
206                 n := len(ent.collection["manifest_text"].(string))
207                 size += int64(n)
208                 entsize[i] = n
209                 expired[i] = ent.expire.Before(now)
210         }
211         for i, k := range keys {
212                 if expired[i] {
213                         c.collections.Remove(k)
214                         size -= int64(entsize[i])
215                 }
216         }
217         for i, k := range keys {
218                 if size <= c.MaxCollectionBytes {
219                         break
220                 }
221                 if expired[i] {
222                         // already removed this entry in the previous loop
223                         continue
224                 }
225                 c.collections.Remove(k)
226                 size -= int64(entsize[i])
227         }
228 }
229
230 // collectionBytes returns the approximate memory size of the
231 // collection cache.
232 func (c *cache) collectionBytes() uint64 {
233         var size uint64
234         for _, k := range c.collections.Keys() {
235                 v, ok := c.collections.Peek(k)
236                 if !ok {
237                         continue
238                 }
239                 size += uint64(len(v.(*cachedCollection).collection["manifest_text"].(string)))
240         }
241         return size
242 }
243
244 func (c *cache) lookupCollection(pdh string) map[string]interface{} {
245         if pdh == "" {
246                 return nil
247         } else if ent, cached := c.collections.Get(pdh); !cached {
248                 return nil
249         } else {
250                 ent := ent.(*cachedCollection)
251                 if ent.expire.Before(time.Now()) {
252                         c.collections.Remove(pdh)
253                         return nil
254                 } else {
255                         atomic.AddUint64(&c.stats.CollectionHits, 1)
256                         return ent.collection
257                 }
258         }
259 }