Allow audit logging to be disabled.
[arvados.git] / services / keep-web / cache.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package main
6
7 import (
8         "sync"
9         "sync/atomic"
10         "time"
11
12         "git.curoverse.com/arvados.git/sdk/go/arvados"
13         "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
14         "github.com/hashicorp/golang-lru"
15         "github.com/prometheus/client_golang/prometheus"
16 )
17
// metricsUpdateInterval is the period between refreshes of the
// collection-cache gauges by the background goroutine started in
// (*cache).setup.
18 const metricsUpdateInterval = time.Second / 10
19
// cache keeps recently used collection data in memory, in three LRU
// caches that are created lazily (via setupOnce) on first use:
//
//   - pdhs maps a target ID to a portable data hash; it holds at most
//     MaxUUIDEntries entries, each valid for UUIDTTL.
//   - collections maps token+"\000"+PDH to a collection record; it
//     holds at most MaxCollectionEntries entries, each valid for TTL,
//     and is pruned so the cached manifest text stays within
//     MaxCollectionBytes.
//   - permissions maps token+"\000"+targetID to a marker recording
//     that an API lookup succeeded; it holds at most
//     MaxPermissionEntries entries, each valid for TTL.
//
// registry is where the Prometheus metrics get registered; if it is
// nil, setup registers them with a newly created registry instead.
// stats holds the legacy counters returned by Stats, and metrics
// holds the Prometheus collectors.
20 type cache struct {
21         TTL                  arvados.Duration
22         UUIDTTL              arvados.Duration
23         MaxCollectionEntries int
24         MaxCollectionBytes   int64
25         MaxPermissionEntries int
26         MaxUUIDEntries       int
27
28         registry    *prometheus.Registry
29         stats       cacheStats
30         metrics     cacheMetrics
31         pdhs        *lru.TwoQueueCache
32         collections *lru.TwoQueueCache
33         permissions *lru.TwoQueueCache
34         setupOnce   sync.Once
35 }
36
// cacheStats is the legacy, JSON-serializable snapshot of cache
// activity returned by (*cache).Stats. The counter fields are updated
// with atomic operations. Note that PDHHits is serialized under the
// key "Cache.UUIDHits".
//
37 // cacheStats is EOL - add new metrics to cacheMetrics instead
38 type cacheStats struct {
39         Requests          uint64 `json:"Cache.Requests"`
40         CollectionBytes   uint64 `json:"Cache.CollectionBytes"`
41         CollectionEntries int    `json:"Cache.CollectionEntries"`
42         CollectionHits    uint64 `json:"Cache.CollectionHits"`
43         PDHHits           uint64 `json:"Cache.UUIDHits"`
44         PermissionHits    uint64 `json:"Cache.PermissionHits"`
45         APICalls          uint64 `json:"Cache.APICalls"`
46 }
47
// cacheMetrics holds the Prometheus collectors that report cache
// activity. The fields are nil until setup has created and
// registered them. The counters mirror the counters in cacheStats;
// the two gauges are refreshed by (*cache).updateGauges.
48 type cacheMetrics struct {
49         requests          prometheus.Counter
50         collectionBytes   prometheus.Gauge
51         collectionEntries prometheus.Gauge
52         collectionHits    prometheus.Counter
53         pdhHits           prometheus.Counter
54         permissionHits    prometheus.Counter
55         apiCalls          prometheus.Counter
56 }
57
58 func (m *cacheMetrics) setup(reg *prometheus.Registry) {
59         m.requests = prometheus.NewCounter(prometheus.CounterOpts{
60                 Namespace: "arvados",
61                 Subsystem: "keepweb_collectioncache",
62                 Name:      "requests",
63                 Help:      "Number of targetID-to-manifest lookups handled.",
64         })
65         reg.MustRegister(m.requests)
66         m.collectionHits = prometheus.NewCounter(prometheus.CounterOpts{
67                 Namespace: "arvados",
68                 Subsystem: "keepweb_collectioncache",
69                 Name:      "hits",
70                 Help:      "Number of pdh-to-manifest cache hits.",
71         })
72         reg.MustRegister(m.collectionHits)
73         m.pdhHits = prometheus.NewCounter(prometheus.CounterOpts{
74                 Namespace: "arvados",
75                 Subsystem: "keepweb_collectioncache",
76                 Name:      "pdh_hits",
77                 Help:      "Number of uuid-to-pdh cache hits.",
78         })
79         reg.MustRegister(m.pdhHits)
80         m.permissionHits = prometheus.NewCounter(prometheus.CounterOpts{
81                 Namespace: "arvados",
82                 Subsystem: "keepweb_collectioncache",
83                 Name:      "permission_hits",
84                 Help:      "Number of targetID-to-permission cache hits.",
85         })
86         reg.MustRegister(m.permissionHits)
87         m.apiCalls = prometheus.NewCounter(prometheus.CounterOpts{
88                 Namespace: "arvados",
89                 Subsystem: "keepweb_collectioncache",
90                 Name:      "api_calls",
91                 Help:      "Number of outgoing API calls made by cache.",
92         })
93         reg.MustRegister(m.apiCalls)
94         m.collectionBytes = prometheus.NewGauge(prometheus.GaugeOpts{
95                 Namespace: "arvados",
96                 Subsystem: "keepweb_collectioncache",
97                 Name:      "cached_manifest_bytes",
98                 Help:      "Total size of all manifests in cache.",
99         })
100         reg.MustRegister(m.collectionBytes)
101         m.collectionEntries = prometheus.NewGauge(prometheus.GaugeOpts{
102                 Namespace: "arvados",
103                 Subsystem: "keepweb_collectioncache",
104                 Name:      "cached_manifests",
105                 Help:      "Number of manifests in cache.",
106         })
107         reg.MustRegister(m.collectionEntries)
108 }
109
// cachedPDH is the value type stored in cache.pdhs, keyed by target
// ID. It records the portable data hash (pdh) last seen for that
// target; Get discards the entry once expire has passed.
110 type cachedPDH struct {
111         expire time.Time
112         pdh    string
113 }
114
// cachedCollection is the value type stored in cache.collections,
// keyed by token+"\000"+PDH. lookupCollection discards the entry once
// expire has passed.
115 type cachedCollection struct {
116         expire     time.Time
117         collection *arvados.Collection
118 }
119
// cachedPermission is the value type stored in cache.permissions,
// keyed by token+"\000"+targetID. An unexpired entry means an API
// lookup of that target with that token succeeded recently, so Get
// may serve a cached manifest without asking the API again.
120 type cachedPermission struct {
121         expire time.Time
122 }
123
124 func (c *cache) setup() {
125         var err error
126         c.pdhs, err = lru.New2Q(c.MaxUUIDEntries)
127         if err != nil {
128                 panic(err)
129         }
130         c.collections, err = lru.New2Q(c.MaxCollectionEntries)
131         if err != nil {
132                 panic(err)
133         }
134         c.permissions, err = lru.New2Q(c.MaxPermissionEntries)
135         if err != nil {
136                 panic(err)
137         }
138
139         reg := c.registry
140         if reg == nil {
141                 reg = prometheus.NewRegistry()
142         }
143         c.metrics.setup(reg)
144         go func() {
145                 for range time.Tick(metricsUpdateInterval) {
146                         c.updateGauges()
147                 }
148         }()
149 }
150
151 func (c *cache) updateGauges() {
152         c.metrics.collectionBytes.Set(float64(c.collectionBytes()))
153         c.metrics.collectionEntries.Set(float64(c.collections.Len()))
154 }
155
// selectPDH is the parameter set Get passes to the API when it only
// needs to learn a collection's current portable data hash.
// NOTE(review): presumably "select" restricts the response to the
// listed fields -- confirm against the API documentation.
156 var selectPDH = map[string]interface{}{
157         "select": []string{"portable_data_hash"},
158 }
159
160 func (c *cache) Stats() cacheStats {
161         c.setupOnce.Do(c.setup)
162         return cacheStats{
163                 Requests:          atomic.LoadUint64(&c.stats.Requests),
164                 CollectionBytes:   c.collectionBytes(),
165                 CollectionEntries: c.collections.Len(),
166                 CollectionHits:    atomic.LoadUint64(&c.stats.CollectionHits),
167                 PDHHits:           atomic.LoadUint64(&c.stats.PDHHits),
168                 PermissionHits:    atomic.LoadUint64(&c.stats.PermissionHits),
169                 APICalls:          atomic.LoadUint64(&c.stats.APICalls),
170         }
171 }
172
173 // Update saves a modified version (fs) to an existing collection
174 // (coll) and, if successful, updates the relevant cache entries so
175 // subsequent calls to Get() reflect the modifications.
176 func (c *cache) Update(client *arvados.Client, coll arvados.Collection, fs arvados.CollectionFileSystem) error {
177         c.setupOnce.Do(c.setup)
178
179         if m, err := fs.MarshalManifest("."); err != nil || m == coll.ManifestText {
180                 return err
181         } else {
182                 coll.ManifestText = m
183         }
184         var updated arvados.Collection
185         defer c.pdhs.Remove(coll.UUID)
186         err := client.RequestAndDecode(&updated, "PATCH", "arvados/v1/collections/"+coll.UUID, client.UpdateBody(coll), nil)
187         if err == nil {
188                 c.collections.Add(client.AuthToken+"\000"+coll.PortableDataHash, &cachedCollection{
189                         expire:     time.Now().Add(time.Duration(c.TTL)),
190                         collection: &updated,
191                 })
192         }
193         return err
194 }
195
196 func (c *cache) Get(arv *arvadosclient.ArvadosClient, targetID string, forceReload bool) (*arvados.Collection, error) {
197         c.setupOnce.Do(c.setup)
198
199         atomic.AddUint64(&c.stats.Requests, 1)
200         c.metrics.requests.Inc()
201
202         permOK := false
203         permKey := arv.ApiToken + "\000" + targetID
204         if forceReload {
205         } else if ent, cached := c.permissions.Get(permKey); cached {
206                 ent := ent.(*cachedPermission)
207                 if ent.expire.Before(time.Now()) {
208                         c.permissions.Remove(permKey)
209                 } else {
210                         permOK = true
211                         atomic.AddUint64(&c.stats.PermissionHits, 1)
212                         c.metrics.permissionHits.Inc()
213                 }
214         }
215
216         var pdh string
217         if arvadosclient.PDHMatch(targetID) {
218                 pdh = targetID
219         } else if ent, cached := c.pdhs.Get(targetID); cached {
220                 ent := ent.(*cachedPDH)
221                 if ent.expire.Before(time.Now()) {
222                         c.pdhs.Remove(targetID)
223                 } else {
224                         pdh = ent.pdh
225                         atomic.AddUint64(&c.stats.PDHHits, 1)
226                         c.metrics.pdhHits.Inc()
227                 }
228         }
229
230         var collection *arvados.Collection
231         if pdh != "" {
232                 collection = c.lookupCollection(arv.ApiToken + "\000" + pdh)
233         }
234
235         if collection != nil && permOK {
236                 return collection, nil
237         } else if collection != nil {
238                 // Ask API for current PDH for this targetID. Most
239                 // likely, the cached PDH is still correct; if so,
240                 // _and_ the current token has permission, we can
241                 // use our cached manifest.
242                 atomic.AddUint64(&c.stats.APICalls, 1)
243                 c.metrics.apiCalls.Inc()
244                 var current arvados.Collection
245                 err := arv.Get("collections", targetID, selectPDH, &current)
246                 if err != nil {
247                         return nil, err
248                 }
249                 if current.PortableDataHash == pdh {
250                         c.permissions.Add(permKey, &cachedPermission{
251                                 expire: time.Now().Add(time.Duration(c.TTL)),
252                         })
253                         if pdh != targetID {
254                                 c.pdhs.Add(targetID, &cachedPDH{
255                                         expire: time.Now().Add(time.Duration(c.UUIDTTL)),
256                                         pdh:    pdh,
257                                 })
258                         }
259                         return collection, err
260                 } else {
261                         // PDH changed, but now we know we have
262                         // permission -- and maybe we already have the
263                         // new PDH in the cache.
264                         if coll := c.lookupCollection(arv.ApiToken + "\000" + current.PortableDataHash); coll != nil {
265                                 return coll, nil
266                         }
267                 }
268         }
269
270         // Collection manifest is not cached.
271         atomic.AddUint64(&c.stats.APICalls, 1)
272         c.metrics.apiCalls.Inc()
273         err := arv.Get("collections", targetID, nil, &collection)
274         if err != nil {
275                 return nil, err
276         }
277         exp := time.Now().Add(time.Duration(c.TTL))
278         c.permissions.Add(permKey, &cachedPermission{
279                 expire: exp,
280         })
281         c.pdhs.Add(targetID, &cachedPDH{
282                 expire: time.Now().Add(time.Duration(c.UUIDTTL)),
283                 pdh:    collection.PortableDataHash,
284         })
285         c.collections.Add(arv.ApiToken+"\000"+collection.PortableDataHash, &cachedCollection{
286                 expire:     exp,
287                 collection: collection,
288         })
289         if int64(len(collection.ManifestText)) > c.MaxCollectionBytes/int64(c.MaxCollectionEntries) {
290                 go c.pruneCollections()
291         }
292         return collection, nil
293 }
294
295 // pruneCollections checks the total bytes occupied by manifest_text
296 // in the collection cache and removes old entries as needed to bring
297 // the total size down to CollectionBytes. It also deletes all expired
298 // entries.
299 //
300 // pruneCollections does not aim to be perfectly correct when there is
301 // concurrent cache activity.
302 func (c *cache) pruneCollections() {
303         var size int64
304         now := time.Now()
305         keys := c.collections.Keys()
306         entsize := make([]int, len(keys))
307         expired := make([]bool, len(keys))
308         for i, k := range keys {
309                 v, ok := c.collections.Peek(k)
310                 if !ok {
311                         continue
312                 }
313                 ent := v.(*cachedCollection)
314                 n := len(ent.collection.ManifestText)
315                 size += int64(n)
316                 entsize[i] = n
317                 expired[i] = ent.expire.Before(now)
318         }
319         for i, k := range keys {
320                 if expired[i] {
321                         c.collections.Remove(k)
322                         size -= int64(entsize[i])
323                 }
324         }
325         for i, k := range keys {
326                 if size <= c.MaxCollectionBytes {
327                         break
328                 }
329                 if expired[i] {
330                         // already removed this entry in the previous loop
331                         continue
332                 }
333                 c.collections.Remove(k)
334                 size -= int64(entsize[i])
335         }
336 }
337
338 // collectionBytes returns the approximate memory size of the
339 // collection cache.
340 func (c *cache) collectionBytes() uint64 {
341         var size uint64
342         for _, k := range c.collections.Keys() {
343                 v, ok := c.collections.Peek(k)
344                 if !ok {
345                         continue
346                 }
347                 size += uint64(len(v.(*cachedCollection).collection.ManifestText))
348         }
349         return size
350 }
351
352 func (c *cache) lookupCollection(key string) *arvados.Collection {
353         e, cached := c.collections.Get(key)
354         if !cached {
355                 return nil
356         }
357         ent := e.(*cachedCollection)
358         if ent.expire.Before(time.Now()) {
359                 c.collections.Remove(key)
360                 return nil
361         }
362         atomic.AddUint64(&c.stats.CollectionHits, 1)
363         c.metrics.collectionHits.Inc()
364         return ent.collection
365 }