18947: Refactor keep-web as arvados-server command.
[arvados.git] / services / keep-web / cache.go
index 8d1062825e85d79a1fe7e60289437c2182060b62..d5fdc4997ecee3c67d1cc8ea4003c2c1e3f6cac4 100644 (file)
@@ -2,37 +2,46 @@
 //
 // SPDX-License-Identifier: AGPL-3.0
 
-package main
+package keepweb
 
 import (
        "sync"
+       "sync/atomic"
        "time"
 
-       "git.curoverse.com/arvados.git/sdk/go/arvados"
-       "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
-       "github.com/hashicorp/golang-lru"
+       "git.arvados.org/arvados.git/sdk/go/arvados"
+       "git.arvados.org/arvados.git/sdk/go/arvadosclient"
+       "git.arvados.org/arvados.git/sdk/go/keepclient"
+       lru "github.com/hashicorp/golang-lru"
        "github.com/prometheus/client_golang/prometheus"
+       "github.com/sirupsen/logrus"
 )
 
 const metricsUpdateInterval = time.Second / 10
 
 type cache struct {
-       config      *arvados.WebDAVCacheConfig
+       cluster     *arvados.Cluster   // site config: WebDAVCache limits and TTLs, API client settings
+       logger      logrus.FieldLogger // used for Debugf tracing in Get
        registry    *prometheus.Registry
        metrics     cacheMetrics
        pdhs        *lru.TwoQueueCache
        collections *lru.TwoQueueCache
-       permissions *lru.TwoQueueCache
+       sessions    *lru.TwoQueueCache // token -> *cachedSession
        setupOnce   sync.Once
+
+       chPruneSessions    chan struct{} // wakes the pruneSessions goroutine; buffered, capacity 1
+       chPruneCollections chan struct{} // wakes the pruneCollections goroutine; buffered, capacity 1
 }
 
 type cacheMetrics struct {
        requests          prometheus.Counter
        collectionBytes   prometheus.Gauge
        collectionEntries prometheus.Gauge
+       sessionEntries    prometheus.Gauge // number of entries in the session cache (set by updateGauges)
        collectionHits    prometheus.Counter
        pdhHits           prometheus.Counter
-       permissionHits    prometheus.Counter
+       sessionHits       prometheus.Counter // GetSession found a cached, unexpired session
+       sessionMisses     prometheus.Counter // GetSession found no cached session, or an expired one
        apiCalls          prometheus.Counter
 }
 
@@ -58,13 +67,6 @@ func (m *cacheMetrics) setup(reg *prometheus.Registry) {
                Help:      "Number of uuid-to-pdh cache hits.",
        })
        reg.MustRegister(m.pdhHits)
-       m.permissionHits = prometheus.NewCounter(prometheus.CounterOpts{
-               Namespace: "arvados",
-               Subsystem: "keepweb_collectioncache",
-               Name:      "permission_hits",
-               Help:      "Number of targetID-to-permission cache hits.",
-       })
-       reg.MustRegister(m.permissionHits)
        m.apiCalls = prometheus.NewCounter(prometheus.CounterOpts{
                Namespace: "arvados",
                Subsystem: "keepweb_collectioncache",
@@ -74,9 +76,9 @@ func (m *cacheMetrics) setup(reg *prometheus.Registry) {
        reg.MustRegister(m.apiCalls)
        m.collectionBytes = prometheus.NewGauge(prometheus.GaugeOpts{
                Namespace: "arvados",
-               Subsystem: "keepweb_collectioncache",
-               Name:      "cached_manifest_bytes",
-               Help:      "Total size of all manifests in cache.",
+               Subsystem: "keepweb_sessions",
+               Name:      "cached_collection_bytes",
+               Help:      "Total size of all cached manifests and sessions.",
        })
        reg.MustRegister(m.collectionBytes)
        m.collectionEntries = prometheus.NewGauge(prometheus.GaugeOpts{
@@ -86,11 +88,33 @@ func (m *cacheMetrics) setup(reg *prometheus.Registry) {
                Help:      "Number of manifests in cache.",
        })
        reg.MustRegister(m.collectionEntries)
+       m.sessionEntries = prometheus.NewGauge(prometheus.GaugeOpts{
+               Namespace: "arvados",
+               Subsystem: "keepweb_sessions",
+               Name:      "active",
+               Help:      "Number of active token sessions.",
+       })
+       reg.MustRegister(m.sessionEntries)
+       m.sessionHits = prometheus.NewCounter(prometheus.CounterOpts{
+               Namespace: "arvados",
+               Subsystem: "keepweb_sessions",
+               Name:      "hits",
+               Help:      "Number of token session cache hits.",
+       })
+       reg.MustRegister(m.sessionHits)
+       m.sessionMisses = prometheus.NewCounter(prometheus.CounterOpts{
+               Namespace: "arvados",
+               Subsystem: "keepweb_sessions",
+               Name:      "misses",
+               Help:      "Number of token session cache misses.",
+       })
+       reg.MustRegister(m.sessionMisses)
 }
 
 type cachedPDH struct {
-       expire time.Time
-       pdh    string
+       expire  time.Time // now + WebDAVCache.TTL when the entry was stored
+       refresh time.Time // after this, Get confirms the PDH with the API before reusing it
+       pdh     string    // portable data hash cached for this UUID
 }
 
 type cachedCollection struct {
@@ -102,17 +126,26 @@ type cachedPermission struct {
        expire time.Time
 }
 
+type cachedSession struct { // per-token state stored in cache.sessions
+       expire        time.Time                    // GetSession treats the session as expired after this
+       fs            atomic.Value                 // arvados.CustomFileSystem, built lazily by GetSession
+       client        *arvados.Client              // API client whose AuthToken is the session token
+       arvadosclient *arvadosclient.ArvadosClient // built from client; used to create keepclient
+       keepclient    *keepclient.KeepClient       // passed to client.SiteFileSystem
+       user          atomic.Value                 // *arvados.User, loaded lazily by GetTokenUser
+}
+
 func (c *cache) setup() {
        var err error
-       c.pdhs, err = lru.New2Q(c.config.MaxUUIDEntries)
+       c.pdhs, err = lru.New2Q(c.cluster.Collections.WebDAVCache.MaxUUIDEntries)
        if err != nil {
                panic(err)
        }
-       c.collections, err = lru.New2Q(c.config.MaxCollectionEntries)
+       c.collections, err = lru.New2Q(c.cluster.Collections.WebDAVCache.MaxCollectionEntries)
        if err != nil {
                panic(err)
        }
-       c.permissions, err = lru.New2Q(c.config.MaxPermissionEntries)
+       c.sessions, err = lru.New2Q(c.cluster.Collections.WebDAVCache.MaxSessions)
        if err != nil {
                panic(err)
        }
@@ -127,11 +160,24 @@ func (c *cache) setup() {
                        c.updateGauges()
                }
        }()
+       c.chPruneCollections = make(chan struct{}, 1)
+       go func() {
+               for range c.chPruneCollections {
+                       c.pruneCollections()
+               }
+       }()
+       c.chPruneSessions = make(chan struct{}, 1)
+       go func() {
+               for range c.chPruneSessions {
+                       c.pruneSessions()
+               }
+       }()
 }
 
 func (c *cache) updateGauges() {
        c.metrics.collectionBytes.Set(float64(c.collectionBytes()))
        c.metrics.collectionEntries.Set(float64(c.collections.Len()))
+       c.metrics.sessionEntries.Set(float64(c.sessions.Len())) // includes expired entries not yet pruned
 }
 
 var selectPDH = map[string]interface{}{
@@ -144,44 +190,132 @@ var selectPDH = map[string]interface{}{
 func (c *cache) Update(client *arvados.Client, coll arvados.Collection, fs arvados.CollectionFileSystem) error {
        c.setupOnce.Do(c.setup)
 
-       if m, err := fs.MarshalManifest("."); err != nil || m == coll.ManifestText {
+       m, err := fs.MarshalManifest(".")
+       if err != nil || m == coll.ManifestText { // marshal failed (return err) or manifest unchanged (return nil)
                return err
-       } else {
-               coll.ManifestText = m
        }
+       coll.ManifestText = m
        var updated arvados.Collection
-       defer c.pdhs.Remove(coll.UUID)
-       err := client.RequestAndDecode(&updated, "PATCH", "arvados/v1/collections/"+coll.UUID, nil, map[string]interface{}{
+       err = client.RequestAndDecode(&updated, "PATCH", "arvados/v1/collections/"+coll.UUID, nil, map[string]interface{}{ // save the new manifest
                "collection": map[string]string{
                        "manifest_text": coll.ManifestText,
                },
        })
-       if err == nil {
-               c.collections.Add(client.AuthToken+"\000"+coll.PortableDataHash, &cachedCollection{
-                       expire:     time.Now().Add(time.Duration(c.config.TTL)),
-                       collection: &updated,
-               })
+       if err != nil {
+               c.pdhs.Remove(coll.UUID) // don't keep trusting the cached UUID->PDH mapping after a failed update
+               return err
        }
-       return err
+       c.collections.Add(client.AuthToken+"\000"+updated.PortableDataHash, &cachedCollection{ // cache the saved record under token+new PDH
+               expire:     time.Now().Add(time.Duration(c.cluster.Collections.WebDAVCache.TTL)),
+               collection: &updated,
+       })
+       c.pdhs.Add(coll.UUID, &cachedPDH{ // map the UUID to its new PDH
+               expire:  time.Now().Add(time.Duration(c.cluster.Collections.WebDAVCache.TTL)),
+               refresh: time.Now().Add(time.Duration(c.cluster.Collections.WebDAVCache.UUIDTTL)),
+               pdh:     updated.PortableDataHash,
+       })
+       return nil
 }
 
-func (c *cache) Get(arv *arvadosclient.ArvadosClient, targetID string, forceReload bool) (*arvados.Collection, error) {
+// ResetSession removes the cached session for token (filesystem, API
+// clients, and user record) from the cache. It should be called after
+// write operations so subsequent reads don't return stale data.
+func (c *cache) ResetSession(token string) {
        c.setupOnce.Do(c.setup)
-       c.metrics.requests.Inc()
+       c.sessions.Remove(token)
+}
 
-       permOK := false
-       permKey := arv.ApiToken + "\000" + targetID
-       if forceReload {
-       } else if ent, cached := c.permissions.Get(permKey); cached {
-               ent := ent.(*cachedPermission)
-               if ent.expire.Before(time.Now()) {
-                       c.permissions.Remove(permKey)
-               } else {
-                       permOK = true
-                       c.metrics.permissionHits.Inc()
+// GetSession returns a long-lived CustomFileSystem suitable for doing
+// a read operation with the given token, along with its session. An
+// expired session is replaced by a fresh one (new clients, new expiry,
+// no cached user record) instead of being reused.
+func (c *cache) GetSession(token string) (arvados.CustomFileSystem, *cachedSession, error) {
+       c.setupOnce.Do(c.setup)
+       now := time.Now()
+       ent, _ := c.sessions.Get(token)
+       sess, _ := ent.(*cachedSession)
+       if sess == nil || sess.expire.Before(now) {
+               c.metrics.sessionMisses.Inc()
+               sess = &cachedSession{
+                       expire: now.Add(c.cluster.Collections.WebDAVCache.TTL.Duration()),
+               }
+               var err error
+               sess.client, err = arvados.NewClientFromConfig(c.cluster)
+               if err != nil {
+                       return nil, nil, err
                }
+               sess.client.AuthToken = token
+               sess.arvadosclient, err = arvadosclient.New(sess.client)
+               if err != nil {
+                       return nil, nil, err
+               }
+               sess.keepclient = keepclient.New(sess.arvadosclient)
+               // Add also replaces an expired entry for this token.
+               c.sessions.Add(token, sess)
+       } else {
+               c.metrics.sessionHits.Inc()
        }
+       // Request a background prune; skip if one is already pending.
+       select {
+       case c.chPruneSessions <- struct{}{}:
+       default:
+       }
+       fs, _ := sess.fs.Load().(arvados.CustomFileSystem)
+       if fs != nil {
+               return fs, sess, nil
+       }
+       fs = sess.client.SiteFileSystem(sess.keepclient)
+       fs.ForwardSlashNameSubstitution(c.cluster.Collections.ForwardSlashNameSubstitution)
+       sess.fs.Store(fs)
+       return fs, sess, nil
+}
 
+// pruneSessions evicts every expired session, then evicts sessions
+// that hold a filesystem (walking Keys() from the end) until the
+// summed MemorySize() of the remaining filesystems is at most
+// MaxCollectionBytes/2.
+func (c *cache) pruneSessions() {
+       deadline := time.Now()
+       limit := c.cluster.Collections.WebDAVCache.MaxCollectionBytes / 2
+       tokens := c.sessions.Keys()
+       // First pass: evict expired sessions, total the rest's fs memory.
+       var total int64
+       for _, tok := range tokens {
+               v, found := c.sessions.Peek(tok)
+               if !found {
+                       continue
+               }
+               sess := v.(*cachedSession)
+               if sess.expire.Before(deadline) {
+                       c.sessions.Remove(tok)
+               } else if fs, isFS := sess.fs.Load().(arvados.CustomFileSystem); isFS {
+                       total += fs.MemorySize()
+               }
+       }
+       // Second pass: walk Keys() backwards, evicting until under the
+       // limit. NOTE(review): golang-lru documents TwoQueueCache.Keys()
+       // as frequently-used keys first, then recent keys (each list
+       // oldest to newest), so this evicts the newest recent entries
+       // first, not the least frequently used ones -- confirm intent.
+       for idx := len(tokens) - 1; idx >= 0 && total > limit; idx-- {
+               v, found := c.sessions.Peek(tokens[idx])
+               if !found {
+                       continue
+               }
+               fs, _ := v.(*cachedSession).fs.Load().(arvados.CustomFileSystem)
+               if fs == nil {
+                       continue
+               }
+               c.sessions.Remove(tokens[idx])
+               total -= fs.MemorySize()
+       }
+}
+
+func (c *cache) Get(arv *arvadosclient.ArvadosClient, targetID string, forceReload bool) (*arvados.Collection, error) {
+       c.setupOnce.Do(c.setup)
+       c.metrics.requests.Inc()
+
+       var pdhRefresh bool
        var pdh string
        if arvadosclient.PDHMatch(targetID) {
                pdh = targetID
@@ -191,22 +325,29 @@ func (c *cache) Get(arv *arvadosclient.ArvadosClient, targetID string, forceRelo
                        c.pdhs.Remove(targetID)
                } else {
                        pdh = ent.pdh
+                       pdhRefresh = forceReload || time.Now().After(ent.refresh)
                        c.metrics.pdhHits.Inc()
                }
        }
 
-       var collection *arvados.Collection
-       if pdh != "" {
-               collection = c.lookupCollection(arv.ApiToken + "\000" + pdh)
-       }
-
-       if collection != nil && permOK {
-               return collection, nil
-       } else if collection != nil {
-               // Ask API for current PDH for this targetID. Most
-               // likely, the cached PDH is still correct; if so,
-               // _and_ the current token has permission, we can
-               // use our cached manifest.
+       if pdh == "" {
+               // UUID->PDH mapping is not cached, might as well get
+               // the whole collection record and be done (below).
+               c.logger.Debugf("cache(%s): have no pdh", targetID)
+       } else if cached := c.lookupCollection(arv.ApiToken + "\000" + pdh); cached == nil {
+               // PDH->manifest is not cached, might as well get the
+               // whole collection record (below).
+               c.logger.Debugf("cache(%s): have pdh %s but manifest is not cached", targetID, pdh)
+       } else if !pdhRefresh {
+               // We looked up UUID->PDH very recently, and we still
+               // have the manifest for that PDH.
+               c.logger.Debugf("cache(%s): have pdh %s and refresh not needed", targetID, pdh)
+               return cached, nil
+       } else {
+               // Get current PDH for this UUID (and confirm we still
+               // have read permission).  Most likely, the cached PDH
+               // is still correct, in which case we can use our
+               // cached manifest.
                c.metrics.apiCalls.Inc()
                var current arvados.Collection
                err := arv.Get("collections", targetID, selectPDH, &current)
@@ -214,48 +355,47 @@ func (c *cache) Get(arv *arvadosclient.ArvadosClient, targetID string, forceRelo
                        return nil, err
                }
                if current.PortableDataHash == pdh {
-                       c.permissions.Add(permKey, &cachedPermission{
-                               expire: time.Now().Add(time.Duration(c.config.TTL)),
-                       })
-                       if pdh != targetID {
-                               c.pdhs.Add(targetID, &cachedPDH{
-                                       expire: time.Now().Add(time.Duration(c.config.UUIDTTL)),
-                                       pdh:    pdh,
-                               })
-                       }
-                       return collection, err
-               } else {
-                       // PDH changed, but now we know we have
-                       // permission -- and maybe we already have the
-                       // new PDH in the cache.
-                       if coll := c.lookupCollection(arv.ApiToken + "\000" + current.PortableDataHash); coll != nil {
-                               return coll, nil
-                       }
+                       // PDH has not changed, cached manifest is
+                       // correct.
+                       c.logger.Debugf("cache(%s): verified cached pdh %s is still correct", targetID, pdh)
+                       return cached, nil
+               }
+               if cached := c.lookupCollection(arv.ApiToken + "\000" + current.PortableDataHash); cached != nil {
+                       // PDH changed, and we already have the
+                       // manifest for that new PDH.
+                       c.logger.Debugf("cache(%s): cached pdh %s was stale, new pdh is %s and manifest is already in cache", targetID, pdh, current.PortableDataHash)
+                       return cached, nil
                }
        }
 
-       // Collection manifest is not cached.
+       // Either UUID->PDH is not cached, or PDH->manifest is not
+       // cached.
+       var retrieved arvados.Collection
        c.metrics.apiCalls.Inc()
-       err := arv.Get("collections", targetID, nil, &collection)
+       err := arv.Get("collections", targetID, nil, &retrieved)
        if err != nil {
                return nil, err
        }
-       exp := time.Now().Add(time.Duration(c.config.TTL))
-       c.permissions.Add(permKey, &cachedPermission{
-               expire: exp,
-       })
-       c.pdhs.Add(targetID, &cachedPDH{
-               expire: time.Now().Add(time.Duration(c.config.UUIDTTL)),
-               pdh:    collection.PortableDataHash,
-       })
-       c.collections.Add(arv.ApiToken+"\000"+collection.PortableDataHash, &cachedCollection{
+       c.logger.Debugf("cache(%s): retrieved manifest, caching with pdh %s", targetID, retrieved.PortableDataHash)
+       exp := time.Now().Add(time.Duration(c.cluster.Collections.WebDAVCache.TTL))
+       if targetID != retrieved.PortableDataHash {
+               c.pdhs.Add(targetID, &cachedPDH{
+                       expire:  exp,
+                       refresh: time.Now().Add(time.Duration(c.cluster.Collections.WebDAVCache.UUIDTTL)),
+                       pdh:     retrieved.PortableDataHash,
+               })
+       }
+       c.collections.Add(arv.ApiToken+"\000"+retrieved.PortableDataHash, &cachedCollection{
                expire:     exp,
-               collection: collection,
+               collection: &retrieved,
        })
-       if int64(len(collection.ManifestText)) > c.config.MaxCollectionBytes/int64(c.config.MaxCollectionEntries) {
-               go c.pruneCollections()
+       if int64(len(retrieved.ManifestText)) > c.cluster.Collections.WebDAVCache.MaxCollectionBytes/int64(c.cluster.Collections.WebDAVCache.MaxCollectionEntries) {
+               select {
+               case c.chPruneCollections <- struct{}{}:
+               default:
+               }
        }
-       return collection, nil
+       return &retrieved, nil
 }
 
 // pruneCollections checks the total bytes occupied by manifest_text
@@ -289,7 +429,7 @@ func (c *cache) pruneCollections() {
                }
        }
        for i, k := range keys {
-               if size <= c.config.MaxCollectionBytes {
+               if size <= c.cluster.Collections.WebDAVCache.MaxCollectionBytes/2 {
                        break
                }
                if expired[i] {
@@ -301,8 +441,8 @@ func (c *cache) pruneCollections() {
        }
 }
 
-// collectionBytes returns the approximate memory size of the
-// collection cache.
+// collectionBytes returns the approximate combined memory size of the
+// collection cache and session filesystem cache.
 func (c *cache) collectionBytes() uint64 {
        var size uint64
        for _, k := range c.collections.Keys() {
@@ -312,6 +452,15 @@ func (c *cache) collectionBytes() uint64 {
                }
                size += uint64(len(v.(*cachedCollection).collection.ManifestText))
        }
+       for _, token := range c.sessions.Keys() {
+               ent, ok := c.sessions.Peek(token)
+               if !ok {
+                       continue
+               }
+               if fs, ok := ent.(*cachedSession).fs.Load().(arvados.CustomFileSystem); ok {
+                       size += uint64(fs.MemorySize())
+               }
+       }
        return size
 }
 
@@ -328,3 +477,35 @@ func (c *cache) lookupCollection(key string) *arvados.Collection {
        c.metrics.collectionHits.Inc()
        return ent.collection
 }
+
+// GetTokenUser returns the user record associated with token.
+//
+// The record is fetched from the API the first time it is needed and
+// then cached on the token's session, so later calls that get the
+// same session skip the API. Callers need the user's UUID for logging
+// and their admin status for certain permission checks.
+func (c *cache) GetTokenUser(token string) (*arvados.User, error) {
+       // Look up (or create) the session for this token.
+       _, sess, err := c.GetSession(token)
+       if err != nil {
+               return nil, err
+       }
+
+       // Reuse a record loaded by an earlier call on this session.
+       if cached, _ := sess.user.Load().(*arvados.User); cached != nil {
+               return cached, nil
+       }
+
+       // Not cached yet: ask the API server.
+       c.metrics.apiCalls.Inc()
+       fetched := new(arvados.User)
+       err = sess.client.RequestAndDecode(fetched, "GET", "/arvados/v1/users/current", nil, nil)
+       if err != nil {
+               return nil, err
+       }
+
+       // Remember it for later calls. Concurrent callers may each
+       // fetch and Store; whichever Store runs last is kept.
+       sess.user.Store(fetched)
+       return fetched, nil
+}