1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
12 "git.arvados.org/arvados.git/sdk/go/arvados"
13 "git.arvados.org/arvados.git/sdk/go/arvadosclient"
14 "git.arvados.org/arvados.git/sdk/go/keepclient"
15 lru "github.com/hashicorp/golang-lru"
16 "github.com/prometheus/client_golang/prometheus"
17 "github.com/sirupsen/logrus"
// metricsUpdateInterval (one tenth of a second) is the period passed to
// time.Tick in (*cache).setup.
20 const metricsUpdateInterval = time.Second / 10
23 cluster *arvados.Cluster
24 config *arvados.WebDAVCacheConfig // TODO: use cluster.Collections.WebDAVCache instead
25 logger logrus.FieldLogger
26 registry *prometheus.Registry
// pdhs maps a collection UUID to a *cachedPDH (see Update and Get).
28 pdhs *lru.TwoQueueCache
// collections maps token + "\000" + portable data hash to a
// *cachedCollection.
29 collections *lru.TwoQueueCache
// sessions maps an API token to a *cachedSession.
30 sessions *lru.TwoQueueCache
// chPruneSessions and chPruneCollections are created with capacity 1 in
// setup, which also ranges over them; GetSession and Get send on them.
// NOTE(review): presumably each receive triggers pruneSessions /
// pruneCollections -- confirm.
33 chPruneSessions chan struct{}
34 chPruneCollections chan struct{}
// cacheMetrics groups the Prometheus collectors that its setup method
// creates and registers. The gauges are refreshed by (*cache).updateGauges;
// the counters are incremented by the cache's lookup methods.
37 type cacheMetrics struct {
38 requests prometheus.Counter
39 collectionBytes prometheus.Gauge
40 collectionEntries prometheus.Gauge
41 sessionEntries prometheus.Gauge
42 collectionHits prometheus.Counter
43 pdhHits prometheus.Counter
44 sessionHits prometheus.Counter
45 sessionMisses prometheus.Counter
46 apiCalls prometheus.Counter
// setup creates every counter and gauge in m and registers each one with
// reg. Registration uses reg.MustRegister, which (per the Prometheus client
// API) panics rather than returning an error if registration fails.
49 func (m *cacheMetrics) setup(reg *prometheus.Registry) {
50 m.requests = prometheus.NewCounter(prometheus.CounterOpts{
52 Subsystem: "keepweb_collectioncache",
54 Help: "Number of targetID-to-manifest lookups handled.",
56 reg.MustRegister(m.requests)
57 m.collectionHits = prometheus.NewCounter(prometheus.CounterOpts{
59 Subsystem: "keepweb_collectioncache",
61 Help: "Number of pdh-to-manifest cache hits.",
63 reg.MustRegister(m.collectionHits)
64 m.pdhHits = prometheus.NewCounter(prometheus.CounterOpts{
66 Subsystem: "keepweb_collectioncache",
68 Help: "Number of uuid-to-pdh cache hits.",
70 reg.MustRegister(m.pdhHits)
71 m.apiCalls = prometheus.NewCounter(prometheus.CounterOpts{
73 Subsystem: "keepweb_collectioncache",
75 Help: "Number of outgoing API calls made by cache.",
77 reg.MustRegister(m.apiCalls)
// NOTE(review): this gauge is named cached_collection_bytes but lives in the
// keepweb_sessions subsystem, whereas cached_manifests below is in
// keepweb_collectioncache. Its value (see collectionBytes) covers both
// manifests and session filesystems -- confirm the subsystem is intentional.
78 m.collectionBytes = prometheus.NewGauge(prometheus.GaugeOpts{
80 Subsystem: "keepweb_sessions",
81 Name: "cached_collection_bytes",
82 Help: "Total size of all cached manifests and sessions.",
84 reg.MustRegister(m.collectionBytes)
85 m.collectionEntries = prometheus.NewGauge(prometheus.GaugeOpts{
87 Subsystem: "keepweb_collectioncache",
88 Name: "cached_manifests",
89 Help: "Number of manifests in cache.",
91 reg.MustRegister(m.collectionEntries)
92 m.sessionEntries = prometheus.NewGauge(prometheus.GaugeOpts{
94 Subsystem: "keepweb_sessions",
96 Help: "Number of active token sessions.",
98 reg.MustRegister(m.sessionEntries)
99 m.sessionHits = prometheus.NewCounter(prometheus.CounterOpts{
100 Namespace: "arvados",
101 Subsystem: "keepweb_sessions",
103 Help: "Number of token session cache hits.",
105 reg.MustRegister(m.sessionHits)
106 m.sessionMisses = prometheus.NewCounter(prometheus.CounterOpts{
107 Namespace: "arvados",
108 Subsystem: "keepweb_sessions",
110 Help: "Number of token session cache misses.",
112 reg.MustRegister(m.sessionMisses)
// cachedPDH is the value type stored in cache.pdhs. Update and Get fill in
// its expire, refresh and pdh fields; Get drops an entry once expire has
// passed and treats it as needing re-verification once refresh has passed.
115 type cachedPDH struct {
// cachedCollection is the value type stored in cache.collections. Besides
// the collection pointer, code in this file also sets and reads an expire
// field on it.
121 type cachedCollection struct {
123 collection *arvados.Collection
// NOTE(review): cachedPermission's fields are not visible here and no use of
// the type appears in the visible code -- confirm whether it is still
// needed.
126 type cachedPermission struct {
// cachedSession is the value type stored in cache.sessions, one per token.
// GetSession populates the three clients below; code in this file also uses
// expire, fs and user fields of this type.
130 type cachedSession struct {
// client is created from the cluster config and carries the session token.
133 client *arvados.Client
// arvadosclient is built from client.
134 arvadosclient *arvadosclient.ArvadosClient
// keepclient is built from arvadosclient.
135 keepclient *keepclient.KeepClient
// setup performs one-time initialisation of the cache: it sizes the three
// 2Q LRU caches from c.config, creates the prune-request channels, and
// starts the loops that receive from them. The public methods invoke it via
// c.setupOnce.Do(c.setup).
139 func (c *cache) setup() {
// One LRU per mapping: UUID->PDH, (token, PDH)->collection, token->session.
141 c.pdhs, err = lru.New2Q(c.config.MaxUUIDEntries)
145 c.collections, err = lru.New2Q(c.config.MaxCollectionEntries)
149 c.sessions, err = lru.New2Q(c.config.MaxSessions)
156 reg = prometheus.NewRegistry()
// NOTE(review): time.Tick provides no way to stop its ticker, so this loop
// has no shutdown path; presumably the cache lives as long as the process --
// confirm.
160 for range time.Tick(metricsUpdateInterval) {
// Capacity 1: at most one prune request can be pending at a time.
164 c.chPruneCollections = make(chan struct{}, 1)
166 for range c.chPruneCollections {
170 c.chPruneSessions = make(chan struct{}, 1)
172 for range c.chPruneSessions {
// updateGauges copies the current cache sizes into the Prometheus gauges:
// total bytes (from collectionBytes) and the entry counts of the collection
// and session caches.
178 func (c *cache) updateGauges() {
179 c.metrics.collectionBytes.Set(float64(c.collectionBytes()))
180 c.metrics.collectionEntries.Set(float64(c.collections.Len()))
181 c.metrics.sessionEntries.Set(float64(c.sessions.Len()))
// selectPDH is the request parameter set Get passes to arv.Get when it only
// needs a collection's portable_data_hash.
184 var selectPDH = map[string]interface{}{
185 "select": []string{"portable_data_hash"},
188 // Update saves a modified version (fs) to an existing collection
189 // (coll) and, if successful, updates the relevant cache entries so
190 // subsequent calls to Get() reflect the modifications.
191 func (c *cache) Update(client *arvados.Client, coll arvados.Collection, fs arvados.CollectionFileSystem) error {
192 c.setupOnce.Do(c.setup)
// Serialize the whole filesystem (rooted at ".") to manifest text.
194 m, err := fs.MarshalManifest(".")
// Taken on a marshal error, and also when the manifest is unchanged, in
// which case there is nothing to save.
195 if err != nil || m == coll.ManifestText {
198 coll.ManifestText = m
// The API response is decoded into updated, which supplies the new portable
// data hash used for the cache entries below.
199 var updated arvados.Collection
200 err = client.RequestAndDecode(&updated, "PATCH", "arvados/v1/collections/"+coll.UUID, nil, map[string]interface{}{
201 "collection": map[string]string{
202 "manifest_text": coll.ManifestText,
// Drop the cached UUID->PDH mapping for this collection.
206 c.pdhs.Remove(coll.UUID)
// Cache the saved record under token + "\000" + PDH, the same key format
// Get uses for its lookups.
209 c.collections.Add(client.AuthToken+"\000"+updated.PortableDataHash, &cachedCollection{
210 expire: time.Now().Add(time.Duration(c.config.TTL)),
211 collection: &updated,
// Record the new UUID->PDH mapping.
213 c.pdhs.Add(coll.UUID, &cachedPDH{
214 expire: time.Now().Add(time.Duration(c.config.TTL)),
215 refresh: time.Now().Add(time.Duration(c.config.UUIDTTL)),
216 pdh: updated.PortableDataHash,
221 // ResetSession unloads any potentially stale state. Should be called
222 // after write operations, so subsequent reads don't return stale
224 func (c *cache) ResetSession(token string) {
225 c.setupOnce.Do(c.setup)
// Removing the entry makes the next GetSession(token) build a new session.
226 c.sessions.Remove(token)
229 // GetSession returns a long-lived CustomFileSystem suitable for doing a read
230 // operation with the given token, together with the session cache entry.
//
// Each call increments either the sessionHits or the sessionMisses counter.
231 func (c *cache) GetSession(token string) (arvados.CustomFileSystem, *cachedSession, error) {
232 c.setupOnce.Do(c.setup)
// A missing entry (or one of an unexpected type) leaves sess nil; the
// lookup and assertion results are deliberately discarded.
234 ent, _ := c.sessions.Get(token)
235 sess, _ := ent.(*cachedSession)
// Miss: build a new session entry that expires c.config.TTL from now, with
// its own API, arvadosclient and keep clients bound to this token.
238 c.metrics.sessionMisses.Inc()
239 sess = &cachedSession{
240 expire: now.Add(c.config.TTL.Duration()),
243 sess.client, err = arvados.NewClientFromConfig(c.cluster)
247 sess.client.AuthToken = token
248 sess.arvadosclient, err = arvadosclient.New(sess.client)
252 sess.keepclient = keepclient.New(sess.arvadosclient)
253 c.sessions.Add(token, sess)
// An entry exists but its expiry time has passed: counted as a miss.
254 } else if sess.expire.Before(now) {
255 c.metrics.sessionMisses.Inc()
258 c.metrics.sessionHits.Inc()
// Ask for a session prune; setup ranges over this channel.
261 case c.chPruneSessions <- struct{}{}:
// Reuse the stored filesystem only if one is present and the session has
// not expired; otherwise build a fresh site filesystem.
264 fs, _ := sess.fs.Load().(arvados.CustomFileSystem)
265 if fs != nil && !expired {
268 fs = sess.client.SiteFileSystem(sess.keepclient)
269 fs.ForwardSlashNameSubstitution(c.cluster.Collections.ForwardSlashNameSubstitution)
274 // Remove all expired session cache entries, then remove more entries
275 // until approximate remaining size <= maxsize/2
276 func (c *cache) pruneSessions() {
// Snapshot the keys; sizes[i] holds the filesystem memory size recorded for
// keys[i].
278 keys := c.sessions.Keys()
279 sizes := make([]int64, len(keys))
281 for i, token := range keys {
// Peek (not Get) is used to inspect entries during pruning.
282 ent, ok := c.sessions.Peek(token)
286 s := ent.(*cachedSession)
287 if s.expire.Before(now) {
288 c.sessions.Remove(token)
// Sessions with no stored filesystem keep a recorded size of zero.
291 if fs, ok := s.fs.Load().(arvados.CustomFileSystem); ok {
292 sizes[i] = fs.MemorySize()
296 // Remove tokens until reaching size limit, starting with the
297 // least frequently used entries (which Keys() returns last).
// NOTE(review): the limit here is read through
// c.cluster.Collections.WebDAVCache, while pruneCollections and Get read
// c.config.MaxCollectionBytes -- confirm both refer to the same
// configuration.
298 for i := len(keys) - 1; i >= 0 && size > c.cluster.Collections.WebDAVCache.MaxCollectionBytes/2; i-- {
300 c.sessions.Remove(keys[i])
// Get returns the collection record for targetID, which may be a portable
// data hash (tested with arvadosclient.PDHMatch) or an identifier that is
// looked up in the UUID->PDH cache. Cached data is used where possible;
// otherwise the record is fetched with arv.Get and added to the caches.
//
// forceReload makes a cached UUID->PDH mapping count as needing a refresh
// regardless of its refresh time.
//
// Every call increments the requests counter; each outgoing arv.Get
// increments apiCalls.
306 func (c *cache) Get(arv *arvadosclient.ArvadosClient, targetID string, forceReload bool) (*arvados.Collection, error) {
307 c.setupOnce.Do(c.setup)
308 c.metrics.requests.Inc()
312 if arvadosclient.PDHMatch(targetID) {
314 } else if ent, cached := c.pdhs.Get(targetID); cached {
315 ent := ent.(*cachedPDH)
316 if ent.expire.Before(time.Now()) {
317 c.pdhs.Remove(targetID)
320 pdhRefresh = forceReload || time.Now().After(ent.refresh)
321 c.metrics.pdhHits.Inc()
326 // UUID->PDH mapping is not cached, might as well get
327 // the whole collection record and be done (below).
328 c.logger.Debugf("cache(%s): have no pdh", targetID)
// Collection cache keys are token + "\000" + PDH, so a cached manifest is
// only found for the token that stored it.
329 } else if cached := c.lookupCollection(arv.ApiToken + "\000" + pdh); cached == nil {
330 // PDH->manifest is not cached, might as well get the
331 // whole collection record (below).
332 c.logger.Debugf("cache(%s): have pdh %s but manifest is not cached", targetID, pdh)
333 } else if !pdhRefresh {
334 // We looked up UUID->PDH very recently, and we still
335 // have the manifest for that PDH.
336 c.logger.Debugf("cache(%s): have pdh %s and refresh not needed", targetID, pdh)
337 return &arvados.Collection{
339 ManifestText: cached.ManifestText,
340 PortableDataHash: pdh,
343 // Get current PDH for this UUID (and confirm we still
344 // have read permission). Most likely, the cached PDH
345 // is still correct, in which case we can use our
347 c.metrics.apiCalls.Inc()
348 var current arvados.Collection
// NOTE(review): the last argument below looks like a mis-encoded reference
// to the current variable declared on the previous line -- confirm against
// the original file.
349 err := arv.Get("collections", targetID, selectPDH, ¤t)
353 if current.PortableDataHash == pdh {
354 // PDH has not changed, cached manifest is
356 c.logger.Debugf("cache(%s): verified cached pdh %s is still correct", targetID, pdh)
357 return &arvados.Collection{
359 ManifestText: cached.ManifestText,
360 PortableDataHash: pdh,
363 if cached := c.lookupCollection(arv.ApiToken + "\000" + current.PortableDataHash); cached != nil {
364 // PDH changed, and we already have the
365 // manifest for that new PDH.
366 c.logger.Debugf("cache(%s): cached pdh %s was stale, new pdh is %s and manifest is already in cache", targetID, pdh, current.PortableDataHash)
367 return &arvados.Collection{
369 ManifestText: cached.ManifestText,
370 PortableDataHash: current.PortableDataHash,
375 // Either UUID->PDH is not cached, or PDH->manifest is not
377 var retrieved arvados.Collection
378 c.metrics.apiCalls.Inc()
379 err := arv.Get("collections", targetID, nil, &retrieved)
383 c.logger.Debugf("cache(%s): retrieved manifest, caching with pdh %s", targetID, retrieved.PortableDataHash)
384 exp := time.Now().Add(time.Duration(c.config.TTL))
// Only record a UUID->PDH mapping when the caller did not already ask by
// PDH.
385 if targetID != retrieved.PortableDataHash {
386 c.pdhs.Add(targetID, &cachedPDH{
388 refresh: time.Now().Add(time.Duration(c.config.UUIDTTL)),
389 pdh: retrieved.PortableDataHash,
392 c.collections.Add(arv.ApiToken+"\000"+retrieved.PortableDataHash, &cachedCollection{
394 collection: &retrieved,
// A manifest larger than the average per-entry budget
// (MaxCollectionBytes / MaxCollectionEntries) requests a collection prune.
396 if int64(len(retrieved.ManifestText)) > c.config.MaxCollectionBytes/int64(c.config.MaxCollectionEntries) {
398 case c.chPruneCollections <- struct{}{}:
402 return &retrieved, nil
405 // pruneCollections checks the total bytes occupied by manifest_text
406 // in the collection cache and removes old entries as needed to bring
407 // the total size down to MaxCollectionBytes/2. It also deletes all expired
410 // pruneCollections does not aim to be perfectly correct when there is
411 // concurrent cache activity.
412 func (c *cache) pruneCollections() {
// Snapshot the keys; entsize[i] and expired[i] describe keys[i].
415 keys := c.collections.Keys()
416 entsize := make([]int, len(keys))
417 expired := make([]bool, len(keys))
418 for i, k := range keys {
// Peek (not Get) is used to inspect entries during pruning.
419 v, ok := c.collections.Peek(k)
423 ent := v.(*cachedCollection)
// An entry's size is taken to be the length of its manifest text.
424 n := len(ent.collection.ManifestText)
427 expired[i] = ent.expire.Before(now)
// First removal loop; the running size is reduced for each entry removed.
429 for i, k := range keys {
431 c.collections.Remove(k)
432 size -= int64(entsize[i])
// Second removal loop: the size is compared against half of
// c.config.MaxCollectionBytes, and further entries are removed in the order
// Keys() returned them.
435 for i, k := range keys {
436 if size <= c.config.MaxCollectionBytes/2 {
440 // already removed this entry in the previous loop
443 c.collections.Remove(k)
444 size -= int64(entsize[i])
448 // collectionBytes returns the approximate combined memory size of the
449 // collection cache and session filesystem cache.
450 func (c *cache) collectionBytes() uint64 {
// Collection cache: add the manifest text length of each entry.
452 for _, k := range c.collections.Keys() {
453 v, ok := c.collections.Peek(k)
457 size += uint64(len(v.(*cachedCollection).collection.ManifestText))
// Session cache: add the MemorySize of each session that has a stored
// filesystem.
459 for _, token := range c.sessions.Keys() {
460 ent, ok := c.sessions.Peek(token)
464 if fs, ok := ent.(*cachedSession).fs.Load().(arvados.CustomFileSystem); ok {
465 size += uint64(fs.MemorySize())
// lookupCollection returns the collection cached under key (token + "\000" +
// PDH, as built by its callers). An entry whose expiry time has passed is
// removed from the cache. A successful lookup increments the collectionHits
// counter. Callers treat a nil result as "not cached".
471 func (c *cache) lookupCollection(key string) *arvados.Collection {
472 e, cached := c.collections.Get(key)
476 ent := e.(*cachedCollection)
477 if ent.expire.Before(time.Now()) {
478 c.collections.Remove(key)
481 c.metrics.collectionHits.Inc()
482 return ent.collection
// GetTokenUser returns the user record associated with token. The record is
// kept on the token's session entry (obtained via GetSession); when none is
// stored yet it is requested from arvados/v1/users/current using the
// session's client, which counts as one API call in the metrics.
485 func (c *cache) GetTokenUser(token string) (*arvados.User, error) {
486 // Get and cache user record associated with this
487 // token. We need to know their UUID for logging, and
488 // whether they are an admin or not for certain
489 // permission checks.
491 // Get/create session entry
492 _, sess, err := c.GetSession(token)
497 // See if the user is already set, and if so, return it
498 user, _ := sess.user.Load().(*arvados.User)
503 // Fetch the user record
504 c.metrics.apiCalls.Inc()
505 var current arvados.User
// NOTE(review): the first argument below, and the argument to Store further
// down, look like mis-encoded references to the current variable declared
// above -- confirm against the original file.
507 err = sess.client.RequestAndDecode(¤t, "GET", "arvados/v1/users/current", nil, nil)
512 // Stash the user record for next time
513 sess.user.Store(¤t)