1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
12 "git.arvados.org/arvados.git/sdk/go/arvados"
13 "git.arvados.org/arvados.git/sdk/go/arvadosclient"
14 "git.arvados.org/arvados.git/sdk/go/keepclient"
15 lru "github.com/hashicorp/golang-lru"
16 "github.com/prometheus/client_golang/prometheus"
// metricsUpdateInterval is how often the background loop started by
// (*cache).setup refreshes the gauge metrics (100ms).
19 const metricsUpdateInterval = 100 * time.Millisecond
// Fields of the cache type (its struct header and any remaining fields
// are outside this view); the (c *cache) methods below use them.
//
// cluster supplies the client configuration for per-token sessions and
// the size limit used when pruning sessions.
22 cluster *arvados.Cluster
// config supplies the TTLs and the entry/byte limits for the caches below.
23 config *arvados.WebDAVCacheConfig // TODO: use cluster.Collections.WebDAVCache instead
// registry receives the cache metrics; setup falls back to a new
// registry in some case (the guarding condition is outside this view).
24 registry *prometheus.Registry
// pdhs: targetID -> *cachedPDH.
26 pdhs *lru.TwoQueueCache
// collections: token + "\000" + PDH -> *cachedCollection.
27 collections *lru.TwoQueueCache
// permissions: token + "\000" + targetID -> *cachedPermission.
28 permissions *lru.TwoQueueCache
// sessions: token -> *cachedSession.
29 sessions *lru.TwoQueueCache
// cacheMetrics holds the Prometheus counters and gauges maintained by
// the cache. They are created and registered by (*cacheMetrics).setup.
33 type cacheMetrics struct {
// requests counts calls to (*cache).Get.
34 requests prometheus.Counter
// collectionBytes, collectionEntries and sessionEntries are gauges
// refreshed periodically by (*cache).updateGauges.
35 collectionBytes prometheus.Gauge
36 collectionEntries prometheus.Gauge
37 sessionEntries prometheus.Gauge
// collectionHits counts manifest-cache hits in lookupCollection.
38 collectionHits prometheus.Counter
// pdhHits counts unexpired targetID->PDH cache hits in Get.
39 pdhHits prometheus.Counter
// permissionHits counts unexpired permission cache hits in Get.
40 permissionHits prometheus.Counter
// sessionHits/sessionMisses count GetSession lookups; an expired
// session counts as a miss.
41 sessionHits prometheus.Counter
42 sessionMisses prometheus.Counter
// apiCalls counts outgoing API requests made by Get and GetTokenUser.
43 apiCalls prometheus.Counter
// setup creates every cache metric and registers it with reg.
// reg.MustRegister panics if a registration fails (for example, when
// a metric with the same fully-qualified name is already registered).
46 func (m *cacheMetrics) setup(reg *prometheus.Registry) {
// Collection-cache counters (subsystem "keepweb_collectioncache").
47 m.requests = prometheus.NewCounter(prometheus.CounterOpts{
49 Subsystem: "keepweb_collectioncache",
51 Help: "Number of targetID-to-manifest lookups handled.",
53 reg.MustRegister(m.requests)
54 m.collectionHits = prometheus.NewCounter(prometheus.CounterOpts{
56 Subsystem: "keepweb_collectioncache",
58 Help: "Number of pdh-to-manifest cache hits.",
60 reg.MustRegister(m.collectionHits)
61 m.pdhHits = prometheus.NewCounter(prometheus.CounterOpts{
63 Subsystem: "keepweb_collectioncache",
65 Help: "Number of uuid-to-pdh cache hits.",
67 reg.MustRegister(m.pdhHits)
68 m.permissionHits = prometheus.NewCounter(prometheus.CounterOpts{
70 Subsystem: "keepweb_collectioncache",
71 Name: "permission_hits",
72 Help: "Number of targetID-to-permission cache hits.",
74 reg.MustRegister(m.permissionHits)
75 m.apiCalls = prometheus.NewCounter(prometheus.CounterOpts{
77 Subsystem: "keepweb_collectioncache",
79 Help: "Number of outgoing API calls made by cache.",
81 reg.MustRegister(m.apiCalls)
// NOTE(review): collectionBytes is registered under the
// "keepweb_sessions" subsystem although it also covers cached manifests
// (see its Help text); confirm this placement is intentional.
82 m.collectionBytes = prometheus.NewGauge(prometheus.GaugeOpts{
84 Subsystem: "keepweb_sessions",
85 Name: "cached_collection_bytes",
86 Help: "Total size of all cached manifests and sessions.",
88 reg.MustRegister(m.collectionBytes)
89 m.collectionEntries = prometheus.NewGauge(prometheus.GaugeOpts{
91 Subsystem: "keepweb_collectioncache",
92 Name: "cached_manifests",
93 Help: "Number of manifests in cache.",
95 reg.MustRegister(m.collectionEntries)
// Session-cache metrics (subsystem "keepweb_sessions").
96 m.sessionEntries = prometheus.NewGauge(prometheus.GaugeOpts{
98 Subsystem: "keepweb_sessions",
100 Help: "Number of active token sessions.",
102 reg.MustRegister(m.sessionEntries)
103 m.sessionHits = prometheus.NewCounter(prometheus.CounterOpts{
104 Namespace: "arvados",
105 Subsystem: "keepweb_sessions",
107 Help: "Number of token session cache hits.",
109 reg.MustRegister(m.sessionHits)
110 m.sessionMisses = prometheus.NewCounter(prometheus.CounterOpts{
111 Namespace: "arvados",
112 Subsystem: "keepweb_sessions",
114 Help: "Number of token session cache misses.",
116 reg.MustRegister(m.sessionMisses)
// cachedPDH is a pdhs cache entry: the portable data hash (pdh) that a
// targetID resolved to, valid until expire (set from config.UUIDTTL by Get).
119 type cachedPDH struct {
// cachedCollection is a collections cache entry: a collection record
// (including its manifest text) valid until its expire time.
124 type cachedCollection struct {
126 collection *arvados.Collection
// cachedPermission is a permissions cache entry, added after the API
// returned the target collection for a token. Until expire, Get counts
// it as a permission hit. Presumably this lets Get skip the API
// permission check; TODO confirm against the hidden lines of Get.
129 type cachedPermission struct {
// cachedSession holds per-token state built by GetSession: API and keep
// clients authenticated with the token. Code below also uses expire,
// fs and user fields whose declarations are outside this view.
133 type cachedSession struct {
136 client *arvados.Client
137 arvadosclient *arvadosclient.ArvadosClient
138 keepclient *keepclient.KeepClient
// setup creates the four 2Q LRU caches, sized from c.config, and
// prepares metrics. Public methods run it once via c.setupOnce.Do.
// lru.New2Q returns an error for a non-positive size; how err is
// handled is outside this view.
142 func (c *cache) setup() {
144 c.pdhs, err = lru.New2Q(c.config.MaxUUIDEntries)
148 c.collections, err = lru.New2Q(c.config.MaxCollectionEntries)
152 c.permissions, err = lru.New2Q(c.config.MaxPermissionEntries)
156 c.sessions, err = lru.New2Q(c.config.MaxSessions)
// Fall back to a private registry (guarding condition outside this view;
// presumably c.registry == nil).
163 reg = prometheus.NewRegistry()
// time.Tick's ticker is never stopped, so this loop runs for the life
// of the process, presumably calling updateGauges on each tick.
167 for range time.Tick(metricsUpdateInterval) {
// updateGauges copies the current cache sizes into the gauge metrics:
// approximate bytes (via collectionBytes), manifest entries and sessions.
173 func (c *cache) updateGauges() {
174 c.metrics.collectionBytes.Set(float64(c.collectionBytes()))
175 c.metrics.collectionEntries.Set(float64(c.collections.Len()))
176 c.metrics.sessionEntries.Set(float64(c.sessions.Len()))
// selectPDH is the parameter set Get passes to arv.Get so the API
// returns only a collection's portable_data_hash. Get uses it to check
// whether a cached PDH is still current.
179 var selectPDH = map[string]interface{}{
180 "select": []string{"portable_data_hash"},
183 // Update saves a modified version (fs) to an existing collection
184 // (coll) and, if successful, updates the relevant cache entries so
185 // subsequent calls to Get() reflect the modifications.
186 func (c *cache) Update(client *arvados.Client, coll arvados.Collection, fs arvados.CollectionFileSystem) error {
187 c.setupOnce.Do(c.setup)
// Serialize the modified filesystem. The branch below handles a
// marshaling error or an unchanged manifest; its body is outside this
// view (presumably an early return of err).
189 m, err := fs.MarshalManifest(".")
190 if err != nil || m == coll.ManifestText {
193 coll.ManifestText = m
194 var updated arvados.Collection
// The deferred Remove drops the cached targetID->PDH entry for this
// UUID on every return from here on, whether or not the PATCH
// succeeds, so the next Get re-resolves the PDH.
195 defer c.pdhs.Remove(coll.UUID)
196 err = client.RequestAndDecode(&updated, "PATCH", "arvados/v1/collections/"+coll.UUID, nil, map[string]interface{}{
197 "collection": map[string]string{
198 "manifest_text": coll.ManifestText,
// Cache the updated record under its new PDH for this client's token.
// NOTE(review): the PATCH error check is outside this view; confirm it
// returns before this point on failure.
202 c.collections.Add(client.AuthToken+"\000"+updated.PortableDataHash, &cachedCollection{
203 expire: time.Now().Add(time.Duration(c.config.TTL)),
204 collection: &updated,
210 // ResetSession unloads any potentially stale state. Should be called
211 // after write operations, so subsequent reads don't return stale
213 func (c *cache) ResetSession(token string) {
214 c.setupOnce.Do(c.setup)
// Dropping the token's entry makes the next GetSession for this token
// build a new session (fresh clients and filesystem).
215 c.sessions.Remove(token)
// GetSession returns a long-lived CustomFileSystem suitable for doing
// a read operation with the given token, along with that token's
// cached session (which is created on first use).
220 func (c *cache) GetSession(token string) (arvados.CustomFileSystem, *cachedSession, error) {
221 c.setupOnce.Do(c.setup)
// Sessions are keyed by the bare token. The comma-ok assertion leaves
// sess nil when there is no entry.
223 ent, _ := c.sessions.Get(token)
224 sess, _ := ent.(*cachedSession)
// New session (guarding condition outside this view; presumably
// sess == nil). Build an API client from the cluster config carrying
// this token, then arvadosclient and keepclient on top of it, and
// cache the session with a TTL-based expiry.
227 c.metrics.sessionMisses.Inc()
228 sess = &cachedSession{
229 expire: now.Add(c.config.TTL.Duration()),
232 sess.client, err = arvados.NewClientFromConfig(c.cluster)
236 sess.client.AuthToken = token
237 sess.arvadosclient, err = arvadosclient.New(sess.client)
241 sess.keepclient = keepclient.New(sess.arvadosclient)
242 c.sessions.Add(token, sess)
// An expired session also counts as a miss. The hidden lines presumably
// set expired, so the filesystem is rebuilt below.
243 } else if sess.expire.Before(now) {
244 c.metrics.sessionMisses.Inc()
247 c.metrics.sessionHits.Inc()
// Reuse the session's existing filesystem unless the session expired.
250 fs, _ := sess.fs.Load().(arvados.CustomFileSystem)
251 if fs != nil && !expired {
// Otherwise build a fresh site filesystem from the session's clients,
// applying the cluster's ForwardSlashNameSubstitution setting.
254 fs = sess.client.SiteFileSystem(sess.keepclient)
255 fs.ForwardSlashNameSubstitution(c.cluster.Collections.ForwardSlashNameSubstitution)
// pruneSessions removes all expired session cache entries, then removes
// more entries until the approximate remaining filesystem size is
// <= WebDAVCache.MaxCollectionBytes/2.
262 func (c *cache) pruneSessions() {
// First pass: drop expired sessions and sum the MemorySize of the
// filesystems still held by the others. Peek avoids changing LRU order.
265 keys := c.sessions.Keys()
266 for _, token := range keys {
267 ent, ok := c.sessions.Peek(token)
271 s := ent.(*cachedSession)
272 if s.expire.Before(now) {
273 c.sessions.Remove(token)
276 if fs, ok := s.fs.Load().(arvados.CustomFileSystem); ok {
277 size += fs.MemorySize()
280 // Remove tokens until reaching size limit, starting with the
281 // least frequently used entries (which Keys() returns last).
// NOTE(review): golang-lru's TwoQueueCache.Keys returns frequently used
// keys first, and each list is ordered oldest to newest. Walking from the
// end therefore visits the newest recently added entries first; confirm
// this is the intended eviction order.
282 for i := len(keys) - 1; i >= 0; i-- {
// NOTE(review): this threshold comes from c.cluster, while
// pruneCollections uses c.config.MaxCollectionBytes; confirm both refer
// to the same configuration.
284 if size <= c.cluster.Collections.WebDAVCache.MaxCollectionBytes/2 {
287 ent, ok := c.sessions.Peek(token)
291 s := ent.(*cachedSession)
292 fs, _ := s.fs.Load().(arvados.CustomFileSystem)
// fs is nil when the session holds no filesystem. The hidden lines
// presumably skip such entries; otherwise fs.MemorySize() would panic.
296 c.sessions.Remove(token)
297 size -= fs.MemorySize()
// Get returns the collection identified by targetID (a collection UUID
// or a portable data hash) as visible to arv's token.
//
// It consults, in order:
//   - the permission cache (token+targetID),
//   - the PDH cache (targetID),
//   - the manifest cache (token+PDH),
// and calls the API only when those cannot answer. forceReload is
// handled in lines outside this view (presumably it bypasses the cached
// permission).
301 func (c *cache) Get(arv *arvadosclient.ArvadosClient, targetID string, forceReload bool) (*arvados.Collection, error) {
302 c.setupOnce.Do(c.setup)
303 c.metrics.requests.Inc()
// Permission and manifest keys include the token, so cached entries are
// only reused for the same token.
306 permKey := arv.ApiToken + "\000" + targetID
308 } else if ent, cached := c.permissions.Get(permKey); cached {
309 ent := ent.(*cachedPermission)
310 if ent.expire.Before(time.Now()) {
311 c.permissions.Remove(permKey)
314 c.metrics.permissionHits.Inc()
// Resolve targetID to a PDH. A PDH target is used directly (assignment
// outside this view); otherwise an unexpired pdhs entry is used.
319 if arvadosclient.PDHMatch(targetID) {
321 } else if ent, cached := c.pdhs.Get(targetID); cached {
322 ent := ent.(*cachedPDH)
323 if ent.expire.Before(time.Now()) {
324 c.pdhs.Remove(targetID)
327 c.metrics.pdhHits.Inc()
331 var collection *arvados.Collection
333 collection = c.lookupCollection(arv.ApiToken + "\000" + pdh)
336 if collection != nil && permOK {
337 return collection, nil
338 } else if collection != nil {
339 // Ask API for current PDH for this targetID. Most
340 // likely, the cached PDH is still correct; if so,
341 // _and_ the current token has permission, we can
342 // use our cached manifest.
343 c.metrics.apiCalls.Inc()
344 var current arvados.Collection
345 err := arv.Get("collections", targetID, selectPDH, &current)
349 if current.PortableDataHash == pdh {
350 c.permissions.Add(permKey, &cachedPermission{
351 expire: time.Now().Add(time.Duration(c.config.TTL)),
354 c.pdhs.Add(targetID, &cachedPDH{
355 expire: time.Now().Add(time.Duration(c.config.UUIDTTL)),
359 return collection, err
361 // PDH changed, but now we know we have
362 // permission -- and maybe we already have the
363 // new PDH in the cache.
364 if coll := c.lookupCollection(arv.ApiToken + "\000" + current.PortableDataHash); coll != nil {
369 // Collection manifest is not cached.
370 c.metrics.apiCalls.Inc()
371 err := arv.Get("collections", targetID, nil, &collection)
// A successful full fetch refreshes all three caches: permission
// (token+targetID), PDH (targetID) and manifest (token+PDH).
375 exp := time.Now().Add(time.Duration(c.config.TTL))
376 c.permissions.Add(permKey, &cachedPermission{
379 c.pdhs.Add(targetID, &cachedPDH{
380 expire: time.Now().Add(time.Duration(c.config.UUIDTTL)),
381 pdh: collection.PortableDataHash,
383 c.collections.Add(arv.ApiToken+"\000"+collection.PortableDataHash, &cachedCollection{
385 collection: collection,
// A manifest larger than the average per-entry budget
// (MaxCollectionBytes / MaxCollectionEntries) may push the cache over
// its byte limit, so prune in the background. MaxCollectionEntries is
// the size setup passes to lru.New2Q, which rejects non-positive sizes.
// Concurrent prunes are possible; pruneCollections tolerates this
// imprecisely.
387 if int64(len(collection.ManifestText)) > c.config.MaxCollectionBytes/int64(c.config.MaxCollectionEntries) {
388 go c.pruneCollections()
390 return collection, nil
393 // pruneCollections checks the total bytes occupied by manifest_text
394 // in the collection cache and removes old entries as needed to bring
395 // the total size down to MaxCollectionBytes/2. It also deletes all expired
398 // pruneCollections does not aim to be perfectly correct when there is
399 // concurrent cache activity.
400 func (c *cache) pruneCollections() {
// Snapshot the keys, then record each entry's manifest size and expiry.
// Peek avoids changing LRU order while measuring.
403 keys := c.collections.Keys()
404 entsize := make([]int, len(keys))
405 expired := make([]bool, len(keys))
406 for i, k := range keys {
407 v, ok := c.collections.Peek(k)
411 ent := v.(*cachedCollection)
412 n := len(ent.collection.ManifestText)
415 expired[i] = ent.expire.Before(now)
// First pass: remove entries and subtract their size. Presumably only
// the expired ones; the condition is outside this view.
417 for i, k := range keys {
419 c.collections.Remove(k)
420 size -= int64(entsize[i])
// Second pass: evict more entries until size <= MaxCollectionBytes/2.
// NOTE(review): this walks keys from the front. golang-lru documents
// TwoQueueCache.Keys as returning frequently used keys first, so this
// evicts frequently used entries before recently added ones, which is the
// opposite direction from pruneSessions. Confirm this is intended.
423 for i, k := range keys {
424 if size <= c.config.MaxCollectionBytes/2 {
428 // already removed this entry in the previous loop
431 c.collections.Remove(k)
432 size -= int64(entsize[i])
436 // collectionBytes returns the approximate combined memory size of the
437 // collection cache and session filesystem cache.
438 func (c *cache) collectionBytes() uint64 {
// Manifest cache: count only each entry's manifest text length. Peek
// avoids changing LRU order while measuring.
440 for _, k := range c.collections.Keys() {
441 v, ok := c.collections.Peek(k)
445 size += uint64(len(v.(*cachedCollection).collection.ManifestText))
// Session cache: add each session filesystem's reported MemorySize,
// skipping sessions that hold no filesystem.
447 for _, token := range c.sessions.Keys() {
448 ent, ok := c.sessions.Peek(token)
452 if fs, ok := ent.(*cachedSession).fs.Load().(arvados.CustomFileSystem); ok {
453 size += uint64(fs.MemorySize())
// lookupCollection returns the cached collection stored under key
// (callers build it as token + "\000" + PDH) and counts a cache hit.
// An expired entry is removed from the cache. Callers treat a nil
// result as a miss; the miss and expiry return paths are outside this
// view.
459 func (c *cache) lookupCollection(key string) *arvados.Collection {
// Get (not Peek), so a lookup also refreshes the entry's LRU position.
460 e, cached := c.collections.Get(key)
464 ent := e.(*cachedCollection)
465 if ent.expire.Before(time.Now()) {
466 c.collections.Remove(key)
469 c.metrics.collectionHits.Inc()
470 return ent.collection
// GetTokenUser returns the user record for token, fetching it from the
// API once and storing it on the token's cached session for reuse.
473 func (c *cache) GetTokenUser(token string) (*arvados.User, error) {
474 // Get and cache user record associated with this
475 // token. We need to know their UUID for logging, and
476 // whether they are an admin or not for certain
477 // permission checks.
479 // Get/create session entry
480 _, sess, err := c.GetSession(token)
485 // See if the user is already set, and if so, return it
486 user, _ := sess.user.Load().(*arvados.User)
491 // Fetch the user record
492 c.metrics.apiCalls.Inc()
493 var current arvados.User
495 err = sess.client.RequestAndDecode(&current, "GET", "/arvados/v1/users/current", nil, nil)
500 // Stash the user record for next time
501 sess.user.Store(&current)