X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/336638f432e46f61cc87b958580e098e81cea921..9acc8cb9cd9ca4429712b0d31b647e9a6ecf2d96:/services/keep-web/cache.go diff --git a/services/keep-web/cache.go b/services/keep-web/cache.go index b2bab78216..d5fdc4997e 100644 --- a/services/keep-web/cache.go +++ b/services/keep-web/cache.go @@ -2,56 +2,46 @@ // // SPDX-License-Identifier: AGPL-3.0 -package main +package keepweb import ( "sync" "sync/atomic" "time" - "git.curoverse.com/arvados.git/sdk/go/arvados" - "git.curoverse.com/arvados.git/sdk/go/arvadosclient" - "github.com/hashicorp/golang-lru" + "git.arvados.org/arvados.git/sdk/go/arvados" + "git.arvados.org/arvados.git/sdk/go/arvadosclient" + "git.arvados.org/arvados.git/sdk/go/keepclient" + lru "github.com/hashicorp/golang-lru" "github.com/prometheus/client_golang/prometheus" + "github.com/sirupsen/logrus" ) const metricsUpdateInterval = time.Second / 10 type cache struct { - TTL arvados.Duration - UUIDTTL arvados.Duration - MaxCollectionEntries int - MaxCollectionBytes int64 - MaxPermissionEntries int - MaxUUIDEntries int - + cluster *arvados.Cluster + logger logrus.FieldLogger registry *prometheus.Registry - stats cacheStats metrics cacheMetrics pdhs *lru.TwoQueueCache collections *lru.TwoQueueCache - permissions *lru.TwoQueueCache + sessions *lru.TwoQueueCache setupOnce sync.Once -} -// cacheStats is EOL - add new metrics to cacheMetrics instead -type cacheStats struct { - Requests uint64 `json:"Cache.Requests"` - CollectionBytes uint64 `json:"Cache.CollectionBytes"` - CollectionEntries int `json:"Cache.CollectionEntries"` - CollectionHits uint64 `json:"Cache.CollectionHits"` - PDHHits uint64 `json:"Cache.UUIDHits"` - PermissionHits uint64 `json:"Cache.PermissionHits"` - APICalls uint64 `json:"Cache.APICalls"` + chPruneSessions chan struct{} + chPruneCollections chan struct{} } type cacheMetrics struct { requests prometheus.Counter collectionBytes prometheus.Gauge collectionEntries prometheus.Gauge + sessionEntries prometheus.Gauge collectionHits prometheus.Counter pdhHits prometheus.Counter - permissionHits prometheus.Counter + sessionHits prometheus.Counter + sessionMisses prometheus.Counter apiCalls prometheus.Counter } @@ -77,13 +67,6 @@ func (m *cacheMetrics) setup(reg *prometheus.Registry) { Help: "Number of uuid-to-pdh cache hits.", }) reg.MustRegister(m.pdhHits) - m.permissionHits = prometheus.NewCounter(prometheus.CounterOpts{ - Namespace: "arvados", - Subsystem: "keepweb_collectioncache", - Name: "permission_hits", - Help: "Number of targetID-to-permission cache hits.", - }) - reg.MustRegister(m.permissionHits) m.apiCalls = prometheus.NewCounter(prometheus.CounterOpts{ Namespace: "arvados", Subsystem: "keepweb_collectioncache", @@ -93,9 +76,9 @@ func (m *cacheMetrics) setup(reg *prometheus.Registry) { reg.MustRegister(m.apiCalls) m.collectionBytes = prometheus.NewGauge(prometheus.GaugeOpts{ Namespace: "arvados", - Subsystem: "keepweb_collectioncache", - Name: "cached_manifest_bytes", - Help: "Total size of all manifests in cache.", + Subsystem: "keepweb_sessions", + Name: "cached_collection_bytes", + Help: "Total size of all cached manifests and sessions.", }) reg.MustRegister(m.collectionBytes) m.collectionEntries = prometheus.NewGauge(prometheus.GaugeOpts{ @@ -105,11 +88,33 @@ func (m *cacheMetrics) setup(reg *prometheus.Registry) { Help: "Number of manifests in cache.", }) reg.MustRegister(m.collectionEntries) + m.sessionEntries = prometheus.NewGauge(prometheus.GaugeOpts{ + Namespace: "arvados", + Subsystem: "keepweb_sessions", + Name: "active", + Help: "Number of active token sessions.", + }) + reg.MustRegister(m.sessionEntries) + m.sessionHits = prometheus.NewCounter(prometheus.CounterOpts{ + Namespace: "arvados", + Subsystem: "keepweb_sessions", + Name: "hits", + Help: "Number of token session cache hits.", + }) + reg.MustRegister(m.sessionHits) + m.sessionMisses = prometheus.NewCounter(prometheus.CounterOpts{ + Namespace: "arvados", + Subsystem: "keepweb_sessions", + Name: "misses", + Help: "Number of token session cache misses.", + }) + reg.MustRegister(m.sessionMisses) } type cachedPDH struct { - expire time.Time - pdh string + expire time.Time + refresh time.Time + pdh string } type cachedCollection struct { @@ -121,17 +126,26 @@ type cachedPermission struct { expire time.Time } +type cachedSession struct { + expire time.Time + fs atomic.Value + client *arvados.Client + arvadosclient *arvadosclient.ArvadosClient + keepclient *keepclient.KeepClient + user atomic.Value +} + func (c *cache) setup() { var err error - c.pdhs, err = lru.New2Q(c.MaxUUIDEntries) + c.pdhs, err = lru.New2Q(c.cluster.Collections.WebDAVCache.MaxUUIDEntries) if err != nil { panic(err) } - c.collections, err = lru.New2Q(c.MaxCollectionEntries) + c.collections, err = lru.New2Q(c.cluster.Collections.WebDAVCache.MaxCollectionEntries) if err != nil { panic(err) } - c.permissions, err = lru.New2Q(c.MaxPermissionEntries) + c.sessions, err = lru.New2Q(c.cluster.Collections.WebDAVCache.MaxSessions) if err != nil { panic(err) } @@ -146,73 +160,162 @@ func (c *cache) setup() { c.updateGauges() } }() + c.chPruneCollections = make(chan struct{}, 1) + go func() { + for range c.chPruneCollections { + c.pruneCollections() + } + }() + c.chPruneSessions = make(chan struct{}, 1) + go func() { + for range c.chPruneSessions { + c.pruneSessions() + } + }() } func (c *cache) updateGauges() { c.metrics.collectionBytes.Set(float64(c.collectionBytes())) c.metrics.collectionEntries.Set(float64(c.collections.Len())) + c.metrics.sessionEntries.Set(float64(c.sessions.Len())) } var selectPDH = map[string]interface{}{ "select": []string{"portable_data_hash"}, } -func (c *cache) Stats() cacheStats { - c.setupOnce.Do(c.setup) - return cacheStats{ - Requests: atomic.LoadUint64(&c.stats.Requests), - CollectionBytes: c.collectionBytes(), - CollectionEntries: c.collections.Len(), - CollectionHits: atomic.LoadUint64(&c.stats.CollectionHits), - PDHHits: atomic.LoadUint64(&c.stats.PDHHits), - PermissionHits: atomic.LoadUint64(&c.stats.PermissionHits), - APICalls: atomic.LoadUint64(&c.stats.APICalls), - } -} - // Update saves a modified version (fs) to an existing collection // (coll) and, if successful, updates the relevant cache entries so // subsequent calls to Get() reflect the modifications. func (c *cache) Update(client *arvados.Client, coll arvados.Collection, fs arvados.CollectionFileSystem) error { c.setupOnce.Do(c.setup) - if m, err := fs.MarshalManifest("."); err != nil || m == coll.ManifestText { + m, err := fs.MarshalManifest(".") + if err != nil || m == coll.ManifestText { return err - } else { - coll.ManifestText = m } + coll.ManifestText = m var updated arvados.Collection - defer c.pdhs.Remove(coll.UUID) - err := client.RequestAndDecode(&updated, "PATCH", "arvados/v1/collections/"+coll.UUID, client.UpdateBody(coll), nil) - if err == nil { - c.collections.Add(client.AuthToken+"\000"+coll.PortableDataHash, &cachedCollection{ - expire: time.Now().Add(time.Duration(c.TTL)), - collection: &updated, - }) + err = client.RequestAndDecode(&updated, "PATCH", "arvados/v1/collections/"+coll.UUID, nil, map[string]interface{}{ + "collection": map[string]string{ + "manifest_text": coll.ManifestText, + }, + }) + if err != nil { + c.pdhs.Remove(coll.UUID) + return err } - return err + c.collections.Add(client.AuthToken+"\000"+updated.PortableDataHash, &cachedCollection{ + expire: time.Now().Add(time.Duration(c.cluster.Collections.WebDAVCache.TTL)), + collection: &updated, + }) + c.pdhs.Add(coll.UUID, &cachedPDH{ + expire: time.Now().Add(time.Duration(c.cluster.Collections.WebDAVCache.TTL)), + refresh: time.Now().Add(time.Duration(c.cluster.Collections.WebDAVCache.UUIDTTL)), + pdh: updated.PortableDataHash, + }) + return nil } -func (c *cache) Get(arv *arvadosclient.ArvadosClient, targetID string, forceReload bool) (*arvados.Collection, error) { +// ResetSession unloads any potentially stale state. Should be called +// after write operations, so subsequent reads don't return stale +// data. +func (c *cache) ResetSession(token string) { c.setupOnce.Do(c.setup) + c.sessions.Remove(token) +} - atomic.AddUint64(&c.stats.Requests, 1) - c.metrics.requests.Inc() +// Get a long-lived CustomFileSystem suitable for doing a read operation +// with the given token. +func (c *cache) GetSession(token string) (arvados.CustomFileSystem, *cachedSession, error) { + c.setupOnce.Do(c.setup) + now := time.Now() + ent, _ := c.sessions.Get(token) + sess, _ := ent.(*cachedSession) + expired := false + if sess == nil { + c.metrics.sessionMisses.Inc() + sess = &cachedSession{ + expire: now.Add(c.cluster.Collections.WebDAVCache.TTL.Duration()), + } + var err error + sess.client, err = arvados.NewClientFromConfig(c.cluster) + if err != nil { + return nil, nil, err + } + sess.client.AuthToken = token + sess.arvadosclient, err = arvadosclient.New(sess.client) + if err != nil { + return nil, nil, err + } + sess.keepclient = keepclient.New(sess.arvadosclient) + c.sessions.Add(token, sess) + } else if sess.expire.Before(now) { + c.metrics.sessionMisses.Inc() + expired = true + } else { + c.metrics.sessionHits.Inc() + } + select { + case c.chPruneSessions <- struct{}{}: + default: + } + fs, _ := sess.fs.Load().(arvados.CustomFileSystem) + if fs != nil && !expired { + return fs, sess, nil + } + fs = sess.client.SiteFileSystem(sess.keepclient) + fs.ForwardSlashNameSubstitution(c.cluster.Collections.ForwardSlashNameSubstitution) + sess.fs.Store(fs) + return fs, sess, nil +} - permOK := false - permKey := arv.ApiToken + "\000" + targetID - if forceReload { - } else if ent, cached := c.permissions.Get(permKey); cached { - ent := ent.(*cachedPermission) - if ent.expire.Before(time.Now()) { - c.permissions.Remove(permKey) - } else { - permOK = true - atomic.AddUint64(&c.stats.PermissionHits, 1) - c.metrics.permissionHits.Inc() +// Remove all expired session cache entries, then remove more entries +// until approximate remaining size <= maxsize/2 +func (c *cache) pruneSessions() { + now := time.Now() + var size int64 + keys := c.sessions.Keys() + for _, token := range keys { + ent, ok := c.sessions.Peek(token) + if !ok { + continue + } + s := ent.(*cachedSession) + if s.expire.Before(now) { + c.sessions.Remove(token) + continue + } + if fs, ok := s.fs.Load().(arvados.CustomFileSystem); ok { + size += fs.MemorySize() } } + // Remove tokens until reaching size limit, starting with the + // least frequently used entries (which Keys() returns last). + for i := len(keys) - 1; i >= 0; i-- { + token := keys[i] + if size <= c.cluster.Collections.WebDAVCache.MaxCollectionBytes/2 { + break + } + ent, ok := c.sessions.Peek(token) + if !ok { + continue + } + s := ent.(*cachedSession) + fs, _ := s.fs.Load().(arvados.CustomFileSystem) + if fs == nil { + continue + } + c.sessions.Remove(token) + size -= fs.MemorySize() + } +} + +func (c *cache) Get(arv *arvadosclient.ArvadosClient, targetID string, forceReload bool) (*arvados.Collection, error) { + c.setupOnce.Do(c.setup) + c.metrics.requests.Inc() + var pdhRefresh bool var pdh string if arvadosclient.PDHMatch(targetID) { pdh = targetID @@ -222,24 +325,29 @@ func (c *cache) Get(arv *arvadosclient.ArvadosClient, targetID string, forceRelo c.pdhs.Remove(targetID) } else { pdh = ent.pdh - atomic.AddUint64(&c.stats.PDHHits, 1) + pdhRefresh = forceReload || time.Now().After(ent.refresh) c.metrics.pdhHits.Inc() } } - var collection *arvados.Collection - if pdh != "" { - collection = c.lookupCollection(arv.ApiToken + "\000" + pdh) - } - - if collection != nil && permOK { - return collection, nil - } else if collection != nil { - // Ask API for current PDH for this targetID. Most - // likely, the cached PDH is still correct; if so, - // _and_ the current token has permission, we can - // use our cached manifest. - atomic.AddUint64(&c.stats.APICalls, 1) + if pdh == "" { + // UUID->PDH mapping is not cached, might as well get + // the whole collection record and be done (below). + c.logger.Debugf("cache(%s): have no pdh", targetID) + } else if cached := c.lookupCollection(arv.ApiToken + "\000" + pdh); cached == nil { + // PDH->manifest is not cached, might as well get the + // whole collection record (below). + c.logger.Debugf("cache(%s): have pdh %s but manifest is not cached", targetID, pdh) + } else if !pdhRefresh { + // We looked up UUID->PDH very recently, and we still + // have the manifest for that PDH. + c.logger.Debugf("cache(%s): have pdh %s and refresh not needed", targetID, pdh) + return cached, nil + } else { + // Get current PDH for this UUID (and confirm we still + // have read permission). Most likely, the cached PDH + // is still correct, in which case we can use our + // cached manifest. c.metrics.apiCalls.Inc() var current arvados.Collection err := arv.Get("collections", targetID, selectPDH, ¤t) @@ -247,49 +355,47 @@ func (c *cache) Get(arv *arvadosclient.ArvadosClient, targetID string, forceRelo return nil, err } if current.PortableDataHash == pdh { - c.permissions.Add(permKey, &cachedPermission{ - expire: time.Now().Add(time.Duration(c.TTL)), - }) - if pdh != targetID { - c.pdhs.Add(targetID, &cachedPDH{ - expire: time.Now().Add(time.Duration(c.UUIDTTL)), - pdh: pdh, - }) - } - return collection, err - } else { - // PDH changed, but now we know we have - // permission -- and maybe we already have the - // new PDH in the cache. - if coll := c.lookupCollection(arv.ApiToken + "\000" + current.PortableDataHash); coll != nil { - return coll, nil - } + // PDH has not changed, cached manifest is + // correct. + c.logger.Debugf("cache(%s): verified cached pdh %s is still correct", targetID, pdh) + return cached, nil + } + if cached := c.lookupCollection(arv.ApiToken + "\000" + current.PortableDataHash); cached != nil { + // PDH changed, and we already have the + // manifest for that new PDH. + c.logger.Debugf("cache(%s): cached pdh %s was stale, new pdh is %s and manifest is already in cache", targetID, pdh, current.PortableDataHash) + return cached, nil } } - // Collection manifest is not cached. - atomic.AddUint64(&c.stats.APICalls, 1) + // Either UUID->PDH is not cached, or PDH->manifest is not + // cached. + var retrieved arvados.Collection c.metrics.apiCalls.Inc() - err := arv.Get("collections", targetID, nil, &collection) + err := arv.Get("collections", targetID, nil, &retrieved) if err != nil { return nil, err } - exp := time.Now().Add(time.Duration(c.TTL)) - c.permissions.Add(permKey, &cachedPermission{ - expire: exp, - }) - c.pdhs.Add(targetID, &cachedPDH{ - expire: time.Now().Add(time.Duration(c.UUIDTTL)), - pdh: collection.PortableDataHash, - }) - c.collections.Add(arv.ApiToken+"\000"+collection.PortableDataHash, &cachedCollection{ + c.logger.Debugf("cache(%s): retrieved manifest, caching with pdh %s", targetID, retrieved.PortableDataHash) + exp := time.Now().Add(time.Duration(c.cluster.Collections.WebDAVCache.TTL)) + if targetID != retrieved.PortableDataHash { + c.pdhs.Add(targetID, &cachedPDH{ + expire: exp, + refresh: time.Now().Add(time.Duration(c.cluster.Collections.WebDAVCache.UUIDTTL)), + pdh: retrieved.PortableDataHash, + }) + } + c.collections.Add(arv.ApiToken+"\000"+retrieved.PortableDataHash, &cachedCollection{ expire: exp, - collection: collection, + collection: &retrieved, }) - if int64(len(collection.ManifestText)) > c.MaxCollectionBytes/int64(c.MaxCollectionEntries) { - go c.pruneCollections() + if int64(len(retrieved.ManifestText)) > c.cluster.Collections.WebDAVCache.MaxCollectionBytes/int64(c.cluster.Collections.WebDAVCache.MaxCollectionEntries) { + select { + case c.chPruneCollections <- struct{}{}: + default: + } } - return collection, nil + return &retrieved, nil } // pruneCollections checks the total bytes occupied by manifest_text @@ -323,7 +429,7 @@ func (c *cache) pruneCollections() { } } for i, k := range keys { - if size <= c.MaxCollectionBytes { + if size <= c.cluster.Collections.WebDAVCache.MaxCollectionBytes/2 { break } if expired[i] { @@ -335,8 +441,8 @@ func (c *cache) pruneCollections() { } } -// collectionBytes returns the approximate memory size of the -// collection cache. +// collectionBytes returns the approximate combined memory size of the +// collection cache and session filesystem cache. func (c *cache) collectionBytes() uint64 { var size uint64 for _, k := range c.collections.Keys() { @@ -346,6 +452,15 @@ func (c *cache) collectionBytes() uint64 { } size += uint64(len(v.(*cachedCollection).collection.ManifestText)) } + for _, token := range c.sessions.Keys() { + ent, ok := c.sessions.Peek(token) + if !ok { + continue + } + if fs, ok := ent.(*cachedSession).fs.Load().(arvados.CustomFileSystem); ok { + size += uint64(fs.MemorySize()) + } + } return size } @@ -359,7 +474,38 @@ func (c *cache) lookupCollection(key string) *arvados.Collection { c.collections.Remove(key) return nil } - atomic.AddUint64(&c.stats.CollectionHits, 1) c.metrics.collectionHits.Inc() return ent.collection } + +func (c *cache) GetTokenUser(token string) (*arvados.User, error) { + // Get and cache user record associated with this + // token. We need to know their UUID for logging, and + // whether they are an admin or not for certain + // permission checks. + + // Get/create session entry + _, sess, err := c.GetSession(token) + if err != nil { + return nil, err + } + + // See if the user is already set, and if so, return it + user, _ := sess.user.Load().(*arvados.User) + if user != nil { + return user, nil + } + + // Fetch the user record + c.metrics.apiCalls.Inc() + var current arvados.User + + err = sess.client.RequestAndDecode(¤t, "GET", "/arvados/v1/users/current", nil, nil) + if err != nil { + return nil, err + } + + // Stash the user record for next time + sess.user.Store(¤t) + return ¤t, nil +}