16745: Prune enough sessions to reach size limit, not all.
[arvados.git] / services / keep-web / cache.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package main
6
7 import (
8         "sync"
9         "sync/atomic"
10         "time"
11
12         "git.arvados.org/arvados.git/sdk/go/arvados"
13         "git.arvados.org/arvados.git/sdk/go/arvadosclient"
14         "git.arvados.org/arvados.git/sdk/go/keepclient"
15         lru "github.com/hashicorp/golang-lru"
16         "github.com/prometheus/client_golang/prometheus"
17 )
18
// metricsUpdateInterval is how often the background goroutine started
// by (*cache).setup refreshes the gauge metrics.
const metricsUpdateInterval = 100 * time.Millisecond
20
21 type cache struct {
22         cluster     *arvados.Cluster
23         config      *arvados.WebDAVCacheConfig // TODO: use cluster.Collections.WebDAV instead
24         registry    *prometheus.Registry
25         metrics     cacheMetrics
26         pdhs        *lru.TwoQueueCache
27         collections *lru.TwoQueueCache
28         permissions *lru.TwoQueueCache
29         sessions    *lru.TwoQueueCache
30         setupOnce   sync.Once
31 }
32
33 type cacheMetrics struct {
34         requests          prometheus.Counter
35         collectionBytes   prometheus.Gauge
36         collectionEntries prometheus.Gauge
37         sessionEntries    prometheus.Gauge
38         collectionHits    prometheus.Counter
39         pdhHits           prometheus.Counter
40         permissionHits    prometheus.Counter
41         sessionHits       prometheus.Counter
42         sessionMisses     prometheus.Counter
43         apiCalls          prometheus.Counter
44 }
45
46 func (m *cacheMetrics) setup(reg *prometheus.Registry) {
47         m.requests = prometheus.NewCounter(prometheus.CounterOpts{
48                 Namespace: "arvados",
49                 Subsystem: "keepweb_collectioncache",
50                 Name:      "requests",
51                 Help:      "Number of targetID-to-manifest lookups handled.",
52         })
53         reg.MustRegister(m.requests)
54         m.collectionHits = prometheus.NewCounter(prometheus.CounterOpts{
55                 Namespace: "arvados",
56                 Subsystem: "keepweb_collectioncache",
57                 Name:      "hits",
58                 Help:      "Number of pdh-to-manifest cache hits.",
59         })
60         reg.MustRegister(m.collectionHits)
61         m.pdhHits = prometheus.NewCounter(prometheus.CounterOpts{
62                 Namespace: "arvados",
63                 Subsystem: "keepweb_collectioncache",
64                 Name:      "pdh_hits",
65                 Help:      "Number of uuid-to-pdh cache hits.",
66         })
67         reg.MustRegister(m.pdhHits)
68         m.permissionHits = prometheus.NewCounter(prometheus.CounterOpts{
69                 Namespace: "arvados",
70                 Subsystem: "keepweb_collectioncache",
71                 Name:      "permission_hits",
72                 Help:      "Number of targetID-to-permission cache hits.",
73         })
74         reg.MustRegister(m.permissionHits)
75         m.apiCalls = prometheus.NewCounter(prometheus.CounterOpts{
76                 Namespace: "arvados",
77                 Subsystem: "keepweb_collectioncache",
78                 Name:      "api_calls",
79                 Help:      "Number of outgoing API calls made by cache.",
80         })
81         reg.MustRegister(m.apiCalls)
82         m.collectionBytes = prometheus.NewGauge(prometheus.GaugeOpts{
83                 Namespace: "arvados",
84                 Subsystem: "keepweb_collectioncache",
85                 Name:      "cached_manifest_bytes",
86                 Help:      "Total size of all cached manifests and sessions.",
87         })
88         reg.MustRegister(m.collectionBytes)
89         m.collectionEntries = prometheus.NewGauge(prometheus.GaugeOpts{
90                 Namespace: "arvados",
91                 Subsystem: "keepweb_collectioncache",
92                 Name:      "cached_manifests",
93                 Help:      "Number of manifests in cache.",
94         })
95         reg.MustRegister(m.collectionEntries)
96         m.sessionEntries = prometheus.NewGauge(prometheus.GaugeOpts{
97                 Namespace: "arvados",
98                 Subsystem: "keepweb_sessions",
99                 Name:      "active",
100                 Help:      "Number of active token sessions.",
101         })
102         reg.MustRegister(m.sessionEntries)
103         m.sessionHits = prometheus.NewCounter(prometheus.CounterOpts{
104                 Namespace: "arvados",
105                 Subsystem: "keepweb_sessions",
106                 Name:      "hits",
107                 Help:      "Number of token session cache hits.",
108         })
109         reg.MustRegister(m.sessionHits)
110         m.sessionMisses = prometheus.NewCounter(prometheus.CounterOpts{
111                 Namespace: "arvados",
112                 Subsystem: "keepweb_sessions",
113                 Name:      "misses",
114                 Help:      "Number of token session cache misses.",
115         })
116         reg.MustRegister(m.sessionMisses)
117 }
118
119 type cachedPDH struct {
120         expire time.Time
121         pdh    string
122 }
123
124 type cachedCollection struct {
125         expire     time.Time
126         collection *arvados.Collection
127 }
128
129 type cachedPermission struct {
130         expire time.Time
131 }
132
133 type cachedSession struct {
134         expire time.Time
135         fs     atomic.Value
136 }
137
138 func (c *cache) setup() {
139         var err error
140         c.pdhs, err = lru.New2Q(c.config.MaxUUIDEntries)
141         if err != nil {
142                 panic(err)
143         }
144         c.collections, err = lru.New2Q(c.config.MaxCollectionEntries)
145         if err != nil {
146                 panic(err)
147         }
148         c.permissions, err = lru.New2Q(c.config.MaxPermissionEntries)
149         if err != nil {
150                 panic(err)
151         }
152         c.sessions, err = lru.New2Q(c.config.MaxSessions)
153         if err != nil {
154                 panic(err)
155         }
156
157         reg := c.registry
158         if reg == nil {
159                 reg = prometheus.NewRegistry()
160         }
161         c.metrics.setup(reg)
162         go func() {
163                 for range time.Tick(metricsUpdateInterval) {
164                         c.updateGauges()
165                 }
166         }()
167 }
168
169 func (c *cache) updateGauges() {
170         c.metrics.collectionBytes.Set(float64(c.collectionBytes()))
171         c.metrics.collectionEntries.Set(float64(c.collections.Len()))
172         c.metrics.sessionEntries.Set(float64(c.sessions.Len()))
173 }
174
// selectPDH is the parameter set passed to the API when only a
// collection's portable_data_hash field is needed.
var selectPDH = map[string]interface{}{"select": []string{"portable_data_hash"}}
178
179 // Update saves a modified version (fs) to an existing collection
180 // (coll) and, if successful, updates the relevant cache entries so
181 // subsequent calls to Get() reflect the modifications.
182 func (c *cache) Update(client *arvados.Client, coll arvados.Collection, fs arvados.CollectionFileSystem) error {
183         c.setupOnce.Do(c.setup)
184
185         m, err := fs.MarshalManifest(".")
186         if err != nil || m == coll.ManifestText {
187                 return err
188         }
189         coll.ManifestText = m
190         var updated arvados.Collection
191         defer c.pdhs.Remove(coll.UUID)
192         err = client.RequestAndDecode(&updated, "PATCH", "arvados/v1/collections/"+coll.UUID, nil, map[string]interface{}{
193                 "collection": map[string]string{
194                         "manifest_text": coll.ManifestText,
195                 },
196         })
197         if err == nil {
198                 c.collections.Add(client.AuthToken+"\000"+coll.PortableDataHash, &cachedCollection{
199                         expire:     time.Now().Add(time.Duration(c.config.TTL)),
200                         collection: &updated,
201                 })
202         }
203         return err
204 }
205
206 // ResetSession unloads any potentially stale state. Should be called
207 // after write operations, so subsequent reads don't return stale
208 // data.
209 func (c *cache) ResetSession(token string) {
210         c.setupOnce.Do(c.setup)
211         c.sessions.Remove(token)
212 }
213
214 // Get a long-lived CustomFileSystem suitable for doing a read operation
215 // with the given token.
216 func (c *cache) GetSession(token string) (arvados.CustomFileSystem, error) {
217         c.setupOnce.Do(c.setup)
218         now := time.Now()
219         ent, _ := c.sessions.Get(token)
220         sess, _ := ent.(*cachedSession)
221         if sess == nil {
222                 c.metrics.sessionMisses.Inc()
223                 sess = &cachedSession{
224                         expire: now.Add(c.config.TTL.Duration()),
225                 }
226                 c.sessions.Add(token, sess)
227         } else if sess.expire.Before(now) {
228                 c.metrics.sessionMisses.Inc()
229                 sess.fs.Store(nil)
230         } else {
231                 c.metrics.sessionHits.Inc()
232         }
233         go c.pruneSessions()
234         fs, _ := sess.fs.Load().(arvados.CustomFileSystem)
235         if fs != nil {
236                 return fs, nil
237         }
238         ac, err := arvados.NewClientFromConfig(c.cluster)
239         if err != nil {
240                 return nil, err
241         }
242         ac.AuthToken = token
243         arv, err := arvadosclient.New(ac)
244         if err != nil {
245                 return nil, err
246         }
247         kc := keepclient.New(arv)
248         fs = ac.SiteFileSystem(kc)
249         fs.ForwardSlashNameSubstitution(c.cluster.Collections.ForwardSlashNameSubstitution)
250         sess.fs.Store(fs)
251         return fs, nil
252 }
253
254 // Remove all expired session cache entries, then remove more entries
255 // until approximate remaining size <= maxsize/2
256 func (c *cache) pruneSessions() {
257         now := time.Now()
258         var size int64
259         keys := c.sessions.Keys()
260         for _, token := range keys {
261                 ent, ok := c.sessions.Peek(token)
262                 if !ok {
263                         continue
264                 }
265                 s := ent.(*cachedSession)
266                 if s.expire.Before(now) {
267                         c.sessions.Remove(token)
268                         continue
269                 }
270                 if fs, ok := s.fs.Load().(arvados.CustomFileSystem); ok {
271                         size += fs.MemorySize()
272                 }
273         }
274         // Remove tokens until reaching size limit, starting with the
275         // least frequently used entries (which Keys() returns last).
276         for i := len(keys) - 1; i >= 0; i-- {
277                 token := keys[i]
278                 if size <= c.cluster.Collections.WebDAVCache.MaxCollectionBytes/2 {
279                         break
280                 }
281                 ent, ok := c.sessions.Peek(token)
282                 if !ok {
283                         continue
284                 }
285                 s := ent.(*cachedSession)
286                 fs, _ := s.fs.Load().(arvados.CustomFileSystem)
287                 if fs == nil {
288                         continue
289                 }
290                 c.sessions.Remove(token)
291                 size -= fs.MemorySize()
292         }
293 }
294
295 func (c *cache) Get(arv *arvadosclient.ArvadosClient, targetID string, forceReload bool) (*arvados.Collection, error) {
296         c.setupOnce.Do(c.setup)
297         c.metrics.requests.Inc()
298
299         permOK := false
300         permKey := arv.ApiToken + "\000" + targetID
301         if forceReload {
302         } else if ent, cached := c.permissions.Get(permKey); cached {
303                 ent := ent.(*cachedPermission)
304                 if ent.expire.Before(time.Now()) {
305                         c.permissions.Remove(permKey)
306                 } else {
307                         permOK = true
308                         c.metrics.permissionHits.Inc()
309                 }
310         }
311
312         var pdh string
313         if arvadosclient.PDHMatch(targetID) {
314                 pdh = targetID
315         } else if ent, cached := c.pdhs.Get(targetID); cached {
316                 ent := ent.(*cachedPDH)
317                 if ent.expire.Before(time.Now()) {
318                         c.pdhs.Remove(targetID)
319                 } else {
320                         pdh = ent.pdh
321                         c.metrics.pdhHits.Inc()
322                 }
323         }
324
325         var collection *arvados.Collection
326         if pdh != "" {
327                 collection = c.lookupCollection(arv.ApiToken + "\000" + pdh)
328         }
329
330         if collection != nil && permOK {
331                 return collection, nil
332         } else if collection != nil {
333                 // Ask API for current PDH for this targetID. Most
334                 // likely, the cached PDH is still correct; if so,
335                 // _and_ the current token has permission, we can
336                 // use our cached manifest.
337                 c.metrics.apiCalls.Inc()
338                 var current arvados.Collection
339                 err := arv.Get("collections", targetID, selectPDH, &current)
340                 if err != nil {
341                         return nil, err
342                 }
343                 if current.PortableDataHash == pdh {
344                         c.permissions.Add(permKey, &cachedPermission{
345                                 expire: time.Now().Add(time.Duration(c.config.TTL)),
346                         })
347                         if pdh != targetID {
348                                 c.pdhs.Add(targetID, &cachedPDH{
349                                         expire: time.Now().Add(time.Duration(c.config.UUIDTTL)),
350                                         pdh:    pdh,
351                                 })
352                         }
353                         return collection, err
354                 }
355                 // PDH changed, but now we know we have
356                 // permission -- and maybe we already have the
357                 // new PDH in the cache.
358                 if coll := c.lookupCollection(arv.ApiToken + "\000" + current.PortableDataHash); coll != nil {
359                         return coll, nil
360                 }
361         }
362
363         // Collection manifest is not cached.
364         c.metrics.apiCalls.Inc()
365         err := arv.Get("collections", targetID, nil, &collection)
366         if err != nil {
367                 return nil, err
368         }
369         exp := time.Now().Add(time.Duration(c.config.TTL))
370         c.permissions.Add(permKey, &cachedPermission{
371                 expire: exp,
372         })
373         c.pdhs.Add(targetID, &cachedPDH{
374                 expire: time.Now().Add(time.Duration(c.config.UUIDTTL)),
375                 pdh:    collection.PortableDataHash,
376         })
377         c.collections.Add(arv.ApiToken+"\000"+collection.PortableDataHash, &cachedCollection{
378                 expire:     exp,
379                 collection: collection,
380         })
381         if int64(len(collection.ManifestText)) > c.config.MaxCollectionBytes/int64(c.config.MaxCollectionEntries) {
382                 go c.pruneCollections()
383         }
384         return collection, nil
385 }
386
387 // pruneCollections checks the total bytes occupied by manifest_text
388 // in the collection cache and removes old entries as needed to bring
389 // the total size down to CollectionBytes. It also deletes all expired
390 // entries.
391 //
392 // pruneCollections does not aim to be perfectly correct when there is
393 // concurrent cache activity.
394 func (c *cache) pruneCollections() {
395         var size int64
396         now := time.Now()
397         keys := c.collections.Keys()
398         entsize := make([]int, len(keys))
399         expired := make([]bool, len(keys))
400         for i, k := range keys {
401                 v, ok := c.collections.Peek(k)
402                 if !ok {
403                         continue
404                 }
405                 ent := v.(*cachedCollection)
406                 n := len(ent.collection.ManifestText)
407                 size += int64(n)
408                 entsize[i] = n
409                 expired[i] = ent.expire.Before(now)
410         }
411         for i, k := range keys {
412                 if expired[i] {
413                         c.collections.Remove(k)
414                         size -= int64(entsize[i])
415                 }
416         }
417         for i, k := range keys {
418                 if size <= c.config.MaxCollectionBytes/2 {
419                         break
420                 }
421                 if expired[i] {
422                         // already removed this entry in the previous loop
423                         continue
424                 }
425                 c.collections.Remove(k)
426                 size -= int64(entsize[i])
427         }
428 }
429
430 // collectionBytes returns the approximate combined memory size of the
431 // collection cache and session filesystem cache.
432 func (c *cache) collectionBytes() uint64 {
433         var size uint64
434         for _, k := range c.collections.Keys() {
435                 v, ok := c.collections.Peek(k)
436                 if !ok {
437                         continue
438                 }
439                 size += uint64(len(v.(*cachedCollection).collection.ManifestText))
440         }
441         for _, token := range c.sessions.Keys() {
442                 ent, ok := c.sessions.Peek(token)
443                 if !ok {
444                         continue
445                 }
446                 if fs, ok := ent.(*cachedSession).fs.Load().(arvados.CustomFileSystem); ok {
447                         size += uint64(fs.MemorySize())
448                 }
449         }
450         return size
451 }
452
453 func (c *cache) lookupCollection(key string) *arvados.Collection {
454         e, cached := c.collections.Get(key)
455         if !cached {
456                 return nil
457         }
458         ent := e.(*cachedCollection)
459         if ent.expire.Before(time.Now()) {
460                 c.collections.Remove(key)
461                 return nil
462         }
463         c.metrics.collectionHits.Inc()
464         return ent.collection
465 }