e10419f5dc538969b3fafd2894ab055270868ca4
[arvados.git] / services / keep-web / cache.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package keepweb
6
7 import (
8         "sync"
9         "sync/atomic"
10         "time"
11
12         "git.arvados.org/arvados.git/sdk/go/arvados"
13         "git.arvados.org/arvados.git/sdk/go/arvadosclient"
14         "git.arvados.org/arvados.git/sdk/go/keepclient"
15         lru "github.com/hashicorp/golang-lru"
16         "github.com/prometheus/client_golang/prometheus"
17         "github.com/sirupsen/logrus"
18 )
19
// metricsUpdateInterval is the period between refreshes of the cache
// gauge metrics (one tenth of a second).
const metricsUpdateInterval = time.Second / 10
21
// cache holds keep-web's lookup caches (UUID-to-PDH mappings,
// collection records, and per-token sessions) together with the
// metrics that describe them.
//
// The LRU caches, metrics, and background prune goroutines are
// created lazily by setup(), which the exported methods run through
// setupOnce.
type cache struct {
	cluster *arvados.Cluster
	logger  logrus.FieldLogger
	// registry receives the cache metrics. If nil, setup()
	// registers them in a newly created private registry.
	registry *prometheus.Registry
	metrics  cacheMetrics
	// pdhs maps a non-PDH target ID (collection UUID) to a
	// *cachedPDH.
	pdhs *lru.TwoQueueCache
	// collections maps token + "\000" + PDH to a
	// *cachedCollection.
	collections *lru.TwoQueueCache
	// sessions maps a token to a *cachedSession.
	sessions  *lru.TwoQueueCache
	setupOnce sync.Once

	// A non-blocking send on one of these channels wakes the
	// goroutine that runs pruneSessions / pruneCollections.
	chPruneSessions    chan struct{}
	chPruneCollections chan struct{}
}
35
// cacheMetrics groups the Prometheus collectors maintained by cache.
// All of them are created and registered by setup().
type cacheMetrics struct {
	// requests counts calls to cache.Get.
	requests prometheus.Counter
	// collectionBytes, collectionEntries and sessionEntries are
	// gauges refreshed periodically by cache.updateGauges.
	collectionBytes   prometheus.Gauge
	collectionEntries prometheus.Gauge
	sessionEntries    prometheus.Gauge
	// collectionHits counts unexpired entries found by
	// cache.lookupCollection; pdhHits counts unexpired UUID-to-PDH
	// entries found by cache.Get.
	collectionHits prometheus.Counter
	pdhHits        prometheus.Counter
	// sessionHits / sessionMisses are updated by cache.GetSession.
	sessionHits   prometheus.Counter
	sessionMisses prometheus.Counter
	// apiCalls counts outgoing API requests made by the cache.
	apiCalls prometheus.Counter
}
47
48 func (m *cacheMetrics) setup(reg *prometheus.Registry) {
49         m.requests = prometheus.NewCounter(prometheus.CounterOpts{
50                 Namespace: "arvados",
51                 Subsystem: "keepweb_collectioncache",
52                 Name:      "requests",
53                 Help:      "Number of targetID-to-manifest lookups handled.",
54         })
55         reg.MustRegister(m.requests)
56         m.collectionHits = prometheus.NewCounter(prometheus.CounterOpts{
57                 Namespace: "arvados",
58                 Subsystem: "keepweb_collectioncache",
59                 Name:      "hits",
60                 Help:      "Number of pdh-to-manifest cache hits.",
61         })
62         reg.MustRegister(m.collectionHits)
63         m.pdhHits = prometheus.NewCounter(prometheus.CounterOpts{
64                 Namespace: "arvados",
65                 Subsystem: "keepweb_collectioncache",
66                 Name:      "pdh_hits",
67                 Help:      "Number of uuid-to-pdh cache hits.",
68         })
69         reg.MustRegister(m.pdhHits)
70         m.apiCalls = prometheus.NewCounter(prometheus.CounterOpts{
71                 Namespace: "arvados",
72                 Subsystem: "keepweb_collectioncache",
73                 Name:      "api_calls",
74                 Help:      "Number of outgoing API calls made by cache.",
75         })
76         reg.MustRegister(m.apiCalls)
77         m.collectionBytes = prometheus.NewGauge(prometheus.GaugeOpts{
78                 Namespace: "arvados",
79                 Subsystem: "keepweb_sessions",
80                 Name:      "cached_collection_bytes",
81                 Help:      "Total size of all cached manifests and sessions.",
82         })
83         reg.MustRegister(m.collectionBytes)
84         m.collectionEntries = prometheus.NewGauge(prometheus.GaugeOpts{
85                 Namespace: "arvados",
86                 Subsystem: "keepweb_collectioncache",
87                 Name:      "cached_manifests",
88                 Help:      "Number of manifests in cache.",
89         })
90         reg.MustRegister(m.collectionEntries)
91         m.sessionEntries = prometheus.NewGauge(prometheus.GaugeOpts{
92                 Namespace: "arvados",
93                 Subsystem: "keepweb_sessions",
94                 Name:      "active",
95                 Help:      "Number of active token sessions.",
96         })
97         reg.MustRegister(m.sessionEntries)
98         m.sessionHits = prometheus.NewCounter(prometheus.CounterOpts{
99                 Namespace: "arvados",
100                 Subsystem: "keepweb_sessions",
101                 Name:      "hits",
102                 Help:      "Number of token session cache hits.",
103         })
104         reg.MustRegister(m.sessionHits)
105         m.sessionMisses = prometheus.NewCounter(prometheus.CounterOpts{
106                 Namespace: "arvados",
107                 Subsystem: "keepweb_sessions",
108                 Name:      "misses",
109                 Help:      "Number of token session cache misses.",
110         })
111         reg.MustRegister(m.sessionMisses)
112 }
113
// cachedPDH is a UUID-to-PDH cache entry.
type cachedPDH struct {
	// expire is the time after which cache.Get discards the
	// entry.
	expire time.Time
	// refresh is the time after which cache.Get re-checks the
	// PDH with the API server before trusting it.
	refresh time.Time
	pdh     string
}
119
// cachedCollection is a collection cache entry. Entries whose expire
// time has passed are dropped by lookupCollection and
// pruneCollections.
type cachedCollection struct {
	expire     time.Time
	collection *arvados.Collection
}
124
// cachedPermission carries only an expiry time.
//
// NOTE(review): nothing in this part of the file references this
// type; confirm whether it is still used elsewhere.
type cachedPermission struct {
	expire time.Time
}
128
// cachedSession is the per-token state kept in the session cache.
type cachedSession struct {
	// expire is the time after which GetSession stops reusing the
	// stored filesystem and pruneSessions removes the entry.
	expire time.Time
	// fs holds an arvados.CustomFileSystem, stored by GetSession.
	fs            atomic.Value
	client        *arvados.Client
	arvadosclient *arvadosclient.ArvadosClient
	keepclient    *keepclient.KeepClient
	// user holds a *arvados.User, stored by GetTokenUser.
	user atomic.Value
}
137
138 func (c *cache) setup() {
139         var err error
140         c.pdhs, err = lru.New2Q(c.cluster.Collections.WebDAVCache.MaxUUIDEntries)
141         if err != nil {
142                 panic(err)
143         }
144         c.collections, err = lru.New2Q(c.cluster.Collections.WebDAVCache.MaxCollectionEntries)
145         if err != nil {
146                 panic(err)
147         }
148         c.sessions, err = lru.New2Q(c.cluster.Collections.WebDAVCache.MaxSessions)
149         if err != nil {
150                 panic(err)
151         }
152
153         reg := c.registry
154         if reg == nil {
155                 reg = prometheus.NewRegistry()
156         }
157         c.metrics.setup(reg)
158         go func() {
159                 for range time.Tick(metricsUpdateInterval) {
160                         c.updateGauges()
161                 }
162         }()
163         c.chPruneCollections = make(chan struct{}, 1)
164         go func() {
165                 for range c.chPruneCollections {
166                         c.pruneCollections()
167                 }
168         }()
169         c.chPruneSessions = make(chan struct{}, 1)
170         go func() {
171                 for range c.chPruneSessions {
172                         c.pruneSessions()
173                 }
174         }()
175 }
176
177 func (c *cache) updateGauges() {
178         c.metrics.collectionBytes.Set(float64(c.collectionBytes()))
179         c.metrics.collectionEntries.Set(float64(c.collections.Len()))
180         c.metrics.sessionEntries.Set(float64(c.sessions.Len()))
181 }
182
// selectPDH is the request parameter set cache.Get uses when it only
// needs a collection's portable_data_hash field.
var selectPDH = map[string]interface{}{
	"select": []string{"portable_data_hash"},
}
186
187 // Update saves a modified version (fs) to an existing collection
188 // (coll) and, if successful, updates the relevant cache entries so
189 // subsequent calls to Get() reflect the modifications.
190 func (c *cache) Update(client *arvados.Client, coll arvados.Collection, fs arvados.CollectionFileSystem) error {
191         c.setupOnce.Do(c.setup)
192
193         m, err := fs.MarshalManifest(".")
194         if err != nil || m == coll.ManifestText {
195                 return err
196         }
197         coll.ManifestText = m
198         var updated arvados.Collection
199         err = client.RequestAndDecode(&updated, "PATCH", "arvados/v1/collections/"+coll.UUID, nil, map[string]interface{}{
200                 "collection": map[string]string{
201                         "manifest_text": coll.ManifestText,
202                 },
203         })
204         if err != nil {
205                 c.pdhs.Remove(coll.UUID)
206                 return err
207         }
208         c.collections.Add(client.AuthToken+"\000"+updated.PortableDataHash, &cachedCollection{
209                 expire:     time.Now().Add(time.Duration(c.cluster.Collections.WebDAVCache.TTL)),
210                 collection: &updated,
211         })
212         c.pdhs.Add(coll.UUID, &cachedPDH{
213                 expire:  time.Now().Add(time.Duration(c.cluster.Collections.WebDAVCache.TTL)),
214                 refresh: time.Now().Add(time.Duration(c.cluster.Collections.WebDAVCache.UUIDTTL)),
215                 pdh:     updated.PortableDataHash,
216         })
217         return nil
218 }
219
// ResetSession removes the cached session for token, if any, so that
// the next GetSession call for that token builds a new one. It should
// be called after write operations so subsequent reads don't return
// stale data.
func (c *cache) ResetSession(token string) {
	c.setupOnce.Do(c.setup)
	c.sessions.Remove(token)
}
227
228 // Get a long-lived CustomFileSystem suitable for doing a read operation
229 // with the given token.
230 func (c *cache) GetSession(token string) (arvados.CustomFileSystem, *cachedSession, error) {
231         c.setupOnce.Do(c.setup)
232         now := time.Now()
233         ent, _ := c.sessions.Get(token)
234         sess, _ := ent.(*cachedSession)
235         expired := false
236         if sess == nil {
237                 c.metrics.sessionMisses.Inc()
238                 sess = &cachedSession{
239                         expire: now.Add(c.cluster.Collections.WebDAVCache.TTL.Duration()),
240                 }
241                 var err error
242                 sess.client, err = arvados.NewClientFromConfig(c.cluster)
243                 if err != nil {
244                         return nil, nil, err
245                 }
246                 sess.client.AuthToken = token
247                 sess.arvadosclient, err = arvadosclient.New(sess.client)
248                 if err != nil {
249                         return nil, nil, err
250                 }
251                 sess.keepclient = keepclient.New(sess.arvadosclient)
252                 c.sessions.Add(token, sess)
253         } else if sess.expire.Before(now) {
254                 c.metrics.sessionMisses.Inc()
255                 expired = true
256         } else {
257                 c.metrics.sessionHits.Inc()
258         }
259         select {
260         case c.chPruneSessions <- struct{}{}:
261         default:
262         }
263         fs, _ := sess.fs.Load().(arvados.CustomFileSystem)
264         if fs != nil && !expired {
265                 return fs, sess, nil
266         }
267         fs = sess.client.SiteFileSystem(sess.keepclient)
268         fs.ForwardSlashNameSubstitution(c.cluster.Collections.ForwardSlashNameSubstitution)
269         sess.fs.Store(fs)
270         return fs, sess, nil
271 }
272
273 // Remove all expired session cache entries, then remove more entries
274 // until approximate remaining size <= maxsize/2
275 func (c *cache) pruneSessions() {
276         now := time.Now()
277         var size int64
278         keys := c.sessions.Keys()
279         for idx, token := range keys {
280                 ent, ok := c.sessions.Peek(token)
281                 if !ok {
282                         continue
283                 }
284                 s := ent.(*cachedSession)
285                 if s.expire.Before(now) {
286                         c.sessions.Remove(token)
287                         keys[idx] = ""
288                         continue
289                 }
290                 if fs, ok := s.fs.Load().(arvados.CustomFileSystem); ok {
291                         size += fs.MemorySize()
292                 }
293         }
294         // Remove tokens until reaching size limit, starting with the
295         // least frequently used entries (which Keys() returns last).
296         for i := len(keys) - 1; i >= 0; i-- {
297                 token := keys[i]
298                 if token == "" {
299                         // removed this session in the loop above;
300                         // don't prune it, even if it's already been
301                         // reinserted.
302                         continue
303                 }
304                 if size <= c.cluster.Collections.WebDAVCache.MaxCollectionBytes/2 {
305                         break
306                 }
307                 ent, ok := c.sessions.Peek(token)
308                 if !ok {
309                         continue
310                 }
311                 s := ent.(*cachedSession)
312                 fs, _ := s.fs.Load().(arvados.CustomFileSystem)
313                 if fs == nil {
314                         continue
315                 }
316                 c.sessions.Remove(token)
317                 size -= fs.MemorySize()
318         }
319 }
320
321 func (c *cache) Get(arv *arvadosclient.ArvadosClient, targetID string, forceReload bool) (*arvados.Collection, error) {
322         c.setupOnce.Do(c.setup)
323         c.metrics.requests.Inc()
324
325         var pdhRefresh bool
326         var pdh string
327         if arvadosclient.PDHMatch(targetID) {
328                 pdh = targetID
329         } else if ent, cached := c.pdhs.Get(targetID); cached {
330                 ent := ent.(*cachedPDH)
331                 if ent.expire.Before(time.Now()) {
332                         c.pdhs.Remove(targetID)
333                 } else {
334                         pdh = ent.pdh
335                         pdhRefresh = forceReload || time.Now().After(ent.refresh)
336                         c.metrics.pdhHits.Inc()
337                 }
338         }
339
340         if pdh == "" {
341                 // UUID->PDH mapping is not cached, might as well get
342                 // the whole collection record and be done (below).
343                 c.logger.Debugf("cache(%s): have no pdh", targetID)
344         } else if cached := c.lookupCollection(arv.ApiToken + "\000" + pdh); cached == nil {
345                 // PDH->manifest is not cached, might as well get the
346                 // whole collection record (below).
347                 c.logger.Debugf("cache(%s): have pdh %s but manifest is not cached", targetID, pdh)
348         } else if !pdhRefresh {
349                 // We looked up UUID->PDH very recently, and we still
350                 // have the manifest for that PDH.
351                 c.logger.Debugf("cache(%s): have pdh %s and refresh not needed", targetID, pdh)
352                 return cached, nil
353         } else {
354                 // Get current PDH for this UUID (and confirm we still
355                 // have read permission).  Most likely, the cached PDH
356                 // is still correct, in which case we can use our
357                 // cached manifest.
358                 c.metrics.apiCalls.Inc()
359                 var current arvados.Collection
360                 err := arv.Get("collections", targetID, selectPDH, &current)
361                 if err != nil {
362                         return nil, err
363                 }
364                 if current.PortableDataHash == pdh {
365                         // PDH has not changed, cached manifest is
366                         // correct.
367                         c.logger.Debugf("cache(%s): verified cached pdh %s is still correct", targetID, pdh)
368                         return cached, nil
369                 }
370                 if cached := c.lookupCollection(arv.ApiToken + "\000" + current.PortableDataHash); cached != nil {
371                         // PDH changed, and we already have the
372                         // manifest for that new PDH.
373                         c.logger.Debugf("cache(%s): cached pdh %s was stale, new pdh is %s and manifest is already in cache", targetID, pdh, current.PortableDataHash)
374                         return cached, nil
375                 }
376         }
377
378         // Either UUID->PDH is not cached, or PDH->manifest is not
379         // cached.
380         var retrieved arvados.Collection
381         c.metrics.apiCalls.Inc()
382         err := arv.Get("collections", targetID, nil, &retrieved)
383         if err != nil {
384                 return nil, err
385         }
386         c.logger.Debugf("cache(%s): retrieved manifest, caching with pdh %s", targetID, retrieved.PortableDataHash)
387         exp := time.Now().Add(time.Duration(c.cluster.Collections.WebDAVCache.TTL))
388         if targetID != retrieved.PortableDataHash {
389                 c.pdhs.Add(targetID, &cachedPDH{
390                         expire:  exp,
391                         refresh: time.Now().Add(time.Duration(c.cluster.Collections.WebDAVCache.UUIDTTL)),
392                         pdh:     retrieved.PortableDataHash,
393                 })
394         }
395         c.collections.Add(arv.ApiToken+"\000"+retrieved.PortableDataHash, &cachedCollection{
396                 expire:     exp,
397                 collection: &retrieved,
398         })
399         if int64(len(retrieved.ManifestText)) > c.cluster.Collections.WebDAVCache.MaxCollectionBytes/int64(c.cluster.Collections.WebDAVCache.MaxCollectionEntries) {
400                 select {
401                 case c.chPruneCollections <- struct{}{}:
402                 default:
403                 }
404         }
405         return &retrieved, nil
406 }
407
408 // pruneCollections checks the total bytes occupied by manifest_text
409 // in the collection cache and removes old entries as needed to bring
410 // the total size down to CollectionBytes. It also deletes all expired
411 // entries.
412 //
413 // pruneCollections does not aim to be perfectly correct when there is
414 // concurrent cache activity.
415 func (c *cache) pruneCollections() {
416         var size int64
417         now := time.Now()
418         keys := c.collections.Keys()
419         entsize := make([]int, len(keys))
420         expired := make([]bool, len(keys))
421         for i, k := range keys {
422                 v, ok := c.collections.Peek(k)
423                 if !ok {
424                         continue
425                 }
426                 ent := v.(*cachedCollection)
427                 n := len(ent.collection.ManifestText)
428                 size += int64(n)
429                 entsize[i] = n
430                 expired[i] = ent.expire.Before(now)
431         }
432         for i, k := range keys {
433                 if expired[i] {
434                         c.collections.Remove(k)
435                         size -= int64(entsize[i])
436                 }
437         }
438         for i, k := range keys {
439                 if size <= c.cluster.Collections.WebDAVCache.MaxCollectionBytes/2 {
440                         break
441                 }
442                 if expired[i] {
443                         // already removed this entry in the previous loop
444                         continue
445                 }
446                 c.collections.Remove(k)
447                 size -= int64(entsize[i])
448         }
449 }
450
451 // collectionBytes returns the approximate combined memory size of the
452 // collection cache and session filesystem cache.
453 func (c *cache) collectionBytes() uint64 {
454         var size uint64
455         for _, k := range c.collections.Keys() {
456                 v, ok := c.collections.Peek(k)
457                 if !ok {
458                         continue
459                 }
460                 size += uint64(len(v.(*cachedCollection).collection.ManifestText))
461         }
462         for _, token := range c.sessions.Keys() {
463                 ent, ok := c.sessions.Peek(token)
464                 if !ok {
465                         continue
466                 }
467                 if fs, ok := ent.(*cachedSession).fs.Load().(arvados.CustomFileSystem); ok {
468                         size += uint64(fs.MemorySize())
469                 }
470         }
471         return size
472 }
473
474 func (c *cache) lookupCollection(key string) *arvados.Collection {
475         e, cached := c.collections.Get(key)
476         if !cached {
477                 return nil
478         }
479         ent := e.(*cachedCollection)
480         if ent.expire.Before(time.Now()) {
481                 c.collections.Remove(key)
482                 return nil
483         }
484         c.metrics.collectionHits.Inc()
485         return ent.collection
486 }
487
488 func (c *cache) GetTokenUser(token string) (*arvados.User, error) {
489         // Get and cache user record associated with this
490         // token.  We need to know their UUID for logging, and
491         // whether they are an admin or not for certain
492         // permission checks.
493
494         // Get/create session entry
495         _, sess, err := c.GetSession(token)
496         if err != nil {
497                 return nil, err
498         }
499
500         // See if the user is already set, and if so, return it
501         user, _ := sess.user.Load().(*arvados.User)
502         if user != nil {
503                 return user, nil
504         }
505
506         // Fetch the user record
507         c.metrics.apiCalls.Inc()
508         var current arvados.User
509
510         err = sess.client.RequestAndDecode(&current, "GET", "/arvados/v1/users/current", nil, nil)
511         if err != nil {
512                 return nil, err
513         }
514
515         // Stash the user record for next time
516         sess.user.Store(&current)
517         return &current, nil
518 }