18941: FUSE set number of get threads based on cache size
[arvados.git] / services / keep-web / cache.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package main
6
7 import (
8         "sync"
9         "sync/atomic"
10         "time"
11
12         "git.arvados.org/arvados.git/sdk/go/arvados"
13         "git.arvados.org/arvados.git/sdk/go/arvadosclient"
14         "git.arvados.org/arvados.git/sdk/go/keepclient"
15         lru "github.com/hashicorp/golang-lru"
16         "github.com/prometheus/client_golang/prometheus"
17         "github.com/sirupsen/logrus"
18 )
19
20 const metricsUpdateInterval = time.Second / 10
21
22 type cache struct {
23         cluster     *arvados.Cluster
24         config      *arvados.WebDAVCacheConfig // TODO: use cluster.Collections.WebDAV instead
25         logger      logrus.FieldLogger
26         registry    *prometheus.Registry
27         metrics     cacheMetrics
28         pdhs        *lru.TwoQueueCache
29         collections *lru.TwoQueueCache
30         sessions    *lru.TwoQueueCache
31         setupOnce   sync.Once
32
33         chPruneSessions    chan struct{}
34         chPruneCollections chan struct{}
35 }
36
37 type cacheMetrics struct {
38         requests          prometheus.Counter
39         collectionBytes   prometheus.Gauge
40         collectionEntries prometheus.Gauge
41         sessionEntries    prometheus.Gauge
42         collectionHits    prometheus.Counter
43         pdhHits           prometheus.Counter
44         sessionHits       prometheus.Counter
45         sessionMisses     prometheus.Counter
46         apiCalls          prometheus.Counter
47 }
48
49 func (m *cacheMetrics) setup(reg *prometheus.Registry) {
50         m.requests = prometheus.NewCounter(prometheus.CounterOpts{
51                 Namespace: "arvados",
52                 Subsystem: "keepweb_collectioncache",
53                 Name:      "requests",
54                 Help:      "Number of targetID-to-manifest lookups handled.",
55         })
56         reg.MustRegister(m.requests)
57         m.collectionHits = prometheus.NewCounter(prometheus.CounterOpts{
58                 Namespace: "arvados",
59                 Subsystem: "keepweb_collectioncache",
60                 Name:      "hits",
61                 Help:      "Number of pdh-to-manifest cache hits.",
62         })
63         reg.MustRegister(m.collectionHits)
64         m.pdhHits = prometheus.NewCounter(prometheus.CounterOpts{
65                 Namespace: "arvados",
66                 Subsystem: "keepweb_collectioncache",
67                 Name:      "pdh_hits",
68                 Help:      "Number of uuid-to-pdh cache hits.",
69         })
70         reg.MustRegister(m.pdhHits)
71         m.apiCalls = prometheus.NewCounter(prometheus.CounterOpts{
72                 Namespace: "arvados",
73                 Subsystem: "keepweb_collectioncache",
74                 Name:      "api_calls",
75                 Help:      "Number of outgoing API calls made by cache.",
76         })
77         reg.MustRegister(m.apiCalls)
78         m.collectionBytes = prometheus.NewGauge(prometheus.GaugeOpts{
79                 Namespace: "arvados",
80                 Subsystem: "keepweb_sessions",
81                 Name:      "cached_collection_bytes",
82                 Help:      "Total size of all cached manifests and sessions.",
83         })
84         reg.MustRegister(m.collectionBytes)
85         m.collectionEntries = prometheus.NewGauge(prometheus.GaugeOpts{
86                 Namespace: "arvados",
87                 Subsystem: "keepweb_collectioncache",
88                 Name:      "cached_manifests",
89                 Help:      "Number of manifests in cache.",
90         })
91         reg.MustRegister(m.collectionEntries)
92         m.sessionEntries = prometheus.NewGauge(prometheus.GaugeOpts{
93                 Namespace: "arvados",
94                 Subsystem: "keepweb_sessions",
95                 Name:      "active",
96                 Help:      "Number of active token sessions.",
97         })
98         reg.MustRegister(m.sessionEntries)
99         m.sessionHits = prometheus.NewCounter(prometheus.CounterOpts{
100                 Namespace: "arvados",
101                 Subsystem: "keepweb_sessions",
102                 Name:      "hits",
103                 Help:      "Number of token session cache hits.",
104         })
105         reg.MustRegister(m.sessionHits)
106         m.sessionMisses = prometheus.NewCounter(prometheus.CounterOpts{
107                 Namespace: "arvados",
108                 Subsystem: "keepweb_sessions",
109                 Name:      "misses",
110                 Help:      "Number of token session cache misses.",
111         })
112         reg.MustRegister(m.sessionMisses)
113 }
114
115 type cachedPDH struct {
116         expire  time.Time
117         refresh time.Time
118         pdh     string
119 }
120
121 type cachedCollection struct {
122         expire     time.Time
123         collection *arvados.Collection
124 }
125
126 type cachedPermission struct {
127         expire time.Time
128 }
129
130 type cachedSession struct {
131         expire        time.Time
132         fs            atomic.Value
133         client        *arvados.Client
134         arvadosclient *arvadosclient.ArvadosClient
135         keepclient    *keepclient.KeepClient
136         user          atomic.Value
137 }
138
139 func (c *cache) setup() {
140         var err error
141         c.pdhs, err = lru.New2Q(c.config.MaxUUIDEntries)
142         if err != nil {
143                 panic(err)
144         }
145         c.collections, err = lru.New2Q(c.config.MaxCollectionEntries)
146         if err != nil {
147                 panic(err)
148         }
149         c.sessions, err = lru.New2Q(c.config.MaxSessions)
150         if err != nil {
151                 panic(err)
152         }
153
154         reg := c.registry
155         if reg == nil {
156                 reg = prometheus.NewRegistry()
157         }
158         c.metrics.setup(reg)
159         go func() {
160                 for range time.Tick(metricsUpdateInterval) {
161                         c.updateGauges()
162                 }
163         }()
164         c.chPruneCollections = make(chan struct{}, 1)
165         go func() {
166                 for range c.chPruneCollections {
167                         c.pruneCollections()
168                 }
169         }()
170         c.chPruneSessions = make(chan struct{}, 1)
171         go func() {
172                 for range c.chPruneSessions {
173                         c.pruneSessions()
174                 }
175         }()
176 }
177
178 func (c *cache) updateGauges() {
179         c.metrics.collectionBytes.Set(float64(c.collectionBytes()))
180         c.metrics.collectionEntries.Set(float64(c.collections.Len()))
181         c.metrics.sessionEntries.Set(float64(c.sessions.Len()))
182 }
183
184 var selectPDH = map[string]interface{}{
185         "select": []string{"portable_data_hash"},
186 }
187
188 // Update saves a modified version (fs) to an existing collection
189 // (coll) and, if successful, updates the relevant cache entries so
190 // subsequent calls to Get() reflect the modifications.
191 func (c *cache) Update(client *arvados.Client, coll arvados.Collection, fs arvados.CollectionFileSystem) error {
192         c.setupOnce.Do(c.setup)
193
194         m, err := fs.MarshalManifest(".")
195         if err != nil || m == coll.ManifestText {
196                 return err
197         }
198         coll.ManifestText = m
199         var updated arvados.Collection
200         err = client.RequestAndDecode(&updated, "PATCH", "arvados/v1/collections/"+coll.UUID, nil, map[string]interface{}{
201                 "collection": map[string]string{
202                         "manifest_text": coll.ManifestText,
203                 },
204         })
205         if err != nil {
206                 c.pdhs.Remove(coll.UUID)
207                 return err
208         }
209         c.collections.Add(client.AuthToken+"\000"+updated.PortableDataHash, &cachedCollection{
210                 expire:     time.Now().Add(time.Duration(c.config.TTL)),
211                 collection: &updated,
212         })
213         c.pdhs.Add(coll.UUID, &cachedPDH{
214                 expire:  time.Now().Add(time.Duration(c.config.TTL)),
215                 refresh: time.Now().Add(time.Duration(c.config.UUIDTTL)),
216                 pdh:     updated.PortableDataHash,
217         })
218         return nil
219 }
220
221 // ResetSession unloads any potentially stale state. Should be called
222 // after write operations, so subsequent reads don't return stale
223 // data.
224 func (c *cache) ResetSession(token string) {
225         c.setupOnce.Do(c.setup)
226         c.sessions.Remove(token)
227 }
228
229 // Get a long-lived CustomFileSystem suitable for doing a read operation
230 // with the given token.
231 func (c *cache) GetSession(token string) (arvados.CustomFileSystem, *cachedSession, error) {
232         c.setupOnce.Do(c.setup)
233         now := time.Now()
234         ent, _ := c.sessions.Get(token)
235         sess, _ := ent.(*cachedSession)
236         expired := false
237         if sess == nil {
238                 c.metrics.sessionMisses.Inc()
239                 sess = &cachedSession{
240                         expire: now.Add(c.config.TTL.Duration()),
241                 }
242                 var err error
243                 sess.client, err = arvados.NewClientFromConfig(c.cluster)
244                 if err != nil {
245                         return nil, nil, err
246                 }
247                 sess.client.AuthToken = token
248                 sess.arvadosclient, err = arvadosclient.New(sess.client)
249                 if err != nil {
250                         return nil, nil, err
251                 }
252                 sess.keepclient = keepclient.New(sess.arvadosclient)
253                 c.sessions.Add(token, sess)
254         } else if sess.expire.Before(now) {
255                 c.metrics.sessionMisses.Inc()
256                 expired = true
257         } else {
258                 c.metrics.sessionHits.Inc()
259         }
260         select {
261         case c.chPruneSessions <- struct{}{}:
262         default:
263         }
264         fs, _ := sess.fs.Load().(arvados.CustomFileSystem)
265         if fs != nil && !expired {
266                 return fs, sess, nil
267         }
268         fs = sess.client.SiteFileSystem(sess.keepclient)
269         fs.ForwardSlashNameSubstitution(c.cluster.Collections.ForwardSlashNameSubstitution)
270         sess.fs.Store(fs)
271         return fs, sess, nil
272 }
273
274 // Remove all expired session cache entries, then remove more entries
275 // until approximate remaining size <= maxsize/2
276 func (c *cache) pruneSessions() {
277         now := time.Now()
278         var size int64
279         keys := c.sessions.Keys()
280         for _, token := range keys {
281                 ent, ok := c.sessions.Peek(token)
282                 if !ok {
283                         continue
284                 }
285                 s := ent.(*cachedSession)
286                 if s.expire.Before(now) {
287                         c.sessions.Remove(token)
288                         continue
289                 }
290                 if fs, ok := s.fs.Load().(arvados.CustomFileSystem); ok {
291                         size += fs.MemorySize()
292                 }
293         }
294         // Remove tokens until reaching size limit, starting with the
295         // least frequently used entries (which Keys() returns last).
296         for i := len(keys) - 1; i >= 0; i-- {
297                 token := keys[i]
298                 if size <= c.cluster.Collections.WebDAVCache.MaxCollectionBytes/2 {
299                         break
300                 }
301                 ent, ok := c.sessions.Peek(token)
302                 if !ok {
303                         continue
304                 }
305                 s := ent.(*cachedSession)
306                 fs, _ := s.fs.Load().(arvados.CustomFileSystem)
307                 if fs == nil {
308                         continue
309                 }
310                 c.sessions.Remove(token)
311                 size -= fs.MemorySize()
312         }
313 }
314
315 func (c *cache) Get(arv *arvadosclient.ArvadosClient, targetID string, forceReload bool) (*arvados.Collection, error) {
316         c.setupOnce.Do(c.setup)
317         c.metrics.requests.Inc()
318
319         var pdhRefresh bool
320         var pdh string
321         if arvadosclient.PDHMatch(targetID) {
322                 pdh = targetID
323         } else if ent, cached := c.pdhs.Get(targetID); cached {
324                 ent := ent.(*cachedPDH)
325                 if ent.expire.Before(time.Now()) {
326                         c.pdhs.Remove(targetID)
327                 } else {
328                         pdh = ent.pdh
329                         pdhRefresh = forceReload || time.Now().After(ent.refresh)
330                         c.metrics.pdhHits.Inc()
331                 }
332         }
333
334         if pdh == "" {
335                 // UUID->PDH mapping is not cached, might as well get
336                 // the whole collection record and be done (below).
337                 c.logger.Debugf("cache(%s): have no pdh", targetID)
338         } else if cached := c.lookupCollection(arv.ApiToken + "\000" + pdh); cached == nil {
339                 // PDH->manifest is not cached, might as well get the
340                 // whole collection record (below).
341                 c.logger.Debugf("cache(%s): have pdh %s but manifest is not cached", targetID, pdh)
342         } else if !pdhRefresh {
343                 // We looked up UUID->PDH very recently, and we still
344                 // have the manifest for that PDH.
345                 c.logger.Debugf("cache(%s): have pdh %s and refresh not needed", targetID, pdh)
346                 return cached, nil
347         } else {
348                 // Get current PDH for this UUID (and confirm we still
349                 // have read permission).  Most likely, the cached PDH
350                 // is still correct, in which case we can use our
351                 // cached manifest.
352                 c.metrics.apiCalls.Inc()
353                 var current arvados.Collection
354                 err := arv.Get("collections", targetID, selectPDH, &current)
355                 if err != nil {
356                         return nil, err
357                 }
358                 if current.PortableDataHash == pdh {
359                         // PDH has not changed, cached manifest is
360                         // correct.
361                         c.logger.Debugf("cache(%s): verified cached pdh %s is still correct", targetID, pdh)
362                         return cached, nil
363                 }
364                 if cached := c.lookupCollection(arv.ApiToken + "\000" + current.PortableDataHash); cached != nil {
365                         // PDH changed, and we already have the
366                         // manifest for that new PDH.
367                         c.logger.Debugf("cache(%s): cached pdh %s was stale, new pdh is %s and manifest is already in cache", targetID, pdh, current.PortableDataHash)
368                         return cached, nil
369                 }
370         }
371
372         // Either UUID->PDH is not cached, or PDH->manifest is not
373         // cached.
374         var retrieved arvados.Collection
375         c.metrics.apiCalls.Inc()
376         err := arv.Get("collections", targetID, nil, &retrieved)
377         if err != nil {
378                 return nil, err
379         }
380         c.logger.Debugf("cache(%s): retrieved manifest, caching with pdh %s", targetID, retrieved.PortableDataHash)
381         exp := time.Now().Add(time.Duration(c.config.TTL))
382         if targetID != retrieved.PortableDataHash {
383                 c.pdhs.Add(targetID, &cachedPDH{
384                         expire:  exp,
385                         refresh: time.Now().Add(time.Duration(c.config.UUIDTTL)),
386                         pdh:     retrieved.PortableDataHash,
387                 })
388         }
389         c.collections.Add(arv.ApiToken+"\000"+retrieved.PortableDataHash, &cachedCollection{
390                 expire:     exp,
391                 collection: &retrieved,
392         })
393         if int64(len(retrieved.ManifestText)) > c.config.MaxCollectionBytes/int64(c.config.MaxCollectionEntries) {
394                 select {
395                 case c.chPruneCollections <- struct{}{}:
396                 default:
397                 }
398         }
399         return &retrieved, nil
400 }
401
402 // pruneCollections checks the total bytes occupied by manifest_text
403 // in the collection cache and removes old entries as needed to bring
404 // the total size down to CollectionBytes. It also deletes all expired
405 // entries.
406 //
407 // pruneCollections does not aim to be perfectly correct when there is
408 // concurrent cache activity.
409 func (c *cache) pruneCollections() {
410         var size int64
411         now := time.Now()
412         keys := c.collections.Keys()
413         entsize := make([]int, len(keys))
414         expired := make([]bool, len(keys))
415         for i, k := range keys {
416                 v, ok := c.collections.Peek(k)
417                 if !ok {
418                         continue
419                 }
420                 ent := v.(*cachedCollection)
421                 n := len(ent.collection.ManifestText)
422                 size += int64(n)
423                 entsize[i] = n
424                 expired[i] = ent.expire.Before(now)
425         }
426         for i, k := range keys {
427                 if expired[i] {
428                         c.collections.Remove(k)
429                         size -= int64(entsize[i])
430                 }
431         }
432         for i, k := range keys {
433                 if size <= c.config.MaxCollectionBytes/2 {
434                         break
435                 }
436                 if expired[i] {
437                         // already removed this entry in the previous loop
438                         continue
439                 }
440                 c.collections.Remove(k)
441                 size -= int64(entsize[i])
442         }
443 }
444
445 // collectionBytes returns the approximate combined memory size of the
446 // collection cache and session filesystem cache.
447 func (c *cache) collectionBytes() uint64 {
448         var size uint64
449         for _, k := range c.collections.Keys() {
450                 v, ok := c.collections.Peek(k)
451                 if !ok {
452                         continue
453                 }
454                 size += uint64(len(v.(*cachedCollection).collection.ManifestText))
455         }
456         for _, token := range c.sessions.Keys() {
457                 ent, ok := c.sessions.Peek(token)
458                 if !ok {
459                         continue
460                 }
461                 if fs, ok := ent.(*cachedSession).fs.Load().(arvados.CustomFileSystem); ok {
462                         size += uint64(fs.MemorySize())
463                 }
464         }
465         return size
466 }
467
468 func (c *cache) lookupCollection(key string) *arvados.Collection {
469         e, cached := c.collections.Get(key)
470         if !cached {
471                 return nil
472         }
473         ent := e.(*cachedCollection)
474         if ent.expire.Before(time.Now()) {
475                 c.collections.Remove(key)
476                 return nil
477         }
478         c.metrics.collectionHits.Inc()
479         return ent.collection
480 }
481
482 func (c *cache) GetTokenUser(token string) (*arvados.User, error) {
483         // Get and cache user record associated with this
484         // token.  We need to know their UUID for logging, and
485         // whether they are an admin or not for certain
486         // permission checks.
487
488         // Get/create session entry
489         _, sess, err := c.GetSession(token)
490         if err != nil {
491                 return nil, err
492         }
493
494         // See if the user is already set, and if so, return it
495         user, _ := sess.user.Load().(*arvados.User)
496         if user != nil {
497                 return user, nil
498         }
499
500         // Fetch the user record
501         c.metrics.apiCalls.Inc()
502         var current arvados.User
503
504         err = sess.client.RequestAndDecode(&current, "GET", "/arvados/v1/users/current", nil, nil)
505         if err != nil {
506                 return nil, err
507         }
508
509         // Stash the user record for next time
510         sess.user.Store(&current)
511         return &current, nil
512 }