16745: Rename session cache size metric.
[arvados.git] / services / keep-web / cache.go
index ce1168acd2c1d07bcd6e8623c5421bcbe905f04c..07db7a016f7bbd25442b4b7500e53633bd4b0059 100644 (file)
@@ -9,34 +9,111 @@ import (
        "sync/atomic"
        "time"
 
-       "git.curoverse.com/arvados.git/sdk/go/arvados"
-       "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
-       "github.com/hashicorp/golang-lru"
+       "git.arvados.org/arvados.git/sdk/go/arvados"
+       "git.arvados.org/arvados.git/sdk/go/arvadosclient"
+       "git.arvados.org/arvados.git/sdk/go/keepclient"
+       lru "github.com/hashicorp/golang-lru"
+       "github.com/prometheus/client_golang/prometheus"
 )
 
+// metricsUpdateInterval is how often the goroutine started by
+// (*cache).setup refreshes the cache size/entry gauges.
+const metricsUpdateInterval = time.Second / 10
+
+// cache holds keep-web's in-memory caches, each a 2Q LRU
+// (lru.TwoQueueCache), keyed as follows:
+//
+//   - pdhs:        targetID -> *cachedPDH
+//   - collections: token + "\000" + PDH -> *cachedCollection
+//   - permissions: token + "\000" + targetID -> *cachedPermission
+//   - sessions:    token -> *cachedSession
+//
+// The LRUs and metrics are created lazily by setup (via setupOnce).
+// If registry is nil, setup registers the metrics with a private
+// registry instead. cluster and config are presumably populated by
+// the caller before first use -- TODO confirm.
 type cache struct {
-       TTL                  arvados.Duration
-       UUIDTTL              arvados.Duration
-       MaxCollectionEntries int
-       MaxCollectionBytes   int64
-       MaxPermissionEntries int
-       MaxUUIDEntries       int
-
-       stats       cacheStats
+       cluster     *arvados.Cluster
+       config      *arvados.WebDAVCacheConfig // TODO: use cluster.Collections.WebDAV instead
+       registry    *prometheus.Registry
+       metrics     cacheMetrics
        pdhs        *lru.TwoQueueCache
        collections *lru.TwoQueueCache
        permissions *lru.TwoQueueCache
+       sessions    *lru.TwoQueueCache
        setupOnce   sync.Once
 }
 
-type cacheStats struct {
-       Requests          uint64 `json:"Cache.Requests"`
-       CollectionBytes   uint64 `json:"Cache.CollectionBytes"`
-       CollectionEntries int    `json:"Cache.CollectionEntries"`
-       CollectionHits    uint64 `json:"Cache.CollectionHits"`
-       PDHHits           uint64 `json:"Cache.UUIDHits"`
-       PermissionHits    uint64 `json:"Cache.PermissionHits"`
-       APICalls          uint64 `json:"Cache.APICalls"`
+// cacheMetrics holds the Prometheus collectors for a cache. The
+// counters are incremented inline by the cache methods; the gauges
+// are refreshed periodically by (*cache).updateGauges.
+type cacheMetrics struct {
+       requests          prometheus.Counter
+       collectionBytes   prometheus.Gauge
+       collectionEntries prometheus.Gauge
+       sessionEntries    prometheus.Gauge
+       collectionHits    prometheus.Counter
+       pdhHits           prometheus.Counter
+       permissionHits    prometheus.Counter
+       sessionHits       prometheus.Counter
+       sessionMisses     prometheus.Counter
+       apiCalls          prometheus.Counter
+}
+
+// setup creates each collector in m and registers it with reg, under
+// the "arvados" namespace and either the "keepweb_collectioncache" or
+// "keepweb_sessions" subsystem. reg.MustRegister panics if
+// registration fails, e.g. if setup is called twice with the same
+// registry.
+func (m *cacheMetrics) setup(reg *prometheus.Registry) {
+       m.requests = prometheus.NewCounter(prometheus.CounterOpts{
+               Namespace: "arvados",
+               Subsystem: "keepweb_collectioncache",
+               Name:      "requests",
+               Help:      "Number of targetID-to-manifest lookups handled.",
+       })
+       reg.MustRegister(m.requests)
+       m.collectionHits = prometheus.NewCounter(prometheus.CounterOpts{
+               Namespace: "arvados",
+               Subsystem: "keepweb_collectioncache",
+               Name:      "hits",
+               Help:      "Number of pdh-to-manifest cache hits.",
+       })
+       reg.MustRegister(m.collectionHits)
+       m.pdhHits = prometheus.NewCounter(prometheus.CounterOpts{
+               Namespace: "arvados",
+               Subsystem: "keepweb_collectioncache",
+               Name:      "pdh_hits",
+               Help:      "Number of uuid-to-pdh cache hits.",
+       })
+       reg.MustRegister(m.pdhHits)
+       m.permissionHits = prometheus.NewCounter(prometheus.CounterOpts{
+               Namespace: "arvados",
+               Subsystem: "keepweb_collectioncache",
+               Name:      "permission_hits",
+               Help:      "Number of targetID-to-permission cache hits.",
+       })
+       reg.MustRegister(m.permissionHits)
+       m.apiCalls = prometheus.NewCounter(prometheus.CounterOpts{
+               Namespace: "arvados",
+               Subsystem: "keepweb_collectioncache",
+               Name:      "api_calls",
+               Help:      "Number of outgoing API calls made by cache.",
+       })
+       reg.MustRegister(m.apiCalls)
+       // Reported under keepweb_sessions rather than
+       // keepweb_collectioncache because the value also includes the
+       // size of cached session filesystems (see
+       // (*cache).collectionBytes).
+       m.collectionBytes = prometheus.NewGauge(prometheus.GaugeOpts{
+               Namespace: "arvados",
+               Subsystem: "keepweb_sessions",
+               Name:      "cached_collection_bytes",
+               Help:      "Total size of all cached manifests and sessions.",
+       })
+       reg.MustRegister(m.collectionBytes)
+       m.collectionEntries = prometheus.NewGauge(prometheus.GaugeOpts{
+               Namespace: "arvados",
+               Subsystem: "keepweb_collectioncache",
+               Name:      "cached_manifests",
+               Help:      "Number of manifests in cache.",
+       })
+       reg.MustRegister(m.collectionEntries)
+       m.sessionEntries = prometheus.NewGauge(prometheus.GaugeOpts{
+               Namespace: "arvados",
+               Subsystem: "keepweb_sessions",
+               Name:      "active",
+               Help:      "Number of active token sessions.",
+       })
+       reg.MustRegister(m.sessionEntries)
+       m.sessionHits = prometheus.NewCounter(prometheus.CounterOpts{
+               Namespace: "arvados",
+               Subsystem: "keepweb_sessions",
+               Name:      "hits",
+               Help:      "Number of token session cache hits.",
+       })
+       reg.MustRegister(m.sessionHits)
+       m.sessionMisses = prometheus.NewCounter(prometheus.CounterOpts{
+               Namespace: "arvados",
+               Subsystem: "keepweb_sessions",
+               Name:      "misses",
+               Help:      "Number of token session cache misses.",
+       })
+       reg.MustRegister(m.sessionMisses)
 }
 
 type cachedPDH struct {
@@ -53,43 +130,172 @@ type cachedPermission struct {
        expire time.Time
 }
 
+// cachedSession is a sessions-cache entry for one API token. After
+// expire, GetSession counts the entry as a miss and builds a new fs,
+// and pruneSessions removes the entry. fs holds an
+// arvados.CustomFileSystem (unset until GetSession first builds one);
+// atomic.Value lets concurrent callers load and store it without a
+// lock.
+type cachedSession struct {
+       expire time.Time
+       fs     atomic.Value
+}
+
+// setup creates the LRU caches (sized from c.config) and the metrics,
+// and starts the gauge-refresh goroutine. It is meant to run exactly
+// once, via c.setupOnce.Do. It panics if any LRU cannot be created
+// (lru.New2Q rejects non-positive sizes).
 func (c *cache) setup() {
        var err error
-       c.pdhs, err = lru.New2Q(c.MaxUUIDEntries)
+       c.pdhs, err = lru.New2Q(c.config.MaxUUIDEntries)
        if err != nil {
                panic(err)
        }
-       c.collections, err = lru.New2Q(c.MaxCollectionEntries)
+       c.collections, err = lru.New2Q(c.config.MaxCollectionEntries)
        if err != nil {
                panic(err)
        }
-       c.permissions, err = lru.New2Q(c.MaxPermissionEntries)
+       c.permissions, err = lru.New2Q(c.config.MaxPermissionEntries)
        if err != nil {
                panic(err)
        }
+       c.sessions, err = lru.New2Q(c.config.MaxSessions)
+       if err != nil {
+               panic(err)
+       }
+
+       // With no registry configured, register with a throwaway one so
+       // the collectors still work but are not exposed anywhere.
+       reg := c.registry
+       if reg == nil {
+               reg = prometheus.NewRegistry()
+       }
+       c.metrics.setup(reg)
+       // Refresh the gauges periodically. The loop never exits, so this
+       // goroutine lives as long as the process.
+       go func() {
+               for range time.Tick(metricsUpdateInterval) {
+                       c.updateGauges()
+               }
+       }()
+}
+
+// updateGauges sets the gauge metrics from the current cache state.
+// It runs every metricsUpdateInterval. collectionBytes walks every
+// cached collection and session on each call.
+func (c *cache) updateGauges() {
+       c.metrics.collectionBytes.Set(float64(c.collectionBytes()))
+       c.metrics.collectionEntries.Set(float64(c.collections.Len()))
+       c.metrics.sessionEntries.Set(float64(c.sessions.Len()))
 }
 
+// selectPDH is the parameter set passed to arv.Get to fetch only a
+// collection's portable_data_hash.
 var selectPDH = map[string]interface{}{
        "select": []string{"portable_data_hash"},
 }
 
-func (c *cache) Stats() cacheStats {
+// Update saves a modified version (fs) to an existing collection
+// (coll) and, if successful, caches the updated collection under its
+// new portable data hash so subsequent calls to Get() reflect the
+// modifications. If fs's manifest is unchanged, it returns nil without
+// an API call. Once a PATCH is attempted, the cached uuid-to-pdh entry
+// for coll.UUID is discarded (whether or not the PATCH succeeds),
+// since it may now be stale.
+func (c *cache) Update(client *arvados.Client, coll arvados.Collection, fs arvados.CollectionFileSystem) error {
        c.setupOnce.Do(c.setup)
-       return cacheStats{
-               Requests:          atomic.LoadUint64(&c.stats.Requests),
-               CollectionBytes:   c.collectionBytes(),
-               CollectionEntries: c.collections.Len(),
-               CollectionHits:    atomic.LoadUint64(&c.stats.CollectionHits),
-               PDHHits:           atomic.LoadUint64(&c.stats.PDHHits),
-               PermissionHits:    atomic.LoadUint64(&c.stats.PermissionHits),
-               APICalls:          atomic.LoadUint64(&c.stats.APICalls),
+
+       m, err := fs.MarshalManifest(".")
+       if err != nil || m == coll.ManifestText {
+               return err
        }
+       coll.ManifestText = m
+       var updated arvados.Collection
+       defer c.pdhs.Remove(coll.UUID)
+       err = client.RequestAndDecode(&updated, "PATCH", "arvados/v1/collections/"+coll.UUID, nil, map[string]interface{}{
+               "collection": map[string]string{
+                       "manifest_text": coll.ManifestText,
+               },
+       })
+       if err == nil {
+               // Key the entry by the PDH returned by the PATCH, not
+               // coll.PortableDataHash: the content (and therefore the
+               // PDH) changed, and an entry under the old PDH must not
+               // return the new manifest.
+               c.collections.Add(client.AuthToken+"\000"+updated.PortableDataHash, &cachedCollection{
+                       expire:     time.Now().Add(time.Duration(c.config.TTL)),
+                       collection: &updated,
+               })
+       }
+       return err
 }
 
-func (c *cache) Get(arv *arvadosclient.ArvadosClient, targetID string, forceReload bool) (*arvados.Collection, error) {
+// ResetSession discards the cached session (and its filesystem) for
+// token, so the next GetSession call for that token builds a fresh
+// one. Should be called after write operations, so subsequent reads
+// don't return stale data.
+func (c *cache) ResetSession(token string) {
        c.setupOnce.Do(c.setup)
+       c.sessions.Remove(token)
+}
+
+// GetSession returns a long-lived CustomFileSystem suitable for doing
+// a read operation with the given token. The filesystem is cached per
+// token and shared by callers until the session expires (c.config.TTL
+// after the entry was created), or until ResetSession or pruneSessions
+// removes it. An error is returned only if a new API client cannot be
+// built for the token.
+func (c *cache) GetSession(token string) (arvados.CustomFileSystem, error) {
+       c.setupOnce.Do(c.setup)
+       now := time.Now()
+       ent, _ := c.sessions.Get(token)
+       sess, _ := ent.(*cachedSession)
+       expired := false
+       if sess == nil {
+               c.metrics.sessionMisses.Inc()
+               // The entry is published before its fs is built, so
+               // concurrent first callers may each build an fs below;
+               // the last Store wins.
+               sess = &cachedSession{
+                       expire: now.Add(c.config.TTL.Duration()),
+               }
+               c.sessions.Add(token, sess)
+       } else if sess.expire.Before(now) {
+               c.metrics.sessionMisses.Inc()
+               // sess.expire is not extended: the fs built below is
+               // stored in this entry, which pruneSessions will remove.
+               expired = true
+       } else {
+               c.metrics.sessionHits.Inc()
+       }
+       // Prune in the background so this request isn't delayed.
+       // NOTE(review): this starts a new pruning goroutine on every
+       // call.
+       go c.pruneSessions()
+       fs, _ := sess.fs.Load().(arvados.CustomFileSystem)
+       if fs != nil && !expired {
+               return fs, nil
+       }
+       ac, err := arvados.NewClientFromConfig(c.cluster)
+       if err != nil {
+               return nil, err
+       }
+       ac.AuthToken = token
+       arv, err := arvadosclient.New(ac)
+       if err != nil {
+               return nil, err
+       }
+       kc := keepclient.New(arv)
+       fs = ac.SiteFileSystem(kc)
+       fs.ForwardSlashNameSubstitution(c.cluster.Collections.ForwardSlashNameSubstitution)
+       sess.fs.Store(fs)
+       return fs, nil
+}
 
-       atomic.AddUint64(&c.stats.Requests, 1)
+// pruneSessions removes all expired session cache entries, then
+// removes more entries, least frequently used first, until the
+// approximate total MemorySize() of the remaining session filesystems
+// is <= MaxCollectionBytes/2. Entries whose fs has not been built yet
+// are neither counted nor evicted by the size pass. GetSession starts
+// this in a new goroutine on every call, so several instances may run
+// concurrently.
+func (c *cache) pruneSessions() {
+       now := time.Now()
+       var size int64
+       keys := c.sessions.Keys()
+       for _, token := range keys {
+               // Peek (not Get) so that pruning doesn't count as a use.
+               ent, ok := c.sessions.Peek(token)
+               if !ok {
+                       continue
+               }
+               s := ent.(*cachedSession)
+               if s.expire.Before(now) {
+                       c.sessions.Remove(token)
+                       continue
+               }
+               if fs, ok := s.fs.Load().(arvados.CustomFileSystem); ok {
+                       size += fs.MemorySize()
+               }
+       }
+       // Remove tokens until reaching size limit, starting with the
+       // least frequently used entries (which Keys() returns last).
+       for i := len(keys) - 1; i >= 0; i-- {
+               token := keys[i]
+               // NOTE(review): this limit is read from c.cluster while the
+               // rest of cache reads c.config; presumably the same
+               // WebDAVCache settings -- confirm.
+               if size <= c.cluster.Collections.WebDAVCache.MaxCollectionBytes/2 {
+                       break
+               }
+               ent, ok := c.sessions.Peek(token)
+               if !ok {
+                       continue
+               }
+               s := ent.(*cachedSession)
+               fs, _ := s.fs.Load().(arvados.CustomFileSystem)
+               if fs == nil {
+                       continue
+               }
+               c.sessions.Remove(token)
+               size -= fs.MemorySize()
+       }
+}
+
+func (c *cache) Get(arv *arvadosclient.ArvadosClient, targetID string, forceReload bool) (*arvados.Collection, error) {
+       c.setupOnce.Do(c.setup)
+       c.metrics.requests.Inc()
 
        permOK := false
        permKey := arv.ApiToken + "\000" + targetID
@@ -100,7 +306,7 @@ func (c *cache) Get(arv *arvadosclient.ArvadosClient, targetID string, forceRelo
                        c.permissions.Remove(permKey)
                } else {
                        permOK = true
-                       atomic.AddUint64(&c.stats.PermissionHits, 1)
+                       c.metrics.permissionHits.Inc()
                }
        }
 
@@ -113,7 +319,7 @@ func (c *cache) Get(arv *arvadosclient.ArvadosClient, targetID string, forceRelo
                        c.pdhs.Remove(targetID)
                } else {
                        pdh = ent.pdh
-                       atomic.AddUint64(&c.stats.PDHHits, 1)
+                       c.metrics.pdhHits.Inc()
                }
        }
 
@@ -129,7 +335,7 @@ func (c *cache) Get(arv *arvadosclient.ArvadosClient, targetID string, forceRelo
                // likely, the cached PDH is still correct; if so,
                // _and_ the current token has permission, we can
                // use our cached manifest.
-               atomic.AddUint64(&c.stats.APICalls, 1)
+               c.metrics.apiCalls.Inc()
                var current arvados.Collection
                err := arv.Get("collections", targetID, selectPDH, &current)
                if err != nil {
@@ -137,44 +343,43 @@ func (c *cache) Get(arv *arvadosclient.ArvadosClient, targetID string, forceRelo
                }
                if current.PortableDataHash == pdh {
                        c.permissions.Add(permKey, &cachedPermission{
-                               expire: time.Now().Add(time.Duration(c.TTL)),
+                               expire: time.Now().Add(time.Duration(c.config.TTL)),
                        })
                        if pdh != targetID {
                                c.pdhs.Add(targetID, &cachedPDH{
-                                       expire: time.Now().Add(time.Duration(c.UUIDTTL)),
+                                       expire: time.Now().Add(time.Duration(c.config.UUIDTTL)),
                                        pdh:    pdh,
                                })
                        }
                        return collection, err
-               } else {
-                       // PDH changed, but now we know we have
-                       // permission -- and maybe we already have the
-                       // new PDH in the cache.
-                       if coll := c.lookupCollection(arv.ApiToken + "\000" + current.PortableDataHash); coll != nil {
-                               return coll, nil
-                       }
+               }
+               // PDH changed, but now we know we have
+               // permission -- and maybe we already have the
+               // new PDH in the cache.
+               if coll := c.lookupCollection(arv.ApiToken + "\000" + current.PortableDataHash); coll != nil {
+                       return coll, nil
                }
        }
 
        // Collection manifest is not cached.
-       atomic.AddUint64(&c.stats.APICalls, 1)
+       c.metrics.apiCalls.Inc()
        err := arv.Get("collections", targetID, nil, &collection)
        if err != nil {
                return nil, err
        }
-       exp := time.Now().Add(time.Duration(c.TTL))
+       exp := time.Now().Add(time.Duration(c.config.TTL))
        c.permissions.Add(permKey, &cachedPermission{
                expire: exp,
        })
        c.pdhs.Add(targetID, &cachedPDH{
-               expire: time.Now().Add(time.Duration(c.UUIDTTL)),
+               expire: time.Now().Add(time.Duration(c.config.UUIDTTL)),
                pdh:    collection.PortableDataHash,
        })
        c.collections.Add(arv.ApiToken+"\000"+collection.PortableDataHash, &cachedCollection{
                expire:     exp,
                collection: collection,
        })
-       if int64(len(collection.ManifestText)) > c.MaxCollectionBytes/int64(c.MaxCollectionEntries) {
+       if int64(len(collection.ManifestText)) > c.config.MaxCollectionBytes/int64(c.config.MaxCollectionEntries) {
                go c.pruneCollections()
        }
        return collection, nil
@@ -211,7 +416,7 @@ func (c *cache) pruneCollections() {
                }
        }
        for i, k := range keys {
-               if size <= c.MaxCollectionBytes {
+               if size <= c.config.MaxCollectionBytes/2 {
                        break
                }
                if expired[i] {
@@ -223,8 +428,8 @@ func (c *cache) pruneCollections() {
        }
 }
 
-// collectionBytes returns the approximate memory size of the
-// collection cache.
+// collectionBytes returns the approximate combined memory size of the
+// collection cache and session filesystem cache.
 func (c *cache) collectionBytes() uint64 {
        var size uint64
        for _, k := range c.collections.Keys() {
@@ -234,20 +439,28 @@ func (c *cache) collectionBytes() uint64 {
                }
                size += uint64(len(v.(*cachedCollection).collection.ManifestText))
        }
+       for _, token := range c.sessions.Keys() {
+               ent, ok := c.sessions.Peek(token)
+               if !ok {
+                       continue
+               }
+               if fs, ok := ent.(*cachedSession).fs.Load().(arvados.CustomFileSystem); ok {
+                       size += uint64(fs.MemorySize())
+               }
+       }
        return size
 }
 
+// lookupCollection returns the cached collection for key (API token +
+// "\000" + PDH), or nil if there is no entry or it has expired.
+// Expired entries are removed; hits are counted in
+// c.metrics.collectionHits.
 func (c *cache) lookupCollection(key string) *arvados.Collection {
-       if ent, cached := c.collections.Get(key); !cached {
+       e, cached := c.collections.Get(key)
+       if !cached {
+               return nil
+       }
+       ent := e.(*cachedCollection)
+       if ent.expire.Before(time.Now()) {
+               c.collections.Remove(key)
                return nil
-       } else {
-               ent := ent.(*cachedCollection)
-               if ent.expire.Before(time.Now()) {
-                       c.collections.Remove(key)
-                       return nil
-               } else {
-                       atomic.AddUint64(&c.stats.CollectionHits, 1)
-                       return ent.collection
-               }
        }
+       c.metrics.collectionHits.Inc()
+       return ent.collection
 }