X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/c2aceca339ec3a6f3d853865cebd0efe348ff518..HEAD:/services/keep-web/cache.go

diff --git a/services/keep-web/cache.go b/services/keep-web/cache.go
index a52af80484..b5b6cc4fa5 100644
--- a/services/keep-web/cache.go
+++ b/services/keep-web/cache.go
@@ -2,97 +2,52 @@
 //
 // SPDX-License-Identifier: AGPL-3.0
 
-package main
+package keepweb
 
 import (
+	"errors"
+	"net/http"
+	"sort"
 	"sync"
-	"sync/atomic"
 	"time"
 
 	"git.arvados.org/arvados.git/sdk/go/arvados"
 	"git.arvados.org/arvados.git/sdk/go/arvadosclient"
 	"git.arvados.org/arvados.git/sdk/go/keepclient"
-	lru "github.com/hashicorp/golang-lru"
 	"github.com/prometheus/client_golang/prometheus"
+	"github.com/sirupsen/logrus"
 )
 
 const metricsUpdateInterval = time.Second / 10
 
 type cache struct {
-	cluster     *arvados.Cluster
-	config      *arvados.WebDAVCacheConfig // TODO: use cluster.Collections.WebDAV instead
-	registry    *prometheus.Registry
-	metrics     cacheMetrics
-	pdhs        *lru.TwoQueueCache
-	collections *lru.TwoQueueCache
-	permissions *lru.TwoQueueCache
-	sessions    *lru.TwoQueueCache
-	setupOnce   sync.Once
+	cluster   *arvados.Cluster
+	logger    logrus.FieldLogger
+	registry  *prometheus.Registry
+	metrics   cacheMetrics
+	sessions  map[string]*cachedSession
+	setupOnce sync.Once
+	mtx       sync.Mutex
+
+	chPruneSessions chan struct{}
 }
 
 type cacheMetrics struct {
-	requests          prometheus.Counter
-	collectionBytes   prometheus.Gauge
-	collectionEntries prometheus.Gauge
-	sessionEntries    prometheus.Gauge
-	collectionHits    prometheus.Counter
-	pdhHits           prometheus.Counter
-	permissionHits    prometheus.Counter
-	sessionHits       prometheus.Counter
-	sessionMisses     prometheus.Counter
-	apiCalls          prometheus.Counter
+	requests        prometheus.Counter
+	collectionBytes prometheus.Gauge
+	sessionEntries  prometheus.Gauge
+	sessionHits     prometheus.Counter
+	sessionMisses   prometheus.Counter
 }
 
 func (m *cacheMetrics) setup(reg *prometheus.Registry) {
-	m.requests = prometheus.NewCounter(prometheus.CounterOpts{
-		Namespace: "arvados",
-		Subsystem: "keepweb_collectioncache",
-		Name:      "requests",
-		Help:      "Number of targetID-to-manifest lookups handled.",
-	})
-	reg.MustRegister(m.requests)
-	m.collectionHits = prometheus.NewCounter(prometheus.CounterOpts{
-		Namespace: "arvados",
-		Subsystem: "keepweb_collectioncache",
-		Name:      "hits",
-		Help:      "Number of pdh-to-manifest cache hits.",
-	})
-	reg.MustRegister(m.collectionHits)
-	m.pdhHits = prometheus.NewCounter(prometheus.CounterOpts{
-		Namespace: "arvados",
-		Subsystem: "keepweb_collectioncache",
-		Name:      "pdh_hits",
-		Help:      "Number of uuid-to-pdh cache hits.",
-	})
-	reg.MustRegister(m.pdhHits)
-	m.permissionHits = prometheus.NewCounter(prometheus.CounterOpts{
-		Namespace: "arvados",
-		Subsystem: "keepweb_collectioncache",
-		Name:      "permission_hits",
-		Help:      "Number of targetID-to-permission cache hits.",
-	})
-	reg.MustRegister(m.permissionHits)
-	m.apiCalls = prometheus.NewCounter(prometheus.CounterOpts{
-		Namespace: "arvados",
-		Subsystem: "keepweb_collectioncache",
-		Name:      "api_calls",
-		Help:      "Number of outgoing API calls made by cache.",
-	})
-	reg.MustRegister(m.apiCalls)
 	m.collectionBytes = prometheus.NewGauge(prometheus.GaugeOpts{
 		Namespace: "arvados",
 		Subsystem: "keepweb_sessions",
-		Name:      "cached_collection_bytes",
-		Help:      "Total size of all cached manifests and sessions.",
+		Name:      "cached_session_bytes",
+		Help:      "Total size of all cached sessions.",
 	})
 	reg.MustRegister(m.collectionBytes)
-	m.collectionEntries = prometheus.NewGauge(prometheus.GaugeOpts{
-		Namespace: "arvados",
-		Subsystem: "keepweb_collectioncache",
-		Name:      "cached_manifests",
-		Help:      "Number of manifests in cache.",
-	})
-	reg.MustRegister(m.collectionEntries)
 	m.sessionEntries = prometheus.NewGauge(prometheus.GaugeOpts{
 		Namespace: "arvados",
 		Subsystem: "keepweb_sessions",
@@ -116,44 +71,73 @@ func (m *cacheMetrics) setup(reg *prometheus.Registry) {
 	reg.MustRegister(m.sessionMisses)
 }
 
-type cachedPDH struct {
-	expire time.Time
-	pdh    string
-}
-
-type cachedCollection struct {
-	expire     time.Time
-	collection *arvados.Collection
-}
-
-type cachedPermission struct {
-	expire time.Time
-}
-
 type cachedSession struct {
+	cache         *cache
 	expire        time.Time
-	fs            atomic.Value
 	client        *arvados.Client
 	arvadosclient *arvadosclient.ArvadosClient
 	keepclient    *keepclient.KeepClient
-	user          atomic.Value
+
+	// Each session uses a system of three mutexes (plus the
+	// cache-wide mutex) to enable the following semantics:
+	//
+	// - There are never multiple sessions in use for a given
+	// token.
+	//
+	// - If the cached in-memory filesystems/user records are
+	// older than the configured cache TTL when a request starts,
+	// the request will use new ones.
+	//
+	// - Unused sessions are garbage-collected.
+	//
+	// In particular, when it is necessary to reset a session's
+	// filesystem/user record (to save memory or respect the
+	// configured cache TTL), any operations that are already
+	// using the existing filesystem/user record are allowed to
+	// finish before the new filesystem is constructed.
+	//
+	// The locks must be acquired in the following order:
+	// cache.mtx, session.mtx, session.refresh, session.inuse.
+
+	// mtx is RLocked while session is not safe to evict from
+	// cache -- i.e., a checkout() has decided to use it, and its
+	// caller is not finished with it. When locking or rlocking
+	// this mtx, the cache mtx MUST already be held.
+	//
+	// This mutex enables pruneSessions to detect when it is safe
+	// to completely remove the session entry from the cache.
+	mtx sync.RWMutex
+	// refresh must be locked in order to read or write the
+	// fs/user/userLoaded/lastuse fields. This mutex enables
+	// GetSession and pruneSessions to remove/replace fs and user
+	// values safely.
+	refresh sync.Mutex
+	// inuse must be RLocked while the session is in use by a
+	// caller. This mutex enables pruneSessions() to wait for all
+	// existing usage to finish by calling inuse.Lock().
+	inuse sync.RWMutex
+
+	fs         arvados.CustomFileSystem
+	user       arvados.User
+	userLoaded bool
+	lastuse    time.Time
+}
+
+// Release indicates the caller is finished using a session returned
+// by GetSession. It drops the inuse and mtx read locks, then makes a
+// non-blocking request for the background prune loop to run.
+func (sess *cachedSession) Release() {
+	sess.inuse.RUnlock()
+	sess.mtx.RUnlock()
+	select {
+	case sess.cache.chPruneSessions <- struct{}{}:
+	default:
+	}
 }
 
 func (c *cache) setup() {
 	var err error
-	c.pdhs, err = lru.New2Q(c.config.MaxUUIDEntries)
-	if err != nil {
-		panic(err)
-	}
-	c.collections, err = lru.New2Q(c.config.MaxCollectionEntries)
-	if err != nil {
-		panic(err)
-	}
-	c.permissions, err = lru.New2Q(c.config.MaxPermissionEntries)
-	if err != nil {
-		panic(err)
-	}
-	c.sessions, err = lru.New2Q(c.config.MaxSessions)
+	c.sessions = map[string]*cachedSession{}
 	if err != nil {
 		panic(err)
 	}
@@ -168,336 +152,251 @@ func (c *cache) setup() {
 			c.updateGauges()
 		}
 	}()
+	c.chPruneSessions = make(chan struct{}, 1)
+	go func() {
+		for range c.chPruneSessions {
+			c.pruneSessions()
+		}
+	}()
 }
 
 func (c *cache) updateGauges() {
-	c.metrics.collectionBytes.Set(float64(c.collectionBytes()))
-	c.metrics.collectionEntries.Set(float64(c.collections.Len()))
-	c.metrics.sessionEntries.Set(float64(c.sessions.Len()))
+	n, size := c.sessionsSize()
+	c.metrics.collectionBytes.Set(float64(size))
+	c.metrics.sessionEntries.Set(float64(n))
 }
 
 var selectPDH = map[string]interface{}{
 	"select": []string{"portable_data_hash"},
 }
 
-// Update saves a modified version (fs) to an existing collection
-// (coll) and, if successful, updates the relevant cache entries so
-// subsequent calls to Get() reflect the modifications.
-func (c *cache) Update(client *arvados.Client, coll arvados.Collection, fs arvados.CollectionFileSystem) error {
-	c.setupOnce.Do(c.setup)
-
-	m, err := fs.MarshalManifest(".")
-	if err != nil || m == coll.ManifestText {
-		return err
-	}
-	coll.ManifestText = m
-	var updated arvados.Collection
-	defer c.pdhs.Remove(coll.UUID)
-	err = client.RequestAndDecode(&updated, "PATCH", "arvados/v1/collections/"+coll.UUID, nil, map[string]interface{}{
-		"collection": map[string]string{
-			"manifest_text": coll.ManifestText,
-		},
-	})
-	if err == nil {
-		c.collections.Add(client.AuthToken+"\000"+updated.PortableDataHash, &cachedCollection{
-			expire:     time.Now().Add(time.Duration(c.config.TTL)),
-			collection: &updated,
-		})
-	}
-	return err
-}
-
-// ResetSession unloads any potentially stale state. Should be called
-// after write operations, so subsequent reads don't return stale
-// data.
-func (c *cache) ResetSession(token string) {
-	c.setupOnce.Do(c.setup)
-	c.sessions.Remove(token)
-}
-
-// Get a long-lived CustomFileSystem suitable for doing a read operation
-// with the given token.
-func (c *cache) GetSession(token string) (arvados.CustomFileSystem, *cachedSession, error) {
+// checkout returns the cached session for the given token, creating
+// and caching a new one if there is none. The returned session's mtx
+// is RLocked (acquired while c.mtx is held); the caller must
+// eventually RUnlock it.
+func (c *cache) checkout(token string) (*cachedSession, error) {
 	c.setupOnce.Do(c.setup)
-	now := time.Now()
-	ent, _ := c.sessions.Get(token)
-	sess, _ := ent.(*cachedSession)
-	expired := false
+	c.mtx.Lock()
+	defer c.mtx.Unlock()
+	sess := c.sessions[token]
 	if sess == nil {
-		c.metrics.sessionMisses.Inc()
-		sess = &cachedSession{
-			expire: now.Add(c.config.TTL.Duration()),
-		}
-		var err error
-		sess.client, err = arvados.NewClientFromConfig(c.cluster)
+		client, err := arvados.NewClientFromConfig(c.cluster)
 		if err != nil {
-			return nil, nil, err
+			return nil, err
 		}
-		sess.client.AuthToken = token
-		sess.arvadosclient, err = arvadosclient.New(sess.client)
+		client.AuthToken = token
+		client.Timeout = time.Minute
+		client.Logger = c.logger
+		// A non-empty origin header tells controller to
+		// prioritize our traffic as interactive, which is
+		// true most of the time.
+		origin := c.cluster.Services.WebDAVDownload.ExternalURL
+		client.SendHeader = http.Header{"Origin": {origin.Scheme + "://" + origin.Host}}
+		arvadosclient, err := arvadosclient.New(client)
 		if err != nil {
-			return nil, nil, err
+			return nil, err
 		}
-		sess.keepclient = keepclient.New(sess.arvadosclient)
-		c.sessions.Add(token, sess)
-	} else if sess.expire.Before(now) {
-		c.metrics.sessionMisses.Inc()
-		expired = true
-	} else {
-		c.metrics.sessionHits.Inc()
-	}
-	go c.pruneSessions()
-	fs, _ := sess.fs.Load().(arvados.CustomFileSystem)
-	if fs != nil && !expired {
-		return fs, sess, nil
+		kc := keepclient.New(arvadosclient)
+		kc.DiskCacheSize = c.cluster.Collections.WebDAVCache.DiskCacheSize
+		sess = &cachedSession{
+			cache:         c,
+			client:        client,
+			arvadosclient: arvadosclient,
+			keepclient:    kc,
+		}
+		c.sessions[token] = sess
 	}
-	fs = sess.client.SiteFileSystem(sess.keepclient)
-	fs.ForwardSlashNameSubstitution(c.cluster.Collections.ForwardSlashNameSubstitution)
-	sess.fs.Store(fs)
-	return fs, sess, nil
+	sess.mtx.RLock()
+	return sess, nil
 }
 
-// Remove all expired session cache entries, then remove more entries
-// until approximate remaining size <= maxsize/2
-func (c *cache) pruneSessions() {
-	now := time.Now()
-	var size int64
-	keys := c.sessions.Keys()
-	for _, token := range keys {
-		ent, ok := c.sessions.Peek(token)
-		if !ok {
-			continue
-		}
-		s := ent.(*cachedSession)
-		if s.expire.Before(now) {
-			c.sessions.Remove(token)
-			continue
-		}
-		if fs, ok := s.fs.Load().(arvados.CustomFileSystem); ok {
-			size += fs.MemorySize()
-		}
+// Get a long-lived CustomFileSystem suitable for doing a read or
+// write operation with the given token.
+//
+// If the returned error is nil, the caller must call Release() on the
+// returned session when finished using it.
+func (c *cache) GetSession(token string) (arvados.CustomFileSystem, *cachedSession, *arvados.User, error) {
+	sess, err := c.checkout(token)
+	if err != nil {
+		return nil, nil, nil, err
 	}
-	// Remove tokens until reaching size limit, starting with the
-	// least frequently used entries (which Keys() returns last).
-	for i := len(keys) - 1; i >= 0; i-- {
-		token := keys[i]
-		if size <= c.cluster.Collections.WebDAVCache.MaxCollectionBytes/2 {
-			break
-		}
-		ent, ok := c.sessions.Peek(token)
-		if !ok {
-			continue
-		}
-		s := ent.(*cachedSession)
-		fs, _ := s.fs.Load().(arvados.CustomFileSystem)
-		if fs == nil {
-			continue
+	sess.refresh.Lock()
+	defer sess.refresh.Unlock()
+	now := time.Now()
+	sess.lastuse = now
+	refresh := sess.expire.Before(now)
+	if sess.fs == nil || !sess.userLoaded || refresh {
+		// Wait for all active users to finish (otherwise they
+		// might make changes to an old fs after we start
+		// using the new fs).
+		sess.inuse.Lock()
+		if !sess.userLoaded || refresh {
+			err := sess.client.RequestAndDecode(&sess.user, "GET", "arvados/v1/users/current", nil, nil)
+			if he := errorWithHTTPStatus(nil); errors.As(err, &he) && he.HTTPStatus() == http.StatusForbidden {
+				// token is OK, but "get user id" api is out
+				// of scope -- use existing/expired info if
+				// any, or leave empty for unknown user
+			} else if err != nil {
+				sess.inuse.Unlock()
+				sess.mtx.RUnlock()
+				return nil, nil, nil, err
+			}
+			sess.userLoaded = true
 		}
-		c.sessions.Remove(token)
-		size -= fs.MemorySize()
-	}
-}
 
-func (c *cache) Get(arv *arvadosclient.ArvadosClient, targetID string, forceReload bool) (*arvados.Collection, error) {
-	c.setupOnce.Do(c.setup)
-	c.metrics.requests.Inc()
-
-	permOK := false
-	permKey := arv.ApiToken + "\000" + targetID
-	if forceReload {
-	} else if ent, cached := c.permissions.Get(permKey); cached {
-		ent := ent.(*cachedPermission)
-		if ent.expire.Before(time.Now()) {
-			c.permissions.Remove(permKey)
+		if sess.fs == nil || refresh {
+			sess.fs = sess.client.SiteFileSystem(sess.keepclient)
+			sess.fs.ForwardSlashNameSubstitution(c.cluster.Collections.ForwardSlashNameSubstitution)
+			sess.expire = now.Add(c.cluster.Collections.WebDAVCache.TTL.Duration())
+			c.metrics.sessionMisses.Inc()
 		} else {
-			permOK = true
-			c.metrics.permissionHits.Inc()
+			c.metrics.sessionHits.Inc()
 		}
+		sess.inuse.Unlock()
+	} else {
+		c.metrics.sessionHits.Inc()
 	}
+	sess.inuse.RLock()
+	return sess.fs, sess, &sess.user, nil
+}
 
-	var pdh string
-	if arvadosclient.PDHMatch(targetID) {
-		pdh = targetID
-	} else if ent, cached := c.pdhs.Get(targetID); cached {
-		ent := ent.(*cachedPDH)
-		if ent.expire.Before(time.Now()) {
-			c.pdhs.Remove(targetID)
-		} else {
-			pdh = ent.pdh
-			c.metrics.pdhHits.Inc()
-		}
-	}
+// sessionSnapshot holds the per-session state that pruneSessions
+// copies out (under the appropriate locks) so it can sort sessions
+// and choose which to prune without holding any lock.
+type sessionSnapshot struct {
+	token   string
+	sess    *cachedSession
+	lastuse time.Time
+	fs      arvados.CustomFileSystem
+	size    int64
+	prune   bool
+}
 
-	var collection *arvados.Collection
-	if pdh != "" {
-		collection = c.lookupCollection(arv.ApiToken + "\000" + pdh)
+// Remove all expired idle session cache entries, and remove in-memory
+// filesystems until approximate remaining size <= maxsize
+func (c *cache) pruneSessions() {
+	now := time.Now()
+	c.mtx.Lock()
+	snaps := make([]sessionSnapshot, 0, len(c.sessions))
+	for token, sess := range c.sessions {
+		snaps = append(snaps, sessionSnapshot{
+			token: token,
+			sess:  sess,
+		})
 	}
-
-	if collection != nil && permOK {
-		return collection, nil
-	} else if collection != nil {
-		// Ask API for current PDH for this targetID. Most
-		// likely, the cached PDH is still correct; if so,
-		// _and_ the current token has permission, we can
-		// use our cached manifest.
-		c.metrics.apiCalls.Inc()
-		var current arvados.Collection
-		err := arv.Get("collections", targetID, selectPDH, &current)
-		if err != nil {
-			return nil, err
-		}
-		if current.PortableDataHash == pdh {
-			c.permissions.Add(permKey, &cachedPermission{
-				expire: time.Now().Add(time.Duration(c.config.TTL)),
-			})
-			if pdh != targetID {
-				c.pdhs.Add(targetID, &cachedPDH{
-					expire: time.Now().Add(time.Duration(c.config.UUIDTTL)),
-					pdh:    pdh,
-				})
-			}
-			return collection, err
-		}
-		// PDH changed, but now we know we have
-		// permission -- and maybe we already have the
-		// new PDH in the cache.
-		if coll := c.lookupCollection(arv.ApiToken + "\000" + current.PortableDataHash); coll != nil {
-			return coll, nil
-		}
+	c.mtx.Unlock()
+
+	// Load lastuse/fs/expire data from sessions. Note we do this
+	// after unlocking c.mtx because sess.refresh.Lock sometimes
+	// waits for another goroutine to finish "[re]fetch user
+	// record".
+	for i := range snaps {
+		snaps[i].sess.refresh.Lock()
+		snaps[i].lastuse = snaps[i].sess.lastuse
+		snaps[i].fs = snaps[i].sess.fs
+		snaps[i].prune = snaps[i].sess.expire.Before(now)
+		snaps[i].sess.refresh.Unlock()
 	}
 
-	// Collection manifest is not cached.
-	c.metrics.apiCalls.Inc()
-	err := arv.Get("collections", targetID, nil, &collection)
-	if err != nil {
-		return nil, err
-	}
-	exp := time.Now().Add(time.Duration(c.config.TTL))
-	c.permissions.Add(permKey, &cachedPermission{
-		expire: exp,
-	})
-	c.pdhs.Add(targetID, &cachedPDH{
-		expire: time.Now().Add(time.Duration(c.config.UUIDTTL)),
-		pdh:    collection.PortableDataHash,
+	// Sort sessions with oldest first.
+	sort.Slice(snaps, func(i, j int) bool {
+		return snaps[i].lastuse.Before(snaps[j].lastuse)
 	})
-	c.collections.Add(arv.ApiToken+"\000"+collection.PortableDataHash, &cachedCollection{
-		expire:     exp,
-		collection: collection,
-	})
-	if int64(len(collection.ManifestText)) > c.config.MaxCollectionBytes/int64(c.config.MaxCollectionEntries) {
-		go c.pruneCollections()
-	}
-	return collection, nil
-}
 
-// pruneCollections checks the total bytes occupied by manifest_text
-// in the collection cache and removes old entries as needed to bring
-// the total size down to CollectionBytes. It also deletes all expired
-// entries.
-//
-// pruneCollections does not aim to be perfectly correct when there is
-// concurrent cache activity.
-func (c *cache) pruneCollections() {
+	// Add up size of sessions that aren't already marked for
+	// pruning based on expire time.
 	var size int64
-	now := time.Now()
-	keys := c.collections.Keys()
-	entsize := make([]int, len(keys))
-	expired := make([]bool, len(keys))
-	for i, k := range keys {
-		v, ok := c.collections.Peek(k)
-		if !ok {
-			continue
+	for i, snap := range snaps {
+		if !snap.prune && snap.fs != nil {
+			// Accumulate into the outer total (don't shadow it).
+			snaps[i].size = snap.fs.MemorySize()
+			size += snaps[i].size
 		}
-		ent := v.(*cachedCollection)
-		n := len(ent.collection.ManifestText)
-		size += int64(n)
-		entsize[i] = n
-		expired[i] = ent.expire.Before(now)
 	}
-	for i, k := range keys {
-		if expired[i] {
-			c.collections.Remove(k)
-			size -= int64(entsize[i])
-		}
-	}
-	for i, k := range keys {
-		if size <= c.config.MaxCollectionBytes/2 {
+	// Mark more sessions for deletion until reaching desired
+	// memory size limit, starting with the oldest entries.
+	for i, snap := range snaps {
+		if size <= int64(c.cluster.Collections.WebDAVCache.MaxCollectionBytes) {
 			break
 		}
-		if expired[i] {
-			// already removed this entry in the previous loop
+		if snap.prune {
 			continue
 		}
-		c.collections.Remove(k)
-		size -= int64(entsize[i])
+		snaps[i].prune = true
+		size -= snap.size
 	}
-}
 
-// collectionBytes returns the approximate combined memory size of the
-// collection cache and session filesystem cache.
-func (c *cache) collectionBytes() uint64 {
-	var size uint64
-	for _, k := range c.collections.Keys() {
-		v, ok := c.collections.Peek(k)
-		if !ok {
-			continue
+	// Mark more sessions for deletion until reaching desired
+	// session count limit.
+	mustprune := len(snaps) - c.cluster.Collections.WebDAVCache.MaxSessions
+	for i := range snaps {
+		if snaps[i].prune {
+			mustprune--
 		}
-		size += uint64(len(v.(*cachedCollection).collection.ManifestText))
 	}
-	for _, token := range c.sessions.Keys() {
-		ent, ok := c.sessions.Peek(token)
-		if !ok {
-			continue
-		}
-		if fs, ok := ent.(*cachedSession).fs.Load().(arvados.CustomFileSystem); ok {
-			size += uint64(fs.MemorySize())
+	for i := range snaps {
+		if mustprune < 1 {
+			break
+		} else if !snaps[i].prune {
+			snaps[i].prune = true
+			mustprune--
 		}
 	}
-	return size
-}
 
-func (c *cache) lookupCollection(key string) *arvados.Collection {
-	e, cached := c.collections.Get(key)
-	if !cached {
-		return nil
-	}
-	ent := e.(*cachedCollection)
-	if ent.expire.Before(time.Now()) {
-		c.collections.Remove(key)
-		return nil
+	c.mtx.Lock()
+	defer c.mtx.Unlock()
+	for _, snap := range snaps {
+		if !snap.prune {
+			continue
+		}
+		sess := snap.sess
+		if sess.mtx.TryLock() {
+			delete(c.sessions, snap.token)
+			continue
+		}
+		// We can't remove a session that's been checked out
+		// -- that would allow another session to be created
+		// for the same token using a different in-memory
+		// filesystem. Instead, we wait for active requests to
+		// finish and then "unload" it. After this, either the
+		// next GetSession will reload fs/user, or a
+		// subsequent pruneSessions will remove the session.
+		go func() {
+			// Ensure nobody is mid-GetSession() (note we
+			// already know nobody is mid-checkout()
+			// because we have c.mtx locked)
+			sess.refresh.Lock()
+			defer sess.refresh.Unlock()
+			// Wait for current usage to finish (i.e.,
+			// anyone who has decided to use the current
+			// values of sess.fs and sess.user, and hasn't
+			// called Release() yet)
+			sess.inuse.Lock()
+			defer sess.inuse.Unlock()
+			// Release memory
+			sess.fs = nil
+			// Next GetSession will make a new fs
+		}()
 	}
-	c.metrics.collectionHits.Inc()
-	return ent.collection
 }
 
-func (c *cache) GetTokenUser(token string) (*arvados.User, error) {
-	// Get and cache user record associated with this
-	// token. We need to know their UUID for logging, and
-	// whether they are an admin or not for certain
-	// permission checks.
-
-	// Get/create session entry
-	_, sess, err := c.GetSession(token)
-	if err != nil {
-		return nil, err
+// sessionsSize returns the number and approximate total memory size
+// of all cached sessions.
+func (c *cache) sessionsSize() (n int, size int64) {
+	c.mtx.Lock()
+	n = len(c.sessions)
+	sessions := make([]*cachedSession, 0, n)
+	for _, sess := range c.sessions {
+		sessions = append(sessions, sess)
 	}
-
-	// See if the user is already set, and if so, return it
-	user, _ := sess.user.Load().(*arvados.User)
-	if user != nil {
-		return user, nil
-	}
-
-	// Fetch the user record
-	c.metrics.apiCalls.Inc()
-	var current arvados.User
-
-	err = sess.client.RequestAndDecode(&current, "GET", "/arvados/v1/users/current", nil, nil)
-	if err != nil {
-		return nil, err
+	c.mtx.Unlock()
+	for _, sess := range sessions {
+		sess.refresh.Lock()
+		fs := sess.fs
+		sess.refresh.Unlock()
+		if fs != nil {
+			size += fs.MemorySize()
+		}
 	}
-
-	// Stash the user record for next time
-	sess.user.Store(&current)
-	return &current, nil
+	return
 }