Merge branch '16850-s3-keycount'
[arvados.git] / services / keep-web / cache.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package main
6
7 import (
8         "sync"
9         "time"
10
11         "git.arvados.org/arvados.git/sdk/go/arvados"
12         "git.arvados.org/arvados.git/sdk/go/arvadosclient"
13         "github.com/hashicorp/golang-lru"
14         "github.com/prometheus/client_golang/prometheus"
15 )
16
// metricsUpdateInterval is the period between successive refreshes of
// the cache size gauges by the goroutine started in (*cache).setup.
const metricsUpdateInterval = 100 * time.Millisecond
18
19 type cache struct {
20         config      *arvados.WebDAVCacheConfig
21         registry    *prometheus.Registry
22         metrics     cacheMetrics
23         pdhs        *lru.TwoQueueCache
24         collections *lru.TwoQueueCache
25         permissions *lru.TwoQueueCache
26         setupOnce   sync.Once
27 }
28
29 type cacheMetrics struct {
30         requests          prometheus.Counter
31         collectionBytes   prometheus.Gauge
32         collectionEntries prometheus.Gauge
33         collectionHits    prometheus.Counter
34         pdhHits           prometheus.Counter
35         permissionHits    prometheus.Counter
36         apiCalls          prometheus.Counter
37 }
38
39 func (m *cacheMetrics) setup(reg *prometheus.Registry) {
40         m.requests = prometheus.NewCounter(prometheus.CounterOpts{
41                 Namespace: "arvados",
42                 Subsystem: "keepweb_collectioncache",
43                 Name:      "requests",
44                 Help:      "Number of targetID-to-manifest lookups handled.",
45         })
46         reg.MustRegister(m.requests)
47         m.collectionHits = prometheus.NewCounter(prometheus.CounterOpts{
48                 Namespace: "arvados",
49                 Subsystem: "keepweb_collectioncache",
50                 Name:      "hits",
51                 Help:      "Number of pdh-to-manifest cache hits.",
52         })
53         reg.MustRegister(m.collectionHits)
54         m.pdhHits = prometheus.NewCounter(prometheus.CounterOpts{
55                 Namespace: "arvados",
56                 Subsystem: "keepweb_collectioncache",
57                 Name:      "pdh_hits",
58                 Help:      "Number of uuid-to-pdh cache hits.",
59         })
60         reg.MustRegister(m.pdhHits)
61         m.permissionHits = prometheus.NewCounter(prometheus.CounterOpts{
62                 Namespace: "arvados",
63                 Subsystem: "keepweb_collectioncache",
64                 Name:      "permission_hits",
65                 Help:      "Number of targetID-to-permission cache hits.",
66         })
67         reg.MustRegister(m.permissionHits)
68         m.apiCalls = prometheus.NewCounter(prometheus.CounterOpts{
69                 Namespace: "arvados",
70                 Subsystem: "keepweb_collectioncache",
71                 Name:      "api_calls",
72                 Help:      "Number of outgoing API calls made by cache.",
73         })
74         reg.MustRegister(m.apiCalls)
75         m.collectionBytes = prometheus.NewGauge(prometheus.GaugeOpts{
76                 Namespace: "arvados",
77                 Subsystem: "keepweb_collectioncache",
78                 Name:      "cached_manifest_bytes",
79                 Help:      "Total size of all manifests in cache.",
80         })
81         reg.MustRegister(m.collectionBytes)
82         m.collectionEntries = prometheus.NewGauge(prometheus.GaugeOpts{
83                 Namespace: "arvados",
84                 Subsystem: "keepweb_collectioncache",
85                 Name:      "cached_manifests",
86                 Help:      "Number of manifests in cache.",
87         })
88         reg.MustRegister(m.collectionEntries)
89 }
90
// cachedPDH is the value stored in the pdhs table: the portable data
// hash last seen for a target ID, plus the time after which the entry
// must no longer be used.
type cachedPDH struct {
	expire time.Time // entry is stale once this time has passed
	pdh    string    // portable data hash recorded for the target
}
95
96 type cachedCollection struct {
97         expire     time.Time
98         collection *arvados.Collection
99 }
100
// cachedPermission is the value stored in the permissions table. Its
// presence under a token+target key records a successful lookup; it
// carries only the time after which it must no longer be trusted.
type cachedPermission struct {
	expire time.Time // entry is stale once this time has passed
}
104
105 func (c *cache) setup() {
106         var err error
107         c.pdhs, err = lru.New2Q(c.config.MaxUUIDEntries)
108         if err != nil {
109                 panic(err)
110         }
111         c.collections, err = lru.New2Q(c.config.MaxCollectionEntries)
112         if err != nil {
113                 panic(err)
114         }
115         c.permissions, err = lru.New2Q(c.config.MaxPermissionEntries)
116         if err != nil {
117                 panic(err)
118         }
119
120         reg := c.registry
121         if reg == nil {
122                 reg = prometheus.NewRegistry()
123         }
124         c.metrics.setup(reg)
125         go func() {
126                 for range time.Tick(metricsUpdateInterval) {
127                         c.updateGauges()
128                 }
129         }()
130 }
131
132 func (c *cache) updateGauges() {
133         c.metrics.collectionBytes.Set(float64(c.collectionBytes()))
134         c.metrics.collectionEntries.Set(float64(c.collections.Len()))
135 }
136
// selectPDH is the parameter set passed to the collections API when
// only the portable_data_hash field of the record is needed.
var selectPDH = map[string]interface{}{"select": []string{"portable_data_hash"}}
140
141 // Update saves a modified version (fs) to an existing collection
142 // (coll) and, if successful, updates the relevant cache entries so
143 // subsequent calls to Get() reflect the modifications.
144 func (c *cache) Update(client *arvados.Client, coll arvados.Collection, fs arvados.CollectionFileSystem) error {
145         c.setupOnce.Do(c.setup)
146
147         if m, err := fs.MarshalManifest("."); err != nil || m == coll.ManifestText {
148                 return err
149         } else {
150                 coll.ManifestText = m
151         }
152         var updated arvados.Collection
153         defer c.pdhs.Remove(coll.UUID)
154         err := client.RequestAndDecode(&updated, "PATCH", "arvados/v1/collections/"+coll.UUID, nil, map[string]interface{}{
155                 "collection": map[string]string{
156                         "manifest_text": coll.ManifestText,
157                 },
158         })
159         if err == nil {
160                 c.collections.Add(client.AuthToken+"\000"+coll.PortableDataHash, &cachedCollection{
161                         expire:     time.Now().Add(time.Duration(c.config.TTL)),
162                         collection: &updated,
163                 })
164         }
165         return err
166 }
167
168 func (c *cache) Get(arv *arvadosclient.ArvadosClient, targetID string, forceReload bool) (*arvados.Collection, error) {
169         c.setupOnce.Do(c.setup)
170         c.metrics.requests.Inc()
171
172         permOK := false
173         permKey := arv.ApiToken + "\000" + targetID
174         if forceReload {
175         } else if ent, cached := c.permissions.Get(permKey); cached {
176                 ent := ent.(*cachedPermission)
177                 if ent.expire.Before(time.Now()) {
178                         c.permissions.Remove(permKey)
179                 } else {
180                         permOK = true
181                         c.metrics.permissionHits.Inc()
182                 }
183         }
184
185         var pdh string
186         if arvadosclient.PDHMatch(targetID) {
187                 pdh = targetID
188         } else if ent, cached := c.pdhs.Get(targetID); cached {
189                 ent := ent.(*cachedPDH)
190                 if ent.expire.Before(time.Now()) {
191                         c.pdhs.Remove(targetID)
192                 } else {
193                         pdh = ent.pdh
194                         c.metrics.pdhHits.Inc()
195                 }
196         }
197
198         var collection *arvados.Collection
199         if pdh != "" {
200                 collection = c.lookupCollection(arv.ApiToken + "\000" + pdh)
201         }
202
203         if collection != nil && permOK {
204                 return collection, nil
205         } else if collection != nil {
206                 // Ask API for current PDH for this targetID. Most
207                 // likely, the cached PDH is still correct; if so,
208                 // _and_ the current token has permission, we can
209                 // use our cached manifest.
210                 c.metrics.apiCalls.Inc()
211                 var current arvados.Collection
212                 err := arv.Get("collections", targetID, selectPDH, &current)
213                 if err != nil {
214                         return nil, err
215                 }
216                 if current.PortableDataHash == pdh {
217                         c.permissions.Add(permKey, &cachedPermission{
218                                 expire: time.Now().Add(time.Duration(c.config.TTL)),
219                         })
220                         if pdh != targetID {
221                                 c.pdhs.Add(targetID, &cachedPDH{
222                                         expire: time.Now().Add(time.Duration(c.config.UUIDTTL)),
223                                         pdh:    pdh,
224                                 })
225                         }
226                         return collection, err
227                 } else {
228                         // PDH changed, but now we know we have
229                         // permission -- and maybe we already have the
230                         // new PDH in the cache.
231                         if coll := c.lookupCollection(arv.ApiToken + "\000" + current.PortableDataHash); coll != nil {
232                                 return coll, nil
233                         }
234                 }
235         }
236
237         // Collection manifest is not cached.
238         c.metrics.apiCalls.Inc()
239         err := arv.Get("collections", targetID, nil, &collection)
240         if err != nil {
241                 return nil, err
242         }
243         exp := time.Now().Add(time.Duration(c.config.TTL))
244         c.permissions.Add(permKey, &cachedPermission{
245                 expire: exp,
246         })
247         c.pdhs.Add(targetID, &cachedPDH{
248                 expire: time.Now().Add(time.Duration(c.config.UUIDTTL)),
249                 pdh:    collection.PortableDataHash,
250         })
251         c.collections.Add(arv.ApiToken+"\000"+collection.PortableDataHash, &cachedCollection{
252                 expire:     exp,
253                 collection: collection,
254         })
255         if int64(len(collection.ManifestText)) > c.config.MaxCollectionBytes/int64(c.config.MaxCollectionEntries) {
256                 go c.pruneCollections()
257         }
258         return collection, nil
259 }
260
261 // pruneCollections checks the total bytes occupied by manifest_text
262 // in the collection cache and removes old entries as needed to bring
263 // the total size down to CollectionBytes. It also deletes all expired
264 // entries.
265 //
266 // pruneCollections does not aim to be perfectly correct when there is
267 // concurrent cache activity.
268 func (c *cache) pruneCollections() {
269         var size int64
270         now := time.Now()
271         keys := c.collections.Keys()
272         entsize := make([]int, len(keys))
273         expired := make([]bool, len(keys))
274         for i, k := range keys {
275                 v, ok := c.collections.Peek(k)
276                 if !ok {
277                         continue
278                 }
279                 ent := v.(*cachedCollection)
280                 n := len(ent.collection.ManifestText)
281                 size += int64(n)
282                 entsize[i] = n
283                 expired[i] = ent.expire.Before(now)
284         }
285         for i, k := range keys {
286                 if expired[i] {
287                         c.collections.Remove(k)
288                         size -= int64(entsize[i])
289                 }
290         }
291         for i, k := range keys {
292                 if size <= c.config.MaxCollectionBytes {
293                         break
294                 }
295                 if expired[i] {
296                         // already removed this entry in the previous loop
297                         continue
298                 }
299                 c.collections.Remove(k)
300                 size -= int64(entsize[i])
301         }
302 }
303
304 // collectionBytes returns the approximate memory size of the
305 // collection cache.
306 func (c *cache) collectionBytes() uint64 {
307         var size uint64
308         for _, k := range c.collections.Keys() {
309                 v, ok := c.collections.Peek(k)
310                 if !ok {
311                         continue
312                 }
313                 size += uint64(len(v.(*cachedCollection).collection.ManifestText))
314         }
315         return size
316 }
317
318 func (c *cache) lookupCollection(key string) *arvados.Collection {
319         e, cached := c.collections.Get(key)
320         if !cached {
321                 return nil
322         }
323         ent := e.(*cachedCollection)
324         if ent.expire.Before(time.Now()) {
325                 c.collections.Remove(key)
326                 return nil
327         }
328         c.metrics.collectionHits.Inc()
329         return ent.collection
330 }