14287: Merge branch 'master'
[arvados.git] / services / keep-web / cache.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package main
6
7 import (
8         "sync"
9         "time"
10
11         "git.curoverse.com/arvados.git/sdk/go/arvados"
12         "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
13         "github.com/hashicorp/golang-lru"
14         "github.com/prometheus/client_golang/prometheus"
15 )
16
// metricsUpdateInterval is the period between calls to updateGauges
// made by the background goroutine that (*cache).setup starts.
const metricsUpdateInterval = time.Second / 10
18
// cache bundles the three LRU caches used to answer collection
// lookups (target ID to PDH, token+PDH to collection record, and
// token+target ID to permission), together with their size/TTL
// settings and metrics. The LRU caches and metrics are created by
// setup, which Update and Get run once via setupOnce.
type cache struct {
	TTL                  arvados.Duration // lifetime of collection and permission entries
	UUIDTTL              arvados.Duration // lifetime of target-ID-to-PDH entries
	MaxCollectionEntries int              // capacity of the collections LRU
	MaxCollectionBytes   int64            // manifest byte budget enforced by pruneCollections
	MaxPermissionEntries int              // capacity of the permissions LRU
	MaxUUIDEntries       int              // capacity of the pdhs LRU

	registry    *prometheus.Registry // where metrics are registered; setup uses a fresh registry if nil
	metrics     cacheMetrics         // counters and gauges, initialized by setup
	pdhs        *lru.TwoQueueCache   // target ID -> *cachedPDH
	collections *lru.TwoQueueCache   // token + "\000" + PDH -> *cachedCollection
	permissions *lru.TwoQueueCache   // token + "\000" + target ID -> *cachedPermission
	setupOnce   sync.Once            // ensures setup runs only once
}
34
// cacheMetrics groups the Prometheus collectors maintained by cache.
// All fields are populated by (*cacheMetrics).setup.
type cacheMetrics struct {
	requests          prometheus.Counter // incremented once per Get call
	collectionBytes   prometheus.Gauge   // set by updateGauges from collectionBytes()
	collectionEntries prometheus.Gauge   // set by updateGauges from collections.Len()
	collectionHits    prometheus.Counter // incremented by lookupCollection on an unexpired hit
	pdhHits           prometheus.Counter // incremented by Get on an unexpired pdhs hit
	permissionHits    prometheus.Counter // incremented by Get on an unexpired permissions hit
	apiCalls          prometheus.Counter // incremented by Get before each API request
}
44
45 func (m *cacheMetrics) setup(reg *prometheus.Registry) {
46         m.requests = prometheus.NewCounter(prometheus.CounterOpts{
47                 Namespace: "arvados",
48                 Subsystem: "keepweb_collectioncache",
49                 Name:      "requests",
50                 Help:      "Number of targetID-to-manifest lookups handled.",
51         })
52         reg.MustRegister(m.requests)
53         m.collectionHits = prometheus.NewCounter(prometheus.CounterOpts{
54                 Namespace: "arvados",
55                 Subsystem: "keepweb_collectioncache",
56                 Name:      "hits",
57                 Help:      "Number of pdh-to-manifest cache hits.",
58         })
59         reg.MustRegister(m.collectionHits)
60         m.pdhHits = prometheus.NewCounter(prometheus.CounterOpts{
61                 Namespace: "arvados",
62                 Subsystem: "keepweb_collectioncache",
63                 Name:      "pdh_hits",
64                 Help:      "Number of uuid-to-pdh cache hits.",
65         })
66         reg.MustRegister(m.pdhHits)
67         m.permissionHits = prometheus.NewCounter(prometheus.CounterOpts{
68                 Namespace: "arvados",
69                 Subsystem: "keepweb_collectioncache",
70                 Name:      "permission_hits",
71                 Help:      "Number of targetID-to-permission cache hits.",
72         })
73         reg.MustRegister(m.permissionHits)
74         m.apiCalls = prometheus.NewCounter(prometheus.CounterOpts{
75                 Namespace: "arvados",
76                 Subsystem: "keepweb_collectioncache",
77                 Name:      "api_calls",
78                 Help:      "Number of outgoing API calls made by cache.",
79         })
80         reg.MustRegister(m.apiCalls)
81         m.collectionBytes = prometheus.NewGauge(prometheus.GaugeOpts{
82                 Namespace: "arvados",
83                 Subsystem: "keepweb_collectioncache",
84                 Name:      "cached_manifest_bytes",
85                 Help:      "Total size of all manifests in cache.",
86         })
87         reg.MustRegister(m.collectionBytes)
88         m.collectionEntries = prometheus.NewGauge(prometheus.GaugeOpts{
89                 Namespace: "arvados",
90                 Subsystem: "keepweb_collectioncache",
91                 Name:      "cached_manifests",
92                 Help:      "Number of manifests in cache.",
93         })
94         reg.MustRegister(m.collectionEntries)
95 }
96
// cachedPDH is the value type stored in cache.pdhs: the portable
// data hash recorded for a target ID, and the time after which the
// entry is treated as stale.
type cachedPDH struct {
	expire time.Time
	pdh    string
}
101
// cachedCollection is the value type stored in cache.collections: a
// collection record and the time after which the entry is treated as
// stale.
type cachedCollection struct {
	expire     time.Time
	collection *arvados.Collection
}
106
// cachedPermission is the value type stored in cache.permissions.
// The presence of an unexpired entry is what Get treats as "this
// token may read this target ID"; the entry carries no other data.
type cachedPermission struct {
	expire time.Time
}
110
111 func (c *cache) setup() {
112         var err error
113         c.pdhs, err = lru.New2Q(c.MaxUUIDEntries)
114         if err != nil {
115                 panic(err)
116         }
117         c.collections, err = lru.New2Q(c.MaxCollectionEntries)
118         if err != nil {
119                 panic(err)
120         }
121         c.permissions, err = lru.New2Q(c.MaxPermissionEntries)
122         if err != nil {
123                 panic(err)
124         }
125
126         reg := c.registry
127         if reg == nil {
128                 reg = prometheus.NewRegistry()
129         }
130         c.metrics.setup(reg)
131         go func() {
132                 for range time.Tick(metricsUpdateInterval) {
133                         c.updateGauges()
134                 }
135         }()
136 }
137
138 func (c *cache) updateGauges() {
139         c.metrics.collectionBytes.Set(float64(c.collectionBytes()))
140         c.metrics.collectionEntries.Set(float64(c.collections.Len()))
141 }
142
// selectPDH is the request parameter set Get passes to the API when
// it only needs a collection's current portable_data_hash.
var selectPDH = map[string]interface{}{
	"select": []string{"portable_data_hash"},
}
146
147 // Update saves a modified version (fs) to an existing collection
148 // (coll) and, if successful, updates the relevant cache entries so
149 // subsequent calls to Get() reflect the modifications.
150 func (c *cache) Update(client *arvados.Client, coll arvados.Collection, fs arvados.CollectionFileSystem) error {
151         c.setupOnce.Do(c.setup)
152
153         if m, err := fs.MarshalManifest("."); err != nil || m == coll.ManifestText {
154                 return err
155         } else {
156                 coll.ManifestText = m
157         }
158         var updated arvados.Collection
159         defer c.pdhs.Remove(coll.UUID)
160         err := client.RequestAndDecode(&updated, "PATCH", "arvados/v1/collections/"+coll.UUID, nil, map[string]interface{}{
161                 "collection": map[string]string{
162                         "manifest_text": coll.ManifestText,
163                 },
164         })
165         if err == nil {
166                 c.collections.Add(client.AuthToken+"\000"+coll.PortableDataHash, &cachedCollection{
167                         expire:     time.Now().Add(time.Duration(c.TTL)),
168                         collection: &updated,
169                 })
170         }
171         return err
172 }
173
174 func (c *cache) Get(arv *arvadosclient.ArvadosClient, targetID string, forceReload bool) (*arvados.Collection, error) {
175         c.setupOnce.Do(c.setup)
176         c.metrics.requests.Inc()
177
178         permOK := false
179         permKey := arv.ApiToken + "\000" + targetID
180         if forceReload {
181         } else if ent, cached := c.permissions.Get(permKey); cached {
182                 ent := ent.(*cachedPermission)
183                 if ent.expire.Before(time.Now()) {
184                         c.permissions.Remove(permKey)
185                 } else {
186                         permOK = true
187                         c.metrics.permissionHits.Inc()
188                 }
189         }
190
191         var pdh string
192         if arvadosclient.PDHMatch(targetID) {
193                 pdh = targetID
194         } else if ent, cached := c.pdhs.Get(targetID); cached {
195                 ent := ent.(*cachedPDH)
196                 if ent.expire.Before(time.Now()) {
197                         c.pdhs.Remove(targetID)
198                 } else {
199                         pdh = ent.pdh
200                         c.metrics.pdhHits.Inc()
201                 }
202         }
203
204         var collection *arvados.Collection
205         if pdh != "" {
206                 collection = c.lookupCollection(arv.ApiToken + "\000" + pdh)
207         }
208
209         if collection != nil && permOK {
210                 return collection, nil
211         } else if collection != nil {
212                 // Ask API for current PDH for this targetID. Most
213                 // likely, the cached PDH is still correct; if so,
214                 // _and_ the current token has permission, we can
215                 // use our cached manifest.
216                 c.metrics.apiCalls.Inc()
217                 var current arvados.Collection
218                 err := arv.Get("collections", targetID, selectPDH, &current)
219                 if err != nil {
220                         return nil, err
221                 }
222                 if current.PortableDataHash == pdh {
223                         c.permissions.Add(permKey, &cachedPermission{
224                                 expire: time.Now().Add(time.Duration(c.TTL)),
225                         })
226                         if pdh != targetID {
227                                 c.pdhs.Add(targetID, &cachedPDH{
228                                         expire: time.Now().Add(time.Duration(c.UUIDTTL)),
229                                         pdh:    pdh,
230                                 })
231                         }
232                         return collection, err
233                 } else {
234                         // PDH changed, but now we know we have
235                         // permission -- and maybe we already have the
236                         // new PDH in the cache.
237                         if coll := c.lookupCollection(arv.ApiToken + "\000" + current.PortableDataHash); coll != nil {
238                                 return coll, nil
239                         }
240                 }
241         }
242
243         // Collection manifest is not cached.
244         c.metrics.apiCalls.Inc()
245         err := arv.Get("collections", targetID, nil, &collection)
246         if err != nil {
247                 return nil, err
248         }
249         exp := time.Now().Add(time.Duration(c.TTL))
250         c.permissions.Add(permKey, &cachedPermission{
251                 expire: exp,
252         })
253         c.pdhs.Add(targetID, &cachedPDH{
254                 expire: time.Now().Add(time.Duration(c.UUIDTTL)),
255                 pdh:    collection.PortableDataHash,
256         })
257         c.collections.Add(arv.ApiToken+"\000"+collection.PortableDataHash, &cachedCollection{
258                 expire:     exp,
259                 collection: collection,
260         })
261         if int64(len(collection.ManifestText)) > c.MaxCollectionBytes/int64(c.MaxCollectionEntries) {
262                 go c.pruneCollections()
263         }
264         return collection, nil
265 }
266
267 // pruneCollections checks the total bytes occupied by manifest_text
268 // in the collection cache and removes old entries as needed to bring
269 // the total size down to CollectionBytes. It also deletes all expired
270 // entries.
271 //
272 // pruneCollections does not aim to be perfectly correct when there is
273 // concurrent cache activity.
274 func (c *cache) pruneCollections() {
275         var size int64
276         now := time.Now()
277         keys := c.collections.Keys()
278         entsize := make([]int, len(keys))
279         expired := make([]bool, len(keys))
280         for i, k := range keys {
281                 v, ok := c.collections.Peek(k)
282                 if !ok {
283                         continue
284                 }
285                 ent := v.(*cachedCollection)
286                 n := len(ent.collection.ManifestText)
287                 size += int64(n)
288                 entsize[i] = n
289                 expired[i] = ent.expire.Before(now)
290         }
291         for i, k := range keys {
292                 if expired[i] {
293                         c.collections.Remove(k)
294                         size -= int64(entsize[i])
295                 }
296         }
297         for i, k := range keys {
298                 if size <= c.MaxCollectionBytes {
299                         break
300                 }
301                 if expired[i] {
302                         // already removed this entry in the previous loop
303                         continue
304                 }
305                 c.collections.Remove(k)
306                 size -= int64(entsize[i])
307         }
308 }
309
310 // collectionBytes returns the approximate memory size of the
311 // collection cache.
312 func (c *cache) collectionBytes() uint64 {
313         var size uint64
314         for _, k := range c.collections.Keys() {
315                 v, ok := c.collections.Peek(k)
316                 if !ok {
317                         continue
318                 }
319                 size += uint64(len(v.(*cachedCollection).collection.ManifestText))
320         }
321         return size
322 }
323
324 func (c *cache) lookupCollection(key string) *arvados.Collection {
325         e, cached := c.collections.Get(key)
326         if !cached {
327                 return nil
328         }
329         ent := e.(*cachedCollection)
330         if ent.expire.Before(time.Now()) {
331                 c.collections.Remove(key)
332                 return nil
333         }
334         c.metrics.collectionHits.Inc()
335         return ent.collection
336 }