18051: Fix premature ejection from WebDAV collection cache.
[arvados.git] / services / keep-web / cache.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package main
6
7 import (
8         "sync"
9         "sync/atomic"
10         "time"
11
12         "git.arvados.org/arvados.git/sdk/go/arvados"
13         "git.arvados.org/arvados.git/sdk/go/arvadosclient"
14         "git.arvados.org/arvados.git/sdk/go/keepclient"
15         lru "github.com/hashicorp/golang-lru"
16         "github.com/prometheus/client_golang/prometheus"
17 )
18
19 const metricsUpdateInterval = time.Second / 10
20
21 type cache struct {
22         cluster     *arvados.Cluster
23         config      *arvados.WebDAVCacheConfig // TODO: use cluster.Collections.WebDAV instead
24         registry    *prometheus.Registry
25         metrics     cacheMetrics
26         pdhs        *lru.TwoQueueCache
27         collections *lru.TwoQueueCache
28         sessions    *lru.TwoQueueCache
29         setupOnce   sync.Once
30
31         chPruneSessions    chan struct{}
32         chPruneCollections chan struct{}
33 }
34
35 type cacheMetrics struct {
36         requests          prometheus.Counter
37         collectionBytes   prometheus.Gauge
38         collectionEntries prometheus.Gauge
39         sessionEntries    prometheus.Gauge
40         collectionHits    prometheus.Counter
41         pdhHits           prometheus.Counter
42         sessionHits       prometheus.Counter
43         sessionMisses     prometheus.Counter
44         apiCalls          prometheus.Counter
45 }
46
47 func (m *cacheMetrics) setup(reg *prometheus.Registry) {
48         m.requests = prometheus.NewCounter(prometheus.CounterOpts{
49                 Namespace: "arvados",
50                 Subsystem: "keepweb_collectioncache",
51                 Name:      "requests",
52                 Help:      "Number of targetID-to-manifest lookups handled.",
53         })
54         reg.MustRegister(m.requests)
55         m.collectionHits = prometheus.NewCounter(prometheus.CounterOpts{
56                 Namespace: "arvados",
57                 Subsystem: "keepweb_collectioncache",
58                 Name:      "hits",
59                 Help:      "Number of pdh-to-manifest cache hits.",
60         })
61         reg.MustRegister(m.collectionHits)
62         m.pdhHits = prometheus.NewCounter(prometheus.CounterOpts{
63                 Namespace: "arvados",
64                 Subsystem: "keepweb_collectioncache",
65                 Name:      "pdh_hits",
66                 Help:      "Number of uuid-to-pdh cache hits.",
67         })
68         reg.MustRegister(m.pdhHits)
69         m.apiCalls = prometheus.NewCounter(prometheus.CounterOpts{
70                 Namespace: "arvados",
71                 Subsystem: "keepweb_collectioncache",
72                 Name:      "api_calls",
73                 Help:      "Number of outgoing API calls made by cache.",
74         })
75         reg.MustRegister(m.apiCalls)
76         m.collectionBytes = prometheus.NewGauge(prometheus.GaugeOpts{
77                 Namespace: "arvados",
78                 Subsystem: "keepweb_sessions",
79                 Name:      "cached_collection_bytes",
80                 Help:      "Total size of all cached manifests and sessions.",
81         })
82         reg.MustRegister(m.collectionBytes)
83         m.collectionEntries = prometheus.NewGauge(prometheus.GaugeOpts{
84                 Namespace: "arvados",
85                 Subsystem: "keepweb_collectioncache",
86                 Name:      "cached_manifests",
87                 Help:      "Number of manifests in cache.",
88         })
89         reg.MustRegister(m.collectionEntries)
90         m.sessionEntries = prometheus.NewGauge(prometheus.GaugeOpts{
91                 Namespace: "arvados",
92                 Subsystem: "keepweb_sessions",
93                 Name:      "active",
94                 Help:      "Number of active token sessions.",
95         })
96         reg.MustRegister(m.sessionEntries)
97         m.sessionHits = prometheus.NewCounter(prometheus.CounterOpts{
98                 Namespace: "arvados",
99                 Subsystem: "keepweb_sessions",
100                 Name:      "hits",
101                 Help:      "Number of token session cache hits.",
102         })
103         reg.MustRegister(m.sessionHits)
104         m.sessionMisses = prometheus.NewCounter(prometheus.CounterOpts{
105                 Namespace: "arvados",
106                 Subsystem: "keepweb_sessions",
107                 Name:      "misses",
108                 Help:      "Number of token session cache misses.",
109         })
110         reg.MustRegister(m.sessionMisses)
111 }
112
113 type cachedPDH struct {
114         expire  time.Time
115         refresh time.Time
116         pdh     string
117 }
118
119 type cachedCollection struct {
120         expire     time.Time
121         collection *arvados.Collection
122 }
123
124 type cachedPermission struct {
125         expire time.Time
126 }
127
128 type cachedSession struct {
129         expire        time.Time
130         fs            atomic.Value
131         client        *arvados.Client
132         arvadosclient *arvadosclient.ArvadosClient
133         keepclient    *keepclient.KeepClient
134         user          atomic.Value
135 }
136
137 func (c *cache) setup() {
138         var err error
139         c.pdhs, err = lru.New2Q(c.config.MaxUUIDEntries)
140         if err != nil {
141                 panic(err)
142         }
143         c.collections, err = lru.New2Q(c.config.MaxCollectionEntries)
144         if err != nil {
145                 panic(err)
146         }
147         c.sessions, err = lru.New2Q(c.config.MaxSessions)
148         if err != nil {
149                 panic(err)
150         }
151
152         reg := c.registry
153         if reg == nil {
154                 reg = prometheus.NewRegistry()
155         }
156         c.metrics.setup(reg)
157         go func() {
158                 for range time.Tick(metricsUpdateInterval) {
159                         c.updateGauges()
160                 }
161         }()
162         c.chPruneCollections = make(chan struct{}, 1)
163         go func() {
164                 for range c.chPruneCollections {
165                         c.pruneCollections()
166                 }
167         }()
168         c.chPruneSessions = make(chan struct{}, 1)
169         go func() {
170                 for range c.chPruneSessions {
171                         c.pruneSessions()
172                 }
173         }()
174 }
175
176 func (c *cache) updateGauges() {
177         c.metrics.collectionBytes.Set(float64(c.collectionBytes()))
178         c.metrics.collectionEntries.Set(float64(c.collections.Len()))
179         c.metrics.sessionEntries.Set(float64(c.sessions.Len()))
180 }
181
182 var selectPDH = map[string]interface{}{
183         "select": []string{"portable_data_hash"},
184 }
185
186 // Update saves a modified version (fs) to an existing collection
187 // (coll) and, if successful, updates the relevant cache entries so
188 // subsequent calls to Get() reflect the modifications.
189 func (c *cache) Update(client *arvados.Client, coll arvados.Collection, fs arvados.CollectionFileSystem) error {
190         c.setupOnce.Do(c.setup)
191
192         m, err := fs.MarshalManifest(".")
193         if err != nil || m == coll.ManifestText {
194                 return err
195         }
196         coll.ManifestText = m
197         var updated arvados.Collection
198         err = client.RequestAndDecode(&updated, "PATCH", "arvados/v1/collections/"+coll.UUID, nil, map[string]interface{}{
199                 "collection": map[string]string{
200                         "manifest_text": coll.ManifestText,
201                 },
202         })
203         if err != nil {
204                 c.pdhs.Remove(coll.UUID)
205                 return err
206         }
207         c.collections.Add(client.AuthToken+"\000"+updated.PortableDataHash, &cachedCollection{
208                 expire:     time.Now().Add(time.Duration(c.config.TTL)),
209                 collection: &updated,
210         })
211         c.pdhs.Add(coll.UUID, &cachedPDH{
212                 expire:  time.Now().Add(time.Duration(c.config.TTL)),
213                 refresh: time.Now().Add(time.Duration(c.config.UUIDTTL)),
214                 pdh:     updated.PortableDataHash,
215         })
216         return nil
217 }
218
219 // ResetSession unloads any potentially stale state. Should be called
220 // after write operations, so subsequent reads don't return stale
221 // data.
222 func (c *cache) ResetSession(token string) {
223         c.setupOnce.Do(c.setup)
224         c.sessions.Remove(token)
225 }
226
227 // Get a long-lived CustomFileSystem suitable for doing a read operation
228 // with the given token.
229 func (c *cache) GetSession(token string) (arvados.CustomFileSystem, *cachedSession, error) {
230         c.setupOnce.Do(c.setup)
231         now := time.Now()
232         ent, _ := c.sessions.Get(token)
233         sess, _ := ent.(*cachedSession)
234         expired := false
235         if sess == nil {
236                 c.metrics.sessionMisses.Inc()
237                 sess = &cachedSession{
238                         expire: now.Add(c.config.TTL.Duration()),
239                 }
240                 var err error
241                 sess.client, err = arvados.NewClientFromConfig(c.cluster)
242                 if err != nil {
243                         return nil, nil, err
244                 }
245                 sess.client.AuthToken = token
246                 sess.arvadosclient, err = arvadosclient.New(sess.client)
247                 if err != nil {
248                         return nil, nil, err
249                 }
250                 sess.keepclient = keepclient.New(sess.arvadosclient)
251                 c.sessions.Add(token, sess)
252         } else if sess.expire.Before(now) {
253                 c.metrics.sessionMisses.Inc()
254                 expired = true
255         } else {
256                 c.metrics.sessionHits.Inc()
257         }
258         select {
259         case c.chPruneSessions <- struct{}{}:
260         default:
261         }
262         fs, _ := sess.fs.Load().(arvados.CustomFileSystem)
263         if fs != nil && !expired {
264                 return fs, sess, nil
265         }
266         fs = sess.client.SiteFileSystem(sess.keepclient)
267         fs.ForwardSlashNameSubstitution(c.cluster.Collections.ForwardSlashNameSubstitution)
268         sess.fs.Store(fs)
269         return fs, sess, nil
270 }
271
272 // Remove all expired session cache entries, then remove more entries
273 // until approximate remaining size <= maxsize/2
274 func (c *cache) pruneSessions() {
275         now := time.Now()
276         var size int64
277         keys := c.sessions.Keys()
278         for _, token := range keys {
279                 ent, ok := c.sessions.Peek(token)
280                 if !ok {
281                         continue
282                 }
283                 s := ent.(*cachedSession)
284                 if s.expire.Before(now) {
285                         c.sessions.Remove(token)
286                         continue
287                 }
288                 if fs, ok := s.fs.Load().(arvados.CustomFileSystem); ok {
289                         size += fs.MemorySize()
290                 }
291         }
292         // Remove tokens until reaching size limit, starting with the
293         // least frequently used entries (which Keys() returns last).
294         for i := len(keys) - 1; i >= 0; i-- {
295                 token := keys[i]
296                 if size <= c.cluster.Collections.WebDAVCache.MaxCollectionBytes/2 {
297                         break
298                 }
299                 ent, ok := c.sessions.Peek(token)
300                 if !ok {
301                         continue
302                 }
303                 s := ent.(*cachedSession)
304                 fs, _ := s.fs.Load().(arvados.CustomFileSystem)
305                 if fs == nil {
306                         continue
307                 }
308                 c.sessions.Remove(token)
309                 size -= fs.MemorySize()
310         }
311 }
312
313 func (c *cache) Get(arv *arvadosclient.ArvadosClient, targetID string, forceReload bool) (*arvados.Collection, error) {
314         c.setupOnce.Do(c.setup)
315         c.metrics.requests.Inc()
316
317         var pdhRefresh bool
318         var pdh string
319         if arvadosclient.PDHMatch(targetID) {
320                 pdh = targetID
321         } else if ent, cached := c.pdhs.Get(targetID); cached {
322                 ent := ent.(*cachedPDH)
323                 if ent.expire.Before(time.Now()) {
324                         c.pdhs.Remove(targetID)
325                 } else {
326                         pdh = ent.pdh
327                         pdhRefresh = forceReload || time.Now().After(ent.refresh)
328                         c.metrics.pdhHits.Inc()
329                 }
330         }
331
332         if pdh == "" {
333                 // UUID->PDH mapping is not cached, might as well get
334                 // the whole collection record and be done (below).
335         } else if cached := c.lookupCollection(arv.ApiToken + "\000" + pdh); cached == nil {
336                 // PDH->manifest is not cached, might as well get the
337                 // whole collection record (below).
338         } else if !pdhRefresh {
339                 // We looked up UUID->PDH very recently, and we still
340                 // have the manifest for that PDH.
341                 return cached, nil
342         } else {
343                 // Get current PDH for this UUID (and confirm we still
344                 // have read permission).  Most likely, the cached PDH
345                 // is still correct, in which case we can use our
346                 // cached manifest.
347                 c.metrics.apiCalls.Inc()
348                 var current arvados.Collection
349                 err := arv.Get("collections", targetID, selectPDH, &current)
350                 if err != nil {
351                         return nil, err
352                 }
353                 if current.PortableDataHash == pdh {
354                         // PDH has not changed, cached manifest is
355                         // correct.
356                         return cached, err
357                 }
358                 if cached := c.lookupCollection(arv.ApiToken + "\000" + current.PortableDataHash); cached != nil {
359                         // PDH changed, and we already have the
360                         // manifest for that new PDH.
361                         return cached, nil
362                 }
363         }
364
365         // Either UUID->PDH is not cached, or PDH->manifest is not
366         // cached.
367         var retrieved arvados.Collection
368         c.metrics.apiCalls.Inc()
369         err := arv.Get("collections", targetID, nil, &retrieved)
370         if err != nil {
371                 return nil, err
372         }
373         exp := time.Now().Add(time.Duration(c.config.TTL))
374         if targetID != retrieved.PortableDataHash {
375                 c.pdhs.Add(targetID, &cachedPDH{
376                         expire:  exp,
377                         refresh: time.Now().Add(time.Duration(c.config.UUIDTTL)),
378                         pdh:     retrieved.PortableDataHash,
379                 })
380         }
381         c.collections.Add(arv.ApiToken+"\000"+retrieved.PortableDataHash, &cachedCollection{
382                 expire:     exp,
383                 collection: &retrieved,
384         })
385         if int64(len(retrieved.ManifestText)) > c.config.MaxCollectionBytes/int64(c.config.MaxCollectionEntries) {
386                 select {
387                 case c.chPruneCollections <- struct{}{}:
388                 default:
389                 }
390         }
391         return &retrieved, nil
392 }
393
394 // pruneCollections checks the total bytes occupied by manifest_text
395 // in the collection cache and removes old entries as needed to bring
396 // the total size down to CollectionBytes. It also deletes all expired
397 // entries.
398 //
399 // pruneCollections does not aim to be perfectly correct when there is
400 // concurrent cache activity.
401 func (c *cache) pruneCollections() {
402         var size int64
403         now := time.Now()
404         keys := c.collections.Keys()
405         entsize := make([]int, len(keys))
406         expired := make([]bool, len(keys))
407         for i, k := range keys {
408                 v, ok := c.collections.Peek(k)
409                 if !ok {
410                         continue
411                 }
412                 ent := v.(*cachedCollection)
413                 n := len(ent.collection.ManifestText)
414                 size += int64(n)
415                 entsize[i] = n
416                 expired[i] = ent.expire.Before(now)
417         }
418         for i, k := range keys {
419                 if expired[i] {
420                         c.collections.Remove(k)
421                         size -= int64(entsize[i])
422                 }
423         }
424         for i, k := range keys {
425                 if size <= c.config.MaxCollectionBytes/2 {
426                         break
427                 }
428                 if expired[i] {
429                         // already removed this entry in the previous loop
430                         continue
431                 }
432                 c.collections.Remove(k)
433                 size -= int64(entsize[i])
434         }
435 }
436
437 // collectionBytes returns the approximate combined memory size of the
438 // collection cache and session filesystem cache.
439 func (c *cache) collectionBytes() uint64 {
440         var size uint64
441         for _, k := range c.collections.Keys() {
442                 v, ok := c.collections.Peek(k)
443                 if !ok {
444                         continue
445                 }
446                 size += uint64(len(v.(*cachedCollection).collection.ManifestText))
447         }
448         for _, token := range c.sessions.Keys() {
449                 ent, ok := c.sessions.Peek(token)
450                 if !ok {
451                         continue
452                 }
453                 if fs, ok := ent.(*cachedSession).fs.Load().(arvados.CustomFileSystem); ok {
454                         size += uint64(fs.MemorySize())
455                 }
456         }
457         return size
458 }
459
460 func (c *cache) lookupCollection(key string) *arvados.Collection {
461         e, cached := c.collections.Get(key)
462         if !cached {
463                 return nil
464         }
465         ent := e.(*cachedCollection)
466         if ent.expire.Before(time.Now()) {
467                 c.collections.Remove(key)
468                 return nil
469         }
470         c.metrics.collectionHits.Inc()
471         return ent.collection
472 }
473
474 func (c *cache) GetTokenUser(token string) (*arvados.User, error) {
475         // Get and cache user record associated with this
476         // token.  We need to know their UUID for logging, and
477         // whether they are an admin or not for certain
478         // permission checks.
479
480         // Get/create session entry
481         _, sess, err := c.GetSession(token)
482         if err != nil {
483                 return nil, err
484         }
485
486         // See if the user is already set, and if so, return it
487         user, _ := sess.user.Load().(*arvados.User)
488         if user != nil {
489                 return user, nil
490         }
491
492         // Fetch the user record
493         c.metrics.apiCalls.Inc()
494         var current arvados.User
495
496         err = sess.client.RequestAndDecode(&current, "GET", "/arvados/v1/users/current", nil, nil)
497         if err != nil {
498                 return nil, err
499         }
500
501         // Stash the user record for next time
502         sess.user.Store(&current)
503         return &current, nil
504 }