"sync/atomic"
"time"
- "git.curoverse.com/arvados.git/sdk/go/arvados"
- "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
- "github.com/hashicorp/golang-lru"
+ "git.arvados.org/arvados.git/sdk/go/arvados"
+ "git.arvados.org/arvados.git/sdk/go/arvadosclient"
+ "git.arvados.org/arvados.git/sdk/go/keepclient"
+ lru "github.com/hashicorp/golang-lru"
+ "github.com/prometheus/client_golang/prometheus"
)
-type cache struct {
- TTL arvados.Duration
- MaxCollectionEntries int
- MaxCollectionBytes int64
- MaxPermissionEntries int
- MaxUUIDEntries int
+const metricsUpdateInterval = time.Second / 10
- stats cacheStats
+type cache struct {
+ cluster *arvados.Cluster
+ config *arvados.WebDAVCacheConfig // TODO: use cluster.Collections.WebDAV instead
+ registry *prometheus.Registry
+ metrics cacheMetrics
pdhs *lru.TwoQueueCache
collections *lru.TwoQueueCache
permissions *lru.TwoQueueCache
+ sessions *lru.TwoQueueCache
setupOnce sync.Once
}
-type cacheStats struct {
- Requests uint64 `json:"Cache.Requests"`
- CollectionBytes uint64 `json:"Cache.CollectionBytes"`
- CollectionEntries int `json:"Cache.CollectionEntries"`
- CollectionHits uint64 `json:"Cache.CollectionHits"`
- PDHHits uint64 `json:"Cache.UUIDHits"`
- PermissionHits uint64 `json:"Cache.PermissionHits"`
- APICalls uint64 `json:"Cache.APICalls"`
+type cacheMetrics struct {
+ requests prometheus.Counter
+ collectionBytes prometheus.Gauge
+ collectionEntries prometheus.Gauge
+ sessionEntries prometheus.Gauge
+ collectionHits prometheus.Counter
+ pdhHits prometheus.Counter
+ permissionHits prometheus.Counter
+ sessionHits prometheus.Counter
+ sessionMisses prometheus.Counter
+ apiCalls prometheus.Counter
+}
+
+func (m *cacheMetrics) setup(reg *prometheus.Registry) {
+ m.requests = prometheus.NewCounter(prometheus.CounterOpts{
+ Namespace: "arvados",
+ Subsystem: "keepweb_collectioncache",
+ Name: "requests",
+ Help: "Number of targetID-to-manifest lookups handled.",
+ })
+ reg.MustRegister(m.requests)
+ m.collectionHits = prometheus.NewCounter(prometheus.CounterOpts{
+ Namespace: "arvados",
+ Subsystem: "keepweb_collectioncache",
+ Name: "hits",
+ Help: "Number of pdh-to-manifest cache hits.",
+ })
+ reg.MustRegister(m.collectionHits)
+ m.pdhHits = prometheus.NewCounter(prometheus.CounterOpts{
+ Namespace: "arvados",
+ Subsystem: "keepweb_collectioncache",
+ Name: "pdh_hits",
+ Help: "Number of uuid-to-pdh cache hits.",
+ })
+ reg.MustRegister(m.pdhHits)
+ m.permissionHits = prometheus.NewCounter(prometheus.CounterOpts{
+ Namespace: "arvados",
+ Subsystem: "keepweb_collectioncache",
+ Name: "permission_hits",
+ Help: "Number of targetID-to-permission cache hits.",
+ })
+ reg.MustRegister(m.permissionHits)
+ m.apiCalls = prometheus.NewCounter(prometheus.CounterOpts{
+ Namespace: "arvados",
+ Subsystem: "keepweb_collectioncache",
+ Name: "api_calls",
+ Help: "Number of outgoing API calls made by cache.",
+ })
+ reg.MustRegister(m.apiCalls)
+ m.collectionBytes = prometheus.NewGauge(prometheus.GaugeOpts{
+ Namespace: "arvados",
+ Subsystem: "keepweb_sessions",
+ Name: "cached_collection_bytes",
+ Help: "Total size of all cached manifests and sessions.",
+ })
+ reg.MustRegister(m.collectionBytes)
+ m.collectionEntries = prometheus.NewGauge(prometheus.GaugeOpts{
+ Namespace: "arvados",
+ Subsystem: "keepweb_collectioncache",
+ Name: "cached_manifests",
+ Help: "Number of manifests in cache.",
+ })
+ reg.MustRegister(m.collectionEntries)
+ m.sessionEntries = prometheus.NewGauge(prometheus.GaugeOpts{
+ Namespace: "arvados",
+ Subsystem: "keepweb_sessions",
+ Name: "active",
+ Help: "Number of active token sessions.",
+ })
+ reg.MustRegister(m.sessionEntries)
+ m.sessionHits = prometheus.NewCounter(prometheus.CounterOpts{
+ Namespace: "arvados",
+ Subsystem: "keepweb_sessions",
+ Name: "hits",
+ Help: "Number of token session cache hits.",
+ })
+ reg.MustRegister(m.sessionHits)
+ m.sessionMisses = prometheus.NewCounter(prometheus.CounterOpts{
+ Namespace: "arvados",
+ Subsystem: "keepweb_sessions",
+ Name: "misses",
+ Help: "Number of token session cache misses.",
+ })
+ reg.MustRegister(m.sessionMisses)
}
type cachedPDH struct {
expire time.Time
}
+type cachedSession struct {
+ expire time.Time
+ fs atomic.Value
+}
+
func (c *cache) setup() {
var err error
- c.pdhs, err = lru.New2Q(c.MaxUUIDEntries)
+ c.pdhs, err = lru.New2Q(c.config.MaxUUIDEntries)
+ if err != nil {
+ panic(err)
+ }
+ c.collections, err = lru.New2Q(c.config.MaxCollectionEntries)
if err != nil {
panic(err)
}
- c.collections, err = lru.New2Q(c.MaxCollectionEntries)
+ c.permissions, err = lru.New2Q(c.config.MaxPermissionEntries)
if err != nil {
panic(err)
}
- c.permissions, err = lru.New2Q(c.MaxPermissionEntries)
+ c.sessions, err = lru.New2Q(c.config.MaxSessions)
if err != nil {
panic(err)
}
+
+ reg := c.registry
+ if reg == nil {
+ reg = prometheus.NewRegistry()
+ }
+ c.metrics.setup(reg)
+ go func() {
+ for range time.Tick(metricsUpdateInterval) {
+ c.updateGauges()
+ }
+ }()
+}
+
+func (c *cache) updateGauges() {
+ c.metrics.collectionBytes.Set(float64(c.collectionBytes()))
+ c.metrics.collectionEntries.Set(float64(c.collections.Len()))
+ c.metrics.sessionEntries.Set(float64(c.sessions.Len()))
}
var selectPDH = map[string]interface{}{
"select": []string{"portable_data_hash"},
}
-func (c *cache) Stats() cacheStats {
+// Update saves a modified version (fs) to an existing collection
+// (coll) and, if successful, updates the relevant cache entries so
+// subsequent calls to Get() reflect the modifications.
+func (c *cache) Update(client *arvados.Client, coll arvados.Collection, fs arvados.CollectionFileSystem) error {
c.setupOnce.Do(c.setup)
- return cacheStats{
- Requests: atomic.LoadUint64(&c.stats.Requests),
- CollectionBytes: c.collectionBytes(),
- CollectionEntries: c.collections.Len(),
- CollectionHits: atomic.LoadUint64(&c.stats.CollectionHits),
- PDHHits: atomic.LoadUint64(&c.stats.PDHHits),
- PermissionHits: atomic.LoadUint64(&c.stats.PermissionHits),
- APICalls: atomic.LoadUint64(&c.stats.APICalls),
+
+ m, err := fs.MarshalManifest(".")
+ if err != nil || m == coll.ManifestText {
+ return err
+ }
+ coll.ManifestText = m
+ var updated arvados.Collection
+ defer c.pdhs.Remove(coll.UUID)
+ err = client.RequestAndDecode(&updated, "PATCH", "arvados/v1/collections/"+coll.UUID, nil, map[string]interface{}{
+ "collection": map[string]string{
+ "manifest_text": coll.ManifestText,
+ },
+ })
+ if err == nil {
+ c.collections.Add(client.AuthToken+"\000"+coll.PortableDataHash, &cachedCollection{
+ expire: time.Now().Add(time.Duration(c.config.TTL)),
+ collection: &updated,
+ })
}
+ return err
}
-func (c *cache) Get(arv *arvadosclient.ArvadosClient, targetID string, forceReload bool) (*arvados.Collection, error) {
+// ResetSession unloads any potentially stale state. Should be called
+// after write operations, so subsequent reads don't return stale
+// data.
+func (c *cache) ResetSession(token string) {
c.setupOnce.Do(c.setup)
+ c.sessions.Remove(token)
+}
- atomic.AddUint64(&c.stats.Requests, 1)
+// Get a long-lived CustomFileSystem suitable for doing a read operation
+// with the given token.
+func (c *cache) GetSession(token string) (arvados.CustomFileSystem, error) {
+ c.setupOnce.Do(c.setup)
+ now := time.Now()
+ ent, _ := c.sessions.Get(token)
+ sess, _ := ent.(*cachedSession)
+ expired := false
+ if sess == nil {
+ c.metrics.sessionMisses.Inc()
+ sess = &cachedSession{
+ expire: now.Add(c.config.TTL.Duration()),
+ }
+ c.sessions.Add(token, sess)
+ } else if sess.expire.Before(now) {
+ c.metrics.sessionMisses.Inc()
+ expired = true
+ } else {
+ c.metrics.sessionHits.Inc()
+ }
+ go c.pruneSessions()
+ fs, _ := sess.fs.Load().(arvados.CustomFileSystem)
+ if fs != nil && !expired {
+ return fs, nil
+ }
+ ac, err := arvados.NewClientFromConfig(c.cluster)
+ if err != nil {
+ return nil, err
+ }
+ ac.AuthToken = token
+ arv, err := arvadosclient.New(ac)
+ if err != nil {
+ return nil, err
+ }
+ kc := keepclient.New(arv)
+ fs = ac.SiteFileSystem(kc)
+ fs.ForwardSlashNameSubstitution(c.cluster.Collections.ForwardSlashNameSubstitution)
+ sess.fs.Store(fs)
+ return fs, nil
+}
+
+// Remove all expired session cache entries, then remove more entries
+// until approximate remaining size <= maxsize/2
+func (c *cache) pruneSessions() {
+ now := time.Now()
+ var size int64
+ keys := c.sessions.Keys()
+ for _, token := range keys {
+ ent, ok := c.sessions.Peek(token)
+ if !ok {
+ continue
+ }
+ s := ent.(*cachedSession)
+ if s.expire.Before(now) {
+ c.sessions.Remove(token)
+ continue
+ }
+ if fs, ok := s.fs.Load().(arvados.CustomFileSystem); ok {
+ size += fs.MemorySize()
+ }
+ }
+ // Remove tokens until reaching size limit, starting with the
+ // least frequently used entries (which Keys() returns last).
+ for i := len(keys) - 1; i >= 0; i-- {
+ token := keys[i]
+ if size <= c.cluster.Collections.WebDAVCache.MaxCollectionBytes/2 {
+ break
+ }
+ ent, ok := c.sessions.Peek(token)
+ if !ok {
+ continue
+ }
+ s := ent.(*cachedSession)
+ fs, _ := s.fs.Load().(arvados.CustomFileSystem)
+ if fs == nil {
+ continue
+ }
+ c.sessions.Remove(token)
+ size -= fs.MemorySize()
+ }
+}
+
+func (c *cache) Get(arv *arvadosclient.ArvadosClient, targetID string, forceReload bool) (*arvados.Collection, error) {
+ c.setupOnce.Do(c.setup)
+ c.metrics.requests.Inc()
permOK := false
permKey := arv.ApiToken + "\000" + targetID
c.permissions.Remove(permKey)
} else {
permOK = true
- atomic.AddUint64(&c.stats.PermissionHits, 1)
+ c.metrics.permissionHits.Inc()
}
}
c.pdhs.Remove(targetID)
} else {
pdh = ent.pdh
- atomic.AddUint64(&c.stats.PDHHits, 1)
+ c.metrics.pdhHits.Inc()
}
}
var collection *arvados.Collection
if pdh != "" {
- collection = c.lookupCollection(pdh)
+ collection = c.lookupCollection(arv.ApiToken + "\000" + pdh)
}
if collection != nil && permOK {
// likely, the cached PDH is still correct; if so,
// _and_ the current token has permission, we can
// use our cached manifest.
- atomic.AddUint64(&c.stats.APICalls, 1)
+ c.metrics.apiCalls.Inc()
var current arvados.Collection
err := arv.Get("collections", targetID, selectPDH, ¤t)
if err != nil {
return nil, err
}
if current.PortableDataHash == pdh {
- exp := time.Now().Add(time.Duration(c.TTL))
c.permissions.Add(permKey, &cachedPermission{
- expire: exp,
+ expire: time.Now().Add(time.Duration(c.config.TTL)),
})
if pdh != targetID {
c.pdhs.Add(targetID, &cachedPDH{
- expire: exp,
+ expire: time.Now().Add(time.Duration(c.config.UUIDTTL)),
pdh: pdh,
})
}
return collection, err
- } else {
- // PDH changed, but now we know we have
- // permission -- and maybe we already have the
- // new PDH in the cache.
- if coll := c.lookupCollection(current.PortableDataHash); coll != nil {
- return coll, nil
- }
+ }
+ // PDH changed, but now we know we have
+ // permission -- and maybe we already have the
+ // new PDH in the cache.
+ if coll := c.lookupCollection(arv.ApiToken + "\000" + current.PortableDataHash); coll != nil {
+ return coll, nil
}
}
// Collection manifest is not cached.
- atomic.AddUint64(&c.stats.APICalls, 1)
+ c.metrics.apiCalls.Inc()
err := arv.Get("collections", targetID, nil, &collection)
if err != nil {
return nil, err
}
- exp := time.Now().Add(time.Duration(c.TTL))
+ exp := time.Now().Add(time.Duration(c.config.TTL))
c.permissions.Add(permKey, &cachedPermission{
expire: exp,
})
c.pdhs.Add(targetID, &cachedPDH{
- expire: exp,
+ expire: time.Now().Add(time.Duration(c.config.UUIDTTL)),
pdh: collection.PortableDataHash,
})
- c.collections.Add(collection.PortableDataHash, &cachedCollection{
+ c.collections.Add(arv.ApiToken+"\000"+collection.PortableDataHash, &cachedCollection{
expire: exp,
collection: collection,
})
- if int64(len(collection.ManifestText)) > c.MaxCollectionBytes/int64(c.MaxCollectionEntries) {
+ if int64(len(collection.ManifestText)) > c.config.MaxCollectionBytes/int64(c.config.MaxCollectionEntries) {
go c.pruneCollections()
}
return collection, nil
}
}
for i, k := range keys {
- if size <= c.MaxCollectionBytes {
+ if size <= c.config.MaxCollectionBytes/2 {
break
}
if expired[i] {
}
}
-// collectionBytes returns the approximate memory size of the
-// collection cache.
+// collectionBytes returns the approximate combined memory size of the
+// collection cache and session filesystem cache.
func (c *cache) collectionBytes() uint64 {
var size uint64
for _, k := range c.collections.Keys() {
}
size += uint64(len(v.(*cachedCollection).collection.ManifestText))
}
+ for _, token := range c.sessions.Keys() {
+ ent, ok := c.sessions.Peek(token)
+ if !ok {
+ continue
+ }
+ if fs, ok := ent.(*cachedSession).fs.Load().(arvados.CustomFileSystem); ok {
+ size += uint64(fs.MemorySize())
+ }
+ }
return size
}
-func (c *cache) lookupCollection(pdh string) *arvados.Collection {
- if pdh == "" {
+func (c *cache) lookupCollection(key string) *arvados.Collection {
+ e, cached := c.collections.Get(key)
+ if !cached {
return nil
- } else if ent, cached := c.collections.Get(pdh); !cached {
+ }
+ ent := e.(*cachedCollection)
+ if ent.expire.Before(time.Now()) {
+ c.collections.Remove(key)
return nil
- } else {
- ent := ent.(*cachedCollection)
- if ent.expire.Before(time.Now()) {
- c.collections.Remove(pdh)
- return nil
- } else {
- atomic.AddUint64(&c.stats.CollectionHits, 1)
- return ent.collection
- }
}
+ c.metrics.collectionHits.Inc()
+ return ent.collection
}