1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
14 "git.arvados.org/arvados.git/sdk/go/arvados"
15 "git.arvados.org/arvados.git/sdk/go/arvadosclient"
16 "git.arvados.org/arvados.git/sdk/go/keepclient"
17 lru "github.com/hashicorp/golang-lru"
18 "github.com/prometheus/client_golang/prometheus"
19 "github.com/sirupsen/logrus"
22 const metricsUpdateInterval = time.Second / 10
25 cluster *arvados.Cluster
26 logger logrus.FieldLogger
27 registry *prometheus.Registry
29 sessions *lru.TwoQueueCache
32 chPruneSessions chan struct{}
35 type cacheMetrics struct {
36 requests prometheus.Counter
37 collectionBytes prometheus.Gauge
38 sessionEntries prometheus.Gauge
39 sessionHits prometheus.Counter
40 sessionMisses prometheus.Counter
43 func (m *cacheMetrics) setup(reg *prometheus.Registry) {
44 m.collectionBytes = prometheus.NewGauge(prometheus.GaugeOpts{
46 Subsystem: "keepweb_sessions",
47 Name: "cached_session_bytes",
48 Help: "Total size of all cached sessions.",
50 reg.MustRegister(m.collectionBytes)
51 m.sessionEntries = prometheus.NewGauge(prometheus.GaugeOpts{
53 Subsystem: "keepweb_sessions",
55 Help: "Number of active token sessions.",
57 reg.MustRegister(m.sessionEntries)
58 m.sessionHits = prometheus.NewCounter(prometheus.CounterOpts{
60 Subsystem: "keepweb_sessions",
62 Help: "Number of token session cache hits.",
64 reg.MustRegister(m.sessionHits)
65 m.sessionMisses = prometheus.NewCounter(prometheus.CounterOpts{
67 Subsystem: "keepweb_sessions",
69 Help: "Number of token session cache misses.",
71 reg.MustRegister(m.sessionMisses)
74 type cachedSession struct {
77 client *arvados.Client
78 arvadosclient *arvadosclient.ArvadosClient
79 keepclient *keepclient.KeepClient
83 func (c *cache) setup() {
85 c.sessions, err = lru.New2Q(c.cluster.Collections.WebDAVCache.MaxSessions)
92 reg = prometheus.NewRegistry()
96 for range time.Tick(metricsUpdateInterval) {
100 c.chPruneSessions = make(chan struct{}, 1)
102 for range c.chPruneSessions {
108 func (c *cache) updateGauges() {
109 c.metrics.collectionBytes.Set(float64(c.collectionBytes()))
110 c.metrics.sessionEntries.Set(float64(c.sessions.Len()))
113 var selectPDH = map[string]interface{}{
114 "select": []string{"portable_data_hash"},
117 // ResetSession unloads any potentially stale state. Should be called
118 // after write operations, so subsequent reads don't return stale
120 func (c *cache) ResetSession(token string) {
121 c.setupOnce.Do(c.setup)
122 c.sessions.Remove(token)
125 // Get a long-lived CustomFileSystem suitable for doing a read operation
126 // with the given token.
127 func (c *cache) GetSession(token string) (arvados.CustomFileSystem, *cachedSession, *arvados.User, error) {
128 c.setupOnce.Do(c.setup)
130 ent, _ := c.sessions.Get(token)
131 sess, _ := ent.(*cachedSession)
134 c.metrics.sessionMisses.Inc()
135 sess = &cachedSession{
136 expire: now.Add(c.cluster.Collections.WebDAVCache.TTL.Duration()),
139 sess.client, err = arvados.NewClientFromConfig(c.cluster)
141 return nil, nil, nil, err
143 sess.client.AuthToken = token
144 sess.arvadosclient, err = arvadosclient.New(sess.client)
146 return nil, nil, nil, err
148 sess.keepclient = keepclient.New(sess.arvadosclient)
149 c.sessions.Add(token, sess)
150 } else if sess.expire.Before(now) {
151 c.metrics.sessionMisses.Inc()
154 c.metrics.sessionHits.Inc()
157 case c.chPruneSessions <- struct{}{}:
161 fs, _ := sess.fs.Load().(arvados.CustomFileSystem)
162 if fs == nil || expired {
163 fs = sess.client.SiteFileSystem(sess.keepclient)
164 fs.ForwardSlashNameSubstitution(c.cluster.Collections.ForwardSlashNameSubstitution)
168 user, _ := sess.user.Load().(*arvados.User)
169 if user == nil || expired {
170 user = new(arvados.User)
171 err := sess.client.RequestAndDecode(user, "GET", "/arvados/v1/users/current", nil, nil)
172 if he := errorWithHTTPStatus(nil); errors.As(err, &he) && he.HTTPStatus() == http.StatusForbidden {
173 // token is OK, but "get user id" api is out
174 // of scope -- return nil, signifying unknown
176 } else if err != nil {
177 return nil, nil, nil, err
179 sess.user.Store(user)
182 return fs, sess, user, nil
185 // Remove all expired session cache entries, then remove more entries
186 // until approximate remaining size <= maxsize/2
187 func (c *cache) pruneSessions() {
189 keys := c.sessions.Keys()
190 sizes := make([]int64, len(keys))
192 for i, token := range keys {
193 ent, ok := c.sessions.Peek(token)
197 s := ent.(*cachedSession)
198 if s.expire.Before(now) {
199 c.sessions.Remove(token)
202 if fs, ok := s.fs.Load().(arvados.CustomFileSystem); ok {
203 sizes[i] = fs.MemorySize()
207 // Remove tokens until reaching size limit, starting with the
208 // least frequently used entries (which Keys() returns last).
209 for i := len(keys) - 1; i >= 0 && size > c.cluster.Collections.WebDAVCache.MaxCollectionBytes; i-- {
211 c.sessions.Remove(keys[i])
217 // collectionBytes returns the approximate combined memory size of the
218 // collection cache and session filesystem cache.
219 func (c *cache) collectionBytes() uint64 {
221 for _, token := range c.sessions.Keys() {
222 ent, ok := c.sessions.Peek(token)
226 if fs, ok := ent.(*cachedSession).fs.Load().(arvados.CustomFileSystem); ok {
227 size += uint64(fs.MemorySize())