1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
12 "git.arvados.org/arvados.git/sdk/go/arvados"
13 "git.arvados.org/arvados.git/sdk/go/arvadosclient"
14 "git.arvados.org/arvados.git/sdk/go/keepclient"
15 lru "github.com/hashicorp/golang-lru"
16 "github.com/prometheus/client_golang/prometheus"
17 "github.com/sirupsen/logrus"
// metricsUpdateInterval is the period (time.Second / 10, i.e. 100ms) of the
// time.Tick loop started in (*cache).setup.
20 const metricsUpdateInterval = time.Second / 10
// cluster is handed to arvados.NewClientFromConfig when a session is created
// and supplies the ForwardSlashNameSubstitution setting for session
// filesystems.
23 cluster *arvados.Cluster
24 config *arvados.WebDAVCacheConfig // TODO: use cluster.Collections.WebDAV instead
// logger receives the Debugf trace messages emitted by Get.
25 logger logrus.FieldLogger
// registry is a Prometheus registry; presumably the one the cache metrics
// are registered with — TODO confirm against setup.
26 registry *prometheus.Registry
// pdhs maps a collection UUID to a *cachedPDH.
28 pdhs *lru.TwoQueueCache
// collections maps token + "\000" + portable data hash to a
// *cachedCollection.
29 collections *lru.TwoQueueCache
// sessions maps an API token to a *cachedSession.
30 sessions *lru.TwoQueueCache
// chPruneSessions and chPruneCollections are signal channels created with
// capacity 1 in setup; each is drained by a `for range` loop there.
33 chPruneSessions chan struct{}
34 chPruneCollections chan struct{}
// cacheMetrics groups the Prometheus counters and gauges published by the
// cache. The per-field descriptions below mirror the Help strings assigned
// in setup.
37 type cacheMetrics struct {
// requests counts targetID-to-manifest lookups handled.
38 requests prometheus.Counter
// collectionBytes is the total size of all cached manifests and sessions.
39 collectionBytes prometheus.Gauge
// collectionEntries is the number of manifests in cache.
40 collectionEntries prometheus.Gauge
// sessionEntries is the number of active token sessions.
41 sessionEntries prometheus.Gauge
// collectionHits counts pdh-to-manifest cache hits.
42 collectionHits prometheus.Counter
// pdhHits counts uuid-to-pdh cache hits.
43 pdhHits prometheus.Counter
// sessionHits and sessionMisses count token session cache hits and misses.
44 sessionHits prometheus.Counter
45 sessionMisses prometheus.Counter
// apiCalls counts outgoing API calls made by the cache.
46 apiCalls prometheus.Counter
// setup creates every counter and gauge in m and registers each one with
// reg via MustRegister.
49 func (m *cacheMetrics) setup(reg *prometheus.Registry) {
// Counters describing collection lookups.
50 m.requests = prometheus.NewCounter(prometheus.CounterOpts{
52 Subsystem: "keepweb_collectioncache",
54 Help: "Number of targetID-to-manifest lookups handled.",
56 reg.MustRegister(m.requests)
57 m.collectionHits = prometheus.NewCounter(prometheus.CounterOpts{
59 Subsystem: "keepweb_collectioncache",
61 Help: "Number of pdh-to-manifest cache hits.",
63 reg.MustRegister(m.collectionHits)
64 m.pdhHits = prometheus.NewCounter(prometheus.CounterOpts{
66 Subsystem: "keepweb_collectioncache",
68 Help: "Number of uuid-to-pdh cache hits.",
70 reg.MustRegister(m.pdhHits)
71 m.apiCalls = prometheus.NewCounter(prometheus.CounterOpts{
73 Subsystem: "keepweb_collectioncache",
75 Help: "Number of outgoing API calls made by cache.",
77 reg.MustRegister(m.apiCalls)
// Gauges describing current cache occupancy.
// NOTE(review): this gauge is named cached_collection_bytes but lives in
// the "keepweb_sessions" subsystem, unlike the other collection metrics —
// confirm that is intended.
78 m.collectionBytes = prometheus.NewGauge(prometheus.GaugeOpts{
80 Subsystem: "keepweb_sessions",
81 Name: "cached_collection_bytes",
82 Help: "Total size of all cached manifests and sessions.",
84 reg.MustRegister(m.collectionBytes)
85 m.collectionEntries = prometheus.NewGauge(prometheus.GaugeOpts{
87 Subsystem: "keepweb_collectioncache",
88 Name: "cached_manifests",
89 Help: "Number of manifests in cache.",
91 reg.MustRegister(m.collectionEntries)
92 m.sessionEntries = prometheus.NewGauge(prometheus.GaugeOpts{
94 Subsystem: "keepweb_sessions",
96 Help: "Number of active token sessions.",
98 reg.MustRegister(m.sessionEntries)
// Counters describing token session lookups.
99 m.sessionHits = prometheus.NewCounter(prometheus.CounterOpts{
100 Namespace: "arvados",
101 Subsystem: "keepweb_sessions",
103 Help: "Number of token session cache hits.",
105 reg.MustRegister(m.sessionHits)
106 m.sessionMisses = prometheus.NewCounter(prometheus.CounterOpts{
107 Namespace: "arvados",
108 Subsystem: "keepweb_sessions",
110 Help: "Number of token session cache misses.",
112 reg.MustRegister(m.sessionMisses)
// cachedPDH is the value stored in cache.pdhs: a UUID-to-PDH mapping. Code
// elsewhere in this file populates its expire, refresh and pdh fields.
115 type cachedPDH struct {
// cachedCollection is the value stored in cache.collections: a collection
// record plus an expire time that is set where entries are added.
121 type cachedCollection struct {
// collection is the cached record; its ManifestText length is what the
// size accounting in this file measures.
123 collection *arvados.Collection
// cachedPermission is presumably a cached permission-check result.
// NOTE(review): no other use of this type is visible nearby — confirm it is
// still needed.
126 type cachedPermission struct {
// cachedSession is the value stored in cache.sessions: the per-token
// clients built by GetSession. GetSession also sets an expire time and
// loads an fs value from it; GetTokenUser loads and stores a user value.
130 type cachedSession struct {
// client is created with arvados.NewClientFromConfig and carries the
// session's token in AuthToken.
133 client *arvados.Client
// arvadosclient is built from client via arvadosclient.New.
134 arvadosclient *arvadosclient.ArvadosClient
// keepclient is built from arvadosclient via keepclient.New.
135 keepclient *keepclient.KeepClient
// setup is the one-time initializer that the public methods run through
// c.setupOnce. It builds the three 2Q LRU caches, prepares a Prometheus
// registry, and starts the metrics and prune loops.
139 func (c *cache) setup() {
// Each cache is sized from its own config limit.
141 c.pdhs, err = lru.New2Q(c.config.MaxUUIDEntries)
145 c.collections, err = lru.New2Q(c.config.MaxCollectionEntries)
149 c.sessions, err = lru.New2Q(c.config.MaxSessions)
// Presumably a fallback for when no registry was supplied — TODO confirm.
156 reg = prometheus.NewRegistry()
// Fires every metricsUpdateInterval; presumably refreshes the gauges from
// a background goroutine — TODO confirm.
160 for range time.Tick(metricsUpdateInterval) {
// Capacity 1 lets one prune request stay pending while the loop is busy.
164 c.chPruneCollections = make(chan struct{}, 1)
166 for range c.chPruneCollections {
170 c.chPruneSessions = make(chan struct{}, 1)
172 for range c.chPruneSessions {
// updateGauges publishes current cache occupancy: the combined byte size
// reported by collectionBytes, and the entry counts of the collections and
// sessions caches.
178 func (c *cache) updateGauges() {
179 c.metrics.collectionBytes.Set(float64(c.collectionBytes()))
180 c.metrics.collectionEntries.Set(float64(c.collections.Len()))
181 c.metrics.sessionEntries.Set(float64(c.sessions.Len()))
// selectPDH is the request parameter set passed to arv.Get in Get when only
// the portable_data_hash field of a collection is needed.
184 var selectPDH = map[string]interface{}{
185 "select": []string{"portable_data_hash"},
188 // Update saves a modified version (fs) to an existing collection
189 // (coll) and, if successful, updates the relevant cache entries so
190 // subsequent calls to Get() reflect the modifications.
191 func (c *cache) Update(client *arvados.Client, coll arvados.Collection, fs arvados.CollectionFileSystem) error {
192 c.setupOnce.Do(c.setup)
194 m, err := fs.MarshalManifest(".")
// Marshal failure or an unchanged manifest; presumably returns err here
// without calling the API — TODO confirm.
195 if err != nil || m == coll.ManifestText {
198 coll.ManifestText = m
199 var updated arvados.Collection
// Send only the new manifest_text; the API response is decoded into updated.
200 err = client.RequestAndDecode(&updated, "PATCH", "arvados/v1/collections/"+coll.UUID, nil, map[string]interface{}{
201 "collection": map[string]string{
202 "manifest_text": coll.ManifestText,
// Drops the UUID-to-PDH mapping; presumably the PATCH-failure path — TODO
// confirm.
206 c.pdhs.Remove(coll.UUID)
// Cache the updated record under token + NUL + new PDH.
209 c.collections.Add(client.AuthToken+"\000"+updated.PortableDataHash, &cachedCollection{
210 expire: time.Now().Add(time.Duration(c.config.TTL)),
211 collection: &updated,
// Point the UUID at the new PDH.
213 c.pdhs.Add(coll.UUID, &cachedPDH{
214 expire: time.Now().Add(time.Duration(c.config.TTL)),
215 refresh: time.Now().Add(time.Duration(c.config.UUIDTTL)),
216 pdh: updated.PortableDataHash,
221 // ResetSession unloads any potentially stale state. Should be called
222 // after write operations, so subsequent reads don't return stale data.
// It removes the session cache entry for token.
224 func (c *cache) ResetSession(token string) {
225 c.setupOnce.Do(c.setup)
226 c.sessions.Remove(token)
229 // GetSession returns a long-lived CustomFileSystem suitable for doing a
230 // read operation with the given token, along with the session it belongs to.
// A session is created and cached if none exists for the token.
231 func (c *cache) GetSession(token string) (arvados.CustomFileSystem, *cachedSession, error) {
232 c.setupOnce.Do(c.setup)
234 ent, _ := c.sessions.Get(token)
// A missing entry yields a nil *cachedSession.
235 sess, _ := ent.(*cachedSession)
// Miss: build the Arvados, arvadosclient and keep clients for this token
// and cache the new session.
238 c.metrics.sessionMisses.Inc()
239 sess = &cachedSession{
240 expire: now.Add(c.config.TTL.Duration()),
243 sess.client, err = arvados.NewClientFromConfig(c.cluster)
247 sess.client.AuthToken = token
248 sess.arvadosclient, err = arvadosclient.New(sess.client)
252 sess.keepclient = keepclient.New(sess.arvadosclient)
253 c.sessions.Add(token, sess)
// An expired entry is counted as a miss too; presumably this branch also
// sets the expired flag tested below — TODO confirm.
254 } else if sess.expire.Before(now) {
255 c.metrics.sessionMisses.Inc()
258 c.metrics.sessionHits.Inc()
// Ask the prune loop to run.
261 case c.chPruneSessions <- struct{}{}:
264 fs, _ := sess.fs.Load().(arvados.CustomFileSystem)
// Reuse the stored filesystem only while the session is still fresh.
265 if fs != nil && !expired {
// Otherwise build a new site filesystem with the cluster's
// forward-slash substitution setting.
268 fs = sess.client.SiteFileSystem(sess.keepclient)
269 fs.ForwardSlashNameSubstitution(c.cluster.Collections.ForwardSlashNameSubstitution)
274 // pruneSessions removes all expired session cache entries, then removes more
275 // entries until the approximate remaining size <= MaxCollectionBytes/2.
276 func (c *cache) pruneSessions() {
279 keys := c.sessions.Keys()
// First pass: drop expired sessions and total the MemorySize of the
// filesystems of the sessions examined.
280 for _, token := range keys {
// Peek reads the entry without affecting its position in the LRU.
281 ent, ok := c.sessions.Peek(token)
285 s := ent.(*cachedSession)
286 if s.expire.Before(now) {
287 c.sessions.Remove(token)
290 if fs, ok := s.fs.Load().(arvados.CustomFileSystem); ok {
291 size += fs.MemorySize()
294 // Remove tokens until reaching size limit, starting with the
295 // least frequently used entries (which Keys() returns last).
296 for i := len(keys) - 1; i >= 0; i-- {
298 if size <= c.cluster.Collections.WebDAVCache.MaxCollectionBytes/2 {
301 ent, ok := c.sessions.Peek(token)
305 s := ent.(*cachedSession)
306 fs, _ := s.fs.Load().(arvados.CustomFileSystem)
310 c.sessions.Remove(token)
311 size -= fs.MemorySize()
// Get returns the collection record for targetID, which may be either a
// portable data hash or a collection UUID. It answers from the UUID-to-PDH
// and PDH-to-manifest caches where possible, and otherwise fetches the
// record with arv and caches the result. forceReload forces the cached
// UUID-to-PDH mapping to be re-verified against the API.
315 func (c *cache) Get(arv *arvadosclient.ArvadosClient, targetID string, forceReload bool) (*arvados.Collection, error) {
316 c.setupOnce.Do(c.setup)
317 c.metrics.requests.Inc()
321 if arvadosclient.PDHMatch(targetID) {
323 } else if ent, cached := c.pdhs.Get(targetID); cached {
324 ent := ent.(*cachedPDH)
// An expired mapping is dropped rather than used.
325 if ent.expire.Before(time.Now()) {
326 c.pdhs.Remove(targetID)
// A live mapping still needs re-verification once its refresh time has
// passed, or when the caller forces a reload.
329 pdhRefresh = forceReload || time.Now().After(ent.refresh)
330 c.metrics.pdhHits.Inc()
335 // UUID->PDH mapping is not cached, might as well get
336 // the whole collection record and be done (below).
337 c.logger.Debugf("cache(%s): have no pdh", targetID)
338 } else if cached := c.lookupCollection(arv.ApiToken + "\000" + pdh); cached == nil {
339 // PDH->manifest is not cached, might as well get the
340 // whole collection record (below).
341 c.logger.Debugf("cache(%s): have pdh %s but manifest is not cached", targetID, pdh)
342 } else if !pdhRefresh {
343 // We looked up UUID->PDH very recently, and we still
344 // have the manifest for that PDH.
345 c.logger.Debugf("cache(%s): have pdh %s and refresh not needed", targetID, pdh)
348 // Get current PDH for this UUID (and confirm we still
349 // have read permission). Most likely, the cached PDH
350 // is still correct, in which case we can use our cached manifest.
352 c.metrics.apiCalls.Inc()
353 var current arvados.Collection
354 err := arv.Get("collections", targetID, selectPDH, &current)
358 if current.PortableDataHash == pdh {
359 // PDH has not changed, cached manifest is still correct.
361 c.logger.Debugf("cache(%s): verified cached pdh %s is still correct", targetID, pdh)
364 if cached := c.lookupCollection(arv.ApiToken + "\000" + current.PortableDataHash); cached != nil {
365 // PDH changed, and we already have the
366 // manifest for that new PDH.
367 c.logger.Debugf("cache(%s): cached pdh %s was stale, new pdh is %s and manifest is already in cache", targetID, pdh, current.PortableDataHash)
372 // Either UUID->PDH is not cached, or PDH->manifest is not cached.
374 var retrieved arvados.Collection
375 c.metrics.apiCalls.Inc()
376 err := arv.Get("collections", targetID, nil, &retrieved)
380 c.logger.Debugf("cache(%s): retrieved manifest, caching with pdh %s", targetID, retrieved.PortableDataHash)
381 exp := time.Now().Add(time.Duration(c.config.TTL))
// Record a UUID-to-PDH mapping only when the caller asked by UUID.
382 if targetID != retrieved.PortableDataHash {
383 c.pdhs.Add(targetID, &cachedPDH{
385 refresh: time.Now().Add(time.Duration(c.config.UUIDTTL)),
386 pdh: retrieved.PortableDataHash,
389 c.collections.Add(arv.ApiToken+"\000"+retrieved.PortableDataHash, &cachedCollection{
391 collection: &retrieved,
// A manifest larger than the average per-entry byte budget triggers a
// prune request.
393 if int64(len(retrieved.ManifestText)) > c.config.MaxCollectionBytes/int64(c.config.MaxCollectionEntries) {
395 case c.chPruneCollections <- struct{}{}:
399 return &retrieved, nil
402 // pruneCollections checks the total bytes occupied by manifest_text
403 // in the collection cache and removes entries as needed to bring
404 // the total size down to MaxCollectionBytes/2. It also deletes all expired entries.
//
407 // pruneCollections does not aim to be perfectly correct when there is
408 // concurrent cache activity.
409 func (c *cache) pruneCollections() {
412 keys := c.collections.Keys()
// entsize and expired are indexed in parallel with keys.
413 entsize := make([]int, len(keys))
414 expired := make([]bool, len(keys))
// First pass: measure each entry and note which ones have expired.
415 for i, k := range keys {
// Peek reads the entry without affecting its position in the LRU.
416 v, ok := c.collections.Peek(k)
420 ent := v.(*cachedCollection)
421 n := len(ent.collection.ManifestText)
424 expired[i] = ent.expire.Before(now)
// Second pass: removes entries and deducts their size; presumably only
// those flagged expired above — TODO confirm.
426 for i, k := range keys {
428 c.collections.Remove(k)
429 size -= int64(entsize[i])
// Third pass: keep removing until the total is at most half the limit.
// NOTE(review): this walks keys front-to-back, whereas pruneSessions walks
// back-to-front — confirm the intended eviction order.
432 for i, k := range keys {
433 if size <= c.config.MaxCollectionBytes/2 {
437 // already removed this entry in the previous loop
440 c.collections.Remove(k)
441 size -= int64(entsize[i])
445 // collectionBytes returns the approximate combined memory size of the
446 // collection cache and session filesystem cache.
447 func (c *cache) collectionBytes() uint64 {
// Collections contribute the byte length of their manifest text.
449 for _, k := range c.collections.Keys() {
450 v, ok := c.collections.Peek(k)
454 size += uint64(len(v.(*cachedCollection).collection.ManifestText))
// Sessions contribute their filesystem's MemorySize, when one is loaded.
456 for _, token := range c.sessions.Keys() {
457 ent, ok := c.sessions.Peek(token)
461 if fs, ok := ent.(*cachedSession).fs.Load().(arvados.CustomFileSystem); ok {
462 size += uint64(fs.MemorySize())
// lookupCollection returns the cached collection stored under key (token +
// "\000" + portable data hash). An expired entry is removed from the cache.
// Callers in this file treat a nil result as "not cached".
468 func (c *cache) lookupCollection(key string) *arvados.Collection {
469 e, cached := c.collections.Get(key)
473 ent := e.(*cachedCollection)
474 if ent.expire.Before(time.Now()) {
475 c.collections.Remove(key)
// Only a live entry counts as a hit.
478 c.metrics.collectionHits.Inc()
479 return ent.collection
// GetTokenUser returns the user record belonging to token. The record is
// stored on the token's session, so a later call can load it from there
// instead of repeating the API request.
482 func (c *cache) GetTokenUser(token string) (*arvados.User, error) {
483 // Get and cache user record associated with this
484 // token. We need to know their UUID for logging, and
485 // whether they are an admin or not for certain
486 // permission checks.
488 // Get/create session entry
489 _, sess, err := c.GetSession(token)
494 // See if the user is already set, and if so, return it
495 user, _ := sess.user.Load().(*arvados.User)
500 // Fetch the user record
501 c.metrics.apiCalls.Inc()
502 var current arvados.User
504 err = sess.client.RequestAndDecode(&current, "GET", "/arvados/v1/users/current", nil, nil)
509 // Stash the user record for next time
510 sess.user.Store(&current)