From: Tom Clegg Date: Mon, 22 Feb 2021 16:10:10 +0000 (-0500) Subject: 16745: Keep a SiteFileSystem alive for multiple read requests. X-Git-Tag: 2.2.0~114^2~6 X-Git-Url: https://git.arvados.org/arvados.git/commitdiff_plain/820fc945c069d237e515dcc1608a5661dbf7700e 16745: Keep a SiteFileSystem alive for multiple read requests. Arvados-DCO-1.1-Signed-off-by: Tom Clegg --- diff --git a/lib/config/config.default.yml b/lib/config/config.default.yml index 68e518732d..bc87b2cc70 100644 --- a/lib/config/config.default.yml +++ b/lib/config/config.default.yml @@ -523,21 +523,30 @@ Clusters: TrustAllContent: false # Cache parameters for WebDAV content serving: - # * TTL: Maximum time to cache manifests and permission checks. - # * UUIDTTL: Maximum time to cache collection state. - # * MaxBlockEntries: Maximum number of block cache entries. - # * MaxCollectionEntries: Maximum number of collection cache entries. - # * MaxCollectionBytes: Approximate memory limit for collection cache. - # * MaxPermissionEntries: Maximum number of permission cache entries. - # * MaxUUIDEntries: Maximum number of UUID cache entries. WebDAVCache: + # Time to cache manifests, permission checks, and sessions. TTL: 300s + + # Time to cache collection state. UUIDTTL: 5s - MaxBlockEntries: 4 + + # Block cache entries. Each block consumes up to 64 MiB RAM. + MaxBlockEntries: 4 + + # Collection cache entries. MaxCollectionEntries: 1000 - MaxCollectionBytes: 100000000 + + # Approximate memory limit (in bytes) for collection cache. + MaxCollectionBytes: 100000000 + + # Permission cache entries. MaxPermissionEntries: 1000 - MaxUUIDEntries: 1000 + + # UUID cache entries. + MaxUUIDEntries: 1000 + + # Persistent sessions. + MaxSessions: 100 Login: # One of the following mechanisms (SSO, Google, PAM, LDAP, or diff --git a/lib/config/generated_config.go b/lib/config/generated_config.go index 8ef787771e..f0ae11aab0 100644 --- a/lib/config/generated_config.go +++ b/lib/config/generated_config.go @@ -529,21 +529,30 @@ Clusters: TrustAllContent: false # Cache parameters for WebDAV content serving: - # * TTL: Maximum time to cache manifests and permission checks. - # * UUIDTTL: Maximum time to cache collection state. - # * MaxBlockEntries: Maximum number of block cache entries. - # * MaxCollectionEntries: Maximum number of collection cache entries. - # * MaxCollectionBytes: Approximate memory limit for collection cache. - # * MaxPermissionEntries: Maximum number of permission cache entries. - # * MaxUUIDEntries: Maximum number of UUID cache entries. WebDAVCache: + # Time to cache manifests, permission checks, and sessions. TTL: 300s + + # Time to cache collection state. UUIDTTL: 5s - MaxBlockEntries: 4 + + # Block cache entries. Each block consumes up to 64 MiB RAM. + MaxBlockEntries: 4 + + # Collection cache entries. MaxCollectionEntries: 1000 - MaxCollectionBytes: 100000000 + + # Approximate memory limit (in bytes) for collection cache. + MaxCollectionBytes: 100000000 + + # Permission cache entries. MaxPermissionEntries: 1000 - MaxUUIDEntries: 1000 + + # UUID cache entries. + MaxUUIDEntries: 1000 + + # Persistent sessions. + MaxSessions: 100 Login: # One of the following mechanisms (SSO, Google, PAM, LDAP, or diff --git a/sdk/go/arvados/config.go b/sdk/go/arvados/config.go index 4a56c93021..074914744f 100644 --- a/sdk/go/arvados/config.go +++ b/sdk/go/arvados/config.go @@ -65,6 +65,7 @@ type WebDAVCacheConfig struct { MaxCollectionBytes int64 MaxPermissionEntries int MaxUUIDEntries int + MaxSessions int } type Cluster struct { diff --git a/sdk/go/arvados/fs_base.go b/sdk/go/arvados/fs_base.go index aa75fee7c4..2478641df5 100644 --- a/sdk/go/arvados/fs_base.go +++ b/sdk/go/arvados/fs_base.go @@ -106,6 +106,9 @@ type FileSystem interface { // path is "", flush all dirs/streams; otherwise, flush only // the specified dir/stream. Flush(path string, shortBlocks bool) error + + // Estimate current memory usage. + MemorySize() int64 } type inode interface { @@ -156,6 +159,7 @@ type inode interface { sync.Locker RLock() RUnlock() + MemorySize() int64 } type fileinfo struct { @@ -229,6 +233,13 @@ func (*nullnode) Child(name string, replace func(inode) (inode, error)) (inode, return nil, ErrNotADirectory } +func (*nullnode) MemorySize() int64 { + // Types that embed nullnode should report their own size, but + // if they don't, we at least report a non-zero size to ensure + // a large tree doesn't get reported as 0 bytes. + return 64 +} + type treenode struct { fs FileSystem parent inode @@ -319,6 +330,15 @@ func (n *treenode) Sync() error { return nil } +func (n *treenode) MemorySize() (size int64) { + n.RLock() + defer n.RUnlock() + for _, inode := range n.inodes { + size += inode.MemorySize() + } + return +} + type fileSystem struct { root inode fsBackend @@ -607,6 +627,10 @@ func (fs *fileSystem) Flush(string, bool) error { return ErrInvalidOperation } +func (fs *fileSystem) MemorySize() int64 { + return fs.root.MemorySize() +} + // rlookup (recursive lookup) returns the inode for the file/directory // with the given name (which may contain "/" separators). If no such // file/directory exists, the returned node is nil. diff --git a/sdk/go/arvados/fs_collection.go b/sdk/go/arvados/fs_collection.go index 1de558a1bd..0233826a72 100644 --- a/sdk/go/arvados/fs_collection.go +++ b/sdk/go/arvados/fs_collection.go @@ -38,9 +38,6 @@ type CollectionFileSystem interface { // Total data bytes in all files. Size() int64 - - // Memory consumed by buffered file data. - memorySize() int64 } type collectionFileSystem struct { @@ -232,10 +229,10 @@ func (fs *collectionFileSystem) Flush(path string, shortBlocks bool) error { return dn.flush(context.TODO(), names, flushOpts{sync: false, shortBlocks: shortBlocks}) } -func (fs *collectionFileSystem) memorySize() int64 { +func (fs *collectionFileSystem) MemorySize() int64 { fs.fileSystem.root.Lock() defer fs.fileSystem.root.Unlock() - return fs.fileSystem.root.(*dirnode).memorySize() + return fs.fileSystem.root.(*dirnode).MemorySize() } func (fs *collectionFileSystem) MarshalManifest(prefix string) (string, error) { @@ -879,14 +876,14 @@ func (dn *dirnode) flush(ctx context.Context, names []string, opts flushOpts) er } // caller must have write lock. -func (dn *dirnode) memorySize() (size int64) { +func (dn *dirnode) MemorySize() (size int64) { for _, name := range dn.sortedNames() { node := dn.inodes[name] node.Lock() defer node.Unlock() switch node := node.(type) { case *dirnode: - size += node.memorySize() + size += node.MemorySize() case *filenode: for _, seg := range node.segments { switch seg := seg.(type) { diff --git a/sdk/go/arvados/fs_collection_test.go b/sdk/go/arvados/fs_collection_test.go index 59a6a6ba82..05c8ea61a1 100644 --- a/sdk/go/arvados/fs_collection_test.go +++ b/sdk/go/arvados/fs_collection_test.go @@ -1153,9 +1153,9 @@ func (s *CollectionFSSuite) TestFlushAll(c *check.C) { fs.Flush("", true) } - size := fs.memorySize() + size := fs.MemorySize() if !c.Check(size <= 1<<24, check.Equals, true) { - c.Logf("at dir%d fs.memorySize()=%d", i, size) + c.Logf("at dir%d fs.MemorySize()=%d", i, size) return } } @@ -1188,13 +1188,13 @@ func (s *CollectionFSSuite) TestFlushFullBlocksOnly(c *check.C) { c.Assert(err, check.IsNil) } } - c.Check(fs.memorySize(), check.Equals, int64(nDirs*67<<20)) + c.Check(fs.MemorySize(), check.Equals, int64(nDirs*67<<20)) c.Check(flushed, check.Equals, int64(0)) waitForFlush := func(expectUnflushed, expectFlushed int64) { - for deadline := time.Now().Add(5 * time.Second); fs.memorySize() > expectUnflushed && time.Now().Before(deadline); time.Sleep(10 * time.Millisecond) { + for deadline := time.Now().Add(5 * time.Second); fs.MemorySize() > expectUnflushed && time.Now().Before(deadline); time.Sleep(10 * time.Millisecond) { } - c.Check(fs.memorySize(), check.Equals, expectUnflushed) + c.Check(fs.MemorySize(), check.Equals, expectUnflushed) c.Check(flushed, check.Equals, expectFlushed) } diff --git a/sdk/go/arvados/fs_deferred.go b/sdk/go/arvados/fs_deferred.go index 254b90c812..bb6c7a2626 100644 --- a/sdk/go/arvados/fs_deferred.go +++ b/sdk/go/arvados/fs_deferred.go @@ -112,3 +112,4 @@ func (dn *deferrednode) RLock() { dn.realinode().RLock( func (dn *deferrednode) RUnlock() { dn.realinode().RUnlock() } func (dn *deferrednode) FS() FileSystem { return dn.currentinode().FS() } func (dn *deferrednode) Parent() inode { return dn.currentinode().Parent() } +func (dn *deferrednode) MemorySize() int64 { return dn.currentinode().MemorySize() } diff --git a/services/keep-web/cache.go b/services/keep-web/cache.go index eeb78ad905..71e9953304 100644 --- a/services/keep-web/cache.go +++ b/services/keep-web/cache.go @@ -6,23 +6,27 @@ package main import ( "sync" + "sync/atomic" "time" "git.arvados.org/arvados.git/sdk/go/arvados" "git.arvados.org/arvados.git/sdk/go/arvadosclient" - "github.com/hashicorp/golang-lru" + "git.arvados.org/arvados.git/sdk/go/keepclient" + lru "github.com/hashicorp/golang-lru" "github.com/prometheus/client_golang/prometheus" ) const metricsUpdateInterval = time.Second / 10 type cache struct { - config *arvados.WebDAVCacheConfig + cluster *arvados.Cluster + config *arvados.WebDAVCacheConfig // TODO: use cluster.Collections.WebDAV instead registry *prometheus.Registry metrics cacheMetrics pdhs *lru.TwoQueueCache collections *lru.TwoQueueCache permissions *lru.TwoQueueCache + sessions *lru.TwoQueueCache setupOnce sync.Once } @@ -30,9 +34,12 @@ type cacheMetrics struct { requests prometheus.Counter collectionBytes prometheus.Gauge collectionEntries prometheus.Gauge + sessionEntries prometheus.Gauge collectionHits prometheus.Counter pdhHits prometheus.Counter permissionHits prometheus.Counter + sessionHits prometheus.Counter + sessionMisses prometheus.Counter apiCalls prometheus.Counter } @@ -86,6 +93,27 @@ func (m *cacheMetrics) setup(reg *prometheus.Registry) { Help: "Number of manifests in cache.", }) reg.MustRegister(m.collectionEntries) + m.sessionEntries = prometheus.NewGauge(prometheus.GaugeOpts{ + Namespace: "arvados", + Subsystem: "keepweb_sessions", + Name: "active", + Help: "Number of active token sessions.", + }) + reg.MustRegister(m.sessionEntries) + m.sessionHits = prometheus.NewCounter(prometheus.CounterOpts{ + Namespace: "arvados", + Subsystem: "keepweb_sessions", + Name: "hits", + Help: "Number of token session cache hits.", + }) + reg.MustRegister(m.sessionHits) + m.sessionMisses = prometheus.NewCounter(prometheus.CounterOpts{ + Namespace: "arvados", + Subsystem: "keepweb_sessions", + Name: "misses", + Help: "Number of token session cache misses.", + }) + reg.MustRegister(m.sessionMisses) } type cachedPDH struct { @@ -102,6 +130,11 @@ type cachedPermission struct { expire time.Time } +type cachedSession struct { + expire time.Time + fs atomic.Value +} + func (c *cache) setup() { var err error c.pdhs, err = lru.New2Q(c.config.MaxUUIDEntries) @@ -116,6 +149,10 @@ func (c *cache) setup() { if err != nil { panic(err) } + c.sessions, err = lru.New2Q(c.config.MaxSessions) + if err != nil { + panic(err) + } reg := c.registry if reg == nil { @@ -132,6 +169,7 @@ func (c *cache) setup() { func (c *cache) updateGauges() { c.metrics.collectionBytes.Set(float64(c.collectionBytes())) c.metrics.collectionEntries.Set(float64(c.collections.Len())) + c.metrics.sessionEntries.Set(float64(c.sessions.Len())) } var selectPDH = map[string]interface{}{ @@ -165,6 +203,80 @@ func (c *cache) Update(client *arvados.Client, coll arvados.Collection, fs arvad return err } +// ResetSession unloads any potentially stale state. Should be called +// after write operations, so subsequent reads don't return stale +// data. +func (c *cache) ResetSession(token string) { + c.setupOnce.Do(c.setup) + c.sessions.Remove(token) +} + +// Get a long-lived CustomFileSystem suitable for doing a read operation +// with the given token. +func (c *cache) GetSession(token string) (arvados.CustomFileSystem, error) { + c.setupOnce.Do(c.setup) + now := time.Now() + ent, _ := c.sessions.Get(token) + sess, _ := ent.(*cachedSession) + if sess == nil { + c.metrics.sessionMisses.Inc() + sess = &cachedSession{ + expire: now.Add(c.config.TTL.Duration()), + } + c.sessions.Add(token, sess) + } else if sess.expire.Before(now) { + c.metrics.sessionMisses.Inc() + sess.fs.Store(nil) + } else { + c.metrics.sessionHits.Inc() + } + go c.pruneSessions() + fs, _ := sess.fs.Load().(arvados.CustomFileSystem) + if fs != nil { + return fs, nil + } + ac, err := arvados.NewClientFromConfig(c.cluster) + if err != nil { + return nil, err + } + ac.AuthToken = token + arv, err := arvadosclient.New(ac) + if err != nil { + return nil, err + } + kc := keepclient.New(arv) + fs = ac.SiteFileSystem(kc) + fs.ForwardSlashNameSubstitution(c.cluster.Collections.ForwardSlashNameSubstitution) + sess.fs.Store(fs) + return fs, nil +} + +func (c *cache) pruneSessions() { + now := time.Now() + var size int64 + for _, token := range c.sessions.Keys() { + ent, ok := c.sessions.Peek(token) + if !ok { + continue + } + s := ent.(*cachedSession) + if s.expire.Before(now) { + c.sessions.Remove(token) + continue + } + fs, _ := s.fs.Load().(arvados.CustomFileSystem) + if fs == nil { + continue + } + size += fs.MemorySize() + } + if size > c.cluster.Collections.WebDAVCache.MaxCollectionBytes/2 { + for _, token := range c.sessions.Keys() { + c.sessions.Remove(token) + } + } +} + func (c *cache) Get(arv *arvadosclient.ArvadosClient, targetID string, forceReload bool) (*arvados.Collection, error) { c.setupOnce.Do(c.setup) c.metrics.requests.Inc() @@ -288,7 +400,7 @@ func (c *cache) pruneCollections() { } } for i, k := range keys { - if size <= c.config.MaxCollectionBytes { + if size <= c.config.MaxCollectionBytes/2 { break } if expired[i] { @@ -300,8 +412,8 @@ func (c *cache) pruneCollections() { } } -// collectionBytes returns the approximate memory size of the -// collection cache. +// collectionBytes returns the approximate combined memory size of the +// collection cache and session filesystem cache. func (c *cache) collectionBytes() uint64 { var size uint64 for _, k := range c.collections.Keys() { @@ -311,6 +423,15 @@ func (c *cache) collectionBytes() uint64 { } size += uint64(len(v.(*cachedCollection).collection.ManifestText)) } + for _, token := range c.sessions.Keys() { + ent, ok := c.sessions.Peek(token) + if !ok { + continue + } + if fs, ok := ent.(*cachedSession).fs.Load().(arvados.CustomFileSystem); ok { + size += uint64(fs.MemorySize()) + } + } return size } diff --git a/services/keep-web/handler.go b/services/keep-web/handler.go index 2d6fb78f80..4ea2fa2f6d 100644 --- a/services/keep-web/handler.go +++ b/services/keep-web/handler.go @@ -520,7 +520,8 @@ func (h *handler) ServeHTTP(wOrig http.ResponseWriter, r *http.Request) { func (h *handler) getClients(reqID, token string) (arv *arvadosclient.ArvadosClient, kc *keepclient.KeepClient, client *arvados.Client, release func(), err error) { arv = h.clientPool.Get() if arv == nil { - return nil, nil, nil, nil, err + err = h.clientPool.Err() + return } release = func() { h.clientPool.Put(arv) } arv.ApiToken = token @@ -548,14 +549,11 @@ func (h *handler) serveSiteFS(w http.ResponseWriter, r *http.Request, tokens []s http.Error(w, errReadOnly.Error(), http.StatusMethodNotAllowed) return } - _, kc, client, release, err := h.getClients(r.Header.Get("X-Request-Id"), tokens[0]) + fs, err := h.Config.Cache.GetSession(tokens[0]) if err != nil { - http.Error(w, "Pool failed: "+h.clientPool.Err().Error(), http.StatusInternalServerError) + http.Error(w, err.Error(), http.StatusInternalServerError) return } - defer release() - - fs := client.SiteFileSystem(kc) fs.ForwardSlashNameSubstitution(h.Config.cluster.Collections.ForwardSlashNameSubstitution) f, err := fs.Open(r.URL.Path) if os.IsNotExist(err) { diff --git a/services/keep-web/main.go b/services/keep-web/main.go index 647eab1653..a62e0abb66 100644 --- a/services/keep-web/main.go +++ b/services/keep-web/main.go @@ -38,6 +38,7 @@ func newConfig(arvCfg *arvados.Config) *Config { } cfg.cluster = cls cfg.Cache.config = &cfg.cluster.Collections.WebDAVCache + cfg.Cache.cluster = cls return &cfg } diff --git a/services/keep-web/s3.go b/services/keep-web/s3.go index 1c84976d2b..d500f1e65a 100644 --- a/services/keep-web/s3.go +++ b/services/keep-web/s3.go @@ -243,15 +243,29 @@ func (h *handler) serveS3(w http.ResponseWriter, r *http.Request) bool { return false } - _, kc, client, release, err := h.getClients(r.Header.Get("X-Request-Id"), token) - if err != nil { - s3ErrorResponse(w, InternalError, "Pool failed: "+h.clientPool.Err().Error(), r.URL.Path, http.StatusInternalServerError) - return true + var err error + var fs arvados.CustomFileSystem + if r.Method == http.MethodGet || r.Method == http.MethodHead { + // Use a single session (cached FileSystem) across + // multiple read requests. + fs, err = h.Config.Cache.GetSession(token) + if err != nil { + s3ErrorResponse(w, InternalError, err.Error(), r.URL.Path, http.StatusInternalServerError) + return true + } + } else { + // Create a FileSystem for this request, to avoid + // exposing incomplete write operations to concurrent + // requests. + _, kc, client, release, err := h.getClients(r.Header.Get("X-Request-Id"), token) + if err != nil { + s3ErrorResponse(w, InternalError, err.Error(), r.URL.Path, http.StatusInternalServerError) + return true + } + defer release() + fs = client.SiteFileSystem(kc) + fs.ForwardSlashNameSubstitution(h.Config.cluster.Collections.ForwardSlashNameSubstitution) } - defer release() - - fs := client.SiteFileSystem(kc) - fs.ForwardSlashNameSubstitution(h.Config.cluster.Collections.ForwardSlashNameSubstitution) var objectNameGiven bool var bucketName string @@ -400,6 +414,8 @@ func (h *handler) serveS3(w http.ResponseWriter, r *http.Request) bool { s3ErrorResponse(w, InternalError, err.Error(), r.URL.Path, http.StatusInternalServerError) return true } + // Ensure a subsequent read operation will see the changes. + h.Config.Cache.ResetSession(token) w.WriteHeader(http.StatusOK) return true case r.Method == http.MethodDelete: @@ -447,11 +463,12 @@ func (h *handler) serveS3(w http.ResponseWriter, r *http.Request) bool { s3ErrorResponse(w, InternalError, err.Error(), r.URL.Path, http.StatusInternalServerError) return true } + // Ensure a subsequent read operation will see the changes. + h.Config.Cache.ResetSession(token) w.WriteHeader(http.StatusNoContent) return true default: s3ErrorResponse(w, InvalidRequest, "method not allowed", r.URL.Path, http.StatusMethodNotAllowed) - return true } }