Fix link anchor refs #19532
[arvados.git] / services / keep-web / cache.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package main
6
7 import (
8         "sync"
9         "sync/atomic"
10         "time"
11
12         "git.arvados.org/arvados.git/sdk/go/arvados"
13         "git.arvados.org/arvados.git/sdk/go/arvadosclient"
14         "git.arvados.org/arvados.git/sdk/go/keepclient"
15         lru "github.com/hashicorp/golang-lru"
16         "github.com/prometheus/client_golang/prometheus"
17         "github.com/sirupsen/logrus"
18 )
19
20 const metricsUpdateInterval = time.Second / 10
21
22 type cache struct {
23         cluster     *arvados.Cluster
24         config      *arvados.WebDAVCacheConfig // TODO: use cluster.Collections.WebDAV instead
25         logger      logrus.FieldLogger
26         registry    *prometheus.Registry
27         metrics     cacheMetrics
28         pdhs        *lru.TwoQueueCache
29         collections *lru.TwoQueueCache
30         sessions    *lru.TwoQueueCache
31         setupOnce   sync.Once
32
33         chPruneSessions    chan struct{}
34         chPruneCollections chan struct{}
35 }
36
37 type cacheMetrics struct {
38         requests          prometheus.Counter
39         collectionBytes   prometheus.Gauge
40         collectionEntries prometheus.Gauge
41         sessionEntries    prometheus.Gauge
42         collectionHits    prometheus.Counter
43         pdhHits           prometheus.Counter
44         sessionHits       prometheus.Counter
45         sessionMisses     prometheus.Counter
46         apiCalls          prometheus.Counter
47 }
48
49 func (m *cacheMetrics) setup(reg *prometheus.Registry) {
50         m.requests = prometheus.NewCounter(prometheus.CounterOpts{
51                 Namespace: "arvados",
52                 Subsystem: "keepweb_collectioncache",
53                 Name:      "requests",
54                 Help:      "Number of targetID-to-manifest lookups handled.",
55         })
56         reg.MustRegister(m.requests)
57         m.collectionHits = prometheus.NewCounter(prometheus.CounterOpts{
58                 Namespace: "arvados",
59                 Subsystem: "keepweb_collectioncache",
60                 Name:      "hits",
61                 Help:      "Number of pdh-to-manifest cache hits.",
62         })
63         reg.MustRegister(m.collectionHits)
64         m.pdhHits = prometheus.NewCounter(prometheus.CounterOpts{
65                 Namespace: "arvados",
66                 Subsystem: "keepweb_collectioncache",
67                 Name:      "pdh_hits",
68                 Help:      "Number of uuid-to-pdh cache hits.",
69         })
70         reg.MustRegister(m.pdhHits)
71         m.apiCalls = prometheus.NewCounter(prometheus.CounterOpts{
72                 Namespace: "arvados",
73                 Subsystem: "keepweb_collectioncache",
74                 Name:      "api_calls",
75                 Help:      "Number of outgoing API calls made by cache.",
76         })
77         reg.MustRegister(m.apiCalls)
78         m.collectionBytes = prometheus.NewGauge(prometheus.GaugeOpts{
79                 Namespace: "arvados",
80                 Subsystem: "keepweb_sessions",
81                 Name:      "cached_collection_bytes",
82                 Help:      "Total size of all cached manifests and sessions.",
83         })
84         reg.MustRegister(m.collectionBytes)
85         m.collectionEntries = prometheus.NewGauge(prometheus.GaugeOpts{
86                 Namespace: "arvados",
87                 Subsystem: "keepweb_collectioncache",
88                 Name:      "cached_manifests",
89                 Help:      "Number of manifests in cache.",
90         })
91         reg.MustRegister(m.collectionEntries)
92         m.sessionEntries = prometheus.NewGauge(prometheus.GaugeOpts{
93                 Namespace: "arvados",
94                 Subsystem: "keepweb_sessions",
95                 Name:      "active",
96                 Help:      "Number of active token sessions.",
97         })
98         reg.MustRegister(m.sessionEntries)
99         m.sessionHits = prometheus.NewCounter(prometheus.CounterOpts{
100                 Namespace: "arvados",
101                 Subsystem: "keepweb_sessions",
102                 Name:      "hits",
103                 Help:      "Number of token session cache hits.",
104         })
105         reg.MustRegister(m.sessionHits)
106         m.sessionMisses = prometheus.NewCounter(prometheus.CounterOpts{
107                 Namespace: "arvados",
108                 Subsystem: "keepweb_sessions",
109                 Name:      "misses",
110                 Help:      "Number of token session cache misses.",
111         })
112         reg.MustRegister(m.sessionMisses)
113 }
114
115 type cachedPDH struct {
116         expire  time.Time
117         refresh time.Time
118         pdh     string
119         mtime   time.Time
120 }
121
122 type cachedCollection struct {
123         expire     time.Time
124         collection *arvados.Collection
125 }
126
127 type cachedPermission struct {
128         expire time.Time
129 }
130
131 type cachedSession struct {
132         expire        time.Time
133         fs            atomic.Value
134         client        *arvados.Client
135         arvadosclient *arvadosclient.ArvadosClient
136         keepclient    *keepclient.KeepClient
137         user          atomic.Value
138 }
139
140 func (c *cache) setup() {
141         var err error
142         c.pdhs, err = lru.New2Q(c.config.MaxUUIDEntries)
143         if err != nil {
144                 panic(err)
145         }
146         c.collections, err = lru.New2Q(c.config.MaxCollectionEntries)
147         if err != nil {
148                 panic(err)
149         }
150         c.sessions, err = lru.New2Q(c.config.MaxSessions)
151         if err != nil {
152                 panic(err)
153         }
154
155         reg := c.registry
156         if reg == nil {
157                 reg = prometheus.NewRegistry()
158         }
159         c.metrics.setup(reg)
160         go func() {
161                 for range time.Tick(metricsUpdateInterval) {
162                         c.updateGauges()
163                 }
164         }()
165         c.chPruneCollections = make(chan struct{}, 1)
166         go func() {
167                 for range c.chPruneCollections {
168                         c.pruneCollections()
169                 }
170         }()
171         c.chPruneSessions = make(chan struct{}, 1)
172         go func() {
173                 for range c.chPruneSessions {
174                         c.pruneSessions()
175                 }
176         }()
177 }
178
179 func (c *cache) updateGauges() {
180         c.metrics.collectionBytes.Set(float64(c.collectionBytes()))
181         c.metrics.collectionEntries.Set(float64(c.collections.Len()))
182         c.metrics.sessionEntries.Set(float64(c.sessions.Len()))
183 }
184
185 var selectPDH = map[string]interface{}{
186         "select": []string{"portable_data_hash"},
187 }
188
189 // Update saves a modified version (fs) to an existing collection
190 // (coll) and, if successful, updates the relevant cache entries so
191 // subsequent calls to Get() reflect the modifications.
192 func (c *cache) Update(client *arvados.Client, coll arvados.Collection, fs arvados.CollectionFileSystem) error {
193         c.setupOnce.Do(c.setup)
194
195         m, err := fs.MarshalManifest(".")
196         if err != nil || m == coll.ManifestText {
197                 return err
198         }
199         coll.ManifestText = m
200         var updated arvados.Collection
201         err = client.RequestAndDecode(&updated, "PATCH", "arvados/v1/collections/"+coll.UUID, nil, map[string]interface{}{
202                 "collection": map[string]string{
203                         "manifest_text": coll.ManifestText,
204                 },
205         })
206         if err != nil {
207                 c.pdhs.Remove(coll.UUID)
208                 return err
209         }
210         c.collections.Add(client.AuthToken+"\000"+updated.PortableDataHash, &cachedCollection{
211                 expire:     time.Now().Add(time.Duration(c.config.TTL)),
212                 collection: &updated,
213         })
214         c.pdhs.Add(coll.UUID, &cachedPDH{
215                 expire:  time.Now().Add(time.Duration(c.config.TTL)),
216                 refresh: time.Now().Add(time.Duration(c.config.UUIDTTL)),
217                 pdh:     updated.PortableDataHash,
218                 mtime:   updated.ModifiedAt,
219         })
220         return nil
221 }
222
223 // ResetSession unloads any potentially stale state. Should be called
224 // after write operations, so subsequent reads don't return stale
225 // data.
226 func (c *cache) ResetSession(token string) {
227         c.setupOnce.Do(c.setup)
228         c.sessions.Remove(token)
229 }
230
231 // Get a long-lived CustomFileSystem suitable for doing a read operation
232 // with the given token.
233 func (c *cache) GetSession(token string) (arvados.CustomFileSystem, *cachedSession, error) {
234         c.setupOnce.Do(c.setup)
235         now := time.Now()
236         ent, _ := c.sessions.Get(token)
237         sess, _ := ent.(*cachedSession)
238         expired := false
239         if sess == nil {
240                 c.metrics.sessionMisses.Inc()
241                 sess = &cachedSession{
242                         expire: now.Add(c.config.TTL.Duration()),
243                 }
244                 var err error
245                 sess.client, err = arvados.NewClientFromConfig(c.cluster)
246                 if err != nil {
247                         return nil, nil, err
248                 }
249                 sess.client.AuthToken = token
250                 sess.arvadosclient, err = arvadosclient.New(sess.client)
251                 if err != nil {
252                         return nil, nil, err
253                 }
254                 sess.keepclient = keepclient.New(sess.arvadosclient)
255                 c.sessions.Add(token, sess)
256         } else if sess.expire.Before(now) {
257                 c.metrics.sessionMisses.Inc()
258                 expired = true
259         } else {
260                 c.metrics.sessionHits.Inc()
261         }
262         select {
263         case c.chPruneSessions <- struct{}{}:
264         default:
265         }
266         fs, _ := sess.fs.Load().(arvados.CustomFileSystem)
267         if fs != nil && !expired {
268                 return fs, sess, nil
269         }
270         fs = sess.client.SiteFileSystem(sess.keepclient)
271         fs.ForwardSlashNameSubstitution(c.cluster.Collections.ForwardSlashNameSubstitution)
272         sess.fs.Store(fs)
273         return fs, sess, nil
274 }
275
276 // Remove all expired session cache entries, then remove more entries
277 // until approximate remaining size <= maxsize/2
278 func (c *cache) pruneSessions() {
279         now := time.Now()
280         keys := c.sessions.Keys()
281         sizes := make([]int64, len(keys))
282         var size int64
283         for i, token := range keys {
284                 ent, ok := c.sessions.Peek(token)
285                 if !ok {
286                         continue
287                 }
288                 s := ent.(*cachedSession)
289                 if s.expire.Before(now) {
290                         c.sessions.Remove(token)
291                         continue
292                 }
293                 if fs, ok := s.fs.Load().(arvados.CustomFileSystem); ok {
294                         sizes[i] = fs.MemorySize()
295                         size += sizes[i]
296                 }
297         }
298         // Remove tokens until reaching size limit, starting with the
299         // least frequently used entries (which Keys() returns last).
300         for i := len(keys) - 1; i >= 0 && size > c.cluster.Collections.WebDAVCache.MaxCollectionBytes/2; i-- {
301                 if sizes[i] > 0 {
302                         c.sessions.Remove(keys[i])
303                         size -= sizes[i]
304                 }
305         }
306 }
307
308 func (c *cache) Get(arv *arvadosclient.ArvadosClient, targetID string, forceReload bool) (*arvados.Collection, error) {
309         c.setupOnce.Do(c.setup)
310         c.metrics.requests.Inc()
311
312         var pdhRefresh bool
313         var pdh string
314         var mtime time.Time
315         if arvadosclient.PDHMatch(targetID) {
316                 pdh = targetID
317         } else if ent, cached := c.pdhs.Get(targetID); cached {
318                 ent := ent.(*cachedPDH)
319                 if ent.expire.Before(time.Now()) {
320                         c.pdhs.Remove(targetID)
321                 } else {
322                         pdh = ent.pdh
323                         mtime = ent.mtime
324                         pdhRefresh = forceReload || time.Now().After(ent.refresh)
325                         c.metrics.pdhHits.Inc()
326                 }
327         }
328
329         if pdh == "" {
330                 // UUID->PDH mapping is not cached, might as well get
331                 // the whole collection record and be done (below).
332                 c.logger.Debugf("cache(%s): have no pdh", targetID)
333         } else if cached := c.lookupCollection(arv.ApiToken + "\000" + pdh); cached == nil {
334                 // PDH->manifest is not cached, might as well get the
335                 // whole collection record (below).
336                 c.logger.Debugf("cache(%s): have pdh %s but manifest is not cached", targetID, pdh)
337         } else if !pdhRefresh {
338                 // We looked up UUID->PDH very recently, and we still
339                 // have the manifest for that PDH.
340                 c.logger.Debugf("cache(%s): have pdh %s and refresh not needed", targetID, pdh)
341                 return &arvados.Collection{
342                         UUID:             targetID,
343                         ManifestText:     cached.ManifestText,
344                         PortableDataHash: pdh,
345                         ModifiedAt:       mtime,
346                 }, nil
347         } else {
348                 // Get current PDH for this UUID (and confirm we still
349                 // have read permission).  Most likely, the cached PDH
350                 // is still correct, in which case we can use our
351                 // cached manifest.
352                 c.metrics.apiCalls.Inc()
353                 var current arvados.Collection
354                 err := arv.Get("collections", targetID, selectPDH, &current)
355                 if err != nil {
356                         return nil, err
357                 }
358                 if current.PortableDataHash == pdh {
359                         // PDH has not changed, cached manifest is
360                         // correct.
361                         c.logger.Debugf("cache(%s): verified cached pdh %s is still correct", targetID, pdh)
362                         return &arvados.Collection{
363                                 UUID:             targetID,
364                                 ManifestText:     cached.ManifestText,
365                                 PortableDataHash: pdh,
366                                 ModifiedAt:       current.ModifiedAt,
367                         }, nil
368                 }
369                 if cached := c.lookupCollection(arv.ApiToken + "\000" + current.PortableDataHash); cached != nil {
370                         // PDH changed, and we already have the
371                         // manifest for that new PDH.
372                         c.logger.Debugf("cache(%s): cached pdh %s was stale, new pdh is %s and manifest is already in cache", targetID, pdh, current.PortableDataHash)
373                         return &arvados.Collection{
374                                 UUID:             targetID,
375                                 ManifestText:     cached.ManifestText,
376                                 PortableDataHash: current.PortableDataHash,
377                                 ModifiedAt:       current.ModifiedAt,
378                         }, nil
379                 }
380         }
381
382         // Either UUID->PDH is not cached, or PDH->manifest is not
383         // cached.
384         var retrieved arvados.Collection
385         c.metrics.apiCalls.Inc()
386         err := arv.Get("collections", targetID, nil, &retrieved)
387         if err != nil {
388                 return nil, err
389         }
390         c.logger.Debugf("cache(%s): retrieved manifest, caching with pdh %s", targetID, retrieved.PortableDataHash)
391         exp := time.Now().Add(time.Duration(c.config.TTL))
392         if targetID != retrieved.PortableDataHash {
393                 c.pdhs.Add(targetID, &cachedPDH{
394                         expire:  exp,
395                         refresh: time.Now().Add(time.Duration(c.config.UUIDTTL)),
396                         pdh:     retrieved.PortableDataHash,
397                         mtime:   retrieved.ModifiedAt,
398                 })
399         }
400         c.collections.Add(arv.ApiToken+"\000"+retrieved.PortableDataHash, &cachedCollection{
401                 expire:     exp,
402                 collection: &retrieved,
403         })
404         if int64(len(retrieved.ManifestText)) > c.config.MaxCollectionBytes/int64(c.config.MaxCollectionEntries) {
405                 select {
406                 case c.chPruneCollections <- struct{}{}:
407                 default:
408                 }
409         }
410         return &retrieved, nil
411 }
412
413 // pruneCollections checks the total bytes occupied by manifest_text
414 // in the collection cache and removes old entries as needed to bring
415 // the total size down to CollectionBytes. It also deletes all expired
416 // entries.
417 //
418 // pruneCollections does not aim to be perfectly correct when there is
419 // concurrent cache activity.
420 func (c *cache) pruneCollections() {
421         var size int64
422         now := time.Now()
423         keys := c.collections.Keys()
424         entsize := make([]int, len(keys))
425         expired := make([]bool, len(keys))
426         for i, k := range keys {
427                 v, ok := c.collections.Peek(k)
428                 if !ok {
429                         continue
430                 }
431                 ent := v.(*cachedCollection)
432                 n := len(ent.collection.ManifestText)
433                 size += int64(n)
434                 entsize[i] = n
435                 expired[i] = ent.expire.Before(now)
436         }
437         for i, k := range keys {
438                 if expired[i] {
439                         c.collections.Remove(k)
440                         size -= int64(entsize[i])
441                 }
442         }
443         for i, k := range keys {
444                 if size <= c.config.MaxCollectionBytes/2 {
445                         break
446                 }
447                 if expired[i] {
448                         // already removed this entry in the previous loop
449                         continue
450                 }
451                 c.collections.Remove(k)
452                 size -= int64(entsize[i])
453         }
454 }
455
456 // collectionBytes returns the approximate combined memory size of the
457 // collection cache and session filesystem cache.
458 func (c *cache) collectionBytes() uint64 {
459         var size uint64
460         for _, k := range c.collections.Keys() {
461                 v, ok := c.collections.Peek(k)
462                 if !ok {
463                         continue
464                 }
465                 size += uint64(len(v.(*cachedCollection).collection.ManifestText))
466         }
467         for _, token := range c.sessions.Keys() {
468                 ent, ok := c.sessions.Peek(token)
469                 if !ok {
470                         continue
471                 }
472                 if fs, ok := ent.(*cachedSession).fs.Load().(arvados.CustomFileSystem); ok {
473                         size += uint64(fs.MemorySize())
474                 }
475         }
476         return size
477 }
478
479 func (c *cache) lookupCollection(key string) *arvados.Collection {
480         e, cached := c.collections.Get(key)
481         if !cached {
482                 return nil
483         }
484         ent := e.(*cachedCollection)
485         if ent.expire.Before(time.Now()) {
486                 c.collections.Remove(key)
487                 return nil
488         }
489         c.metrics.collectionHits.Inc()
490         return ent.collection
491 }
492
493 func (c *cache) GetTokenUser(token string) (*arvados.User, error) {
494         // Get and cache user record associated with this
495         // token.  We need to know their UUID for logging, and
496         // whether they are an admin or not for certain
497         // permission checks.
498
499         // Get/create session entry
500         _, sess, err := c.GetSession(token)
501         if err != nil {
502                 return nil, err
503         }
504
505         // See if the user is already set, and if so, return it
506         user, _ := sess.user.Load().(*arvados.User)
507         if user != nil {
508                 return user, nil
509         }
510
511         // Fetch the user record
512         c.metrics.apiCalls.Inc()
513         var current arvados.User
514
515         err = sess.client.RequestAndDecode(&current, "GET", "arvados/v1/users/current", nil, nil)
516         if err != nil {
517                 return nil, err
518         }
519
520         // Stash the user record for next time
521         sess.user.Store(&current)
522         return &current, nil
523 }