X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/8772ee6a857973d1cac026b2185cd1e3d56a292d..d506da189ae39bbf86f53e7cd9cda0db45a54695:/services/keep-web/cache.go diff --git a/services/keep-web/cache.go b/services/keep-web/cache.go index b519cbe843..d443bc0829 100644 --- a/services/keep-web/cache.go +++ b/services/keep-web/cache.go @@ -7,13 +7,13 @@ package keepweb import ( "errors" "net/http" + "sort" "sync" "time" "git.arvados.org/arvados.git/sdk/go/arvados" "git.arvados.org/arvados.git/sdk/go/arvadosclient" "git.arvados.org/arvados.git/sdk/go/keepclient" - lru "github.com/hashicorp/golang-lru" "github.com/prometheus/client_golang/prometheus" "github.com/sirupsen/logrus" ) @@ -25,7 +25,7 @@ type cache struct { logger logrus.FieldLogger registry *prometheus.Registry metrics cacheMetrics - sessions *lru.TwoQueueCache + sessions map[string]*cachedSession setupOnce sync.Once mtx sync.Mutex @@ -108,9 +108,9 @@ type cachedSession struct { // to completely remove the session entry from the cache. mtx sync.RWMutex // refresh must be locked in order to read or write the - // fs/user/userLoaded fields. This mutex enables GetSession - // and pruneSessions to remove/replace fs and user values - // safely. + // fs/user/userLoaded/lastuse fields. This mutex enables + // GetSession and pruneSessions to remove/replace fs and user + // values safely. refresh sync.Mutex // inuse must be RLocked while the session is in use by a // caller. 
This mutex enables pruneSessions() to wait for all @@ -120,6 +120,7 @@ type cachedSession struct { fs arvados.CustomFileSystem user arvados.User userLoaded bool + lastuse time.Time } func (sess *cachedSession) Release() { @@ -133,7 +134,7 @@ func (sess *cachedSession) Release() { func (c *cache) setup() { var err error - c.sessions, err = lru.New2Q(c.cluster.Collections.WebDAVCache.MaxSessions) + c.sessions = map[string]*cachedSession{} if err != nil { panic(err) } @@ -157,8 +158,9 @@ func (c *cache) setup() { } func (c *cache) updateGauges() { - c.metrics.collectionBytes.Set(float64(c.collectionBytes())) - c.metrics.sessionEntries.Set(float64(c.sessions.Len())) + n, size := c.sessionsSize() + c.metrics.collectionBytes.Set(float64(size)) + c.metrics.sessionEntries.Set(float64(n)) } var selectPDH = map[string]interface{}{ @@ -169,8 +171,7 @@ func (c *cache) checkout(token string) (*cachedSession, error) { c.setupOnce.Do(c.setup) c.mtx.Lock() defer c.mtx.Unlock() - ent, _ := c.sessions.Get(token) - sess, _ := ent.(*cachedSession) + sess := c.sessions[token] if sess == nil { client, err := arvados.NewClientFromConfig(c.cluster) if err != nil { @@ -193,7 +194,7 @@ func (c *cache) checkout(token string) (*cachedSession, error) { arvadosclient: arvadosclient, keepclient: keepclient.New(arvadosclient), } - c.sessions.Add(token, sess) + c.sessions[token] = sess } sess.mtx.RLock() return sess, nil @@ -212,6 +213,7 @@ func (c *cache) GetSession(token string) (arvados.CustomFileSystem, *cachedSessi sess.refresh.Lock() defer sess.refresh.Unlock() now := time.Now() + sess.lastuse = now refresh := sess.expire.Before(now) if sess.fs == nil || !sess.userLoaded || refresh { // Wait for all active users to finish (otherwise they @@ -219,7 +221,7 @@ func (c *cache) GetSession(token string) (arvados.CustomFileSystem, *cachedSessi // using the new fs). 
sess.inuse.Lock() if !sess.userLoaded || refresh { - err := sess.client.RequestAndDecode(&sess.user, "GET", "/arvados/v1/users/current", nil, nil) + err := sess.client.RequestAndDecode(&sess.user, "GET", "arvados/v1/users/current", nil, nil) if he := errorWithHTTPStatus(nil); errors.As(err, &he) && he.HTTPStatus() == http.StatusForbidden { // token is OK, but "get user id" api is out // of scope -- use existing/expired info if @@ -248,52 +250,95 @@ func (c *cache) GetSession(token string) (arvados.CustomFileSystem, *cachedSessi return sess.fs, sess, &sess.user, nil } +type sessionSnapshot struct { + token string + sess *cachedSession + lastuse time.Time + fs arvados.CustomFileSystem + size int64 + prune bool +} + // Remove all expired idle session cache entries, and remove in-memory -// filesystems until approximate remaining size <= maxsize/2 +// filesystems until approximate remaining size <= maxsize func (c *cache) pruneSessions() { now := time.Now() - keys := c.sessions.Keys() - sizes := make([]int64, len(keys)) - prune := []string(nil) + c.mtx.Lock() + snaps := make([]sessionSnapshot, 0, len(c.sessions)) + for token, sess := range c.sessions { + snaps = append(snaps, sessionSnapshot{ + token: token, + sess: sess, + }) + } + c.mtx.Unlock() + + // Load lastuse/fs/expire data from sessions. Note we do this + // after unlocking c.mtx because sess.refresh.Lock sometimes + // waits for another goroutine to finish "[re]fetch user + // record". + for i := range snaps { + snaps[i].sess.refresh.Lock() + snaps[i].lastuse = snaps[i].sess.lastuse + snaps[i].fs = snaps[i].sess.fs + snaps[i].prune = snaps[i].sess.expire.Before(now) + snaps[i].sess.refresh.Unlock() + } + + // Sort sessions with oldest first. + sort.Slice(snaps, func(i, j int) bool { + return snaps[i].lastuse.Before(snaps[j].lastuse) + }) + + // Add up size of sessions that aren't already marked for + // pruning based on expire time. 
var size int64 - for i, token := range keys { - token := token.(string) - ent, ok := c.sessions.Peek(token) - if !ok { - continue + for i, snap := range snaps { + if !snap.prune && snap.fs != nil { + fsSize := snap.fs.MemorySize() // named fsSize so it does not shadow the running total "size" + snaps[i].size = fsSize + size += fsSize } - sess := ent.(*cachedSession) - sess.refresh.Lock() - expired := sess.expire.Before(now) - fs := sess.fs - sess.refresh.Unlock() - if expired { - prune = append(prune, token) + } + // Mark more sessions for deletion until reaching desired + // memory size limit, starting with the oldest entries. + for i, snap := range snaps { + if size <= int64(c.cluster.Collections.WebDAVCache.MaxCollectionBytes) { + break } - if fs != nil { - sizes[i] = fs.MemorySize() - size += sizes[i] + if snap.prune { + continue + } + snaps[i].prune = true + size -= snap.size + } + + // Mark more sessions for deletion until reaching desired + // session count limit. + mustprune := len(snaps) - c.cluster.Collections.WebDAVCache.MaxSessions + for i := range snaps { + if snaps[i].prune { + mustprune-- } } - // Remove tokens until reaching size limit, starting with the - // least frequently used entries (which Keys() returns last). 
- for i := len(keys) - 1; i >= 0 && size > c.cluster.Collections.WebDAVCache.MaxCollectionBytes; i-- { - if sizes[i] > 0 { - prune = append(prune, keys[i].(string)) - size -= sizes[i] + for i := range snaps { + if mustprune < 1 { + break + } else if !snaps[i].prune { + snaps[i].prune = true + mustprune-- } } c.mtx.Lock() defer c.mtx.Unlock() - for _, token := range prune { - ent, ok := c.sessions.Peek(token) - if !ok { + for _, snap := range snaps { + if !snap.prune { continue } - sess := ent.(*cachedSession) + sess := snap.sess if sess.mtx.TryLock() { - c.sessions.Remove(token) + delete(c.sessions, snap.token) continue } // We can't remove a session that's been checked out @@ -317,31 +362,28 @@ func (c *cache) pruneSessions() { defer sess.inuse.Unlock() // Release memory sess.fs = nil - if sess.expire.Before(now) { - // Mark user data as stale - sess.userLoaded = false - } // Next GetSession will make a new fs }() } } -// collectionBytes returns the approximate combined memory size of the -// collection cache and session filesystem cache. -func (c *cache) collectionBytes() uint64 { - var size uint64 - for _, token := range c.sessions.Keys() { - ent, ok := c.sessions.Peek(token) - if !ok { - continue - } - sess := ent.(*cachedSession) +// sessionsSize returns the number and approximate total memory size +// of all cached sessions. +func (c *cache) sessionsSize() (n int, size int64) { + c.mtx.Lock() + n = len(c.sessions) + sessions := make([]*cachedSession, 0, n) + for _, sess := range c.sessions { + sessions = append(sessions, sess) + } + c.mtx.Unlock() + for _, sess := range sessions { sess.refresh.Lock() fs := sess.fs sess.refresh.Unlock() if fs != nil { - size += uint64(fs.MemorySize()) + size += fs.MemorySize() } } - return size + return }