1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
12 "git.arvados.org/arvados.git/sdk/go/arvados"
13 "git.arvados.org/arvados.git/sdk/go/arvadosclient"
14 "git.arvados.org/arvados.git/sdk/go/keepclient"
15 lru "github.com/hashicorp/golang-lru"
16 "github.com/prometheus/client_golang/prometheus"
// metricsUpdateInterval is the tick period (one tenth of a second)
// of the time.Tick loop started in (*cache).setup.
19 const metricsUpdateInterval = time.Second / 10
// Cluster configuration; passed to arvados.NewClientFromConfig and
// read for Collections settings when building and pruning sessions.
22 cluster *arvados.Cluster
// Cache sizing and TTL settings (MaxUUIDEntries, MaxCollectionEntries,
// MaxPermissionEntries, MaxSessions, TTL, UUIDTTL, MaxCollectionBytes).
23 config *arvados.WebDAVCacheConfig // TODO: use cluster.Collections.WebDAV instead
// Prometheus registry.
// NOTE(review): the code that connects it to the metrics is not
// visible in this view.
24 registry *prometheus.Registry
// targetID -> *cachedPDH.
26 pdhs *lru.TwoQueueCache
// token + "\000" + portable data hash -> *cachedCollection.
27 collections *lru.TwoQueueCache
// token + "\000" + targetID -> *cachedPermission.
28 permissions *lru.TwoQueueCache
// token -> *cachedSession.
29 sessions *lru.TwoQueueCache
// cacheMetrics groups the Prometheus collectors maintained by the
// cache. They are created and registered by (*cacheMetrics).setup.
33 type cacheMetrics struct {
// targetID-to-manifest lookups handled; incremented on each Get call.
34 requests prometheus.Counter
// Set by updateGauges from (*cache).collectionBytes().
35 collectionBytes prometheus.Gauge
// Set by updateGauges to the collections cache length.
36 collectionEntries prometheus.Gauge
// Set by updateGauges to the sessions cache length.
37 sessionEntries prometheus.Gauge
// pdh-to-manifest cache hits; incremented in lookupCollection.
38 collectionHits prometheus.Counter
// uuid-to-pdh cache hits.
39 pdhHits prometheus.Counter
// targetID-to-permission cache hits.
40 permissionHits prometheus.Counter
// Token session cache hits and misses; incremented in GetSession.
41 sessionHits prometheus.Counter
42 sessionMisses prometheus.Counter
// Outgoing API calls made by the cache.
43 apiCalls prometheus.Counter
// setup creates every collector in m and registers each one with reg
// via MustRegister. Collection-cache collectors use the
// "keepweb_collectioncache" subsystem; session collectors use
// "keepweb_sessions".
//
// NOTE(review): some option fields and the closing lines of each
// literal are not visible in this view, so the complete metric names
// cannot be confirmed from here.
46 func (m *cacheMetrics) setup(reg *prometheus.Registry) {
// Counters for the collection cache.
47 m.requests = prometheus.NewCounter(prometheus.CounterOpts{
49 Subsystem: "keepweb_collectioncache",
51 Help: "Number of targetID-to-manifest lookups handled.",
53 reg.MustRegister(m.requests)
54 m.collectionHits = prometheus.NewCounter(prometheus.CounterOpts{
56 Subsystem: "keepweb_collectioncache",
58 Help: "Number of pdh-to-manifest cache hits.",
60 reg.MustRegister(m.collectionHits)
61 m.pdhHits = prometheus.NewCounter(prometheus.CounterOpts{
63 Subsystem: "keepweb_collectioncache",
65 Help: "Number of uuid-to-pdh cache hits.",
67 reg.MustRegister(m.pdhHits)
68 m.permissionHits = prometheus.NewCounter(prometheus.CounterOpts{
70 Subsystem: "keepweb_collectioncache",
71 Name: "permission_hits",
72 Help: "Number of targetID-to-permission cache hits.",
74 reg.MustRegister(m.permissionHits)
75 m.apiCalls = prometheus.NewCounter(prometheus.CounterOpts{
77 Subsystem: "keepweb_collectioncache",
79 Help: "Number of outgoing API calls made by cache.",
81 reg.MustRegister(m.apiCalls)
// Gauges; their values are written by (*cache).updateGauges.
82 m.collectionBytes = prometheus.NewGauge(prometheus.GaugeOpts{
84 Subsystem: "keepweb_collectioncache",
85 Name: "cached_manifest_bytes",
86 Help: "Total size of all manifests in cache.",
88 reg.MustRegister(m.collectionBytes)
89 m.collectionEntries = prometheus.NewGauge(prometheus.GaugeOpts{
91 Subsystem: "keepweb_collectioncache",
92 Name: "cached_manifests",
93 Help: "Number of manifests in cache.",
95 reg.MustRegister(m.collectionEntries)
96 m.sessionEntries = prometheus.NewGauge(prometheus.GaugeOpts{
98 Subsystem: "keepweb_sessions",
100 Help: "Number of active token sessions.",
102 reg.MustRegister(m.sessionEntries)
// Counters for the session cache.
103 m.sessionHits = prometheus.NewCounter(prometheus.CounterOpts{
104 Namespace: "arvados",
105 Subsystem: "keepweb_sessions",
107 Help: "Number of token session cache hits.",
109 reg.MustRegister(m.sessionHits)
110 m.sessionMisses = prometheus.NewCounter(prometheus.CounterOpts{
111 Namespace: "arvados",
112 Subsystem: "keepweb_sessions",
114 Help: "Number of token session cache misses.",
116 reg.MustRegister(m.sessionMisses)
// cachedPDH is the value type stored in cache.pdhs. Code in this file
// builds it with expire and pdh fields; the field declarations are not
// visible in this view.
119 type cachedPDH struct {
// cachedCollection is the value type stored in cache.collections.
124 type cachedCollection struct {
// The cached collection record. The length of its ManifestText is what
// pruneCollections and collectionBytes count as the entry's size.
126 collection *arvados.Collection
// cachedPermission is the value type stored in cache.permissions. The
// only field this file's code reads or sets is expire; the field
// declarations are not visible in this view.
129 type cachedPermission struct {
// cachedSession is the value type stored in cache.sessions. Code in
// this file uses its expire field and an fs field whose Load() result
// is asserted to arvados.CustomFileSystem; the field declarations are
// not visible in this view.
133 type cachedSession struct {
// setup initializes the cache's internal state. The methods in this
// file invoke it through c.setupOnce.Do(c.setup).
138 func (c *cache) setup() {
// One 2Q LRU cache per lookup kind, each sized from c.config.
// NOTE(review): the handling of err after each call is not visible in
// this view.
140 c.pdhs, err = lru.New2Q(c.config.MaxUUIDEntries)
144 c.collections, err = lru.New2Q(c.config.MaxCollectionEntries)
148 c.permissions, err = lru.New2Q(c.config.MaxPermissionEntries)
152 c.sessions, err = lru.New2Q(c.config.MaxSessions)
// NOTE(review): presumably a fallback used when no registry was
// supplied -- the enclosing condition is not visible; confirm.
159 reg = prometheus.NewRegistry()
// Iterates once per metricsUpdateInterval.
// NOTE(review): the loop body, and whether the loop runs in its own
// goroutine, are not visible in this view.
163 for range time.Tick(metricsUpdateInterval) {
// updateGauges refreshes the three gauge metrics from the current
// cache contents.
169 func (c *cache) updateGauges() {
// collectionBytes() sums manifest sizes in the collection cache and
// the MemorySize of cached session filesystems, so this gauge covers
// both.
170 c.metrics.collectionBytes.Set(float64(c.collectionBytes()))
171 c.metrics.collectionEntries.Set(float64(c.collections.Len()))
172 c.metrics.sessionEntries.Set(float64(c.sessions.Len()))
// selectPDH is the parameter map passed to arv.Get in (*cache).Get
// when only the collection's current portable data hash is needed; it
// sets "select" to the single field "portable_data_hash".
175 var selectPDH = map[string]interface{}{
176 "select": []string{"portable_data_hash"},
179 // Update saves a modified version (fs) to an existing collection
180 // (coll) and, if successful, updates the relevant cache entries so
181 // subsequent calls to Get() reflect the modifications.
//
// fs is marshalled to a manifest and compared with coll.ManifestText
// before anything is sent. When a request is made, the uuid-to-pdh
// entry for coll.UUID is dropped as Update returns (deferred Remove),
// and the saved record is cached under the caller's token and the
// record's own portable data hash.
182 func (c *cache) Update(client *arvados.Client, coll arvados.Collection, fs arvados.CollectionFileSystem) error {
183 c.setupOnce.Do(c.setup)
185 m, err := fs.MarshalManifest(".")
186 if err != nil || m == coll.ManifestText {
189 coll.ManifestText = m
190 var updated arvados.Collection
191 defer c.pdhs.Remove(coll.UUID)
// Only the new manifest text is sent; the API response is decoded into
// updated.
192 err = client.RequestAndDecode(&updated, "PATCH", "arvados/v1/collections/"+coll.UUID, nil, map[string]interface{}{
193 "collection": map[string]string{
194 "manifest_text": coll.ManifestText,
// Key the entry by the PDH of the saved record. coll.PortableDataHash
// is the hash from before the manifest changed, so using it would file
// the new content under the old hash.
198 c.collections.Add(client.AuthToken+"\000"+updated.PortableDataHash, &cachedCollection{
199 expire: time.Now().Add(time.Duration(c.config.TTL)),
200 collection: &updated,
206 // ResetSession unloads any potentially stale state. Should be called
207 // after write operations, so subsequent reads don't return stale
209 func (c *cache) ResetSession(token string) {
// Make sure the caches exist before touching them.
210 c.setupOnce.Do(c.setup)
// Drop the session cached for this token; a later GetSession call with
// the same token will not find it in c.sessions.
211 c.sessions.Remove(token)
214 // Get a long-lived CustomFileSystem suitable for doing a read operation
215 // with the given token.
//
// Sessions are cached per token in c.sessions. The sessionMisses
// counter is incremented both when a new session entry is created and
// when the existing entry has expired; otherwise sessionHits is
// incremented.
216 func (c *cache) GetSession(token string) (arvados.CustomFileSystem, error) {
217 c.setupOnce.Do(c.setup)
219 ent, _ := c.sessions.Get(token)
// Two-value assertion: sess is nil when ent is nil or of another type.
220 sess, _ := ent.(*cachedSession)
// NOTE(review): the condition guarding this branch is not visible;
// presumably it tests for a missing session. A new entry expiring
// c.config.TTL after now is stored under the token.
222 c.metrics.sessionMisses.Inc()
223 sess = &cachedSession{
224 expire: now.Add(c.config.TTL.Duration()),
226 c.sessions.Add(token, sess)
// The entry exists but its expiry time has passed: also a miss.
227 } else if sess.expire.Before(now) {
228 c.metrics.sessionMisses.Inc()
231 c.metrics.sessionHits.Inc()
// fs is nil when the session holds no filesystem of this type; the
// assertion's ok result is discarded.
234 fs, _ := sess.fs.Load().(arvados.CustomFileSystem)
// Build a site filesystem from a new API client and Keep client.
// NOTE(review): the lines between these calls (error handling and any
// further setup) are not visible in this view.
238 ac, err := arvados.NewClientFromConfig(c.cluster)
243 arv, err := arvadosclient.New(ac)
247 kc := keepclient.New(arv)
248 fs = ac.SiteFileSystem(kc)
// Apply the cluster's forward-slash name substitution setting.
249 fs.ForwardSlashNameSubstitution(c.cluster.Collections.ForwardSlashNameSubstitution)
// pruneSessions removes expired sessions and adds up the MemorySize of
// the filesystems held by the remaining ones. If that total exceeds
// half of the cluster's WebDAVCache.MaxCollectionBytes, it then walks
// the session keys again removing entries.
// NOTE(review): lines are elided in this view, so any early exit from
// the second loop cannot be confirmed from here.
254 func (c *cache) pruneSessions() {
257 for _, token := range c.sessions.Keys() {
// Peek reads the entry without counting as a use of it.
258 ent, ok := c.sessions.Peek(token)
262 s := ent.(*cachedSession)
263 if s.expire.Before(now) {
264 c.sessions.Remove(token)
267 fs, _ := s.fs.Load().(arvados.CustomFileSystem)
271 size += fs.MemorySize()
// Over budget: drop sessions.
273 if size > c.cluster.Collections.WebDAVCache.MaxCollectionBytes/2 {
274 for _, token := range c.sessions.Keys() {
275 c.sessions.Remove(token)
// Get returns the collection identified by targetID, using cached data
// where possible. targetID is either a portable data hash (when
// arvadosclient.PDHMatch reports a match) or an identifier that is
// resolved to one through the pdhs cache.
//
// Three caches are involved: permissions (token + targetID), pdhs
// (targetID -> PDH) and collections (token + PDH -> collection).
// Expired entries found along the way are removed. Every call that
// reaches the API increments the apiCalls counter.
// NOTE(review): several lines (including the use of forceReload and
// the assignments to permOK and pdh) are not visible in this view.
280 func (c *cache) Get(arv *arvadosclient.ArvadosClient, targetID string, forceReload bool) (*arvados.Collection, error) {
281 c.setupOnce.Do(c.setup)
282 c.metrics.requests.Inc()
// Permission entries are per token and per targetID.
285 permKey := arv.ApiToken + "\000" + targetID
287 } else if ent, cached := c.permissions.Get(permKey); cached {
288 ent := ent.(*cachedPermission)
289 if ent.expire.Before(time.Now()) {
290 c.permissions.Remove(permKey)
293 c.metrics.permissionHits.Inc()
// Resolve targetID to a PDH: either it already is one, or the pdhs
// cache may hold an entry for it.
298 if arvadosclient.PDHMatch(targetID) {
300 } else if ent, cached := c.pdhs.Get(targetID); cached {
301 ent := ent.(*cachedPDH)
302 if ent.expire.Before(time.Now()) {
303 c.pdhs.Remove(targetID)
306 c.metrics.pdhHits.Inc()
310 var collection *arvados.Collection
312 collection = c.lookupCollection(arv.ApiToken + "\000" + pdh)
// Cached collection plus cached permission: no API call needed.
315 if collection != nil && permOK {
316 return collection, nil
317 } else if collection != nil {
318 // Ask API for current PDH for this targetID. Most
319 // likely, the cached PDH is still correct; if so,
320 // _and_ the current token has permission, we can
321 // use our cached manifest.
322 c.metrics.apiCalls.Inc()
323 var current arvados.Collection
// The response is decoded into current, so it must be passed by
// address.
324 err := arv.Get("collections", targetID, selectPDH, &current)
// The cached PDH is still current: refresh the permission and PDH
// entries and serve the cached collection.
328 if current.PortableDataHash == pdh {
329 c.permissions.Add(permKey, &cachedPermission{
330 expire: time.Now().Add(time.Duration(c.config.TTL)),
333 c.pdhs.Add(targetID, &cachedPDH{
334 expire: time.Now().Add(time.Duration(c.config.UUIDTTL)),
338 return collection, err
340 // PDH changed, but now we know we have
341 // permission -- and maybe we already have the
342 // new PDH in the cache.
343 if coll := c.lookupCollection(arv.ApiToken + "\000" + current.PortableDataHash); coll != nil {
348 // Collection manifest is not cached.
349 c.metrics.apiCalls.Inc()
350 err := arv.Get("collections", targetID, nil, &collection)
// Record permission, PDH and collection for later calls. Permission
// and collection entries use TTL; the PDH entry uses UUIDTTL.
354 exp := time.Now().Add(time.Duration(c.config.TTL))
355 c.permissions.Add(permKey, &cachedPermission{
358 c.pdhs.Add(targetID, &cachedPDH{
359 expire: time.Now().Add(time.Duration(c.config.UUIDTTL)),
360 pdh: collection.PortableDataHash,
362 c.collections.Add(arv.ApiToken+"\000"+collection.PortableDataHash, &cachedCollection{
364 collection: collection,
// A manifest larger than the average per-entry byte budget triggers a
// background prune of the collection cache.
366 if int64(len(collection.ManifestText)) > c.config.MaxCollectionBytes/int64(c.config.MaxCollectionEntries) {
367 go c.pruneCollections()
369 return collection, nil
372 // pruneCollections checks the total bytes occupied by manifest_text
373 // in the collection cache and removes old entries as needed to bring
374 // the total size down to CollectionBytes. It also deletes all expired
377 // pruneCollections does not aim to be perfectly correct when there is
378 // concurrent cache activity.
//
// NOTE(review): the size check visible in the final loop compares
// against c.config.MaxCollectionBytes/2, not the full limit.
379 func (c *cache) pruneCollections() {
// Snapshot the keys once; entsize and expired are indexed in parallel
// with keys.
382 keys := c.collections.Keys()
383 entsize := make([]int, len(keys))
384 expired := make([]bool, len(keys))
// First pass: record each entry's manifest length and whether it has
// expired.
385 for i, k := range keys {
386 v, ok := c.collections.Peek(k)
390 ent := v.(*cachedCollection)
391 n := len(ent.collection.ManifestText)
394 expired[i] = ent.expire.Before(now)
// Second pass: remove entries and subtract their size from the total.
// NOTE(review): the condition selecting entries is not visible in this
// view; presumably it is expired[i].
396 for i, k := range keys {
398 c.collections.Remove(k)
399 size -= int64(entsize[i])
// Third pass: keep removing entries until the size check is satisfied.
402 for i, k := range keys {
403 if size <= c.config.MaxCollectionBytes/2 {
407 // already removed this entry in the previous loop
410 c.collections.Remove(k)
411 size -= int64(entsize[i])
415 // collectionBytes returns the approximate combined memory size of the
416 // collection cache and session filesystem cache.
//
// The collection part is the sum of ManifestText lengths; the session
// part is the sum of MemorySize() over sessions that currently hold an
// arvados.CustomFileSystem.
417 func (c *cache) collectionBytes() uint64 {
419 for _, k := range c.collections.Keys() {
// Peek reads the entry without counting as a use of it.
420 v, ok := c.collections.Peek(k)
424 size += uint64(len(v.(*cachedCollection).collection.ManifestText))
426 for _, token := range c.sessions.Keys() {
427 ent, ok := c.sessions.Peek(token)
// Sessions whose fs does not hold a CustomFileSystem are skipped.
431 if fs, ok := ent.(*cachedSession).fs.Load().(arvados.CustomFileSystem); ok {
432 size += uint64(fs.MemorySize())
// lookupCollection returns the collection cached under key and counts
// a collection cache hit. Callers in this file build key as
// token + "\000" + portable data hash and treat a nil result as "not
// cached". An entry whose expiry time has passed is removed from the
// cache.
// NOTE(review): the return statements for the not-cached and expired
// paths are not visible in this view.
438 func (c *cache) lookupCollection(key string) *arvados.Collection {
439 e, cached := c.collections.Get(key)
443 ent := e.(*cachedCollection)
444 if ent.expire.Before(time.Now()) {
445 c.collections.Remove(key)
448 c.metrics.collectionHits.Inc()
449 return ent.collection