8784: Use arvados.Collection in cache.
[arvados.git] / services / keep-web / cache.go
1 package main
2
3 import (
4         "sync"
5         "sync/atomic"
6         "time"
7
8         "git.curoverse.com/arvados.git/sdk/go/arvados"
9         "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
10         "github.com/hashicorp/golang-lru"
11 )
12
13 type cache struct {
14         TTL                  arvados.Duration
15         MaxCollectionEntries int
16         MaxCollectionBytes   int64
17         MaxPermissionEntries int
18         MaxUUIDEntries       int
19
20         stats       cacheStats
21         pdhs        *lru.TwoQueueCache
22         collections *lru.TwoQueueCache
23         permissions *lru.TwoQueueCache
24         setupOnce   sync.Once
25 }
26
27 type cacheStats struct {
28         Requests          uint64 `json:"Cache.Requests"`
29         CollectionBytes   uint64 `json:"Cache.CollectionBytes"`
30         CollectionEntries int    `json:"Cache.CollectionEntries"`
31         CollectionHits    uint64 `json:"Cache.CollectionHits"`
32         PDHHits           uint64 `json:"Cache.UUIDHits"`
33         PermissionHits    uint64 `json:"Cache.PermissionHits"`
34         APICalls          uint64 `json:"Cache.APICalls"`
35 }
36
37 type cachedPDH struct {
38         expire time.Time
39         pdh    string
40 }
41
42 type cachedCollection struct {
43         expire     time.Time
44         collection *arvados.Collection
45 }
46
47 type cachedPermission struct {
48         expire time.Time
49 }
50
51 func (c *cache) setup() {
52         var err error
53         c.pdhs, err = lru.New2Q(c.MaxUUIDEntries)
54         if err != nil {
55                 panic(err)
56         }
57         c.collections, err = lru.New2Q(c.MaxCollectionEntries)
58         if err != nil {
59                 panic(err)
60         }
61         c.permissions, err = lru.New2Q(c.MaxPermissionEntries)
62         if err != nil {
63                 panic(err)
64         }
65 }
66
// selectPDH holds API "get" parameters that request only a
// collection's portable_data_hash rather than the whole record.
var selectPDH = map[string]interface{}{"select": []string{"portable_data_hash"}}
70
71 func (c *cache) Stats() cacheStats {
72         c.setupOnce.Do(c.setup)
73         return cacheStats{
74                 Requests:          atomic.LoadUint64(&c.stats.Requests),
75                 CollectionBytes:   c.collectionBytes(),
76                 CollectionEntries: c.collections.Len(),
77                 CollectionHits:    atomic.LoadUint64(&c.stats.CollectionHits),
78                 PDHHits:           atomic.LoadUint64(&c.stats.PDHHits),
79                 PermissionHits:    atomic.LoadUint64(&c.stats.PermissionHits),
80                 APICalls:          atomic.LoadUint64(&c.stats.APICalls),
81         }
82 }
83
84 func (c *cache) Get(arv *arvadosclient.ArvadosClient, targetID string, forceReload bool) (*arvados.Collection, error) {
85         c.setupOnce.Do(c.setup)
86
87         atomic.AddUint64(&c.stats.Requests, 1)
88
89         permOK := false
90         permKey := arv.ApiToken + "\000" + targetID
91         if forceReload {
92         } else if ent, cached := c.permissions.Get(permKey); cached {
93                 ent := ent.(*cachedPermission)
94                 if ent.expire.Before(time.Now()) {
95                         c.permissions.Remove(permKey)
96                 } else {
97                         permOK = true
98                         atomic.AddUint64(&c.stats.PermissionHits, 1)
99                 }
100         }
101
102         var pdh string
103         if arvadosclient.PDHMatch(targetID) {
104                 pdh = targetID
105         } else if forceReload {
106         } else if ent, cached := c.pdhs.Get(targetID); cached {
107                 ent := ent.(*cachedPDH)
108                 if ent.expire.Before(time.Now()) {
109                         c.pdhs.Remove(targetID)
110                 } else {
111                         pdh = ent.pdh
112                         atomic.AddUint64(&c.stats.PDHHits, 1)
113                 }
114         }
115
116         var collection *arvados.Collection
117         if pdh != "" {
118                 collection = c.lookupCollection(pdh)
119         }
120
121         if collection != nil && permOK {
122                 return collection, nil
123         } else if collection != nil {
124                 // Ask API for current PDH for this targetID. Most
125                 // likely, the cached PDH is still correct; if so,
126                 // _and_ the current token has permission, we can
127                 // use our cached manifest.
128                 atomic.AddUint64(&c.stats.APICalls, 1)
129                 var current arvados.Collection
130                 err := arv.Get("collections", targetID, selectPDH, &current)
131                 if err != nil {
132                         return nil, err
133                 }
134                 if current.PortableDataHash == pdh {
135                         exp := time.Now().Add(time.Duration(c.TTL))
136                         c.permissions.Add(permKey, &cachedPermission{
137                                 expire: exp,
138                         })
139                         if pdh != targetID {
140                                 c.pdhs.Add(targetID, &cachedPDH{
141                                         expire: exp,
142                                         pdh:    pdh,
143                                 })
144                         }
145                         return collection, err
146                 } else {
147                         // PDH changed, but now we know we have
148                         // permission -- and maybe we already have the
149                         // new PDH in the cache.
150                         if coll := c.lookupCollection(current.PortableDataHash); coll != nil {
151                                 return coll, nil
152                         }
153                 }
154         }
155
156         // Collection manifest is not cached.
157         atomic.AddUint64(&c.stats.APICalls, 1)
158         err := arv.Get("collections", targetID, nil, &collection)
159         if err != nil {
160                 return nil, err
161         }
162         exp := time.Now().Add(time.Duration(c.TTL))
163         c.permissions.Add(permKey, &cachedPermission{
164                 expire: exp,
165         })
166         c.pdhs.Add(targetID, &cachedPDH{
167                 expire: exp,
168                 pdh:    collection.PortableDataHash,
169         })
170         c.collections.Add(collection.PortableDataHash, &cachedCollection{
171                 expire:     exp,
172                 collection: collection,
173         })
174         if int64(len(collection.ManifestText)) > c.MaxCollectionBytes/int64(c.MaxCollectionEntries) {
175                 go c.pruneCollections()
176         }
177         return collection, nil
178 }
179
180 // pruneCollections checks the total bytes occupied by manifest_text
181 // in the collection cache and removes old entries as needed to bring
182 // the total size down to CollectionBytes. It also deletes all expired
183 // entries.
184 //
185 // pruneCollections does not aim to be perfectly correct when there is
186 // concurrent cache activity.
187 func (c *cache) pruneCollections() {
188         var size int64
189         now := time.Now()
190         keys := c.collections.Keys()
191         entsize := make([]int, len(keys))
192         expired := make([]bool, len(keys))
193         for i, k := range keys {
194                 v, ok := c.collections.Peek(k)
195                 if !ok {
196                         continue
197                 }
198                 ent := v.(*cachedCollection)
199                 n := len(ent.collection.ManifestText)
200                 size += int64(n)
201                 entsize[i] = n
202                 expired[i] = ent.expire.Before(now)
203         }
204         for i, k := range keys {
205                 if expired[i] {
206                         c.collections.Remove(k)
207                         size -= int64(entsize[i])
208                 }
209         }
210         for i, k := range keys {
211                 if size <= c.MaxCollectionBytes {
212                         break
213                 }
214                 if expired[i] {
215                         // already removed this entry in the previous loop
216                         continue
217                 }
218                 c.collections.Remove(k)
219                 size -= int64(entsize[i])
220         }
221 }
222
223 // collectionBytes returns the approximate memory size of the
224 // collection cache.
225 func (c *cache) collectionBytes() uint64 {
226         var size uint64
227         for _, k := range c.collections.Keys() {
228                 v, ok := c.collections.Peek(k)
229                 if !ok {
230                         continue
231                 }
232                 size += uint64(len(v.(*cachedCollection).collection.ManifestText))
233         }
234         return size
235 }
236
237 func (c *cache) lookupCollection(pdh string) *arvados.Collection {
238         if pdh == "" {
239                 return nil
240         } else if ent, cached := c.collections.Get(pdh); !cached {
241                 return nil
242         } else {
243                 ent := ent.(*cachedCollection)
244                 if ent.expire.Before(time.Now()) {
245                         c.collections.Remove(pdh)
246                         return nil
247                 } else {
248                         atomic.AddUint64(&c.stats.CollectionHits, 1)
249                         return ent.collection
250                 }
251         }
252 }