1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
12 "git.arvados.org/arvados.git/sdk/go/arvados"
13 "git.arvados.org/arvados.git/sdk/go/arvadosclient"
14 "git.arvados.org/arvados.git/sdk/go/keepclient"
15 lru "github.com/hashicorp/golang-lru"
16 "github.com/prometheus/client_golang/prometheus"
17 "github.com/sirupsen/logrus"
// metricsUpdateInterval (100ms) is the tick period of the loop started
// by (*cache).setup; that loop presumably refreshes the gauge metrics via
// updateGauges — confirm.
20 const metricsUpdateInterval = time.Second / 10
// cluster provides the client configuration used by GetSession
// (NewClientFromConfig, ForwardSlashNameSubstitution) and the byte limit
// read by pruneSessions.
23 cluster *arvados.Cluster
// config supplies cache sizes and TTLs (MaxUUIDEntries,
// MaxCollectionEntries, MaxSessions, MaxCollectionBytes, TTL, UUIDTTL).
24 config *arvados.WebDAVCacheConfig // TODO: use cluster.Collections.WebDAV instead
// logger receives the Debugf traces emitted by Get.
25 logger logrus.FieldLogger
// registry receives the cache metrics; setup falls back to a new
// registry (presumably only when this is nil — confirm).
26 registry *prometheus.Registry
// pdhs maps a collection UUID to its *cachedPDH entry.
28 pdhs *lru.TwoQueueCache
// collections maps token+"\000"+PDH to a *cachedCollection entry. The
// token is part of the key, so a cached manifest is only reused for the
// token that fetched (or saved) it.
29 collections *lru.TwoQueueCache
// sessions maps an API token to its *cachedSession.
30 sessions *lru.TwoQueueCache
// chPruneSessions and chPruneCollections wake the background pruners
// started by setup. Each has a buffer of 1. Senders use a select
// (presumably with a default case), so at most one prune request is
// queued while a prune is running.
33 chPruneSessions chan struct{}
34 chPruneCollections chan struct{}
// cacheMetrics holds the Prometheus collectors exported by the cache.
// All fields are created and registered by (*cacheMetrics).setup.
37 type cacheMetrics struct {
// requests counts calls to Get.
38 requests prometheus.Counter
// The three gauges are overwritten by updateGauges with the approximate
// cached byte total and the collections/sessions entry counts.
39 collectionBytes prometheus.Gauge
40 collectionEntries prometheus.Gauge
41 sessionEntries prometheus.Gauge
// Hit counters for the collections (PDH->manifest), pdhs (UUID->PDH) and
// sessions caches, plus session misses (new or expired sessions).
42 collectionHits prometheus.Counter
43 pdhHits prometheus.Counter
44 sessionHits prometheus.Counter
45 sessionMisses prometheus.Counter
// apiCalls counts outgoing API requests made by Get and GetTokenUser.
46 apiCalls prometheus.Counter
// setup creates each collector in m and registers it with reg.
// MustRegister panics if a collector cannot be registered (for example,
// if one with the same fully-qualified name already exists in reg), so
// setup must be called at most once per registry.
49 func (m *cacheMetrics) setup(reg *prometheus.Registry) {
50 m.requests = prometheus.NewCounter(prometheus.CounterOpts{
52 Subsystem: "keepweb_collectioncache",
54 Help: "Number of targetID-to-manifest lookups handled.",
56 reg.MustRegister(m.requests)
57 m.collectionHits = prometheus.NewCounter(prometheus.CounterOpts{
59 Subsystem: "keepweb_collectioncache",
61 Help: "Number of pdh-to-manifest cache hits.",
63 reg.MustRegister(m.collectionHits)
64 m.pdhHits = prometheus.NewCounter(prometheus.CounterOpts{
66 Subsystem: "keepweb_collectioncache",
68 Help: "Number of uuid-to-pdh cache hits.",
70 reg.MustRegister(m.pdhHits)
71 m.apiCalls = prometheus.NewCounter(prometheus.CounterOpts{
73 Subsystem: "keepweb_collectioncache",
75 Help: "Number of outgoing API calls made by cache.",
77 reg.MustRegister(m.apiCalls)
// NOTE(review): collectionBytes is exported under the keepweb_sessions
// subsystem, although the other manifest metrics use
// keepweb_collectioncache. Its value covers both manifests and session
// filesystems, which matches its Help text. Renaming it would change the
// exported metric name, so leave it unless dashboards/alerts move too.
78 m.collectionBytes = prometheus.NewGauge(prometheus.GaugeOpts{
80 Subsystem: "keepweb_sessions",
81 Name: "cached_collection_bytes",
82 Help: "Total size of all cached manifests and sessions.",
84 reg.MustRegister(m.collectionBytes)
85 m.collectionEntries = prometheus.NewGauge(prometheus.GaugeOpts{
87 Subsystem: "keepweb_collectioncache",
88 Name: "cached_manifests",
89 Help: "Number of manifests in cache.",
91 reg.MustRegister(m.collectionEntries)
92 m.sessionEntries = prometheus.NewGauge(prometheus.GaugeOpts{
94 Subsystem: "keepweb_sessions",
96 Help: "Number of active token sessions.",
98 reg.MustRegister(m.sessionEntries)
99 m.sessionHits = prometheus.NewCounter(prometheus.CounterOpts{
100 Namespace: "arvados",
101 Subsystem: "keepweb_sessions",
103 Help: "Number of token session cache hits.",
105 reg.MustRegister(m.sessionHits)
106 m.sessionMisses = prometheus.NewCounter(prometheus.CounterOpts{
107 Namespace: "arvados",
108 Subsystem: "keepweb_sessions",
110 Help: "Number of token session cache misses.",
112 reg.MustRegister(m.sessionMisses)
// cachedPDH is a pdhs entry: the portable data hash (pdh) and
// modification time (mtime) last seen for a collection UUID. Once
// refresh has passed, Get re-verifies the PDH with the API. Once expire
// has passed, the entry is discarded.
115 type cachedPDH struct {
// cachedCollection is a collections entry: a full collection record,
// valid until expire.
122 type cachedCollection struct {
124 collection *arvados.Collection
// NOTE(review): cachedPermission appears unused by the cache code around
// it — confirm it is still needed before removing it.
127 type cachedPermission struct {
// cachedSession holds per-token state. The API and Keep clients are
// built once, when the session is created. The filesystem (fs) and user
// record (user) are built lazily and accessed via Load/Store (presumably
// atomic.Values so they can be shared by concurrent requests).
131 type cachedSession struct {
134 client *arvados.Client
135 arvadosclient *arvadosclient.ArvadosClient
136 keepclient *keepclient.KeepClient
// setup initializes the cache on first use. Update, ResetSession,
// GetSession and Get call it through c.setupOnce.Do, so it runs once per
// cache. It:
//   - creates the pdhs, collections and sessions 2Q LRU caches, sized
//     from c.config;
//   - prepares metrics, falling back to a new registry (presumably when
//     c.registry is nil);
//   - starts long-running loops, presumably each in its own goroutine.
// The loops are a time.Tick loop that refreshes the gauges every
// metricsUpdateInterval, plus a collection pruner and a session pruner,
// each woken by a 1-buffered channel. The ticker is never stopped, which
// is acceptable only because setup runs once per cache.
140 func (c *cache) setup() {
142 c.pdhs, err = lru.New2Q(c.config.MaxUUIDEntries)
146 c.collections, err = lru.New2Q(c.config.MaxCollectionEntries)
150 c.sessions, err = lru.New2Q(c.config.MaxSessions)
157 reg = prometheus.NewRegistry()
161 for range time.Tick(metricsUpdateInterval) {
165 c.chPruneCollections = make(chan struct{}, 1)
167 for range c.chPruneCollections {
171 c.chPruneSessions = make(chan struct{}, 1)
173 for range c.chPruneSessions {
// updateGauges publishes the current cache sizes to the gauge metrics:
// the approximate byte total from collectionBytes, and the entry counts
// of the collections and sessions caches. collectionBytes walks every
// cached entry, so each call costs time proportional to the cache size.
179 func (c *cache) updateGauges() {
180 c.metrics.collectionBytes.Set(float64(c.collectionBytes()))
181 c.metrics.collectionEntries.Set(float64(c.collections.Len()))
182 c.metrics.sessionEntries.Set(float64(c.sessions.Len()))
// selectPDH is the parameter set for a lightweight collections "get".
// Get uses it to re-verify a cached UUID->PDH mapping (and read
// permission) without downloading the manifest.
//
// modified_at is requested too because Get copies current.ModifiedAt
// into the collection it returns on that path. With only
// portable_data_hash selected, the API omits modified_at, and the
// returned ModifiedAt would be the zero time.
185 var selectPDH = map[string]interface{}{
186 "select": []string{"portable_data_hash", "modified_at"},
189 // Update saves a modified version (fs) to an existing collection
190 // (coll) and, if successful, updates the relevant cache entries so
191 // subsequent calls to Get() reflect the modifications.
//
// Update returns early without calling the API if marshaling fs fails
// or the resulting manifest equals coll.ManifestText. It presumably
// returns err, which is nil in the unchanged case.
// Otherwise it PATCHes only manifest_text. On success it caches the
// updated record under client.AuthToken+"\000"+<new PDH> and records the
// new UUID->PDH mapping with fresh TTL/UUIDTTL deadlines.
192 func (c *cache) Update(client *arvados.Client, coll arvados.Collection, fs arvados.CollectionFileSystem) error {
193 c.setupOnce.Do(c.setup)
195 m, err := fs.MarshalManifest(".")
196 if err != nil || m == coll.ManifestText {
199 coll.ManifestText = m
200 var updated arvados.Collection
201 err = client.RequestAndDecode(&updated, "PATCH", "arvados/v1/collections/"+coll.UUID, nil, map[string]interface{}{
202 "collection": map[string]string{
203 "manifest_text": coll.ManifestText,
// Once a PATCH has been attempted, the cached UUID->PDH mapping may no
// longer be accurate, so drop it (presumably this is the error branch —
// confirm).
207 c.pdhs.Remove(coll.UUID)
210 c.collections.Add(client.AuthToken+"\000"+updated.PortableDataHash, &cachedCollection{
211 expire: time.Now().Add(time.Duration(c.config.TTL)),
212 collection: &updated,
214 c.pdhs.Add(coll.UUID, &cachedPDH{
215 expire: time.Now().Add(time.Duration(c.config.TTL)),
216 refresh: time.Now().Add(time.Duration(c.config.UUIDTTL)),
217 pdh: updated.PortableDataHash,
218 mtime: updated.ModifiedAt,
223 // ResetSession unloads any potentially stale state. Should be called
224 // after write operations, so subsequent reads don't return stale
// data. It removes the whole cached session for token: clients,
// filesystem and memoized user record. The next GetSession call for the
// token builds a new session.
226 func (c *cache) ResetSession(token string) {
227 c.setupOnce.Do(c.setup)
228 c.sessions.Remove(token)
231 // Get a long-lived CustomFileSystem suitable for doing a read operation
232 // with the given token.
//
// GetSession returns the session's filesystem, the *cachedSession
// itself, and any error from creating the session's clients.
// On a cache miss it builds new clients for token and caches the
// session. An expired session is counted as a miss, and presumably gets
// a freshly built filesystem. Every call also nudges the session pruner.
233 func (c *cache) GetSession(token string) (arvados.CustomFileSystem, *cachedSession, error) {
234 c.setupOnce.Do(c.setup)
236 ent, _ := c.sessions.Get(token)
// The comma-ok assertion yields a nil sess when token is not cached.
237 sess, _ := ent.(*cachedSession)
240 c.metrics.sessionMisses.Inc()
241 sess = &cachedSession{
242 expire: now.Add(c.config.TTL.Duration()),
245 sess.client, err = arvados.NewClientFromConfig(c.cluster)
// Each session's client carries its own token; the API and Keep clients
// below are derived from it.
249 sess.client.AuthToken = token
250 sess.arvadosclient, err = arvadosclient.New(sess.client)
254 sess.keepclient = keepclient.New(sess.arvadosclient)
255 c.sessions.Add(token, sess)
256 } else if sess.expire.Before(now) {
257 c.metrics.sessionMisses.Inc()
260 c.metrics.sessionHits.Inc()
// Ask the session pruner to run. The send is a select case (presumably
// with a default), so a busy pruner does not block this request.
263 case c.chPruneSessions <- struct{}{}:
// Reuse the session's filesystem unless none has been built yet or the
// session has expired.
266 fs, _ := sess.fs.Load().(arvados.CustomFileSystem)
267 if fs != nil && !expired {
270 fs = sess.client.SiteFileSystem(sess.keepclient)
271 fs.ForwardSlashNameSubstitution(c.cluster.Collections.ForwardSlashNameSubstitution)
276 // Remove all expired session cache entries, then remove more entries
277 // until approximate remaining size <= maxsize/2
// Here maxsize is Collections.WebDAVCache.MaxCollectionBytes, and a
// session's size is its filesystem's MemorySize(). Peek is used so that
// pruning does not change the entries' LRU standing.
//
// NOTE(review): the limit is read from c.cluster, while pruneCollections
// and Get read c.config.MaxCollectionBytes. These are presumably the same
// setting (see the TODO on cache.config) — confirm.
278 func (c *cache) pruneSessions() {
280 keys := c.sessions.Keys()
281 sizes := make([]int64, len(keys))
283 for i, token := range keys {
284 ent, ok := c.sessions.Peek(token)
288 s := ent.(*cachedSession)
289 if s.expire.Before(now) {
290 c.sessions.Remove(token)
// Sessions that have not built a filesystem yet keep size 0.
293 if fs, ok := s.fs.Load().(arvados.CustomFileSystem); ok {
294 sizes[i] = fs.MemorySize()
298 // Remove tokens until reaching size limit, starting with the
299 // least frequently used entries (which Keys() returns last).
// NOTE(review): TwoQueueCache.Keys returns the frequent list followed by
// the recent list, each ordered oldest to newest. This loop therefore
// evicts the newest once-used sessions first — confirm that is the
// intended order.
300 for i := len(keys) - 1; i >= 0 && size > c.cluster.Collections.WebDAVCache.MaxCollectionBytes/2; i-- {
302 c.sessions.Remove(keys[i])
// Get returns the collection identified by targetID, as visible to
// arv's token. targetID may be a collection UUID or a portable data hash
// (PDH).
//
// Get consults two caches: pdhs (UUID -> PDH) and collections
// (token+"\000"+PDH -> collection record).
//   - If both hit and the UUID->PDH entry is not due for a refresh (and
//     forceReload is false), no API call is made.
//   - If a refresh is due, a lightweight "get" using selectPDH re-checks
//     the PDH and read permission. The cached manifest is reused when the
//     PDH is unchanged, or when the new PDH's manifest is already cached.
//   - Otherwise the full record is fetched, cached and returned.
// current.ModifiedAt is only populated if selectPDH requests
// modified_at.
//
// NOTE(review): on the full-fetch path the returned pointer is the same
// *arvados.Collection stored in c.collections. A caller that mutates it
// also mutates the cached entry.
308 func (c *cache) Get(arv *arvadosclient.ArvadosClient, targetID string, forceReload bool) (*arvados.Collection, error) {
309 c.setupOnce.Do(c.setup)
310 c.metrics.requests.Inc()
// A PDH target needs no UUID->PDH lookup.
315 if arvadosclient.PDHMatch(targetID) {
317 } else if ent, cached := c.pdhs.Get(targetID); cached {
318 ent := ent.(*cachedPDH)
// Expired UUID->PDH entries are discarded rather than used.
319 if ent.expire.Before(time.Now()) {
320 c.pdhs.Remove(targetID)
324 pdhRefresh = forceReload || time.Now().After(ent.refresh)
325 c.metrics.pdhHits.Inc()
330 // UUID->PDH mapping is not cached, might as well get
331 // the whole collection record and be done (below).
332 c.logger.Debugf("cache(%s): have no pdh", targetID)
333 } else if cached := c.lookupCollection(arv.ApiToken + "\000" + pdh); cached == nil {
334 // PDH->manifest is not cached, might as well get the
335 // whole collection record (below).
336 c.logger.Debugf("cache(%s): have pdh %s but manifest is not cached", targetID, pdh)
337 } else if !pdhRefresh {
338 // We looked up UUID->PDH very recently, and we still
339 // have the manifest for that PDH.
340 c.logger.Debugf("cache(%s): have pdh %s and refresh not needed", targetID, pdh)
341 return &arvados.Collection{
343 ManifestText: cached.ManifestText,
344 PortableDataHash: pdh,
348 // Get current PDH for this UUID (and confirm we still
349 // have read permission). Most likely, the cached PDH
350 // is still correct, in which case we can use our
// cached manifest.
352 c.metrics.apiCalls.Inc()
353 var current arvados.Collection
354 err := arv.Get("collections", targetID, selectPDH, &current)
358 if current.PortableDataHash == pdh {
359 // PDH has not changed, cached manifest is
// correct.
361 c.logger.Debugf("cache(%s): verified cached pdh %s is still correct", targetID, pdh)
362 return &arvados.Collection{
364 ManifestText: cached.ManifestText,
365 PortableDataHash: pdh,
366 ModifiedAt: current.ModifiedAt,
369 if cached := c.lookupCollection(arv.ApiToken + "\000" + current.PortableDataHash); cached != nil {
370 // PDH changed, and we already have the
371 // manifest for that new PDH.
372 c.logger.Debugf("cache(%s): cached pdh %s was stale, new pdh is %s and manifest is already in cache", targetID, pdh, current.PortableDataHash)
373 return &arvados.Collection{
375 ManifestText: cached.ManifestText,
376 PortableDataHash: current.PortableDataHash,
377 ModifiedAt: current.ModifiedAt,
382 // Either UUID->PDH is not cached, or PDH->manifest is not
// cached, or the cached PDH turned out to be stale: fetch the whole
// collection record.
384 var retrieved arvados.Collection
385 c.metrics.apiCalls.Inc()
386 err := arv.Get("collections", targetID, nil, &retrieved)
390 c.logger.Debugf("cache(%s): retrieved manifest, caching with pdh %s", targetID, retrieved.PortableDataHash)
391 exp := time.Now().Add(time.Duration(c.config.TTL))
// Only UUID targets get a UUID->PDH entry; a PDH target is its own key.
392 if targetID != retrieved.PortableDataHash {
393 c.pdhs.Add(targetID, &cachedPDH{
395 refresh: time.Now().Add(time.Duration(c.config.UUIDTTL)),
396 pdh: retrieved.PortableDataHash,
397 mtime: retrieved.ModifiedAt,
400 c.collections.Add(arv.ApiToken+"\000"+retrieved.PortableDataHash, &cachedCollection{
402 collection: &retrieved,
// A manifest larger than the average per-entry share of
// MaxCollectionBytes may push the cache over its byte budget, so wake
// the collection pruner. lru.New2Q rejects non-positive sizes, so
// MaxCollectionEntries is presumably > 0 here — confirm setup fails
// otherwise.
404 if int64(len(retrieved.ManifestText)) > c.config.MaxCollectionBytes/int64(c.config.MaxCollectionEntries) {
406 case c.chPruneCollections <- struct{}{}:
410 return &retrieved, nil
413 // pruneCollections checks the total bytes occupied by manifest_text
414 // in the collection cache and removes old entries as needed to bring
415 // the total size down to MaxCollectionBytes/2. It also deletes all expired
// entries, regardless of size.
418 // pruneCollections does not aim to be perfectly correct when there is
419 // concurrent cache activity.
420 func (c *cache) pruneCollections() {
// Snapshot the keys once, then record each entry's manifest size and
// expiry. Peek avoids changing the entries' LRU standing.
423 keys := c.collections.Keys()
424 entsize := make([]int, len(keys))
425 expired := make([]bool, len(keys))
426 for i, k := range keys {
427 v, ok := c.collections.Peek(k)
431 ent := v.(*cachedCollection)
432 n := len(ent.collection.ManifestText)
435 expired[i] = ent.expire.Before(now)
// First pass: presumably drops the entries flagged in expired.
437 for i, k := range keys {
439 c.collections.Remove(k)
440 size -= int64(entsize[i])
// Second pass: drop more entries, in Keys() order, until the total is at
// most half the byte limit.
// NOTE(review): TwoQueueCache.Keys lists frequently used entries first,
// so this pass evicts those before once-used ones. That is the opposite
// end from pruneSessions — confirm which order is intended.
443 for i, k := range keys {
444 if size <= c.config.MaxCollectionBytes/2 {
448 // already removed this entry in the previous loop
451 c.collections.Remove(k)
452 size -= int64(entsize[i])
456 // collectionBytes returns the approximate combined memory size of the
457 // collection cache and session filesystem cache.
// Manifest text length stands in for each cached collection's size.
// fs.MemorySize() stands in for each session that has built a
// filesystem. Peek is used so that measuring does not affect LRU
// standing. The ok results presumably skip entries evicted between
// Keys() and Peek().
458 func (c *cache) collectionBytes() uint64 {
460 for _, k := range c.collections.Keys() {
461 v, ok := c.collections.Peek(k)
465 size += uint64(len(v.(*cachedCollection).collection.ManifestText))
467 for _, token := range c.sessions.Keys() {
468 ent, ok := c.sessions.Peek(token)
472 if fs, ok := ent.(*cachedSession).fs.Load().(arvados.CustomFileSystem); ok {
473 size += uint64(fs.MemorySize())
// lookupCollection returns the cached collection record for key, which
// callers build as token+"\000"+PDH.
//   - An expired entry is removed and, like a missing one, presumably
//     yields nil.
//   - Only unexpired hits increment collectionHits.
//   - It uses Get rather than Peek, so a hit also counts as a use in the
//     2Q cache.
//
// The returned pointer is the cached object itself; callers should treat
// it as read-only.
479 func (c *cache) lookupCollection(key string) *arvados.Collection {
480 e, cached := c.collections.Get(key)
484 ent := e.(*cachedCollection)
485 if ent.expire.Before(time.Now()) {
486 c.collections.Remove(key)
489 c.metrics.collectionHits.Inc()
490 return ent.collection
// GetTokenUser returns the user record for token. The record is fetched
// from arvados/v1/users/current at most once per cached session, then
// memoized in the session's user field. Errors from GetSession or the API
// request are presumably returned to the caller.
//
// NOTE(review): concurrent first calls for the same token may each issue
// the request; the last Store wins. This is presumably harmless, since
// every call fetches the same record.
493 func (c *cache) GetTokenUser(token string) (*arvados.User, error) {
494 // Get and cache user record associated with this
495 // token. We need to know their UUID for logging, and
496 // whether they are an admin or not for certain
497 // permission checks.
499 // Get/create session entry
500 _, sess, err := c.GetSession(token)
505 // See if the user is already set, and if so, return it
506 user, _ := sess.user.Load().(*arvados.User)
511 // Fetch the user record
512 c.metrics.apiCalls.Inc()
513 var current arvados.User
515 err = sess.client.RequestAndDecode(&current, "GET", "arvados/v1/users/current", nil, nil)
520 // Stash the user record for next time
521 sess.user.Store(&current)