1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
12 "git.arvados.org/arvados.git/sdk/go/arvados"
13 "git.arvados.org/arvados.git/sdk/go/arvadosclient"
14 "git.arvados.org/arvados.git/sdk/go/keepclient"
15 lru "github.com/hashicorp/golang-lru"
16 "github.com/prometheus/client_golang/prometheus"
// metricsUpdateInterval is the period of the ticker loop started in
// (*cache).setup (one tick every 100ms, i.e. ten per second).
const metricsUpdateInterval = 100 * time.Millisecond
// cluster is used to build API clients for sessions (GetSession) and
// supplies the session-pruning size limit (pruneSessions).
22 cluster *arvados.Cluster
// config supplies the cache TTLs and the per-cache entry/byte limits.
23 config *arvados.WebDAVCacheConfig // TODO: use cluster.Collections.WebDAV instead
// registry receives the cache metrics; setup appears to create a private
// registry when this is unset (TODO confirm).
24 registry *prometheus.Registry
// pdhs maps collection UUID -> *cachedPDH (UUID-to-PDH lookups).
26 pdhs *lru.TwoQueueCache
// collections maps token+"\000"+PDH -> *cachedCollection.
27 collections *lru.TwoQueueCache
// permissions maps token+"\000"+targetID -> *cachedPermission.
28 permissions *lru.TwoQueueCache
// sessions maps API token -> *cachedSession.
29 sessions *lru.TwoQueueCache
33 type cacheMetrics struct {
34 requests prometheus.Counter
35 collectionBytes prometheus.Gauge
36 collectionEntries prometheus.Gauge
37 sessionEntries prometheus.Gauge
38 collectionHits prometheus.Counter
39 pdhHits prometheus.Counter
40 permissionHits prometheus.Counter
41 sessionHits prometheus.Counter
42 sessionMisses prometheus.Counter
43 apiCalls prometheus.Counter
// setup creates each collector in m and registers it with reg using
// MustRegister, which panics if registration fails (for example on a
// duplicate metric name). Collection-cache metrics use the subsystem
// "keepweb_collectioncache"; session metrics use "keepweb_sessions".
46 func (m *cacheMetrics) setup(reg *prometheus.Registry) {
// Collection cache: request, hit, and API-call counters.
47 m.requests = prometheus.NewCounter(prometheus.CounterOpts{
49 Subsystem: "keepweb_collectioncache",
51 Help: "Number of targetID-to-manifest lookups handled.",
53 reg.MustRegister(m.requests)
54 m.collectionHits = prometheus.NewCounter(prometheus.CounterOpts{
56 Subsystem: "keepweb_collectioncache",
58 Help: "Number of pdh-to-manifest cache hits.",
60 reg.MustRegister(m.collectionHits)
61 m.pdhHits = prometheus.NewCounter(prometheus.CounterOpts{
63 Subsystem: "keepweb_collectioncache",
65 Help: "Number of uuid-to-pdh cache hits.",
67 reg.MustRegister(m.pdhHits)
68 m.permissionHits = prometheus.NewCounter(prometheus.CounterOpts{
70 Subsystem: "keepweb_collectioncache",
71 Name: "permission_hits",
72 Help: "Number of targetID-to-permission cache hits.",
74 reg.MustRegister(m.permissionHits)
75 m.apiCalls = prometheus.NewCounter(prometheus.CounterOpts{
77 Subsystem: "keepweb_collectioncache",
79 Help: "Number of outgoing API calls made by cache.",
81 reg.MustRegister(m.apiCalls)
// Gauges (collectionBytes, collectionEntries, sessionEntries) are set
// periodically by (*cache).updateGauges rather than incremented.
82 m.collectionBytes = prometheus.NewGauge(prometheus.GaugeOpts{
84 Subsystem: "keepweb_collectioncache",
85 Name: "cached_manifest_bytes",
86 Help: "Total size of all cached manifests and sessions.",
88 reg.MustRegister(m.collectionBytes)
89 m.collectionEntries = prometheus.NewGauge(prometheus.GaugeOpts{
91 Subsystem: "keepweb_collectioncache",
92 Name: "cached_manifests",
93 Help: "Number of manifests in cache.",
95 reg.MustRegister(m.collectionEntries)
96 m.sessionEntries = prometheus.NewGauge(prometheus.GaugeOpts{
98 Subsystem: "keepweb_sessions",
100 Help: "Number of active token sessions.",
102 reg.MustRegister(m.sessionEntries)
// Token session hit/miss counters, incremented by GetSession.
103 m.sessionHits = prometheus.NewCounter(prometheus.CounterOpts{
104 Namespace: "arvados",
105 Subsystem: "keepweb_sessions",
107 Help: "Number of token session cache hits.",
109 reg.MustRegister(m.sessionHits)
110 m.sessionMisses = prometheus.NewCounter(prometheus.CounterOpts{
111 Namespace: "arvados",
112 Subsystem: "keepweb_sessions",
114 Help: "Number of token session cache misses.",
116 reg.MustRegister(m.sessionMisses)
// cachedPDH is a c.pdhs entry: a collection UUID's portable data hash
// (pdh) plus the time (expire, set from config.UUIDTTL) after which Get
// discards it and asks the API again.
119 type cachedPDH struct {
// cachedCollection is a c.collections entry: a collection record
// (including its manifest text) and an expiry time (set from config.TTL)
// after which lookupCollection drops it.
124 type cachedCollection struct {
126 collection *arvados.Collection
// cachedPermission is a c.permissions entry recording that a token was
// recently able to read a targetID, valid until its expire time (set
// from config.TTL). Get appears to treat an unexpired entry as proof of
// read permission -- confirm.
129 type cachedPermission struct {
// cachedSession is a c.sessions entry: an expiry time plus a lazily
// built filesystem read via fs.Load() (presumably a sync/atomic.Value
// holding an arvados.CustomFileSystem -- confirm).
133 type cachedSession struct {
// setup allocates the four 2Q LRU caches, sized from c.config, prepares
// a metrics registry, and starts a loop that ticks every
// metricsUpdateInterval (presumably in its own goroutine). Update,
// ResetSession, GetSession and Get all invoke it via c.setupOnce.Do.
138 func (c *cache) setup() {
// lru.New2Q fails for an invalid size (e.g. size <= 0); err is
// presumably checked after each call.
140 c.pdhs, err = lru.New2Q(c.config.MaxUUIDEntries)
144 c.collections, err = lru.New2Q(c.config.MaxCollectionEntries)
148 c.permissions, err = lru.New2Q(c.config.MaxPermissionEntries)
152 c.sessions, err = lru.New2Q(c.config.MaxSessions)
// Fall back to a private registry (presumably only when c.registry is
// nil -- confirm) so metric registration always has a target.
159 reg = prometheus.NewRegistry()
// NOTE(review): the ticker behind time.Tick is never stopped, so this
// loop runs for the lifetime of the process.
163 for range time.Tick(metricsUpdateInterval) {
169 func (c *cache) updateGauges() {
170 c.metrics.collectionBytes.Set(float64(c.collectionBytes()))
171 c.metrics.collectionEntries.Set(float64(c.collections.Len()))
172 c.metrics.sessionEntries.Set(float64(c.sessions.Len()))
// selectPDH holds the request parameters for a collection "get" that
// only needs the portable_data_hash field. Get uses it to check cheaply
// whether a cached manifest is still current.
var selectPDH = map[string]interface{}{"select": []string{"portable_data_hash"}}
179 // Update saves a modified version (fs) to an existing collection
180 // (coll) and, if successful, updates the relevant cache entries so
181 // subsequent calls to Get() reflect the modifications.
182 func (c *cache) Update(client *arvados.Client, coll arvados.Collection, fs arvados.CollectionFileSystem) error {
183 c.setupOnce.Do(c.setup)
185 m, err := fs.MarshalManifest(".")
186 if err != nil || m == coll.ManifestText {
189 coll.ManifestText = m
190 var updated arvados.Collection
191 defer c.pdhs.Remove(coll.UUID)
192 err = client.RequestAndDecode(&updated, "PATCH", "arvados/v1/collections/"+coll.UUID, nil, map[string]interface{}{
193 "collection": map[string]string{
194 "manifest_text": coll.ManifestText,
198 c.collections.Add(client.AuthToken+"\000"+coll.PortableDataHash, &cachedCollection{
199 expire: time.Now().Add(time.Duration(c.config.TTL)),
200 collection: &updated,
206 // ResetSession unloads any potentially stale state. Should be called
207 // after write operations, so subsequent reads don't return stale
209 func (c *cache) ResetSession(token string) {
210 c.setupOnce.Do(c.setup)
211 c.sessions.Remove(token)
214 // Get a long-lived CustomFileSystem suitable for doing a read operation
215 // with the given token.
//
// Sessions are cached per token for c.config.TTL. A missing or expired
// session counts as a session miss; otherwise it counts as a hit. When a
// filesystem has to be built, it is a site filesystem backed by a new
// API client (from c.cluster's config) and a keepclient, with the
// cluster's ForwardSlashNameSubstitution setting applied.
216 func (c *cache) GetSession(token string) (arvados.CustomFileSystem, error) {
217 c.setupOnce.Do(c.setup)
219 ent, _ := c.sessions.Get(token)
// sess is nil when the token has no cached entry: the comma-ok type
// assertion on a nil ent yields nil rather than panicking.
220 sess, _ := ent.(*cachedSession)
222 c.metrics.sessionMisses.Inc()
223 sess = &cachedSession{
224 expire: now.Add(c.config.TTL.Duration()),
226 c.sessions.Add(token, sess)
227 } else if sess.expire.Before(now) {
228 c.metrics.sessionMisses.Inc()
231 c.metrics.sessionHits.Inc()
// fs is nil until a filesystem has been stored in this session.
234 fs, _ := sess.fs.Load().(arvados.CustomFileSystem)
238 ac, err := arvados.NewClientFromConfig(c.cluster)
243 arv, err := arvadosclient.New(ac)
247 kc := keepclient.New(arv)
248 fs = ac.SiteFileSystem(kc)
249 fs.ForwardSlashNameSubstitution(c.cluster.Collections.ForwardSlashNameSubstitution)
254 // Remove all expired session cache entries, then remove more entries
255 // until approximate remaining size <= maxsize/2
// Here maxsize is c.cluster.Collections.WebDAVCache.MaxCollectionBytes,
// and a session's size is its filesystem's MemorySize(). Entries are
// read with Peek, so the scan does not change their recency.
256 func (c *cache) pruneSessions() {
259 keys := c.sessions.Keys()
// Pass 1: remove expired sessions and total up filesystem sizes.
260 for _, token := range keys {
261 ent, ok := c.sessions.Peek(token)
265 s := ent.(*cachedSession)
266 if s.expire.Before(now) {
267 c.sessions.Remove(token)
// Only sessions with a loaded filesystem add to the total.
270 if fs, ok := s.fs.Load().(arvados.CustomFileSystem); ok {
271 size += fs.MemorySize()
274 // Remove tokens until reaching size limit, starting with the
275 // least frequently used entries (which Keys() returns last).
// NOTE(review): golang-lru documents TwoQueueCache.Keys() as the
// frequent list followed by the recent list, each ordered oldest to
// newest. The last key is therefore the most recently added entry, not
// necessarily the least frequently used one -- confirm the intended
// eviction order.
276 for i := len(keys) - 1; i >= 0; i-- {
278 if size <= c.cluster.Collections.WebDAVCache.MaxCollectionBytes/2 {
281 ent, ok := c.sessions.Peek(token)
285 s := ent.(*cachedSession)
286 fs, _ := s.fs.Load().(arvados.CustomFileSystem)
// NOTE(review): fs is nil for a session with no filesystem loaded;
// such entries must be skipped before fs.MemorySize() below, or the
// call on a nil interface value panics.
290 c.sessions.Remove(token)
291 size -= fs.MemorySize()
// Get returns the collection identified by targetID, which may be a
// collection UUID or a portable data hash (PDH), as visible to the
// token arv.ApiToken. Three caches are consulted:
//
//   - c.permissions (token+"\000"+targetID): recent proof that this
//     token could read targetID;
//   - c.pdhs (UUID -> PDH): skipped when targetID is itself a PDH;
//   - c.collections (token+"\000"+PDH): the collection record itself.
//
// If the record is cached but permission is not known, one API call
// selecting only portable_data_hash both checks permission and confirms
// that the cached PDH is current. Otherwise the full record is fetched
// and all three caches are refreshed. forceReload appears to bypass the
// permission cache -- confirm.
295 func (c *cache) Get(arv *arvadosclient.ArvadosClient, targetID string, forceReload bool) (*arvados.Collection, error) {
296 c.setupOnce.Do(c.setup)
297 c.metrics.requests.Inc()
300 permKey := arv.ApiToken + "\000" + targetID
302 } else if ent, cached := c.permissions.Get(permKey); cached {
303 ent := ent.(*cachedPermission)
// Expired permission entries are discarded rather than trusted.
304 if ent.expire.Before(time.Now()) {
305 c.permissions.Remove(permKey)
308 c.metrics.permissionHits.Inc()
// A PDH target needs no UUID-to-PDH lookup.
313 if arvadosclient.PDHMatch(targetID) {
315 } else if ent, cached := c.pdhs.Get(targetID); cached {
316 ent := ent.(*cachedPDH)
317 if ent.expire.Before(time.Now()) {
318 c.pdhs.Remove(targetID)
321 c.metrics.pdhHits.Inc()
325 var collection *arvados.Collection
327 collection = c.lookupCollection(arv.ApiToken + "\000" + pdh)
330 if collection != nil && permOK {
331 return collection, nil
332 } else if collection != nil {
333 // Ask API for current PDH for this targetID. Most
334 // likely, the cached PDH is still correct; if so,
335 // _and_ the current token has permission, we can
336 // use our cached manifest.
337 c.metrics.apiCalls.Inc()
338 var current arvados.Collection
// NOTE(review): "¤t" on the next line is mis-encoded text for
// "&current" (the call must decode into current, declared above). As
// written, this line does not compile.
339 err := arv.Get("collections", targetID, selectPDH, ¤t)
343 if current.PortableDataHash == pdh {
344 c.permissions.Add(permKey, &cachedPermission{
345 expire: time.Now().Add(time.Duration(c.config.TTL)),
348 c.pdhs.Add(targetID, &cachedPDH{
349 expire: time.Now().Add(time.Duration(c.config.UUIDTTL)),
353 return collection, err
355 // PDH changed, but now we know we have
356 // permission -- and maybe we already have the
357 // new PDH in the cache.
358 if coll := c.lookupCollection(arv.ApiToken + "\000" + current.PortableDataHash); coll != nil {
363 // Collection manifest is not cached.
364 c.metrics.apiCalls.Inc()
365 err := arv.Get("collections", targetID, nil, &collection)
// Permission and collection entries share one expiry (config.TTL); the
// UUID-to-PDH entry uses config.UUIDTTL.
369 exp := time.Now().Add(time.Duration(c.config.TTL))
370 c.permissions.Add(permKey, &cachedPermission{
373 c.pdhs.Add(targetID, &cachedPDH{
374 expire: time.Now().Add(time.Duration(c.config.UUIDTTL)),
375 pdh: collection.PortableDataHash,
377 c.collections.Add(arv.ApiToken+"\000"+collection.PortableDataHash, &cachedCollection{
379 collection: collection,
// A manifest larger than the average per-entry byte budget
// (MaxCollectionBytes/MaxCollectionEntries) triggers an asynchronous
// prune of the collection cache.
381 if int64(len(collection.ManifestText)) > c.config.MaxCollectionBytes/int64(c.config.MaxCollectionEntries) {
382 go c.pruneCollections()
384 return collection, nil
387 // pruneCollections checks the total bytes occupied by manifest_text
388 // in the collection cache and removes old entries as needed to bring
389 // the total size down to CollectionBytes. It also deletes all expired
// entries. (The final loop actually stops once the total is at or below
// c.config.MaxCollectionBytes/2, i.e. half the configured limit.)
//
392 // pruneCollections does not aim to be perfectly correct when there is
393 // concurrent cache activity.
394 func (c *cache) pruneCollections() {
// Snapshot the keys, then record each entry's manifest size and whether
// it has expired. Peek is used so that measuring does not change recency.
397 keys := c.collections.Keys()
398 entsize := make([]int, len(keys))
399 expired := make([]bool, len(keys))
400 for i, k := range keys {
401 v, ok := c.collections.Peek(k)
405 ent := v.(*cachedCollection)
406 n := len(ent.collection.ManifestText)
409 expired[i] = ent.expire.Before(now)
// Pass 1: drop expired entries and subtract their size.
411 for i, k := range keys {
413 c.collections.Remove(k)
414 size -= int64(entsize[i])
// Pass 2: evict unexpired entries in Keys() order until the total is at
// or below half of MaxCollectionBytes.
// NOTE(review): golang-lru documents TwoQueueCache.Keys() as listing the
// frequently used keys first, so this pass evicts those entries first.
// Confirm that this is the intended eviction order.
417 for i, k := range keys {
418 if size <= c.config.MaxCollectionBytes/2 {
422 // already removed this entry in the previous loop
425 c.collections.Remove(k)
426 size -= int64(entsize[i])
430 // collectionBytes returns the approximate combined memory size of the
431 // collection cache and session filesystem cache.
// A cached collection counts as len(ManifestText); a session counts as
// its filesystem's MemorySize() when a filesystem is loaded. Entries are
// read with Peek, so measuring does not change recency. updateGauges
// publishes the result as the cached_manifest_bytes gauge.
432 func (c *cache) collectionBytes() uint64 {
434 for _, k := range c.collections.Keys() {
435 v, ok := c.collections.Peek(k)
439 size += uint64(len(v.(*cachedCollection).collection.ManifestText))
441 for _, token := range c.sessions.Keys() {
442 ent, ok := c.sessions.Peek(token)
446 if fs, ok := ent.(*cachedSession).fs.Load().(arvados.CustomFileSystem); ok {
447 size += uint64(fs.MemorySize())
// lookupCollection returns the collection cached under key (Get builds
// keys as token+"\000"+PDH) and counts a collection-cache hit. An entry
// whose expiry has passed is removed from the cache. Callers (Get) treat
// a nil result as a miss, which is presumably what is returned for
// absent and expired keys -- confirm.
453 func (c *cache) lookupCollection(key string) *arvados.Collection {
// Get (unlike Peek) also marks the entry as recently used.
454 e, cached := c.collections.Get(key)
458 ent := e.(*cachedCollection)
459 if ent.expire.Before(time.Now()) {
460 c.collections.Remove(key)
463 c.metrics.collectionHits.Inc()
464 return ent.collection