import (
"errors"
"net/http"
+ "sort"
"sync"
"time"
"git.arvados.org/arvados.git/sdk/go/arvados"
"git.arvados.org/arvados.git/sdk/go/arvadosclient"
"git.arvados.org/arvados.git/sdk/go/keepclient"
- lru "github.com/hashicorp/golang-lru"
"github.com/prometheus/client_golang/prometheus"
"github.com/sirupsen/logrus"
)
logger logrus.FieldLogger
registry *prometheus.Registry
metrics cacheMetrics
- sessions *lru.TwoQueueCache
+ sessions map[string]*cachedSession
setupOnce sync.Once
mtx sync.Mutex
// to completely remove the session entry from the cache.
mtx sync.RWMutex
// refresh must be locked in order to read or write the
- // fs/user/userLoaded fields. This mutex enables GetSession
- // and pruneSessions to remove/replace fs and user values
- // safely.
+ // fs/user/userLoaded/lastuse fields. This mutex enables
+ // GetSession and pruneSessions to remove/replace fs and user
+ // values safely.
refresh sync.Mutex
// inuse must be RLocked while the session is in use by a
// caller. This mutex enables pruneSessions() to wait for all
fs arvados.CustomFileSystem
user arvados.User
userLoaded bool
+ lastuse time.Time
}
func (sess *cachedSession) Release() {
func (c *cache) setup() {
var err error
- c.sessions, err = lru.New2Q(c.cluster.Collections.WebDAVCache.MaxSessions)
+ c.sessions = map[string]*cachedSession{}
if err != nil {
panic(err)
}
}
func (c *cache) updateGauges() {
- c.metrics.collectionBytes.Set(float64(c.collectionBytes()))
- c.metrics.sessionEntries.Set(float64(c.sessions.Len()))
+ // Publish the current number of cached sessions and their
+ // approximate total memory size to the metrics gauges.
+ entries, total := c.sessionsSize()
+ c.metrics.sessionEntries.Set(float64(entries))
+ c.metrics.collectionBytes.Set(float64(total))
}
var selectPDH = map[string]interface{}{
c.setupOnce.Do(c.setup)
c.mtx.Lock()
defer c.mtx.Unlock()
- ent, _ := c.sessions.Get(token)
- sess, _ := ent.(*cachedSession)
+ sess := c.sessions[token]
if sess == nil {
client, err := arvados.NewClientFromConfig(c.cluster)
if err != nil {
arvadosclient: arvadosclient,
keepclient: keepclient.New(arvadosclient),
}
- c.sessions.Add(token, sess)
+ c.sessions[token] = sess
}
sess.mtx.RLock()
return sess, nil
sess.refresh.Lock()
defer sess.refresh.Unlock()
now := time.Now()
+ sess.lastuse = now
refresh := sess.expire.Before(now)
if sess.fs == nil || !sess.userLoaded || refresh {
// Wait for all active users to finish (otherwise they
// using the new fs).
sess.inuse.Lock()
if !sess.userLoaded || refresh {
- err := sess.client.RequestAndDecode(&sess.user, "GET", "/arvados/v1/users/current", nil, nil)
+ err := sess.client.RequestAndDecode(&sess.user, "GET", "arvados/v1/users/current", nil, nil)
if he := errorWithHTTPStatus(nil); errors.As(err, &he) && he.HTTPStatus() == http.StatusForbidden {
// token is OK, but "get user id" api is out
// of scope -- use existing/expired info if
return sess.fs, sess, &sess.user, nil
}
+type sessionSnapshot struct { // per-session data gathered by pruneSessions
+ token string // key of this entry in the cache's sessions map
+ sess *cachedSession // the cached session this snapshot describes
+ lastuse time.Time // copy of sess.lastuse, read while holding sess.refresh
+ fs arvados.CustomFileSystem // copy of sess.fs, read while holding sess.refresh
+ size int64 // fs.MemorySize(); recorded only for unpruned sessions with a non-nil fs
+ prune bool // true once the session has been selected for removal/unloading
+}
+
// Remove all expired idle session cache entries, and remove in-memory
-// filesystems until approximate remaining size <= maxsize/2
+// filesystems (oldest lastuse first) until approximate remaining size
+// <= MaxCollectionBytes and session count <= MaxSessions.
func (c *cache) pruneSessions() {
now := time.Now()
- keys := c.sessions.Keys()
- sizes := make([]int64, len(keys))
- prune := []string(nil)
+ c.mtx.Lock()
+ snaps := make([]sessionSnapshot, 0, len(c.sessions))
+ for token, sess := range c.sessions {
+ snaps = append(snaps, sessionSnapshot{
+ token: token,
+ sess: sess,
+ })
+ }
+ c.mtx.Unlock()
+
+ // Load lastuse/fs/expire data from sessions. Note we do this
+ // after unlocking c.mtx because sess.refresh.Lock sometimes
+ // waits for another goroutine to finish "[re]fetch user
+ // record".
+ for i := range snaps {
+ snaps[i].sess.refresh.Lock()
+ snaps[i].lastuse = snaps[i].sess.lastuse
+ snaps[i].fs = snaps[i].sess.fs
+ snaps[i].prune = snaps[i].sess.expire.Before(now)
+ snaps[i].sess.refresh.Unlock()
+ }
+
+ // Sort sessions with oldest first.
+ sort.Slice(snaps, func(i, j int) bool {
+ return snaps[i].lastuse.Before(snaps[j].lastuse)
+ })
+
+ // Add up size of sessions that aren't already marked for
+ // pruning based on expire time.
var size int64
- for i, token := range keys {
- token := token.(string)
- ent, ok := c.sessions.Peek(token)
- if !ok {
- continue
+ for i, snap := range snaps {
+ if !snap.prune && snap.fs != nil {
+ // Do not use := here: declaring a new "size" in
+ // this scope would shadow the running total above
+ // and leave it at zero.
+ snaps[i].size = snap.fs.MemorySize()
+ size += snaps[i].size
}
- sess := ent.(*cachedSession)
- sess.refresh.Lock()
- expired := sess.expire.Before(now)
- fs := sess.fs
- sess.refresh.Unlock()
- if expired {
- prune = append(prune, token)
+ }
+ // Mark more sessions for deletion until reaching desired
+ // memory size limit, starting with the oldest entries.
+ for i, snap := range snaps {
+ if size <= int64(c.cluster.Collections.WebDAVCache.MaxCollectionBytes) {
+ break
}
- if fs != nil {
- sizes[i] = fs.MemorySize()
- size += sizes[i]
+ if snap.prune {
+ continue
+ }
+ snaps[i].prune = true
+ size -= snap.size
+ }
+
+ // Mark more sessions for deletion until reaching desired
+ // session count limit.
+ mustprune := len(snaps) - c.cluster.Collections.WebDAVCache.MaxSessions
+ for i := range snaps {
+ if snaps[i].prune {
+ mustprune--
}
}
- // Remove tokens until reaching size limit, starting with the
- // least frequently used entries (which Keys() returns last).
- for i := len(keys) - 1; i >= 0 && size > c.cluster.Collections.WebDAVCache.MaxCollectionBytes; i-- {
- if sizes[i] > 0 {
- prune = append(prune, keys[i].(string))
- size -= sizes[i]
+ for i := range snaps {
+ if mustprune < 1 {
+ break
+ } else if !snaps[i].prune {
+ snaps[i].prune = true
+ mustprune--
}
}
c.mtx.Lock()
defer c.mtx.Unlock()
- for _, token := range prune {
- ent, ok := c.sessions.Peek(token)
- if !ok {
+ for _, snap := range snaps {
+ if !snap.prune {
continue
}
- sess := ent.(*cachedSession)
+ sess := snap.sess
if sess.mtx.TryLock() {
- c.sessions.Remove(token)
+ delete(c.sessions, snap.token)
continue
}
// We can't remove a session that's been checked out
defer sess.inuse.Unlock()
// Release memory
sess.fs = nil
- if sess.expire.Before(now) {
- // Mark user data as stale
- sess.userLoaded = false
- }
// Next GetSession will make a new fs
}()
}
}
-// collectionBytes returns the approximate combined memory size of the
-// collection cache and session filesystem cache.
-func (c *cache) collectionBytes() uint64 {
- var size uint64
- for _, token := range c.sessions.Keys() {
- ent, ok := c.sessions.Peek(token)
- if !ok {
- continue
- }
- sess := ent.(*cachedSession)
+// sessionsSize returns the number and approximate total memory size
+// of all cached sessions. Sessions with a nil fs count toward n but add nothing to size.
+func (c *cache) sessionsSize() (n int, size int64) {
+ c.mtx.Lock() // held only while copying the session pointers out of the map
+ n = len(c.sessions)
+ sessions := make([]*cachedSession, 0, n)
+ for _, sess := range c.sessions {
+ sessions = append(sessions, sess)
+ }
+ c.mtx.Unlock()
+ for _, sess := range sessions { // c.mtx is released here; each sess.fs is read under sess.refresh
sess.refresh.Lock()
fs := sess.fs
sess.refresh.Unlock()
if fs != nil {
- size += uint64(fs.MemorySize())
+ size += fs.MemorySize()
}
}
- return size
+ return // returns the named results n and size
}