1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
12 "git.arvados.org/arvados.git/sdk/go/arvados"
13 "git.arvados.org/arvados.git/sdk/go/arvadosclient"
14 "git.arvados.org/arvados.git/sdk/go/keepclient"
15 lru "github.com/hashicorp/golang-lru"
16 "github.com/prometheus/client_golang/prometheus"
// metricsUpdateInterval is the period handed to time.Tick by
// (*cache).setup. It is one tenth of a second (100ms).
const metricsUpdateInterval = time.Second / 10
// NOTE(review): the opening line of the enclosing type declaration is not
// visible here; the methods in this file read these as fields of cache
// (c.cluster, c.pdhs, ...) — confirm.
22 cluster *arvados.Cluster // cluster config: passed to arvados.NewClientFromConfig and read for Collections settings
23 config *arvados.WebDAVCacheConfig // TODO: use cluster.Collections.WebDAV instead
24 registry *prometheus.Registry
26 pdhs *lru.TwoQueueCache // collection UUID -> *cachedPDH
27 collections *lru.TwoQueueCache // token, NUL byte, PDH -> *cachedCollection
28 sessions *lru.TwoQueueCache // token -> *cachedSession
31 chPruneSessions chan struct{} // GetSession sends on this; setup ranges over it
32 chPruneCollections chan struct{} // Get sends on this; setup ranges over it
35 type cacheMetrics struct {
36 requests prometheus.Counter
37 collectionBytes prometheus.Gauge
38 collectionEntries prometheus.Gauge
39 sessionEntries prometheus.Gauge
40 collectionHits prometheus.Counter
41 pdhHits prometheus.Counter
42 sessionHits prometheus.Counter
43 sessionMisses prometheus.Counter
44 apiCalls prometheus.Counter
// setup creates every collector in m and registers each one with reg via
// reg.MustRegister, immediately after creating it.
//
// The request, hit, API-call and cached_manifests collectors use the
// keepweb_collectioncache subsystem. The session collectors and the
// cached_collection_bytes gauge use keepweb_sessions.
//
// NOTE(review): several Name/Namespace options are not visible in this
// excerpt — confirm them against the full source before relying on the
// exported metric names.
47 func (m *cacheMetrics) setup(reg *prometheus.Registry) {
// Counters for the collection cache.
48 m.requests = prometheus.NewCounter(prometheus.CounterOpts{
50 Subsystem: "keepweb_collectioncache",
52 Help: "Number of targetID-to-manifest lookups handled.",
54 reg.MustRegister(m.requests)
55 m.collectionHits = prometheus.NewCounter(prometheus.CounterOpts{
57 Subsystem: "keepweb_collectioncache",
59 Help: "Number of pdh-to-manifest cache hits.",
61 reg.MustRegister(m.collectionHits)
62 m.pdhHits = prometheus.NewCounter(prometheus.CounterOpts{
64 Subsystem: "keepweb_collectioncache",
66 Help: "Number of uuid-to-pdh cache hits.",
68 reg.MustRegister(m.pdhHits)
69 m.apiCalls = prometheus.NewCounter(prometheus.CounterOpts{
71 Subsystem: "keepweb_collectioncache",
73 Help: "Number of outgoing API calls made by cache.",
75 reg.MustRegister(m.apiCalls)
// Gauges; their values are written by (*cache).updateGauges.
// This gauge covers manifests and sessions together and is filed under
// the sessions subsystem.
76 m.collectionBytes = prometheus.NewGauge(prometheus.GaugeOpts{
78 Subsystem: "keepweb_sessions",
79 Name: "cached_collection_bytes",
80 Help: "Total size of all cached manifests and sessions.",
82 reg.MustRegister(m.collectionBytes)
83 m.collectionEntries = prometheus.NewGauge(prometheus.GaugeOpts{
85 Subsystem: "keepweb_collectioncache",
86 Name: "cached_manifests",
87 Help: "Number of manifests in cache.",
89 reg.MustRegister(m.collectionEntries)
90 m.sessionEntries = prometheus.NewGauge(prometheus.GaugeOpts{
92 Subsystem: "keepweb_sessions",
94 Help: "Number of active token sessions.",
96 reg.MustRegister(m.sessionEntries)
// Counters for the session cache; incremented by (*cache).GetSession.
97 m.sessionHits = prometheus.NewCounter(prometheus.CounterOpts{
99 Subsystem: "keepweb_sessions",
101 Help: "Number of token session cache hits.",
103 reg.MustRegister(m.sessionHits)
104 m.sessionMisses = prometheus.NewCounter(prometheus.CounterOpts{
105 Namespace: "arvados",
106 Subsystem: "keepweb_sessions",
108 Help: "Number of token session cache misses.",
110 reg.MustRegister(m.sessionMisses)
// cachedPDH is the value type stored in the pdhs cache, keyed by
// collection UUID. Update and Get populate its expire, refresh and pdh
// fields; Get drops the entry once expire has passed and revalidates the
// PDH once refresh has passed.
113 type cachedPDH struct {
// cachedCollection is the value type stored in the collections cache. It
// pairs a collection record with an expiry time (the expire field set by
// Update and Get and checked by lookupCollection).
119 type cachedCollection struct {
121 collection *arvados.Collection // the cached record; its ManifestText length is what the size accounting counts
// NOTE(review): no use of cachedPermission is visible in this part of the
// file — confirm whether the type is still needed.
124 type cachedPermission struct {
// cachedSession is the value type stored in the sessions cache, keyed by
// token. Besides the clients below, the methods in this file also use
// expire, fs and user members of the session (see GetSession and
// GetTokenUser).
128 type cachedSession struct {
131 client *arvados.Client // built from the cluster config, with AuthToken set to the session's token
132 arvadosclient *arvadosclient.ArvadosClient // derived from client
133 keepclient *keepclient.KeepClient // derived from arvadosclient; used to build the session's site filesystem
// setup builds the pdhs, collections and sessions caches as 2Q LRU caches
// sized by c.config (MaxUUIDEntries, MaxCollectionEntries, MaxSessions),
// allocates the two prune-request channels with a buffer of one, and
// contains loops driven by a metricsUpdateInterval ticker and by each prune
// channel. The exported entry points call it through c.setupOnce.
//
// NOTE(review): the error handling and the loop bodies are not visible in
// this excerpt — presumably the loops call updateGauges, pruneCollections
// and pruneSessions; confirm.
137 func (c *cache) setup() {
139 c.pdhs, err = lru.New2Q(c.config.MaxUUIDEntries)
143 c.collections, err = lru.New2Q(c.config.MaxCollectionEntries)
147 c.sessions, err = lru.New2Q(c.config.MaxSessions)
154 reg = prometheus.NewRegistry()
158 for range time.Tick(metricsUpdateInterval) {
// Capacity 1: one pending prune request can be queued per channel.
162 c.chPruneCollections = make(chan struct{}, 1)
164 for range c.chPruneCollections {
168 c.chPruneSessions = make(chan struct{}, 1)
170 for range c.chPruneSessions {
176 func (c *cache) updateGauges() {
177 c.metrics.collectionBytes.Set(float64(c.collectionBytes()))
178 c.metrics.collectionEntries.Set(float64(c.collections.Len()))
179 c.metrics.sessionEntries.Set(float64(c.sessions.Len()))
// selectPDH is the parameter set Get passes to the API client when it only
// needs a collection's current portable data hash: it selects the single
// field "portable_data_hash".
var selectPDH = map[string]interface{}{
	"select": []string{"portable_data_hash"},
}
186 // Update saves a modified version (fs) to an existing collection
187 // (coll) and, if successful, updates the relevant cache entries so
188 // subsequent calls to Get() reflect the modifications.
//
// The new manifest comes from fs.MarshalManifest(".") and is sent with
// client as a PATCH of the collection identified by coll.UUID. The decoded
// response is what gets cached.
//
// NOTE(review): the return statements and the check of the PATCH error are
// not visible in this excerpt.
189 func (c *cache) Update(client *arvados.Client, coll arvados.Collection, fs arvados.CollectionFileSystem) error {
190 c.setupOnce.Do(c.setup)
192 m, err := fs.MarshalManifest(".")
// Marshalling failed, or the manifest is unchanged.
193 if err != nil || m == coll.ManifestText {
// coll was passed by value, so this assignment does not affect the caller's copy.
196 coll.ManifestText = m
197 var updated arvados.Collection
198 err = client.RequestAndDecode(&updated, "PATCH", "arvados/v1/collections/"+coll.UUID, nil, map[string]interface{}{
199 "collection": map[string]string{
200 "manifest_text": coll.ManifestText,
// Drop the UUID's cached PDH — presumably only on the PATCH error path; the
// guarding condition is not visible here.
204 c.pdhs.Remove(coll.UUID)
// Cache the updated record under token, NUL byte, new PDH — the same key
// layout Get uses.
207 c.collections.Add(client.AuthToken+"\000"+updated.PortableDataHash, &cachedCollection{
208 expire: time.Now().Add(time.Duration(c.config.TTL)),
209 collection: &updated,
// Point the collection's UUID at the new PDH.
211 c.pdhs.Add(coll.UUID, &cachedPDH{
212 expire: time.Now().Add(time.Duration(c.config.TTL)),
213 refresh: time.Now().Add(time.Duration(c.config.UUIDTTL)),
214 pdh: updated.PortableDataHash,
219 // ResetSession unloads any potentially stale state. Should be called
220 // after write operations, so subsequent reads don't return stale
222 func (c *cache) ResetSession(token string) {
223 c.setupOnce.Do(c.setup)
224 c.sessions.Remove(token)
227 // GetSession returns a long-lived CustomFileSystem suitable for doing a read operation
228 // with the given token.
//
// It also returns the cachedSession for token. If the cache has no session
// for token, one is created with its own API and Keep clients and added to
// the cache.
//
// NOTE(review): the error returns, the declarations of now and expired, and
// the final store/return are not visible in this excerpt.
229 func (c *cache) GetSession(token string) (arvados.CustomFileSystem, *cachedSession, error) {
230 c.setupOnce.Do(c.setup)
// A missing entry, or one of the wrong type, leaves sess nil.
232 ent, _ := c.sessions.Get(token)
233 sess, _ := ent.(*cachedSession)
236 c.metrics.sessionMisses.Inc()
// New session: it expires one config TTL from now.
237 sess = &cachedSession{
238 expire: now.Add(c.config.TTL.Duration()),
241 sess.client, err = arvados.NewClientFromConfig(c.cluster)
// The session's API client authenticates as the caller's token.
245 sess.client.AuthToken = token
246 sess.arvadosclient, err = arvadosclient.New(sess.client)
250 sess.keepclient = keepclient.New(sess.arvadosclient)
251 c.sessions.Add(token, sess)
// An expired session is counted as a miss, not a hit.
252 } else if sess.expire.Before(now) {
253 c.metrics.sessionMisses.Inc()
256 c.metrics.sessionHits.Inc()
// Ask for a session prune.
259 case c.chPruneSessions <- struct{}{}:
// Reuse the filesystem already stored in the session unless it has expired.
262 fs, _ := sess.fs.Load().(arvados.CustomFileSystem)
263 if fs != nil && !expired {
// Otherwise build a fresh site filesystem with the cluster's
// forward-slash name substitution applied.
266 fs = sess.client.SiteFileSystem(sess.keepclient)
267 fs.ForwardSlashNameSubstitution(c.cluster.Collections.ForwardSlashNameSubstitution)
272 // pruneSessions removes all expired session cache entries, then removes more
273 // entries until the approximate remaining size <= MaxCollectionBytes/2
// (the cluster's Collections.WebDAVCache setting). Size is the sum of
// MemorySize() over the filesystems stored in the surviving sessions.
//
// NOTE(review): the declarations of now and size and the continue/break
// statements are not visible in this excerpt.
274 func (c *cache) pruneSessions() {
277 keys := c.sessions.Keys()
// First pass: drop expired sessions and total the size of the rest.
278 for _, token := range keys {
// Peek is used rather than Get for these lookups.
279 ent, ok := c.sessions.Peek(token)
283 s := ent.(*cachedSession)
284 if s.expire.Before(now) {
285 c.sessions.Remove(token)
// Only sessions that already hold a filesystem contribute to size.
288 if fs, ok := s.fs.Load().(arvados.CustomFileSystem); ok {
289 size += fs.MemorySize()
292 // Remove tokens until reaching size limit, starting with the
293 // least frequently used entries (which Keys() returns last).
294 for i := len(keys) - 1; i >= 0; i-- {
296 if size <= c.cluster.Collections.WebDAVCache.MaxCollectionBytes/2 {
299 ent, ok := c.sessions.Peek(token)
303 s := ent.(*cachedSession)
304 fs, _ := s.fs.Load().(arvados.CustomFileSystem)
308 c.sessions.Remove(token)
309 size -= fs.MemorySize()
// Get returns the collection record for targetID, using the caches where
// possible.
//
// targetID is either a portable data hash (per arvadosclient.PDHMatch) or
// an identifier looked up in the UUID->PDH cache. Cached manifests are keyed
// by arv.ApiToken, a NUL byte and the PDH, so entries are per token.
// forceReload makes a cached UUID->PDH mapping get revalidated even if its
// refresh time has not passed.
//
// NOTE(review): the variable declarations, error checks and several return
// statements are not visible in this excerpt.
313 func (c *cache) Get(arv *arvadosclient.ArvadosClient, targetID string, forceReload bool) (*arvados.Collection, error) {
314 c.setupOnce.Do(c.setup)
315 c.metrics.requests.Inc()
319 if arvadosclient.PDHMatch(targetID) {
321 } else if ent, cached := c.pdhs.Get(targetID); cached {
322 ent := ent.(*cachedPDH)
// An expired UUID->PDH mapping is dropped from the cache.
323 if ent.expire.Before(time.Now()) {
324 c.pdhs.Remove(targetID)
// The mapping is revalidated on request or once its refresh time has passed.
327 pdhRefresh = forceReload || time.Now().After(ent.refresh)
328 c.metrics.pdhHits.Inc()
333 // UUID->PDH mapping is not cached, might as well get
334 // the whole collection record and be done (below).
335 } else if cached := c.lookupCollection(arv.ApiToken + "\000" + pdh); cached == nil {
336 // PDH->manifest is not cached, might as well get the
337 // whole collection record (below).
338 } else if !pdhRefresh {
339 // We looked up UUID->PDH very recently, and we still
340 // have the manifest for that PDH.
343 // Get current PDH for this UUID (and confirm we still
344 // have read permission). Most likely, the cached PDH
345 // is still correct, in which case we can use our cached manifest.
347 c.metrics.apiCalls.Inc()
348 var current arvados.Collection
// NOTE(review): "¤t" on the next line looks like a mis-encoded "&current"
// (the variable declared just above) — confirm and restore.
349 err := arv.Get("collections", targetID, selectPDH, ¤t)
353 if current.PortableDataHash == pdh {
354 // PDH has not changed, cached manifest is
358 if cached := c.lookupCollection(arv.ApiToken + "\000" + current.PortableDataHash); cached != nil {
359 // PDH changed, and we already have the
360 // manifest for that new PDH.
365 // Either UUID->PDH is not cached, or PDH->manifest is not
367 var retrieved arvados.Collection
368 c.metrics.apiCalls.Inc()
// A nil parameter set here, where the PDH-only request above passes selectPDH.
369 err := arv.Get("collections", targetID, nil, &retrieved)
373 exp := time.Now().Add(time.Duration(c.config.TTL))
// Only a targetID that is not itself the PDH gets a UUID->PDH entry.
374 if targetID != retrieved.PortableDataHash {
375 c.pdhs.Add(targetID, &cachedPDH{
377 refresh: time.Now().Add(time.Duration(c.config.UUIDTTL)),
378 pdh: retrieved.PortableDataHash,
381 c.collections.Add(arv.ApiToken+"\000"+retrieved.PortableDataHash, &cachedCollection{
383 collection: &retrieved,
// A manifest larger than the average per-entry byte budget triggers a prune request.
385 if int64(len(retrieved.ManifestText)) > c.config.MaxCollectionBytes/int64(c.config.MaxCollectionEntries) {
387 case c.chPruneCollections <- struct{}{}:
391 return &retrieved, nil
394 // pruneCollections checks the total bytes occupied by manifest_text
395 // in the collection cache and removes entries as needed to bring
396 // the total size to at most MaxCollectionBytes/2. It also deletes all expired entries.
//
399 // pruneCollections does not aim to be perfectly correct when there is
400 // concurrent cache activity.
//
// NOTE(review): the declarations of now and size, the size accumulation and
// the conditions inside the two removal loops are not visible in this
// excerpt.
401 func (c *cache) pruneCollections() {
404 keys := c.collections.Keys()
// Per-key bookkeeping, indexed like keys.
405 entsize := make([]int, len(keys))
406 expired := make([]bool, len(keys))
407 for i, k := range keys {
// Peek is used rather than Get for this lookup.
408 v, ok := c.collections.Peek(k)
412 ent := v.(*cachedCollection)
413 n := len(ent.collection.ManifestText)
416 expired[i] = ent.expire.Before(now)
// First removal loop (presumably the expired entries — confirm).
418 for i, k := range keys {
420 c.collections.Remove(k)
421 size -= int64(entsize[i])
// Second removal loop: compares size with half of the configured maximum.
424 for i, k := range keys {
425 if size <= c.config.MaxCollectionBytes/2 {
429 // already removed this entry in the previous loop
432 c.collections.Remove(k)
433 size -= int64(entsize[i])
437 // collectionBytes returns the approximate combined memory size of the
438 // collection cache and session filesystem cache.
//
// Collections are counted by the length of their ManifestText. Sessions are
// counted by MemorySize() of the filesystem they hold, if any.
439 func (c *cache) collectionBytes() uint64 {
441 for _, k := range c.collections.Keys() {
// Peek is used rather than Get for these lookups.
442 v, ok := c.collections.Peek(k)
446 size += uint64(len(v.(*cachedCollection).collection.ManifestText))
448 for _, token := range c.sessions.Keys() {
449 ent, ok := c.sessions.Peek(token)
// Sessions with no filesystem stored yet add nothing.
453 if fs, ok := ent.(*cachedSession).fs.Load().(arvados.CustomFileSystem); ok {
454 size += uint64(fs.MemorySize())
// lookupCollection returns the collection cached under key (token, NUL
// byte, PDH). An entry whose expiry time has passed is removed from the
// cache. Each successful lookup increments the collectionHits counter.
//
// NOTE(review): the not-found and expired return statements are not visible
// in this excerpt; callers compare the result with nil, so those paths
// presumably return nil — confirm.
460 func (c *cache) lookupCollection(key string) *arvados.Collection {
461 e, cached := c.collections.Get(key)
465 ent := e.(*cachedCollection)
466 if ent.expire.Before(time.Now()) {
467 c.collections.Remove(key)
470 c.metrics.collectionHits.Inc()
471 return ent.collection
// GetTokenUser returns the user record associated with token. The record is
// kept on the token's cached session, so the users/current API call is only
// made when the session has no user stored yet.
//
// NOTE(review): the error checks and return statements are not visible in
// this excerpt.
474 func (c *cache) GetTokenUser(token string) (*arvados.User, error) {
475 // Get and cache user record associated with this
476 // token. We need to know their UUID for logging, and
477 // whether they are an admin or not for certain
478 // permission checks.
480 // Get/create session entry
481 _, sess, err := c.GetSession(token)
486 // See if the user is already set, and if so, return it
487 user, _ := sess.user.Load().(*arvados.User)
492 // Fetch the user record
493 c.metrics.apiCalls.Inc()
494 var current arvados.User
// NOTE(review): "¤t" on the request line and the Store line below looks
// like a mis-encoded "&current" — confirm and restore.
496 err = sess.client.RequestAndDecode(¤t, "GET", "/arvados/v1/users/current", nil, nil)
501 // Stash the user record for next time
502 sess.user.Store(¤t)