Merge branch '17022-user-activity-report' refs #17022
[arvados.git] / services / keep-web / cache.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package main
6
7 import (
8         "sync"
9         "time"
10
11         "git.arvados.org/arvados.git/sdk/go/arvados"
12         "git.arvados.org/arvados.git/sdk/go/arvadosclient"
13         "github.com/hashicorp/golang-lru"
14         "github.com/prometheus/client_golang/prometheus"
15 )
16
17 const metricsUpdateInterval = time.Second / 10
18
19 type cache struct {
20         config      *arvados.WebDAVCacheConfig
21         registry    *prometheus.Registry
22         metrics     cacheMetrics
23         pdhs        *lru.TwoQueueCache
24         collections *lru.TwoQueueCache
25         permissions *lru.TwoQueueCache
26         setupOnce   sync.Once
27 }
28
29 type cacheMetrics struct {
30         requests          prometheus.Counter
31         collectionBytes   prometheus.Gauge
32         collectionEntries prometheus.Gauge
33         collectionHits    prometheus.Counter
34         pdhHits           prometheus.Counter
35         permissionHits    prometheus.Counter
36         apiCalls          prometheus.Counter
37 }
38
39 func (m *cacheMetrics) setup(reg *prometheus.Registry) {
40         m.requests = prometheus.NewCounter(prometheus.CounterOpts{
41                 Namespace: "arvados",
42                 Subsystem: "keepweb_collectioncache",
43                 Name:      "requests",
44                 Help:      "Number of targetID-to-manifest lookups handled.",
45         })
46         reg.MustRegister(m.requests)
47         m.collectionHits = prometheus.NewCounter(prometheus.CounterOpts{
48                 Namespace: "arvados",
49                 Subsystem: "keepweb_collectioncache",
50                 Name:      "hits",
51                 Help:      "Number of pdh-to-manifest cache hits.",
52         })
53         reg.MustRegister(m.collectionHits)
54         m.pdhHits = prometheus.NewCounter(prometheus.CounterOpts{
55                 Namespace: "arvados",
56                 Subsystem: "keepweb_collectioncache",
57                 Name:      "pdh_hits",
58                 Help:      "Number of uuid-to-pdh cache hits.",
59         })
60         reg.MustRegister(m.pdhHits)
61         m.permissionHits = prometheus.NewCounter(prometheus.CounterOpts{
62                 Namespace: "arvados",
63                 Subsystem: "keepweb_collectioncache",
64                 Name:      "permission_hits",
65                 Help:      "Number of targetID-to-permission cache hits.",
66         })
67         reg.MustRegister(m.permissionHits)
68         m.apiCalls = prometheus.NewCounter(prometheus.CounterOpts{
69                 Namespace: "arvados",
70                 Subsystem: "keepweb_collectioncache",
71                 Name:      "api_calls",
72                 Help:      "Number of outgoing API calls made by cache.",
73         })
74         reg.MustRegister(m.apiCalls)
75         m.collectionBytes = prometheus.NewGauge(prometheus.GaugeOpts{
76                 Namespace: "arvados",
77                 Subsystem: "keepweb_collectioncache",
78                 Name:      "cached_manifest_bytes",
79                 Help:      "Total size of all manifests in cache.",
80         })
81         reg.MustRegister(m.collectionBytes)
82         m.collectionEntries = prometheus.NewGauge(prometheus.GaugeOpts{
83                 Namespace: "arvados",
84                 Subsystem: "keepweb_collectioncache",
85                 Name:      "cached_manifests",
86                 Help:      "Number of manifests in cache.",
87         })
88         reg.MustRegister(m.collectionEntries)
89 }
90
91 type cachedPDH struct {
92         expire time.Time
93         pdh    string
94 }
95
96 type cachedCollection struct {
97         expire     time.Time
98         collection *arvados.Collection
99 }
100
101 type cachedPermission struct {
102         expire time.Time
103 }
104
105 func (c *cache) setup() {
106         var err error
107         c.pdhs, err = lru.New2Q(c.config.MaxUUIDEntries)
108         if err != nil {
109                 panic(err)
110         }
111         c.collections, err = lru.New2Q(c.config.MaxCollectionEntries)
112         if err != nil {
113                 panic(err)
114         }
115         c.permissions, err = lru.New2Q(c.config.MaxPermissionEntries)
116         if err != nil {
117                 panic(err)
118         }
119
120         reg := c.registry
121         if reg == nil {
122                 reg = prometheus.NewRegistry()
123         }
124         c.metrics.setup(reg)
125         go func() {
126                 for range time.Tick(metricsUpdateInterval) {
127                         c.updateGauges()
128                 }
129         }()
130 }
131
132 func (c *cache) updateGauges() {
133         c.metrics.collectionBytes.Set(float64(c.collectionBytes()))
134         c.metrics.collectionEntries.Set(float64(c.collections.Len()))
135 }
136
137 var selectPDH = map[string]interface{}{
138         "select": []string{"portable_data_hash"},
139 }
140
141 // Update saves a modified version (fs) to an existing collection
142 // (coll) and, if successful, updates the relevant cache entries so
143 // subsequent calls to Get() reflect the modifications.
144 func (c *cache) Update(client *arvados.Client, coll arvados.Collection, fs arvados.CollectionFileSystem) error {
145         c.setupOnce.Do(c.setup)
146
147         m, err := fs.MarshalManifest(".")
148         if err != nil || m == coll.ManifestText {
149                 return err
150         }
151         coll.ManifestText = m
152         var updated arvados.Collection
153         defer c.pdhs.Remove(coll.UUID)
154         err = client.RequestAndDecode(&updated, "PATCH", "arvados/v1/collections/"+coll.UUID, nil, map[string]interface{}{
155                 "collection": map[string]string{
156                         "manifest_text": coll.ManifestText,
157                 },
158         })
159         if err == nil {
160                 c.collections.Add(client.AuthToken+"\000"+coll.PortableDataHash, &cachedCollection{
161                         expire:     time.Now().Add(time.Duration(c.config.TTL)),
162                         collection: &updated,
163                 })
164         }
165         return err
166 }
167
168 func (c *cache) Get(arv *arvadosclient.ArvadosClient, targetID string, forceReload bool) (*arvados.Collection, error) {
169         c.setupOnce.Do(c.setup)
170         c.metrics.requests.Inc()
171
172         permOK := false
173         permKey := arv.ApiToken + "\000" + targetID
174         if forceReload {
175         } else if ent, cached := c.permissions.Get(permKey); cached {
176                 ent := ent.(*cachedPermission)
177                 if ent.expire.Before(time.Now()) {
178                         c.permissions.Remove(permKey)
179                 } else {
180                         permOK = true
181                         c.metrics.permissionHits.Inc()
182                 }
183         }
184
185         var pdh string
186         if arvadosclient.PDHMatch(targetID) {
187                 pdh = targetID
188         } else if ent, cached := c.pdhs.Get(targetID); cached {
189                 ent := ent.(*cachedPDH)
190                 if ent.expire.Before(time.Now()) {
191                         c.pdhs.Remove(targetID)
192                 } else {
193                         pdh = ent.pdh
194                         c.metrics.pdhHits.Inc()
195                 }
196         }
197
198         var collection *arvados.Collection
199         if pdh != "" {
200                 collection = c.lookupCollection(arv.ApiToken + "\000" + pdh)
201         }
202
203         if collection != nil && permOK {
204                 return collection, nil
205         } else if collection != nil {
206                 // Ask API for current PDH for this targetID. Most
207                 // likely, the cached PDH is still correct; if so,
208                 // _and_ the current token has permission, we can
209                 // use our cached manifest.
210                 c.metrics.apiCalls.Inc()
211                 var current arvados.Collection
212                 err := arv.Get("collections", targetID, selectPDH, &current)
213                 if err != nil {
214                         return nil, err
215                 }
216                 if current.PortableDataHash == pdh {
217                         c.permissions.Add(permKey, &cachedPermission{
218                                 expire: time.Now().Add(time.Duration(c.config.TTL)),
219                         })
220                         if pdh != targetID {
221                                 c.pdhs.Add(targetID, &cachedPDH{
222                                         expire: time.Now().Add(time.Duration(c.config.UUIDTTL)),
223                                         pdh:    pdh,
224                                 })
225                         }
226                         return collection, err
227                 }
228                 // PDH changed, but now we know we have
229                 // permission -- and maybe we already have the
230                 // new PDH in the cache.
231                 if coll := c.lookupCollection(arv.ApiToken + "\000" + current.PortableDataHash); coll != nil {
232                         return coll, nil
233                 }
234         }
235
236         // Collection manifest is not cached.
237         c.metrics.apiCalls.Inc()
238         err := arv.Get("collections", targetID, nil, &collection)
239         if err != nil {
240                 return nil, err
241         }
242         exp := time.Now().Add(time.Duration(c.config.TTL))
243         c.permissions.Add(permKey, &cachedPermission{
244                 expire: exp,
245         })
246         c.pdhs.Add(targetID, &cachedPDH{
247                 expire: time.Now().Add(time.Duration(c.config.UUIDTTL)),
248                 pdh:    collection.PortableDataHash,
249         })
250         c.collections.Add(arv.ApiToken+"\000"+collection.PortableDataHash, &cachedCollection{
251                 expire:     exp,
252                 collection: collection,
253         })
254         if int64(len(collection.ManifestText)) > c.config.MaxCollectionBytes/int64(c.config.MaxCollectionEntries) {
255                 go c.pruneCollections()
256         }
257         return collection, nil
258 }
259
260 // pruneCollections checks the total bytes occupied by manifest_text
261 // in the collection cache and removes old entries as needed to bring
262 // the total size down to CollectionBytes. It also deletes all expired
263 // entries.
264 //
265 // pruneCollections does not aim to be perfectly correct when there is
266 // concurrent cache activity.
267 func (c *cache) pruneCollections() {
268         var size int64
269         now := time.Now()
270         keys := c.collections.Keys()
271         entsize := make([]int, len(keys))
272         expired := make([]bool, len(keys))
273         for i, k := range keys {
274                 v, ok := c.collections.Peek(k)
275                 if !ok {
276                         continue
277                 }
278                 ent := v.(*cachedCollection)
279                 n := len(ent.collection.ManifestText)
280                 size += int64(n)
281                 entsize[i] = n
282                 expired[i] = ent.expire.Before(now)
283         }
284         for i, k := range keys {
285                 if expired[i] {
286                         c.collections.Remove(k)
287                         size -= int64(entsize[i])
288                 }
289         }
290         for i, k := range keys {
291                 if size <= c.config.MaxCollectionBytes {
292                         break
293                 }
294                 if expired[i] {
295                         // already removed this entry in the previous loop
296                         continue
297                 }
298                 c.collections.Remove(k)
299                 size -= int64(entsize[i])
300         }
301 }
302
303 // collectionBytes returns the approximate memory size of the
304 // collection cache.
305 func (c *cache) collectionBytes() uint64 {
306         var size uint64
307         for _, k := range c.collections.Keys() {
308                 v, ok := c.collections.Peek(k)
309                 if !ok {
310                         continue
311                 }
312                 size += uint64(len(v.(*cachedCollection).collection.ManifestText))
313         }
314         return size
315 }
316
317 func (c *cache) lookupCollection(key string) *arvados.Collection {
318         e, cached := c.collections.Get(key)
319         if !cached {
320                 return nil
321         }
322         ent := e.(*cachedCollection)
323         if ent.expire.Before(time.Now()) {
324                 c.collections.Remove(key)
325                 return nil
326         }
327         c.metrics.collectionHits.Inc()
328         return ent.collection
329 }