1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
14 "git.arvados.org/arvados.git/sdk/go/arvados"
15 "git.arvados.org/arvados.git/sdk/go/arvadosclient"
16 "git.arvados.org/arvados.git/sdk/go/keepclient"
17 "github.com/prometheus/client_golang/prometheus"
18 "github.com/sirupsen/logrus"
// metricsUpdateInterval is the period (one tenth of a second) of the
// time.Tick loop started in (*cache).setup.
21 const metricsUpdateInterval = time.Second / 10
24 cluster *arvados.Cluster
25 logger logrus.FieldLogger
26 registry *prometheus.Registry
28 sessions map[string]*cachedSession
32 chPruneSessions chan struct{}
// cacheMetrics holds the Prometheus collectors used by the session
// cache.
35 type cacheMetrics struct {
// NOTE(review): no registration or use of requests is visible in
// the lines at hand -- confirm whether it is still needed.
36 requests prometheus.Counter
// collectionBytes is set by updateGauges to the total size
// reported by sessionsSize.
37 collectionBytes prometheus.Gauge
// sessionEntries is set by updateGauges to the session count
// reported by sessionsSize.
38 sessionEntries prometheus.Gauge
// sessionHits and sessionMisses are incremented by GetSession.
39 sessionHits prometheus.Counter
40 sessionMisses prometheus.Counter
// setup creates the collectionBytes and sessionEntries gauges and the
// sessionHits and sessionMisses counters, all in the
// "keepweb_sessions" subsystem, and registers each one with reg.
//
// Registration uses MustRegister, which panics if a collector cannot
// be registered.
43 func (m *cacheMetrics) setup(reg *prometheus.Registry) {
// Note the field is named collectionBytes, but the exported metric
// name and help text describe cached sessions.
44 m.collectionBytes = prometheus.NewGauge(prometheus.GaugeOpts{
46 Subsystem: "keepweb_sessions",
47 Name: "cached_session_bytes",
48 Help: "Total size of all cached sessions.",
50 reg.MustRegister(m.collectionBytes)
51 m.sessionEntries = prometheus.NewGauge(prometheus.GaugeOpts{
53 Subsystem: "keepweb_sessions",
55 Help: "Number of active token sessions.",
57 reg.MustRegister(m.sessionEntries)
58 m.sessionHits = prometheus.NewCounter(prometheus.CounterOpts{
60 Subsystem: "keepweb_sessions",
62 Help: "Number of token session cache hits.",
64 reg.MustRegister(m.sessionHits)
65 m.sessionMisses = prometheus.NewCounter(prometheus.CounterOpts{
67 Subsystem: "keepweb_sessions",
69 Help: "Number of token session cache misses.",
71 reg.MustRegister(m.sessionMisses)
// cachedSession is the per-token entry stored in cache.sessions. It
// holds the API clients built for that token by checkout, plus the
// in-memory filesystem and user record that GetSession hands out.
74 type cachedSession struct {
// client, arvadosclient and keepclient are created once, in
// checkout, when the session entry is created.
77 client *arvados.Client
78 arvadosclient *arvadosclient.ArvadosClient
79 keepclient *keepclient.KeepClient
81 // Each session uses a system of three mutexes (plus the
82 // cache-wide mutex) to enable the following semantics:
84 // - There are never multiple sessions in use for a given
87 // - If the cached in-memory filesystems/user records are
88 // older than the configured cache TTL when a request starts,
89 // the request will use new ones.
91 // - Unused sessions are garbage-collected.
93 // In particular, when it is necessary to reset a session's
94 // filesystem/user record (to save memory or respect the
95 // configured cache TTL), any operations that are already
96 // using the existing filesystem/user record are allowed to
97 // finish before the new filesystem is constructed.
99 // The locks must be acquired in the following order:
100 // cache.mtx, session.mtx, session.refresh, session.inuse.
102 // mtx is RLocked while session is not safe to evict from
103 // cache -- i.e., a checkout() has decided to use it, and its
104 // caller is not finished with it. When locking or rlocking
105 // this mtx, the cache mtx MUST already be held.
107 // This mutex enables pruneSessions to detect when it is safe
108 // to completely remove the session entry from the cache.
110 // refresh must be locked in order to read or write the
111 // fs/user/userLoaded/lastuse fields. This mutex enables
112 // GetSession and pruneSessions to remove/replace fs and user
115 // inuse must be RLocked while the session is in use by a
116 // caller. This mutex enables pruneSessions() to wait for all
117 // existing usage to finish by calling inuse.Lock().
// fs is nil until GetSession builds it with SiteFileSystem; it is
// rebuilt when the session's expire time has passed.
120 fs arvados.CustomFileSystem
// Release must be called by a caller that obtained sess from
// GetSession, once it has finished using the session.
//
// Among its steps it sends a value on the cache's chPruneSessions
// channel.
// NOTE(review): presumably this is a non-blocking send (a select
// case) that requests a pruneSessions pass -- confirm against the
// full function body.
126 func (sess *cachedSession) Release() {
130 case sess.cache.chPruneSessions <- struct{}{}:
// setup performs one-time initialization of the cache: it creates the
// empty sessions map, starts a loop that ticks every
// metricsUpdateInterval, and creates the chPruneSessions channel along
// with a loop that receives from it. checkout runs it through
// c.setupOnce.
135 func (c *cache) setup() {
137 c.sessions = map[string]*cachedSession{}
// NOTE(review): presumably a fresh registry is created only when
// none was supplied in c.registry -- confirm the guarding condition.
144 reg = prometheus.NewRegistry()
// time.Tick offers no way to stop its ticker, so this loop is
// expected to run for the life of the process.
148 for range time.Tick(metricsUpdateInterval) {
// Capacity 1 lets one prune request be queued while the receiving
// loop is busy.
152 c.chPruneSessions = make(chan struct{}, 1)
154 for range c.chPruneSessions {
// updateGauges publishes the current values reported by sessionsSize:
// the total size goes to the collectionBytes gauge and the session
// count goes to the sessionEntries gauge.
160 func (c *cache) updateGauges() {
161 n, size := c.sessionsSize()
162 c.metrics.collectionBytes.Set(float64(size))
163 c.metrics.sessionEntries.Set(float64(n))
// selectPDH is a parameter map whose "select" entry lists only the
// portable_data_hash field.
// NOTE(review): presumably passed as API request parameters to limit
// the fields returned; no use is visible nearby -- confirm it is
// still referenced.
166 var selectPDH = map[string]interface{}{
167 "select": []string{"portable_data_hash"},
// checkout returns the cachedSession stored in c.sessions for token.
// It first makes sure the cache itself has been initialized (via
// c.setupOnce).
//
// When a new session is built, it gets an arvados.Client configured
// from c.cluster with token as its AuthToken and a one-minute
// timeout, plus arvadosclient and keepclient wrappers around it, and
// the new session is stored in c.sessions under token.
// NOTE(review): presumably a new session is built only when the map
// has no entry for token -- confirm the guarding condition.
170 func (c *cache) checkout(token string) (*cachedSession, error) {
171 c.setupOnce.Do(c.setup)
174 sess := c.sessions[token]
176 client, err := arvados.NewClientFromConfig(c.cluster)
180 client.AuthToken = token
181 client.Timeout = time.Minute
182 // A non-empty origin header tells controller to
183 // prioritize our traffic as interactive, which is
184 // true most of the time.
// The origin value is the scheme and host of the configured
// WebDAVDownload external URL.
185 origin := c.cluster.Services.WebDAVDownload.ExternalURL
186 client.SendHeader = http.Header{"Origin": {origin.Scheme + "://" + origin.Host}}
187 arvadosclient, err := arvadosclient.New(client)
191 sess = &cachedSession{
194 arvadosclient: arvadosclient,
195 keepclient: keepclient.New(arvadosclient),
197 c.sessions[token] = sess
203 // GetSession returns a long-lived CustomFileSystem suitable for doing a read or
204 // write operation with the given token.
206 // If the returned error is nil, the caller must call Release() on the
207 // returned session when finished using it.
//
// The returned *arvados.User points at the session's own user field
// rather than at a copy, so callers should treat it as read-only.
208 func (c *cache) GetSession(token string) (arvados.CustomFileSystem, *cachedSession, *arvados.User, error) {
209 sess, err := c.checkout(token)
211 return nil, nil, nil, err
214 defer sess.refresh.Unlock()
// refresh is true when the session's expire time is already in the
// past.
217 refresh := sess.expire.Before(now)
218 if sess.fs == nil || !sess.userLoaded || refresh {
219 // Wait for all active users to finish (otherwise they
220 // might make changes to an old fs after we start
221 // using the new fs).
223 if !sess.userLoaded || refresh {
// Fetch the user record for this token directly into sess.user.
224 err := sess.client.RequestAndDecode(&sess.user, "GET", "arvados/v1/users/current", nil, nil)
225 if he := errorWithHTTPStatus(nil); errors.As(err, &he) && he.HTTPStatus() == http.StatusForbidden {
226 // token is OK, but "get user id" api is out
227 // of scope -- use existing/expired info if
228 // any, or leave empty for unknown user
229 } else if err != nil {
232 return nil, nil, nil, err
234 sess.userLoaded = true
237 if sess.fs == nil || refresh {
// Building a new filesystem also starts a new TTL period, and
// is counted as a cache miss.
238 sess.fs = sess.client.SiteFileSystem(sess.keepclient)
239 sess.fs.ForwardSlashNameSubstitution(c.cluster.Collections.ForwardSlashNameSubstitution)
240 sess.expire = now.Add(c.cluster.Collections.WebDAVCache.TTL.Duration())
241 c.metrics.sessionMisses.Inc()
243 c.metrics.sessionHits.Inc()
247 c.metrics.sessionHits.Inc()
250 return sess.fs, sess, &sess.user, nil
// sessionSnapshot is the per-session record that pruneSessions builds
// so it can sort and measure sessions using copied values instead of
// holding each session's refresh lock throughout.
253 type sessionSnapshot struct {
// fs is copied from the session's fs field while its refresh lock
// is held; pruneSessions later calls MemorySize on it.
257 fs arvados.CustomFileSystem
262 // pruneSessions removes all expired idle session cache entries, and removes in-memory
263 // filesystems until approximate remaining size <= maxsize
//
// Sessions are marked for pruning in three passes: those whose expire
// time has passed, then the least recently used ones until the
// configured MaxCollectionBytes is respected, then more until the
// configured MaxSessions is respected. Marked sessions are then
// either deleted from c.sessions or unloaded (see below).
264 func (c *cache) pruneSessions() {
267 snaps := make([]sessionSnapshot, 0, len(c.sessions))
268 for token, sess := range c.sessions {
269 snaps = append(snaps, sessionSnapshot{
276 // Load lastuse/fs/expire data from sessions. Note we do this
277 // after unlocking c.mtx because sess.refresh.Lock sometimes
278 // waits for another goroutine to finish "[re]fetch user
280 for i := range snaps {
281 snaps[i].sess.refresh.Lock()
282 snaps[i].lastuse = snaps[i].sess.lastuse
283 snaps[i].fs = snaps[i].sess.fs
284 snaps[i].prune = snaps[i].sess.expire.Before(now)
285 snaps[i].sess.refresh.Unlock()
288 // Sort sessions with oldest first.
289 sort.Slice(snaps, func(i, j int) bool {
290 return snaps[i].lastuse.Before(snaps[j].lastuse)
293 // Add up size of sessions that aren't already marked for
294 // pruning based on expire time.
296 for i, snap := range snaps {
297 if !snap.prune && snap.fs != nil {
298 size := snap.fs.MemorySize()
303 // Mark more sessions for deletion until reaching desired
304 // memory size limit, starting with the oldest entries.
305 for i, snap := range snaps {
306 if size <= int64(c.cluster.Collections.WebDAVCache.MaxCollectionBytes) {
312 snaps[i].prune = true
316 // Mark more sessions for deletion until reaching desired
317 // session count limit.
// mustprune is the number of sessions over the MaxSessions limit;
// it is zero or negative when the cache is within the limit.
318 mustprune := len(snaps) - c.cluster.Collections.WebDAVCache.MaxSessions
319 for i := range snaps {
324 for i := range snaps {
327 } else if !snaps[i].prune {
328 snaps[i].prune = true
335 for _, snap := range snaps {
// TryLock does not block: it succeeds only if nobody currently
// holds sess.mtx, i.e. the session is not checked out, so the
// map entry can be dropped outright.
340 if sess.mtx.TryLock() {
341 delete(c.sessions, snap.token)
344 // We can't remove a session that's been checked out
345 // -- that would allow another session to be created
346 // for the same token using a different in-memory
347 // filesystem. Instead, we wait for active requests to
348 // finish and then "unload" it. After this, either the
349 // next GetSession will reload fs/user, or a
350 // subsequent pruneSessions will remove the session.
352 // Ensure nobody is mid-GetSession() (note we
353 // already know nobody is mid-checkout()
354 // because we have c.mtx locked)
356 defer sess.refresh.Unlock()
357 // Wait for current usage to finish (i.e.,
358 // anyone who has decided to use the current
359 // values of sess.fs and sess.user, and hasn't
360 // called Release() yet)
362 defer sess.inuse.Unlock()
365 // Next GetSession will make a new fs
370 // sessionsSize returns the number and approximate total memory size
371 // of all cached sessions.
//
// It works in two phases: first it copies the session pointers out of
// c.sessions, then it visits each session under that session's
// refresh lock and adds the MemorySize of its filesystem to size.
372 func (c *cache) sessionsSize() (n int, size int64) {
// NOTE(review): n is used here as the slice capacity, so it is
// presumably assigned the session count just before this line --
// confirm.
375 sessions := make([]*cachedSession, 0, n)
376 for _, sess := range c.sessions {
377 sessions = append(sessions, sess)
380 for _, sess := range sessions {
383 sess.refresh.Unlock()
// MemorySize is called after the refresh lock has been released.
385 size += fs.MemorySize()