20559: Trim session count explicitly instead of using LRU cache. 20559-dav-concurrent-writes
authorTom Clegg <tom@curii.com>
Fri, 30 Jun 2023 14:28:38 +0000 (10:28 -0400)
committerTom Clegg <tom@curii.com>
Fri, 30 Jun 2023 14:28:38 +0000 (10:28 -0400)
The LRU cache automatically implemented the session count limit by
dropping the oldest session, which would break our "one session per
token" rule (by evicting a session and creating a new one while the
old session was still in use) when there were more active sessions
than the configured limit.

Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom@curii.com>

services/keep-web/cache.go

index b519cbe843b91cfe472c561e0671c480cdb57dfa..604efd29d9139e2fe4963709f64503a0d8d7e6f3 100644 (file)
@@ -7,13 +7,13 @@ package keepweb
 import (
        "errors"
        "net/http"
+       "sort"
        "sync"
        "time"
 
        "git.arvados.org/arvados.git/sdk/go/arvados"
        "git.arvados.org/arvados.git/sdk/go/arvadosclient"
        "git.arvados.org/arvados.git/sdk/go/keepclient"
-       lru "github.com/hashicorp/golang-lru"
        "github.com/prometheus/client_golang/prometheus"
        "github.com/sirupsen/logrus"
 )
@@ -25,7 +25,7 @@ type cache struct {
        logger    logrus.FieldLogger
        registry  *prometheus.Registry
        metrics   cacheMetrics
-       sessions  *lru.TwoQueueCache
+       sessions  map[string]*cachedSession
        setupOnce sync.Once
        mtx       sync.Mutex
 
@@ -108,9 +108,9 @@ type cachedSession struct {
        // to completely remove the session entry from the cache.
        mtx sync.RWMutex
        // refresh must be locked in order to read or write the
-       // fs/user/userLoaded fields. This mutex enables GetSession
-       // and pruneSessions to remove/replace fs and user values
-       // safely.
+       // fs/user/userLoaded/lastuse fields. This mutex enables
+       // GetSession and pruneSessions to remove/replace fs and user
+       // values safely.
        refresh sync.Mutex
        // inuse must be RLocked while the session is in use by a
        // caller. This mutex enables pruneSessions() to wait for all
@@ -120,6 +120,7 @@ type cachedSession struct {
        fs         arvados.CustomFileSystem
        user       arvados.User
        userLoaded bool
+       lastuse    time.Time
 }
 
 func (sess *cachedSession) Release() {
@@ -133,7 +134,7 @@ func (sess *cachedSession) Release() {
 
 func (c *cache) setup() {
        var err error
-       c.sessions, err = lru.New2Q(c.cluster.Collections.WebDAVCache.MaxSessions)
+       c.sessions = map[string]*cachedSession{}
        if err != nil {
                panic(err)
        }
@@ -157,8 +158,9 @@ func (c *cache) setup() {
 }
 
 func (c *cache) updateGauges() {
-       c.metrics.collectionBytes.Set(float64(c.collectionBytes()))
-       c.metrics.sessionEntries.Set(float64(c.sessions.Len()))
+       n, size := c.sessionsSize()
+       c.metrics.collectionBytes.Set(float64(size))
+       c.metrics.sessionEntries.Set(float64(n))
 }
 
 var selectPDH = map[string]interface{}{
@@ -169,8 +171,7 @@ func (c *cache) checkout(token string) (*cachedSession, error) {
        c.setupOnce.Do(c.setup)
        c.mtx.Lock()
        defer c.mtx.Unlock()
-       ent, _ := c.sessions.Get(token)
-       sess, _ := ent.(*cachedSession)
+       sess := c.sessions[token]
        if sess == nil {
                client, err := arvados.NewClientFromConfig(c.cluster)
                if err != nil {
@@ -193,7 +194,7 @@ func (c *cache) checkout(token string) (*cachedSession, error) {
                        arvadosclient: arvadosclient,
                        keepclient:    keepclient.New(arvadosclient),
                }
-               c.sessions.Add(token, sess)
+               c.sessions[token] = sess
        }
        sess.mtx.RLock()
        return sess, nil
@@ -212,6 +213,7 @@ func (c *cache) GetSession(token string) (arvados.CustomFileSystem, *cachedSessi
        sess.refresh.Lock()
        defer sess.refresh.Unlock()
        now := time.Now()
+       sess.lastuse = now
        refresh := sess.expire.Before(now)
        if sess.fs == nil || !sess.userLoaded || refresh {
                // Wait for all active users to finish (otherwise they
@@ -248,52 +250,95 @@ func (c *cache) GetSession(token string) (arvados.CustomFileSystem, *cachedSessi
        return sess.fs, sess, &sess.user, nil
 }
 
+type sessionSnapshot struct {
+       token   string
+       sess    *cachedSession
+       lastuse time.Time
+       fs      arvados.CustomFileSystem
+       size    int64
+       prune   bool
+}
+
 // Remove all expired idle session cache entries, and remove in-memory
-// filesystems until approximate remaining size <= maxsize/2
+// filesystems until approximate remaining size <= maxsize
 func (c *cache) pruneSessions() {
        now := time.Now()
-       keys := c.sessions.Keys()
-       sizes := make([]int64, len(keys))
-       prune := []string(nil)
+       c.mtx.Lock()
+       snaps := make([]sessionSnapshot, 0, len(c.sessions))
+       for token, sess := range c.sessions {
+               snaps = append(snaps, sessionSnapshot{
+                       token: token,
+                       sess:  sess,
+               })
+       }
+       c.mtx.Unlock()
+
+       // Load lastuse/fs/expire data from sessions. Note we do this
+       // after unlocking c.mtx because sess.refresh.Lock sometimes
+       // waits for another goroutine to finish "[re]fetch user
+       // record".
+       for i := range snaps {
+               snaps[i].sess.refresh.Lock()
+               snaps[i].lastuse = snaps[i].sess.lastuse
+               snaps[i].fs = snaps[i].sess.fs
+               snaps[i].prune = snaps[i].sess.expire.Before(now)
+               snaps[i].sess.refresh.Unlock()
+       }
+
+       // Sort sessions with oldest first.
+       sort.Slice(snaps, func(i, j int) bool {
+               return snaps[i].lastuse.Before(snaps[j].lastuse)
+       })
+
+       // Add up size of sessions that aren't already marked for
+       // pruning based on expire time.
        var size int64
-       for i, token := range keys {
-               token := token.(string)
-               ent, ok := c.sessions.Peek(token)
-               if !ok {
-                       continue
+       for i, snap := range snaps {
+               if !snap.prune && snap.fs != nil {
+                       size := snap.fs.MemorySize()
+                       snaps[i].size = size
+                       size += size
                }
-               sess := ent.(*cachedSession)
-               sess.refresh.Lock()
-               expired := sess.expire.Before(now)
-               fs := sess.fs
-               sess.refresh.Unlock()
-               if expired {
-                       prune = append(prune, token)
+       }
+       // Mark more sessions for deletion until reaching desired
+       // memory size limit, starting with the oldest entries.
+       for i, snap := range snaps {
+               if size <= c.cluster.Collections.WebDAVCache.MaxCollectionBytes {
+                       break
                }
-               if fs != nil {
-                       sizes[i] = fs.MemorySize()
-                       size += sizes[i]
+               if snap.prune {
+                       continue
+               }
+               snaps[i].prune = true
+               size -= snap.size
+       }
+
+       // Mark more sessions for deletion until reaching desired
+       // session count limit.
+       mustprune := len(snaps) - c.cluster.Collections.WebDAVCache.MaxSessions
+       for i := range snaps {
+               if snaps[i].prune {
+                       mustprune--
                }
        }
-       // Remove tokens until reaching size limit, starting with the
-       // least frequently used entries (which Keys() returns last).
-       for i := len(keys) - 1; i >= 0 && size > c.cluster.Collections.WebDAVCache.MaxCollectionBytes; i-- {
-               if sizes[i] > 0 {
-                       prune = append(prune, keys[i].(string))
-                       size -= sizes[i]
+       for i := range snaps {
+               if mustprune < 1 {
+                       break
+               } else if !snaps[i].prune {
+                       snaps[i].prune = true
+                       mustprune--
                }
        }
 
        c.mtx.Lock()
        defer c.mtx.Unlock()
-       for _, token := range prune {
-               ent, ok := c.sessions.Peek(token)
-               if !ok {
+       for _, snap := range snaps {
+               if !snap.prune {
                        continue
                }
-               sess := ent.(*cachedSession)
+               sess := snap.sess
                if sess.mtx.TryLock() {
-                       c.sessions.Remove(token)
+                       delete(c.sessions, snap.token)
                        continue
                }
                // We can't remove a session that's been checked out
@@ -317,31 +362,28 @@ func (c *cache) pruneSessions() {
                        defer sess.inuse.Unlock()
                        // Release memory
                        sess.fs = nil
-                       if sess.expire.Before(now) {
-                               // Mark user data as stale
-                               sess.userLoaded = false
-                       }
                        // Next GetSession will make a new fs
                }()
        }
 }
 
-// collectionBytes returns the approximate combined memory size of the
-// collection cache and session filesystem cache.
-func (c *cache) collectionBytes() uint64 {
-       var size uint64
-       for _, token := range c.sessions.Keys() {
-               ent, ok := c.sessions.Peek(token)
-               if !ok {
-                       continue
-               }
-               sess := ent.(*cachedSession)
+// sessionsSize returns the number and approximate total memory size
+// of all cached sessions.
+func (c *cache) sessionsSize() (n int, size int64) {
+       c.mtx.Lock()
+       n = len(c.sessions)
+       sessions := make([]*cachedSession, 0, n)
+       for _, sess := range c.sessions {
+               sessions = append(sessions, sess)
+       }
+       c.mtx.Unlock()
+       for _, sess := range sessions {
                sess.refresh.Lock()
                fs := sess.fs
                sess.refresh.Unlock()
                if fs != nil {
-                       size += uint64(fs.MemorySize())
+                       size += fs.MemorySize()
                }
        }
-       return size
+       return
 }