1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
12 "git.curoverse.com/arvados.git/sdk/go/arvados"
13 "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
14 "github.com/hashicorp/golang-lru"
15 "github.com/prometheus/client_golang/prometheus"
// metricsUpdateInterval is the tick period (time.Second / 10, i.e.
// 100ms) of the time.Tick loop started in (*cache).setup.
18 const metricsUpdateInterval = time.Second / 10
// UUIDTTL is the lifetime given to uuid-to-pdh (cachedPDH) entries
// when Get adds them to the pdhs cache.
22 UUIDTTL arvados.Duration
// MaxCollectionEntries is the size passed to lru.New2Q for the
// collections cache. Get also uses it as the divisor when deciding
// whether a newly cached manifest is large enough to trigger pruning.
23 MaxCollectionEntries int
// MaxCollectionBytes is the total manifest size that pruneCollections
// compares the collections cache against.
24 MaxCollectionBytes int64
// MaxPermissionEntries is the size passed to lru.New2Q for the
// permissions cache.
25 MaxPermissionEntries int
// registry is a Prometheus registry.
// NOTE(review): presumably the registry the cache metrics are
// registered with; the assignment is only partly visible -- confirm.
28 registry *prometheus.Registry
// pdhs is keyed by target ID (collection UUID) and holds *cachedPDH.
31 pdhs *lru.TwoQueueCache
// collections is keyed by token + "\000" + portable data hash and
// holds *cachedCollection.
32 collections *lru.TwoQueueCache
// permissions is keyed by token + "\000" + target ID and holds
// *cachedPermission.
33 permissions *lru.TwoQueueCache
// cacheStats is the legacy, JSON-serializable snapshot of cache
// counters returned by (*cache).Stats. The uint64 counters are updated
// with atomic.AddUint64 alongside the matching Prometheus metrics.
//
// Note that PDHHits is serialized under the key "Cache.UUIDHits", not
// "Cache.PDHHits".
37 // cacheStats is EOL - add new metrics to cacheMetrics instead
38 type cacheStats struct {
39 Requests uint64 `json:"Cache.Requests"`
40 CollectionBytes uint64 `json:"Cache.CollectionBytes"`
41 CollectionEntries int `json:"Cache.CollectionEntries"`
42 CollectionHits uint64 `json:"Cache.CollectionHits"`
43 PDHHits uint64 `json:"Cache.UUIDHits"`
44 PermissionHits uint64 `json:"Cache.PermissionHits"`
45 APICalls uint64 `json:"Cache.APICalls"`
// cacheMetrics holds the Prometheus metrics for the collection cache.
// The counters mirror the counters in cacheStats; the two gauges
// (collectionBytes, collectionEntries) are set by
// (*cache).updateGauges. All fields are created and registered by
// (*cacheMetrics).setup.
48 type cacheMetrics struct {
49 requests prometheus.Counter
50 collectionBytes prometheus.Gauge
51 collectionEntries prometheus.Gauge
52 collectionHits prometheus.Counter
53 pdhHits prometheus.Counter
54 permissionHits prometheus.Counter
55 apiCalls prometheus.Counter
// setup creates every metric in m and registers each one with reg.
// All metrics use the subsystem "keepweb_collectioncache".
//
// Registration uses MustRegister, which (per the Prometheus client
// documentation) panics if a collector cannot be registered, e.g.
// when setup is called twice with the same registry.
//
// NOTE(review): some of the Namespace/Name option lines are not
// visible in this view; only the visible options are described here.
58 func (m *cacheMetrics) setup(reg *prometheus.Registry) {
// Counter: targetID-to-manifest lookups handled.
59 m.requests = prometheus.NewCounter(prometheus.CounterOpts{
61 Subsystem: "keepweb_collectioncache",
63 Help: "Number of targetID-to-manifest lookups handled.",
65 reg.MustRegister(m.requests)
// Counter: pdh-to-manifest cache hits.
66 m.collectionHits = prometheus.NewCounter(prometheus.CounterOpts{
68 Subsystem: "keepweb_collectioncache",
70 Help: "Number of pdh-to-manifest cache hits.",
72 reg.MustRegister(m.collectionHits)
// Counter: uuid-to-pdh cache hits.
73 m.pdhHits = prometheus.NewCounter(prometheus.CounterOpts{
75 Subsystem: "keepweb_collectioncache",
77 Help: "Number of uuid-to-pdh cache hits.",
79 reg.MustRegister(m.pdhHits)
// Counter: targetID-to-permission cache hits.
80 m.permissionHits = prometheus.NewCounter(prometheus.CounterOpts{
82 Subsystem: "keepweb_collectioncache",
83 Name: "permission_hits",
84 Help: "Number of targetID-to-permission cache hits.",
86 reg.MustRegister(m.permissionHits)
// Counter: outgoing API calls made by the cache.
87 m.apiCalls = prometheus.NewCounter(prometheus.CounterOpts{
89 Subsystem: "keepweb_collectioncache",
91 Help: "Number of outgoing API calls made by cache.",
93 reg.MustRegister(m.apiCalls)
// Gauge: total size of all cached manifests.
94 m.collectionBytes = prometheus.NewGauge(prometheus.GaugeOpts{
96 Subsystem: "keepweb_collectioncache",
97 Name: "cached_manifest_bytes",
98 Help: "Total size of all manifests in cache.",
100 reg.MustRegister(m.collectionBytes)
// Gauge: number of cached manifests.
101 m.collectionEntries = prometheus.NewGauge(prometheus.GaugeOpts{
102 Namespace: "arvados",
103 Subsystem: "keepweb_collectioncache",
104 Name: "cached_manifests",
105 Help: "Number of manifests in cache.",
107 reg.MustRegister(m.collectionEntries)
// cachedPDH is the value type stored in the pdhs cache. Get builds
// entries with an expire time (now + UUIDTTL) and a pdh string.
// NOTE(review): the field declarations are not visible in this view;
// the field names are taken from their uses in Get.
110 type cachedPDH struct {
// cachedCollection is the value type stored in the collections cache:
// a collection record plus an expire time (the expire field is set
// where entries are created, in Update and Get).
115 type cachedCollection struct {
// collection is the cached record; its ManifestText length is what the
// cache size accounting (collectionBytes, pruneCollections) measures.
117 collection *arvados.Collection
// cachedPermission is the value type stored in the permissions cache.
// Entries carry an expire time, which Get checks before counting a
// permission hit.
// NOTE(review): the field declarations are not visible in this view.
120 type cachedPermission struct {
// setup initializes the cache: it creates the three 2Q LRU caches
// (pdhs, collections, permissions) with the configured maximum entry
// counts, creates a Prometheus registry, and runs a loop driven by
// time.Tick(metricsUpdateInterval).
//
// It is invoked via c.setupOnce.Do(c.setup) at the top of Stats,
// Update and Get.
//
// NOTE(review): the handling of the errors returned by lru.New2Q, the
// condition under which a new registry is created, and the body of
// the tick loop (presumably a goroutine calling updateGauges) are not
// visible in this view -- confirm against the full file.
124 func (c *cache) setup() {
126 c.pdhs, err = lru.New2Q(c.MaxUUIDEntries)
130 c.collections, err = lru.New2Q(c.MaxCollectionEntries)
134 c.permissions, err = lru.New2Q(c.MaxPermissionEntries)
141 reg = prometheus.NewRegistry()
// No exit from this loop is visible here, and a ticker created by
// time.Tick cannot be stopped.
145 for range time.Tick(metricsUpdateInterval) {
// updateGauges refreshes the two Prometheus gauges from the current
// state of the collections cache: total manifest bytes (via
// collectionBytes, which walks every cached entry) and entry count.
151 func (c *cache) updateGauges() {
152 c.metrics.collectionBytes.Set(float64(c.collectionBytes()))
153 c.metrics.collectionEntries.Set(float64(c.collections.Len()))
// selectPDH is the request parameter set Get passes to arv.Get when it
// only needs the collection's current portable_data_hash.
156 var selectPDH = map[string]interface{}{
157 "select": []string{"portable_data_hash"},
// Stats returns a snapshot of the legacy cache statistics. Counters
// are read with atomic.LoadUint64; CollectionBytes and
// CollectionEntries are computed from the collections cache at call
// time. The cache is initialized first if it has not been already.
160 func (c *cache) Stats() cacheStats {
161 c.setupOnce.Do(c.setup)
163 Requests: atomic.LoadUint64(&c.stats.Requests),
164 CollectionBytes: c.collectionBytes(),
165 CollectionEntries: c.collections.Len(),
166 CollectionHits: atomic.LoadUint64(&c.stats.CollectionHits),
167 PDHHits: atomic.LoadUint64(&c.stats.PDHHits),
168 PermissionHits: atomic.LoadUint64(&c.stats.PermissionHits),
169 APICalls: atomic.LoadUint64(&c.stats.APICalls),
173 // Update saves a modified version (fs) to an existing collection
174 // (coll) and, if successful, updates the relevant cache entries so
175 // subsequent calls to Get() reflect the modifications.
//
// The manifest is produced by fs.MarshalManifest(".") and sent to the
// API as a PATCH on arvados/v1/collections/{coll.UUID}. The cached
// uuid-to-pdh entry for coll.UUID is removed when Update returns
// (deferred), whether or not the request succeeded.
//
// NOTE(review): the branch bodies are not visible in this view;
// presumably Update returns early when marshalling fails or the
// manifest is unchanged, and only adds to the collections cache when
// the PATCH succeeds -- confirm against the full file.
176 func (c *cache) Update(client *arvados.Client, coll arvados.Collection, fs arvados.CollectionFileSystem) error {
177 c.setupOnce.Do(c.setup)
179 if m, err := fs.MarshalManifest("."); err != nil || m == coll.ManifestText {
182 coll.ManifestText = m
184 var updated arvados.Collection
185 defer c.pdhs.Remove(coll.UUID)
186 err := client.RequestAndDecode(&updated, "PATCH", "arvados/v1/collections/"+coll.UUID, client.UpdateBody(coll), nil)
// NOTE(review): the cache key below uses coll.PortableDataHash (the
// PDH of the collection as passed in), while the stored value is the
// updated record. If the PDH changes with the manifest, this files the
// new content under the old PDH key; updated.PortableDataHash may have
// been intended -- confirm.
188 c.collections.Add(client.AuthToken+"\000"+coll.PortableDataHash, &cachedCollection{
189 expire: time.Now().Add(time.Duration(c.TTL)),
190 collection: &updated,
// Get returns the collection identified by targetID as seen with the
// API token carried by arv (arv.ApiToken). It consults the
// permissions, pdhs and collections caches before calling the API.
//
// Every call is counted in both the legacy stats and the Prometheus
// request counter. A cached collection is returned without an API call
// only when permOK also holds. If the collection is cached but permOK
// does not hold, one API call fetches just the current
// portable_data_hash (selectPDH). If no usable manifest is cached, a
// full fetch is made and the permissions, pdhs and collections caches
// are all populated.
//
// NOTE(review): a number of lines (including the handling of
// forceReload, the assignments to permOK and pdh, and the error
// checks after arv.Get) are not visible in this view; the description
// above covers only what the visible lines establish.
196 func (c *cache) Get(arv *arvadosclient.ArvadosClient, targetID string, forceReload bool) (*arvados.Collection, error) {
197 c.setupOnce.Do(c.setup)
199 atomic.AddUint64(&c.stats.Requests, 1)
200 c.metrics.requests.Inc()
// Permission entries are keyed by token and target ID, separated by a
// NUL byte.
203 permKey := arv.ApiToken + "\000" + targetID
205 } else if ent, cached := c.permissions.Get(permKey); cached {
206 ent := ent.(*cachedPermission)
// An expired permission entry is evicted rather than used.
207 if ent.expire.Before(time.Now()) {
208 c.permissions.Remove(permKey)
211 atomic.AddUint64(&c.stats.PermissionHits, 1)
212 c.metrics.permissionHits.Inc()
// A targetID that already looks like a portable data hash needs no
// uuid-to-pdh lookup; otherwise the pdhs cache is consulted.
217 if arvadosclient.PDHMatch(targetID) {
219 } else if ent, cached := c.pdhs.Get(targetID); cached {
220 ent := ent.(*cachedPDH)
// An expired uuid-to-pdh entry is evicted rather than used.
221 if ent.expire.Before(time.Now()) {
222 c.pdhs.Remove(targetID)
225 atomic.AddUint64(&c.stats.PDHHits, 1)
226 c.metrics.pdhHits.Inc()
230 var collection *arvados.Collection
// Collection entries are keyed by token and PDH, separated by a NUL
// byte.
232 collection = c.lookupCollection(arv.ApiToken + "\000" + pdh)
235 if collection != nil && permOK {
236 return collection, nil
237 } else if collection != nil {
238 // Ask API for current PDH for this targetID. Most
239 // likely, the cached PDH is still correct; if so,
240 // _and_ the current token has permission, we can
241 // use our cached manifest.
242 atomic.AddUint64(&c.stats.APICalls, 1)
243 c.metrics.apiCalls.Inc()
244 var current arvados.Collection
// NOTE(review): "¤t" on the next line looks like a mis-encoded
// "&current" (the HTML entity "&curren" rendered as a currency sign);
// restore it before compiling.
245 err := arv.Get("collections", targetID, selectPDH, ¤t)
249 if current.PortableDataHash == pdh {
// The PDH is unchanged and the API call succeeded with this token, so
// refresh the permission and uuid-to-pdh entries.
250 c.permissions.Add(permKey, &cachedPermission{
251 expire: time.Now().Add(time.Duration(c.TTL)),
254 c.pdhs.Add(targetID, &cachedPDH{
255 expire: time.Now().Add(time.Duration(c.UUIDTTL)),
259 return collection, err
261 // PDH changed, but now we know we have
262 // permission -- and maybe we already have the
263 // new PDH in the cache.
264 if coll := c.lookupCollection(arv.ApiToken + "\000" + current.PortableDataHash); coll != nil {
270 // Collection manifest is not cached.
271 atomic.AddUint64(&c.stats.APICalls, 1)
272 c.metrics.apiCalls.Inc()
273 err := arv.Get("collections", targetID, nil, &collection)
// Populate all three caches from the freshly fetched record.
// Permission and collection entries expire after TTL; the uuid-to-pdh
// entry expires after UUIDTTL.
277 exp := time.Now().Add(time.Duration(c.TTL))
278 c.permissions.Add(permKey, &cachedPermission{
281 c.pdhs.Add(targetID, &cachedPDH{
282 expire: time.Now().Add(time.Duration(c.UUIDTTL)),
283 pdh: collection.PortableDataHash,
285 c.collections.Add(arv.ApiToken+"\000"+collection.PortableDataHash, &cachedCollection{
287 collection: collection,
// A manifest larger than the average per-entry budget
// (MaxCollectionBytes / MaxCollectionEntries) triggers a prune in a
// separate goroutine; Get does not wait for it.
289 if int64(len(collection.ManifestText)) > c.MaxCollectionBytes/int64(c.MaxCollectionEntries) {
290 go c.pruneCollections()
292 return collection, nil
295 // pruneCollections checks the total bytes occupied by manifest_text
296 // in the collection cache and removes old entries as needed to bring
297 // the total size down to CollectionBytes. It also deletes all expired
300 // pruneCollections does not aim to be perfectly correct when there is
301 // concurrent cache activity.
//
// The size limit referred to above as "CollectionBytes" is the
// MaxCollectionBytes field (see the comparison in the final loop).
//
// NOTE(review): several lines of this function are not visible in
// this view (the initialization and accumulation of size, the
// definition of now, and the conditions guarding the two removal
// loops); the comments below describe only the visible lines.
302 func (c *cache) pruneCollections() {
// Take a snapshot of the keys and record, per key, the manifest size
// and whether the entry has expired. Peek is used for the lookups.
305 keys := c.collections.Keys()
306 entsize := make([]int, len(keys))
307 expired := make([]bool, len(keys))
308 for i, k := range keys {
309 v, ok := c.collections.Peek(k)
313 ent := v.(*cachedCollection)
314 n := len(ent.collection.ManifestText)
317 expired[i] = ent.expire.Before(now)
// First removal pass; each removed entry's size is subtracted from the
// running total. Presumably limited to expired entries -- confirm.
319 for i, k := range keys {
321 c.collections.Remove(k)
322 size -= int64(entsize[i])
// Second removal pass, in snapshot key order, checking the running
// total against MaxCollectionBytes.
325 for i, k := range keys {
326 if size <= c.MaxCollectionBytes {
330 // already removed this entry in the previous loop
333 c.collections.Remove(k)
334 size -= int64(entsize[i])
338 // collectionBytes returns the approximate memory size of the
// collection cache, computed as the sum of len(ManifestText) over the
// entries currently in c.collections. Each call walks every key, and
// entries are read with Peek.
340 func (c *cache) collectionBytes() uint64 {
342 for _, k := range c.collections.Keys() {
343 v, ok := c.collections.Peek(k)
347 size += uint64(len(v.(*cachedCollection).collection.ManifestText))
// lookupCollection returns the collection cached under key (token +
// "\000" + pdh, as built by the callers in Get). An entry whose expire
// time has passed is removed from the cache instead of being
// returned. A successful lookup is counted as a collection hit in both
// the legacy stats and the Prometheus counter.
//
// NOTE(review): the return statements for the not-cached and expired
// cases are not visible in this view; Get treats a nil result as a
// miss.
352 func (c *cache) lookupCollection(key string) *arvados.Collection {
353 e, cached := c.collections.Get(key)
357 ent := e.(*cachedCollection)
358 if ent.expire.Before(time.Now()) {
359 c.collections.Remove(key)
362 atomic.AddUint64(&c.stats.CollectionHits, 1)
363 c.metrics.collectionHits.Inc()
364 return ent.collection