//
// SPDX-License-Identifier: AGPL-3.0
-package main
+package keepweb
import (
+ "errors"
+ "net/http"
"sync"
"sync/atomic"
"time"
- "git.curoverse.com/arvados.git/sdk/go/arvados"
- "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
- "github.com/hashicorp/golang-lru"
+ "git.arvados.org/arvados.git/sdk/go/arvados"
+ "git.arvados.org/arvados.git/sdk/go/arvadosclient"
+ "git.arvados.org/arvados.git/sdk/go/keepclient"
+ lru "github.com/hashicorp/golang-lru"
+ "github.com/prometheus/client_golang/prometheus"
+ "github.com/sirupsen/logrus"
)
-type cache struct {
- TTL arvados.Duration
- MaxCollectionEntries int
- MaxCollectionBytes int64
- MaxPermissionEntries int
- MaxUUIDEntries int
-
- stats cacheStats
- pdhs *lru.TwoQueueCache
- collections *lru.TwoQueueCache
- permissions *lru.TwoQueueCache
- setupOnce sync.Once
-}
+const metricsUpdateInterval = time.Second / 10
-type cacheStats struct {
- Requests uint64 `json:"Cache.Requests"`
- CollectionBytes uint64 `json:"Cache.CollectionBytes"`
- CollectionEntries int `json:"Cache.CollectionEntries"`
- CollectionHits uint64 `json:"Cache.CollectionHits"`
- PDHHits uint64 `json:"Cache.UUIDHits"`
- PermissionHits uint64 `json:"Cache.PermissionHits"`
- APICalls uint64 `json:"Cache.APICalls"`
+type cache struct {
+ cluster *arvados.Cluster
+ logger logrus.FieldLogger
+ registry *prometheus.Registry
+ metrics cacheMetrics
+ sessions *lru.TwoQueueCache
+ setupOnce sync.Once
+
+ chPruneSessions chan struct{}
}
-type cachedPDH struct {
- expire time.Time
- pdh string
+type cacheMetrics struct {
+ requests prometheus.Counter
+ collectionBytes prometheus.Gauge
+ sessionEntries prometheus.Gauge
+ sessionHits prometheus.Counter
+ sessionMisses prometheus.Counter
}
-type cachedCollection struct {
- expire time.Time
- collection *arvados.Collection
+func (m *cacheMetrics) setup(reg *prometheus.Registry) {
+ m.collectionBytes = prometheus.NewGauge(prometheus.GaugeOpts{
+ Namespace: "arvados",
+ Subsystem: "keepweb_sessions",
+ Name: "cached_session_bytes",
+ Help: "Total size of all cached sessions.",
+ })
+ reg.MustRegister(m.collectionBytes)
+ m.sessionEntries = prometheus.NewGauge(prometheus.GaugeOpts{
+ Namespace: "arvados",
+ Subsystem: "keepweb_sessions",
+ Name: "active",
+ Help: "Number of active token sessions.",
+ })
+ reg.MustRegister(m.sessionEntries)
+ m.sessionHits = prometheus.NewCounter(prometheus.CounterOpts{
+ Namespace: "arvados",
+ Subsystem: "keepweb_sessions",
+ Name: "hits",
+ Help: "Number of token session cache hits.",
+ })
+ reg.MustRegister(m.sessionHits)
+ m.sessionMisses = prometheus.NewCounter(prometheus.CounterOpts{
+ Namespace: "arvados",
+ Subsystem: "keepweb_sessions",
+ Name: "misses",
+ Help: "Number of token session cache misses.",
+ })
+ reg.MustRegister(m.sessionMisses)
}
-type cachedPermission struct {
- expire time.Time
+type cachedSession struct {
+ expire time.Time
+ fs atomic.Value
+ client *arvados.Client
+ arvadosclient *arvadosclient.ArvadosClient
+ keepclient *keepclient.KeepClient
+ user atomic.Value
}
func (c *cache) setup() {
var err error
- c.pdhs, err = lru.New2Q(c.MaxUUIDEntries)
- if err != nil {
- panic(err)
- }
- c.collections, err = lru.New2Q(c.MaxCollectionEntries)
+ c.sessions, err = lru.New2Q(c.cluster.Collections.WebDAVCache.MaxSessions)
if err != nil {
panic(err)
}
- c.permissions, err = lru.New2Q(c.MaxPermissionEntries)
- if err != nil {
- panic(err)
+
+ reg := c.registry
+ if reg == nil {
+ reg = prometheus.NewRegistry()
}
+ c.metrics.setup(reg)
+ go func() {
+ for range time.Tick(metricsUpdateInterval) {
+ c.updateGauges()
+ }
+ }()
+ c.chPruneSessions = make(chan struct{}, 1)
+ go func() {
+ for range c.chPruneSessions {
+ c.pruneSessions()
+ }
+ }()
+}
+
+func (c *cache) updateGauges() {
+ c.metrics.collectionBytes.Set(float64(c.collectionBytes()))
+ c.metrics.sessionEntries.Set(float64(c.sessions.Len()))
}
var selectPDH = map[string]interface{}{
"select": []string{"portable_data_hash"},
}
-func (c *cache) Stats() cacheStats {
+// ResetSession unloads any potentially stale state. Should be called
+// after write operations, so subsequent reads don't return stale
+// data.
+func (c *cache) ResetSession(token string) {
c.setupOnce.Do(c.setup)
- return cacheStats{
- Requests: atomic.LoadUint64(&c.stats.Requests),
- CollectionBytes: c.collectionBytes(),
- CollectionEntries: c.collections.Len(),
- CollectionHits: atomic.LoadUint64(&c.stats.CollectionHits),
- PDHHits: atomic.LoadUint64(&c.stats.PDHHits),
- PermissionHits: atomic.LoadUint64(&c.stats.PermissionHits),
- APICalls: atomic.LoadUint64(&c.stats.APICalls),
- }
+ c.sessions.Remove(token)
}
-func (c *cache) Get(arv *arvadosclient.ArvadosClient, targetID string, forceReload bool) (*arvados.Collection, error) {
+// GetSession returns a long-lived CustomFileSystem suitable for
+// doing a read operation with the given token.
+func (c *cache) GetSession(token string) (arvados.CustomFileSystem, *cachedSession, *arvados.User, error) {
c.setupOnce.Do(c.setup)
-
- atomic.AddUint64(&c.stats.Requests, 1)
-
- permOK := false
- permKey := arv.ApiToken + "\000" + targetID
- if forceReload {
- } else if ent, cached := c.permissions.Get(permKey); cached {
- ent := ent.(*cachedPermission)
- if ent.expire.Before(time.Now()) {
- c.permissions.Remove(permKey)
- } else {
- permOK = true
- atomic.AddUint64(&c.stats.PermissionHits, 1)
+ now := time.Now()
+ ent, _ := c.sessions.Get(token)
+ sess, _ := ent.(*cachedSession)
+ expired := false
+ if sess == nil {
+ c.metrics.sessionMisses.Inc()
+ sess = &cachedSession{
+ expire: now.Add(c.cluster.Collections.WebDAVCache.TTL.Duration()),
}
- }
-
- var pdh string
- if arvadosclient.PDHMatch(targetID) {
- pdh = targetID
- } else if ent, cached := c.pdhs.Get(targetID); cached {
- ent := ent.(*cachedPDH)
- if ent.expire.Before(time.Now()) {
- c.pdhs.Remove(targetID)
- } else {
- pdh = ent.pdh
- atomic.AddUint64(&c.stats.PDHHits, 1)
+ var err error
+ sess.client, err = arvados.NewClientFromConfig(c.cluster)
+ if err != nil {
+ return nil, nil, nil, err
}
+ sess.client.AuthToken = token
+ // A non-empty origin header tells controller to
+ // prioritize our traffic as interactive, which is
+ // true most of the time.
+ origin := c.cluster.Services.WebDAVDownload.ExternalURL
+ sess.client.SendHeader = http.Header{"Origin": {origin.Scheme + "://" + origin.Host}}
+ sess.arvadosclient, err = arvadosclient.New(sess.client)
+ if err != nil {
+ return nil, nil, nil, err
+ }
+ sess.keepclient = keepclient.New(sess.arvadosclient)
+ c.sessions.Add(token, sess)
+ } else if sess.expire.Before(now) {
+ c.metrics.sessionMisses.Inc()
+ expired = true
+ } else {
+ c.metrics.sessionHits.Inc()
+ }
+ select {
+ case c.chPruneSessions <- struct{}{}:
+ default:
}
- var collection *arvados.Collection
- if pdh != "" {
- collection = c.lookupCollection(pdh)
+ fs, _ := sess.fs.Load().(arvados.CustomFileSystem)
+ if fs == nil || expired {
+ fs = sess.client.SiteFileSystem(sess.keepclient)
+ fs.ForwardSlashNameSubstitution(c.cluster.Collections.ForwardSlashNameSubstitution)
+ sess.fs.Store(fs)
}
- if collection != nil && permOK {
- return collection, nil
- } else if collection != nil {
- // Ask API for current PDH for this targetID. Most
- // likely, the cached PDH is still correct; if so,
- // _and_ the current token has permission, we can
- // use our cached manifest.
- atomic.AddUint64(&c.stats.APICalls, 1)
- var current arvados.Collection
- err := arv.Get("collections", targetID, selectPDH, ¤t)
- if err != nil {
- return nil, err
- }
- if current.PortableDataHash == pdh {
- exp := time.Now().Add(time.Duration(c.TTL))
- c.permissions.Add(permKey, &cachedPermission{
- expire: exp,
- })
- if pdh != targetID {
- c.pdhs.Add(targetID, &cachedPDH{
- expire: exp,
- pdh: pdh,
- })
- }
- return collection, err
- } else {
- // PDH changed, but now we know we have
- // permission -- and maybe we already have the
- // new PDH in the cache.
- if coll := c.lookupCollection(current.PortableDataHash); coll != nil {
- return coll, nil
- }
+ user, _ := sess.user.Load().(*arvados.User)
+ if user == nil || expired {
+ user = new(arvados.User)
+ err := sess.client.RequestAndDecode(user, "GET", "/arvados/v1/users/current", nil, nil)
+ if he := errorWithHTTPStatus(nil); errors.As(err, &he) && he.HTTPStatus() == http.StatusForbidden {
+ // token is OK, but "get user id" api is out
+ // of scope -- return nil, signifying unknown
+ // user
+ } else if err != nil {
+ return nil, nil, nil, err
}
+ sess.user.Store(user)
}
- // Collection manifest is not cached.
- atomic.AddUint64(&c.stats.APICalls, 1)
- err := arv.Get("collections", targetID, nil, &collection)
- if err != nil {
- return nil, err
- }
- exp := time.Now().Add(time.Duration(c.TTL))
- c.permissions.Add(permKey, &cachedPermission{
- expire: exp,
- })
- c.pdhs.Add(targetID, &cachedPDH{
- expire: exp,
- pdh: collection.PortableDataHash,
- })
- // Disabled, see #11945
- // c.collections.Add(collection.PortableDataHash, &cachedCollection{
- // expire: exp,
- // collection: collection,
- // })
- // if int64(len(collection.ManifestText)) > c.MaxCollectionBytes/int64(c.MaxCollectionEntries) {
- // go c.pruneCollections()
- // }
- return collection, nil
+ return fs, sess, user, nil
}
-// pruneCollections checks the total bytes occupied by manifest_text
-// in the collection cache and removes old entries as needed to bring
-// the total size down to CollectionBytes. It also deletes all expired
-// entries.
-//
-// pruneCollections does not aim to be perfectly correct when there is
-// concurrent cache activity.
-func (c *cache) pruneCollections() {
- var size int64
+// Remove all expired session cache entries, then remove more entries
+// until approximate remaining size <= maxsize/2
+func (c *cache) pruneSessions() {
now := time.Now()
- keys := c.collections.Keys()
- entsize := make([]int, len(keys))
- expired := make([]bool, len(keys))
- for i, k := range keys {
- v, ok := c.collections.Peek(k)
+ keys := c.sessions.Keys()
+ sizes := make([]int64, len(keys))
+ var size int64
+ for i, token := range keys {
+ ent, ok := c.sessions.Peek(token)
if !ok {
continue
}
- ent := v.(*cachedCollection)
- n := len(ent.collection.ManifestText)
- size += int64(n)
- entsize[i] = n
- expired[i] = ent.expire.Before(now)
- }
- for i, k := range keys {
- if expired[i] {
- c.collections.Remove(k)
- size -= int64(entsize[i])
+ s := ent.(*cachedSession)
+ if s.expire.Before(now) {
+ c.sessions.Remove(token)
+ continue
}
- }
- for i, k := range keys {
- if size <= c.MaxCollectionBytes {
- break
+ if fs, ok := s.fs.Load().(arvados.CustomFileSystem); ok {
+ sizes[i] = fs.MemorySize()
+ size += sizes[i]
}
- if expired[i] {
- // already removed this entry in the previous loop
- continue
+ }
+ // Remove tokens until reaching size limit, starting with the
+ // least frequently used entries (which Keys() returns last).
+ for i := len(keys) - 1; i >= 0 && size > c.cluster.Collections.WebDAVCache.MaxCollectionBytes; i-- {
+ if sizes[i] > 0 {
+ c.sessions.Remove(keys[i])
+ size -= sizes[i]
}
- c.collections.Remove(k)
- size -= int64(entsize[i])
}
}
-// collectionBytes returns the approximate memory size of the
-// collection cache.
+// collectionBytes returns the approximate total memory size of the
+// cached session filesystems.
func (c *cache) collectionBytes() uint64 {
var size uint64
- for _, k := range c.collections.Keys() {
- v, ok := c.collections.Peek(k)
+ for _, token := range c.sessions.Keys() {
+ ent, ok := c.sessions.Peek(token)
if !ok {
continue
}
- size += uint64(len(v.(*cachedCollection).collection.ManifestText))
- }
- return size
-}
-
-func (c *cache) lookupCollection(pdh string) *arvados.Collection {
- if pdh == "" {
- return nil
- } else if ent, cached := c.collections.Get(pdh); !cached {
- return nil
- } else {
- ent := ent.(*cachedCollection)
- if ent.expire.Before(time.Now()) {
- c.collections.Remove(pdh)
- return nil
- } else {
- atomic.AddUint64(&c.stats.CollectionHits, 1)
- return ent.collection
+ if fs, ok := ent.(*cachedSession).fs.Load().(arvados.CustomFileSystem); ok {
+ size += uint64(fs.MemorySize())
}
}
+ return size
}