1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
12 "git.arvados.org/arvados.git/sdk/go/arvados"
13 "git.arvados.org/arvados.git/sdk/go/arvadosclient"
14 "git.arvados.org/arvados.git/sdk/go/keepclient"
15 lru "github.com/hashicorp/golang-lru"
16 "github.com/prometheus/client_golang/prometheus"
17 "github.com/sirupsen/logrus"
// metricsUpdateInterval (100ms) is the tick period of the metrics loop
// started in (*cache).setup; presumably each tick calls updateGauges
// (the loop body is not visible here).
20 const metricsUpdateInterval = time.Second / 10
// Cluster configuration; the Collections.WebDAVCache limits and TTLs
// used throughout the cache are read from here.
23 cluster *arvados.Cluster
// Used for debug tracing of cache decisions in Get.
24 logger logrus.FieldLogger
// Prometheus registry for the cache metrics. NOTE(review): setup falls
// back to prometheus.NewRegistry() under a condition not visible here —
// presumably when no registry was supplied.
25 registry *prometheus.Registry
// Collection UUID -> *cachedPDH (2Q LRU sized by MaxUUIDEntries).
27 pdhs *lru.TwoQueueCache
// token+"\000"+PDH -> *cachedCollection (2Q LRU sized by
// MaxCollectionEntries). Because the key includes the token, an entry
// is only found by lookups made with the same token.
28 collections *lru.TwoQueueCache
// API token -> *cachedSession (2Q LRU sized by MaxSessions).
29 sessions *lru.TwoQueueCache
// Prune request signals. setup creates each with capacity 1 and
// starts a loop that receives from it; the loop bodies (presumably
// calling pruneSessions / pruneCollections) are not visible here.
// Senders use a select, presumably with a default case so callers
// never block.
32 chPruneSessions chan struct{}
33 chPruneCollections chan struct{}
// cacheMetrics holds the Prometheus metrics exported by the keep-web
// collection/session cache. Every field is created and registered by
// (*cacheMetrics).setup.
36 type cacheMetrics struct {
// Incremented once per call to (*cache).Get.
37 requests prometheus.Counter
// Approximate bytes held by cached manifests plus session
// filesystems; refreshed by updateGauges.
38 collectionBytes prometheus.Gauge
// Number of entries in the collections cache; refreshed by updateGauges.
39 collectionEntries prometheus.Gauge
// Number of entries in the sessions cache; refreshed by updateGauges.
40 sessionEntries prometheus.Gauge
// Incremented by lookupCollection when it serves a cached entry.
41 collectionHits prometheus.Counter
// Incremented by Get when the UUID->PDH cache has an entry for the target.
42 pdhHits prometheus.Counter
// GetSession outcomes: a hit is an unexpired cached session; a miss
// is a missing or expired one.
43 sessionHits prometheus.Counter
44 sessionMisses prometheus.Counter
// Incremented before each outgoing API request made by the cache.
45 apiCalls prometheus.Counter
// setup creates each metric in m and registers it with reg.
// reg.MustRegister panics if registration fails (for example, when a
// metric with the same fully-qualified name is already registered), so
// setup must be called at most once per registry.
48 func (m *cacheMetrics) setup(reg *prometheus.Registry) {
49 m.requests = prometheus.NewCounter(prometheus.CounterOpts{
51 Subsystem: "keepweb_collectioncache",
53 Help: "Number of targetID-to-manifest lookups handled.",
55 reg.MustRegister(m.requests)
56 m.collectionHits = prometheus.NewCounter(prometheus.CounterOpts{
58 Subsystem: "keepweb_collectioncache",
60 Help: "Number of pdh-to-manifest cache hits.",
62 reg.MustRegister(m.collectionHits)
63 m.pdhHits = prometheus.NewCounter(prometheus.CounterOpts{
65 Subsystem: "keepweb_collectioncache",
67 Help: "Number of uuid-to-pdh cache hits.",
69 reg.MustRegister(m.pdhHits)
70 m.apiCalls = prometheus.NewCounter(prometheus.CounterOpts{
72 Subsystem: "keepweb_collectioncache",
74 Help: "Number of outgoing API calls made by cache.",
76 reg.MustRegister(m.apiCalls)
// Exported under the keepweb_sessions subsystem. Per its Help text, the
// value covers session filesystems as well as manifests.
77 m.collectionBytes = prometheus.NewGauge(prometheus.GaugeOpts{
79 Subsystem: "keepweb_sessions",
80 Name: "cached_collection_bytes",
81 Help: "Total size of all cached manifests and sessions.",
83 reg.MustRegister(m.collectionBytes)
84 m.collectionEntries = prometheus.NewGauge(prometheus.GaugeOpts{
86 Subsystem: "keepweb_collectioncache",
87 Name: "cached_manifests",
88 Help: "Number of manifests in cache.",
90 reg.MustRegister(m.collectionEntries)
91 m.sessionEntries = prometheus.NewGauge(prometheus.GaugeOpts{
93 Subsystem: "keepweb_sessions",
95 Help: "Number of active token sessions.",
97 reg.MustRegister(m.sessionEntries)
98 m.sessionHits = prometheus.NewCounter(prometheus.CounterOpts{
100 Subsystem: "keepweb_sessions",
102 Help: "Number of token session cache hits.",
104 reg.MustRegister(m.sessionHits)
105 m.sessionMisses = prometheus.NewCounter(prometheus.CounterOpts{
106 Namespace: "arvados",
107 Subsystem: "keepweb_sessions",
109 Help: "Number of token session cache misses.",
111 reg.MustRegister(m.sessionMisses)
// cachedPDH is a pdhs cache entry. It holds:
//   - pdh: the portable data hash last seen for a collection UUID
//   - expire: the time after which Get discards the entry
//   - refresh: the time after which Get re-verifies the mapping with
//     the API server before trusting it
114 type cachedPDH struct {
// cachedCollection is a collections cache entry: a full collection
// record (including ManifestText) and the time after which it is
// treated as stale (expire).
120 type cachedCollection struct {
// The record as returned by the API server; its ManifestText length is
// what the byte accounting measures.
122 collection *arvados.Collection
// cachedPermission: its fields and uses are not visible in this
// excerpt. NOTE(review): confirm it is still referenced.
125 type cachedPermission struct {
// cachedSession holds per-token state created by GetSession:
//   - an expiry time
//   - API and Keep clients authenticated with the token
//   - a lazily created site filesystem (fs)
//   - the token's user record (user)
// fs and user are read with Load() plus a type assertion; they are
// presumably sync/atomic.Value fields (declarations not visible here).
129 type cachedSession struct {
// Built from the cluster config, with AuthToken set to the session token.
132 client *arvados.Client
133 arvadosclient *arvadosclient.ArvadosClient
134 keepclient *keepclient.KeepClient
// setup initializes:
//   - the three 2Q LRU caches, sized from Collections.WebDAVCache
//   - the metrics
//   - the background loops for gauge updates and pruning
// The public methods run it through c.setupOnce.Do (c.setupOnce is
// presumably a sync.Once). Error handling for lru.New2Q, which returns
// an error for non-positive sizes, is not visible here.
138 func (c *cache) setup() {
140 c.pdhs, err = lru.New2Q(c.cluster.Collections.WebDAVCache.MaxUUIDEntries)
144 c.collections, err = lru.New2Q(c.cluster.Collections.WebDAVCache.MaxCollectionEntries)
148 c.sessions, err = lru.New2Q(c.cluster.Collections.WebDAVCache.MaxSessions)
// Fallback registry (the condition guarding this is not visible here).
155 reg = prometheus.NewRegistry()
// Gauge refresh loop. The ticker created by time.Tick is never stopped,
// so this loop runs for the life of the process (presumably in its own
// goroutine).
159 for range time.Tick(metricsUpdateInterval) {
// Capacity 1 lets one prune request wait while a prune is running.
// Further requests can then be dropped by a non-blocking sender.
163 c.chPruneCollections = make(chan struct{}, 1)
165 for range c.chPruneCollections {
169 c.chPruneSessions = make(chan struct{}, 1)
171 for range c.chPruneSessions {
// updateGauges copies the current cache sizes into the Prometheus
// gauges. collectionBytes() walks every cached collection and session,
// so the cost of each call grows with cache size.
177 func (c *cache) updateGauges() {
178 c.metrics.collectionBytes.Set(float64(c.collectionBytes()))
179 c.metrics.collectionEntries.Set(float64(c.collections.Len()))
180 c.metrics.sessionEntries.Set(float64(c.sessions.Len()))
// selectPDH restricts a collections Get to the portable_data_hash field.
// Get uses it to re-verify a cached UUID->PDH mapping, and the caller's
// read permission, without fetching the manifest.
183 var selectPDH = map[string]interface{}{
184 "select": []string{"portable_data_hash"},
187 // Update saves a modified version (fs) to an existing collection
188 // (coll) and, if successful, updates the relevant cache entries so
189 // subsequent calls to Get() reflect the modifications.
//
// The collections cache entry is keyed with client.AuthToken, in the
// same token+"\000"+PDH form that Get looks up.
190 func (c *cache) Update(client *arvados.Client, coll arvados.Collection, fs arvados.CollectionFileSystem) error {
191 c.setupOnce.Do(c.setup)
// Serialize the modified filesystem. If that fails, or the manifest is
// unchanged, there is nothing to save. The early return (presumably
// returning err, which may be nil) is not visible here.
193 m, err := fs.MarshalManifest(".")
194 if err != nil || m == coll.ManifestText {
197 coll.ManifestText = m
198 var updated arvados.Collection
199 err = client.RequestAndDecode(&updated, "PATCH", "arvados/v1/collections/"+coll.UUID, nil, map[string]interface{}{
200 "collection": map[string]string{
201 "manifest_text": coll.ManifestText,
// Drop the UUID->PDH entry. Presumably this happens when the PATCH
// fails and the current PDH is unknown; the guarding condition is not
// visible here.
205 c.pdhs.Remove(coll.UUID)
// Cache the server's updated record under its new PDH, and point the
// UUID at that PDH with fresh expire/refresh times.
208 c.collections.Add(client.AuthToken+"\000"+updated.PortableDataHash, &cachedCollection{
209 expire: time.Now().Add(time.Duration(c.cluster.Collections.WebDAVCache.TTL)),
210 collection: &updated,
212 c.pdhs.Add(coll.UUID, &cachedPDH{
213 expire: time.Now().Add(time.Duration(c.cluster.Collections.WebDAVCache.TTL)),
214 refresh: time.Now().Add(time.Duration(c.cluster.Collections.WebDAVCache.UUIDTTL)),
215 pdh: updated.PortableDataHash,
220 // ResetSession unloads any potentially stale state. Should be called
221 // after write operations, so subsequent reads don't return stale
// data. It removes token's entry from the sessions cache, so the next
// GetSession for token builds a fresh session and filesystem.
223 func (c *cache) ResetSession(token string) {
224 c.setupOnce.Do(c.setup)
225 c.sessions.Remove(token)
228 // Get a long-lived CustomFileSystem suitable for doing a read operation
229 // with the given token.
//
// GetSession returns the filesystem and the cachedSession for token.
// On a miss it creates and caches a new session whose expiry is
// WebDAVCache.TTL after creation. Once a session has expired, its
// cached filesystem is not reused and a new one is built. Each call
// also signals c.chPruneSessions.
230 func (c *cache) GetSession(token string) (arvados.CustomFileSystem, *cachedSession, error) {
231 c.setupOnce.Do(c.setup)
// On a miss ent is nil, and the comma-ok assertion yields a nil sess
// rather than panicking.
233 ent, _ := c.sessions.Get(token)
234 sess, _ := ent.(*cachedSession)
// Miss: build a new session authenticated with token.
237 c.metrics.sessionMisses.Inc()
238 sess = &cachedSession{
239 expire: now.Add(c.cluster.Collections.WebDAVCache.TTL.Duration()),
242 sess.client, err = arvados.NewClientFromConfig(c.cluster)
246 sess.client.AuthToken = token
247 sess.arvadosclient, err = arvadosclient.New(sess.client)
251 sess.keepclient = keepclient.New(sess.arvadosclient)
252 c.sessions.Add(token, sess)
// An expired session stays in the cache (pruneSessions removes it
// later) but counts as a miss. NOTE(review): sess.expire is not reset
// in the visible lines, so every call may rebuild fs until the entry is
// pruned. Confirm this is intended.
253 } else if sess.expire.Before(now) {
254 c.metrics.sessionMisses.Inc()
257 c.metrics.sessionHits.Inc()
// Request a background pruneSessions pass. The enclosing select,
// presumably with a default case so this never blocks, is not visible
// here.
260 case c.chPruneSessions <- struct{}{}:
// Reuse the session's filesystem unless the session has expired;
// otherwise build a new site filesystem.
263 fs, _ := sess.fs.Load().(arvados.CustomFileSystem)
264 if fs != nil && !expired {
267 fs = sess.client.SiteFileSystem(sess.keepclient)
268 fs.ForwardSlashNameSubstitution(c.cluster.Collections.ForwardSlashNameSubstitution)
273 // Remove all expired session cache entries, then remove more entries
274 // until approximate remaining size <= maxsize/2
//
// Here maxsize is Collections.WebDAVCache.MaxCollectionBytes, the same
// budget used for manifests. In the visible code, size counts only the
// MemorySize of session filesystems.
275 func (c *cache) pruneSessions() {
// Work from a snapshot of the keys; entries may be added or removed
// concurrently.
278 keys := c.sessions.Keys()
279 for _, token := range keys {
// Peek does not update LRU recency, so pruning does not disturb the
// eviction order.
280 ent, ok := c.sessions.Peek(token)
284 s := ent.(*cachedSession)
285 if s.expire.Before(now) {
286 c.sessions.Remove(token)
289 if fs, ok := s.fs.Load().(arvados.CustomFileSystem); ok {
290 size += fs.MemorySize()
293 // Remove tokens until reaching size limit, starting with the
294 // least frequently used entries (which Keys() returns last).
295 for i := len(keys) - 1; i >= 0; i-- {
297 if size <= c.cluster.Collections.WebDAVCache.MaxCollectionBytes/2 {
300 ent, ok := c.sessions.Peek(token)
304 s := ent.(*cachedSession)
305 fs, _ := s.fs.Load().(arvados.CustomFileSystem)
// NOTE(review): fs is nil for a session that has no filesystem yet.
// The lines between here and fs.MemorySize() are not visible; confirm
// they skip that case, because calling a method on a nil interface
// panics.
309 c.sessions.Remove(token)
310 size -= fs.MemorySize()
// Get returns the collection identified by targetID, which is either a
// portable data hash or a collection UUID, as visible to arv.ApiToken.
// It uses the UUID->PDH and PDH->collection caches where possible.
// forceReload makes Get re-verify a cached UUID->PDH mapping with the
// API server even if the mapping's refresh time has not arrived.
//
// Each lookup is counted in c.metrics.requests, and each outgoing API
// call in c.metrics.apiCalls.
314 func (c *cache) Get(arv *arvadosclient.ArvadosClient, targetID string, forceReload bool) (*arvados.Collection, error) {
315 c.setupOnce.Do(c.setup)
316 c.metrics.requests.Inc()
// A PDH target needs no UUID->PDH lookup.
320 if arvadosclient.PDHMatch(targetID) {
322 } else if ent, cached := c.pdhs.Get(targetID); cached {
323 ent := ent.(*cachedPDH)
// Expired mappings are discarded, not re-verified.
324 if ent.expire.Before(time.Now()) {
325 c.pdhs.Remove(targetID)
328 pdhRefresh = forceReload || time.Now().After(ent.refresh)
329 c.metrics.pdhHits.Inc()
334 // UUID->PDH mapping is not cached, might as well get
335 // the whole collection record and be done (below).
336 c.logger.Debugf("cache(%s): have no pdh", targetID)
337 } else if cached := c.lookupCollection(arv.ApiToken + "\000" + pdh); cached == nil {
338 // PDH->manifest is not cached, might as well get the
339 // whole collection record (below).
340 c.logger.Debugf("cache(%s): have pdh %s but manifest is not cached", targetID, pdh)
341 } else if !pdhRefresh {
342 // We looked up UUID->PDH very recently, and we still
343 // have the manifest for that PDH.
344 c.logger.Debugf("cache(%s): have pdh %s and refresh not needed", targetID, pdh)
347 // Get current PDH for this UUID (and confirm we still
348 // have read permission). Most likely, the cached PDH
349 // is still correct, in which case we can use our
351 c.metrics.apiCalls.Inc()
352 var current arvados.Collection
353 err := arv.Get("collections", targetID, selectPDH, &current)
357 if current.PortableDataHash == pdh {
358 // PDH has not changed, cached manifest is
360 c.logger.Debugf("cache(%s): verified cached pdh %s is still correct", targetID, pdh)
363 if cached := c.lookupCollection(arv.ApiToken + "\000" + current.PortableDataHash); cached != nil {
364 // PDH changed, and we already have the
365 // manifest for that new PDH.
366 c.logger.Debugf("cache(%s): cached pdh %s was stale, new pdh is %s and manifest is already in cache", targetID, pdh, current.PortableDataHash)
371 // Either UUID->PDH is not cached, or PDH->manifest is not
373 var retrieved arvados.Collection
374 c.metrics.apiCalls.Inc()
375 err := arv.Get("collections", targetID, nil, &retrieved)
379 c.logger.Debugf("cache(%s): retrieved manifest, caching with pdh %s", targetID, retrieved.PortableDataHash)
380 exp := time.Now().Add(time.Duration(c.cluster.Collections.WebDAVCache.TTL))
// Record UUID->PDH only for UUID targets; a PDH maps to itself.
381 if targetID != retrieved.PortableDataHash {
382 c.pdhs.Add(targetID, &cachedPDH{
384 refresh: time.Now().Add(time.Duration(c.cluster.Collections.WebDAVCache.UUIDTTL)),
385 pdh: retrieved.PortableDataHash,
388 c.collections.Add(arv.ApiToken+"\000"+retrieved.PortableDataHash, &cachedCollection{
390 collection: &retrieved,
// A manifest larger than the average per-entry share of
// MaxCollectionBytes may push the cache past its byte budget, so signal
// c.chPruneCollections. NOTE(review): the division assumes
// MaxCollectionEntries > 0; lru.New2Q in setup returns an error for
// non-positive sizes.
392 if int64(len(retrieved.ManifestText)) > c.cluster.Collections.WebDAVCache.MaxCollectionBytes/int64(c.cluster.Collections.WebDAVCache.MaxCollectionEntries) {
394 case c.chPruneCollections <- struct{}{}:
398 return &retrieved, nil
401 // pruneCollections checks the total bytes occupied by manifest_text
402 // in the collection cache and removes old entries as needed to bring
403 // the total size down to CollectionBytes. It also deletes all expired
// entries. In the visible code, the target is
// Collections.WebDAVCache.MaxCollectionBytes/2, and size is presumably
// the sum of len(ManifestText) over cached collections.
//
406 // pruneCollections does not aim to be perfectly correct when there is
407 // concurrent cache activity.
408 func (c *cache) pruneCollections() {
411 keys := c.collections.Keys()
412 entsize := make([]int, len(keys))
413 expired := make([]bool, len(keys))
// First pass: record each entry's manifest size and expiry status.
// Peek avoids updating LRU recency.
414 for i, k := range keys {
415 v, ok := c.collections.Peek(k)
419 ent := v.(*cachedCollection)
420 n := len(ent.collection.ManifestText)
423 expired[i] = ent.expire.Before(now)
// Second pass: remove expired entries. The guarding condition is not
// visible here.
425 for i, k := range keys {
427 c.collections.Remove(k)
428 size -= int64(entsize[i])
// Third pass: evict until size is at or below half the byte budget.
// NOTE(review): this walks keys front to back. lru.TwoQueueCache.Keys
// documents that frequently used keys come first, so frequently used
// manifests are evicted before once-used ones. pruneSessions walks its
// keys from the end instead. Confirm which order is intended.
431 for i, k := range keys {
432 if size <= c.cluster.Collections.WebDAVCache.MaxCollectionBytes/2 {
436 // already removed this entry in the previous loop
439 c.collections.Remove(k)
440 size -= int64(entsize[i])
444 // collectionBytes returns the approximate combined memory size of the
445 // collection cache and session filesystem cache.
//
// Each manifest counts as len(ManifestText). Each session counts as the
// MemorySize of its filesystem, if one has been created. Peek is used
// so the walk does not affect LRU recency.
446 func (c *cache) collectionBytes() uint64 {
448 for _, k := range c.collections.Keys() {
449 v, ok := c.collections.Peek(k)
453 size += uint64(len(v.(*cachedCollection).collection.ManifestText))
455 for _, token := range c.sessions.Keys() {
456 ent, ok := c.sessions.Peek(token)
// A session with no filesystem yet fails the assertion and adds nothing.
460 if fs, ok := ent.(*cachedSession).fs.Load().(arvados.CustomFileSystem); ok {
461 size += uint64(fs.MemorySize())
// lookupCollection returns the cached collection for key, which has the
// form token+"\000"+PDH. Callers treat a nil result as a miss. Behavior:
//   - an expired entry is removed from the cache
//   - a hit increments c.metrics.collectionHits
//   - unlike Peek, collections.Get counts as a use for LRU purposes
467 func (c *cache) lookupCollection(key string) *arvados.Collection {
468 e, cached := c.collections.Get(key)
472 ent := e.(*cachedCollection)
473 if ent.expire.Before(time.Now()) {
474 c.collections.Remove(key)
477 c.metrics.collectionHits.Inc()
478 return ent.collection
// GetTokenUser returns the user record for token. On the first call for
// a session it fetches the record from the API server
// (GET arvados/v1/users/current). It then stores the record in the
// token's cachedSession, so it lives as long as that session entry.
481 func (c *cache) GetTokenUser(token string) (*arvados.User, error) {
482 // Get and cache user record associated with this
483 // token. We need to know their UUID for logging, and
484 // whether they are an admin or not for certain
485 // permission checks.
487 // Get/create session entry
488 _, sess, err := c.GetSession(token)
493 // See if the user is already set, and if so, return it
494 user, _ := sess.user.Load().(*arvados.User)
499 // Fetch the user record
500 c.metrics.apiCalls.Inc()
501 var current arvados.User
// NOTE(review): this path has a leading "/", while Update uses
// "arvados/v1/collections/..." without one. Confirm that the client
// normalizes both forms.
503 err = sess.client.RequestAndDecode(&current, "GET", "/arvados/v1/users/current", nil, nil)
508 // Stash the user record for next time
509 sess.user.Store(&current)