X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/28d7ed94b33411909941c72289bc227a38e80cda..35db495717a628e0a6ef52a453b8d8ced793c41b:/services/keep-web/cache.go diff --git a/services/keep-web/cache.go b/services/keep-web/cache.go index 7ec8639aba..db06d63509 100644 --- a/services/keep-web/cache.go +++ b/services/keep-web/cache.go @@ -5,6 +5,7 @@ package keepweb import ( + "net/http" "sync" "sync/atomic" "time" @@ -20,74 +21,32 @@ import ( const metricsUpdateInterval = time.Second / 10 type cache struct { - cluster *arvados.Cluster - logger logrus.FieldLogger - registry *prometheus.Registry - metrics cacheMetrics - pdhs *lru.TwoQueueCache - collections *lru.TwoQueueCache - sessions *lru.TwoQueueCache - setupOnce sync.Once + cluster *arvados.Cluster + logger logrus.FieldLogger + registry *prometheus.Registry + metrics cacheMetrics + sessions *lru.TwoQueueCache + setupOnce sync.Once - chPruneSessions chan struct{} - chPruneCollections chan struct{} + chPruneSessions chan struct{} } type cacheMetrics struct { - requests prometheus.Counter - collectionBytes prometheus.Gauge - collectionEntries prometheus.Gauge - sessionEntries prometheus.Gauge - collectionHits prometheus.Counter - pdhHits prometheus.Counter - sessionHits prometheus.Counter - sessionMisses prometheus.Counter - apiCalls prometheus.Counter + requests prometheus.Counter + collectionBytes prometheus.Gauge + sessionEntries prometheus.Gauge + sessionHits prometheus.Counter + sessionMisses prometheus.Counter } func (m *cacheMetrics) setup(reg *prometheus.Registry) { - m.requests = prometheus.NewCounter(prometheus.CounterOpts{ - Namespace: "arvados", - Subsystem: "keepweb_collectioncache", - Name: "requests", - Help: "Number of targetID-to-manifest lookups handled.", - }) - reg.MustRegister(m.requests) - m.collectionHits = prometheus.NewCounter(prometheus.CounterOpts{ - Namespace: "arvados", - Subsystem: "keepweb_collectioncache", - Name: "hits", - Help: "Number of pdh-to-manifest cache hits.", - }) - reg.MustRegister(m.collectionHits) - m.pdhHits = prometheus.NewCounter(prometheus.CounterOpts{ - Namespace: "arvados", - Subsystem: "keepweb_collectioncache", - Name: "pdh_hits", - Help: "Number of uuid-to-pdh cache hits.", - }) - reg.MustRegister(m.pdhHits) - m.apiCalls = prometheus.NewCounter(prometheus.CounterOpts{ - Namespace: "arvados", - Subsystem: "keepweb_collectioncache", - Name: "api_calls", - Help: "Number of outgoing API calls made by cache.", - }) - reg.MustRegister(m.apiCalls) m.collectionBytes = prometheus.NewGauge(prometheus.GaugeOpts{ Namespace: "arvados", Subsystem: "keepweb_sessions", - Name: "cached_collection_bytes", - Help: "Total size of all cached manifests and sessions.", + Name: "cached_session_bytes", + Help: "Total size of all cached sessions.", }) reg.MustRegister(m.collectionBytes) - m.collectionEntries = prometheus.NewGauge(prometheus.GaugeOpts{ - Namespace: "arvados", - Subsystem: "keepweb_collectioncache", - Name: "cached_manifests", - Help: "Number of manifests in cache.", - }) - reg.MustRegister(m.collectionEntries) m.sessionEntries = prometheus.NewGauge(prometheus.GaugeOpts{ Namespace: "arvados", Subsystem: "keepweb_sessions", @@ -111,21 +70,6 @@ func (m *cacheMetrics) setup(reg *prometheus.Registry) { reg.MustRegister(m.sessionMisses) } -type cachedPDH struct { - expire time.Time - refresh time.Time - pdh string -} - -type cachedCollection struct { - expire time.Time - collection *arvados.Collection -} - -type cachedPermission struct { - expire time.Time -} - type cachedSession struct { expire time.Time fs atomic.Value @@ -137,14 +81,6 @@ type cachedSession struct { func (c *cache) setup() { var err error - c.pdhs, err = lru.New2Q(c.cluster.Collections.WebDAVCache.MaxUUIDEntries) - if err != nil { - panic(err) - } - c.collections, err = lru.New2Q(c.cluster.Collections.WebDAVCache.MaxCollectionEntries) - if err != nil { - panic(err) - } c.sessions, err = lru.New2Q(c.cluster.Collections.WebDAVCache.MaxSessions) if err != nil { panic(err) @@ -160,12 +96,6 @@ func (c *cache) setup() { c.updateGauges() } }() - c.chPruneCollections = make(chan struct{}, 1) - go func() { - for range c.chPruneCollections { - c.pruneCollections() - } - }() c.chPruneSessions = make(chan struct{}, 1) go func() { for range c.chPruneSessions { @@ -176,7 +106,6 @@ func (c *cache) setup() { func (c *cache) updateGauges() { c.metrics.collectionBytes.Set(float64(c.collectionBytes())) - c.metrics.collectionEntries.Set(float64(c.collections.Len())) c.metrics.sessionEntries.Set(float64(c.sessions.Len())) } @@ -184,39 +113,6 @@ var selectPDH = map[string]interface{}{ "select": []string{"portable_data_hash"}, } -// Update saves a modified version (fs) to an existing collection -// (coll) and, if successful, updates the relevant cache entries so -// subsequent calls to Get() reflect the modifications. -func (c *cache) Update(client *arvados.Client, coll arvados.Collection, fs arvados.CollectionFileSystem) error { - c.setupOnce.Do(c.setup) - - m, err := fs.MarshalManifest(".") - if err != nil || m == coll.ManifestText { - return err - } - coll.ManifestText = m - var updated arvados.Collection - err = client.RequestAndDecode(&updated, "PATCH", "arvados/v1/collections/"+coll.UUID, nil, map[string]interface{}{ - "collection": map[string]string{ - "manifest_text": coll.ManifestText, - }, - }) - if err != nil { - c.pdhs.Remove(coll.UUID) - return err - } - c.collections.Add(client.AuthToken+"\000"+updated.PortableDataHash, &cachedCollection{ - expire: time.Now().Add(time.Duration(c.cluster.Collections.WebDAVCache.TTL)), - collection: &updated, - }) - c.pdhs.Add(coll.UUID, &cachedPDH{ - expire: time.Now().Add(time.Duration(c.cluster.Collections.WebDAVCache.TTL)), - refresh: time.Now().Add(time.Duration(c.cluster.Collections.WebDAVCache.UUIDTTL)), - pdh: updated.PortableDataHash, - }) - return nil -} - // ResetSession unloads any potentially stale state. Should be called // after write operations, so subsequent reads don't return stale // data. @@ -227,7 +123,7 @@ func (c *cache) ResetSession(token string) { // Get a long-lived CustomFileSystem suitable for doing a read operation // with the given token. -func (c *cache) GetSession(token string) (arvados.CustomFileSystem, *cachedSession, error) { +func (c *cache) GetSession(token string) (arvados.CustomFileSystem, *cachedSession, *arvados.User, error) { c.setupOnce.Do(c.setup) now := time.Now() ent, _ := c.sessions.Get(token) @@ -241,12 +137,12 @@ func (c *cache) GetSession(token string) (arvados.CustomFileSystem, *cachedSessi var err error sess.client, err = arvados.NewClientFromConfig(c.cluster) if err != nil { - return nil, nil, err + return nil, nil, nil, err } sess.client.AuthToken = token sess.arvadosclient, err = arvadosclient.New(sess.client) if err != nil { - return nil, nil, err + return nil, nil, nil, err } sess.keepclient = keepclient.New(sess.arvadosclient) c.sessions.Add(token, sess) @@ -260,14 +156,29 @@ func (c *cache) GetSession(token string) (arvados.CustomFileSystem, *cachedSessi case c.chPruneSessions <- struct{}{}: default: } + fs, _ := sess.fs.Load().(arvados.CustomFileSystem) - if fs != nil && !expired { - return fs, sess, nil + if fs == nil || expired { + fs = sess.client.SiteFileSystem(sess.keepclient) + fs.ForwardSlashNameSubstitution(c.cluster.Collections.ForwardSlashNameSubstitution) + sess.fs.Store(fs) } - fs = sess.client.SiteFileSystem(sess.keepclient) - fs.ForwardSlashNameSubstitution(c.cluster.Collections.ForwardSlashNameSubstitution) - sess.fs.Store(fs) - return fs, sess, nil + + user, _ := sess.user.Load().(*arvados.User) + if user == nil || expired { + user = new(arvados.User) + err := sess.client.RequestAndDecode(user, "GET", "/arvados/v1/users/current", nil, nil) + if statusErr, ok := err.(interface{ HTTPStatus() int }); ok && statusErr.HTTPStatus() == http.StatusForbidden { + // token is OK, but "get user id" api is out + // of scope -- return nil, signifying unknown + // user + } else if err != nil { + return nil, nil, nil, err + } + sess.user.Store(user) + } + + return fs, sess, user, nil } // Remove all expired session cache entries, then remove more entries @@ -294,7 +205,7 @@ func (c *cache) pruneSessions() { } // Remove tokens until reaching size limit, starting with the // least frequently used entries (which Keys() returns last). - for i := len(keys) - 1; i >= 0 && size > c.cluster.Collections.WebDAVCache.MaxCollectionBytes/2; i-- { + for i := len(keys) - 1; i >= 0 && size > c.cluster.Collections.WebDAVCache.MaxCollectionBytes; i-- { if sizes[i] > 0 { c.sessions.Remove(keys[i]) size -= sizes[i] @@ -302,147 +213,10 @@ func (c *cache) pruneSessions() { } } -func (c *cache) Get(arv *arvadosclient.ArvadosClient, targetID string, forceReload bool) (*arvados.Collection, error) { - c.setupOnce.Do(c.setup) - c.metrics.requests.Inc() - - var pdhRefresh bool - var pdh string - if arvadosclient.PDHMatch(targetID) { - pdh = targetID - } else if ent, cached := c.pdhs.Get(targetID); cached { - ent := ent.(*cachedPDH) - if ent.expire.Before(time.Now()) { - c.pdhs.Remove(targetID) - } else { - pdh = ent.pdh - pdhRefresh = forceReload || time.Now().After(ent.refresh) - c.metrics.pdhHits.Inc() - } - } - - if pdh == "" { - // UUID->PDH mapping is not cached, might as well get - // the whole collection record and be done (below). - c.logger.Debugf("cache(%s): have no pdh", targetID) - } else if cached := c.lookupCollection(arv.ApiToken + "\000" + pdh); cached == nil { - // PDH->manifest is not cached, might as well get the - // whole collection record (below). - c.logger.Debugf("cache(%s): have pdh %s but manifest is not cached", targetID, pdh) - } else if !pdhRefresh { - // We looked up UUID->PDH very recently, and we still - // have the manifest for that PDH. - c.logger.Debugf("cache(%s): have pdh %s and refresh not needed", targetID, pdh) - return cached, nil - } else { - // Get current PDH for this UUID (and confirm we still - // have read permission). Most likely, the cached PDH - // is still correct, in which case we can use our - // cached manifest. - c.metrics.apiCalls.Inc() - var current arvados.Collection - err := arv.Get("collections", targetID, selectPDH, ¤t) - if err != nil { - return nil, err - } - if current.PortableDataHash == pdh { - // PDH has not changed, cached manifest is - // correct. - c.logger.Debugf("cache(%s): verified cached pdh %s is still correct", targetID, pdh) - return cached, nil - } - if cached := c.lookupCollection(arv.ApiToken + "\000" + current.PortableDataHash); cached != nil { - // PDH changed, and we already have the - // manifest for that new PDH. - c.logger.Debugf("cache(%s): cached pdh %s was stale, new pdh is %s and manifest is already in cache", targetID, pdh, current.PortableDataHash) - return cached, nil - } - } - - // Either UUID->PDH is not cached, or PDH->manifest is not - // cached. - var retrieved arvados.Collection - c.metrics.apiCalls.Inc() - err := arv.Get("collections", targetID, nil, &retrieved) - if err != nil { - return nil, err - } - c.logger.Debugf("cache(%s): retrieved manifest, caching with pdh %s", targetID, retrieved.PortableDataHash) - exp := time.Now().Add(time.Duration(c.cluster.Collections.WebDAVCache.TTL)) - if targetID != retrieved.PortableDataHash { - c.pdhs.Add(targetID, &cachedPDH{ - expire: exp, - refresh: time.Now().Add(time.Duration(c.cluster.Collections.WebDAVCache.UUIDTTL)), - pdh: retrieved.PortableDataHash, - }) - } - c.collections.Add(arv.ApiToken+"\000"+retrieved.PortableDataHash, &cachedCollection{ - expire: exp, - collection: &retrieved, - }) - if int64(len(retrieved.ManifestText)) > c.cluster.Collections.WebDAVCache.MaxCollectionBytes/int64(c.cluster.Collections.WebDAVCache.MaxCollectionEntries) { - select { - case c.chPruneCollections <- struct{}{}: - default: - } - } - return &retrieved, nil -} - -// pruneCollections checks the total bytes occupied by manifest_text -// in the collection cache and removes old entries as needed to bring -// the total size down to CollectionBytes. It also deletes all expired -// entries. -// -// pruneCollections does not aim to be perfectly correct when there is -// concurrent cache activity. -func (c *cache) pruneCollections() { - var size int64 - now := time.Now() - keys := c.collections.Keys() - entsize := make([]int, len(keys)) - expired := make([]bool, len(keys)) - for i, k := range keys { - v, ok := c.collections.Peek(k) - if !ok { - continue - } - ent := v.(*cachedCollection) - n := len(ent.collection.ManifestText) - size += int64(n) - entsize[i] = n - expired[i] = ent.expire.Before(now) - } - for i, k := range keys { - if expired[i] { - c.collections.Remove(k) - size -= int64(entsize[i]) - } - } - for i, k := range keys { - if size <= c.cluster.Collections.WebDAVCache.MaxCollectionBytes/2 { - break - } - if expired[i] { - // already removed this entry in the previous loop - continue - } - c.collections.Remove(k) - size -= int64(entsize[i]) - } -} - // collectionBytes returns the approximate combined memory size of the // collection cache and session filesystem cache. func (c *cache) collectionBytes() uint64 { var size uint64 - for _, k := range c.collections.Keys() { - v, ok := c.collections.Peek(k) - if !ok { - continue - } - size += uint64(len(v.(*cachedCollection).collection.ManifestText)) - } for _, token := range c.sessions.Keys() { ent, ok := c.sessions.Peek(token) if !ok { @@ -454,49 +228,3 @@ func (c *cache) collectionBytes() uint64 { } return size } - -func (c *cache) lookupCollection(key string) *arvados.Collection { - e, cached := c.collections.Get(key) - if !cached { - return nil - } - ent := e.(*cachedCollection) - if ent.expire.Before(time.Now()) { - c.collections.Remove(key) - return nil - } - c.metrics.collectionHits.Inc() - return ent.collection -} - -func (c *cache) GetTokenUser(token string) (*arvados.User, error) { - // Get and cache user record associated with this - // token. We need to know their UUID for logging, and - // whether they are an admin or not for certain - // permission checks. - - // Get/create session entry - _, sess, err := c.GetSession(token) - if err != nil { - return nil, err - } - - // See if the user is already set, and if so, return it - user, _ := sess.user.Load().(*arvados.User) - if user != nil { - return user, nil - } - - // Fetch the user record - c.metrics.apiCalls.Inc() - var current arvados.User - - err = sess.client.RequestAndDecode(¤t, "GET", "arvados/v1/users/current", nil, nil) - if err != nil { - return nil, err - } - - // Stash the user record for next time - sess.user.Store(¤t) - return ¤t, nil -}