11809: Cache permission and collection lookups.
[arvados.git] / services / keep-web / cache.go
1 package main
2
3 import (
4         "fmt"
5         "sync"
6         "sync/atomic"
7         "time"
8
9         "git.curoverse.com/arvados.git/sdk/go/arvados"
10         "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
11         "github.com/hashicorp/golang-lru"
12 )
13
14 type cache struct {
15         TTL               arvados.Duration
16         CollectionEntries int
17         CollectionBytes   int64
18         PermissionEntries int
19         UUIDEntries       int
20
21         stats       cacheStats
22         pdhs        *lru.TwoQueueCache
23         collections *lru.TwoQueueCache
24         permissions *lru.TwoQueueCache
25         setupOnce   sync.Once
26 }
27
// cacheStats is a set of event counters describing cache activity.
type cacheStats struct {
	// Requests counts calls to Get.
	Requests uint64

	// CollectionHits counts lookups that found an unexpired
	// collection record in the cache.
	CollectionHits uint64

	// PDHHits counts lookups that found an unexpired PDH for a
	// target ID in the cache.
	PDHHits uint64

	// PermissionHits counts lookups that found an unexpired
	// permission entry in the cache.
	PermissionHits uint64

	// APICalls counts requests sent to the API server.
	APICalls uint64
}
35
// cachedPDH is the value stored in the pdhs cache, keyed by target
// ID: the portable data hash last reported for that ID, together with
// the time after which the entry must no longer be used.
type cachedPDH struct {
	expire time.Time
	pdh    string
}
40
// cachedCollection is the value stored in the collections cache,
// keyed by portable data hash: the collection record as returned by
// the API, together with the time after which the entry must no
// longer be used.
type cachedCollection struct {
	expire     time.Time
	collection map[string]interface{}
}
45
// cachedPermission is the value stored in the permissions cache. Its
// presence under a token+target key records a successful API lookup;
// expire is the time after which the entry must no longer be used.
type cachedPermission struct {
	expire time.Time
}
49
50 func (c *cache) setup() {
51         var err error
52         c.pdhs, err = lru.New2Q(c.UUIDEntries)
53         if err != nil {
54                 panic(err)
55         }
56         c.collections, err = lru.New2Q(c.CollectionEntries)
57         if err != nil {
58                 panic(err)
59         }
60         c.permissions, err = lru.New2Q(c.PermissionEntries)
61         if err != nil {
62                 panic(err)
63         }
64 }
65
// selectPDH is the parameter set sent to the API when only a
// collection's portable_data_hash field is wanted.
var selectPDH = map[string]interface{}{
	"select": []string{"portable_data_hash"},
}
69
70 func (c *cache) Stats() cacheStats {
71         return cacheStats{
72                 Requests:       atomic.LoadUint64(&c.stats.Requests),
73                 CollectionHits: atomic.LoadUint64(&c.stats.CollectionHits),
74                 PDHHits:        atomic.LoadUint64(&c.stats.PDHHits),
75                 PermissionHits: atomic.LoadUint64(&c.stats.PermissionHits),
76                 APICalls:       atomic.LoadUint64(&c.stats.APICalls),
77         }
78 }
79
80 func (c *cache) Get(arv *arvadosclient.ArvadosClient, targetID string, forceReload bool) (map[string]interface{}, error) {
81         c.setupOnce.Do(c.setup)
82
83         atomic.AddUint64(&c.stats.Requests, 1)
84
85         permOK := false
86         permKey := arv.ApiToken + "\000" + targetID
87         if ent, cached := c.permissions.Get(permKey); cached {
88                 ent := ent.(*cachedPermission)
89                 if ent.expire.Before(time.Now()) {
90                         c.permissions.Remove(permKey)
91                 } else {
92                         permOK = true
93                         atomic.AddUint64(&c.stats.PermissionHits, 1)
94                 }
95         }
96
97         var pdh string
98         if arvadosclient.PDHMatch(targetID) {
99                 pdh = targetID
100         } else if ent, cached := c.pdhs.Get(targetID); cached {
101                 ent := ent.(*cachedPDH)
102                 if ent.expire.Before(time.Now()) {
103                         c.pdhs.Remove(targetID)
104                 } else {
105                         pdh = ent.pdh
106                         atomic.AddUint64(&c.stats.PDHHits, 1)
107                 }
108         }
109
110         collection := c.lookupCollection(pdh)
111
112         if collection != nil && permOK && !forceReload {
113                 return collection, nil
114         }
115
116         if collection != nil {
117                 // Ask API for current PDH for this targetID. Most
118                 // likely, the cached PDH is still correct; if so,
119                 // _and_ the current token has permission, we can
120                 // use our cached manifest.
121                 atomic.AddUint64(&c.stats.APICalls, 1)
122                 var current map[string]interface{}
123                 err := arv.Get("collections", targetID, selectPDH, &current)
124                 if err != nil {
125                         return nil, err
126                 }
127                 if checkPDH, ok := current["portable_data_hash"].(string); !ok {
128                         return nil, fmt.Errorf("API response for %q had no PDH", targetID)
129                 } else if checkPDH == pdh {
130                         exp := time.Now().Add(time.Duration(c.TTL))
131                         c.permissions.Add(permKey, &cachedPermission{
132                                 expire: exp,
133                         })
134                         if pdh != targetID {
135                                 c.pdhs.Add(targetID, &cachedPDH{
136                                         expire: exp,
137                                         pdh:    pdh,
138                                 })
139                         }
140                         return collection, err
141                 } else {
142                         // PDH changed, but now we know we have
143                         // permission -- and maybe we already have the
144                         // new PDH in the cache.
145                         if coll := c.lookupCollection(checkPDH); coll != nil {
146                                 return coll, nil
147                         }
148                 }
149         }
150
151         // Collection manifest is not cached.
152         atomic.AddUint64(&c.stats.APICalls, 1)
153         err := arv.Get("collections", targetID, nil, &collection)
154         if err != nil {
155                 return nil, err
156         }
157         pdh, ok := collection["portable_data_hash"].(string)
158         if !ok {
159                 return nil, fmt.Errorf("API response for %q had no PDH", targetID)
160         }
161         exp := time.Now().Add(time.Duration(c.TTL))
162         c.permissions.Add(permKey, &cachedPermission{
163                 expire: exp,
164         })
165         c.pdhs.Add(targetID, &cachedPDH{
166                 expire: exp,
167                 pdh:    pdh,
168         })
169         c.collections.Add(pdh, &cachedCollection{
170                 expire:     exp,
171                 collection: collection,
172         })
173         if int64(len(collection["manifest_text"].(string))) > c.CollectionBytes/int64(c.CollectionEntries) {
174                 c.pruneCollections()
175         }
176         return collection, nil
177 }
178
179 // pruneCollections checks the total bytes occupied by manifest_text
180 // in the collection cache and removes old entries as needed to bring
181 // the total size down to CollectionBytes. It also deletes all expired
182 // entries.
183 //
184 // pruneCollections does not aim to be perfectly correct when there is
185 // concurrent cache activity.
186 func (c *cache) pruneCollections() {
187         var size int64
188         now := time.Now()
189         keys := c.collections.Keys()
190         entsize := make([]int, len(keys))
191         expired := make([]bool, len(keys))
192         for i, k := range keys {
193                 v, ok := c.collections.Peek(k)
194                 if !ok {
195                         continue
196                 }
197                 ent := v.(*cachedCollection)
198                 n := len(ent.collection["manifest_text"].(string))
199                 size += int64(n)
200                 entsize[i] = n
201                 expired[i] = ent.expire.Before(now)
202         }
203         for i, k := range keys {
204                 if expired[i] {
205                         c.collections.Remove(k)
206                         size -= int64(entsize[i])
207                 }
208         }
209         for i, k := range keys {
210                 if size <= c.CollectionBytes {
211                         break
212                 }
213                 if expired[i] {
214                         // already removed this entry in the previous loop
215                         continue
216                 }
217                 c.collections.Remove(k)
218                 size -= int64(entsize[i])
219         }
220 }
221
222 func (c *cache) lookupCollection(pdh string) map[string]interface{} {
223         if pdh == "" {
224                 return nil
225         } else if ent, cached := c.collections.Get(pdh); !cached {
226                 return nil
227         } else {
228                 ent := ent.(*cachedCollection)
229                 if ent.expire.Before(time.Now()) {
230                         c.collections.Remove(pdh)
231                         return nil
232                 } else {
233                         atomic.AddUint64(&c.stats.CollectionHits, 1)
234                         return ent.collection
235                 }
236         }
237 }