21158: Fix tests
[arvados.git] / services / keep-web / cache.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package keepweb
6
7 import (
8         "errors"
9         "net/http"
10         "sort"
11         "sync"
12         "time"
13
14         "git.arvados.org/arvados.git/sdk/go/arvados"
15         "git.arvados.org/arvados.git/sdk/go/arvadosclient"
16         "git.arvados.org/arvados.git/sdk/go/keepclient"
17         "github.com/prometheus/client_golang/prometheus"
18         "github.com/sirupsen/logrus"
19 )
20
21 const metricsUpdateInterval = time.Second / 10
22
23 type cache struct {
24         cluster   *arvados.Cluster
25         logger    logrus.FieldLogger
26         registry  *prometheus.Registry
27         metrics   cacheMetrics
28         sessions  map[string]*cachedSession
29         setupOnce sync.Once
30         mtx       sync.Mutex
31
32         chPruneSessions chan struct{}
33 }
34
35 type cacheMetrics struct {
36         requests        prometheus.Counter
37         collectionBytes prometheus.Gauge
38         sessionEntries  prometheus.Gauge
39         sessionHits     prometheus.Counter
40         sessionMisses   prometheus.Counter
41 }
42
43 func (m *cacheMetrics) setup(reg *prometheus.Registry) {
44         m.collectionBytes = prometheus.NewGauge(prometheus.GaugeOpts{
45                 Namespace: "arvados",
46                 Subsystem: "keepweb_sessions",
47                 Name:      "cached_session_bytes",
48                 Help:      "Total size of all cached sessions.",
49         })
50         reg.MustRegister(m.collectionBytes)
51         m.sessionEntries = prometheus.NewGauge(prometheus.GaugeOpts{
52                 Namespace: "arvados",
53                 Subsystem: "keepweb_sessions",
54                 Name:      "active",
55                 Help:      "Number of active token sessions.",
56         })
57         reg.MustRegister(m.sessionEntries)
58         m.sessionHits = prometheus.NewCounter(prometheus.CounterOpts{
59                 Namespace: "arvados",
60                 Subsystem: "keepweb_sessions",
61                 Name:      "hits",
62                 Help:      "Number of token session cache hits.",
63         })
64         reg.MustRegister(m.sessionHits)
65         m.sessionMisses = prometheus.NewCounter(prometheus.CounterOpts{
66                 Namespace: "arvados",
67                 Subsystem: "keepweb_sessions",
68                 Name:      "misses",
69                 Help:      "Number of token session cache misses.",
70         })
71         reg.MustRegister(m.sessionMisses)
72 }
73
74 type cachedSession struct {
75         cache         *cache
76         expire        time.Time
77         client        *arvados.Client
78         arvadosclient *arvadosclient.ArvadosClient
79         keepclient    *keepclient.KeepClient
80
81         // Each session uses a system of three mutexes (plus the
82         // cache-wide mutex) to enable the following semantics:
83         //
84         // - There are never multiple sessions in use for a given
85         // token.
86         //
87         // - If the cached in-memory filesystems/user records are
88         // older than the configured cache TTL when a request starts,
89         // the request will use new ones.
90         //
91         // - Unused sessions are garbage-collected.
92         //
93         // In particular, when it is necessary to reset a session's
94         // filesystem/user record (to save memory or respect the
95         // configured cache TTL), any operations that are already
96         // using the existing filesystem/user record are allowed to
97         // finish before the new filesystem is constructed.
98         //
99         // The locks must be acquired in the following order:
100         // cache.mtx, session.mtx, session.refresh, session.inuse.
101
102         // mtx is RLocked while session is not safe to evict from
103         // cache -- i.e., a checkout() has decided to use it, and its
104         // caller is not finished with it. When locking or rlocking
105         // this mtx, the cache mtx MUST already be held.
106         //
107         // This mutex enables pruneSessions to detect when it is safe
108         // to completely remove the session entry from the cache.
109         mtx sync.RWMutex
110         // refresh must be locked in order to read or write the
111         // fs/user/userLoaded/lastuse fields. This mutex enables
112         // GetSession and pruneSessions to remove/replace fs and user
113         // values safely.
114         refresh sync.Mutex
115         // inuse must be RLocked while the session is in use by a
116         // caller. This mutex enables pruneSessions() to wait for all
117         // existing usage to finish by calling inuse.Lock().
118         inuse sync.RWMutex
119
120         fs         arvados.CustomFileSystem
121         user       arvados.User
122         userLoaded bool
123         lastuse    time.Time
124 }
125
126 func (sess *cachedSession) Release() {
127         sess.inuse.RUnlock()
128         sess.mtx.RUnlock()
129         select {
130         case sess.cache.chPruneSessions <- struct{}{}:
131         default:
132         }
133 }
134
135 func (c *cache) setup() {
136         var err error
137         c.sessions = map[string]*cachedSession{}
138         if err != nil {
139                 panic(err)
140         }
141
142         reg := c.registry
143         if reg == nil {
144                 reg = prometheus.NewRegistry()
145         }
146         c.metrics.setup(reg)
147         go func() {
148                 for range time.Tick(metricsUpdateInterval) {
149                         c.updateGauges()
150                 }
151         }()
152         c.chPruneSessions = make(chan struct{}, 1)
153         go func() {
154                 for range c.chPruneSessions {
155                         c.pruneSessions()
156                 }
157         }()
158 }
159
160 func (c *cache) updateGauges() {
161         n, size := c.sessionsSize()
162         c.metrics.collectionBytes.Set(float64(size))
163         c.metrics.sessionEntries.Set(float64(n))
164 }
165
166 var selectPDH = map[string]interface{}{
167         "select": []string{"portable_data_hash"},
168 }
169
170 func (c *cache) checkout(token string) (*cachedSession, error) {
171         c.setupOnce.Do(c.setup)
172         c.mtx.Lock()
173         defer c.mtx.Unlock()
174         sess := c.sessions[token]
175         if sess == nil {
176                 client, err := arvados.NewClientFromConfig(c.cluster)
177                 if err != nil {
178                         return nil, err
179                 }
180                 client.AuthToken = token
181                 client.Timeout = time.Minute
182                 // A non-empty origin header tells controller to
183                 // prioritize our traffic as interactive, which is
184                 // true most of the time.
185                 origin := c.cluster.Services.WebDAVDownload.ExternalURL
186                 client.SendHeader = http.Header{"Origin": {origin.Scheme + "://" + origin.Host}}
187                 arvadosclient, err := arvadosclient.New(client)
188                 if err != nil {
189                         return nil, err
190                 }
191                 sess = &cachedSession{
192                         cache:         c,
193                         client:        client,
194                         arvadosclient: arvadosclient,
195                         keepclient:    keepclient.New(arvadosclient),
196                 }
197                 c.sessions[token] = sess
198         }
199         sess.mtx.RLock()
200         return sess, nil
201 }
202
203 // Get a long-lived CustomFileSystem suitable for doing a read or
204 // write operation with the given token.
205 //
206 // If the returned error is nil, the caller must call Release() on the
207 // returned session when finished using it.
208 func (c *cache) GetSession(token string) (arvados.CustomFileSystem, *cachedSession, *arvados.User, error) {
209         sess, err := c.checkout(token)
210         if err != nil {
211                 return nil, nil, nil, err
212         }
213         sess.refresh.Lock()
214         defer sess.refresh.Unlock()
215         now := time.Now()
216         sess.lastuse = now
217         refresh := sess.expire.Before(now)
218         if sess.fs == nil || !sess.userLoaded || refresh {
219                 // Wait for all active users to finish (otherwise they
220                 // might make changes to an old fs after we start
221                 // using the new fs).
222                 sess.inuse.Lock()
223                 if !sess.userLoaded || refresh {
224                         err := sess.client.RequestAndDecode(&sess.user, "GET", "arvados/v1/users/current", nil, nil)
225                         if he := errorWithHTTPStatus(nil); errors.As(err, &he) && he.HTTPStatus() == http.StatusForbidden {
226                                 // token is OK, but "get user id" api is out
227                                 // of scope -- use existing/expired info if
228                                 // any, or leave empty for unknown user
229                         } else if err != nil {
230                                 sess.inuse.Unlock()
231                                 sess.mtx.RUnlock()
232                                 return nil, nil, nil, err
233                         }
234                         sess.userLoaded = true
235                 }
236
237                 if sess.fs == nil || refresh {
238                         sess.fs = sess.client.SiteFileSystem(sess.keepclient)
239                         sess.fs.ForwardSlashNameSubstitution(c.cluster.Collections.ForwardSlashNameSubstitution)
240                         sess.expire = now.Add(c.cluster.Collections.WebDAVCache.TTL.Duration())
241                         c.metrics.sessionMisses.Inc()
242                 } else {
243                         c.metrics.sessionHits.Inc()
244                 }
245                 sess.inuse.Unlock()
246         } else {
247                 c.metrics.sessionHits.Inc()
248         }
249         sess.inuse.RLock()
250         return sess.fs, sess, &sess.user, nil
251 }
252
253 type sessionSnapshot struct {
254         token   string
255         sess    *cachedSession
256         lastuse time.Time
257         fs      arvados.CustomFileSystem
258         size    int64
259         prune   bool
260 }
261
262 // Remove all expired idle session cache entries, and remove in-memory
263 // filesystems until approximate remaining size <= maxsize
264 func (c *cache) pruneSessions() {
265         now := time.Now()
266         c.mtx.Lock()
267         snaps := make([]sessionSnapshot, 0, len(c.sessions))
268         for token, sess := range c.sessions {
269                 snaps = append(snaps, sessionSnapshot{
270                         token: token,
271                         sess:  sess,
272                 })
273         }
274         c.mtx.Unlock()
275
276         // Load lastuse/fs/expire data from sessions. Note we do this
277         // after unlocking c.mtx because sess.refresh.Lock sometimes
278         // waits for another goroutine to finish "[re]fetch user
279         // record".
280         for i := range snaps {
281                 snaps[i].sess.refresh.Lock()
282                 snaps[i].lastuse = snaps[i].sess.lastuse
283                 snaps[i].fs = snaps[i].sess.fs
284                 snaps[i].prune = snaps[i].sess.expire.Before(now)
285                 snaps[i].sess.refresh.Unlock()
286         }
287
288         // Sort sessions with oldest first.
289         sort.Slice(snaps, func(i, j int) bool {
290                 return snaps[i].lastuse.Before(snaps[j].lastuse)
291         })
292
293         // Add up size of sessions that aren't already marked for
294         // pruning based on expire time.
295         var size int64
296         for i, snap := range snaps {
297                 if !snap.prune && snap.fs != nil {
298                         size := snap.fs.MemorySize()
299                         snaps[i].size = size
300                         size += size
301                 }
302         }
303         // Mark more sessions for deletion until reaching desired
304         // memory size limit, starting with the oldest entries.
305         for i, snap := range snaps {
306                 if size <= int64(c.cluster.Collections.WebDAVCache.MaxCollectionBytes) {
307                         break
308                 }
309                 if snap.prune {
310                         continue
311                 }
312                 snaps[i].prune = true
313                 size -= snap.size
314         }
315
316         // Mark more sessions for deletion until reaching desired
317         // session count limit.
318         mustprune := len(snaps) - c.cluster.Collections.WebDAVCache.MaxSessions
319         for i := range snaps {
320                 if snaps[i].prune {
321                         mustprune--
322                 }
323         }
324         for i := range snaps {
325                 if mustprune < 1 {
326                         break
327                 } else if !snaps[i].prune {
328                         snaps[i].prune = true
329                         mustprune--
330                 }
331         }
332
333         c.mtx.Lock()
334         defer c.mtx.Unlock()
335         for _, snap := range snaps {
336                 if !snap.prune {
337                         continue
338                 }
339                 sess := snap.sess
340                 if sess.mtx.TryLock() {
341                         delete(c.sessions, snap.token)
342                         continue
343                 }
344                 // We can't remove a session that's been checked out
345                 // -- that would allow another session to be created
346                 // for the same token using a different in-memory
347                 // filesystem. Instead, we wait for active requests to
348                 // finish and then "unload" it. After this, either the
349                 // next GetSession will reload fs/user, or a
350                 // subsequent pruneSessions will remove the session.
351                 go func() {
352                         // Ensure nobody is mid-GetSession() (note we
353                         // already know nobody is mid-checkout()
354                         // because we have c.mtx locked)
355                         sess.refresh.Lock()
356                         defer sess.refresh.Unlock()
357                         // Wait for current usage to finish (i.e.,
358                         // anyone who has decided to use the current
359                         // values of sess.fs and sess.user, and hasn't
360                         // called Release() yet)
361                         sess.inuse.Lock()
362                         defer sess.inuse.Unlock()
363                         // Release memory
364                         sess.fs = nil
365                         // Next GetSession will make a new fs
366                 }()
367         }
368 }
369
370 // sessionsSize returns the number and approximate total memory size
371 // of all cached sessions.
372 func (c *cache) sessionsSize() (n int, size int64) {
373         c.mtx.Lock()
374         n = len(c.sessions)
375         sessions := make([]*cachedSession, 0, n)
376         for _, sess := range c.sessions {
377                 sessions = append(sessions, sess)
378         }
379         c.mtx.Unlock()
380         for _, sess := range sessions {
381                 sess.refresh.Lock()
382                 fs := sess.fs
383                 sess.refresh.Unlock()
384                 if fs != nil {
385                         size += fs.MemorySize()
386                 }
387         }
388         return
389 }