16745: Keep a SiteFileSystem alive for multiple read requests.
[arvados.git] / services / keep-web / cache.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package main
6
7 import (
8         "sync"
9         "sync/atomic"
10         "time"
11
12         "git.arvados.org/arvados.git/sdk/go/arvados"
13         "git.arvados.org/arvados.git/sdk/go/arvadosclient"
14         "git.arvados.org/arvados.git/sdk/go/keepclient"
15         lru "github.com/hashicorp/golang-lru"
16         "github.com/prometheus/client_golang/prometheus"
17 )
18
// metricsUpdateInterval is the period at which the background
// goroutine started by (*cache).setup refreshes the gauge metrics.
const metricsUpdateInterval = 100 * time.Millisecond
20
21 type cache struct {
22         cluster     *arvados.Cluster
23         config      *arvados.WebDAVCacheConfig // TODO: use cluster.Collections.WebDAV instead
24         registry    *prometheus.Registry
25         metrics     cacheMetrics
26         pdhs        *lru.TwoQueueCache
27         collections *lru.TwoQueueCache
28         permissions *lru.TwoQueueCache
29         sessions    *lru.TwoQueueCache
30         setupOnce   sync.Once
31 }
32
33 type cacheMetrics struct {
34         requests          prometheus.Counter
35         collectionBytes   prometheus.Gauge
36         collectionEntries prometheus.Gauge
37         sessionEntries    prometheus.Gauge
38         collectionHits    prometheus.Counter
39         pdhHits           prometheus.Counter
40         permissionHits    prometheus.Counter
41         sessionHits       prometheus.Counter
42         sessionMisses     prometheus.Counter
43         apiCalls          prometheus.Counter
44 }
45
46 func (m *cacheMetrics) setup(reg *prometheus.Registry) {
47         m.requests = prometheus.NewCounter(prometheus.CounterOpts{
48                 Namespace: "arvados",
49                 Subsystem: "keepweb_collectioncache",
50                 Name:      "requests",
51                 Help:      "Number of targetID-to-manifest lookups handled.",
52         })
53         reg.MustRegister(m.requests)
54         m.collectionHits = prometheus.NewCounter(prometheus.CounterOpts{
55                 Namespace: "arvados",
56                 Subsystem: "keepweb_collectioncache",
57                 Name:      "hits",
58                 Help:      "Number of pdh-to-manifest cache hits.",
59         })
60         reg.MustRegister(m.collectionHits)
61         m.pdhHits = prometheus.NewCounter(prometheus.CounterOpts{
62                 Namespace: "arvados",
63                 Subsystem: "keepweb_collectioncache",
64                 Name:      "pdh_hits",
65                 Help:      "Number of uuid-to-pdh cache hits.",
66         })
67         reg.MustRegister(m.pdhHits)
68         m.permissionHits = prometheus.NewCounter(prometheus.CounterOpts{
69                 Namespace: "arvados",
70                 Subsystem: "keepweb_collectioncache",
71                 Name:      "permission_hits",
72                 Help:      "Number of targetID-to-permission cache hits.",
73         })
74         reg.MustRegister(m.permissionHits)
75         m.apiCalls = prometheus.NewCounter(prometheus.CounterOpts{
76                 Namespace: "arvados",
77                 Subsystem: "keepweb_collectioncache",
78                 Name:      "api_calls",
79                 Help:      "Number of outgoing API calls made by cache.",
80         })
81         reg.MustRegister(m.apiCalls)
82         m.collectionBytes = prometheus.NewGauge(prometheus.GaugeOpts{
83                 Namespace: "arvados",
84                 Subsystem: "keepweb_collectioncache",
85                 Name:      "cached_manifest_bytes",
86                 Help:      "Total size of all manifests in cache.",
87         })
88         reg.MustRegister(m.collectionBytes)
89         m.collectionEntries = prometheus.NewGauge(prometheus.GaugeOpts{
90                 Namespace: "arvados",
91                 Subsystem: "keepweb_collectioncache",
92                 Name:      "cached_manifests",
93                 Help:      "Number of manifests in cache.",
94         })
95         reg.MustRegister(m.collectionEntries)
96         m.sessionEntries = prometheus.NewGauge(prometheus.GaugeOpts{
97                 Namespace: "arvados",
98                 Subsystem: "keepweb_sessions",
99                 Name:      "active",
100                 Help:      "Number of active token sessions.",
101         })
102         reg.MustRegister(m.sessionEntries)
103         m.sessionHits = prometheus.NewCounter(prometheus.CounterOpts{
104                 Namespace: "arvados",
105                 Subsystem: "keepweb_sessions",
106                 Name:      "hits",
107                 Help:      "Number of token session cache hits.",
108         })
109         reg.MustRegister(m.sessionHits)
110         m.sessionMisses = prometheus.NewCounter(prometheus.CounterOpts{
111                 Namespace: "arvados",
112                 Subsystem: "keepweb_sessions",
113                 Name:      "misses",
114                 Help:      "Number of token session cache misses.",
115         })
116         reg.MustRegister(m.sessionMisses)
117 }
118
// cachedPDH is an entry in cache.pdhs: the portable data hash last
// seen for a target ID, valid until expire.
type cachedPDH struct {
	expire time.Time // entry is ignored and removed once this has passed
	pdh    string    // portable data hash recorded for the target
}
123
124 type cachedCollection struct {
125         expire     time.Time
126         collection *arvados.Collection
127 }
128
// cachedPermission is an entry in cache.permissions. Its presence
// under a token+target key records that an API lookup of that target
// with that token succeeded; it is trusted until expire.
type cachedPermission struct {
	expire time.Time // entry is ignored and removed once this has passed
}
132
// cachedSession is an entry in cache.sessions: per-token state that
// is reused across read requests until expire.
type cachedSession struct {
	expire time.Time // session is discarded once this has passed
	// fs holds an arvados.CustomFileSystem once one has been built
	// for the session; Load returns nil until then.
	fs atomic.Value
}
137
138 func (c *cache) setup() {
139         var err error
140         c.pdhs, err = lru.New2Q(c.config.MaxUUIDEntries)
141         if err != nil {
142                 panic(err)
143         }
144         c.collections, err = lru.New2Q(c.config.MaxCollectionEntries)
145         if err != nil {
146                 panic(err)
147         }
148         c.permissions, err = lru.New2Q(c.config.MaxPermissionEntries)
149         if err != nil {
150                 panic(err)
151         }
152         c.sessions, err = lru.New2Q(c.config.MaxSessions)
153         if err != nil {
154                 panic(err)
155         }
156
157         reg := c.registry
158         if reg == nil {
159                 reg = prometheus.NewRegistry()
160         }
161         c.metrics.setup(reg)
162         go func() {
163                 for range time.Tick(metricsUpdateInterval) {
164                         c.updateGauges()
165                 }
166         }()
167 }
168
169 func (c *cache) updateGauges() {
170         c.metrics.collectionBytes.Set(float64(c.collectionBytes()))
171         c.metrics.collectionEntries.Set(float64(c.collections.Len()))
172         c.metrics.sessionEntries.Set(float64(c.sessions.Len()))
173 }
174
// selectPDH is the request parameter set used by Get when it only
// needs a collection's portable_data_hash field from the API.
var selectPDH = map[string]interface{}{"select": []string{"portable_data_hash"}}
178
179 // Update saves a modified version (fs) to an existing collection
180 // (coll) and, if successful, updates the relevant cache entries so
181 // subsequent calls to Get() reflect the modifications.
182 func (c *cache) Update(client *arvados.Client, coll arvados.Collection, fs arvados.CollectionFileSystem) error {
183         c.setupOnce.Do(c.setup)
184
185         m, err := fs.MarshalManifest(".")
186         if err != nil || m == coll.ManifestText {
187                 return err
188         }
189         coll.ManifestText = m
190         var updated arvados.Collection
191         defer c.pdhs.Remove(coll.UUID)
192         err = client.RequestAndDecode(&updated, "PATCH", "arvados/v1/collections/"+coll.UUID, nil, map[string]interface{}{
193                 "collection": map[string]string{
194                         "manifest_text": coll.ManifestText,
195                 },
196         })
197         if err == nil {
198                 c.collections.Add(client.AuthToken+"\000"+coll.PortableDataHash, &cachedCollection{
199                         expire:     time.Now().Add(time.Duration(c.config.TTL)),
200                         collection: &updated,
201                 })
202         }
203         return err
204 }
205
206 // ResetSession unloads any potentially stale state. Should be called
207 // after write operations, so subsequent reads don't return stale
208 // data.
209 func (c *cache) ResetSession(token string) {
210         c.setupOnce.Do(c.setup)
211         c.sessions.Remove(token)
212 }
213
214 // Get a long-lived CustomFileSystem suitable for doing a read operation
215 // with the given token.
216 func (c *cache) GetSession(token string) (arvados.CustomFileSystem, error) {
217         c.setupOnce.Do(c.setup)
218         now := time.Now()
219         ent, _ := c.sessions.Get(token)
220         sess, _ := ent.(*cachedSession)
221         if sess == nil {
222                 c.metrics.sessionMisses.Inc()
223                 sess = &cachedSession{
224                         expire: now.Add(c.config.TTL.Duration()),
225                 }
226                 c.sessions.Add(token, sess)
227         } else if sess.expire.Before(now) {
228                 c.metrics.sessionMisses.Inc()
229                 sess.fs.Store(nil)
230         } else {
231                 c.metrics.sessionHits.Inc()
232         }
233         go c.pruneSessions()
234         fs, _ := sess.fs.Load().(arvados.CustomFileSystem)
235         if fs != nil {
236                 return fs, nil
237         }
238         ac, err := arvados.NewClientFromConfig(c.cluster)
239         if err != nil {
240                 return nil, err
241         }
242         ac.AuthToken = token
243         arv, err := arvadosclient.New(ac)
244         if err != nil {
245                 return nil, err
246         }
247         kc := keepclient.New(arv)
248         fs = ac.SiteFileSystem(kc)
249         fs.ForwardSlashNameSubstitution(c.cluster.Collections.ForwardSlashNameSubstitution)
250         sess.fs.Store(fs)
251         return fs, nil
252 }
253
254 func (c *cache) pruneSessions() {
255         now := time.Now()
256         var size int64
257         for _, token := range c.sessions.Keys() {
258                 ent, ok := c.sessions.Peek(token)
259                 if !ok {
260                         continue
261                 }
262                 s := ent.(*cachedSession)
263                 if s.expire.Before(now) {
264                         c.sessions.Remove(token)
265                         continue
266                 }
267                 fs, _ := s.fs.Load().(arvados.CustomFileSystem)
268                 if fs == nil {
269                         continue
270                 }
271                 size += fs.MemorySize()
272         }
273         if size > c.cluster.Collections.WebDAVCache.MaxCollectionBytes/2 {
274                 for _, token := range c.sessions.Keys() {
275                         c.sessions.Remove(token)
276                 }
277         }
278 }
279
280 func (c *cache) Get(arv *arvadosclient.ArvadosClient, targetID string, forceReload bool) (*arvados.Collection, error) {
281         c.setupOnce.Do(c.setup)
282         c.metrics.requests.Inc()
283
284         permOK := false
285         permKey := arv.ApiToken + "\000" + targetID
286         if forceReload {
287         } else if ent, cached := c.permissions.Get(permKey); cached {
288                 ent := ent.(*cachedPermission)
289                 if ent.expire.Before(time.Now()) {
290                         c.permissions.Remove(permKey)
291                 } else {
292                         permOK = true
293                         c.metrics.permissionHits.Inc()
294                 }
295         }
296
297         var pdh string
298         if arvadosclient.PDHMatch(targetID) {
299                 pdh = targetID
300         } else if ent, cached := c.pdhs.Get(targetID); cached {
301                 ent := ent.(*cachedPDH)
302                 if ent.expire.Before(time.Now()) {
303                         c.pdhs.Remove(targetID)
304                 } else {
305                         pdh = ent.pdh
306                         c.metrics.pdhHits.Inc()
307                 }
308         }
309
310         var collection *arvados.Collection
311         if pdh != "" {
312                 collection = c.lookupCollection(arv.ApiToken + "\000" + pdh)
313         }
314
315         if collection != nil && permOK {
316                 return collection, nil
317         } else if collection != nil {
318                 // Ask API for current PDH for this targetID. Most
319                 // likely, the cached PDH is still correct; if so,
320                 // _and_ the current token has permission, we can
321                 // use our cached manifest.
322                 c.metrics.apiCalls.Inc()
323                 var current arvados.Collection
324                 err := arv.Get("collections", targetID, selectPDH, &current)
325                 if err != nil {
326                         return nil, err
327                 }
328                 if current.PortableDataHash == pdh {
329                         c.permissions.Add(permKey, &cachedPermission{
330                                 expire: time.Now().Add(time.Duration(c.config.TTL)),
331                         })
332                         if pdh != targetID {
333                                 c.pdhs.Add(targetID, &cachedPDH{
334                                         expire: time.Now().Add(time.Duration(c.config.UUIDTTL)),
335                                         pdh:    pdh,
336                                 })
337                         }
338                         return collection, err
339                 }
340                 // PDH changed, but now we know we have
341                 // permission -- and maybe we already have the
342                 // new PDH in the cache.
343                 if coll := c.lookupCollection(arv.ApiToken + "\000" + current.PortableDataHash); coll != nil {
344                         return coll, nil
345                 }
346         }
347
348         // Collection manifest is not cached.
349         c.metrics.apiCalls.Inc()
350         err := arv.Get("collections", targetID, nil, &collection)
351         if err != nil {
352                 return nil, err
353         }
354         exp := time.Now().Add(time.Duration(c.config.TTL))
355         c.permissions.Add(permKey, &cachedPermission{
356                 expire: exp,
357         })
358         c.pdhs.Add(targetID, &cachedPDH{
359                 expire: time.Now().Add(time.Duration(c.config.UUIDTTL)),
360                 pdh:    collection.PortableDataHash,
361         })
362         c.collections.Add(arv.ApiToken+"\000"+collection.PortableDataHash, &cachedCollection{
363                 expire:     exp,
364                 collection: collection,
365         })
366         if int64(len(collection.ManifestText)) > c.config.MaxCollectionBytes/int64(c.config.MaxCollectionEntries) {
367                 go c.pruneCollections()
368         }
369         return collection, nil
370 }
371
372 // pruneCollections checks the total bytes occupied by manifest_text
373 // in the collection cache and removes old entries as needed to bring
374 // the total size down to CollectionBytes. It also deletes all expired
375 // entries.
376 //
377 // pruneCollections does not aim to be perfectly correct when there is
378 // concurrent cache activity.
379 func (c *cache) pruneCollections() {
380         var size int64
381         now := time.Now()
382         keys := c.collections.Keys()
383         entsize := make([]int, len(keys))
384         expired := make([]bool, len(keys))
385         for i, k := range keys {
386                 v, ok := c.collections.Peek(k)
387                 if !ok {
388                         continue
389                 }
390                 ent := v.(*cachedCollection)
391                 n := len(ent.collection.ManifestText)
392                 size += int64(n)
393                 entsize[i] = n
394                 expired[i] = ent.expire.Before(now)
395         }
396         for i, k := range keys {
397                 if expired[i] {
398                         c.collections.Remove(k)
399                         size -= int64(entsize[i])
400                 }
401         }
402         for i, k := range keys {
403                 if size <= c.config.MaxCollectionBytes/2 {
404                         break
405                 }
406                 if expired[i] {
407                         // already removed this entry in the previous loop
408                         continue
409                 }
410                 c.collections.Remove(k)
411                 size -= int64(entsize[i])
412         }
413 }
414
415 // collectionBytes returns the approximate combined memory size of the
416 // collection cache and session filesystem cache.
417 func (c *cache) collectionBytes() uint64 {
418         var size uint64
419         for _, k := range c.collections.Keys() {
420                 v, ok := c.collections.Peek(k)
421                 if !ok {
422                         continue
423                 }
424                 size += uint64(len(v.(*cachedCollection).collection.ManifestText))
425         }
426         for _, token := range c.sessions.Keys() {
427                 ent, ok := c.sessions.Peek(token)
428                 if !ok {
429                         continue
430                 }
431                 if fs, ok := ent.(*cachedSession).fs.Load().(arvados.CustomFileSystem); ok {
432                         size += uint64(fs.MemorySize())
433                 }
434         }
435         return size
436 }
437
438 func (c *cache) lookupCollection(key string) *arvados.Collection {
439         e, cached := c.collections.Get(key)
440         if !cached {
441                 return nil
442         }
443         ent := e.(*cachedCollection)
444         if ent.expire.Before(time.Now()) {
445                 c.collections.Remove(key)
446                 return nil
447         }
448         c.metrics.collectionHits.Inc()
449         return ent.collection
450 }