17706: Merge branch 'master' into 17706-costanalyzer-uncommitted-container-requests
[arvados.git] / services / keep-web / cache.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package main
6
7 import (
8         "sync"
9         "sync/atomic"
10         "time"
11
12         "git.arvados.org/arvados.git/sdk/go/arvados"
13         "git.arvados.org/arvados.git/sdk/go/arvadosclient"
14         "git.arvados.org/arvados.git/sdk/go/keepclient"
15         lru "github.com/hashicorp/golang-lru"
16         "github.com/prometheus/client_golang/prometheus"
17 )
18
// metricsUpdateInterval is how often the background goroutine started
// by (*cache).setup refreshes the cache gauges.
const metricsUpdateInterval = 100 * time.Millisecond
20
21 type cache struct {
22         cluster     *arvados.Cluster
23         config      *arvados.WebDAVCacheConfig // TODO: use cluster.Collections.WebDAV instead
24         registry    *prometheus.Registry
25         metrics     cacheMetrics
26         pdhs        *lru.TwoQueueCache
27         collections *lru.TwoQueueCache
28         permissions *lru.TwoQueueCache
29         sessions    *lru.TwoQueueCache
30         setupOnce   sync.Once
31 }
32
33 type cacheMetrics struct {
34         requests          prometheus.Counter
35         collectionBytes   prometheus.Gauge
36         collectionEntries prometheus.Gauge
37         sessionEntries    prometheus.Gauge
38         collectionHits    prometheus.Counter
39         pdhHits           prometheus.Counter
40         permissionHits    prometheus.Counter
41         sessionHits       prometheus.Counter
42         sessionMisses     prometheus.Counter
43         apiCalls          prometheus.Counter
44 }
45
46 func (m *cacheMetrics) setup(reg *prometheus.Registry) {
47         m.requests = prometheus.NewCounter(prometheus.CounterOpts{
48                 Namespace: "arvados",
49                 Subsystem: "keepweb_collectioncache",
50                 Name:      "requests",
51                 Help:      "Number of targetID-to-manifest lookups handled.",
52         })
53         reg.MustRegister(m.requests)
54         m.collectionHits = prometheus.NewCounter(prometheus.CounterOpts{
55                 Namespace: "arvados",
56                 Subsystem: "keepweb_collectioncache",
57                 Name:      "hits",
58                 Help:      "Number of pdh-to-manifest cache hits.",
59         })
60         reg.MustRegister(m.collectionHits)
61         m.pdhHits = prometheus.NewCounter(prometheus.CounterOpts{
62                 Namespace: "arvados",
63                 Subsystem: "keepweb_collectioncache",
64                 Name:      "pdh_hits",
65                 Help:      "Number of uuid-to-pdh cache hits.",
66         })
67         reg.MustRegister(m.pdhHits)
68         m.permissionHits = prometheus.NewCounter(prometheus.CounterOpts{
69                 Namespace: "arvados",
70                 Subsystem: "keepweb_collectioncache",
71                 Name:      "permission_hits",
72                 Help:      "Number of targetID-to-permission cache hits.",
73         })
74         reg.MustRegister(m.permissionHits)
75         m.apiCalls = prometheus.NewCounter(prometheus.CounterOpts{
76                 Namespace: "arvados",
77                 Subsystem: "keepweb_collectioncache",
78                 Name:      "api_calls",
79                 Help:      "Number of outgoing API calls made by cache.",
80         })
81         reg.MustRegister(m.apiCalls)
82         m.collectionBytes = prometheus.NewGauge(prometheus.GaugeOpts{
83                 Namespace: "arvados",
84                 Subsystem: "keepweb_sessions",
85                 Name:      "cached_collection_bytes",
86                 Help:      "Total size of all cached manifests and sessions.",
87         })
88         reg.MustRegister(m.collectionBytes)
89         m.collectionEntries = prometheus.NewGauge(prometheus.GaugeOpts{
90                 Namespace: "arvados",
91                 Subsystem: "keepweb_collectioncache",
92                 Name:      "cached_manifests",
93                 Help:      "Number of manifests in cache.",
94         })
95         reg.MustRegister(m.collectionEntries)
96         m.sessionEntries = prometheus.NewGauge(prometheus.GaugeOpts{
97                 Namespace: "arvados",
98                 Subsystem: "keepweb_sessions",
99                 Name:      "active",
100                 Help:      "Number of active token sessions.",
101         })
102         reg.MustRegister(m.sessionEntries)
103         m.sessionHits = prometheus.NewCounter(prometheus.CounterOpts{
104                 Namespace: "arvados",
105                 Subsystem: "keepweb_sessions",
106                 Name:      "hits",
107                 Help:      "Number of token session cache hits.",
108         })
109         reg.MustRegister(m.sessionHits)
110         m.sessionMisses = prometheus.NewCounter(prometheus.CounterOpts{
111                 Namespace: "arvados",
112                 Subsystem: "keepweb_sessions",
113                 Name:      "misses",
114                 Help:      "Number of token session cache misses.",
115         })
116         reg.MustRegister(m.sessionMisses)
117 }
118
119 type cachedPDH struct {
120         expire time.Time
121         pdh    string
122 }
123
124 type cachedCollection struct {
125         expire     time.Time
126         collection *arvados.Collection
127 }
128
129 type cachedPermission struct {
130         expire time.Time
131 }
132
133 type cachedSession struct {
134         expire time.Time
135         fs     atomic.Value
136 }
137
138 func (c *cache) setup() {
139         var err error
140         c.pdhs, err = lru.New2Q(c.config.MaxUUIDEntries)
141         if err != nil {
142                 panic(err)
143         }
144         c.collections, err = lru.New2Q(c.config.MaxCollectionEntries)
145         if err != nil {
146                 panic(err)
147         }
148         c.permissions, err = lru.New2Q(c.config.MaxPermissionEntries)
149         if err != nil {
150                 panic(err)
151         }
152         c.sessions, err = lru.New2Q(c.config.MaxSessions)
153         if err != nil {
154                 panic(err)
155         }
156
157         reg := c.registry
158         if reg == nil {
159                 reg = prometheus.NewRegistry()
160         }
161         c.metrics.setup(reg)
162         go func() {
163                 for range time.Tick(metricsUpdateInterval) {
164                         c.updateGauges()
165                 }
166         }()
167 }
168
169 func (c *cache) updateGauges() {
170         c.metrics.collectionBytes.Set(float64(c.collectionBytes()))
171         c.metrics.collectionEntries.Set(float64(c.collections.Len()))
172         c.metrics.sessionEntries.Set(float64(c.sessions.Len()))
173 }
174
// selectPDH is the request parameter set used when only a collection's
// portable_data_hash field is wanted from the API.
var selectPDH = map[string]interface{}{"select": []string{"portable_data_hash"}}
178
179 // Update saves a modified version (fs) to an existing collection
180 // (coll) and, if successful, updates the relevant cache entries so
181 // subsequent calls to Get() reflect the modifications.
182 func (c *cache) Update(client *arvados.Client, coll arvados.Collection, fs arvados.CollectionFileSystem) error {
183         c.setupOnce.Do(c.setup)
184
185         m, err := fs.MarshalManifest(".")
186         if err != nil || m == coll.ManifestText {
187                 return err
188         }
189         coll.ManifestText = m
190         var updated arvados.Collection
191         defer c.pdhs.Remove(coll.UUID)
192         err = client.RequestAndDecode(&updated, "PATCH", "arvados/v1/collections/"+coll.UUID, nil, map[string]interface{}{
193                 "collection": map[string]string{
194                         "manifest_text": coll.ManifestText,
195                 },
196         })
197         if err == nil {
198                 c.collections.Add(client.AuthToken+"\000"+updated.PortableDataHash, &cachedCollection{
199                         expire:     time.Now().Add(time.Duration(c.config.TTL)),
200                         collection: &updated,
201                 })
202         }
203         return err
204 }
205
206 // ResetSession unloads any potentially stale state. Should be called
207 // after write operations, so subsequent reads don't return stale
208 // data.
209 func (c *cache) ResetSession(token string) {
210         c.setupOnce.Do(c.setup)
211         c.sessions.Remove(token)
212 }
213
214 // Get a long-lived CustomFileSystem suitable for doing a read operation
215 // with the given token.
216 func (c *cache) GetSession(token string) (arvados.CustomFileSystem, error) {
217         c.setupOnce.Do(c.setup)
218         now := time.Now()
219         ent, _ := c.sessions.Get(token)
220         sess, _ := ent.(*cachedSession)
221         expired := false
222         if sess == nil {
223                 c.metrics.sessionMisses.Inc()
224                 sess = &cachedSession{
225                         expire: now.Add(c.config.TTL.Duration()),
226                 }
227                 c.sessions.Add(token, sess)
228         } else if sess.expire.Before(now) {
229                 c.metrics.sessionMisses.Inc()
230                 expired = true
231         } else {
232                 c.metrics.sessionHits.Inc()
233         }
234         go c.pruneSessions()
235         fs, _ := sess.fs.Load().(arvados.CustomFileSystem)
236         if fs != nil && !expired {
237                 return fs, nil
238         }
239         ac, err := arvados.NewClientFromConfig(c.cluster)
240         if err != nil {
241                 return nil, err
242         }
243         ac.AuthToken = token
244         arv, err := arvadosclient.New(ac)
245         if err != nil {
246                 return nil, err
247         }
248         kc := keepclient.New(arv)
249         fs = ac.SiteFileSystem(kc)
250         fs.ForwardSlashNameSubstitution(c.cluster.Collections.ForwardSlashNameSubstitution)
251         sess.fs.Store(fs)
252         return fs, nil
253 }
254
255 // Remove all expired session cache entries, then remove more entries
256 // until approximate remaining size <= maxsize/2
257 func (c *cache) pruneSessions() {
258         now := time.Now()
259         var size int64
260         keys := c.sessions.Keys()
261         for _, token := range keys {
262                 ent, ok := c.sessions.Peek(token)
263                 if !ok {
264                         continue
265                 }
266                 s := ent.(*cachedSession)
267                 if s.expire.Before(now) {
268                         c.sessions.Remove(token)
269                         continue
270                 }
271                 if fs, ok := s.fs.Load().(arvados.CustomFileSystem); ok {
272                         size += fs.MemorySize()
273                 }
274         }
275         // Remove tokens until reaching size limit, starting with the
276         // least frequently used entries (which Keys() returns last).
277         for i := len(keys) - 1; i >= 0; i-- {
278                 token := keys[i]
279                 if size <= c.cluster.Collections.WebDAVCache.MaxCollectionBytes/2 {
280                         break
281                 }
282                 ent, ok := c.sessions.Peek(token)
283                 if !ok {
284                         continue
285                 }
286                 s := ent.(*cachedSession)
287                 fs, _ := s.fs.Load().(arvados.CustomFileSystem)
288                 if fs == nil {
289                         continue
290                 }
291                 c.sessions.Remove(token)
292                 size -= fs.MemorySize()
293         }
294 }
295
296 func (c *cache) Get(arv *arvadosclient.ArvadosClient, targetID string, forceReload bool) (*arvados.Collection, error) {
297         c.setupOnce.Do(c.setup)
298         c.metrics.requests.Inc()
299
300         permOK := false
301         permKey := arv.ApiToken + "\000" + targetID
302         if forceReload {
303         } else if ent, cached := c.permissions.Get(permKey); cached {
304                 ent := ent.(*cachedPermission)
305                 if ent.expire.Before(time.Now()) {
306                         c.permissions.Remove(permKey)
307                 } else {
308                         permOK = true
309                         c.metrics.permissionHits.Inc()
310                 }
311         }
312
313         var pdh string
314         if arvadosclient.PDHMatch(targetID) {
315                 pdh = targetID
316         } else if ent, cached := c.pdhs.Get(targetID); cached {
317                 ent := ent.(*cachedPDH)
318                 if ent.expire.Before(time.Now()) {
319                         c.pdhs.Remove(targetID)
320                 } else {
321                         pdh = ent.pdh
322                         c.metrics.pdhHits.Inc()
323                 }
324         }
325
326         var collection *arvados.Collection
327         if pdh != "" {
328                 collection = c.lookupCollection(arv.ApiToken + "\000" + pdh)
329         }
330
331         if collection != nil && permOK {
332                 return collection, nil
333         } else if collection != nil {
334                 // Ask API for current PDH for this targetID. Most
335                 // likely, the cached PDH is still correct; if so,
336                 // _and_ the current token has permission, we can
337                 // use our cached manifest.
338                 c.metrics.apiCalls.Inc()
339                 var current arvados.Collection
340                 err := arv.Get("collections", targetID, selectPDH, &current)
341                 if err != nil {
342                         return nil, err
343                 }
344                 if current.PortableDataHash == pdh {
345                         c.permissions.Add(permKey, &cachedPermission{
346                                 expire: time.Now().Add(time.Duration(c.config.TTL)),
347                         })
348                         if pdh != targetID {
349                                 c.pdhs.Add(targetID, &cachedPDH{
350                                         expire: time.Now().Add(time.Duration(c.config.UUIDTTL)),
351                                         pdh:    pdh,
352                                 })
353                         }
354                         return collection, err
355                 }
356                 // PDH changed, but now we know we have
357                 // permission -- and maybe we already have the
358                 // new PDH in the cache.
359                 if coll := c.lookupCollection(arv.ApiToken + "\000" + current.PortableDataHash); coll != nil {
360                         return coll, nil
361                 }
362         }
363
364         // Collection manifest is not cached.
365         c.metrics.apiCalls.Inc()
366         err := arv.Get("collections", targetID, nil, &collection)
367         if err != nil {
368                 return nil, err
369         }
370         exp := time.Now().Add(time.Duration(c.config.TTL))
371         c.permissions.Add(permKey, &cachedPermission{
372                 expire: exp,
373         })
374         c.pdhs.Add(targetID, &cachedPDH{
375                 expire: time.Now().Add(time.Duration(c.config.UUIDTTL)),
376                 pdh:    collection.PortableDataHash,
377         })
378         c.collections.Add(arv.ApiToken+"\000"+collection.PortableDataHash, &cachedCollection{
379                 expire:     exp,
380                 collection: collection,
381         })
382         if int64(len(collection.ManifestText)) > c.config.MaxCollectionBytes/int64(c.config.MaxCollectionEntries) {
383                 go c.pruneCollections()
384         }
385         return collection, nil
386 }
387
388 // pruneCollections checks the total bytes occupied by manifest_text
389 // in the collection cache and removes old entries as needed to bring
390 // the total size down to CollectionBytes. It also deletes all expired
391 // entries.
392 //
393 // pruneCollections does not aim to be perfectly correct when there is
394 // concurrent cache activity.
395 func (c *cache) pruneCollections() {
396         var size int64
397         now := time.Now()
398         keys := c.collections.Keys()
399         entsize := make([]int, len(keys))
400         expired := make([]bool, len(keys))
401         for i, k := range keys {
402                 v, ok := c.collections.Peek(k)
403                 if !ok {
404                         continue
405                 }
406                 ent := v.(*cachedCollection)
407                 n := len(ent.collection.ManifestText)
408                 size += int64(n)
409                 entsize[i] = n
410                 expired[i] = ent.expire.Before(now)
411         }
412         for i, k := range keys {
413                 if expired[i] {
414                         c.collections.Remove(k)
415                         size -= int64(entsize[i])
416                 }
417         }
418         for i, k := range keys {
419                 if size <= c.config.MaxCollectionBytes/2 {
420                         break
421                 }
422                 if expired[i] {
423                         // already removed this entry in the previous loop
424                         continue
425                 }
426                 c.collections.Remove(k)
427                 size -= int64(entsize[i])
428         }
429 }
430
431 // collectionBytes returns the approximate combined memory size of the
432 // collection cache and session filesystem cache.
433 func (c *cache) collectionBytes() uint64 {
434         var size uint64
435         for _, k := range c.collections.Keys() {
436                 v, ok := c.collections.Peek(k)
437                 if !ok {
438                         continue
439                 }
440                 size += uint64(len(v.(*cachedCollection).collection.ManifestText))
441         }
442         for _, token := range c.sessions.Keys() {
443                 ent, ok := c.sessions.Peek(token)
444                 if !ok {
445                         continue
446                 }
447                 if fs, ok := ent.(*cachedSession).fs.Load().(arvados.CustomFileSystem); ok {
448                         size += uint64(fs.MemorySize())
449                 }
450         }
451         return size
452 }
453
454 func (c *cache) lookupCollection(key string) *arvados.Collection {
455         e, cached := c.collections.Get(key)
456         if !cached {
457                 return nil
458         }
459         ent := e.(*cachedCollection)
460         if ent.expire.Before(time.Now()) {
461                 c.collections.Remove(key)
462                 return nil
463         }
464         c.metrics.collectionHits.Inc()
465         return ent.collection
466 }