1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
14 "git.arvados.org/arvados.git/sdk/go/arvados"
15 "git.arvados.org/arvados.git/sdk/go/arvadosclient"
16 "git.arvados.org/arvados.git/sdk/go/keepclient"
17 "github.com/prometheus/client_golang/prometheus"
18 "github.com/sirupsen/logrus"
// metricsUpdateInterval (one tenth of a second) is the period passed
// to time.Tick by the loop in cache.setup.
21 const metricsUpdateInterval = time.Second / 10
24 cluster *arvados.Cluster
25 logger logrus.FieldLogger
26 registry *prometheus.Registry
28 sessions map[string]*cachedSession
32 chPruneSessions chan struct{}
// cacheMetrics holds the Prometheus collectors used by the cache:
// counters for requests and for session hits/misses, and gauges for
// the cached collection bytes and the number of session entries.
35 type cacheMetrics struct {
36 requests prometheus.Counter
37 collectionBytes prometheus.Gauge
38 sessionEntries prometheus.Gauge
39 sessionHits prometheus.Counter
40 sessionMisses prometheus.Counter
// setup builds the session-cache collectors under the
// "keepweb_sessions" subsystem and registers each one with reg.
// The collectors created here are the collectionBytes and
// sessionEntries gauges and the sessionHits and sessionMisses
// counters. MustRegister panics if a registration fails.
43 func (m *cacheMetrics) setup(reg *prometheus.Registry) {
44 m.collectionBytes = prometheus.NewGauge(prometheus.GaugeOpts{
46 Subsystem: "keepweb_sessions",
47 Name: "cached_session_bytes",
48 Help: "Total size of all cached sessions.",
50 reg.MustRegister(m.collectionBytes)
51 m.sessionEntries = prometheus.NewGauge(prometheus.GaugeOpts{
53 Subsystem: "keepweb_sessions",
55 Help: "Number of active token sessions.",
57 reg.MustRegister(m.sessionEntries)
58 m.sessionHits = prometheus.NewCounter(prometheus.CounterOpts{
60 Subsystem: "keepweb_sessions",
62 Help: "Number of token session cache hits.",
64 reg.MustRegister(m.sessionHits)
65 m.sessionMisses = prometheus.NewCounter(prometheus.CounterOpts{
67 Subsystem: "keepweb_sessions",
69 Help: "Number of token session cache misses.",
71 reg.MustRegister(m.sessionMisses)
// cachedSession is the entry stored in cache.sessions, keyed by token
// (see checkout). It holds the API clients built for that token and
// the filesystem that GetSession hands out.
74 type cachedSession struct {
77 client *arvados.Client
78 arvadosclient *arvadosclient.ArvadosClient
79 keepclient *keepclient.KeepClient
81 // Each session uses a system of three mutexes (plus the
82 // cache-wide mutex) to enable the following semantics:
84 // - There are never multiple sessions in use for a given
87 // - If the cached in-memory filesystems/user records are
88 // older than the configured cache TTL when a request starts,
89 // the request will use new ones.
91 // - Unused sessions are garbage-collected.
93 // In particular, when it is necessary to reset a session's
94 // filesystem/user record (to save memory or respect the
95 // configured cache TTL), any operations that are already
96 // using the existing filesystem/user record are allowed to
97 // finish before the new filesystem is constructed.
99 // The locks must be acquired in the following order:
100 // cache.mtx, session.mtx, session.refresh, session.inuse.
102 // mtx is RLocked while session is not safe to evict from
103 // cache -- i.e., a checkout() has decided to use it, and its
104 // caller is not finished with it. When locking or rlocking
105 // this mtx, the cache mtx MUST already be held.
107 // This mutex enables pruneSessions to detect when it is safe
108 // to completely remove the session entry from the cache.
110 // refresh must be locked in order to read or write the
111 // fs/user/userLoaded/lastuse fields. This mutex enables
112 // GetSession and pruneSessions to remove/replace fs and user
115 // inuse must be RLocked while the session is in use by a
116 // caller. This mutex enables pruneSessions() to wait for all
117 // existing usage to finish by calling inuse.Lock().
// fs is the filesystem returned by GetSession; GetSession builds it
// when it is nil or when the session has expired.
120 fs arvados.CustomFileSystem
// Release must be called by a GetSession caller once it has finished
// with the session. The statement visible here offers a signal on the
// cache's chPruneSessions channel from inside a select case.
// NOTE(review): the rest of the select (e.g. a default case that would
// make the send non-blocking) is not visible here -- confirm.
126 func (sess *cachedSession) Release() {
130 case sess.cache.chPruneSessions <- struct{}{}:
// setup performs the cache's one-time initialization; checkout runs
// it through c.setupOnce. It creates the empty sessions map, a new
// Prometheus registry (NOTE(review): the condition guarding this is
// not visible here), a loop that fires every metricsUpdateInterval,
// and a loop that consumes chPruneSessions.
// NOTE(review): presumably the two loops run in their own goroutines
// and call updateGauges and pruneSessions respectively -- confirm.
135 func (c *cache) setup() {
137 c.sessions = map[string]*cachedSession{}
144 reg = prometheus.NewRegistry()
148 for range time.Tick(metricsUpdateInterval) {
// Capacity 1: a single signal can be queued while the consuming loop
// below is busy.
152 c.chPruneSessions = make(chan struct{}, 1)
154 for range c.chPruneSessions {
// updateGauges copies the session count and total size reported by
// sessionsSize into the sessionEntries and collectionBytes gauges.
160 func (c *cache) updateGauges() {
161 n, size := c.sessionsSize()
162 c.metrics.collectionBytes.Set(float64(size))
163 c.metrics.sessionEntries.Set(float64(n))
// selectPDH is a parameter map whose "select" entry lists only the
// portable_data_hash field.
// NOTE(review): no use of this variable is visible in this section.
166 var selectPDH = map[string]interface{}{
167 "select": []string{"portable_data_hash"},
// checkout runs the one-time cache setup, then looks up the session
// stored under token in c.sessions. The session-building lines create
// an arvados.Client from the cluster config (carrying the token, a
// one-minute timeout, the cache logger and an Origin header), wrap it
// in arvadosclient and keepclient clients, and store the new session
// in c.sessions[token].
// NOTE(review): the guard around session creation, the locking, and
// the return statements are not visible here; presumably a session is
// only built when none exists for the token -- confirm.
170 func (c *cache) checkout(token string) (*cachedSession, error) {
171 c.setupOnce.Do(c.setup)
174 sess := c.sessions[token]
176 client, err := arvados.NewClientFromConfig(c.cluster)
180 client.AuthToken = token
181 client.Timeout = time.Minute
182 client.Logger = c.logger
183 // A non-empty origin header tells controller to
184 // prioritize our traffic as interactive, which is
185 // true most of the time.
186 origin := c.cluster.Services.WebDAVDownload.ExternalURL
187 client.SendHeader = http.Header{"Origin": {origin.Scheme + "://" + origin.Host}}
188 arvadosclient, err := arvadosclient.New(client)
192 kc := keepclient.New(arvadosclient)
// The keep client's disk cache size comes from the cluster's
// WebDAVCache configuration.
193 kc.DiskCacheSize = c.cluster.Collections.WebDAVCache.DiskCacheSize
194 sess = &cachedSession{
197 arvadosclient: arvadosclient,
200 c.sessions[token] = sess
206 // Get a long-lived CustomFileSystem suitable for doing a read or
207 // write operation with the given token.
209 // If the returned error is nil, the caller must call Release() on the
210 // returned session when finished using it.
//
// On success the results are the session's filesystem, the session
// itself, and a pointer to the session's user record. The user record
// is fetched when it has not been loaded yet or the session has
// expired; the filesystem is rebuilt when it is nil or the session
// has expired.
211 func (c *cache) GetSession(token string) (arvados.CustomFileSystem, *cachedSession, *arvados.User, error) {
212 sess, err := c.checkout(token)
214 return nil, nil, nil, err
217 defer sess.refresh.Unlock()
// refresh is true once the session's expire time has passed.
220 refresh := sess.expire.Before(now)
221 if sess.fs == nil || !sess.userLoaded || refresh {
222 // Wait for all active users to finish (otherwise they
223 // might make changes to an old fs after we start
224 // using the new fs).
226 if !sess.userLoaded || refresh {
227 err := sess.client.RequestAndDecode(&sess.user, "GET", "arvados/v1/users/current", nil, nil)
228 if he := errorWithHTTPStatus(nil); errors.As(err, &he) && he.HTTPStatus() == http.StatusForbidden {
229 // token is OK, but "get user id" api is out
230 // of scope -- use existing/expired info if
231 // any, or leave empty for unknown user
232 } else if err != nil {
235 return nil, nil, nil, err
237 sess.userLoaded = true
240 if sess.fs == nil || refresh {
241 sess.fs = sess.client.SiteFileSystem(sess.keepclient)
242 sess.fs.ForwardSlashNameSubstitution(c.cluster.Collections.ForwardSlashNameSubstitution)
// The new filesystem is valid for the configured WebDAVCache TTL.
243 sess.expire = now.Add(c.cluster.Collections.WebDAVCache.TTL.Duration())
// Building a new filesystem counts as a session cache miss.
244 c.metrics.sessionMisses.Inc()
246 c.metrics.sessionHits.Inc()
250 c.metrics.sessionHits.Inc()
253 return sess.fs, sess, &sess.user, nil
// sessionSnapshot is the per-session record pruneSessions works on.
// pruneSessions fills in lastuse, fs and the prune flag from the
// session while holding sess.refresh, so the later sorting and sizing
// steps run on these copies.
256 type sessionSnapshot struct {
260 fs arvados.CustomFileSystem
265 // Remove all expired idle session cache entries, and remove in-memory
266 // filesystems until approximate remaining size <= maxsize
//
// The size limit is Collections.WebDAVCache.MaxCollectionBytes. A
// session count limit, Collections.WebDAVCache.MaxSessions, is
// applied as well.
267 func (c *cache) pruneSessions() {
270 snaps := make([]sessionSnapshot, 0, len(c.sessions))
271 for token, sess := range c.sessions {
272 snaps = append(snaps, sessionSnapshot{
279 // Load lastuse/fs/expire data from sessions. Note we do this
280 // after unlocking c.mtx because sess.refresh.Lock sometimes
281 // waits for another goroutine to finish "[re]fetch user
283 for i := range snaps {
284 snaps[i].sess.refresh.Lock()
285 snaps[i].lastuse = snaps[i].sess.lastuse
286 snaps[i].fs = snaps[i].sess.fs
// A session whose expire time has passed is marked for pruning.
287 snaps[i].prune = snaps[i].sess.expire.Before(now)
288 snaps[i].sess.refresh.Unlock()
291 // Sort sessions with oldest first.
292 sort.Slice(snaps, func(i, j int) bool {
293 return snaps[i].lastuse.Before(snaps[j].lastuse)
296 // Add up size of sessions that aren't already marked for
297 // pruning based on expire time.
299 for i, snap := range snaps {
300 if !snap.prune && snap.fs != nil {
301 size := snap.fs.MemorySize()
306 // Mark more sessions for deletion until reaching desired
307 // memory size limit, starting with the oldest entries.
308 for i, snap := range snaps {
309 if size <= int64(c.cluster.Collections.WebDAVCache.MaxCollectionBytes) {
315 snaps[i].prune = true
319 // Mark more sessions for deletion until reaching desired
320 // session count limit.
// mustprune is the number of sessions in excess of MaxSessions (zero
// or negative when the cache is within the limit).
321 mustprune := len(snaps) - c.cluster.Collections.WebDAVCache.MaxSessions
322 for i := range snaps {
327 for i := range snaps {
330 } else if !snaps[i].prune {
331 snaps[i].prune = true
338 for _, snap := range snaps {
// TryLock succeeds only when nobody holds sess.mtx; per the locking
// notes on cachedSession, that means the session is not checked out
// and its entry can be deleted.
343 if sess.mtx.TryLock() {
344 delete(c.sessions, snap.token)
347 // We can't remove a session that's been checked out
348 // -- that would allow another session to be created
349 // for the same token using a different in-memory
350 // filesystem. Instead, we wait for active requests to
351 // finish and then "unload" it. After this, either the
352 // next GetSession will reload fs/user, or a
353 // subsequent pruneSessions will remove the session.
355 // Ensure nobody is mid-GetSession() (note we
356 // already know nobody is mid-checkout()
357 // because we have c.mtx locked)
359 defer sess.refresh.Unlock()
360 // Wait for current usage to finish (i.e.,
361 // anyone who has decided to use the current
362 // values of sess.fs and sess.user, and hasn't
363 // called Release() yet)
365 defer sess.inuse.Unlock()
368 // Next GetSession will make a new fs
373 // sessionsSize returns the number and approximate total memory size
374 // of all cached sessions.
//
// Session pointers are first copied out of c.sessions into a slice;
// each session's refresh lock is then taken around reading it.
// NOTE(review): the visible line order suggests MemorySize is called
// after refresh.Unlock, i.e. without the lock held -- confirm.
375 func (c *cache) sessionsSize() (n int, size int64) {
378 sessions := make([]*cachedSession, 0, n)
379 for _, sess := range c.sessions {
380 sessions = append(sessions, sess)
383 for _, sess := range sessions {
386 sess.refresh.Unlock()
388 size += fs.MemorySize()