1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
14 "git.arvados.org/arvados.git/sdk/go/arvados"
15 "git.arvados.org/arvados.git/sdk/go/arvadosclient"
16 "git.arvados.org/arvados.git/sdk/go/keepclient"
17 lru "github.com/hashicorp/golang-lru"
18 "github.com/prometheus/client_golang/prometheus"
19 "github.com/sirupsen/logrus"
// metricsUpdateInterval is the tick period (one tenth of a second) of the
// time.Tick loop in (*cache).setup.
22 const metricsUpdateInterval = time.Second / 10
// NOTE(review): the enclosing type declaration is not visible in this
// excerpt; these look like fields of the `cache` type used as the receiver
// of the methods below -- confirm.
//
// cluster is the configuration read throughout this file (WebDAVCache
// limits and TTL, the WebDAVDownload external URL, and the forward-slash
// name substitution).
25 cluster *arvados.Cluster
26 logger logrus.FieldLogger
// registry is a Prometheus registry. setup() assigns a fresh
// prometheus.NewRegistry() to a local `reg`; presumably that is a fallback
// for when this field is nil -- confirm.
27 registry *prometheus.Registry
// sessions is the 2Q cache created in setup(). GetSession stores
// *cachedSession values in it, keyed by token.
29 sessions *lru.TwoQueueCache
// chPruneSessions is created with capacity 1 in setup(). GetSession sends
// an empty struct on it and a loop in setup() ranges over it.
32 chPruneSessions chan struct{}
// cacheMetrics groups the Prometheus collectors kept for the session cache.
35 type cacheMetrics struct {
// requests is not initialised by the visible part of setup below.
// NOTE(review): confirm whether it is still used anywhere.
36 requests prometheus.Counter
// collectionBytes is set by updateGauges from (*cache).collectionBytes().
37 collectionBytes prometheus.Gauge
// sessionEntries is set by updateGauges from the session cache's Len().
38 sessionEntries prometheus.Gauge
// sessionHits and sessionMisses are incremented by GetSession.
39 sessionHits prometheus.Counter
40 sessionMisses prometheus.Counter
// setup creates the session-cache collectors and registers each of them
// with reg. Every collector shown here uses the "keepweb_sessions"
// subsystem. Registration goes through MustRegister, which panics instead
// of returning an error if a collector cannot be registered.
43 func (m *cacheMetrics) setup(reg *prometheus.Registry) {
// Gauge named "cached_session_bytes": total size of all cached sessions.
44 m.collectionBytes = prometheus.NewGauge(prometheus.GaugeOpts{
46 Subsystem: "keepweb_sessions",
47 Name: "cached_session_bytes",
48 Help: "Total size of all cached sessions.",
50 reg.MustRegister(m.collectionBytes)
// Gauge: number of active token sessions.
51 m.sessionEntries = prometheus.NewGauge(prometheus.GaugeOpts{
53 Subsystem: "keepweb_sessions",
55 Help: "Number of active token sessions.",
57 reg.MustRegister(m.sessionEntries)
// Counter: session cache hits.
58 m.sessionHits = prometheus.NewCounter(prometheus.CounterOpts{
60 Subsystem: "keepweb_sessions",
62 Help: "Number of token session cache hits.",
64 reg.MustRegister(m.sessionHits)
// Counter: session cache misses.
65 m.sessionMisses = prometheus.NewCounter(prometheus.CounterOpts{
67 Subsystem: "keepweb_sessions",
69 Help: "Number of token session cache misses.",
71 reg.MustRegister(m.sessionMisses)
// cachedSession is the per-token state that GetSession stores in the
// session cache.
74 type cachedSession struct {
// NOTE(review): GetSession and pruneSessions also use `expire`, `fs` and
// `user` members of this type; their declarations are not visible in this
// excerpt.
//
// client is built by arvados.NewClientFromConfig in GetSession, which then
// sets its AuthToken to the session's token.
77 client *arvados.Client
// arvadosclient is created from client with arvadosclient.New.
78 arvadosclient *arvadosclient.ArvadosClient
// keepclient is created from arvadosclient with keepclient.New.
79 keepclient *keepclient.KeepClient
// setup performs the cache's initialisation. ResetSession and GetSession
// invoke it through c.setupOnce.Do.
83 func (c *cache) setup() {
// Create the 2Q session cache, sized by WebDAVCache.MaxSessions.
// NOTE(review): the handling of err is not visible in this excerpt.
85 c.sessions, err = lru.New2Q(c.cluster.Collections.WebDAVCache.MaxSessions)
// A fresh registry is assigned to reg here; presumably only when no
// registry was supplied -- confirm against the surrounding condition.
92 reg = prometheus.NewRegistry()
// Loop driven by a time.Tick channel that fires every
// metricsUpdateInterval. The channel is never closed, so the loop only
// ends if its body (not visible here) exits.
96 for range time.Tick(metricsUpdateInterval) {
// The capacity of 1 lets a single prune signal be queued while no receiver
// is ready.
100 c.chPruneSessions = make(chan struct{}, 1)
// Receive loop over prune signals. It runs until the channel is closed;
// no close is visible in this excerpt.
102 for range c.chPruneSessions {
// updateGauges refreshes both gauges from the current state of the session
// cache.
108 func (c *cache) updateGauges() {
// Approximate bytes held by the cached session filesystems.
109 c.metrics.collectionBytes.Set(float64(c.collectionBytes()))
// Number of entries currently in the session cache.
110 c.metrics.sessionEntries.Set(float64(c.sessions.Len()))
// selectPDH is a parameter map whose "select" entry lists only
// "portable_data_hash".
// NOTE(review): no use of it is visible in this excerpt; presumably it is
// passed as API request parameters -- confirm.
113 var selectPDH = map[string]interface{}{
114 "select": []string{"portable_data_hash"},
117 // ResetSession unloads any potentially stale state. Should be called
118 // after write operations, so subsequent reads don't return stale
120 func (c *cache) ResetSession(token string) {
// Make sure setup has run before touching c.sessions.
121 c.setupOnce.Do(c.setup)
// Drop the cache entry stored under this token, if any.
122 c.sessions.Remove(token)
125 // GetSession returns a long-lived CustomFileSystem suitable for doing a
126 // read operation with the given token, along with the cached session and user.
127 func (c *cache) GetSession(token string) (arvados.CustomFileSystem, *cachedSession, *arvados.User, error) {
// Make sure setup has run before touching c.sessions or c.metrics.
128 c.setupOnce.Do(c.setup)
// Look up the session stored under this token. The comma-ok type assertion
// leaves sess nil when nothing usable is cached.
130 ent, _ := c.sessions.Get(token)
131 sess, _ := ent.(*cachedSession)
// Miss path: count the miss and build a new session whose expiry is now
// plus the configured WebDAVCache TTL.
134 c.metrics.sessionMisses.Inc()
135 sess = &cachedSession{
136 expire: now.Add(c.cluster.Collections.WebDAVCache.TTL.Duration()),
// Build the API client from the cluster config; a failure is returned to
// the caller with nil results.
139 sess.client, err = arvados.NewClientFromConfig(c.cluster)
141 return nil, nil, nil, err
// Requests made through this client authenticate with the caller's token.
143 sess.client.AuthToken = token
144 // A non-empty origin header tells controller to
145 // prioritize our traffic as interactive, which is
146 // true most of the time.
// The Origin value is "<scheme>://<host>" of the WebDAVDownload external URL.
147 origin := c.cluster.Services.WebDAVDownload.ExternalURL
148 sess.client.SendHeader = http.Header{"Origin": {origin.Scheme + "://" + origin.Host}}
149 sess.arvadosclient, err = arvadosclient.New(sess.client)
151 return nil, nil, nil, err
153 sess.keepclient = keepclient.New(sess.arvadosclient)
// Store the new session in the cache under this token.
154 c.sessions.Add(token, sess)
// A cached session past its expiry time is also counted as a miss.
155 } else if sess.expire.Before(now) {
156 c.metrics.sessionMisses.Inc()
159 c.metrics.sessionHits.Inc()
// Signal the loop in setup() that ranges over chPruneSessions.
// NOTE(review): the rest of the select is not visible; presumably a default
// case makes this send non-blocking -- confirm.
162 case c.chPruneSessions <- struct{}{}:
// Reuse the filesystem stored on the session unless there is none or
// `expired` is set; in that case build a new site filesystem.
166 fs, _ := sess.fs.Load().(arvados.CustomFileSystem)
167 if fs == nil || expired {
168 fs = sess.client.SiteFileSystem(sess.keepclient)
169 fs.ForwardSlashNameSubstitution(c.cluster.Collections.ForwardSlashNameSubstitution)
// Same pattern for the user record: reuse the stored one unless it is
// missing or `expired` is set, otherwise fetch the token's current user
// from the API.
173 user, _ := sess.user.Load().(*arvados.User)
174 if user == nil || expired {
175 user = new(arvados.User)
176 err := sess.client.RequestAndDecode(user, "GET", "/arvados/v1/users/current", nil, nil)
// An error carrying HTTP status 403 is tolerated here; any other error is
// returned by the else-if branch below.
// NOTE(review): the comment below says nil is returned, but `user` was
// allocated with new() above and is passed to sess.user.Store further
// down -- confirm what callers actually receive in this case.
177 if he := errorWithHTTPStatus(nil); errors.As(err, &he) && he.HTTPStatus() == http.StatusForbidden {
178 // token is OK, but "get user id" api is out
179 // of scope -- return nil, signifying unknown
181 } else if err != nil {
182 return nil, nil, nil, err
// Keep the user record on the session for later calls.
184 sess.user.Store(user)
187 return fs, sess, user, nil
190 // Remove all expired session cache entries, then remove more entries
191 // until approximate remaining size <= WebDAVCache.MaxCollectionBytes
192 func (c *cache) pruneSessions() {
// Take a snapshot of the cached tokens. sizes[i] records the filesystem
// size measured for keys[i].
194 keys := c.sessions.Keys()
195 sizes := make([]int64, len(keys))
197 for i, token := range keys {
// Peek looks an entry up without updating its recency or frequency in the
// 2Q cache.
198 ent, ok := c.sessions.Peek(token)
202 s := ent.(*cachedSession)
// Sessions past their expiry time are removed.
203 if s.expire.Before(now) {
204 c.sessions.Remove(token)
// Only sessions that already hold a filesystem contribute a size.
207 if fs, ok := s.fs.Load().(arvados.CustomFileSystem); ok {
208 sizes[i] = fs.MemorySize()
212 // Remove tokens until reaching size limit, starting with the
213 // least frequently used entries (which Keys() returns last).
// Walk keys from the end for as long as size exceeds
// WebDAVCache.MaxCollectionBytes.
// NOTE(review): the statements that accumulate and decrement `size` are not
// visible in this excerpt.
214 for i := len(keys) - 1; i >= 0 && size > c.cluster.Collections.WebDAVCache.MaxCollectionBytes; i-- {
216 c.sessions.Remove(keys[i])
222 // collectionBytes returns the approximate combined memory size of the
223 // collection cache and session filesystem cache.
224 func (c *cache) collectionBytes() uint64 {
// Visit every token currently in the session cache. Peek reads an entry
// without updating its recency or frequency.
226 for _, token := range c.sessions.Keys() {
227 ent, ok := c.sessions.Peek(token)
// Sessions that have not stored a filesystem yet contribute nothing.
// NOTE(review): only session filesystem sizes are summed in the visible
// lines; the end of the function is not visible in this excerpt.
231 if fs, ok := ent.(*cachedSession).fs.Load().(arvados.CustomFileSystem); ok {
232 size += uint64(fs.MemorySize())