17755: Merge branch 'main' into 17755-add-singularity-to-compute-image
[arvados.git] / services / keep-web / cache.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package main
6
7 import (
8         "sync"
9         "sync/atomic"
10         "time"
11
12         "git.arvados.org/arvados.git/sdk/go/arvados"
13         "git.arvados.org/arvados.git/sdk/go/arvadosclient"
14         "git.arvados.org/arvados.git/sdk/go/keepclient"
15         lru "github.com/hashicorp/golang-lru"
16         "github.com/prometheus/client_golang/prometheus"
17 )
18
// metricsUpdateInterval is the period at which the background
// goroutine started by cache.setup calls updateGauges.
const metricsUpdateInterval = 100 * time.Millisecond
20
21 type cache struct {
22         cluster     *arvados.Cluster
23         config      *arvados.WebDAVCacheConfig // TODO: use cluster.Collections.WebDAV instead
24         registry    *prometheus.Registry
25         metrics     cacheMetrics
26         pdhs        *lru.TwoQueueCache
27         collections *lru.TwoQueueCache
28         permissions *lru.TwoQueueCache
29         sessions    *lru.TwoQueueCache
30         setupOnce   sync.Once
31 }
32
33 type cacheMetrics struct {
34         requests          prometheus.Counter
35         collectionBytes   prometheus.Gauge
36         collectionEntries prometheus.Gauge
37         sessionEntries    prometheus.Gauge
38         collectionHits    prometheus.Counter
39         pdhHits           prometheus.Counter
40         permissionHits    prometheus.Counter
41         sessionHits       prometheus.Counter
42         sessionMisses     prometheus.Counter
43         apiCalls          prometheus.Counter
44 }
45
46 func (m *cacheMetrics) setup(reg *prometheus.Registry) {
47         m.requests = prometheus.NewCounter(prometheus.CounterOpts{
48                 Namespace: "arvados",
49                 Subsystem: "keepweb_collectioncache",
50                 Name:      "requests",
51                 Help:      "Number of targetID-to-manifest lookups handled.",
52         })
53         reg.MustRegister(m.requests)
54         m.collectionHits = prometheus.NewCounter(prometheus.CounterOpts{
55                 Namespace: "arvados",
56                 Subsystem: "keepweb_collectioncache",
57                 Name:      "hits",
58                 Help:      "Number of pdh-to-manifest cache hits.",
59         })
60         reg.MustRegister(m.collectionHits)
61         m.pdhHits = prometheus.NewCounter(prometheus.CounterOpts{
62                 Namespace: "arvados",
63                 Subsystem: "keepweb_collectioncache",
64                 Name:      "pdh_hits",
65                 Help:      "Number of uuid-to-pdh cache hits.",
66         })
67         reg.MustRegister(m.pdhHits)
68         m.permissionHits = prometheus.NewCounter(prometheus.CounterOpts{
69                 Namespace: "arvados",
70                 Subsystem: "keepweb_collectioncache",
71                 Name:      "permission_hits",
72                 Help:      "Number of targetID-to-permission cache hits.",
73         })
74         reg.MustRegister(m.permissionHits)
75         m.apiCalls = prometheus.NewCounter(prometheus.CounterOpts{
76                 Namespace: "arvados",
77                 Subsystem: "keepweb_collectioncache",
78                 Name:      "api_calls",
79                 Help:      "Number of outgoing API calls made by cache.",
80         })
81         reg.MustRegister(m.apiCalls)
82         m.collectionBytes = prometheus.NewGauge(prometheus.GaugeOpts{
83                 Namespace: "arvados",
84                 Subsystem: "keepweb_sessions",
85                 Name:      "cached_collection_bytes",
86                 Help:      "Total size of all cached manifests and sessions.",
87         })
88         reg.MustRegister(m.collectionBytes)
89         m.collectionEntries = prometheus.NewGauge(prometheus.GaugeOpts{
90                 Namespace: "arvados",
91                 Subsystem: "keepweb_collectioncache",
92                 Name:      "cached_manifests",
93                 Help:      "Number of manifests in cache.",
94         })
95         reg.MustRegister(m.collectionEntries)
96         m.sessionEntries = prometheus.NewGauge(prometheus.GaugeOpts{
97                 Namespace: "arvados",
98                 Subsystem: "keepweb_sessions",
99                 Name:      "active",
100                 Help:      "Number of active token sessions.",
101         })
102         reg.MustRegister(m.sessionEntries)
103         m.sessionHits = prometheus.NewCounter(prometheus.CounterOpts{
104                 Namespace: "arvados",
105                 Subsystem: "keepweb_sessions",
106                 Name:      "hits",
107                 Help:      "Number of token session cache hits.",
108         })
109         reg.MustRegister(m.sessionHits)
110         m.sessionMisses = prometheus.NewCounter(prometheus.CounterOpts{
111                 Namespace: "arvados",
112                 Subsystem: "keepweb_sessions",
113                 Name:      "misses",
114                 Help:      "Number of token session cache misses.",
115         })
116         reg.MustRegister(m.sessionMisses)
117 }
118
// cachedPDH is an entry in cache.pdhs: the portable data hash last
// seen for a target ID, together with the time after which the
// entry must no longer be used.
type cachedPDH struct {
	expire time.Time // entry is discarded by cache.Get once this time has passed
	pdh    string    // portable data hash
}
123
124 type cachedCollection struct {
125         expire     time.Time
126         collection *arvados.Collection
127 }
128
// cachedPermission is an entry in cache.permissions. Its presence
// under a token+target key records a successful lookup with that
// token; it stops being honored once expire has passed.
type cachedPermission struct {
	expire time.Time
}
132
133 type cachedSession struct {
134         expire        time.Time
135         fs            atomic.Value
136         client        *arvados.Client
137         arvadosclient *arvadosclient.ArvadosClient
138         keepclient    *keepclient.KeepClient
139         user          atomic.Value
140 }
141
142 func (c *cache) setup() {
143         var err error
144         c.pdhs, err = lru.New2Q(c.config.MaxUUIDEntries)
145         if err != nil {
146                 panic(err)
147         }
148         c.collections, err = lru.New2Q(c.config.MaxCollectionEntries)
149         if err != nil {
150                 panic(err)
151         }
152         c.permissions, err = lru.New2Q(c.config.MaxPermissionEntries)
153         if err != nil {
154                 panic(err)
155         }
156         c.sessions, err = lru.New2Q(c.config.MaxSessions)
157         if err != nil {
158                 panic(err)
159         }
160
161         reg := c.registry
162         if reg == nil {
163                 reg = prometheus.NewRegistry()
164         }
165         c.metrics.setup(reg)
166         go func() {
167                 for range time.Tick(metricsUpdateInterval) {
168                         c.updateGauges()
169                 }
170         }()
171 }
172
173 func (c *cache) updateGauges() {
174         c.metrics.collectionBytes.Set(float64(c.collectionBytes()))
175         c.metrics.collectionEntries.Set(float64(c.collections.Len()))
176         c.metrics.sessionEntries.Set(float64(c.sessions.Len()))
177 }
178
// selectPDH is the parameter set cache.Get passes to the API client
// when it only needs a collection's portable_data_hash.
var selectPDH = map[string]interface{}{"select": []string{"portable_data_hash"}}
182
183 // Update saves a modified version (fs) to an existing collection
184 // (coll) and, if successful, updates the relevant cache entries so
185 // subsequent calls to Get() reflect the modifications.
186 func (c *cache) Update(client *arvados.Client, coll arvados.Collection, fs arvados.CollectionFileSystem) error {
187         c.setupOnce.Do(c.setup)
188
189         m, err := fs.MarshalManifest(".")
190         if err != nil || m == coll.ManifestText {
191                 return err
192         }
193         coll.ManifestText = m
194         var updated arvados.Collection
195         defer c.pdhs.Remove(coll.UUID)
196         err = client.RequestAndDecode(&updated, "PATCH", "arvados/v1/collections/"+coll.UUID, nil, map[string]interface{}{
197                 "collection": map[string]string{
198                         "manifest_text": coll.ManifestText,
199                 },
200         })
201         if err == nil {
202                 c.collections.Add(client.AuthToken+"\000"+updated.PortableDataHash, &cachedCollection{
203                         expire:     time.Now().Add(time.Duration(c.config.TTL)),
204                         collection: &updated,
205                 })
206         }
207         return err
208 }
209
210 // ResetSession unloads any potentially stale state. Should be called
211 // after write operations, so subsequent reads don't return stale
212 // data.
213 func (c *cache) ResetSession(token string) {
214         c.setupOnce.Do(c.setup)
215         c.sessions.Remove(token)
216 }
217
218 // Get a long-lived CustomFileSystem suitable for doing a read operation
219 // with the given token.
220 func (c *cache) GetSession(token string) (arvados.CustomFileSystem, *cachedSession, error) {
221         c.setupOnce.Do(c.setup)
222         now := time.Now()
223         ent, _ := c.sessions.Get(token)
224         sess, _ := ent.(*cachedSession)
225         expired := false
226         if sess == nil {
227                 c.metrics.sessionMisses.Inc()
228                 sess = &cachedSession{
229                         expire: now.Add(c.config.TTL.Duration()),
230                 }
231                 var err error
232                 sess.client, err = arvados.NewClientFromConfig(c.cluster)
233                 if err != nil {
234                         return nil, nil, err
235                 }
236                 sess.client.AuthToken = token
237                 sess.arvadosclient, err = arvadosclient.New(sess.client)
238                 if err != nil {
239                         return nil, nil, err
240                 }
241                 sess.keepclient = keepclient.New(sess.arvadosclient)
242                 c.sessions.Add(token, sess)
243         } else if sess.expire.Before(now) {
244                 c.metrics.sessionMisses.Inc()
245                 expired = true
246         } else {
247                 c.metrics.sessionHits.Inc()
248         }
249         go c.pruneSessions()
250         fs, _ := sess.fs.Load().(arvados.CustomFileSystem)
251         if fs != nil && !expired {
252                 return fs, sess, nil
253         }
254         fs = sess.client.SiteFileSystem(sess.keepclient)
255         fs.ForwardSlashNameSubstitution(c.cluster.Collections.ForwardSlashNameSubstitution)
256         sess.fs.Store(fs)
257         return fs, sess, nil
258 }
259
260 // Remove all expired session cache entries, then remove more entries
261 // until approximate remaining size <= maxsize/2
262 func (c *cache) pruneSessions() {
263         now := time.Now()
264         var size int64
265         keys := c.sessions.Keys()
266         for _, token := range keys {
267                 ent, ok := c.sessions.Peek(token)
268                 if !ok {
269                         continue
270                 }
271                 s := ent.(*cachedSession)
272                 if s.expire.Before(now) {
273                         c.sessions.Remove(token)
274                         continue
275                 }
276                 if fs, ok := s.fs.Load().(arvados.CustomFileSystem); ok {
277                         size += fs.MemorySize()
278                 }
279         }
280         // Remove tokens until reaching size limit, starting with the
281         // least frequently used entries (which Keys() returns last).
282         for i := len(keys) - 1; i >= 0; i-- {
283                 token := keys[i]
284                 if size <= c.cluster.Collections.WebDAVCache.MaxCollectionBytes/2 {
285                         break
286                 }
287                 ent, ok := c.sessions.Peek(token)
288                 if !ok {
289                         continue
290                 }
291                 s := ent.(*cachedSession)
292                 fs, _ := s.fs.Load().(arvados.CustomFileSystem)
293                 if fs == nil {
294                         continue
295                 }
296                 c.sessions.Remove(token)
297                 size -= fs.MemorySize()
298         }
299 }
300
301 func (c *cache) Get(arv *arvadosclient.ArvadosClient, targetID string, forceReload bool) (*arvados.Collection, error) {
302         c.setupOnce.Do(c.setup)
303         c.metrics.requests.Inc()
304
305         permOK := false
306         permKey := arv.ApiToken + "\000" + targetID
307         if forceReload {
308         } else if ent, cached := c.permissions.Get(permKey); cached {
309                 ent := ent.(*cachedPermission)
310                 if ent.expire.Before(time.Now()) {
311                         c.permissions.Remove(permKey)
312                 } else {
313                         permOK = true
314                         c.metrics.permissionHits.Inc()
315                 }
316         }
317
318         var pdh string
319         if arvadosclient.PDHMatch(targetID) {
320                 pdh = targetID
321         } else if ent, cached := c.pdhs.Get(targetID); cached {
322                 ent := ent.(*cachedPDH)
323                 if ent.expire.Before(time.Now()) {
324                         c.pdhs.Remove(targetID)
325                 } else {
326                         pdh = ent.pdh
327                         c.metrics.pdhHits.Inc()
328                 }
329         }
330
331         var collection *arvados.Collection
332         if pdh != "" {
333                 collection = c.lookupCollection(arv.ApiToken + "\000" + pdh)
334         }
335
336         if collection != nil && permOK {
337                 return collection, nil
338         } else if collection != nil {
339                 // Ask API for current PDH for this targetID. Most
340                 // likely, the cached PDH is still correct; if so,
341                 // _and_ the current token has permission, we can
342                 // use our cached manifest.
343                 c.metrics.apiCalls.Inc()
344                 var current arvados.Collection
345                 err := arv.Get("collections", targetID, selectPDH, &current)
346                 if err != nil {
347                         return nil, err
348                 }
349                 if current.PortableDataHash == pdh {
350                         c.permissions.Add(permKey, &cachedPermission{
351                                 expire: time.Now().Add(time.Duration(c.config.TTL)),
352                         })
353                         if pdh != targetID {
354                                 c.pdhs.Add(targetID, &cachedPDH{
355                                         expire: time.Now().Add(time.Duration(c.config.UUIDTTL)),
356                                         pdh:    pdh,
357                                 })
358                         }
359                         return collection, err
360                 }
361                 // PDH changed, but now we know we have
362                 // permission -- and maybe we already have the
363                 // new PDH in the cache.
364                 if coll := c.lookupCollection(arv.ApiToken + "\000" + current.PortableDataHash); coll != nil {
365                         return coll, nil
366                 }
367         }
368
369         // Collection manifest is not cached.
370         c.metrics.apiCalls.Inc()
371         err := arv.Get("collections", targetID, nil, &collection)
372         if err != nil {
373                 return nil, err
374         }
375         exp := time.Now().Add(time.Duration(c.config.TTL))
376         c.permissions.Add(permKey, &cachedPermission{
377                 expire: exp,
378         })
379         c.pdhs.Add(targetID, &cachedPDH{
380                 expire: time.Now().Add(time.Duration(c.config.UUIDTTL)),
381                 pdh:    collection.PortableDataHash,
382         })
383         c.collections.Add(arv.ApiToken+"\000"+collection.PortableDataHash, &cachedCollection{
384                 expire:     exp,
385                 collection: collection,
386         })
387         if int64(len(collection.ManifestText)) > c.config.MaxCollectionBytes/int64(c.config.MaxCollectionEntries) {
388                 go c.pruneCollections()
389         }
390         return collection, nil
391 }
392
393 // pruneCollections checks the total bytes occupied by manifest_text
394 // in the collection cache and removes old entries as needed to bring
395 // the total size down to CollectionBytes. It also deletes all expired
396 // entries.
397 //
398 // pruneCollections does not aim to be perfectly correct when there is
399 // concurrent cache activity.
400 func (c *cache) pruneCollections() {
401         var size int64
402         now := time.Now()
403         keys := c.collections.Keys()
404         entsize := make([]int, len(keys))
405         expired := make([]bool, len(keys))
406         for i, k := range keys {
407                 v, ok := c.collections.Peek(k)
408                 if !ok {
409                         continue
410                 }
411                 ent := v.(*cachedCollection)
412                 n := len(ent.collection.ManifestText)
413                 size += int64(n)
414                 entsize[i] = n
415                 expired[i] = ent.expire.Before(now)
416         }
417         for i, k := range keys {
418                 if expired[i] {
419                         c.collections.Remove(k)
420                         size -= int64(entsize[i])
421                 }
422         }
423         for i, k := range keys {
424                 if size <= c.config.MaxCollectionBytes/2 {
425                         break
426                 }
427                 if expired[i] {
428                         // already removed this entry in the previous loop
429                         continue
430                 }
431                 c.collections.Remove(k)
432                 size -= int64(entsize[i])
433         }
434 }
435
436 // collectionBytes returns the approximate combined memory size of the
437 // collection cache and session filesystem cache.
438 func (c *cache) collectionBytes() uint64 {
439         var size uint64
440         for _, k := range c.collections.Keys() {
441                 v, ok := c.collections.Peek(k)
442                 if !ok {
443                         continue
444                 }
445                 size += uint64(len(v.(*cachedCollection).collection.ManifestText))
446         }
447         for _, token := range c.sessions.Keys() {
448                 ent, ok := c.sessions.Peek(token)
449                 if !ok {
450                         continue
451                 }
452                 if fs, ok := ent.(*cachedSession).fs.Load().(arvados.CustomFileSystem); ok {
453                         size += uint64(fs.MemorySize())
454                 }
455         }
456         return size
457 }
458
459 func (c *cache) lookupCollection(key string) *arvados.Collection {
460         e, cached := c.collections.Get(key)
461         if !cached {
462                 return nil
463         }
464         ent := e.(*cachedCollection)
465         if ent.expire.Before(time.Now()) {
466                 c.collections.Remove(key)
467                 return nil
468         }
469         c.metrics.collectionHits.Inc()
470         return ent.collection
471 }
472
473 func (c *cache) GetTokenUser(token string) (*arvados.User, error) {
474         // Get and cache user record associated with this
475         // token.  We need to know their UUID for logging, and
476         // whether they are an admin or not for certain
477         // permission checks.
478
479         // Get/create session entry
480         _, sess, err := c.GetSession(token)
481         if err != nil {
482                 return nil, err
483         }
484
485         // See if the user is already set, and if so, return it
486         user, _ := sess.user.Load().(*arvados.User)
487         if user != nil {
488                 return user, nil
489         }
490
491         // Fetch the user record
492         c.metrics.apiCalls.Inc()
493         var current arvados.User
494
495         err = sess.client.RequestAndDecode(&current, "GET", "/arvados/v1/users/current", nil, nil)
496         if err != nil {
497                 return nil, err
498         }
499
500         // Stash the user record for next time
501         sess.user.Store(&current)
502         return &current, nil
503 }