Merge branch '21666-provision-test-improvement'
[arvados.git] / services / keep-web / cache.go
index d5fdc4997ecee3c67d1cc8ea4003c2c1e3f6cac4..b5b6cc4fa508cf25b8a388b490afef23dd12c239 100644 (file)
@@ -5,14 +5,15 @@
 package keepweb
 
 import (
+       "errors"
+       "net/http"
+       "sort"
        "sync"
-       "sync/atomic"
        "time"
 
        "git.arvados.org/arvados.git/sdk/go/arvados"
        "git.arvados.org/arvados.git/sdk/go/arvadosclient"
        "git.arvados.org/arvados.git/sdk/go/keepclient"
-       lru "github.com/hashicorp/golang-lru"
        "github.com/prometheus/client_golang/prometheus"
        "github.com/sirupsen/logrus"
 )
@@ -20,74 +21,33 @@ import (
 const metricsUpdateInterval = time.Second / 10
 
 type cache struct {
-       cluster     *arvados.Cluster
-       logger      logrus.FieldLogger
-       registry    *prometheus.Registry
-       metrics     cacheMetrics
-       pdhs        *lru.TwoQueueCache
-       collections *lru.TwoQueueCache
-       sessions    *lru.TwoQueueCache
-       setupOnce   sync.Once
-
-       chPruneSessions    chan struct{}
-       chPruneCollections chan struct{}
+       cluster   *arvados.Cluster
+       logger    logrus.FieldLogger
+       registry  *prometheus.Registry
+       metrics   cacheMetrics
+       sessions  map[string]*cachedSession
+       setupOnce sync.Once
+       mtx       sync.Mutex
+
+       chPruneSessions chan struct{}
 }
 
 type cacheMetrics struct {
-       requests          prometheus.Counter
-       collectionBytes   prometheus.Gauge
-       collectionEntries prometheus.Gauge
-       sessionEntries    prometheus.Gauge
-       collectionHits    prometheus.Counter
-       pdhHits           prometheus.Counter
-       sessionHits       prometheus.Counter
-       sessionMisses     prometheus.Counter
-       apiCalls          prometheus.Counter
+       requests        prometheus.Counter
+       collectionBytes prometheus.Gauge
+       sessionEntries  prometheus.Gauge
+       sessionHits     prometheus.Counter
+       sessionMisses   prometheus.Counter
 }
 
 func (m *cacheMetrics) setup(reg *prometheus.Registry) {
-       m.requests = prometheus.NewCounter(prometheus.CounterOpts{
-               Namespace: "arvados",
-               Subsystem: "keepweb_collectioncache",
-               Name:      "requests",
-               Help:      "Number of targetID-to-manifest lookups handled.",
-       })
-       reg.MustRegister(m.requests)
-       m.collectionHits = prometheus.NewCounter(prometheus.CounterOpts{
-               Namespace: "arvados",
-               Subsystem: "keepweb_collectioncache",
-               Name:      "hits",
-               Help:      "Number of pdh-to-manifest cache hits.",
-       })
-       reg.MustRegister(m.collectionHits)
-       m.pdhHits = prometheus.NewCounter(prometheus.CounterOpts{
-               Namespace: "arvados",
-               Subsystem: "keepweb_collectioncache",
-               Name:      "pdh_hits",
-               Help:      "Number of uuid-to-pdh cache hits.",
-       })
-       reg.MustRegister(m.pdhHits)
-       m.apiCalls = prometheus.NewCounter(prometheus.CounterOpts{
-               Namespace: "arvados",
-               Subsystem: "keepweb_collectioncache",
-               Name:      "api_calls",
-               Help:      "Number of outgoing API calls made by cache.",
-       })
-       reg.MustRegister(m.apiCalls)
        m.collectionBytes = prometheus.NewGauge(prometheus.GaugeOpts{
                Namespace: "arvados",
                Subsystem: "keepweb_sessions",
-               Name:      "cached_collection_bytes",
-               Help:      "Total size of all cached manifests and sessions.",
+               Name:      "cached_session_bytes",
+               Help:      "Total size of all cached sessions.",
        })
        reg.MustRegister(m.collectionBytes)
-       m.collectionEntries = prometheus.NewGauge(prometheus.GaugeOpts{
-               Namespace: "arvados",
-               Subsystem: "keepweb_collectioncache",
-               Name:      "cached_manifests",
-               Help:      "Number of manifests in cache.",
-       })
-       reg.MustRegister(m.collectionEntries)
        m.sessionEntries = prometheus.NewGauge(prometheus.GaugeOpts{
                Namespace: "arvados",
                Subsystem: "keepweb_sessions",
@@ -111,41 +71,70 @@ func (m *cacheMetrics) setup(reg *prometheus.Registry) {
        reg.MustRegister(m.sessionMisses)
 }
 
-type cachedPDH struct {
-       expire  time.Time
-       refresh time.Time
-       pdh     string
-}
-
-type cachedCollection struct {
-       expire     time.Time
-       collection *arvados.Collection
-}
-
-type cachedPermission struct {
-       expire time.Time
-}
-
 type cachedSession struct {
+       cache         *cache
        expire        time.Time
-       fs            atomic.Value
        client        *arvados.Client
        arvadosclient *arvadosclient.ArvadosClient
        keepclient    *keepclient.KeepClient
-       user          atomic.Value
+
+       // Each session uses a system of three mutexes (plus the
+       // cache-wide mutex) to enable the following semantics:
+       //
+       // - There are never multiple sessions in use for a given
+       // token.
+       //
+       // - If the cached in-memory filesystems/user records are
+       // older than the configured cache TTL when a request starts,
+       // the request will use new ones.
+       //
+       // - Unused sessions are garbage-collected.
+       //
+       // In particular, when it is necessary to reset a session's
+       // filesystem/user record (to save memory or respect the
+       // configured cache TTL), any operations that are already
+       // using the existing filesystem/user record are allowed to
+       // finish before the new filesystem is constructed.
+       //
+       // The locks must be acquired in the following order:
+       // cache.mtx, session.mtx, session.refresh, session.inuse.
+
+       // mtx is RLocked while session is not safe to evict from
+       // cache -- i.e., a checkout() has decided to use it, and its
+       // caller is not finished with it. When locking or rlocking
+       // this mtx, the cache mtx MUST already be held.
+       //
+       // This mutex enables pruneSessions to detect when it is safe
+       // to completely remove the session entry from the cache.
+       mtx sync.RWMutex
+       // refresh must be locked in order to read or write the
+       // expire/fs/user/userLoaded/lastuse fields. This mutex enables
+       // GetSession and pruneSessions to remove/replace fs and user
+       // values safely.
+       refresh sync.Mutex
+       // inuse must be RLocked while the session is in use by a
+       // caller. This mutex enables pruneSessions() to wait for all
+       // existing usage to finish by calling inuse.Lock().
+       inuse sync.RWMutex
+
+       fs         arvados.CustomFileSystem
+       user       arvados.User
+       userLoaded bool
+       lastuse    time.Time
+}
+
+func (sess *cachedSession) Release() {
+       sess.inuse.RUnlock() // pairs with the inuse.RLock() at the end of GetSession
+       sess.mtx.RUnlock() // pairs with the mtx.RLock() taken in checkout
+       select {
+       case sess.cache.chPruneSessions <- struct{}{}: // request a prune pass
+       default: // the send would block; skip it rather than wait
+       }
+}
 
 func (c *cache) setup() {
        var err error
-       c.pdhs, err = lru.New2Q(c.cluster.Collections.WebDAVCache.MaxUUIDEntries)
-       if err != nil {
-               panic(err)
-       }
-       c.collections, err = lru.New2Q(c.cluster.Collections.WebDAVCache.MaxCollectionEntries)
-       if err != nil {
-               panic(err)
-       }
-       c.sessions, err = lru.New2Q(c.cluster.Collections.WebDAVCache.MaxSessions)
+       c.sessions = map[string]*cachedSession{}
        if err != nil {
                panic(err)
        }
@@ -160,12 +149,6 @@ func (c *cache) setup() {
                        c.updateGauges()
                }
        }()
-       c.chPruneCollections = make(chan struct{}, 1)
-       go func() {
-               for range c.chPruneCollections {
-                       c.pruneCollections()
-               }
-       }()
        c.chPruneSessions = make(chan struct{}, 1)
        go func() {
                for range c.chPruneSessions {
@@ -175,337 +158,235 @@ func (c *cache) setup() {
 }
 
 func (c *cache) updateGauges() {
-       c.metrics.collectionBytes.Set(float64(c.collectionBytes()))
-       c.metrics.collectionEntries.Set(float64(c.collections.Len()))
-       c.metrics.sessionEntries.Set(float64(c.sessions.Len()))
+       n, size := c.sessionsSize()
+       c.metrics.collectionBytes.Set(float64(size))
+       c.metrics.sessionEntries.Set(float64(n))
 }
 
 var selectPDH = map[string]interface{}{
        "select": []string{"portable_data_hash"},
 }
 
-// Update saves a modified version (fs) to an existing collection
-// (coll) and, if successful, updates the relevant cache entries so
-// subsequent calls to Get() reflect the modifications.
-func (c *cache) Update(client *arvados.Client, coll arvados.Collection, fs arvados.CollectionFileSystem) error {
-       c.setupOnce.Do(c.setup)
-
-       m, err := fs.MarshalManifest(".")
-       if err != nil || m == coll.ManifestText {
-               return err
-       }
-       coll.ManifestText = m
-       var updated arvados.Collection
-       err = client.RequestAndDecode(&updated, "PATCH", "arvados/v1/collections/"+coll.UUID, nil, map[string]interface{}{
-               "collection": map[string]string{
-                       "manifest_text": coll.ManifestText,
-               },
-       })
-       if err != nil {
-               c.pdhs.Remove(coll.UUID)
-               return err
-       }
-       c.collections.Add(client.AuthToken+"\000"+updated.PortableDataHash, &cachedCollection{
-               expire:     time.Now().Add(time.Duration(c.cluster.Collections.WebDAVCache.TTL)),
-               collection: &updated,
-       })
-       c.pdhs.Add(coll.UUID, &cachedPDH{
-               expire:  time.Now().Add(time.Duration(c.cluster.Collections.WebDAVCache.TTL)),
-               refresh: time.Now().Add(time.Duration(c.cluster.Collections.WebDAVCache.UUIDTTL)),
-               pdh:     updated.PortableDataHash,
-       })
-       return nil
-}
-
-// ResetSession unloads any potentially stale state. Should be called
-// after write operations, so subsequent reads don't return stale
-// data.
-func (c *cache) ResetSession(token string) {
-       c.setupOnce.Do(c.setup)
-       c.sessions.Remove(token)
-}
-
-// Get a long-lived CustomFileSystem suitable for doing a read operation
-// with the given token.
-func (c *cache) GetSession(token string) (arvados.CustomFileSystem, *cachedSession, error) {
+func (c *cache) checkout(token string) (*cachedSession, error) {
        c.setupOnce.Do(c.setup)
-       now := time.Now()
-       ent, _ := c.sessions.Get(token)
-       sess, _ := ent.(*cachedSession)
-       expired := false
+       c.mtx.Lock()
+       defer c.mtx.Unlock()
+       sess := c.sessions[token]
        if sess == nil {
-               c.metrics.sessionMisses.Inc()
-               sess = &cachedSession{
-                       expire: now.Add(c.cluster.Collections.WebDAVCache.TTL.Duration()),
-               }
-               var err error
-               sess.client, err = arvados.NewClientFromConfig(c.cluster)
+               client, err := arvados.NewClientFromConfig(c.cluster)
                if err != nil {
-                       return nil, nil, err
+                       return nil, err
                }
-               sess.client.AuthToken = token
-               sess.arvadosclient, err = arvadosclient.New(sess.client)
+               client.AuthToken = token
+               client.Timeout = time.Minute
+               client.Logger = c.logger
+               // A non-empty origin header tells controller to
+               // prioritize our traffic as interactive, which is
+               // true most of the time.
+               origin := c.cluster.Services.WebDAVDownload.ExternalURL
+               client.SendHeader = http.Header{"Origin": {origin.Scheme + "://" + origin.Host}}
+               arvadosclient, err := arvadosclient.New(client)
                if err != nil {
-                       return nil, nil, err
+                       return nil, err
                }
-               sess.keepclient = keepclient.New(sess.arvadosclient)
-               c.sessions.Add(token, sess)
-       } else if sess.expire.Before(now) {
-               c.metrics.sessionMisses.Inc()
-               expired = true
-       } else {
-               c.metrics.sessionHits.Inc()
-       }
-       select {
-       case c.chPruneSessions <- struct{}{}:
-       default:
-       }
-       fs, _ := sess.fs.Load().(arvados.CustomFileSystem)
-       if fs != nil && !expired {
-               return fs, sess, nil
+               kc := keepclient.New(arvadosclient)
+               kc.DiskCacheSize = c.cluster.Collections.WebDAVCache.DiskCacheSize
+               sess = &cachedSession{
+                       cache:         c,
+                       client:        client,
+                       arvadosclient: arvadosclient,
+                       keepclient:    kc,
+               }
+               c.sessions[token] = sess
        }
-       fs = sess.client.SiteFileSystem(sess.keepclient)
-       fs.ForwardSlashNameSubstitution(c.cluster.Collections.ForwardSlashNameSubstitution)
-       sess.fs.Store(fs)
-       return fs, sess, nil
+       sess.mtx.RLock()
+       return sess, nil
 }
 
-// Remove all expired session cache entries, then remove more entries
-// until approximate remaining size <= maxsize/2
-func (c *cache) pruneSessions() {
-       now := time.Now()
-       var size int64
-       keys := c.sessions.Keys()
-       for _, token := range keys {
-               ent, ok := c.sessions.Peek(token)
-               if !ok {
-                       continue
-               }
-               s := ent.(*cachedSession)
-               if s.expire.Before(now) {
-                       c.sessions.Remove(token)
-                       continue
-               }
-               if fs, ok := s.fs.Load().(arvados.CustomFileSystem); ok {
-                       size += fs.MemorySize()
-               }
+// GetSession returns a long-lived CustomFileSystem suitable for doing
+// a read or write operation with the given token.
+//
+// If the returned error is nil, the caller must call Release() on the
+// returned session when finished using it.
+func (c *cache) GetSession(token string) (arvados.CustomFileSystem, *cachedSession, *arvados.User, error) {
+       sess, err := c.checkout(token)
+       if err != nil {
+               return nil, nil, nil, err
        }
-       // Remove tokens until reaching size limit, starting with the
-       // least frequently used entries (which Keys() returns last).
-       for i := len(keys) - 1; i >= 0; i-- {
-               token := keys[i]
-               if size <= c.cluster.Collections.WebDAVCache.MaxCollectionBytes/2 {
-                       break
-               }
-               ent, ok := c.sessions.Peek(token)
-               if !ok {
-                       continue
-               }
-               s := ent.(*cachedSession)
-               fs, _ := s.fs.Load().(arvados.CustomFileSystem)
-               if fs == nil {
-                       continue
+       sess.refresh.Lock()
+       defer sess.refresh.Unlock()
+       now := time.Now()
+       sess.lastuse = now
+       refresh := sess.expire.Before(now)
+       if sess.fs == nil || !sess.userLoaded || refresh {
+               // Wait for all active users to finish (otherwise they
+               // might make changes to an old fs after we start
+               // using the new fs).
+               sess.inuse.Lock()
+               if !sess.userLoaded || refresh {
+                       err := sess.client.RequestAndDecode(&sess.user, "GET", "arvados/v1/users/current", nil, nil)
+                       if he := errorWithHTTPStatus(nil); errors.As(err, &he) && he.HTTPStatus() == http.StatusForbidden {
+                               // token is OK, but "get user id" api is out
+                               // of scope -- use existing/expired info if
+                               // any, or leave empty for unknown user
+                       } else if err != nil {
+                               sess.inuse.Unlock()
+                               sess.mtx.RUnlock()
+                               return nil, nil, nil, err
+                       }
+                       sess.userLoaded = true
                }
-               c.sessions.Remove(token)
-               size -= fs.MemorySize()
-       }
-}
-
-func (c *cache) Get(arv *arvadosclient.ArvadosClient, targetID string, forceReload bool) (*arvados.Collection, error) {
-       c.setupOnce.Do(c.setup)
-       c.metrics.requests.Inc()
 
-       var pdhRefresh bool
-       var pdh string
-       if arvadosclient.PDHMatch(targetID) {
-               pdh = targetID
-       } else if ent, cached := c.pdhs.Get(targetID); cached {
-               ent := ent.(*cachedPDH)
-               if ent.expire.Before(time.Now()) {
-                       c.pdhs.Remove(targetID)
+               if sess.fs == nil || refresh {
+                       sess.fs = sess.client.SiteFileSystem(sess.keepclient)
+                       sess.fs.ForwardSlashNameSubstitution(c.cluster.Collections.ForwardSlashNameSubstitution)
+                       sess.expire = now.Add(c.cluster.Collections.WebDAVCache.TTL.Duration())
+                       c.metrics.sessionMisses.Inc()
                } else {
-                       pdh = ent.pdh
-                       pdhRefresh = forceReload || time.Now().After(ent.refresh)
-                       c.metrics.pdhHits.Inc()
+                       c.metrics.sessionHits.Inc()
                }
-       }
-
-       if pdh == "" {
-               // UUID->PDH mapping is not cached, might as well get
-               // the whole collection record and be done (below).
-               c.logger.Debugf("cache(%s): have no pdh", targetID)
-       } else if cached := c.lookupCollection(arv.ApiToken + "\000" + pdh); cached == nil {
-               // PDH->manifest is not cached, might as well get the
-               // whole collection record (below).
-               c.logger.Debugf("cache(%s): have pdh %s but manifest is not cached", targetID, pdh)
-       } else if !pdhRefresh {
-               // We looked up UUID->PDH very recently, and we still
-               // have the manifest for that PDH.
-               c.logger.Debugf("cache(%s): have pdh %s and refresh not needed", targetID, pdh)
-               return cached, nil
+               sess.inuse.Unlock()
        } else {
-               // Get current PDH for this UUID (and confirm we still
-               // have read permission).  Most likely, the cached PDH
-               // is still correct, in which case we can use our
-               // cached manifest.
-               c.metrics.apiCalls.Inc()
-               var current arvados.Collection
-               err := arv.Get("collections", targetID, selectPDH, &current)
-               if err != nil {
-                       return nil, err
-               }
-               if current.PortableDataHash == pdh {
-                       // PDH has not changed, cached manifest is
-                       // correct.
-                       c.logger.Debugf("cache(%s): verified cached pdh %s is still correct", targetID, pdh)
-                       return cached, nil
-               }
-               if cached := c.lookupCollection(arv.ApiToken + "\000" + current.PortableDataHash); cached != nil {
-                       // PDH changed, and we already have the
-                       // manifest for that new PDH.
-                       c.logger.Debugf("cache(%s): cached pdh %s was stale, new pdh is %s and manifest is already in cache", targetID, pdh, current.PortableDataHash)
-                       return cached, nil
-               }
+               c.metrics.sessionHits.Inc()
        }
+       sess.inuse.RLock()
+       return sess.fs, sess, &sess.user, nil
+}
 
-       // Either UUID->PDH is not cached, or PDH->manifest is not
-       // cached.
-       var retrieved arvados.Collection
-       c.metrics.apiCalls.Inc()
-       err := arv.Get("collections", targetID, nil, &retrieved)
-       if err != nil {
-               return nil, err
-       }
-       c.logger.Debugf("cache(%s): retrieved manifest, caching with pdh %s", targetID, retrieved.PortableDataHash)
-       exp := time.Now().Add(time.Duration(c.cluster.Collections.WebDAVCache.TTL))
-       if targetID != retrieved.PortableDataHash {
-               c.pdhs.Add(targetID, &cachedPDH{
-                       expire:  exp,
-                       refresh: time.Now().Add(time.Duration(c.cluster.Collections.WebDAVCache.UUIDTTL)),
-                       pdh:     retrieved.PortableDataHash,
+type sessionSnapshot struct {
+       token   string // key of the session in cache.sessions
+       sess    *cachedSession // the session this snapshot describes
+       lastuse time.Time // copy of sess.lastuse, read while holding sess.refresh
+       fs      arvados.CustomFileSystem // copy of sess.fs, read while holding sess.refresh; may be nil
+       size    int64 // fs.MemorySize(); set only for entries not yet marked prune that have a non-nil fs
+       prune   bool // true once pruneSessions has marked this session for removal or unloading
+}
+
+// Remove all expired idle session cache entries, and remove in-memory
+// filesystems until approximate remaining size <= maxsize
+func (c *cache) pruneSessions() {
+       now := time.Now()
+       c.mtx.Lock()
+       snaps := make([]sessionSnapshot, 0, len(c.sessions))
+       for token, sess := range c.sessions {
+               snaps = append(snaps, sessionSnapshot{
+                       token: token,
+                       sess:  sess,
                })
        }
-       c.collections.Add(arv.ApiToken+"\000"+retrieved.PortableDataHash, &cachedCollection{
-               expire:     exp,
-               collection: &retrieved,
-       })
-       if int64(len(retrieved.ManifestText)) > c.cluster.Collections.WebDAVCache.MaxCollectionBytes/int64(c.cluster.Collections.WebDAVCache.MaxCollectionEntries) {
-               select {
-               case c.chPruneCollections <- struct{}{}:
-               default:
-               }
+       c.mtx.Unlock()
+
+       // Load lastuse/fs/expire data from sessions. Note we do this
+       // after unlocking c.mtx because sess.refresh.Lock sometimes
+       // waits for another goroutine to finish "[re]fetch user
+       // record".
+       for i := range snaps {
+               snaps[i].sess.refresh.Lock()
+               snaps[i].lastuse = snaps[i].sess.lastuse
+               snaps[i].fs = snaps[i].sess.fs
+               snaps[i].prune = snaps[i].sess.expire.Before(now)
+               snaps[i].sess.refresh.Unlock()
        }
-       return &retrieved, nil
-}
 
-// pruneCollections checks the total bytes occupied by manifest_text
-// in the collection cache and removes old entries as needed to bring
-// the total size down to CollectionBytes. It also deletes all expired
-// entries.
-//
-// pruneCollections does not aim to be perfectly correct when there is
-// concurrent cache activity.
-func (c *cache) pruneCollections() {
+       // Sort sessions with oldest first.
+       sort.Slice(snaps, func(i, j int) bool {
+               return snaps[i].lastuse.Before(snaps[j].lastuse)
+       })
+
+       // Add up size of sessions that aren't already marked for
+       // pruning based on expire time.
        var size int64
-       now := time.Now()
-       keys := c.collections.Keys()
-       entsize := make([]int, len(keys))
-       expired := make([]bool, len(keys))
-       for i, k := range keys {
-               v, ok := c.collections.Peek(k)
-               if !ok {
-                       continue
+       for i, snap := range snaps {
+               if !snap.prune && snap.fs != nil {
+                       fsSize := snap.fs.MemorySize() // distinct name so the running total is not shadowed
+                       snaps[i].size = fsSize
+                       size += fsSize // add to the outer total that is compared against the memory limit below
+               }
-               ent := v.(*cachedCollection)
-               n := len(ent.collection.ManifestText)
-               size += int64(n)
-               entsize[i] = n
-               expired[i] = ent.expire.Before(now)
        }
-       for i, k := range keys {
-               if expired[i] {
-                       c.collections.Remove(k)
-                       size -= int64(entsize[i])
-               }
-       }
-       for i, k := range keys {
-               if size <= c.cluster.Collections.WebDAVCache.MaxCollectionBytes/2 {
+       // Mark more sessions for deletion until reaching desired
+       // memory size limit, starting with the oldest entries.
+       for i, snap := range snaps {
+               if size <= int64(c.cluster.Collections.WebDAVCache.MaxCollectionBytes) {
                        break
                }
-               if expired[i] {
-                       // already removed this entry in the previous loop
+               if snap.prune {
                        continue
                }
-               c.collections.Remove(k)
-               size -= int64(entsize[i])
+               snaps[i].prune = true
+               size -= snap.size
        }
-}
 
-// collectionBytes returns the approximate combined memory size of the
-// collection cache and session filesystem cache.
-func (c *cache) collectionBytes() uint64 {
-       var size uint64
-       for _, k := range c.collections.Keys() {
-               v, ok := c.collections.Peek(k)
-               if !ok {
-                       continue
+       // Mark more sessions for deletion until reaching desired
+       // session count limit.
+       mustprune := len(snaps) - c.cluster.Collections.WebDAVCache.MaxSessions
+       for i := range snaps {
+               if snaps[i].prune {
+                       mustprune--
                }
-               size += uint64(len(v.(*cachedCollection).collection.ManifestText))
        }
-       for _, token := range c.sessions.Keys() {
-               ent, ok := c.sessions.Peek(token)
-               if !ok {
-                       continue
-               }
-               if fs, ok := ent.(*cachedSession).fs.Load().(arvados.CustomFileSystem); ok {
-                       size += uint64(fs.MemorySize())
+       for i := range snaps {
+               if mustprune < 1 {
+                       break
+               } else if !snaps[i].prune {
+                       snaps[i].prune = true
+                       mustprune--
                }
        }
-       return size
-}
 
-func (c *cache) lookupCollection(key string) *arvados.Collection {
-       e, cached := c.collections.Get(key)
-       if !cached {
-               return nil
-       }
-       ent := e.(*cachedCollection)
-       if ent.expire.Before(time.Now()) {
-               c.collections.Remove(key)
-               return nil
+       c.mtx.Lock()
+       defer c.mtx.Unlock()
+       for _, snap := range snaps {
+               if !snap.prune {
+                       continue
+               }
+               sess := snap.sess
+               if sess.mtx.TryLock() {
+                       delete(c.sessions, snap.token)
+                       continue
+               }
+               // We can't remove a session that's been checked out
+               // -- that would allow another session to be created
+               // for the same token using a different in-memory
+               // filesystem. Instead, we wait for active requests to
+               // finish and then "unload" it. After this, either the
+               // next GetSession will reload fs/user, or a
+               // subsequent pruneSessions will remove the session.
+               go func() {
+                       // Ensure nobody is mid-GetSession() (note we
+                       // already know nobody is mid-checkout()
+                       // because we have c.mtx locked)
+                       sess.refresh.Lock()
+                       defer sess.refresh.Unlock()
+                       // Wait for current usage to finish (i.e.,
+                       // anyone who has decided to use the current
+                       // values of sess.fs and sess.user, and hasn't
+                       // called Release() yet)
+                       sess.inuse.Lock()
+                       defer sess.inuse.Unlock()
+                       // Release memory
+                       sess.fs = nil
+                       // Next GetSession will make a new fs
+               }()
        }
-       c.metrics.collectionHits.Inc()
-       return ent.collection
 }
 
-func (c *cache) GetTokenUser(token string) (*arvados.User, error) {
-       // Get and cache user record associated with this
-       // token.  We need to know their UUID for logging, and
-       // whether they are an admin or not for certain
-       // permission checks.
-
-       // Get/create session entry
-       _, sess, err := c.GetSession(token)
-       if err != nil {
-               return nil, err
+// sessionsSize returns the number and approximate total memory size
+// of all cached sessions.
+func (c *cache) sessionsSize() (n int, size int64) {
+       c.mtx.Lock()
+       n = len(c.sessions)
+       sessions := make([]*cachedSession, 0, n)
+       for _, sess := range c.sessions {
+               sessions = append(sessions, sess)
        }
-
-       // See if the user is already set, and if so, return it
-       user, _ := sess.user.Load().(*arvados.User)
-       if user != nil {
-               return user, nil
-       }
-
-       // Fetch the user record
-       c.metrics.apiCalls.Inc()
-       var current arvados.User
-
-       err = sess.client.RequestAndDecode(&current, "GET", "/arvados/v1/users/current", nil, nil)
-       if err != nil {
-               return nil, err
+       c.mtx.Unlock()
+       for _, sess := range sessions {
+               sess.refresh.Lock()
+               fs := sess.fs
+               sess.refresh.Unlock()
+               if fs != nil {
+                       size += fs.MemorySize()
+               }
        }
-
-       // Stash the user record for next time
-       sess.user.Store(&current)
-       return &current, nil
+       return
 }