14873: Upgrades 'pg' gem & solves an issue related to a new internal table.
[arvados.git] / services / keep-web / cache.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package main
6
7 import (
8         "sync"
9         "time"
10
11         "git.curoverse.com/arvados.git/sdk/go/arvados"
12         "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
13         "github.com/hashicorp/golang-lru"
14         "github.com/prometheus/client_golang/prometheus"
15 )
16
17 const metricsUpdateInterval = time.Second / 10
18
19 type cache struct {
20         TTL                  arvados.Duration
21         UUIDTTL              arvados.Duration
22         MaxCollectionEntries int
23         MaxCollectionBytes   int64
24         MaxPermissionEntries int
25         MaxUUIDEntries       int
26
27         registry    *prometheus.Registry
28         metrics     cacheMetrics
29         pdhs        *lru.TwoQueueCache
30         collections *lru.TwoQueueCache
31         permissions *lru.TwoQueueCache
32         setupOnce   sync.Once
33 }
34
35 type cacheMetrics struct {
36         requests          prometheus.Counter
37         collectionBytes   prometheus.Gauge
38         collectionEntries prometheus.Gauge
39         collectionHits    prometheus.Counter
40         pdhHits           prometheus.Counter
41         permissionHits    prometheus.Counter
42         apiCalls          prometheus.Counter
43 }
44
45 func (m *cacheMetrics) setup(reg *prometheus.Registry) {
46         m.requests = prometheus.NewCounter(prometheus.CounterOpts{
47                 Namespace: "arvados",
48                 Subsystem: "keepweb_collectioncache",
49                 Name:      "requests",
50                 Help:      "Number of targetID-to-manifest lookups handled.",
51         })
52         reg.MustRegister(m.requests)
53         m.collectionHits = prometheus.NewCounter(prometheus.CounterOpts{
54                 Namespace: "arvados",
55                 Subsystem: "keepweb_collectioncache",
56                 Name:      "hits",
57                 Help:      "Number of pdh-to-manifest cache hits.",
58         })
59         reg.MustRegister(m.collectionHits)
60         m.pdhHits = prometheus.NewCounter(prometheus.CounterOpts{
61                 Namespace: "arvados",
62                 Subsystem: "keepweb_collectioncache",
63                 Name:      "pdh_hits",
64                 Help:      "Number of uuid-to-pdh cache hits.",
65         })
66         reg.MustRegister(m.pdhHits)
67         m.permissionHits = prometheus.NewCounter(prometheus.CounterOpts{
68                 Namespace: "arvados",
69                 Subsystem: "keepweb_collectioncache",
70                 Name:      "permission_hits",
71                 Help:      "Number of targetID-to-permission cache hits.",
72         })
73         reg.MustRegister(m.permissionHits)
74         m.apiCalls = prometheus.NewCounter(prometheus.CounterOpts{
75                 Namespace: "arvados",
76                 Subsystem: "keepweb_collectioncache",
77                 Name:      "api_calls",
78                 Help:      "Number of outgoing API calls made by cache.",
79         })
80         reg.MustRegister(m.apiCalls)
81         m.collectionBytes = prometheus.NewGauge(prometheus.GaugeOpts{
82                 Namespace: "arvados",
83                 Subsystem: "keepweb_collectioncache",
84                 Name:      "cached_manifest_bytes",
85                 Help:      "Total size of all manifests in cache.",
86         })
87         reg.MustRegister(m.collectionBytes)
88         m.collectionEntries = prometheus.NewGauge(prometheus.GaugeOpts{
89                 Namespace: "arvados",
90                 Subsystem: "keepweb_collectioncache",
91                 Name:      "cached_manifests",
92                 Help:      "Number of manifests in cache.",
93         })
94         reg.MustRegister(m.collectionEntries)
95 }
96
97 type cachedPDH struct {
98         expire time.Time
99         pdh    string
100 }
101
102 type cachedCollection struct {
103         expire     time.Time
104         collection *arvados.Collection
105 }
106
107 type cachedPermission struct {
108         expire time.Time
109 }
110
111 func (c *cache) setup() {
112         var err error
113         c.pdhs, err = lru.New2Q(c.MaxUUIDEntries)
114         if err != nil {
115                 panic(err)
116         }
117         c.collections, err = lru.New2Q(c.MaxCollectionEntries)
118         if err != nil {
119                 panic(err)
120         }
121         c.permissions, err = lru.New2Q(c.MaxPermissionEntries)
122         if err != nil {
123                 panic(err)
124         }
125
126         reg := c.registry
127         if reg == nil {
128                 reg = prometheus.NewRegistry()
129         }
130         c.metrics.setup(reg)
131         go func() {
132                 for range time.Tick(metricsUpdateInterval) {
133                         c.updateGauges()
134                 }
135         }()
136 }
137
138 func (c *cache) updateGauges() {
139         c.metrics.collectionBytes.Set(float64(c.collectionBytes()))
140         c.metrics.collectionEntries.Set(float64(c.collections.Len()))
141 }
142
143 var selectPDH = map[string]interface{}{
144         "select": []string{"portable_data_hash"},
145 }
146
147 // Update saves a modified version (fs) to an existing collection
148 // (coll) and, if successful, updates the relevant cache entries so
149 // subsequent calls to Get() reflect the modifications.
150 func (c *cache) Update(client *arvados.Client, coll arvados.Collection, fs arvados.CollectionFileSystem) error {
151         c.setupOnce.Do(c.setup)
152
153         if m, err := fs.MarshalManifest("."); err != nil || m == coll.ManifestText {
154                 return err
155         } else {
156                 coll.ManifestText = m
157         }
158         var updated arvados.Collection
159         defer c.pdhs.Remove(coll.UUID)
160         err := client.RequestAndDecode(&updated, "PATCH", "arvados/v1/collections/"+coll.UUID, client.UpdateBody(coll), nil)
161         if err == nil {
162                 c.collections.Add(client.AuthToken+"\000"+coll.PortableDataHash, &cachedCollection{
163                         expire:     time.Now().Add(time.Duration(c.TTL)),
164                         collection: &updated,
165                 })
166         }
167         return err
168 }
169
170 func (c *cache) Get(arv *arvadosclient.ArvadosClient, targetID string, forceReload bool) (*arvados.Collection, error) {
171         c.setupOnce.Do(c.setup)
172         c.metrics.requests.Inc()
173
174         permOK := false
175         permKey := arv.ApiToken + "\000" + targetID
176         if forceReload {
177         } else if ent, cached := c.permissions.Get(permKey); cached {
178                 ent := ent.(*cachedPermission)
179                 if ent.expire.Before(time.Now()) {
180                         c.permissions.Remove(permKey)
181                 } else {
182                         permOK = true
183                         c.metrics.permissionHits.Inc()
184                 }
185         }
186
187         var pdh string
188         if arvadosclient.PDHMatch(targetID) {
189                 pdh = targetID
190         } else if ent, cached := c.pdhs.Get(targetID); cached {
191                 ent := ent.(*cachedPDH)
192                 if ent.expire.Before(time.Now()) {
193                         c.pdhs.Remove(targetID)
194                 } else {
195                         pdh = ent.pdh
196                         c.metrics.pdhHits.Inc()
197                 }
198         }
199
200         var collection *arvados.Collection
201         if pdh != "" {
202                 collection = c.lookupCollection(arv.ApiToken + "\000" + pdh)
203         }
204
205         if collection != nil && permOK {
206                 return collection, nil
207         } else if collection != nil {
208                 // Ask API for current PDH for this targetID. Most
209                 // likely, the cached PDH is still correct; if so,
210                 // _and_ the current token has permission, we can
211                 // use our cached manifest.
212                 c.metrics.apiCalls.Inc()
213                 var current arvados.Collection
214                 err := arv.Get("collections", targetID, selectPDH, &current)
215                 if err != nil {
216                         return nil, err
217                 }
218                 if current.PortableDataHash == pdh {
219                         c.permissions.Add(permKey, &cachedPermission{
220                                 expire: time.Now().Add(time.Duration(c.TTL)),
221                         })
222                         if pdh != targetID {
223                                 c.pdhs.Add(targetID, &cachedPDH{
224                                         expire: time.Now().Add(time.Duration(c.UUIDTTL)),
225                                         pdh:    pdh,
226                                 })
227                         }
228                         return collection, err
229                 } else {
230                         // PDH changed, but now we know we have
231                         // permission -- and maybe we already have the
232                         // new PDH in the cache.
233                         if coll := c.lookupCollection(arv.ApiToken + "\000" + current.PortableDataHash); coll != nil {
234                                 return coll, nil
235                         }
236                 }
237         }
238
239         // Collection manifest is not cached.
240         c.metrics.apiCalls.Inc()
241         err := arv.Get("collections", targetID, nil, &collection)
242         if err != nil {
243                 return nil, err
244         }
245         exp := time.Now().Add(time.Duration(c.TTL))
246         c.permissions.Add(permKey, &cachedPermission{
247                 expire: exp,
248         })
249         c.pdhs.Add(targetID, &cachedPDH{
250                 expire: time.Now().Add(time.Duration(c.UUIDTTL)),
251                 pdh:    collection.PortableDataHash,
252         })
253         c.collections.Add(arv.ApiToken+"\000"+collection.PortableDataHash, &cachedCollection{
254                 expire:     exp,
255                 collection: collection,
256         })
257         if int64(len(collection.ManifestText)) > c.MaxCollectionBytes/int64(c.MaxCollectionEntries) {
258                 go c.pruneCollections()
259         }
260         return collection, nil
261 }
262
263 // pruneCollections checks the total bytes occupied by manifest_text
264 // in the collection cache and removes old entries as needed to bring
265 // the total size down to CollectionBytes. It also deletes all expired
266 // entries.
267 //
268 // pruneCollections does not aim to be perfectly correct when there is
269 // concurrent cache activity.
270 func (c *cache) pruneCollections() {
271         var size int64
272         now := time.Now()
273         keys := c.collections.Keys()
274         entsize := make([]int, len(keys))
275         expired := make([]bool, len(keys))
276         for i, k := range keys {
277                 v, ok := c.collections.Peek(k)
278                 if !ok {
279                         continue
280                 }
281                 ent := v.(*cachedCollection)
282                 n := len(ent.collection.ManifestText)
283                 size += int64(n)
284                 entsize[i] = n
285                 expired[i] = ent.expire.Before(now)
286         }
287         for i, k := range keys {
288                 if expired[i] {
289                         c.collections.Remove(k)
290                         size -= int64(entsize[i])
291                 }
292         }
293         for i, k := range keys {
294                 if size <= c.MaxCollectionBytes {
295                         break
296                 }
297                 if expired[i] {
298                         // already removed this entry in the previous loop
299                         continue
300                 }
301                 c.collections.Remove(k)
302                 size -= int64(entsize[i])
303         }
304 }
305
306 // collectionBytes returns the approximate memory size of the
307 // collection cache.
308 func (c *cache) collectionBytes() uint64 {
309         var size uint64
310         for _, k := range c.collections.Keys() {
311                 v, ok := c.collections.Peek(k)
312                 if !ok {
313                         continue
314                 }
315                 size += uint64(len(v.(*cachedCollection).collection.ManifestText))
316         }
317         return size
318 }
319
320 func (c *cache) lookupCollection(key string) *arvados.Collection {
321         e, cached := c.collections.Get(key)
322         if !cached {
323                 return nil
324         }
325         ent := e.(*cachedCollection)
326         if ent.expire.Before(time.Now()) {
327                 c.collections.Remove(key)
328                 return nil
329         }
330         c.metrics.collectionHits.Inc()
331         return ent.collection
332 }