21901: keep-web logs based on the last X-Forwarded-For address
[arvados.git] / services / keep-web / cache.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package keepweb
6
7 import (
8         "errors"
9         "net/http"
10         "sort"
11         "sync"
12         "time"
13
14         "git.arvados.org/arvados.git/sdk/go/arvados"
15         "git.arvados.org/arvados.git/sdk/go/arvadosclient"
16         "git.arvados.org/arvados.git/sdk/go/keepclient"
17         "github.com/prometheus/client_golang/prometheus"
18         "github.com/sirupsen/logrus"
19 )
20
21 const metricsUpdateInterval = time.Second / 10
22
23 type cache struct {
24         cluster   *arvados.Cluster
25         logger    logrus.FieldLogger
26         registry  *prometheus.Registry
27         metrics   cacheMetrics
28         sessions  map[string]*cachedSession
29         setupOnce sync.Once
30         mtx       sync.Mutex
31
32         chPruneSessions chan struct{}
33 }
34
35 type cacheMetrics struct {
36         requests        prometheus.Counter
37         collectionBytes prometheus.Gauge
38         sessionEntries  prometheus.Gauge
39         sessionHits     prometheus.Counter
40         sessionMisses   prometheus.Counter
41 }
42
43 func (m *cacheMetrics) setup(reg *prometheus.Registry) {
44         m.collectionBytes = prometheus.NewGauge(prometheus.GaugeOpts{
45                 Namespace: "arvados",
46                 Subsystem: "keepweb_sessions",
47                 Name:      "cached_session_bytes",
48                 Help:      "Total size of all cached sessions.",
49         })
50         reg.MustRegister(m.collectionBytes)
51         m.sessionEntries = prometheus.NewGauge(prometheus.GaugeOpts{
52                 Namespace: "arvados",
53                 Subsystem: "keepweb_sessions",
54                 Name:      "active",
55                 Help:      "Number of active token sessions.",
56         })
57         reg.MustRegister(m.sessionEntries)
58         m.sessionHits = prometheus.NewCounter(prometheus.CounterOpts{
59                 Namespace: "arvados",
60                 Subsystem: "keepweb_sessions",
61                 Name:      "hits",
62                 Help:      "Number of token session cache hits.",
63         })
64         reg.MustRegister(m.sessionHits)
65         m.sessionMisses = prometheus.NewCounter(prometheus.CounterOpts{
66                 Namespace: "arvados",
67                 Subsystem: "keepweb_sessions",
68                 Name:      "misses",
69                 Help:      "Number of token session cache misses.",
70         })
71         reg.MustRegister(m.sessionMisses)
72 }
73
74 type cachedSession struct {
75         cache         *cache
76         expire        time.Time
77         client        *arvados.Client
78         arvadosclient *arvadosclient.ArvadosClient
79         keepclient    *keepclient.KeepClient
80
81         // Each session uses a system of three mutexes (plus the
82         // cache-wide mutex) to enable the following semantics:
83         //
84         // - There are never multiple sessions in use for a given
85         // token.
86         //
87         // - If the cached in-memory filesystems/user records are
88         // older than the configured cache TTL when a request starts,
89         // the request will use new ones.
90         //
91         // - Unused sessions are garbage-collected.
92         //
93         // In particular, when it is necessary to reset a session's
94         // filesystem/user record (to save memory or respect the
95         // configured cache TTL), any operations that are already
96         // using the existing filesystem/user record are allowed to
97         // finish before the new filesystem is constructed.
98         //
99         // The locks must be acquired in the following order:
100         // cache.mtx, session.mtx, session.refresh, session.inuse.
101
102         // mtx is RLocked while session is not safe to evict from
103         // cache -- i.e., a checkout() has decided to use it, and its
104         // caller is not finished with it. When locking or rlocking
105         // this mtx, the cache mtx MUST already be held.
106         //
107         // This mutex enables pruneSessions to detect when it is safe
108         // to completely remove the session entry from the cache.
109         mtx sync.RWMutex
110         // refresh must be locked in order to read or write the
111         // fs/user/userLoaded/lastuse fields. This mutex enables
112         // GetSession and pruneSessions to remove/replace fs and user
113         // values safely.
114         refresh sync.Mutex
115         // inuse must be RLocked while the session is in use by a
116         // caller. This mutex enables pruneSessions() to wait for all
117         // existing usage to finish by calling inuse.Lock().
118         inuse sync.RWMutex
119
120         fs         arvados.CustomFileSystem
121         user       arvados.User
122         userLoaded bool
123         lastuse    time.Time
124 }
125
126 func (sess *cachedSession) Release() {
127         sess.inuse.RUnlock()
128         sess.mtx.RUnlock()
129         select {
130         case sess.cache.chPruneSessions <- struct{}{}:
131         default:
132         }
133 }
134
135 func (c *cache) setup() {
136         var err error
137         c.sessions = map[string]*cachedSession{}
138         if err != nil {
139                 panic(err)
140         }
141
142         reg := c.registry
143         if reg == nil {
144                 reg = prometheus.NewRegistry()
145         }
146         c.metrics.setup(reg)
147         go func() {
148                 for range time.Tick(metricsUpdateInterval) {
149                         c.updateGauges()
150                 }
151         }()
152         c.chPruneSessions = make(chan struct{}, 1)
153         go func() {
154                 for range c.chPruneSessions {
155                         c.pruneSessions()
156                 }
157         }()
158 }
159
160 func (c *cache) updateGauges() {
161         n, size := c.sessionsSize()
162         c.metrics.collectionBytes.Set(float64(size))
163         c.metrics.sessionEntries.Set(float64(n))
164 }
165
166 var selectPDH = map[string]interface{}{
167         "select": []string{"portable_data_hash"},
168 }
169
170 func (c *cache) checkout(token string) (*cachedSession, error) {
171         c.setupOnce.Do(c.setup)
172         c.mtx.Lock()
173         defer c.mtx.Unlock()
174         sess := c.sessions[token]
175         if sess == nil {
176                 client, err := arvados.NewClientFromConfig(c.cluster)
177                 if err != nil {
178                         return nil, err
179                 }
180                 client.AuthToken = token
181                 client.Timeout = time.Minute
182                 client.Logger = c.logger
183                 // A non-empty origin header tells controller to
184                 // prioritize our traffic as interactive, which is
185                 // true most of the time.
186                 origin := c.cluster.Services.WebDAVDownload.ExternalURL
187                 client.SendHeader = http.Header{"Origin": {origin.Scheme + "://" + origin.Host}}
188                 arvadosclient, err := arvadosclient.New(client)
189                 if err != nil {
190                         return nil, err
191                 }
192                 kc := keepclient.New(arvadosclient)
193                 kc.DiskCacheSize = c.cluster.Collections.WebDAVCache.DiskCacheSize
194                 sess = &cachedSession{
195                         cache:         c,
196                         client:        client,
197                         arvadosclient: arvadosclient,
198                         keepclient:    kc,
199                 }
200                 c.sessions[token] = sess
201         }
202         sess.mtx.RLock()
203         return sess, nil
204 }
205
206 // Get a long-lived CustomFileSystem suitable for doing a read or
207 // write operation with the given token.
208 //
209 // If the returned error is nil, the caller must call Release() on the
210 // returned session when finished using it.
211 func (c *cache) GetSession(token string) (arvados.CustomFileSystem, *cachedSession, *arvados.User, error) {
212         sess, err := c.checkout(token)
213         if err != nil {
214                 return nil, nil, nil, err
215         }
216         sess.refresh.Lock()
217         defer sess.refresh.Unlock()
218         now := time.Now()
219         sess.lastuse = now
220         refresh := sess.expire.Before(now)
221         if sess.fs == nil || !sess.userLoaded || refresh {
222                 // Wait for all active users to finish (otherwise they
223                 // might make changes to an old fs after we start
224                 // using the new fs).
225                 sess.inuse.Lock()
226                 if !sess.userLoaded || refresh {
227                         err := sess.client.RequestAndDecode(&sess.user, "GET", "arvados/v1/users/current", nil, nil)
228                         if he := errorWithHTTPStatus(nil); errors.As(err, &he) && he.HTTPStatus() == http.StatusForbidden {
229                                 // token is OK, but "get user id" api is out
230                                 // of scope -- use existing/expired info if
231                                 // any, or leave empty for unknown user
232                         } else if err != nil {
233                                 sess.inuse.Unlock()
234                                 sess.mtx.RUnlock()
235                                 return nil, nil, nil, err
236                         }
237                         sess.userLoaded = true
238                 }
239
240                 if sess.fs == nil || refresh {
241                         sess.fs = sess.client.SiteFileSystem(sess.keepclient)
242                         sess.fs.ForwardSlashNameSubstitution(c.cluster.Collections.ForwardSlashNameSubstitution)
243                         sess.expire = now.Add(c.cluster.Collections.WebDAVCache.TTL.Duration())
244                         c.metrics.sessionMisses.Inc()
245                 } else {
246                         c.metrics.sessionHits.Inc()
247                 }
248                 sess.inuse.Unlock()
249         } else {
250                 c.metrics.sessionHits.Inc()
251         }
252         sess.inuse.RLock()
253         return sess.fs, sess, &sess.user, nil
254 }
255
256 type sessionSnapshot struct {
257         token   string
258         sess    *cachedSession
259         lastuse time.Time
260         fs      arvados.CustomFileSystem
261         size    int64
262         prune   bool
263 }
264
265 // Remove all expired idle session cache entries, and remove in-memory
266 // filesystems until approximate remaining size <= maxsize
267 func (c *cache) pruneSessions() {
268         now := time.Now()
269         c.mtx.Lock()
270         snaps := make([]sessionSnapshot, 0, len(c.sessions))
271         for token, sess := range c.sessions {
272                 snaps = append(snaps, sessionSnapshot{
273                         token: token,
274                         sess:  sess,
275                 })
276         }
277         c.mtx.Unlock()
278
279         // Load lastuse/fs/expire data from sessions. Note we do this
280         // after unlocking c.mtx because sess.refresh.Lock sometimes
281         // waits for another goroutine to finish "[re]fetch user
282         // record".
283         for i := range snaps {
284                 snaps[i].sess.refresh.Lock()
285                 snaps[i].lastuse = snaps[i].sess.lastuse
286                 snaps[i].fs = snaps[i].sess.fs
287                 snaps[i].prune = snaps[i].sess.expire.Before(now)
288                 snaps[i].sess.refresh.Unlock()
289         }
290
291         // Sort sessions with oldest first.
292         sort.Slice(snaps, func(i, j int) bool {
293                 return snaps[i].lastuse.Before(snaps[j].lastuse)
294         })
295
296         // Add up size of sessions that aren't already marked for
297         // pruning based on expire time.
298         var size int64
299         for i, snap := range snaps {
300                 if !snap.prune && snap.fs != nil {
301                         size := snap.fs.MemorySize()
302                         snaps[i].size = size
303                         size += size
304                 }
305         }
306         // Mark more sessions for deletion until reaching desired
307         // memory size limit, starting with the oldest entries.
308         for i, snap := range snaps {
309                 if size <= int64(c.cluster.Collections.WebDAVCache.MaxCollectionBytes) {
310                         break
311                 }
312                 if snap.prune {
313                         continue
314                 }
315                 snaps[i].prune = true
316                 size -= snap.size
317         }
318
319         // Mark more sessions for deletion until reaching desired
320         // session count limit.
321         mustprune := len(snaps) - c.cluster.Collections.WebDAVCache.MaxSessions
322         for i := range snaps {
323                 if snaps[i].prune {
324                         mustprune--
325                 }
326         }
327         for i := range snaps {
328                 if mustprune < 1 {
329                         break
330                 } else if !snaps[i].prune {
331                         snaps[i].prune = true
332                         mustprune--
333                 }
334         }
335
336         c.mtx.Lock()
337         defer c.mtx.Unlock()
338         for _, snap := range snaps {
339                 if !snap.prune {
340                         continue
341                 }
342                 sess := snap.sess
343                 if sess.mtx.TryLock() {
344                         delete(c.sessions, snap.token)
345                         continue
346                 }
347                 // We can't remove a session that's been checked out
348                 // -- that would allow another session to be created
349                 // for the same token using a different in-memory
350                 // filesystem. Instead, we wait for active requests to
351                 // finish and then "unload" it. After this, either the
352                 // next GetSession will reload fs/user, or a
353                 // subsequent pruneSessions will remove the session.
354                 go func() {
355                         // Ensure nobody is mid-GetSession() (note we
356                         // already know nobody is mid-checkout()
357                         // because we have c.mtx locked)
358                         sess.refresh.Lock()
359                         defer sess.refresh.Unlock()
360                         // Wait for current usage to finish (i.e.,
361                         // anyone who has decided to use the current
362                         // values of sess.fs and sess.user, and hasn't
363                         // called Release() yet)
364                         sess.inuse.Lock()
365                         defer sess.inuse.Unlock()
366                         // Release memory
367                         sess.fs = nil
368                         // Next GetSession will make a new fs
369                 }()
370         }
371 }
372
373 // sessionsSize returns the number and approximate total memory size
374 // of all cached sessions.
375 func (c *cache) sessionsSize() (n int, size int64) {
376         c.mtx.Lock()
377         n = len(c.sessions)
378         sessions := make([]*cachedSession, 0, n)
379         for _, sess := range c.sessions {
380                 sessions = append(sessions, sess)
381         }
382         c.mtx.Unlock()
383         for _, sess := range sessions {
384                 sess.refresh.Lock()
385                 fs := sess.fs
386                 sess.refresh.Unlock()
387                 if fs != nil {
388                         size += fs.MemorySize()
389                 }
390         }
391         return
392 }