20559: Refactor session sharing.
author Tom Clegg <tom@curii.com>
Thu, 29 Jun 2023 15:57:29 +0000 (11:57 -0400)
committer Tom Clegg <tom@curii.com>
Thu, 29 Jun 2023 18:16:22 +0000 (14:16 -0400)
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom@curii.com>

services/keep-web/cache.go
services/keep-web/handler.go
services/keep-web/handler_test.go

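diff --git a/services/keep-web/cache.go b/services/keep-web/cache.go
index c77a1b4bb6dca0390b79793225f713459db76ce3..29b7f2b0b39b9c911e81434c4365c9316f12b6f2 100644
--- a/services/keep-web/cache.go
+++ b/services/keep-web/cache.go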
@@ -8,7 +8,6 @@ import (
        "errors"
        "net/http"
        "sync"
-       "sync/atomic"
        "time"
 
        "git.arvados.org/arvados.git/sdk/go/arvados"
@@ -28,6 +27,7 @@ type cache struct {
        metrics   cacheMetrics
        sessions  *lru.TwoQueueCache
        setupOnce sync.Once
+       mtx       sync.Mutex
 
        chPruneSessions chan struct{}
 }
@@ -72,12 +72,30 @@ func (m *cacheMetrics) setup(reg *prometheus.Registry) {
 }
 
 type cachedSession struct {
+       cache         *cache
        expire        time.Time
-       fs            atomic.Value
        client        *arvados.Client
        arvadosclient *arvadosclient.ArvadosClient
        keepclient    *keepclient.KeepClient
-       user          atomic.Value
+
+       // mtx is RLocked while session is not safe to evict from cache
+       mtx sync.RWMutex
+       // refresh is locked while reading or writing the following fields
+       refresh    sync.Mutex
+       fs         arvados.CustomFileSystem
+       user       arvados.User
+       userLoaded bool
+       // inuse is RLocked while session is in use by a caller
+       inuse sync.RWMutex
+}
+
+func (sess *cachedSession) Release() {
+       sess.inuse.RUnlock()
+       sess.mtx.RUnlock()
+       select {
+       case sess.cache.chPruneSessions <- struct{}{}:
+       default:
+       }
 }
 
 func (c *cache) setup() {
@@ -114,98 +132,112 @@ var selectPDH = map[string]interface{}{
        "select": []string{"portable_data_hash"},
 }
 
-// ResetSession unloads any potentially stale state. Should be called
-// after write operations, so subsequent reads don't return stale
-// data.
-func (c *cache) ResetSession(token string) {
-       c.setupOnce.Do(c.setup)
-       c.sessions.Remove(token)
-}
-
-// Get a long-lived CustomFileSystem suitable for doing a read operation
-// with the given token.
-func (c *cache) GetSession(token string) (arvados.CustomFileSystem, *cachedSession, *arvados.User, error) {
+func (c *cache) checkout(token string) (*cachedSession, error) {
        c.setupOnce.Do(c.setup)
-       now := time.Now()
+       c.mtx.Lock()
+       defer c.mtx.Unlock()
        ent, _ := c.sessions.Get(token)
        sess, _ := ent.(*cachedSession)
-       expired := false
        if sess == nil {
-               c.metrics.sessionMisses.Inc()
-               sess = &cachedSession{
-                       expire: now.Add(c.cluster.Collections.WebDAVCache.TTL.Duration()),
-               }
-               var err error
-               sess.client, err = arvados.NewClientFromConfig(c.cluster)
+               client, err := arvados.NewClientFromConfig(c.cluster)
                if err != nil {
-                       return nil, nil, nil, err
+                       return nil, err
                }
-               sess.client.AuthToken = token
-               sess.client.Timeout = time.Minute
+               client.AuthToken = token
+               client.Timeout = time.Minute
                // A non-empty origin header tells controller to
                // prioritize our traffic as interactive, which is
                // true most of the time.
                origin := c.cluster.Services.WebDAVDownload.ExternalURL
-               sess.client.SendHeader = http.Header{"Origin": {origin.Scheme + "://" + origin.Host}}
-               sess.arvadosclient, err = arvadosclient.New(sess.client)
+               client.SendHeader = http.Header{"Origin": {origin.Scheme + "://" + origin.Host}}
+               arvadosclient, err := arvadosclient.New(client)
                if err != nil {
-                       return nil, nil, nil, err
+                       return nil, err
+               }
+               sess = &cachedSession{
+                       cache:         c,
+                       client:        client,
+                       arvadosclient: arvadosclient,
+                       keepclient:    keepclient.New(arvadosclient),
                }
-               sess.keepclient = keepclient.New(sess.arvadosclient)
                c.sessions.Add(token, sess)
-       } else if sess.expire.Before(now) {
-               c.metrics.sessionMisses.Inc()
-               expired = true
-       } else {
-               c.metrics.sessionHits.Inc()
-       }
-       select {
-       case c.chPruneSessions <- struct{}{}:
-       default:
        }
+       sess.mtx.RLock()
+       return sess, nil
+}
 
-       fs, _ := sess.fs.Load().(arvados.CustomFileSystem)
-       if fs == nil || expired {
-               fs = sess.client.SiteFileSystem(sess.keepclient)
-               fs.ForwardSlashNameSubstitution(c.cluster.Collections.ForwardSlashNameSubstitution)
-               sess.fs.Store(fs)
+// GetSession returns a long-lived CustomFileSystem suitable for doing
+// a read or write operation with the given token.
+//
+// If the returned error is nil, the caller must call Release() on the
+// returned session when finished using it.
+func (c *cache) GetSession(token string) (arvados.CustomFileSystem, *cachedSession, *arvados.User, error) {
+       sess, err := c.checkout(token)
+       if err != nil {
+               return nil, nil, nil, err
        }
+       sess.refresh.Lock()
+       defer sess.refresh.Unlock()
+       now := time.Now()
+       refresh := sess.expire.Before(now)
+       if sess.fs == nil || !sess.userLoaded || refresh {
+               // Wait for all active users to finish (otherwise they
+               // might make changes to an old fs after we start
+               // using the new fs).
+               sess.inuse.Lock()
+               if !sess.userLoaded || refresh {
+                       err := sess.client.RequestAndDecode(&sess.user, "GET", "/arvados/v1/users/current", nil, nil)
+                       if he := errorWithHTTPStatus(nil); errors.As(err, &he) && he.HTTPStatus() == http.StatusForbidden {
+                               // token is OK, but "get user id" api is out
+                               // of scope -- use existing/expired info if
+                               // any, or leave empty for unknown user
+                       } else if err != nil {
+                               sess.inuse.Unlock()
+                               sess.mtx.RUnlock()
+                               return nil, nil, nil, err
+                       }
+                       sess.userLoaded = true
+               }
 
-       user, _ := sess.user.Load().(*arvados.User)
-       if user == nil || expired {
-               user = new(arvados.User)
-               err := sess.client.RequestAndDecode(user, "GET", "/arvados/v1/users/current", nil, nil)
-               if he := errorWithHTTPStatus(nil); errors.As(err, &he) && he.HTTPStatus() == http.StatusForbidden {
-                       // token is OK, but "get user id" api is out
-                       // of scope -- return nil, signifying unknown
-                       // user
-               } else if err != nil {
-                       return nil, nil, nil, err
+               if sess.fs == nil || refresh {
+                       sess.fs = sess.client.SiteFileSystem(sess.keepclient)
+                       sess.fs.ForwardSlashNameSubstitution(c.cluster.Collections.ForwardSlashNameSubstitution)
+                       sess.expire = now.Add(c.cluster.Collections.WebDAVCache.TTL.Duration())
+                       c.metrics.sessionMisses.Inc()
+               } else {
+                       c.metrics.sessionHits.Inc()
                }
-               sess.user.Store(user)
+               sess.inuse.Unlock()
+       } else {
+               c.metrics.sessionHits.Inc()
        }
-
-       return fs, sess, user, nil
+       sess.inuse.RLock()
+       return sess.fs, sess, &sess.user, nil
 }
 
-// Remove all expired session cache entries, then remove more entries
-// until approximate remaining size <= maxsize/2
+// Remove all expired idle session cache entries, and remove in-memory
+// filesystems until approximate remaining size <= MaxCollectionBytes.
 func (c *cache) pruneSessions() {
        now := time.Now()
        keys := c.sessions.Keys()
        sizes := make([]int64, len(keys))
+       prune := []string(nil)
        var size int64
        for i, token := range keys {
+               token := token.(string)
                ent, ok := c.sessions.Peek(token)
                if !ok {
                        continue
                }
-               s := ent.(*cachedSession)
-               if s.expire.Before(now) {
-                       c.sessions.Remove(token)
-                       continue
+               sess := ent.(*cachedSession)
+               sess.refresh.Lock()
+               expired := sess.expire.Before(now)
+               fs := sess.fs
+               sess.refresh.Unlock()
+               if expired {
+                       prune = append(prune, token)
                }
-               if fs, ok := s.fs.Load().(arvados.CustomFileSystem); ok {
+               if fs != nil {
                        sizes[i] = fs.MemorySize()
                        size += sizes[i]
                }
@@ -214,10 +246,46 @@ func (c *cache) pruneSessions() {
        // least frequently used entries (which Keys() returns last).
        for i := len(keys) - 1; i >= 0 && size > c.cluster.Collections.WebDAVCache.MaxCollectionBytes; i-- {
                if sizes[i] > 0 {
-                       c.sessions.Remove(keys[i])
+                       prune = append(prune, keys[i].(string))
                        size -= sizes[i]
                }
        }
+
+       c.mtx.Lock()
+       defer c.mtx.Unlock()
+       for _, token := range prune {
+               ent, ok := c.sessions.Peek(token)
+               if !ok {
+                       continue
+               }
+               sess := ent.(*cachedSession)
+               if sess.mtx.TryLock() {
+                       c.sessions.Remove(token)
+                       continue
+               }
+               // We can't remove a session that's been checked out
+               // -- that would allow another session to be created
+               // for the same token using a different in-memory
+               // filesystem. Instead, we wait for active requests to
+               // finish and then "unload" it. After this, either the
+               // next GetSession will reload fs/user, or a
+               // subsequent pruneSessions will remove the session.
+               go func() {
+                       // Ensure nobody is in GetSession
+                       sess.refresh.Lock()
+                       // Wait for current usage to finish
+                       sess.inuse.Lock()
+                       // Release memory
+                       sess.fs = nil
+                       if sess.expire.Before(now) {
+                               // Mark user data as stale
+                               sess.userLoaded = false
+                       }
+                       sess.inuse.Unlock()
+                       sess.refresh.Unlock()
+                       // Next GetSession will make a new fs
+               }()
+       }
 }
 
 // collectionBytes returns the approximate combined memory size of the
@@ -229,7 +297,11 @@ func (c *cache) collectionBytes() uint64 {
                if !ok {
                        continue
                }
-               if fs, ok := ent.(*cachedSession).fs.Load().(arvados.CustomFileSystem); ok {
+               sess := ent.(*cachedSession)
+               sess.refresh.Lock()
+               fs := sess.fs
+               sess.refresh.Unlock()
+               if fs != nil {
                        size += uint64(fs.MemorySize())
                }
        }
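diff --git a/services/keep-web/handler.go b/services/keep-web/handler.go
index 4625602fc2c440bd52aac425fc0e9fdaed3fcea8..3af326a1ad451483da29bb6398624b508e0d1fe2 100644
--- a/services/keep-web/handler.go
+++ b/services/keep-web/handler.go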
@@ -411,16 +411,20 @@ func (h *handler) ServeHTTP(wOrig http.ResponseWriter, r *http.Request) {
                        // collection id is outside scope of supplied
                        // token
                        tokenScopeProblem = true
+                       sess.Release()
                        continue
                } else if os.IsNotExist(err) {
                        // collection does not exist or is not
                        // readable using this token
+                       sess.Release()
                        continue
                } else if err != nil {
                        http.Error(w, err.Error(), http.StatusInternalServerError)
+                       sess.Release()
                        return
                }
                defer f.Close()
+               defer sess.Release()
 
                collectionDir, sessionFS, session, tokenUser = f, fs, sess, user
                break
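diff --git a/services/keep-web/handler_test.go b/services/keep-web/handler_test.go
index 3b957c5a0da565b61d96a6ba13bbd2c3147ae897..4a76276392ca5b9772d922287620ac29c55774c4 100644
--- a/services/keep-web/handler_test.go
+++ b/services/keep-web/handler_test.go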
@@ -1627,6 +1627,7 @@ func (s *IntegrationSuite) TestUploadLoggingPermission(c *check.C) {
 }
 
 func (s *IntegrationSuite) TestConcurrentWrites(c *check.C) {
+       s.handler.Cluster.Collections.WebDAVCache.TTL = arvados.Duration(time.Second * 2)
        lockTidyInterval = time.Second
        client := arvados.NewClientFromEnv()
        client.AuthToken = arvadostest.ActiveTokenV2