X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/678f1f53466c3f2133f63a6e6ad553f5e5d14804..763f629e11df304e6202fb140adc27d3a08ac1a6:/services/keep-web/cache.go diff --git a/services/keep-web/cache.go b/services/keep-web/cache.go index b9a1f3069f..c44a2eb739 100644 --- a/services/keep-web/cache.go +++ b/services/keep-web/cache.go @@ -2,123 +2,87 @@ // // SPDX-License-Identifier: AGPL-3.0 -package main +package keepweb import ( + "errors" + "net/http" "sync" + "sync/atomic" "time" - "git.curoverse.com/arvados.git/sdk/go/arvados" - "git.curoverse.com/arvados.git/sdk/go/arvadosclient" - "github.com/hashicorp/golang-lru" + "git.arvados.org/arvados.git/sdk/go/arvados" + "git.arvados.org/arvados.git/sdk/go/arvadosclient" + "git.arvados.org/arvados.git/sdk/go/keepclient" + lru "github.com/hashicorp/golang-lru" "github.com/prometheus/client_golang/prometheus" + "github.com/sirupsen/logrus" ) const metricsUpdateInterval = time.Second / 10 type cache struct { - TTL arvados.Duration - UUIDTTL arvados.Duration - MaxCollectionEntries int - MaxCollectionBytes int64 - MaxPermissionEntries int - MaxUUIDEntries int - - registry *prometheus.Registry - metrics cacheMetrics - pdhs *lru.TwoQueueCache - collections *lru.TwoQueueCache - permissions *lru.TwoQueueCache - setupOnce sync.Once + cluster *arvados.Cluster + logger logrus.FieldLogger + registry *prometheus.Registry + metrics cacheMetrics + sessions *lru.TwoQueueCache + setupOnce sync.Once + + chPruneSessions chan struct{} } type cacheMetrics struct { - requests prometheus.Counter - collectionBytes prometheus.Gauge - collectionEntries prometheus.Gauge - collectionHits prometheus.Counter - pdhHits prometheus.Counter - permissionHits prometheus.Counter - apiCalls prometheus.Counter + requests prometheus.Counter + collectionBytes prometheus.Gauge + sessionEntries prometheus.Gauge + sessionHits prometheus.Counter + sessionMisses prometheus.Counter } func (m *cacheMetrics) setup(reg *prometheus.Registry) { - m.requests = prometheus.NewCounter(prometheus.CounterOpts{ - Namespace: "arvados", - Subsystem: "keepweb_collectioncache", - Name: "requests", - Help: "Number of targetID-to-manifest lookups handled.", - }) - reg.MustRegister(m.requests) - m.collectionHits = prometheus.NewCounter(prometheus.CounterOpts{ - Namespace: "arvados", - Subsystem: "keepweb_collectioncache", - Name: "hits", - Help: "Number of pdh-to-manifest cache hits.", - }) - reg.MustRegister(m.collectionHits) - m.pdhHits = prometheus.NewCounter(prometheus.CounterOpts{ - Namespace: "arvados", - Subsystem: "keepweb_collectioncache", - Name: "pdh_hits", - Help: "Number of uuid-to-pdh cache hits.", - }) - reg.MustRegister(m.pdhHits) - m.permissionHits = prometheus.NewCounter(prometheus.CounterOpts{ + m.collectionBytes = prometheus.NewGauge(prometheus.GaugeOpts{ Namespace: "arvados", - Subsystem: "keepweb_collectioncache", - Name: "permission_hits", - Help: "Number of targetID-to-permission cache hits.", + Subsystem: "keepweb_sessions", + Name: "cached_session_bytes", + Help: "Total size of all cached sessions.", }) - reg.MustRegister(m.permissionHits) - m.apiCalls = prometheus.NewCounter(prometheus.CounterOpts{ + reg.MustRegister(m.collectionBytes) + m.sessionEntries = prometheus.NewGauge(prometheus.GaugeOpts{ Namespace: "arvados", - Subsystem: "keepweb_collectioncache", - Name: "api_calls", - Help: "Number of outgoing API calls made by cache.", + Subsystem: "keepweb_sessions", + Name: "active", + Help: "Number of active token sessions.", }) - reg.MustRegister(m.apiCalls) - m.collectionBytes = prometheus.NewGauge(prometheus.GaugeOpts{ + reg.MustRegister(m.sessionEntries) + m.sessionHits = prometheus.NewCounter(prometheus.CounterOpts{ Namespace: "arvados", - Subsystem: "keepweb_collectioncache", - Name: "cached_manifest_bytes", - Help: "Total size of all manifests in cache.", + Subsystem: "keepweb_sessions", + Name: "hits", + Help: "Number of token session cache hits.", }) - reg.MustRegister(m.collectionBytes) - m.collectionEntries = prometheus.NewGauge(prometheus.GaugeOpts{ + reg.MustRegister(m.sessionHits) + m.sessionMisses = prometheus.NewCounter(prometheus.CounterOpts{ Namespace: "arvados", - Subsystem: "keepweb_collectioncache", - Name: "cached_manifests", - Help: "Number of manifests in cache.", + Subsystem: "keepweb_sessions", + Name: "misses", + Help: "Number of token session cache misses.", }) - reg.MustRegister(m.collectionEntries) -} - -type cachedPDH struct { - expire time.Time - pdh string + reg.MustRegister(m.sessionMisses) } -type cachedCollection struct { - expire time.Time - collection *arvados.Collection -} - -type cachedPermission struct { - expire time.Time +type cachedSession struct { + expire time.Time + fs atomic.Value + client *arvados.Client + arvadosclient *arvadosclient.ArvadosClient + keepclient *keepclient.KeepClient + user atomic.Value } func (c *cache) setup() { var err error - c.pdhs, err = lru.New2Q(c.MaxUUIDEntries) - if err != nil { - panic(err) - } - c.collections, err = lru.New2Q(c.MaxCollectionEntries) - if err != nil { - panic(err) - } - c.permissions, err = lru.New2Q(c.MaxPermissionEntries) + c.sessions, err = lru.New2Q(c.cluster.Collections.WebDAVCache.MaxSessions) if err != nil { panic(err) } @@ -133,204 +97,140 @@ func (c *cache) setup() { c.updateGauges() } }() + c.chPruneSessions = make(chan struct{}, 1) + go func() { + for range c.chPruneSessions { + c.pruneSessions() + } + }() } func (c *cache) updateGauges() { c.metrics.collectionBytes.Set(float64(c.collectionBytes())) - c.metrics.collectionEntries.Set(float64(c.collections.Len())) + c.metrics.sessionEntries.Set(float64(c.sessions.Len())) } var selectPDH = map[string]interface{}{ "select": []string{"portable_data_hash"}, } -// Update saves a modified version (fs) to an existing collection -// (coll) and, if successful, updates the relevant cache entries so -// subsequent calls to Get() reflect the modifications. -func (c *cache) Update(client *arvados.Client, coll arvados.Collection, fs arvados.CollectionFileSystem) error { +// ResetSession unloads any potentially stale state. Should be called +// after write operations, so subsequent reads don't return stale +// data. +func (c *cache) ResetSession(token string) { c.setupOnce.Do(c.setup) - - if m, err := fs.MarshalManifest("."); err != nil || m == coll.ManifestText { - return err - } else { - coll.ManifestText = m - } - var updated arvados.Collection - defer c.pdhs.Remove(coll.UUID) - err := client.RequestAndDecode(&updated, "PATCH", "arvados/v1/collections/"+coll.UUID, nil, map[string]interface{}{ - "collection": map[string]string{ - "manifest_text": coll.ManifestText, - }, - }) - if err == nil { - c.collections.Add(client.AuthToken+"\000"+coll.PortableDataHash, &cachedCollection{ - expire: time.Now().Add(time.Duration(c.TTL)), - collection: &updated, - }) - } - return err + c.sessions.Remove(token) } -func (c *cache) Get(arv *arvadosclient.ArvadosClient, targetID string, forceReload bool) (*arvados.Collection, error) { +// Get a long-lived CustomFileSystem suitable for doing a read operation +// with the given token. +func (c *cache) GetSession(token string) (arvados.CustomFileSystem, *cachedSession, *arvados.User, error) { c.setupOnce.Do(c.setup) - c.metrics.requests.Inc() - - permOK := false - permKey := arv.ApiToken + "\000" + targetID - if forceReload { - } else if ent, cached := c.permissions.Get(permKey); cached { - ent := ent.(*cachedPermission) - if ent.expire.Before(time.Now()) { - c.permissions.Remove(permKey) - } else { - permOK = true - c.metrics.permissionHits.Inc() + now := time.Now() + ent, _ := c.sessions.Get(token) + sess, _ := ent.(*cachedSession) + expired := false + if sess == nil { + c.metrics.sessionMisses.Inc() + sess = &cachedSession{ + expire: now.Add(c.cluster.Collections.WebDAVCache.TTL.Duration()), } - } - - var pdh string - if arvadosclient.PDHMatch(targetID) { - pdh = targetID - } else if ent, cached := c.pdhs.Get(targetID); cached { - ent := ent.(*cachedPDH) - if ent.expire.Before(time.Now()) { - c.pdhs.Remove(targetID) - } else { - pdh = ent.pdh - c.metrics.pdhHits.Inc() + var err error + sess.client, err = arvados.NewClientFromConfig(c.cluster) + if err != nil { + return nil, nil, nil, err } - } - - var collection *arvados.Collection - if pdh != "" { - collection = c.lookupCollection(arv.ApiToken + "\000" + pdh) - } - - if collection != nil && permOK { - return collection, nil - } else if collection != nil { - // Ask API for current PDH for this targetID. Most - // likely, the cached PDH is still correct; if so, - // _and_ the current token has permission, we can - // use our cached manifest. - c.metrics.apiCalls.Inc() - var current arvados.Collection - err := arv.Get("collections", targetID, selectPDH, ¤t) + sess.client.AuthToken = token + // A non-empty origin header tells controller to + // prioritize our traffic as interactive, which is + // true most of the time. + origin := c.cluster.Services.WebDAVDownload.ExternalURL + sess.client.SendHeader = http.Header{"Origin": {origin.Scheme + "://" + origin.Host}} + sess.arvadosclient, err = arvadosclient.New(sess.client) if err != nil { - return nil, err + return nil, nil, nil, err } - if current.PortableDataHash == pdh { - c.permissions.Add(permKey, &cachedPermission{ - expire: time.Now().Add(time.Duration(c.TTL)), - }) - if pdh != targetID { - c.pdhs.Add(targetID, &cachedPDH{ - expire: time.Now().Add(time.Duration(c.UUIDTTL)), - pdh: pdh, - }) - } - return collection, err - } else { - // PDH changed, but now we know we have - // permission -- and maybe we already have the - // new PDH in the cache. - if coll := c.lookupCollection(arv.ApiToken + "\000" + current.PortableDataHash); coll != nil { - return coll, nil - } + sess.keepclient = keepclient.New(sess.arvadosclient) + c.sessions.Add(token, sess) + } else if sess.expire.Before(now) { + c.metrics.sessionMisses.Inc() + expired = true + } else { + c.metrics.sessionHits.Inc() + } + select { + case c.chPruneSessions <- struct{}{}: + default: + } + + fs, _ := sess.fs.Load().(arvados.CustomFileSystem) + if fs == nil || expired { + fs = sess.client.SiteFileSystem(sess.keepclient) + fs.ForwardSlashNameSubstitution(c.cluster.Collections.ForwardSlashNameSubstitution) + sess.fs.Store(fs) + } + + user, _ := sess.user.Load().(*arvados.User) + if user == nil || expired { + user = new(arvados.User) + err := sess.client.RequestAndDecode(user, "GET", "/arvados/v1/users/current", nil, nil) + if he := errorWithHTTPStatus(nil); errors.As(err, &he) && he.HTTPStatus() == http.StatusForbidden { + // token is OK, but "get user id" api is out + // of scope -- return nil, signifying unknown + // user + } else if err != nil { + return nil, nil, nil, err } + sess.user.Store(user) } - // Collection manifest is not cached. - c.metrics.apiCalls.Inc() - err := arv.Get("collections", targetID, nil, &collection) - if err != nil { - return nil, err - } - exp := time.Now().Add(time.Duration(c.TTL)) - c.permissions.Add(permKey, &cachedPermission{ - expire: exp, - }) - c.pdhs.Add(targetID, &cachedPDH{ - expire: time.Now().Add(time.Duration(c.UUIDTTL)), - pdh: collection.PortableDataHash, - }) - c.collections.Add(arv.ApiToken+"\000"+collection.PortableDataHash, &cachedCollection{ - expire: exp, - collection: collection, - }) - if int64(len(collection.ManifestText)) > c.MaxCollectionBytes/int64(c.MaxCollectionEntries) { - go c.pruneCollections() - } - return collection, nil + return fs, sess, user, nil } -// pruneCollections checks the total bytes occupied by manifest_text -// in the collection cache and removes old entries as needed to bring -// the total size down to CollectionBytes. It also deletes all expired -// entries. -// -// pruneCollections does not aim to be perfectly correct when there is -// concurrent cache activity. -func (c *cache) pruneCollections() { - var size int64 +// Remove all expired session cache entries, then remove more entries +// until approximate remaining size <= maxsize/2 +func (c *cache) pruneSessions() { now := time.Now() - keys := c.collections.Keys() - entsize := make([]int, len(keys)) - expired := make([]bool, len(keys)) - for i, k := range keys { - v, ok := c.collections.Peek(k) + keys := c.sessions.Keys() + sizes := make([]int64, len(keys)) + var size int64 + for i, token := range keys { + ent, ok := c.sessions.Peek(token) if !ok { continue } - ent := v.(*cachedCollection) - n := len(ent.collection.ManifestText) - size += int64(n) - entsize[i] = n - expired[i] = ent.expire.Before(now) - } - for i, k := range keys { - if expired[i] { - c.collections.Remove(k) - size -= int64(entsize[i]) + s := ent.(*cachedSession) + if s.expire.Before(now) { + c.sessions.Remove(token) + continue } - } - for i, k := range keys { - if size <= c.MaxCollectionBytes { - break + if fs, ok := s.fs.Load().(arvados.CustomFileSystem); ok { + sizes[i] = fs.MemorySize() + size += sizes[i] } - if expired[i] { - // already removed this entry in the previous loop - continue + } + // Remove tokens until reaching size limit, starting with the + // least frequently used entries (which Keys() returns last). + for i := len(keys) - 1; i >= 0 && size > c.cluster.Collections.WebDAVCache.MaxCollectionBytes; i-- { + if sizes[i] > 0 { + c.sessions.Remove(keys[i]) + size -= sizes[i] } - c.collections.Remove(k) - size -= int64(entsize[i]) } } -// collectionBytes returns the approximate memory size of the -// collection cache. +// collectionBytes returns the approximate combined memory size of the +// collection cache and session filesystem cache. func (c *cache) collectionBytes() uint64 { var size uint64 - for _, k := range c.collections.Keys() { - v, ok := c.collections.Peek(k) + for _, token := range c.sessions.Keys() { + ent, ok := c.sessions.Peek(token) if !ok { continue } - size += uint64(len(v.(*cachedCollection).collection.ManifestText)) + if fs, ok := ent.(*cachedSession).fs.Load().(arvados.CustomFileSystem); ok { + size += uint64(fs.MemorySize()) + } } return size } - -func (c *cache) lookupCollection(key string) *arvados.Collection { - e, cached := c.collections.Get(key) - if !cached { - return nil - } - ent := e.(*cachedCollection) - if ent.expire.Before(time.Now()) { - c.collections.Remove(key) - return nil - } - c.metrics.collectionHits.Inc() - return ent.collection -}