1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
13 "git.arvados.org/arvados.git/sdk/go/arvados"
14 "git.arvados.org/arvados.git/sdk/go/arvadosclient"
15 "git.arvados.org/arvados.git/sdk/go/keepclient"
16 lru "github.com/hashicorp/golang-lru"
17 "github.com/prometheus/client_golang/prometheus"
18 "github.com/sirupsen/logrus"
// metricsUpdateInterval is the tick period (one tenth of a second) of
// the time.Tick loop started in (*cache).setup.
21 const metricsUpdateInterval = time.Second / 10
// cluster supplies the Collections.WebDAVCache settings read by this
// type (MaxSessions, TTL, MaxCollectionBytes) and is the config passed
// to arvados.NewClientFromConfig when a session is created.
24 cluster *arvados.Cluster
25 logger logrus.FieldLogger
// NOTE(review): presumably the registry the cache metrics are
// registered with; setup can also create a fresh one -- confirm when.
26 registry *prometheus.Registry
// sessions is the 2Q LRU created in setup; GetSession stores
// *cachedSession values in it keyed by token.
28 sessions *lru.TwoQueueCache
// chPruneSessions is created in setup with a buffer of one. GetSession
// sends on it and a loop started in setup receives from it.
31 chPruneSessions chan struct{}
// cacheMetrics groups the Prometheus collectors that describe the
// session cache.
34 type cacheMetrics struct {
// NOTE(review): requests does not appear to be created or registered
// by (*cacheMetrics).setup -- confirm whether it is still needed.
35 requests prometheus.Counter
// collectionBytes is set by updateGauges from (*cache).collectionBytes().
36 collectionBytes prometheus.Gauge
// sessionEntries is set by updateGauges from the length of the session LRU.
37 sessionEntries prometheus.Gauge
// sessionHits is incremented by GetSession.
38 sessionHits prometheus.Counter
// sessionMisses is incremented by GetSession before it builds a new
// session, and when a cached session's expire time is before now.
39 sessionMisses prometheus.Counter
// setup creates the session-cache collectors -- two gauges and two
// counters, all in the "keepweb_sessions" subsystem -- and registers
// each of them with reg.
//
// Registration goes through reg.MustRegister, which panics rather than
// returning an error when a collector cannot be registered.
42 func (m *cacheMetrics) setup(reg *prometheus.Registry) {
// Gauge of total cached bytes. Note the field is called collectionBytes
// but the metric is published as "cached_session_bytes".
43 m.collectionBytes = prometheus.NewGauge(prometheus.GaugeOpts{
45 Subsystem: "keepweb_sessions",
46 Name: "cached_session_bytes",
47 Help: "Total size of all cached sessions.",
49 reg.MustRegister(m.collectionBytes)
// Gauge of the number of sessions currently cached.
50 m.sessionEntries = prometheus.NewGauge(prometheus.GaugeOpts{
52 Subsystem: "keepweb_sessions",
54 Help: "Number of active token sessions.",
56 reg.MustRegister(m.sessionEntries)
// Counter of session cache hits.
57 m.sessionHits = prometheus.NewCounter(prometheus.CounterOpts{
59 Subsystem: "keepweb_sessions",
61 Help: "Number of token session cache hits.",
63 reg.MustRegister(m.sessionHits)
// Counter of session cache misses.
64 m.sessionMisses = prometheus.NewCounter(prometheus.CounterOpts{
66 Subsystem: "keepweb_sessions",
68 Help: "Number of token session cache misses.",
70 reg.MustRegister(m.sessionMisses)
// cachedSession is the per-token value stored in cache.sessions. It
// carries the API clients that GetSession builds for the token.
73 type cachedSession struct {
// client is created with arvados.NewClientFromConfig; GetSession then
// sets its AuthToken to the session's token.
76 client *arvados.Client
// arvadosclient is built from client via arvadosclient.New.
77 arvadosclient *arvadosclient.ArvadosClient
// keepclient is built from arvadosclient via keepclient.New and is
// handed to client.SiteFileSystem when a filesystem is created.
78 keepclient *keepclient.KeepClient
// setup initializes the cache: it creates the 2Q session LRU sized by
// Collections.WebDAVCache.MaxSessions, starts a loop that ticks every
// metricsUpdateInterval, and creates chPruneSessions along with a loop
// that receives from it.
//
// GetSession and ResetSession invoke it through c.setupOnce.
82 func (c *cache) setup() {
84 c.sessions, err = lru.New2Q(c.cluster.Collections.WebDAVCache.MaxSessions)
// NOTE(review): presumably a fallback used only when c.registry is
// unset -- confirm against the surrounding condition.
91 reg = prometheus.NewRegistry()
// The channel returned by time.Tick is never closed, so this loop runs
// for the rest of the process lifetime.
95 for range time.Tick(metricsUpdateInterval) {
// One buffered slot for a pending prune request; GetSession sends to
// this channel from inside a select.
99 c.chPruneSessions = make(chan struct{}, 1)
// One iteration per value received on chPruneSessions.
101 for range c.chPruneSessions {
// updateGauges refreshes both gauges: the cached-bytes gauge from
// c.collectionBytes() and the session-entries gauge from the number of
// entries in c.sessions.
107 func (c *cache) updateGauges() {
// collectionBytes walks every cached session, so each refresh costs one
// pass over the cache.
108 c.metrics.collectionBytes.Set(float64(c.collectionBytes()))
109 c.metrics.sessionEntries.Set(float64(c.sessions.Len()))
// selectPDH is a parameter map whose "select" entry lists only
// "portable_data_hash".
// NOTE(review): presumably passed as API request parameters so that
// only the PDH field is returned -- verify against its callers.
112 var selectPDH = map[string]interface{}{
113 "select": []string{"portable_data_hash"},
116 // ResetSession removes the cached session for token, discarding any
117 // potentially stale state. It should be called after write operations,
// so that subsequent reads don't return stale data.
119 func (c *cache) ResetSession(token string) {
// Ensure c.sessions has been created before touching it.
120 c.setupOnce.Do(c.setup)
121 c.sessions.Remove(token)
124 // GetSession returns a long-lived CustomFileSystem suitable for doing a
125 // read operation with the given token, together with the token's
// cached session and the user record fetched for it.
//
// If building the API clients or looking up the current user fails, it
// returns nil for all three values along with the error.
126 func (c *cache) GetSession(token string) (arvados.CustomFileSystem, *cachedSession, *arvados.User, error) {
// Ensure c.sessions, the metrics and chPruneSessions exist.
127 c.setupOnce.Do(c.setup)
// Get (unlike the Peek calls used when pruning) counts as a use of the
// entry for the LRU.
129 ent, _ := c.sessions.Get(token)
// sess stays nil if nothing was cached or the entry has another type.
130 sess, _ := ent.(*cachedSession)
133 c.metrics.sessionMisses.Inc()
134 sess = &cachedSession{
// The expiry time is fixed when the session is created: now + TTL.
135 expire: now.Add(c.cluster.Collections.WebDAVCache.TTL.Duration()),
138 sess.client, err = arvados.NewClientFromConfig(c.cluster)
140 return nil, nil, nil, err
// All requests made through this session authenticate with the caller's token.
142 sess.client.AuthToken = token
143 sess.arvadosclient, err = arvadosclient.New(sess.client)
145 return nil, nil, nil, err
147 sess.keepclient = keepclient.New(sess.arvadosclient)
// Publish the new session only after all of its clients were built.
148 c.sessions.Add(token, sess)
// A cached but expired session is counted as a miss.
149 } else if sess.expire.Before(now) {
150 c.metrics.sessionMisses.Inc()
153 c.metrics.sessionHits.Inc()
// Signal the loop that receives from chPruneSessions.
156 case c.chPruneSessions <- struct{}{}:
// Build a new site filesystem when the session has none stored, or when
// the session has expired.
160 fs, _ := sess.fs.Load().(arvados.CustomFileSystem)
161 if fs == nil || expired {
162 fs = sess.client.SiteFileSystem(sess.keepclient)
163 fs.ForwardSlashNameSubstitution(c.cluster.Collections.ForwardSlashNameSubstitution)
// Look up the token's user when none is stored, or when the session has
// expired.
167 user, _ := sess.user.Load().(*arvados.User)
168 if user == nil || expired {
169 user = new(arvados.User)
170 err := sess.client.RequestAndDecode(user, "GET", "/arvados/v1/users/current", nil, nil)
// Only an error exposing HTTPStatus() == 403 is tolerated; any other
// error aborts the call below.
171 if statusErr, ok := err.(interface{ HTTPStatus() int }); ok && statusErr.HTTPStatus() == http.StatusForbidden {
172 // token is OK, but "get user id" api is out
173 // of scope -- return nil, signifying unknown
175 } else if err != nil {
176 return nil, nil, nil, err
178 sess.user.Store(user)
181 return fs, sess, user, nil
184 // pruneSessions removes all expired session cache entries, then removes
185 // more entries until the approximate remaining size is no more than
// Collections.WebDAVCache.MaxCollectionBytes.
//
// NOTE(review): this comment used to say "maxsize/2", but the check in
// the second loop compares against the full MaxCollectionBytes --
// confirm which limit is intended.
186 func (c *cache) pruneSessions() {
// Work from a snapshot of the keys; ok below reports whether a key is
// still present when it is looked up.
189 keys := c.sessions.Keys()
190 for _, token := range keys {
// Peek inspects the entry without counting as a use for the LRU.
191 ent, ok := c.sessions.Peek(token)
195 s := ent.(*cachedSession)
196 if s.expire.Before(now) {
197 c.sessions.Remove(token)
// Add this session's filesystem, if it has one, to the running total.
200 if fs, ok := s.fs.Load().(arvados.CustomFileSystem); ok {
201 size += fs.MemorySize()
204 // Remove tokens until reaching size limit, starting with the
205 // least frequently used entries (which Keys() returns last).
206 for i := len(keys) - 1; i >= 0; i-- {
208 if size <= c.cluster.Collections.WebDAVCache.MaxCollectionBytes {
211 ent, ok := c.sessions.Peek(token)
215 s := ent.(*cachedSession)
216 fs, _ := s.fs.Load().(arvados.CustomFileSystem)
220 c.sessions.Remove(token)
// NOTE(review): fs is nil for a session with no filesystem stored;
// confirm that case is handled before reaching this call.
221 size -= fs.MemorySize()
225 // collectionBytes returns the approximate combined memory size of the
226 // filesystems held by the cached sessions.
//
// NOTE(review): this comment used to mention a separate "collection
// cache" as well; only session filesystems are summed here -- confirm
// nothing else should be counted.
227 func (c *cache) collectionBytes() uint64 {
229 for _, token := range c.sessions.Keys() {
// Peek inspects the entry without counting as a use for the LRU.
230 ent, ok := c.sessions.Peek(token)
// Sessions with no filesystem stored contribute nothing.
234 if fs, ok := ent.(*cachedSession).fs.Load().(arvados.CustomFileSystem); ok {
235 size += uint64(fs.MemorySize())