Merge branch '19362-s3-webdav-sync'
[arvados.git] / services / keep-web / cache.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package keepweb
6
7 import (
8         "net/http"
9         "sync"
10         "sync/atomic"
11         "time"
12
13         "git.arvados.org/arvados.git/sdk/go/arvados"
14         "git.arvados.org/arvados.git/sdk/go/arvadosclient"
15         "git.arvados.org/arvados.git/sdk/go/keepclient"
16         lru "github.com/hashicorp/golang-lru"
17         "github.com/prometheus/client_golang/prometheus"
18         "github.com/sirupsen/logrus"
19 )
20
// metricsUpdateInterval is how often the background goroutine started
// by (*cache).setup refreshes the cache gauges.
const metricsUpdateInterval = 100 * time.Millisecond
22
23 type cache struct {
24         cluster   *arvados.Cluster
25         logger    logrus.FieldLogger
26         registry  *prometheus.Registry
27         metrics   cacheMetrics
28         sessions  *lru.TwoQueueCache
29         setupOnce sync.Once
30
31         chPruneSessions chan struct{}
32 }
33
34 type cacheMetrics struct {
35         requests        prometheus.Counter
36         collectionBytes prometheus.Gauge
37         sessionEntries  prometheus.Gauge
38         sessionHits     prometheus.Counter
39         sessionMisses   prometheus.Counter
40 }
41
42 func (m *cacheMetrics) setup(reg *prometheus.Registry) {
43         m.collectionBytes = prometheus.NewGauge(prometheus.GaugeOpts{
44                 Namespace: "arvados",
45                 Subsystem: "keepweb_sessions",
46                 Name:      "cached_session_bytes",
47                 Help:      "Total size of all cached sessions.",
48         })
49         reg.MustRegister(m.collectionBytes)
50         m.sessionEntries = prometheus.NewGauge(prometheus.GaugeOpts{
51                 Namespace: "arvados",
52                 Subsystem: "keepweb_sessions",
53                 Name:      "active",
54                 Help:      "Number of active token sessions.",
55         })
56         reg.MustRegister(m.sessionEntries)
57         m.sessionHits = prometheus.NewCounter(prometheus.CounterOpts{
58                 Namespace: "arvados",
59                 Subsystem: "keepweb_sessions",
60                 Name:      "hits",
61                 Help:      "Number of token session cache hits.",
62         })
63         reg.MustRegister(m.sessionHits)
64         m.sessionMisses = prometheus.NewCounter(prometheus.CounterOpts{
65                 Namespace: "arvados",
66                 Subsystem: "keepweb_sessions",
67                 Name:      "misses",
68                 Help:      "Number of token session cache misses.",
69         })
70         reg.MustRegister(m.sessionMisses)
71 }
72
73 type cachedSession struct {
74         expire        time.Time
75         fs            atomic.Value
76         client        *arvados.Client
77         arvadosclient *arvadosclient.ArvadosClient
78         keepclient    *keepclient.KeepClient
79         user          atomic.Value
80 }
81
82 func (c *cache) setup() {
83         var err error
84         c.sessions, err = lru.New2Q(c.cluster.Collections.WebDAVCache.MaxSessions)
85         if err != nil {
86                 panic(err)
87         }
88
89         reg := c.registry
90         if reg == nil {
91                 reg = prometheus.NewRegistry()
92         }
93         c.metrics.setup(reg)
94         go func() {
95                 for range time.Tick(metricsUpdateInterval) {
96                         c.updateGauges()
97                 }
98         }()
99         c.chPruneSessions = make(chan struct{}, 1)
100         go func() {
101                 for range c.chPruneSessions {
102                         c.pruneSessions()
103                 }
104         }()
105 }
106
107 func (c *cache) updateGauges() {
108         c.metrics.collectionBytes.Set(float64(c.collectionBytes()))
109         c.metrics.sessionEntries.Set(float64(c.sessions.Len()))
110 }
111
// selectPDH is a set of request parameters asking for only the
// portable_data_hash field.
var selectPDH = map[string]interface{}{"select": []string{"portable_data_hash"}}
115
116 // ResetSession unloads any potentially stale state. Should be called
117 // after write operations, so subsequent reads don't return stale
118 // data.
119 func (c *cache) ResetSession(token string) {
120         c.setupOnce.Do(c.setup)
121         c.sessions.Remove(token)
122 }
123
124 // Get a long-lived CustomFileSystem suitable for doing a read operation
125 // with the given token.
126 func (c *cache) GetSession(token string) (arvados.CustomFileSystem, *cachedSession, *arvados.User, error) {
127         c.setupOnce.Do(c.setup)
128         now := time.Now()
129         ent, _ := c.sessions.Get(token)
130         sess, _ := ent.(*cachedSession)
131         expired := false
132         if sess == nil {
133                 c.metrics.sessionMisses.Inc()
134                 sess = &cachedSession{
135                         expire: now.Add(c.cluster.Collections.WebDAVCache.TTL.Duration()),
136                 }
137                 var err error
138                 sess.client, err = arvados.NewClientFromConfig(c.cluster)
139                 if err != nil {
140                         return nil, nil, nil, err
141                 }
142                 sess.client.AuthToken = token
143                 sess.arvadosclient, err = arvadosclient.New(sess.client)
144                 if err != nil {
145                         return nil, nil, nil, err
146                 }
147                 sess.keepclient = keepclient.New(sess.arvadosclient)
148                 c.sessions.Add(token, sess)
149         } else if sess.expire.Before(now) {
150                 c.metrics.sessionMisses.Inc()
151                 expired = true
152         } else {
153                 c.metrics.sessionHits.Inc()
154         }
155         select {
156         case c.chPruneSessions <- struct{}{}:
157         default:
158         }
159
160         fs, _ := sess.fs.Load().(arvados.CustomFileSystem)
161         if fs == nil || expired {
162                 fs = sess.client.SiteFileSystem(sess.keepclient)
163                 fs.ForwardSlashNameSubstitution(c.cluster.Collections.ForwardSlashNameSubstitution)
164                 sess.fs.Store(fs)
165         }
166
167         user, _ := sess.user.Load().(*arvados.User)
168         if user == nil || expired {
169                 user = new(arvados.User)
170                 err := sess.client.RequestAndDecode(user, "GET", "/arvados/v1/users/current", nil, nil)
171                 if statusErr, ok := err.(interface{ HTTPStatus() int }); ok && statusErr.HTTPStatus() == http.StatusForbidden {
172                         // token is OK, but "get user id" api is out
173                         // of scope -- return nil, signifying unknown
174                         // user
175                 } else if err != nil {
176                         return nil, nil, nil, err
177                 }
178                 sess.user.Store(user)
179         }
180
181         return fs, sess, user, nil
182 }
183
184 // Remove all expired session cache entries, then remove more entries
185 // until approximate remaining size <= maxsize/2
186 func (c *cache) pruneSessions() {
187         now := time.Now()
188         keys := c.sessions.Keys()
189         sizes := make([]int64, len(keys))
190         var size int64
191         for i, token := range keys {
192                 ent, ok := c.sessions.Peek(token)
193                 if !ok {
194                         continue
195                 }
196                 s := ent.(*cachedSession)
197                 if s.expire.Before(now) {
198                         c.sessions.Remove(token)
199                         continue
200                 }
201                 if fs, ok := s.fs.Load().(arvados.CustomFileSystem); ok {
202                         sizes[i] = fs.MemorySize()
203                         size += sizes[i]
204                 }
205         }
206         // Remove tokens until reaching size limit, starting with the
207         // least frequently used entries (which Keys() returns last).
208         for i := len(keys) - 1; i >= 0 && size > c.cluster.Collections.WebDAVCache.MaxCollectionBytes; i-- {
209                 if sizes[i] > 0 {
210                         c.sessions.Remove(keys[i])
211                         size -= sizes[i]
212                 }
213         }
214 }
215
216 // collectionBytes returns the approximate combined memory size of the
217 // collection cache and session filesystem cache.
218 func (c *cache) collectionBytes() uint64 {
219         var size uint64
220         for _, token := range c.sessions.Keys() {
221                 ent, ok := c.sessions.Peek(token)
222                 if !ok {
223                         continue
224                 }
225                 if fs, ok := ent.(*cachedSession).fs.Load().(arvados.CustomFileSystem); ok {
226                         size += uint64(fs.MemorySize())
227                 }
228         }
229         return size
230 }