16745: Keep a SiteFileSystem alive for multiple read requests.
[arvados.git] / services / keep-web / cache.go
index eeb78ad9058d6c35e8b544cbef1a5c6500c90bcf..71e99533042c693badbb1c00a5dc9e60af02b875 100644 (file)
@@ -6,23 +6,27 @@ package main
 
 import (
        "sync"
+       "sync/atomic"
        "time"
 
        "git.arvados.org/arvados.git/sdk/go/arvados"
        "git.arvados.org/arvados.git/sdk/go/arvadosclient"
-       "github.com/hashicorp/golang-lru"
+       "git.arvados.org/arvados.git/sdk/go/keepclient"
+       lru "github.com/hashicorp/golang-lru"
        "github.com/prometheus/client_golang/prometheus"
 )
 
 const metricsUpdateInterval = time.Second / 10
 
 type cache struct {
-       config      *arvados.WebDAVCacheConfig
+       cluster     *arvados.Cluster // cluster config; GetSession uses it to create API clients
+       config      *arvados.WebDAVCacheConfig // TODO: use cluster.Collections.WebDAV instead
        registry    *prometheus.Registry
        metrics     cacheMetrics
        pdhs        *lru.TwoQueueCache
        collections *lru.TwoQueueCache
        permissions *lru.TwoQueueCache
+       sessions    *lru.TwoQueueCache // token -> *cachedSession
        setupOnce   sync.Once
 }
 
@@ -30,9 +34,12 @@ type cacheMetrics struct {
        requests          prometheus.Counter
        collectionBytes   prometheus.Gauge
        collectionEntries prometheus.Gauge
+       sessionEntries    prometheus.Gauge
        collectionHits    prometheus.Counter
        pdhHits           prometheus.Counter
        permissionHits    prometheus.Counter
+       sessionHits       prometheus.Counter
+       sessionMisses     prometheus.Counter
        apiCalls          prometheus.Counter
 }
 
@@ -86,6 +93,27 @@ func (m *cacheMetrics) setup(reg *prometheus.Registry) {
                Help:      "Number of manifests in cache.",
        })
        reg.MustRegister(m.collectionEntries)
+       m.sessionEntries = prometheus.NewGauge(prometheus.GaugeOpts{
+               Namespace: "arvados",
+               Subsystem: "keepweb_sessions",
+               Name:      "active",
+               Help:      "Number of active token sessions.",
+       })
+       reg.MustRegister(m.sessionEntries)
+       m.sessionHits = prometheus.NewCounter(prometheus.CounterOpts{
+               Namespace: "arvados",
+               Subsystem: "keepweb_sessions",
+               Name:      "hits",
+               Help:      "Number of token session cache hits.",
+       })
+       reg.MustRegister(m.sessionHits)
+       m.sessionMisses = prometheus.NewCounter(prometheus.CounterOpts{
+               Namespace: "arvados",
+               Subsystem: "keepweb_sessions",
+               Name:      "misses",
+               Help:      "Number of token session cache misses.",
+       })
+       reg.MustRegister(m.sessionMisses)
 }
 
 type cachedPDH struct {
@@ -102,6 +130,11 @@ type cachedPermission struct {
        expire time.Time
 }
 
+type cachedSession struct { // value stored in cache.sessions, keyed by token
+       expire time.Time // session is treated as stale once this time has passed
+       fs     atomic.Value // holds an arvados.CustomFileSystem once one has been built
+}
+
 func (c *cache) setup() {
        var err error
        c.pdhs, err = lru.New2Q(c.config.MaxUUIDEntries)
@@ -116,6 +149,10 @@ func (c *cache) setup() {
        if err != nil {
                panic(err)
        }
+       c.sessions, err = lru.New2Q(c.config.MaxSessions)
+       if err != nil {
+               panic(err)
+       }
 
        reg := c.registry
        if reg == nil {
@@ -132,6 +169,7 @@ func (c *cache) setup() {
 func (c *cache) updateGauges() {
        c.metrics.collectionBytes.Set(float64(c.collectionBytes())) // combined size of cached collections and session filesystems
        c.metrics.collectionEntries.Set(float64(c.collections.Len()))
+       c.metrics.sessionEntries.Set(float64(c.sessions.Len())) // number of cached token sessions
 }
 
 var selectPDH = map[string]interface{}{
@@ -165,6 +203,80 @@ func (c *cache) Update(client *arvados.Client, coll arvados.Collection, fs arvad
        return err
 }
 
+// ResetSession removes the cached session for the given token, if
+// any. Should be called after write operations, so subsequent reads
+// with that token don't return stale data.
+func (c *cache) ResetSession(token string) {
+       c.setupOnce.Do(c.setup) // make sure the caches exist before touching them
+       c.sessions.Remove(token) // the next GetSession(token) starts a new session
+}
+
+// Get a long-lived CustomFileSystem suitable for doing a read operation
+// with the given token.
+func (c *cache) GetSession(token string) (arvados.CustomFileSystem, error) {
+       c.setupOnce.Do(c.setup)
+       now := time.Now()
+       ent, _ := c.sessions.Get(token)
+       sess, _ := ent.(*cachedSession)
+       if sess == nil {
+               c.metrics.sessionMisses.Inc()
+               sess = &cachedSession{
+                       expire: now.Add(c.config.TTL.Duration()),
+               }
+               c.sessions.Add(token, sess)
+       } else if sess.expire.Before(now) {
+               c.metrics.sessionMisses.Inc()
+               sess.fs.Store(nil)
+       } else {
+               c.metrics.sessionHits.Inc()
+       }
+       go c.pruneSessions()
+       fs, _ := sess.fs.Load().(arvados.CustomFileSystem)
+       if fs != nil {
+               return fs, nil
+       }
+       ac, err := arvados.NewClientFromConfig(c.cluster)
+       if err != nil {
+               return nil, err
+       }
+       ac.AuthToken = token
+       arv, err := arvadosclient.New(ac)
+       if err != nil {
+               return nil, err
+       }
+       kc := keepclient.New(arv)
+       fs = ac.SiteFileSystem(kc)
+       fs.ForwardSlashNameSubstitution(c.cluster.Collections.ForwardSlashNameSubstitution)
+       sess.fs.Store(fs)
+       return fs, nil
+}
+
+// pruneSessions removes expired entries from the session cache, then
+// empties the cache entirely if the filesystems held by the remaining
+// sessions report a combined MemorySize above half of the cluster's
+// Collections.WebDAVCache.MaxCollectionBytes setting.
+func (c *cache) pruneSessions() {
+       cutoff := time.Now()
+       var total int64
+       for _, key := range c.sessions.Keys() {
+               val, found := c.sessions.Peek(key)
+               if !found {
+                       // Key is no longer present in the cache.
+                       continue
+               }
+               cached := val.(*cachedSession)
+               if cached.expire.Before(cutoff) {
+                       c.sessions.Remove(key)
+                       continue
+               }
+               if sitefs, _ := cached.fs.Load().(arvados.CustomFileSystem); sitefs != nil {
+                       total += sitefs.MemorySize()
+               }
+       }
+       if total <= c.cluster.Collections.WebDAVCache.MaxCollectionBytes/2 {
+               return
+       }
+       // Over the limit: drop every remaining session.
+       for _, key := range c.sessions.Keys() {
+               c.sessions.Remove(key)
+       }
+}
+
 func (c *cache) Get(arv *arvadosclient.ArvadosClient, targetID string, forceReload bool) (*arvados.Collection, error) {
        c.setupOnce.Do(c.setup)
        c.metrics.requests.Inc()
@@ -288,7 +400,7 @@ func (c *cache) pruneCollections() {
                }
        }
        for i, k := range keys {
-               if size <= c.config.MaxCollectionBytes {
+               if size <= c.config.MaxCollectionBytes/2 {
                        break
                }
                if expired[i] {
@@ -300,8 +412,8 @@ func (c *cache) pruneCollections() {
        }
 }
 
-// collectionBytes returns the approximate memory size of the
-// collection cache.
+// collectionBytes returns the approximate combined memory size of the
+// collection cache and session filesystem cache.
 func (c *cache) collectionBytes() uint64 {
        var size uint64
        for _, k := range c.collections.Keys() {
@@ -311,6 +423,15 @@ func (c *cache) collectionBytes() uint64 {
                }
                size += uint64(len(v.(*cachedCollection).collection.ManifestText))
        }
+       for _, token := range c.sessions.Keys() {
+               ent, ok := c.sessions.Peek(token)
+               if !ok {
+                       continue
+               }
+               if fs, ok := ent.(*cachedSession).fs.Load().(arvados.CustomFileSystem); ok {
+                       size += uint64(fs.MemorySize())
+               }
+       }
        return size
 }