1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
12 "git.arvados.org/arvados.git/sdk/go/arvados"
13 "git.arvados.org/arvados.git/sdk/go/arvadosclient"
14 "git.arvados.org/arvados.git/sdk/go/keepclient"
15 lru "github.com/hashicorp/golang-lru"
16 "github.com/prometheus/client_golang/prometheus"
// metricsUpdateInterval is the tick period of the metrics-refresh loop
// started in setup: ten ticks per second.
const metricsUpdateInterval = 100 * time.Millisecond
// cluster supplies the client configuration used to build session
// filesystems in GetSession.
22 cluster *arvados.Cluster
23 config *arvados.WebDAVCacheConfig // TODO: use cluster.Collections.WebDAV instead
// registry receives the cache metrics; setup falls back to a new
// prometheus.Registry (apparently when this is nil — confirm).
24 registry *prometheus.Registry
// Two-queue LRU caches, allocated in setup() with sizes from config:
//   - pdhs (MaxUUIDEntries): non-PDH targetID (e.g. collection UUID) -> *cachedPDH
//   - collections (MaxCollectionEntries): token+"\000"+PDH -> *cachedCollection
//   - permissions (MaxPermissionEntries): token+"\000"+targetID -> *cachedPermission
//   - sessions (MaxSessions): token -> *cachedSession
26 pdhs *lru.TwoQueueCache
27 collections *lru.TwoQueueCache
28 permissions *lru.TwoQueueCache
29 sessions *lru.TwoQueueCache
33 type cacheMetrics struct {
34 requests prometheus.Counter
35 collectionBytes prometheus.Gauge
36 collectionEntries prometheus.Gauge
37 sessionEntries prometheus.Gauge
38 collectionHits prometheus.Counter
39 pdhHits prometheus.Counter
40 permissionHits prometheus.Counter
41 sessionHits prometheus.Counter
42 sessionMisses prometheus.Counter
43 apiCalls prometheus.Counter
// setup creates every counter and gauge in m and registers each one with
// reg. reg.MustRegister panics if a collector with the same
// fully-qualified name is already registered there, so this must be
// called at most once per registry.
46 func (m *cacheMetrics) setup(reg *prometheus.Registry) {
47 m.requests = prometheus.NewCounter(prometheus.CounterOpts{
49 Subsystem: "keepweb_collectioncache",
51 Help: "Number of targetID-to-manifest lookups handled.",
53 reg.MustRegister(m.requests)
54 m.collectionHits = prometheus.NewCounter(prometheus.CounterOpts{
56 Subsystem: "keepweb_collectioncache",
58 Help: "Number of pdh-to-manifest cache hits.",
60 reg.MustRegister(m.collectionHits)
61 m.pdhHits = prometheus.NewCounter(prometheus.CounterOpts{
63 Subsystem: "keepweb_collectioncache",
65 Help: "Number of uuid-to-pdh cache hits.",
67 reg.MustRegister(m.pdhHits)
68 m.permissionHits = prometheus.NewCounter(prometheus.CounterOpts{
70 Subsystem: "keepweb_collectioncache",
71 Name: "permission_hits",
72 Help: "Number of targetID-to-permission cache hits.",
74 reg.MustRegister(m.permissionHits)
75 m.apiCalls = prometheus.NewCounter(prometheus.CounterOpts{
77 Subsystem: "keepweb_collectioncache",
79 Help: "Number of outgoing API calls made by cache.",
81 reg.MustRegister(m.apiCalls)
// collectionBytes lives under keepweb_sessions (not keepweb_collectioncache),
// presumably because the value it reports (see collectionBytes()) also
// includes session filesystem memory.
82 m.collectionBytes = prometheus.NewGauge(prometheus.GaugeOpts{
84 Subsystem: "keepweb_sessions",
85 Name: "cached_collection_bytes",
86 Help: "Total size of all cached manifests and sessions.",
88 reg.MustRegister(m.collectionBytes)
89 m.collectionEntries = prometheus.NewGauge(prometheus.GaugeOpts{
91 Subsystem: "keepweb_collectioncache",
92 Name: "cached_manifests",
93 Help: "Number of manifests in cache.",
95 reg.MustRegister(m.collectionEntries)
96 m.sessionEntries = prometheus.NewGauge(prometheus.GaugeOpts{
98 Subsystem: "keepweb_sessions",
100 Help: "Number of active token sessions.",
102 reg.MustRegister(m.sessionEntries)
// Session cache hit/miss counters, incremented by GetSession.
103 m.sessionHits = prometheus.NewCounter(prometheus.CounterOpts{
104 Namespace: "arvados",
105 Subsystem: "keepweb_sessions",
107 Help: "Number of token session cache hits.",
109 reg.MustRegister(m.sessionHits)
110 m.sessionMisses = prometheus.NewCounter(prometheus.CounterOpts{
111 Namespace: "arvados",
112 Subsystem: "keepweb_sessions",
114 Help: "Number of token session cache misses.",
116 reg.MustRegister(m.sessionMisses)
// cachedPDH is a c.pdhs entry: the portable data hash (pdh) last seen
// for a non-PDH targetID, trusted until expire (UUIDTTL after caching).
119 type cachedPDH struct {
// cachedCollection is a c.collections entry: a collection record
// fetched with some token, trusted until expire.
124 type cachedCollection struct {
126 collection *arvados.Collection
// cachedPermission is a c.permissions entry recording that the API
// server recently returned a targetID for a token; valid until expire.
129 type cachedPermission struct {
// cachedSession is a c.sessions entry: the per-token filesystem handed
// out by GetSession (read with fs.Load(), presumably an atomic.Value),
// plus its expiry time (now + TTL when the entry was created).
133 type cachedSession struct {
// setup allocates the pdhs, collections, permissions and sessions 2Q LRU
// caches with the sizes from c.config, sets up the metrics (falling back
// to a new prometheus.Registry, apparently when c.registry is nil —
// confirm), and starts a loop ticking every metricsUpdateInterval,
// presumably to call updateGauges. Every exported method runs it through
// c.setupOnce, so it executes at most once per cache.
// NOTE(review): the time.Tick ticker is never stopped, so the refresh
// loop runs for the lifetime of the process.
138 func (c *cache) setup() {
140 c.pdhs, err = lru.New2Q(c.config.MaxUUIDEntries)
144 c.collections, err = lru.New2Q(c.config.MaxCollectionEntries)
148 c.permissions, err = lru.New2Q(c.config.MaxPermissionEntries)
152 c.sessions, err = lru.New2Q(c.config.MaxSessions)
159 reg = prometheus.NewRegistry()
163 for range time.Tick(metricsUpdateInterval) {
169 func (c *cache) updateGauges() {
170 c.metrics.collectionBytes.Set(float64(c.collectionBytes()))
171 c.metrics.collectionEntries.Set(float64(c.collections.Len()))
172 c.metrics.sessionEntries.Set(float64(c.sessions.Len()))
// selectPDH is the parameter map for a collections lookup that only needs
// the portable data hash: it selects just the portable_data_hash attribute.
var selectPDH = map[string]interface{}{"select": []string{"portable_data_hash"}}
179 // Update saves a modified version (fs) to an existing collection
180 // (coll) and, if successful, updates the relevant cache entries so
181 // subsequent calls to Get() reflect the modifications.
182 func (c *cache) Update(client *arvados.Client, coll arvados.Collection, fs arvados.CollectionFileSystem) error {
183 c.setupOnce.Do(c.setup)
185 m, err := fs.MarshalManifest(".")
186 if err != nil || m == coll.ManifestText {
189 coll.ManifestText = m
190 var updated arvados.Collection
191 defer c.pdhs.Remove(coll.UUID)
192 err = client.RequestAndDecode(&updated, "PATCH", "arvados/v1/collections/"+coll.UUID, nil, map[string]interface{}{
193 "collection": map[string]string{
194 "manifest_text": coll.ManifestText,
198 c.collections.Add(client.AuthToken+"\000"+coll.PortableDataHash, &cachedCollection{
199 expire: time.Now().Add(time.Duration(c.config.TTL)),
200 collection: &updated,
206 // ResetSession unloads any potentially stale state. Should be called
207 // after write operations, so subsequent reads don't return stale
209 func (c *cache) ResetSession(token string) {
210 c.setupOnce.Do(c.setup)
211 c.sessions.Remove(token)
214 // GetSession returns a long-lived CustomFileSystem suitable for doing a
215 // read operation with the given token.
//
// Filesystems are cached per token in c.sessions. A missing or expired
// entry counts as a session miss; otherwise it is a hit. When no
// reusable filesystem is cached, a new site filesystem is built from
// c.cluster, with the cluster's ForwardSlashNameSubstitution applied.
216 func (c *cache) GetSession(token string) (arvados.CustomFileSystem, error) {
217 c.setupOnce.Do(c.setup)
// A missing entry leaves sess nil (the comma-ok assertion tolerates a nil ent).
219 ent, _ := c.sessions.Get(token)
220 sess, _ := ent.(*cachedSession)
223 c.metrics.sessionMisses.Inc()
// Cache a new, empty session entry; its filesystem is built below.
224 sess = &cachedSession{
225 expire: now.Add(c.config.TTL.Duration()),
227 c.sessions.Add(token, sess)
228 } else if sess.expire.Before(now) {
229 c.metrics.sessionMisses.Inc()
232 c.metrics.sessionHits.Inc()
// Reuse the already-built filesystem only for an unexpired session.
235 fs, _ := sess.fs.Load().(arvados.CustomFileSystem)
236 if fs != nil && !expired {
// Build a fresh API client, Keep client and site filesystem.
239 ac, err := arvados.NewClientFromConfig(c.cluster)
244 arv, err := arvadosclient.New(ac)
248 kc := keepclient.New(arv)
249 fs = ac.SiteFileSystem(kc)
250 fs.ForwardSlashNameSubstitution(c.cluster.Collections.ForwardSlashNameSubstitution)
255 // pruneSessions removes all expired session cache entries, then removes
256 // more entries until the approximate remaining size is <= MaxCollectionBytes/2.
//
// Size is estimated from each cached filesystem's MemorySize(). Peek is
// used throughout so that scanning does not change LRU order.
257 func (c *cache) pruneSessions() {
260 keys := c.sessions.Keys()
261 for _, token := range keys {
262 ent, ok := c.sessions.Peek(token)
266 s := ent.(*cachedSession)
267 if s.expire.Before(now) {
268 c.sessions.Remove(token)
271 if fs, ok := s.fs.Load().(arvados.CustomFileSystem); ok {
272 size += fs.MemorySize()
275 // Remove tokens until reaching size limit, starting with the
276 // least frequently used entries (which Keys() returns last).
277 for i := len(keys) - 1; i >= 0; i-- {
// NOTE(review): this reads the limit from c.cluster.Collections.WebDAVCache,
// while pruneCollections uses c.config.MaxCollectionBytes. They agree only
// if c.config points at the cluster's WebDAVCache; confirm.
279 if size <= c.cluster.Collections.WebDAVCache.MaxCollectionBytes/2 {
282 ent, ok := c.sessions.Peek(token)
286 s := ent.(*cachedSession)
287 fs, _ := s.fs.Load().(arvados.CustomFileSystem)
291 c.sessions.Remove(token)
292 size -= fs.MemorySize()
296 func (c *cache) Get(arv *arvadosclient.ArvadosClient, targetID string, forceReload bool) (*arvados.Collection, error) {
297 c.setupOnce.Do(c.setup)
298 c.metrics.requests.Inc()
301 permKey := arv.ApiToken + "\000" + targetID
303 } else if ent, cached := c.permissions.Get(permKey); cached {
304 ent := ent.(*cachedPermission)
305 if ent.expire.Before(time.Now()) {
306 c.permissions.Remove(permKey)
309 c.metrics.permissionHits.Inc()
314 if arvadosclient.PDHMatch(targetID) {
316 } else if ent, cached := c.pdhs.Get(targetID); cached {
317 ent := ent.(*cachedPDH)
318 if ent.expire.Before(time.Now()) {
319 c.pdhs.Remove(targetID)
322 c.metrics.pdhHits.Inc()
326 var collection *arvados.Collection
328 collection = c.lookupCollection(arv.ApiToken + "\000" + pdh)
331 if collection != nil && permOK {
332 return collection, nil
333 } else if collection != nil {
334 // Ask API for current PDH for this targetID. Most
335 // likely, the cached PDH is still correct; if so,
336 // _and_ the current token has permission, we can
337 // use our cached manifest.
338 c.metrics.apiCalls.Inc()
339 var current arvados.Collection
340 err := arv.Get("collections", targetID, selectPDH, ¤t)
344 if current.PortableDataHash == pdh {
345 c.permissions.Add(permKey, &cachedPermission{
346 expire: time.Now().Add(time.Duration(c.config.TTL)),
349 c.pdhs.Add(targetID, &cachedPDH{
350 expire: time.Now().Add(time.Duration(c.config.UUIDTTL)),
354 return collection, err
356 // PDH changed, but now we know we have
357 // permission -- and maybe we already have the
358 // new PDH in the cache.
359 if coll := c.lookupCollection(arv.ApiToken + "\000" + current.PortableDataHash); coll != nil {
364 // Collection manifest is not cached.
365 c.metrics.apiCalls.Inc()
366 err := arv.Get("collections", targetID, nil, &collection)
370 exp := time.Now().Add(time.Duration(c.config.TTL))
371 c.permissions.Add(permKey, &cachedPermission{
374 c.pdhs.Add(targetID, &cachedPDH{
375 expire: time.Now().Add(time.Duration(c.config.UUIDTTL)),
376 pdh: collection.PortableDataHash,
378 c.collections.Add(arv.ApiToken+"\000"+collection.PortableDataHash, &cachedCollection{
380 collection: collection,
382 if int64(len(collection.ManifestText)) > c.config.MaxCollectionBytes/int64(c.config.MaxCollectionEntries) {
383 go c.pruneCollections()
385 return collection, nil
388 // pruneCollections checks the total bytes occupied by manifest_text
389 // in the collection cache and removes old entries as needed to bring
390 // the total size down to MaxCollectionBytes/2. Expired entries are always removed.
393 // pruneCollections does not aim to be perfectly correct when there is
394 // concurrent cache activity.
395 func (c *cache) pruneCollections() {
// First pass: record each entry's manifest size and whether it has
// expired. Peek does not change LRU order.
398 keys := c.collections.Keys()
399 entsize := make([]int, len(keys))
400 expired := make([]bool, len(keys))
401 for i, k := range keys {
402 v, ok := c.collections.Peek(k)
406 ent := v.(*cachedCollection)
407 n := len(ent.collection.ManifestText)
410 expired[i] = ent.expire.Before(now)
// Second pass: drop expired entries.
412 for i, k := range keys {
414 c.collections.Remove(k)
415 size -= int64(entsize[i])
// Third pass: evict in Keys() order until size <= MaxCollectionBytes/2.
// NOTE(review): for a TwoQueueCache, Keys() lists frequently used keys
// first (pruneSessions' comment says the same and walks Keys() from the
// end). Walking from index 0 therefore evicts the most frequently used
// manifests first; confirm this order is intended.
418 for i, k := range keys {
419 if size <= c.config.MaxCollectionBytes/2 {
423 // already removed this entry in the previous loop
426 c.collections.Remove(k)
427 size -= int64(entsize[i])
431 // collectionBytes returns the approximate combined memory size of the
432 // collection cache and session filesystem cache.
//
// A cached collection counts as its manifest text length. A session
// counts as its loaded filesystem's MemorySize(); a session without a
// loaded filesystem adds nothing. Peek is used so measuring does not
// affect LRU order.
433 func (c *cache) collectionBytes() uint64 {
435 for _, k := range c.collections.Keys() {
436 v, ok := c.collections.Peek(k)
440 size += uint64(len(v.(*cachedCollection).collection.ManifestText))
442 for _, token := range c.sessions.Keys() {
443 ent, ok := c.sessions.Peek(token)
447 if fs, ok := ent.(*cachedSession).fs.Load().(arvados.CustomFileSystem); ok {
448 size += uint64(fs.MemorySize())
// lookupCollection returns the unexpired collection cached under key
// (callers in this file use token+"\000"+PDH) and counts a collectionHits
// hit. The lookup uses Get, so it refreshes the entry's LRU position. An
// expired entry is removed from the cache, and presumably nil is
// returned for it, as for a miss (confirm).
454 func (c *cache) lookupCollection(key string) *arvados.Collection {
455 e, cached := c.collections.Get(key)
459 ent := e.(*cachedCollection)
460 if ent.expire.Before(time.Now()) {
461 c.collections.Remove(key)
464 c.metrics.collectionHits.Inc()
465 return ent.collection