From d7aa8eaa8c07fd7bcf219468afa733e0c27ffd98 Mon Sep 17 00:00:00 2001
From: Tom Clegg
Date: Fri, 30 Jun 2023 10:28:38 -0400
Subject: [PATCH] 20559: Trim session count explicitly instead of using LRU cache.

The LRU cache automatically implemented the session count limit by
dropping the oldest session, which would break our "one session per
token" rule (by evicting a session and creating a new one while the
old session was still in use) when there were more active sessions
than the configured limit.

Arvados-DCO-1.1-Signed-off-by: Tom Clegg
---
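Not part of the commit message: a minimal, self-contained sketch of the
failure mode described above, assuming the hashicorp/golang-lru
TwoQueueCache API that this patch removes, and hypothetical tokens
"tokenA"/"tokenB". With a session limit of 1, caching a session for a
second token silently evicts the first session while its caller still
holds it, so the next checkout of the first token builds a duplicate
session for the same token:

package main

import (
	"fmt"

	lru "github.com/hashicorp/golang-lru"
)

// session stands in for keep-web's cachedSession.
type session struct{ token string }

func main() {
	// Cache size 1 stands in for WebDAVCache.MaxSessions.
	cache, err := lru.New2Q(1)
	if err != nil {
		panic(err)
	}

	// checkout("tokenA") caches a session and hands it to a caller,
	// which is still using it below.
	sessA := &session{token: "tokenA"}
	cache.Add("tokenA", sessA)

	// A request with a second token evicts sessA from the cache,
	// even though its caller has not released it.
	cache.Add("tokenB", &session{token: "tokenB"})

	// The next checkout of tokenA misses, so a second live session
	// is created for the same token, breaking the rule.
	if _, ok := cache.Get("tokenA"); !ok {
		sessA2 := &session{token: "tokenA"}
		cache.Add("tokenA", sessA2)
		fmt.Println("two sessions for tokenA in use:", sessA != sessA2)
	}
}
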
 services/keep-web/cache.go | 156 +++++++++++++++++++++++--------------
 1 file changed, 99 insertions(+), 57 deletions(-)

diff --git a/services/keep-web/cache.go b/services/keep-web/cache.go
index b519cbe843..604efd29d9 100644
--- a/services/keep-web/cache.go
+++ b/services/keep-web/cache.go
@@ -7,13 +7,13 @@ package keepweb
 import (
 	"errors"
 	"net/http"
+	"sort"
 	"sync"
 	"time"
 
 	"git.arvados.org/arvados.git/sdk/go/arvados"
 	"git.arvados.org/arvados.git/sdk/go/arvadosclient"
 	"git.arvados.org/arvados.git/sdk/go/keepclient"
-	lru "github.com/hashicorp/golang-lru"
 	"github.com/prometheus/client_golang/prometheus"
 	"github.com/sirupsen/logrus"
 )
@@ -25,7 +25,7 @@ type cache struct {
 	logger    logrus.FieldLogger
 	registry  *prometheus.Registry
 	metrics   cacheMetrics
-	sessions  *lru.TwoQueueCache
+	sessions  map[string]*cachedSession
 	setupOnce sync.Once
 
 	mtx sync.Mutex
@@ -108,9 +108,9 @@ type cachedSession struct {
 	// to completely remove the session entry from the cache.
 	mtx sync.RWMutex
 	// refresh must be locked in order to read or write the
-	// fs/user/userLoaded fields. This mutex enables GetSession
-	// and pruneSessions to remove/replace fs and user values
-	// safely.
+	// fs/user/userLoaded/lastuse fields. This mutex enables
+	// GetSession and pruneSessions to remove/replace fs and user
+	// values safely.
 	refresh sync.Mutex
 	// inuse must be RLocked while the session is in use by a
 	// caller. This mutex enables pruneSessions() to wait for all
@@ -120,6 +120,7 @@ type cachedSession struct {
 	fs         arvados.CustomFileSystem
 	user       arvados.User
 	userLoaded bool
+	lastuse    time.Time
 }
 
 func (sess *cachedSession) Release() {
@@ -133,7 +134,7 @@ func (sess *cachedSession) Release() {
 
 func (c *cache) setup() {
 	var err error
-	c.sessions, err = lru.New2Q(c.cluster.Collections.WebDAVCache.MaxSessions)
+	c.sessions = map[string]*cachedSession{}
 	if err != nil {
 		panic(err)
 	}
@@ -157,8 +158,9 @@ func (c *cache) setup() {
 }
 
 func (c *cache) updateGauges() {
-	c.metrics.collectionBytes.Set(float64(c.collectionBytes()))
-	c.metrics.sessionEntries.Set(float64(c.sessions.Len()))
+	n, size := c.sessionsSize()
+	c.metrics.collectionBytes.Set(float64(size))
+	c.metrics.sessionEntries.Set(float64(n))
 }
 
 var selectPDH = map[string]interface{}{
@@ -169,8 +171,7 @@ func (c *cache) checkout(token string) (*cachedSession, error) {
 	c.setupOnce.Do(c.setup)
 	c.mtx.Lock()
 	defer c.mtx.Unlock()
-	ent, _ := c.sessions.Get(token)
-	sess, _ := ent.(*cachedSession)
+	sess := c.sessions[token]
 	if sess == nil {
 		client, err := arvados.NewClientFromConfig(c.cluster)
 		if err != nil {
@@ -193,7 +194,7 @@ func (c *cache) checkout(token string) (*cachedSession, error) {
 			arvadosclient: arvadosclient,
 			keepclient:    keepclient.New(arvadosclient),
 		}
-		c.sessions.Add(token, sess)
+		c.sessions[token] = sess
 	}
 	sess.mtx.RLock()
 	return sess, nil
@@ -212,6 +213,7 @@ func (c *cache) GetSession(token string) (arvados.CustomFileSystem, *cachedSessi
 	sess.refresh.Lock()
 	defer sess.refresh.Unlock()
 	now := time.Now()
+	sess.lastuse = now
 	refresh := sess.expire.Before(now)
 	if sess.fs == nil || !sess.userLoaded || refresh {
 		// Wait for all active users to finish (otherwise they
@@ -248,52 +250,95 @@ func (c *cache) GetSession(token string) (arvados.CustomFileSystem, *cachedSessi
 	return sess.fs, sess, &sess.user, nil
 }
 
+type sessionSnapshot struct {
+	token   string
+	sess    *cachedSession
+	lastuse time.Time
+	fs      arvados.CustomFileSystem
+	size    int64
+	prune   bool
+}
+
 // Remove all expired idle session cache entries, and remove in-memory
-// filesystems until approximate remaining size <= maxsize/2
+// filesystems until approximate remaining size <= maxsize
 func (c *cache) pruneSessions() {
 	now := time.Now()
-	keys := c.sessions.Keys()
-	sizes := make([]int64, len(keys))
-	prune := []string(nil)
+	c.mtx.Lock()
+	snaps := make([]sessionSnapshot, 0, len(c.sessions))
+	for token, sess := range c.sessions {
+		snaps = append(snaps, sessionSnapshot{
+			token: token,
+			sess:  sess,
+		})
+	}
+	c.mtx.Unlock()
+
+	// Load lastuse/fs/expire data from sessions. Note we do this
+	// after unlocking c.mtx because sess.refresh.Lock sometimes
+	// waits for another goroutine to finish "[re]fetch user
+	// record".
+	for i := range snaps {
+		snaps[i].sess.refresh.Lock()
+		snaps[i].lastuse = snaps[i].sess.lastuse
+		snaps[i].fs = snaps[i].sess.fs
+		snaps[i].prune = snaps[i].sess.expire.Before(now)
+		snaps[i].sess.refresh.Unlock()
+	}
+
+	// Sort sessions with oldest first.
+	sort.Slice(snaps, func(i, j int) bool {
+		return snaps[i].lastuse.Before(snaps[j].lastuse)
+	})
+
+	// Add up size of sessions that aren't already marked for
+	// pruning based on expire time.
 	var size int64
-	for i, token := range keys {
-		token := token.(string)
-		ent, ok := c.sessions.Peek(token)
-		if !ok {
-			continue
+	for i, snap := range snaps {
+		if !snap.prune && snap.fs != nil {
+			sz := snap.fs.MemorySize()
+			snaps[i].size = sz
+			size += sz
 		}
-		sess := ent.(*cachedSession)
-		sess.refresh.Lock()
-		expired := sess.expire.Before(now)
-		fs := sess.fs
-		sess.refresh.Unlock()
-		if expired {
-			prune = append(prune, token)
+	}
+	// Mark more sessions for deletion until reaching desired
+	// memory size limit, starting with the oldest entries.
+	for i, snap := range snaps {
+		if size <= c.cluster.Collections.WebDAVCache.MaxCollectionBytes {
+			break
 		}
-		if fs != nil {
-			sizes[i] = fs.MemorySize()
-			size += sizes[i]
+		if snap.prune {
+			continue
+		}
+		snaps[i].prune = true
+		size -= snap.size
+	}
+
+	// Mark more sessions for deletion until reaching desired
+	// session count limit.
+	mustprune := len(snaps) - c.cluster.Collections.WebDAVCache.MaxSessions
+	for i := range snaps {
+		if snaps[i].prune {
+			mustprune--
 		}
 	}
-	// Remove tokens until reaching size limit, starting with the
-	// least frequently used entries (which Keys() returns last).
-	for i := len(keys) - 1; i >= 0 && size > c.cluster.Collections.WebDAVCache.MaxCollectionBytes; i-- {
-		if sizes[i] > 0 {
-			prune = append(prune, keys[i].(string))
-			size -= sizes[i]
+	for i := range snaps {
+		if mustprune < 1 {
+			break
+		} else if !snaps[i].prune {
+			snaps[i].prune = true
+			mustprune--
 		}
 	}
 	c.mtx.Lock()
 	defer c.mtx.Unlock()
-	for _, token := range prune {
-		ent, ok := c.sessions.Peek(token)
-		if !ok {
+	for _, snap := range snaps {
+		if !snap.prune {
 			continue
 		}
-		sess := ent.(*cachedSession)
+		sess := snap.sess
 		if sess.mtx.TryLock() {
-			c.sessions.Remove(token)
+			delete(c.sessions, snap.token)
 			continue
 		}
 		// We can't remove a session that's been checked out
@@ -317,31 +362,28 @@ func (c *cache) pruneSessions() {
 			defer sess.inuse.Unlock()
 			// Release memory
 			sess.fs = nil
-			if sess.expire.Before(now) {
-				// Mark user data as stale
-				sess.userLoaded = false
-			}
 			// Next GetSession will make a new fs
 		}()
 	}
 }
 
-// collectionBytes returns the approximate combined memory size of the
-// collection cache and session filesystem cache.
-func (c *cache) collectionBytes() uint64 {
-	var size uint64
-	for _, token := range c.sessions.Keys() {
-		ent, ok := c.sessions.Peek(token)
-		if !ok {
-			continue
-		}
-		sess := ent.(*cachedSession)
+// sessionsSize returns the number and approximate total memory size
+// of all cached sessions.
+func (c *cache) sessionsSize() (n int, size int64) {
+	c.mtx.Lock()
+	n = len(c.sessions)
+	sessions := make([]*cachedSession, 0, n)
+	for _, sess := range c.sessions {
+		sessions = append(sessions, sess)
+	}
+	c.mtx.Unlock()
+	for _, sess := range sessions {
 		sess.refresh.Lock()
 		fs := sess.fs
 		sess.refresh.Unlock()
 		if fs != nil {
-			size += uint64(fs.MemorySize())
+			size += fs.MemorySize()
 		}
 	}
-	return size
+	return
 }
-- 
2.30.2
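
Not part of the patch: a self-contained sketch of the marking policy
pruneSessions implements after this change. The snap type, the
markForPrune helper, and the limits in main are hypothetical stand-ins
for sessionSnapshot and the WebDAVCache config values; expired sessions
are assumed to arrive already marked, as in the real code:

package main

import (
	"fmt"
	"sort"
	"time"
)

// snap is a simplified stand-in for the patch's sessionSnapshot.
type snap struct {
	token   string
	lastuse time.Time
	size    int64
	prune   bool
}

// markForPrune mirrors the patch's policy: entries already flagged
// (e.g. by expiry) are skipped when adding up size; then the oldest
// entries are marked until total size fits maxBytes, then until the
// surviving count fits maxSessions.
func markForPrune(snaps []snap, maxBytes int64, maxSessions int) {
	// Sort sessions with oldest first.
	sort.Slice(snaps, func(i, j int) bool {
		return snaps[i].lastuse.Before(snaps[j].lastuse)
	})
	var size int64
	for _, s := range snaps {
		if !s.prune {
			size += s.size
		}
	}
	// Mark oldest-first until the memory budget is met.
	for i := range snaps {
		if size <= maxBytes {
			break
		}
		if snaps[i].prune {
			continue
		}
		snaps[i].prune = true
		size -= snaps[i].size
	}
	// Mark oldest-first until the session count limit is met.
	mustprune := len(snaps) - maxSessions
	for i := range snaps {
		if snaps[i].prune {
			mustprune--
		}
	}
	for i := range snaps {
		if mustprune < 1 {
			break
		}
		if !snaps[i].prune {
			snaps[i].prune = true
			mustprune--
		}
	}
}

func main() {
	t := time.Now()
	snaps := []snap{
		{token: "old", lastuse: t.Add(-3 * time.Hour), size: 40},
		{token: "mid", lastuse: t.Add(-2 * time.Hour), size: 40},
		{token: "new", lastuse: t.Add(-1 * time.Hour), size: 40},
	}
	// With an 80-byte budget and a 2-session limit, only the oldest
	// entry is marked: size pruning alone brings the count within
	// the session limit.
	markForPrune(snaps, 80, 2)
	for _, s := range snaps {
		fmt.Println(s.token, s.prune)
	}
}

Running it prints "old true", "mid false", "new false": evicting the
oldest session satisfies the memory budget, and the count-limit pass
then has nothing left to do.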