"sync/atomic"
"time"
- "git.curoverse.com/arvados.git/sdk/go/arvados"
- "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
- "github.com/hashicorp/golang-lru"
+ "git.arvados.org/arvados.git/sdk/go/arvados"
+ "git.arvados.org/arvados.git/sdk/go/arvadosclient"
+ "git.arvados.org/arvados.git/sdk/go/keepclient"
+ lru "github.com/hashicorp/golang-lru"
"github.com/prometheus/client_golang/prometheus"
+ "github.com/sirupsen/logrus"
)
const metricsUpdateInterval = time.Second / 10
+ // cache stores recently fetched collection records, UUID-to-PDH
+ // mappings, and per-token session state.  Expiry and size limits
+ // come from the WebDAV cache configuration; pruning runs in
+ // background goroutines woken via the chPrune* channels.
type cache struct {
- TTL arvados.Duration
- UUIDTTL arvados.Duration
- MaxCollectionEntries int
- MaxCollectionBytes int64
- MaxPermissionEntries int
- MaxUUIDEntries int
-
+ cluster *arvados.Cluster
+ config *arvados.WebDAVCacheConfig // TODO: use cluster.Collections.WebDAV instead
+ logger logrus.FieldLogger
registry *prometheus.Registry
- stats cacheStats
metrics cacheMetrics
pdhs *lru.TwoQueueCache
collections *lru.TwoQueueCache
- permissions *lru.TwoQueueCache
+ sessions *lru.TwoQueueCache
setupOnce sync.Once
-}
-// cacheStats is EOL - add new metrics to cacheMetrics instead
-type cacheStats struct {
- Requests uint64 `json:"Cache.Requests"`
- CollectionBytes uint64 `json:"Cache.CollectionBytes"`
- CollectionEntries int `json:"Cache.CollectionEntries"`
- CollectionHits uint64 `json:"Cache.CollectionHits"`
- PDHHits uint64 `json:"Cache.UUIDHits"`
- PermissionHits uint64 `json:"Cache.PermissionHits"`
- APICalls uint64 `json:"Cache.APICalls"`
+ chPruneSessions chan struct{} // non-blocking wakeup for the pruneSessions goroutine
+ chPruneCollections chan struct{} // non-blocking wakeup for the pruneCollections goroutine
}
+ // cacheMetrics holds the prometheus counters and gauges maintained
+ // by the cache.
type cacheMetrics struct {
requests prometheus.Counter
collectionBytes prometheus.Gauge
collectionEntries prometheus.Gauge
+ sessionEntries prometheus.Gauge
collectionHits prometheus.Counter
pdhHits prometheus.Counter
- permissionHits prometheus.Counter
+ sessionHits prometheus.Counter
+ sessionMisses prometheus.Counter
apiCalls prometheus.Counter
}
Help: "Number of uuid-to-pdh cache hits.",
})
reg.MustRegister(m.pdhHits)
- m.permissionHits = prometheus.NewCounter(prometheus.CounterOpts{
- Namespace: "arvados",
- Subsystem: "keepweb_collectioncache",
- Name: "permission_hits",
- Help: "Number of targetID-to-permission cache hits.",
- })
- reg.MustRegister(m.permissionHits)
m.apiCalls = prometheus.NewCounter(prometheus.CounterOpts{
Namespace: "arvados",
Subsystem: "keepweb_collectioncache",
reg.MustRegister(m.apiCalls)
m.collectionBytes = prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: "arvados",
- Subsystem: "keepweb_collectioncache",
- Name: "cached_manifest_bytes",
- Help: "Total size of all manifests in cache.",
+ Subsystem: "keepweb_sessions",
+ Name: "cached_collection_bytes",
+ Help: "Total size of all cached manifests and sessions.",
})
reg.MustRegister(m.collectionBytes)
m.collectionEntries = prometheus.NewGauge(prometheus.GaugeOpts{
Help: "Number of manifests in cache.",
})
reg.MustRegister(m.collectionEntries)
+ m.sessionEntries = prometheus.NewGauge(prometheus.GaugeOpts{
+ Namespace: "arvados",
+ Subsystem: "keepweb_sessions",
+ Name: "active",
+ Help: "Number of active token sessions.",
+ })
+ reg.MustRegister(m.sessionEntries)
+ m.sessionHits = prometheus.NewCounter(prometheus.CounterOpts{
+ Namespace: "arvados",
+ Subsystem: "keepweb_sessions",
+ Name: "hits",
+ Help: "Number of token session cache hits.",
+ })
+ reg.MustRegister(m.sessionHits)
+ m.sessionMisses = prometheus.NewCounter(prometheus.CounterOpts{
+ Namespace: "arvados",
+ Subsystem: "keepweb_sessions",
+ Name: "misses",
+ Help: "Number of token session cache misses.",
+ })
+ reg.MustRegister(m.sessionMisses)
}
+ // cachedPDH is a cache entry mapping a collection UUID to its last
+ // known portable data hash.  After refresh passes, the mapping
+ // should be re-verified against the API server before being trusted;
+ // after expire it is discarded.
type cachedPDH struct {
- expire time.Time
- pdh string
+ expire time.Time
+ refresh time.Time
+ pdh string
}
type cachedCollection struct {
expire time.Time
}
+// cachedSession is per-token state reused across requests: API
+// clients bound to the token, plus lazily populated filesystem and
+// user records (stored in atomic.Values so readers need no lock).
+type cachedSession struct {
+ expire time.Time
+ fs atomic.Value // holds an arvados.CustomFileSystem once built
+ client *arvados.Client
+ arvadosclient *arvadosclient.ArvadosClient
+ keepclient *keepclient.KeepClient
+ user atomic.Value // holds a *arvados.User once fetched
+}
+
func (c *cache) setup() {
var err error
- c.pdhs, err = lru.New2Q(c.MaxUUIDEntries)
+ c.pdhs, err = lru.New2Q(c.config.MaxUUIDEntries)
if err != nil {
panic(err)
}
- c.collections, err = lru.New2Q(c.MaxCollectionEntries)
+ c.collections, err = lru.New2Q(c.config.MaxCollectionEntries)
if err != nil {
panic(err)
}
- c.permissions, err = lru.New2Q(c.MaxPermissionEntries)
+ c.sessions, err = lru.New2Q(c.config.MaxSessions)
if err != nil {
panic(err)
}
c.updateGauges()
}
}()
+ c.chPruneCollections = make(chan struct{}, 1)
+ go func() {
+ for range c.chPruneCollections {
+ c.pruneCollections()
+ }
+ }()
+ c.chPruneSessions = make(chan struct{}, 1)
+ go func() {
+ for range c.chPruneSessions {
+ c.pruneSessions()
+ }
+ }()
}
+ // updateGauges refreshes the size and entry-count gauges from the
+ // current cache contents.
func (c *cache) updateGauges() {
c.metrics.collectionBytes.Set(float64(c.collectionBytes()))
c.metrics.collectionEntries.Set(float64(c.collections.Len()))
+ c.metrics.sessionEntries.Set(float64(c.sessions.Len()))
}
+ // selectPDH limits a collection GET to the portable_data_hash
+ // field, avoiding a full manifest transfer when only the PDH is
+ // needed.
var selectPDH = map[string]interface{}{
"select": []string{"portable_data_hash"},
}
-func (c *cache) Stats() cacheStats {
- c.setupOnce.Do(c.setup)
- return cacheStats{
- Requests: atomic.LoadUint64(&c.stats.Requests),
- CollectionBytes: c.collectionBytes(),
- CollectionEntries: c.collections.Len(),
- CollectionHits: atomic.LoadUint64(&c.stats.CollectionHits),
- PDHHits: atomic.LoadUint64(&c.stats.PDHHits),
- PermissionHits: atomic.LoadUint64(&c.stats.PermissionHits),
- APICalls: atomic.LoadUint64(&c.stats.APICalls),
- }
-}
-
// Update saves a modified version (fs) to an existing collection
// (coll) and, if successful, updates the relevant cache entries so
// subsequent calls to Get() reflect the modifications.
func (c *cache) Update(client *arvados.Client, coll arvados.Collection, fs arvados.CollectionFileSystem) error {
c.setupOnce.Do(c.setup)
- if m, err := fs.MarshalManifest("."); err != nil || m == coll.ManifestText {
+ m, err := fs.MarshalManifest(".")
+ // Nothing to save if marshalling failed or the manifest is
+ // unchanged (err is nil in the unchanged case).
+ if err != nil || m == coll.ManifestText {
return err
- } else {
- coll.ManifestText = m
}
+ coll.ManifestText = m
var updated arvados.Collection
- defer c.pdhs.Remove(coll.UUID)
- err := client.RequestAndDecode(&updated, "PATCH", "arvados/v1/collections/"+coll.UUID, client.UpdateBody(coll), nil)
- if err == nil {
- c.collections.Add(client.AuthToken+"\000"+coll.PortableDataHash, &cachedCollection{
- expire: time.Now().Add(time.Duration(c.TTL)),
- collection: &updated,
- })
+ // Send only the modified manifest_text, not the whole record.
+ err = client.RequestAndDecode(&updated, "PATCH", "arvados/v1/collections/"+coll.UUID, nil, map[string]interface{}{
+ "collection": map[string]string{
+ "manifest_text": coll.ManifestText,
+ },
+ })
+ if err != nil {
+ // The update failed, so the cached UUID-to-PDH mapping may now
+ // be stale; drop it and let the next Get() re-fetch.
+ c.pdhs.Remove(coll.UUID)
+ return err
}
- return err
+ // Success: cache the server's copy under its (possibly new) PDH,
+ // and point the UUID at that PDH so Get() sees the new content.
+ c.collections.Add(client.AuthToken+"\000"+updated.PortableDataHash, &cachedCollection{
+ expire: time.Now().Add(time.Duration(c.config.TTL)),
+ collection: &updated,
+ })
+ c.pdhs.Add(coll.UUID, &cachedPDH{
+ expire: time.Now().Add(time.Duration(c.config.TTL)),
+ refresh: time.Now().Add(time.Duration(c.config.UUIDTTL)),
+ pdh: updated.PortableDataHash,
+ })
+ return nil
}
-func (c *cache) Get(arv *arvadosclient.ArvadosClient, targetID string, forceReload bool) (*arvados.Collection, error) {
+// ResetSession unloads any potentially stale state. Should be called
+// after write operations, so subsequent reads don't return stale
+// data.
+func (c *cache) ResetSession(token string) {
c.setupOnce.Do(c.setup)
+ c.sessions.Remove(token) // drops the cached fs, clients, and user record for this token
+}
- atomic.AddUint64(&c.stats.Requests, 1)
- c.metrics.requests.Inc()
+// Get a long-lived CustomFileSystem suitable for doing a read operation
+// with the given token.
+func (c *cache) GetSession(token string) (arvados.CustomFileSystem, *cachedSession, error) {
+ c.setupOnce.Do(c.setup)
+ now := time.Now()
+ ent, _ := c.sessions.Get(token)
+ sess, _ := ent.(*cachedSession)
+ expired := false
+ if sess == nil {
+ // No session cached for this token yet: build API clients
+ // bound to it and cache the new session.
+ c.metrics.sessionMisses.Inc()
+ sess = &cachedSession{
+ expire: now.Add(c.config.TTL.Duration()),
+ }
+ var err error
+ sess.client, err = arvados.NewClientFromConfig(c.cluster)
+ if err != nil {
+ return nil, nil, err
+ }
+ sess.client.AuthToken = token
+ sess.arvadosclient, err = arvadosclient.New(sess.client)
+ if err != nil {
+ return nil, nil, err
+ }
+ sess.keepclient = keepclient.New(sess.arvadosclient)
+ c.sessions.Add(token, sess)
+ } else if sess.expire.Before(now) {
+ // NOTE(review): sess.expire is never advanced on this path, so
+ // an expired-but-unpruned session rebuilds its filesystem on
+ // every call until pruneSessions evicts it — confirm intended.
+ c.metrics.sessionMisses.Inc()
+ expired = true
+ } else {
+ c.metrics.sessionHits.Inc()
+ }
+ // Wake the pruner; drop the signal if one is already pending.
+ select {
+ case c.chPruneSessions <- struct{}{}:
+ default:
+ }
+ fs, _ := sess.fs.Load().(arvados.CustomFileSystem)
+ if fs != nil && !expired {
+ return fs, sess, nil
+ }
+ // Build (or rebuild, if expired) the site filesystem for this session.
+ fs = sess.client.SiteFileSystem(sess.keepclient)
+ fs.ForwardSlashNameSubstitution(c.cluster.Collections.ForwardSlashNameSubstitution)
+ sess.fs.Store(fs)
+ return fs, sess, nil
+}
- permOK := false
- permKey := arv.ApiToken + "\000" + targetID
- if forceReload {
- } else if ent, cached := c.permissions.Get(permKey); cached {
- ent := ent.(*cachedPermission)
- if ent.expire.Before(time.Now()) {
- c.permissions.Remove(permKey)
- } else {
- permOK = true
- atomic.AddUint64(&c.stats.PermissionHits, 1)
- c.metrics.permissionHits.Inc()
+// pruneSessions removes every expired entry from the session cache,
+// then keeps evicting sessions — least frequently used first — until
+// the estimated combined filesystem size is at most half of the
+// configured maximum.
+func (c *cache) pruneSessions() {
+ now := time.Now()
+ var totalSize int64
+ tokens := c.sessions.Keys()
+ // Pass 1: drop expired sessions, summing the memory footprint of
+ // the survivors.
+ for _, tok := range tokens {
+ cached, found := c.sessions.Peek(tok)
+ if !found {
+ continue
+ }
+ session := cached.(*cachedSession)
+ if session.expire.Before(now) {
+ c.sessions.Remove(tok)
+ continue
+ }
+ if cfs, ok := session.fs.Load().(arvados.CustomFileSystem); ok {
+ totalSize += cfs.MemorySize()
+ }
+ }
+ // Pass 2: Keys() reports the least frequently used entries last,
+ // so walk backwards while we are still over the size target.
+ sizeTarget := c.cluster.Collections.WebDAVCache.MaxCollectionBytes / 2
+ for i := len(tokens) - 1; i >= 0 && totalSize > sizeTarget; i-- {
+ cached, found := c.sessions.Peek(tokens[i])
+ if !found {
+ continue
+ }
+ session := cached.(*cachedSession)
+ cfs, _ := session.fs.Load().(arvados.CustomFileSystem)
+ if cfs == nil {
+ // No filesystem loaded, so nothing to reclaim.
+ continue
+ }
+ c.sessions.Remove(tokens[i])
+ totalSize -= cfs.MemorySize()
+ }
+}
+
+func (c *cache) Get(arv *arvadosclient.ArvadosClient, targetID string, forceReload bool) (*arvados.Collection, error) {
+ c.setupOnce.Do(c.setup)
+ c.metrics.requests.Inc()
+ var pdhRefresh bool
var pdh string
if arvadosclient.PDHMatch(targetID) {
pdh = targetID
c.pdhs.Remove(targetID)
} else {
pdh = ent.pdh
- atomic.AddUint64(&c.stats.PDHHits, 1)
+ pdhRefresh = forceReload || time.Now().After(ent.refresh)
c.metrics.pdhHits.Inc()
}
}
- var collection *arvados.Collection
- if pdh != "" {
- collection = c.lookupCollection(arv.ApiToken + "\000" + pdh)
- }
-
- if collection != nil && permOK {
- return collection, nil
- } else if collection != nil {
- // Ask API for current PDH for this targetID. Most
- // likely, the cached PDH is still correct; if so,
- // _and_ the current token has permission, we can
- // use our cached manifest.
- atomic.AddUint64(&c.stats.APICalls, 1)
+ if pdh == "" {
+ // UUID->PDH mapping is not cached, might as well get
+ // the whole collection record and be done (below).
+ c.logger.Debugf("cache(%s): have no pdh", targetID)
+ } else if cached := c.lookupCollection(arv.ApiToken + "\000" + pdh); cached == nil {
+ // PDH->manifest is not cached, might as well get the
+ // whole collection record (below).
+ c.logger.Debugf("cache(%s): have pdh %s but manifest is not cached", targetID, pdh)
+ } else if !pdhRefresh {
+ // We looked up UUID->PDH very recently, and we still
+ // have the manifest for that PDH.
+ c.logger.Debugf("cache(%s): have pdh %s and refresh not needed", targetID, pdh)
+ return cached, nil
+ } else {
+ // Get current PDH for this UUID (and confirm we still
+ // have read permission). Most likely, the cached PDH
+ // is still correct, in which case we can use our
+ // cached manifest.
c.metrics.apiCalls.Inc()
var current arvados.Collection
err := arv.Get("collections", targetID, selectPDH, ¤t)
return nil, err
}
if current.PortableDataHash == pdh {
- c.permissions.Add(permKey, &cachedPermission{
- expire: time.Now().Add(time.Duration(c.TTL)),
- })
- if pdh != targetID {
- c.pdhs.Add(targetID, &cachedPDH{
- expire: time.Now().Add(time.Duration(c.UUIDTTL)),
- pdh: pdh,
- })
- }
- return collection, err
- } else {
- // PDH changed, but now we know we have
- // permission -- and maybe we already have the
- // new PDH in the cache.
- if coll := c.lookupCollection(arv.ApiToken + "\000" + current.PortableDataHash); coll != nil {
- return coll, nil
- }
+ // PDH has not changed, cached manifest is
+ // correct.
+ c.logger.Debugf("cache(%s): verified cached pdh %s is still correct", targetID, pdh)
+ return cached, nil
+ }
+ if cached := c.lookupCollection(arv.ApiToken + "\000" + current.PortableDataHash); cached != nil {
+ // PDH changed, and we already have the
+ // manifest for that new PDH.
+ c.logger.Debugf("cache(%s): cached pdh %s was stale, new pdh is %s and manifest is already in cache", targetID, pdh, current.PortableDataHash)
+ return cached, nil
}
}
- // Collection manifest is not cached.
- atomic.AddUint64(&c.stats.APICalls, 1)
+ // Either UUID->PDH is not cached, or PDH->manifest is not
+ // cached.
+ var retrieved arvados.Collection
c.metrics.apiCalls.Inc()
- err := arv.Get("collections", targetID, nil, &collection)
+ err := arv.Get("collections", targetID, nil, &retrieved)
if err != nil {
return nil, err
}
- exp := time.Now().Add(time.Duration(c.TTL))
- c.permissions.Add(permKey, &cachedPermission{
- expire: exp,
- })
- c.pdhs.Add(targetID, &cachedPDH{
- expire: time.Now().Add(time.Duration(c.UUIDTTL)),
- pdh: collection.PortableDataHash,
- })
- c.collections.Add(arv.ApiToken+"\000"+collection.PortableDataHash, &cachedCollection{
+ c.logger.Debugf("cache(%s): retrieved manifest, caching with pdh %s", targetID, retrieved.PortableDataHash)
+ exp := time.Now().Add(time.Duration(c.config.TTL))
+ if targetID != retrieved.PortableDataHash {
+ c.pdhs.Add(targetID, &cachedPDH{
+ expire: exp,
+ refresh: time.Now().Add(time.Duration(c.config.UUIDTTL)),
+ pdh: retrieved.PortableDataHash,
+ })
+ }
+ c.collections.Add(arv.ApiToken+"\000"+retrieved.PortableDataHash, &cachedCollection{
expire: exp,
- collection: collection,
+ collection: &retrieved,
})
- if int64(len(collection.ManifestText)) > c.MaxCollectionBytes/int64(c.MaxCollectionEntries) {
- go c.pruneCollections()
+ if int64(len(retrieved.ManifestText)) > c.config.MaxCollectionBytes/int64(c.config.MaxCollectionEntries) {
+ select {
+ case c.chPruneCollections <- struct{}{}:
+ default:
+ }
}
- return collection, nil
+ return &retrieved, nil
}
// pruneCollections checks the total bytes occupied by manifest_text
}
}
for i, k := range keys {
- if size <= c.MaxCollectionBytes {
+ if size <= c.config.MaxCollectionBytes/2 {
break
}
if expired[i] {
}
}
-// collectionBytes returns the approximate memory size of the
-// collection cache.
+// collectionBytes returns the approximate combined memory size of the
+// collection cache and session filesystem cache.
func (c *cache) collectionBytes() uint64 {
var size uint64
for _, k := range c.collections.Keys() {
}
size += uint64(len(v.(*cachedCollection).collection.ManifestText))
}
+ for _, token := range c.sessions.Keys() {
+ ent, ok := c.sessions.Peek(token)
+ if !ok {
+ continue
+ }
+ if fs, ok := ent.(*cachedSession).fs.Load().(arvados.CustomFileSystem); ok {
+ size += uint64(fs.MemorySize())
+ }
+ }
return size
}
c.collections.Remove(key)
return nil
}
- atomic.AddUint64(&c.stats.CollectionHits, 1)
c.metrics.collectionHits.Inc()
return ent.collection
}
+
+// GetTokenUser returns the user record associated with the given
+// token, memoizing it on the token's cached session.  The UUID is
+// needed for logging, and the admin flag for certain permission
+// checks.
+func (c *cache) GetTokenUser(token string) (*arvados.User, error) {
+ // Look up (or create) the cached session for this token.
+ _, sess, err := c.GetSession(token)
+ if err != nil {
+ return nil, err
+ }
+
+ // Return the memoized record if a previous call stored one.
+ if u, ok := sess.user.Load().(*arvados.User); ok && u != nil {
+ return u, nil
+ }
+
+ // Otherwise ask the API server for the record matching the token.
+ c.metrics.apiCalls.Inc()
+ var fetched arvados.User
+ err = sess.client.RequestAndDecode(&fetched, "GET", "/arvados/v1/users/current", nil, nil)
+ if err != nil {
+ return nil, err
+ }
+
+ // Memoize for subsequent calls with the same token.
+ sess.user.Store(&fetched)
+ return &fetched, nil
+}