16745: Keep a SiteFileSystem alive for multiple read requests.
author    Tom Clegg <tom@curii.com>
          Mon, 22 Feb 2021 16:10:10 +0000 (11:10 -0500)
committer Tom Clegg <tom@curii.com>
          Mon, 22 Feb 2021 16:10:10 +0000 (11:10 -0500)
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom@curii.com>

lib/config/config.default.yml
lib/config/generated_config.go
sdk/go/arvados/config.go
sdk/go/arvados/fs_base.go
sdk/go/arvados/fs_collection.go
sdk/go/arvados/fs_collection_test.go
sdk/go/arvados/fs_deferred.go
services/keep-web/cache.go
services/keep-web/handler.go
services/keep-web/main.go
services/keep-web/s3.go

diff --git a/lib/config/config.default.yml b/lib/config/config.default.yml
index 68e518732d6f85b8ef377a4f22ea1efebf16af46..bc87b2cc7071723c03c73ab65a0dddc6847d5280 100644
@@ -523,21 +523,30 @@ Clusters:
       TrustAllContent: false
 
       # Cache parameters for WebDAV content serving:
-      # * TTL: Maximum time to cache manifests and permission checks.
-      # * UUIDTTL: Maximum time to cache collection state.
-      # * MaxBlockEntries: Maximum number of block cache entries.
-      # * MaxCollectionEntries: Maximum number of collection cache entries.
-      # * MaxCollectionBytes: Approximate memory limit for collection cache.
-      # * MaxPermissionEntries: Maximum number of permission cache entries.
-      # * MaxUUIDEntries: Maximum number of UUID cache entries.
       WebDAVCache:
+        # Time to cache manifests, permission checks, and sessions.
         TTL: 300s
+
+        # Time to cache collection state.
         UUIDTTL: 5s
-        MaxBlockEntries:      4
+
+        # Block cache entries. Each block consumes up to 64 MiB RAM.
+        MaxBlockEntries: 4
+
+        # Collection cache entries.
         MaxCollectionEntries: 1000
-        MaxCollectionBytes:   100000000
+
+        # Approximate memory limit (in bytes) for collection cache.
+        MaxCollectionBytes: 100000000
+
+        # Permission cache entries.
         MaxPermissionEntries: 1000
-        MaxUUIDEntries:       1000
+
+        # UUID cache entries.
+        MaxUUIDEntries: 1000
+
+        # Persistent sessions.
+        MaxSessions: 100
 
     Login:
       # One of the following mechanisms (SSO, Google, PAM, LDAP, or
diff --git a/lib/config/generated_config.go b/lib/config/generated_config.go
index 8ef787771ebb9986f3b88f52ec69a6851d2eb8d2..f0ae11aab019dc19367c1245ea65bf2da2270018 100644
@@ -529,21 +529,30 @@ Clusters:
       TrustAllContent: false
 
       # Cache parameters for WebDAV content serving:
-      # * TTL: Maximum time to cache manifests and permission checks.
-      # * UUIDTTL: Maximum time to cache collection state.
-      # * MaxBlockEntries: Maximum number of block cache entries.
-      # * MaxCollectionEntries: Maximum number of collection cache entries.
-      # * MaxCollectionBytes: Approximate memory limit for collection cache.
-      # * MaxPermissionEntries: Maximum number of permission cache entries.
-      # * MaxUUIDEntries: Maximum number of UUID cache entries.
       WebDAVCache:
+        # Time to cache manifests, permission checks, and sessions.
         TTL: 300s
+
+        # Time to cache collection state.
         UUIDTTL: 5s
-        MaxBlockEntries:      4
+
+        # Block cache entries. Each block consumes up to 64 MiB RAM.
+        MaxBlockEntries: 4
+
+        # Collection cache entries.
         MaxCollectionEntries: 1000
-        MaxCollectionBytes:   100000000
+
+        # Approximate memory limit (in bytes) for collection cache.
+        MaxCollectionBytes: 100000000
+
+        # Permission cache entries.
         MaxPermissionEntries: 1000
-        MaxUUIDEntries:       1000
+
+        # UUID cache entries.
+        MaxUUIDEntries: 1000
+
+        # Persistent sessions.
+        MaxSessions: 100
 
     Login:
       # One of the following mechanisms (SSO, Google, PAM, LDAP, or
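
The new MaxSessions knob caps how many token-scoped filesystems keep-web
keeps alive between requests, alongside the existing cache limits. A
minimal sketch (not part of the commit; printCacheBudget is a
hypothetical helper) of the worst-case memory these settings allow,
using the struct fields from sdk/go/arvados/config.go below:

package example

import (
	"fmt"

	"git.arvados.org/arvados.git/sdk/go/arvados"
)

// printCacheBudget reports roughly how much memory the WebDAV caches
// may pin for a loaded cluster config.
func printCacheBudget(cluster *arvados.Cluster) {
	c := cluster.Collections.WebDAVCache
	// Each block cache entry can hold one Keep block of up to 64 MiB.
	blockBytes := int64(c.MaxBlockEntries) * (64 << 20)
	fmt.Printf("up to %d cached session filesystems\n", c.MaxSessions)
	fmt.Printf("collection+session data pruned above %d bytes\n", c.MaxCollectionBytes/2)
	fmt.Printf("block cache: up to %d bytes\n", blockBytes)
}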
diff --git a/sdk/go/arvados/config.go b/sdk/go/arvados/config.go
index 4a56c930213abf389a782fd593622d776da9584f..074914744f9885e65099fd60934e7a46fef5bbcf 100644
@@ -65,6 +65,7 @@ type WebDAVCacheConfig struct {
        MaxCollectionBytes   int64
        MaxPermissionEntries int
        MaxUUIDEntries       int
+       MaxSessions          int
 }
 
 type Cluster struct {
diff --git a/sdk/go/arvados/fs_base.go b/sdk/go/arvados/fs_base.go
index aa75fee7c4d0f5bf47826d12300b51cdcf08a424..2478641df5478639c92ff2b4d0f57d847165eee7 100644
@@ -106,6 +106,9 @@ type FileSystem interface {
        // path is "", flush all dirs/streams; otherwise, flush only
        // the specified dir/stream.
        Flush(path string, shortBlocks bool) error
+
+       // Estimate current memory usage.
+       MemorySize() int64
 }
 
 type inode interface {
@@ -156,6 +159,7 @@ type inode interface {
        sync.Locker
        RLock()
        RUnlock()
+       MemorySize() int64
 }
 
 type fileinfo struct {
@@ -229,6 +233,13 @@ func (*nullnode) Child(name string, replace func(inode) (inode, error)) (inode,
        return nil, ErrNotADirectory
 }
 
+func (*nullnode) MemorySize() int64 {
+       // Types that embed nullnode should report their own size, but
+       // if they don't, we at least report a non-zero size to ensure
+       // a large tree doesn't get reported as 0 bytes.
+       return 64
+}
+
 type treenode struct {
        fs       FileSystem
        parent   inode
@@ -319,6 +330,15 @@ func (n *treenode) Sync() error {
        return nil
 }
 
+func (n *treenode) MemorySize() (size int64) {
+       n.RLock()
+       defer n.RUnlock()
+       for _, inode := range n.inodes {
+               size += inode.MemorySize()
+       }
+       return
+}
+
 type fileSystem struct {
        root inode
        fsBackend
@@ -607,6 +627,10 @@ func (fs *fileSystem) Flush(string, bool) error {
        return ErrInvalidOperation
 }
 
+func (fs *fileSystem) MemorySize() int64 {
+       return fs.root.MemorySize()
+}
+
 // rlookup (recursive lookup) returns the inode for the file/directory
 // with the given name (which may contain "/" separators). If no such
 // file/directory exists, the returned node is nil.
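
Every inode now answers MemorySize, so a filesystem's footprint is a
recursive walk from the root, as treenode.MemorySize shows above. The
pattern, reduced to a self-contained toy (illustrative types, not the
Arvados ones):

package example

import "sync"

// node mirrors the inode tree: interior nodes sum their children,
// leaves report their buffered data.
type node struct {
	mu       sync.RWMutex
	buf      []byte
	children map[string]*node
}

func (n *node) MemorySize() (size int64) {
	n.mu.RLock()
	defer n.mu.RUnlock()
	size = int64(len(n.buf))
	for _, child := range n.children {
		size += child.MemorySize()
	}
	return
}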
diff --git a/sdk/go/arvados/fs_collection.go b/sdk/go/arvados/fs_collection.go
index 1de558a1bda4ab7c2def0c03d32998b0e18535ae..0233826a7281e9aa95f5dbb9f74e93ddb1bfd473 100644
@@ -38,9 +38,6 @@ type CollectionFileSystem interface {
 
        // Total data bytes in all files.
        Size() int64
-
-       // Memory consumed by buffered file data.
-       memorySize() int64
 }
 
 type collectionFileSystem struct {
@@ -232,10 +229,10 @@ func (fs *collectionFileSystem) Flush(path string, shortBlocks bool) error {
        return dn.flush(context.TODO(), names, flushOpts{sync: false, shortBlocks: shortBlocks})
 }
 
-func (fs *collectionFileSystem) memorySize() int64 {
+func (fs *collectionFileSystem) MemorySize() int64 {
        fs.fileSystem.root.Lock()
        defer fs.fileSystem.root.Unlock()
-       return fs.fileSystem.root.(*dirnode).memorySize()
+       return fs.fileSystem.root.(*dirnode).MemorySize()
 }
 
 func (fs *collectionFileSystem) MarshalManifest(prefix string) (string, error) {
@@ -879,14 +876,14 @@ func (dn *dirnode) flush(ctx context.Context, names []string, opts flushOpts) er
 }
 
 // caller must have write lock.
-func (dn *dirnode) memorySize() (size int64) {
+func (dn *dirnode) MemorySize() (size int64) {
        for _, name := range dn.sortedNames() {
                node := dn.inodes[name]
                node.Lock()
                defer node.Unlock()
                switch node := node.(type) {
                case *dirnode:
-                       size += node.memorySize()
+                       size += node.MemorySize()
                case *filenode:
                        for _, seg := range node.segments {
                                switch seg := seg.(type) {
diff --git a/sdk/go/arvados/fs_collection_test.go b/sdk/go/arvados/fs_collection_test.go
index 59a6a6ba825e57928e9348c17d971988fa24fc94..05c8ea61a14500466ff4bc424b8847788408404b 100644
@@ -1153,9 +1153,9 @@ func (s *CollectionFSSuite) TestFlushAll(c *check.C) {
                        fs.Flush("", true)
                }
 
-               size := fs.memorySize()
+               size := fs.MemorySize()
                if !c.Check(size <= 1<<24, check.Equals, true) {
-                       c.Logf("at dir%d fs.memorySize()=%d", i, size)
+                       c.Logf("at dir%d fs.MemorySize()=%d", i, size)
                        return
                }
        }
@@ -1188,13 +1188,13 @@ func (s *CollectionFSSuite) TestFlushFullBlocksOnly(c *check.C) {
                        c.Assert(err, check.IsNil)
                }
        }
-       c.Check(fs.memorySize(), check.Equals, int64(nDirs*67<<20))
+       c.Check(fs.MemorySize(), check.Equals, int64(nDirs*67<<20))
        c.Check(flushed, check.Equals, int64(0))
 
        waitForFlush := func(expectUnflushed, expectFlushed int64) {
-               for deadline := time.Now().Add(5 * time.Second); fs.memorySize() > expectUnflushed && time.Now().Before(deadline); time.Sleep(10 * time.Millisecond) {
+               for deadline := time.Now().Add(5 * time.Second); fs.MemorySize() > expectUnflushed && time.Now().Before(deadline); time.Sleep(10 * time.Millisecond) {
                }
-               c.Check(fs.memorySize(), check.Equals, expectUnflushed)
+               c.Check(fs.MemorySize(), check.Equals, expectUnflushed)
                c.Check(flushed, check.Equals, expectFlushed)
        }
 
diff --git a/sdk/go/arvados/fs_deferred.go b/sdk/go/arvados/fs_deferred.go
index 254b90c812a337de96cb34da01b767dbe7adcc5a..bb6c7a26263e23b1cdb733d6d1d00a9ee129175a 100644
@@ -112,3 +112,4 @@ func (dn *deferrednode) RLock()                          { dn.realinode().RLock(
 func (dn *deferrednode) RUnlock()                        { dn.realinode().RUnlock() }
 func (dn *deferrednode) FS() FileSystem                  { return dn.currentinode().FS() }
 func (dn *deferrednode) Parent() inode                   { return dn.currentinode().Parent() }
+func (dn *deferrednode) MemorySize() int64               { return dn.currentinode().MemorySize() }
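
Note that deferrednode forwards MemorySize through currentinode(), so it
sizes whatever node exists right now instead of forcing the deferred one
into existence (unlike RLock, which goes through realinode()). A toy of
that peek-don't-create delegation, with hypothetical types:

package example

import "sync"

type sizer interface{ MemorySize() int64 }

// lazy defers building the real node; MemorySize peeks at whichever
// node currently exists rather than triggering creation.
type lazy struct {
	mu          sync.Mutex
	placeholder sizer
	real        sizer // nil until first forced access
}

func (l *lazy) currentinode() sizer {
	l.mu.Lock()
	defer l.mu.Unlock()
	if l.real != nil {
		return l.real
	}
	return l.placeholder
}

func (l *lazy) MemorySize() int64 { return l.currentinode().MemorySize() }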
diff --git a/services/keep-web/cache.go b/services/keep-web/cache.go
index eeb78ad9058d6c35e8b544cbef1a5c6500c90bcf..71e99533042c693badbb1c00a5dc9e60af02b875 100644
@@ -6,23 +6,27 @@ package main
 
 import (
        "sync"
+       "sync/atomic"
        "time"
 
        "git.arvados.org/arvados.git/sdk/go/arvados"
        "git.arvados.org/arvados.git/sdk/go/arvadosclient"
-       "github.com/hashicorp/golang-lru"
+       "git.arvados.org/arvados.git/sdk/go/keepclient"
+       lru "github.com/hashicorp/golang-lru"
        "github.com/prometheus/client_golang/prometheus"
 )
 
 const metricsUpdateInterval = time.Second / 10
 
 type cache struct {
-       config      *arvados.WebDAVCacheConfig
+       cluster     *arvados.Cluster
+       config      *arvados.WebDAVCacheConfig // TODO: use cluster.Collections.WebDAV instead
        registry    *prometheus.Registry
        metrics     cacheMetrics
        pdhs        *lru.TwoQueueCache
        collections *lru.TwoQueueCache
        permissions *lru.TwoQueueCache
+       sessions    *lru.TwoQueueCache
        setupOnce   sync.Once
 }
 
@@ -30,9 +34,12 @@ type cacheMetrics struct {
        requests          prometheus.Counter
        collectionBytes   prometheus.Gauge
        collectionEntries prometheus.Gauge
+       sessionEntries    prometheus.Gauge
        collectionHits    prometheus.Counter
        pdhHits           prometheus.Counter
        permissionHits    prometheus.Counter
+       sessionHits       prometheus.Counter
+       sessionMisses     prometheus.Counter
        apiCalls          prometheus.Counter
 }
 
@@ -86,6 +93,27 @@ func (m *cacheMetrics) setup(reg *prometheus.Registry) {
                Help:      "Number of manifests in cache.",
        })
        reg.MustRegister(m.collectionEntries)
+       m.sessionEntries = prometheus.NewGauge(prometheus.GaugeOpts{
+               Namespace: "arvados",
+               Subsystem: "keepweb_sessions",
+               Name:      "active",
+               Help:      "Number of active token sessions.",
+       })
+       reg.MustRegister(m.sessionEntries)
+       m.sessionHits = prometheus.NewCounter(prometheus.CounterOpts{
+               Namespace: "arvados",
+               Subsystem: "keepweb_sessions",
+               Name:      "hits",
+               Help:      "Number of token session cache hits.",
+       })
+       reg.MustRegister(m.sessionHits)
+       m.sessionMisses = prometheus.NewCounter(prometheus.CounterOpts{
+               Namespace: "arvados",
+               Subsystem: "keepweb_sessions",
+               Name:      "misses",
+               Help:      "Number of token session cache misses.",
+       })
+       reg.MustRegister(m.sessionMisses)
 }
 
 type cachedPDH struct {
@@ -102,6 +130,11 @@ type cachedPermission struct {
        expire time.Time
 }
 
+type cachedSession struct {
+       expire time.Time
+       fs     atomic.Value
+}
+
 func (c *cache) setup() {
        var err error
        c.pdhs, err = lru.New2Q(c.config.MaxUUIDEntries)
@@ -116,6 +149,10 @@ func (c *cache) setup() {
        if err != nil {
                panic(err)
        }
+       c.sessions, err = lru.New2Q(c.config.MaxSessions)
+       if err != nil {
+               panic(err)
+       }
 
        reg := c.registry
        if reg == nil {
@@ -132,6 +169,7 @@ func (c *cache) setup() {
 func (c *cache) updateGauges() {
        c.metrics.collectionBytes.Set(float64(c.collectionBytes()))
        c.metrics.collectionEntries.Set(float64(c.collections.Len()))
+       c.metrics.sessionEntries.Set(float64(c.sessions.Len()))
 }
 
 var selectPDH = map[string]interface{}{
@@ -165,6 +203,85 @@ func (c *cache) Update(client *arvados.Client, coll arvados.Collection, fs arvad
        return err
 }
 
+// ResetSession unloads any potentially stale state. Should be called
+// after write operations, so subsequent reads don't return stale
+// data.
+func (c *cache) ResetSession(token string) {
+       c.setupOnce.Do(c.setup)
+       c.sessions.Remove(token)
+}
+
+// GetSession returns a long-lived CustomFileSystem suitable for
+// doing read operations with the given token.
+func (c *cache) GetSession(token string) (arvados.CustomFileSystem, error) {
+       c.setupOnce.Do(c.setup)
+       now := time.Now()
+       ent, _ := c.sessions.Get(token)
+       sess, _ := ent.(*cachedSession)
+       if sess == nil {
+               c.metrics.sessionMisses.Inc()
+               sess = &cachedSession{
+                       expire: now.Add(c.config.TTL.Duration()),
+               }
+               c.sessions.Add(token, sess)
+       } else if sess.expire.Before(now) {
+               // Stale session: replace the whole entry rather than
+               // storing nil (atomic.Value panics on Store(nil)), and
+               // restart the TTL clock.
+               c.metrics.sessionMisses.Inc()
+               sess = &cachedSession{
+                       expire: now.Add(c.config.TTL.Duration()),
+               }
+               c.sessions.Add(token, sess)
+       } else {
+               c.metrics.sessionHits.Inc()
+       }
+       go c.pruneSessions()
+       fs, _ := sess.fs.Load().(arvados.CustomFileSystem)
+       if fs != nil {
+               return fs, nil
+       }
+       ac, err := arvados.NewClientFromConfig(c.cluster)
+       if err != nil {
+               return nil, err
+       }
+       ac.AuthToken = token
+       arv, err := arvadosclient.New(ac)
+       if err != nil {
+               return nil, err
+       }
+       kc := keepclient.New(arv)
+       fs = ac.SiteFileSystem(kc)
+       fs.ForwardSlashNameSubstitution(c.cluster.Collections.ForwardSlashNameSubstitution)
+       sess.fs.Store(fs)
+       return fs, nil
+}
+
+func (c *cache) pruneSessions() {
+       now := time.Now()
+       var size int64
+       for _, token := range c.sessions.Keys() {
+               ent, ok := c.sessions.Peek(token)
+               if !ok {
+                       continue
+               }
+               s := ent.(*cachedSession)
+               if s.expire.Before(now) {
+                       c.sessions.Remove(token)
+                       continue
+               }
+               fs, _ := s.fs.Load().(arvados.CustomFileSystem)
+               if fs == nil {
+                       continue
+               }
+               size += fs.MemorySize()
+       }
+       if size > c.cluster.Collections.WebDAVCache.MaxCollectionBytes/2 {
+               for _, token := range c.sessions.Keys() {
+                       c.sessions.Remove(token)
+               }
+       }
+}
+
 func (c *cache) Get(arv *arvadosclient.ArvadosClient, targetID string, forceReload bool) (*arvados.Collection, error) {
        c.setupOnce.Do(c.setup)
        c.metrics.requests.Inc()
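
The cachedSession above publishes its filesystem through an
atomic.Value so readers can grab it without locking, while the 2Q cache
serializes entry creation. The load-or-build shape, reduced to a sketch
with a plain map and *string values standing in for the LRU and the
filesystem:

package example

import (
	"sync"
	"sync/atomic"
	"time"
)

type session struct {
	expire time.Time
	val    atomic.Value // always stores the same concrete type (*string)
}

var (
	mu       sync.Mutex
	sessions = map[string]*session{}
)

// get returns the cached value for token, building it at most once per
// TTL window; concurrent builders race harmlessly (last Store wins).
func get(token string, ttl time.Duration, build func() *string) *string {
	mu.Lock()
	s := sessions[token]
	if s == nil || s.expire.Before(time.Now()) {
		s = &session{expire: time.Now().Add(ttl)}
		sessions[token] = s
	}
	mu.Unlock()
	if v, ok := s.val.Load().(*string); ok {
		return v // fast path: no lock held
	}
	v := build()
	s.val.Store(v)
	return v
}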
@@ -288,7 +400,7 @@ func (c *cache) pruneCollections() {
                }
        }
        for i, k := range keys {
-               if size <= c.config.MaxCollectionBytes {
+               if size <= c.config.MaxCollectionBytes/2 {
                        break
                }
                if expired[i] {
@@ -300,8 +412,8 @@ func (c *cache) pruneCollections() {
        }
 }
 
-// collectionBytes returns the approximate memory size of the
-// collection cache.
+// collectionBytes returns the approximate combined memory size of the
+// collection cache and session filesystem cache.
 func (c *cache) collectionBytes() uint64 {
        var size uint64
        for _, k := range c.collections.Keys() {
@@ -311,6 +423,15 @@ func (c *cache) collectionBytes() uint64 {
                }
                size += uint64(len(v.(*cachedCollection).collection.ManifestText))
        }
+       for _, token := range c.sessions.Keys() {
+               ent, ok := c.sessions.Peek(token)
+               if !ok {
+                       continue
+               }
+               if fs, ok := ent.(*cachedSession).fs.Load().(arvados.CustomFileSystem); ok {
+                       size += uint64(fs.MemorySize())
+               }
+       }
        return size
 }
 
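
End to end, a keep-web read handler now asks the cache for the token's
filesystem instead of building one per request. A hedged sketch of that
consumption (sessionCache and serveRead are illustrative names; error
handling abbreviated):

package example

import (
	"io"

	"git.arvados.org/arvados.git/sdk/go/arvados"
)

// sessionCache is the slice of the keep-web cache used by reads.
type sessionCache interface {
	GetSession(token string) (arvados.CustomFileSystem, error)
}

// serveRead streams one file from the per-token cached filesystem.
func serveRead(c sessionCache, token, path string, w io.Writer) error {
	fs, err := c.GetSession(token)
	if err != nil {
		return err
	}
	f, err := fs.Open(path)
	if err != nil {
		return err
	}
	defer f.Close()
	_, err = io.Copy(w, f)
	return err
}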
diff --git a/services/keep-web/handler.go b/services/keep-web/handler.go
index 2d6fb78f8098a7752a2e9075f8ea84ca537c445f..4ea2fa2f6dea89af1b3a744b09a2da6d36e61169 100644
@@ -520,7 +520,8 @@ func (h *handler) ServeHTTP(wOrig http.ResponseWriter, r *http.Request) {
 func (h *handler) getClients(reqID, token string) (arv *arvadosclient.ArvadosClient, kc *keepclient.KeepClient, client *arvados.Client, release func(), err error) {
        arv = h.clientPool.Get()
        if arv == nil {
-               return nil, nil, nil, nil, err
+               err = h.clientPool.Err()
+               return
        }
        release = func() { h.clientPool.Put(arv) }
        arv.ApiToken = token
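
The getClients change fixes a real bug: the old body returned the named
err while it was still nil, so a depleted client pool handed callers a
nil client and a nil error. A minimal reproduction of the hazard, with
stand-in types:

package example

import "errors"

type pool struct{}

func (pool) Get() *int  { return nil } // exhausted
func (pool) Err() error { return errors.New("pool exhausted") }

// buggy mimics the old getClients: err is a named return that is
// still nil when Get fails, so callers get (nil, nil).
func buggy(p pool) (c *int, err error) {
	c = p.Get()
	if c == nil {
		return nil, err // BUG: err == nil here
	}
	return c, nil
}

// fixed mirrors the commit: consult the pool's error before returning.
func fixed(p pool) (c *int, err error) {
	c = p.Get()
	if c == nil {
		err = p.Err()
		return
	}
	return c, nil
}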
@@ -548,14 +549,11 @@ func (h *handler) serveSiteFS(w http.ResponseWriter, r *http.Request, tokens []s
                http.Error(w, errReadOnly.Error(), http.StatusMethodNotAllowed)
                return
        }
-       _, kc, client, release, err := h.getClients(r.Header.Get("X-Request-Id"), tokens[0])
+       fs, err := h.Config.Cache.GetSession(tokens[0])
        if err != nil {
-               http.Error(w, "Pool failed: "+h.clientPool.Err().Error(), http.StatusInternalServerError)
+               http.Error(w, err.Error(), http.StatusInternalServerError)
                return
        }
-       defer release()
-
-       fs := client.SiteFileSystem(kc)
        fs.ForwardSlashNameSubstitution(h.Config.cluster.Collections.ForwardSlashNameSubstitution)
        f, err := fs.Open(r.URL.Path)
        if os.IsNotExist(err) {
diff --git a/services/keep-web/main.go b/services/keep-web/main.go
index 647eab1653294311644bdce91faa367bd0ec1832..a62e0abb667ca5211994211e9f401c271e2e5b5b 100644
@@ -38,6 +38,7 @@ func newConfig(arvCfg *arvados.Config) *Config {
        }
        cfg.cluster = cls
        cfg.Cache.config = &cfg.cluster.Collections.WebDAVCache
+       cfg.Cache.cluster = cls
        return &cfg
 }
 
diff --git a/services/keep-web/s3.go b/services/keep-web/s3.go
index 1c84976d2b1bf26622ba67619438f4536e6551f0..d500f1e65a37e13739529f80e10a2e52a6dcf181 100644
@@ -243,15 +243,29 @@ func (h *handler) serveS3(w http.ResponseWriter, r *http.Request) bool {
                return false
        }
 
-       _, kc, client, release, err := h.getClients(r.Header.Get("X-Request-Id"), token)
-       if err != nil {
-               s3ErrorResponse(w, InternalError, "Pool failed: "+h.clientPool.Err().Error(), r.URL.Path, http.StatusInternalServerError)
-               return true
+       var err error
+       var fs arvados.CustomFileSystem
+       if r.Method == http.MethodGet || r.Method == http.MethodHead {
+               // Use a single session (cached FileSystem) across
+               // multiple read requests.
+               fs, err = h.Config.Cache.GetSession(token)
+               if err != nil {
+                       s3ErrorResponse(w, InternalError, err.Error(), r.URL.Path, http.StatusInternalServerError)
+                       return true
+               }
+       } else {
+               // Create a FileSystem for this request, to avoid
+               // exposing incomplete write operations to concurrent
+               // requests.
+               _, kc, client, release, err := h.getClients(r.Header.Get("X-Request-Id"), token)
+               if err != nil {
+                       s3ErrorResponse(w, InternalError, err.Error(), r.URL.Path, http.StatusInternalServerError)
+                       return true
+               }
+               defer release()
+               fs = client.SiteFileSystem(kc)
+               fs.ForwardSlashNameSubstitution(h.Config.cluster.Collections.ForwardSlashNameSubstitution)
        }
-       defer release()
-
-       fs := client.SiteFileSystem(kc)
-       fs.ForwardSlashNameSubstitution(h.Config.cluster.Collections.ForwardSlashNameSubstitution)
 
        var objectNameGiven bool
        var bucketName string
@@ -400,6 +414,8 @@ func (h *handler) serveS3(w http.ResponseWriter, r *http.Request) bool {
                        s3ErrorResponse(w, InternalError, err.Error(), r.URL.Path, http.StatusInternalServerError)
                        return true
                }
+               // Ensure a subsequent read operation will see the changes.
+               h.Config.Cache.ResetSession(token)
                w.WriteHeader(http.StatusOK)
                return true
        case r.Method == http.MethodDelete:
@@ -447,11 +463,12 @@ func (h *handler) serveS3(w http.ResponseWriter, r *http.Request) bool {
                        s3ErrorResponse(w, InternalError, err.Error(), r.URL.Path, http.StatusInternalServerError)
                        return true
                }
+               // Ensure a subsequent read operation will see the changes.
+               h.Config.Cache.ResetSession(token)
                w.WriteHeader(http.StatusNoContent)
                return true
        default:
                s3ErrorResponse(w, InvalidRequest, "method not allowed", r.URL.Path, http.StatusMethodNotAllowed)
-
                return true
        }
 }
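
Put together, writes keep their request-private filesystem and then call
ResetSession, so the next cached read rebuilds fresh state:
read-your-writes at the cost of one session miss. A sketch of the PUT
shape (putObject and writeCache are illustrative names; OpenFile/Sync
are the methods the real serveS3 uses):

package example

import (
	"io"
	"os"

	"git.arvados.org/arvados.git/sdk/go/arvados"
)

type writeCache interface{ ResetSession(token string) }

// putObject writes via a request-private filesystem, flushes it, then
// invalidates the token's cached session so later GETs see the object.
func putObject(c writeCache, fs arvados.CustomFileSystem, token, path string, body io.Reader) error {
	f, err := fs.OpenFile(path, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644)
	if err != nil {
		return err
	}
	if _, err := io.Copy(f, body); err != nil {
		f.Close()
		return err
	}
	if err := f.Close(); err != nil {
		return err
	}
	if err := fs.Sync(); err != nil {
		return err
	}
	c.ResetSession(token) // ensure read-your-writes
	return nil
}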