18051: Add logging capability to webdav cache.
[arvados.git] / services / keep-web / cache.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package main
6
7 import (
8         "sync"
9         "sync/atomic"
10         "time"
11
12         "git.arvados.org/arvados.git/sdk/go/arvados"
13         "git.arvados.org/arvados.git/sdk/go/arvadosclient"
14         "git.arvados.org/arvados.git/sdk/go/keepclient"
15         lru "github.com/hashicorp/golang-lru"
16         "github.com/prometheus/client_golang/prometheus"
17         "github.com/sirupsen/logrus"
18 )
19
20 const metricsUpdateInterval = time.Second / 10
21
22 type cache struct {
23         cluster     *arvados.Cluster
24         config      *arvados.WebDAVCacheConfig // TODO: use cluster.Collections.WebDAV instead
25         logger      logrus.FieldLogger
26         registry    *prometheus.Registry
27         metrics     cacheMetrics
28         pdhs        *lru.TwoQueueCache
29         collections *lru.TwoQueueCache
30         sessions    *lru.TwoQueueCache
31         setupOnce   sync.Once
32
33         chPruneSessions    chan struct{}
34         chPruneCollections chan struct{}
35 }
36
37 type cacheMetrics struct {
38         requests          prometheus.Counter
39         collectionBytes   prometheus.Gauge
40         collectionEntries prometheus.Gauge
41         sessionEntries    prometheus.Gauge
42         collectionHits    prometheus.Counter
43         pdhHits           prometheus.Counter
44         sessionHits       prometheus.Counter
45         sessionMisses     prometheus.Counter
46         apiCalls          prometheus.Counter
47 }
48
49 func (m *cacheMetrics) setup(reg *prometheus.Registry) {
50         m.requests = prometheus.NewCounter(prometheus.CounterOpts{
51                 Namespace: "arvados",
52                 Subsystem: "keepweb_collectioncache",
53                 Name:      "requests",
54                 Help:      "Number of targetID-to-manifest lookups handled.",
55         })
56         reg.MustRegister(m.requests)
57         m.collectionHits = prometheus.NewCounter(prometheus.CounterOpts{
58                 Namespace: "arvados",
59                 Subsystem: "keepweb_collectioncache",
60                 Name:      "hits",
61                 Help:      "Number of pdh-to-manifest cache hits.",
62         })
63         reg.MustRegister(m.collectionHits)
64         m.pdhHits = prometheus.NewCounter(prometheus.CounterOpts{
65                 Namespace: "arvados",
66                 Subsystem: "keepweb_collectioncache",
67                 Name:      "pdh_hits",
68                 Help:      "Number of uuid-to-pdh cache hits.",
69         })
70         reg.MustRegister(m.pdhHits)
71         m.apiCalls = prometheus.NewCounter(prometheus.CounterOpts{
72                 Namespace: "arvados",
73                 Subsystem: "keepweb_collectioncache",
74                 Name:      "api_calls",
75                 Help:      "Number of outgoing API calls made by cache.",
76         })
77         reg.MustRegister(m.apiCalls)
78         m.collectionBytes = prometheus.NewGauge(prometheus.GaugeOpts{
79                 Namespace: "arvados",
80                 Subsystem: "keepweb_sessions",
81                 Name:      "cached_collection_bytes",
82                 Help:      "Total size of all cached manifests and sessions.",
83         })
84         reg.MustRegister(m.collectionBytes)
85         m.collectionEntries = prometheus.NewGauge(prometheus.GaugeOpts{
86                 Namespace: "arvados",
87                 Subsystem: "keepweb_collectioncache",
88                 Name:      "cached_manifests",
89                 Help:      "Number of manifests in cache.",
90         })
91         reg.MustRegister(m.collectionEntries)
92         m.sessionEntries = prometheus.NewGauge(prometheus.GaugeOpts{
93                 Namespace: "arvados",
94                 Subsystem: "keepweb_sessions",
95                 Name:      "active",
96                 Help:      "Number of active token sessions.",
97         })
98         reg.MustRegister(m.sessionEntries)
99         m.sessionHits = prometheus.NewCounter(prometheus.CounterOpts{
100                 Namespace: "arvados",
101                 Subsystem: "keepweb_sessions",
102                 Name:      "hits",
103                 Help:      "Number of token session cache hits.",
104         })
105         reg.MustRegister(m.sessionHits)
106         m.sessionMisses = prometheus.NewCounter(prometheus.CounterOpts{
107                 Namespace: "arvados",
108                 Subsystem: "keepweb_sessions",
109                 Name:      "misses",
110                 Help:      "Number of token session cache misses.",
111         })
112         reg.MustRegister(m.sessionMisses)
113 }
114
115 type cachedPDH struct {
116         expire  time.Time
117         refresh time.Time
118         pdh     string
119 }
120
121 type cachedCollection struct {
122         expire     time.Time
123         collection *arvados.Collection
124 }
125
126 type cachedPermission struct {
127         expire time.Time
128 }
129
130 type cachedSession struct {
131         expire        time.Time
132         fs            atomic.Value
133         client        *arvados.Client
134         arvadosclient *arvadosclient.ArvadosClient
135         keepclient    *keepclient.KeepClient
136         user          atomic.Value
137 }
138
139 func (c *cache) setup() {
140         var err error
141         c.pdhs, err = lru.New2Q(c.config.MaxUUIDEntries)
142         if err != nil {
143                 panic(err)
144         }
145         c.collections, err = lru.New2Q(c.config.MaxCollectionEntries)
146         if err != nil {
147                 panic(err)
148         }
149         c.sessions, err = lru.New2Q(c.config.MaxSessions)
150         if err != nil {
151                 panic(err)
152         }
153
154         reg := c.registry
155         if reg == nil {
156                 reg = prometheus.NewRegistry()
157         }
158         c.metrics.setup(reg)
159         go func() {
160                 for range time.Tick(metricsUpdateInterval) {
161                         c.updateGauges()
162                 }
163         }()
164         c.chPruneCollections = make(chan struct{}, 1)
165         go func() {
166                 for range c.chPruneCollections {
167                         c.pruneCollections()
168                 }
169         }()
170         c.chPruneSessions = make(chan struct{}, 1)
171         go func() {
172                 for range c.chPruneSessions {
173                         c.pruneSessions()
174                 }
175         }()
176 }
177
178 func (c *cache) updateGauges() {
179         c.metrics.collectionBytes.Set(float64(c.collectionBytes()))
180         c.metrics.collectionEntries.Set(float64(c.collections.Len()))
181         c.metrics.sessionEntries.Set(float64(c.sessions.Len()))
182 }
183
184 var selectPDH = map[string]interface{}{
185         "select": []string{"portable_data_hash"},
186 }
187
188 // Update saves a modified version (fs) to an existing collection
189 // (coll) and, if successful, updates the relevant cache entries so
190 // subsequent calls to Get() reflect the modifications.
191 func (c *cache) Update(client *arvados.Client, coll arvados.Collection, fs arvados.CollectionFileSystem) error {
192         c.setupOnce.Do(c.setup)
193
194         m, err := fs.MarshalManifest(".")
195         if err != nil || m == coll.ManifestText {
196                 return err
197         }
198         coll.ManifestText = m
199         var updated arvados.Collection
200         err = client.RequestAndDecode(&updated, "PATCH", "arvados/v1/collections/"+coll.UUID, nil, map[string]interface{}{
201                 "collection": map[string]string{
202                         "manifest_text": coll.ManifestText,
203                 },
204         })
205         if err != nil {
206                 c.pdhs.Remove(coll.UUID)
207                 return err
208         }
209         c.collections.Add(client.AuthToken+"\000"+updated.PortableDataHash, &cachedCollection{
210                 expire:     time.Now().Add(time.Duration(c.config.TTL)),
211                 collection: &updated,
212         })
213         c.pdhs.Add(coll.UUID, &cachedPDH{
214                 expire:  time.Now().Add(time.Duration(c.config.TTL)),
215                 refresh: time.Now().Add(time.Duration(c.config.UUIDTTL)),
216                 pdh:     updated.PortableDataHash,
217         })
218         return nil
219 }
220
221 // ResetSession unloads any potentially stale state. Should be called
222 // after write operations, so subsequent reads don't return stale
223 // data.
224 func (c *cache) ResetSession(token string) {
225         c.setupOnce.Do(c.setup)
226         c.sessions.Remove(token)
227 }
228
229 // Get a long-lived CustomFileSystem suitable for doing a read operation
230 // with the given token.
231 func (c *cache) GetSession(token string) (arvados.CustomFileSystem, *cachedSession, error) {
232         c.setupOnce.Do(c.setup)
233         now := time.Now()
234         ent, _ := c.sessions.Get(token)
235         sess, _ := ent.(*cachedSession)
236         expired := false
237         if sess == nil {
238                 c.metrics.sessionMisses.Inc()
239                 sess = &cachedSession{
240                         expire: now.Add(c.config.TTL.Duration()),
241                 }
242                 var err error
243                 sess.client, err = arvados.NewClientFromConfig(c.cluster)
244                 if err != nil {
245                         return nil, nil, err
246                 }
247                 sess.client.AuthToken = token
248                 sess.arvadosclient, err = arvadosclient.New(sess.client)
249                 if err != nil {
250                         return nil, nil, err
251                 }
252                 sess.keepclient = keepclient.New(sess.arvadosclient)
253                 c.sessions.Add(token, sess)
254         } else if sess.expire.Before(now) {
255                 c.metrics.sessionMisses.Inc()
256                 expired = true
257         } else {
258                 c.metrics.sessionHits.Inc()
259         }
260         select {
261         case c.chPruneSessions <- struct{}{}:
262         default:
263         }
264         fs, _ := sess.fs.Load().(arvados.CustomFileSystem)
265         if fs != nil && !expired {
266                 return fs, sess, nil
267         }
268         fs = sess.client.SiteFileSystem(sess.keepclient)
269         fs.ForwardSlashNameSubstitution(c.cluster.Collections.ForwardSlashNameSubstitution)
270         sess.fs.Store(fs)
271         return fs, sess, nil
272 }
273
274 // Remove all expired session cache entries, then remove more entries
275 // until approximate remaining size <= maxsize/2
276 func (c *cache) pruneSessions() {
277         now := time.Now()
278         var size int64
279         keys := c.sessions.Keys()
280         for _, token := range keys {
281                 ent, ok := c.sessions.Peek(token)
282                 if !ok {
283                         continue
284                 }
285                 s := ent.(*cachedSession)
286                 if s.expire.Before(now) {
287                         c.sessions.Remove(token)
288                         continue
289                 }
290                 if fs, ok := s.fs.Load().(arvados.CustomFileSystem); ok {
291                         size += fs.MemorySize()
292                 }
293         }
294         // Remove tokens until reaching size limit, starting with the
295         // least frequently used entries (which Keys() returns last).
296         for i := len(keys) - 1; i >= 0; i-- {
297                 token := keys[i]
298                 if size <= c.cluster.Collections.WebDAVCache.MaxCollectionBytes/2 {
299                         break
300                 }
301                 ent, ok := c.sessions.Peek(token)
302                 if !ok {
303                         continue
304                 }
305                 s := ent.(*cachedSession)
306                 fs, _ := s.fs.Load().(arvados.CustomFileSystem)
307                 if fs == nil {
308                         continue
309                 }
310                 c.sessions.Remove(token)
311                 size -= fs.MemorySize()
312         }
313 }
314
315 func (c *cache) Get(arv *arvadosclient.ArvadosClient, targetID string, forceReload bool) (*arvados.Collection, error) {
316         c.setupOnce.Do(c.setup)
317         c.metrics.requests.Inc()
318
319         var pdhRefresh bool
320         var pdh string
321         if arvadosclient.PDHMatch(targetID) {
322                 pdh = targetID
323         } else if ent, cached := c.pdhs.Get(targetID); cached {
324                 ent := ent.(*cachedPDH)
325                 if ent.expire.Before(time.Now()) {
326                         c.pdhs.Remove(targetID)
327                 } else {
328                         pdh = ent.pdh
329                         pdhRefresh = forceReload || time.Now().After(ent.refresh)
330                         c.metrics.pdhHits.Inc()
331                 }
332         }
333
334         if pdh == "" {
335                 // UUID->PDH mapping is not cached, might as well get
336                 // the whole collection record and be done (below).
337         } else if cached := c.lookupCollection(arv.ApiToken + "\000" + pdh); cached == nil {
338                 // PDH->manifest is not cached, might as well get the
339                 // whole collection record (below).
340         } else if !pdhRefresh {
341                 // We looked up UUID->PDH very recently, and we still
342                 // have the manifest for that PDH.
343                 return cached, nil
344         } else {
345                 // Get current PDH for this UUID (and confirm we still
346                 // have read permission).  Most likely, the cached PDH
347                 // is still correct, in which case we can use our
348                 // cached manifest.
349                 c.metrics.apiCalls.Inc()
350                 var current arvados.Collection
351                 err := arv.Get("collections", targetID, selectPDH, &current)
352                 if err != nil {
353                         return nil, err
354                 }
355                 if current.PortableDataHash == pdh {
356                         // PDH has not changed, cached manifest is
357                         // correct.
358                         return cached, err
359                 }
360                 if cached := c.lookupCollection(arv.ApiToken + "\000" + current.PortableDataHash); cached != nil {
361                         // PDH changed, and we already have the
362                         // manifest for that new PDH.
363                         return cached, nil
364                 }
365         }
366
367         // Either UUID->PDH is not cached, or PDH->manifest is not
368         // cached.
369         var retrieved arvados.Collection
370         c.metrics.apiCalls.Inc()
371         err := arv.Get("collections", targetID, nil, &retrieved)
372         if err != nil {
373                 return nil, err
374         }
375         exp := time.Now().Add(time.Duration(c.config.TTL))
376         if targetID != retrieved.PortableDataHash {
377                 c.pdhs.Add(targetID, &cachedPDH{
378                         expire:  exp,
379                         refresh: time.Now().Add(time.Duration(c.config.UUIDTTL)),
380                         pdh:     retrieved.PortableDataHash,
381                 })
382         }
383         c.collections.Add(arv.ApiToken+"\000"+retrieved.PortableDataHash, &cachedCollection{
384                 expire:     exp,
385                 collection: &retrieved,
386         })
387         if int64(len(retrieved.ManifestText)) > c.config.MaxCollectionBytes/int64(c.config.MaxCollectionEntries) {
388                 select {
389                 case c.chPruneCollections <- struct{}{}:
390                 default:
391                 }
392         }
393         return &retrieved, nil
394 }
395
396 // pruneCollections checks the total bytes occupied by manifest_text
397 // in the collection cache and removes old entries as needed to bring
398 // the total size down to CollectionBytes. It also deletes all expired
399 // entries.
400 //
401 // pruneCollections does not aim to be perfectly correct when there is
402 // concurrent cache activity.
403 func (c *cache) pruneCollections() {
404         var size int64
405         now := time.Now()
406         keys := c.collections.Keys()
407         entsize := make([]int, len(keys))
408         expired := make([]bool, len(keys))
409         for i, k := range keys {
410                 v, ok := c.collections.Peek(k)
411                 if !ok {
412                         continue
413                 }
414                 ent := v.(*cachedCollection)
415                 n := len(ent.collection.ManifestText)
416                 size += int64(n)
417                 entsize[i] = n
418                 expired[i] = ent.expire.Before(now)
419         }
420         for i, k := range keys {
421                 if expired[i] {
422                         c.collections.Remove(k)
423                         size -= int64(entsize[i])
424                 }
425         }
426         for i, k := range keys {
427                 if size <= c.config.MaxCollectionBytes/2 {
428                         break
429                 }
430                 if expired[i] {
431                         // already removed this entry in the previous loop
432                         continue
433                 }
434                 c.collections.Remove(k)
435                 size -= int64(entsize[i])
436         }
437 }
438
439 // collectionBytes returns the approximate combined memory size of the
440 // collection cache and session filesystem cache.
441 func (c *cache) collectionBytes() uint64 {
442         var size uint64
443         for _, k := range c.collections.Keys() {
444                 v, ok := c.collections.Peek(k)
445                 if !ok {
446                         continue
447                 }
448                 size += uint64(len(v.(*cachedCollection).collection.ManifestText))
449         }
450         for _, token := range c.sessions.Keys() {
451                 ent, ok := c.sessions.Peek(token)
452                 if !ok {
453                         continue
454                 }
455                 if fs, ok := ent.(*cachedSession).fs.Load().(arvados.CustomFileSystem); ok {
456                         size += uint64(fs.MemorySize())
457                 }
458         }
459         return size
460 }
461
462 func (c *cache) lookupCollection(key string) *arvados.Collection {
463         e, cached := c.collections.Get(key)
464         if !cached {
465                 return nil
466         }
467         ent := e.(*cachedCollection)
468         if ent.expire.Before(time.Now()) {
469                 c.collections.Remove(key)
470                 return nil
471         }
472         c.metrics.collectionHits.Inc()
473         return ent.collection
474 }
475
476 func (c *cache) GetTokenUser(token string) (*arvados.User, error) {
477         // Get and cache user record associated with this
478         // token.  We need to know their UUID for logging, and
479         // whether they are an admin or not for certain
480         // permission checks.
481
482         // Get/create session entry
483         _, sess, err := c.GetSession(token)
484         if err != nil {
485                 return nil, err
486         }
487
488         // See if the user is already set, and if so, return it
489         user, _ := sess.user.Load().(*arvados.User)
490         if user != nil {
491                 return user, nil
492         }
493
494         // Fetch the user record
495         c.metrics.apiCalls.Inc()
496         var current arvados.User
497
498         err = sess.client.RequestAndDecode(&current, "GET", "/arvados/v1/users/current", nil, nil)
499         if err != nil {
500                 return nil, err
501         }
502
503         // Stash the user record for next time
504         sess.user.Store(&current)
505         return &current, nil
506 }