X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/1416b0952adc0bfee85e15d9c86a51c32fcfd003..8041cc091d787764947c56eb5120da11bc2c139b:/services/keep-web/cache.go diff --git a/services/keep-web/cache.go b/services/keep-web/cache.go index 8336b78f9e..07db7a016f 100644 --- a/services/keep-web/cache.go +++ b/services/keep-web/cache.go @@ -6,29 +6,27 @@ package main import ( "sync" + "sync/atomic" "time" - "git.curoverse.com/arvados.git/sdk/go/arvados" - "git.curoverse.com/arvados.git/sdk/go/arvadosclient" - "github.com/hashicorp/golang-lru" + "git.arvados.org/arvados.git/sdk/go/arvados" + "git.arvados.org/arvados.git/sdk/go/arvadosclient" + "git.arvados.org/arvados.git/sdk/go/keepclient" + lru "github.com/hashicorp/golang-lru" "github.com/prometheus/client_golang/prometheus" ) const metricsUpdateInterval = time.Second / 10 type cache struct { - TTL arvados.Duration - UUIDTTL arvados.Duration - MaxCollectionEntries int - MaxCollectionBytes int64 - MaxPermissionEntries int - MaxUUIDEntries int - + cluster *arvados.Cluster + config *arvados.WebDAVCacheConfig // TODO: use cluster.Collections.WebDAV instead registry *prometheus.Registry metrics cacheMetrics pdhs *lru.TwoQueueCache collections *lru.TwoQueueCache permissions *lru.TwoQueueCache + sessions *lru.TwoQueueCache setupOnce sync.Once } @@ -36,9 +34,12 @@ type cacheMetrics struct { requests prometheus.Counter collectionBytes prometheus.Gauge collectionEntries prometheus.Gauge + sessionEntries prometheus.Gauge collectionHits prometheus.Counter pdhHits prometheus.Counter permissionHits prometheus.Counter + sessionHits prometheus.Counter + sessionMisses prometheus.Counter apiCalls prometheus.Counter } @@ -80,9 +81,9 @@ func (m *cacheMetrics) setup(reg *prometheus.Registry) { reg.MustRegister(m.apiCalls) m.collectionBytes = prometheus.NewGauge(prometheus.GaugeOpts{ Namespace: "arvados", - Subsystem: "keepweb_collectioncache", - Name: "cached_manifest_bytes", - Help: "Total size of all manifests in cache.", + Subsystem: "keepweb_sessions", + Name: "cached_collection_bytes", + Help: "Total size of all cached manifests and sessions.", }) reg.MustRegister(m.collectionBytes) m.collectionEntries = prometheus.NewGauge(prometheus.GaugeOpts{ @@ -92,6 +93,27 @@ func (m *cacheMetrics) setup(reg *prometheus.Registry) { Help: "Number of manifests in cache.", }) reg.MustRegister(m.collectionEntries) + m.sessionEntries = prometheus.NewGauge(prometheus.GaugeOpts{ + Namespace: "arvados", + Subsystem: "keepweb_sessions", + Name: "active", + Help: "Number of active token sessions.", + }) + reg.MustRegister(m.sessionEntries) + m.sessionHits = prometheus.NewCounter(prometheus.CounterOpts{ + Namespace: "arvados", + Subsystem: "keepweb_sessions", + Name: "hits", + Help: "Number of token session cache hits.", + }) + reg.MustRegister(m.sessionHits) + m.sessionMisses = prometheus.NewCounter(prometheus.CounterOpts{ + Namespace: "arvados", + Subsystem: "keepweb_sessions", + Name: "misses", + Help: "Number of token session cache misses.", + }) + reg.MustRegister(m.sessionMisses) } type cachedPDH struct { @@ -108,17 +130,26 @@ type cachedPermission struct { expire time.Time } +type cachedSession struct { + expire time.Time + fs atomic.Value +} + func (c *cache) setup() { var err error - c.pdhs, err = lru.New2Q(c.MaxUUIDEntries) + c.pdhs, err = lru.New2Q(c.config.MaxUUIDEntries) if err != nil { panic(err) } - c.collections, err = lru.New2Q(c.MaxCollectionEntries) + c.collections, err = lru.New2Q(c.config.MaxCollectionEntries) if err != nil { panic(err) } - c.permissions, err = lru.New2Q(c.MaxPermissionEntries) + c.permissions, err = lru.New2Q(c.config.MaxPermissionEntries) + if err != nil { + panic(err) + } + c.sessions, err = lru.New2Q(c.config.MaxSessions) if err != nil { panic(err) } @@ -138,6 +169,7 @@ func (c *cache) setup() { func (c *cache) updateGauges() { c.metrics.collectionBytes.Set(float64(c.collectionBytes())) c.metrics.collectionEntries.Set(float64(c.collections.Len())) + c.metrics.sessionEntries.Set(float64(c.sessions.Len())) } var selectPDH = map[string]interface{}{ @@ -150,23 +182,117 @@ var selectPDH = map[string]interface{}{ func (c *cache) Update(client *arvados.Client, coll arvados.Collection, fs arvados.CollectionFileSystem) error { c.setupOnce.Do(c.setup) - if m, err := fs.MarshalManifest("."); err != nil || m == coll.ManifestText { + m, err := fs.MarshalManifest(".") + if err != nil || m == coll.ManifestText { return err - } else { - coll.ManifestText = m } + coll.ManifestText = m var updated arvados.Collection defer c.pdhs.Remove(coll.UUID) - err := client.RequestAndDecode(&updated, "PATCH", "arvados/v1/collections/"+coll.UUID, client.UpdateBody(coll), nil) + err = client.RequestAndDecode(&updated, "PATCH", "arvados/v1/collections/"+coll.UUID, nil, map[string]interface{}{ + "collection": map[string]string{ + "manifest_text": coll.ManifestText, + }, + }) if err == nil { c.collections.Add(client.AuthToken+"\000"+coll.PortableDataHash, &cachedCollection{ - expire: time.Now().Add(time.Duration(c.TTL)), + expire: time.Now().Add(time.Duration(c.config.TTL)), collection: &updated, }) } return err } +// ResetSession unloads any potentially stale state. Should be called +// after write operations, so subsequent reads don't return stale +// data. +func (c *cache) ResetSession(token string) { + c.setupOnce.Do(c.setup) + c.sessions.Remove(token) +} + +// Get a long-lived CustomFileSystem suitable for doing a read operation +// with the given token. +func (c *cache) GetSession(token string) (arvados.CustomFileSystem, error) { + c.setupOnce.Do(c.setup) + now := time.Now() + ent, _ := c.sessions.Get(token) + sess, _ := ent.(*cachedSession) + expired := false + if sess == nil { + c.metrics.sessionMisses.Inc() + sess = &cachedSession{ + expire: now.Add(c.config.TTL.Duration()), + } + c.sessions.Add(token, sess) + } else if sess.expire.Before(now) { + c.metrics.sessionMisses.Inc() + expired = true + } else { + c.metrics.sessionHits.Inc() + } + go c.pruneSessions() + fs, _ := sess.fs.Load().(arvados.CustomFileSystem) + if fs != nil && !expired { + return fs, nil + } + ac, err := arvados.NewClientFromConfig(c.cluster) + if err != nil { + return nil, err + } + ac.AuthToken = token + arv, err := arvadosclient.New(ac) + if err != nil { + return nil, err + } + kc := keepclient.New(arv) + fs = ac.SiteFileSystem(kc) + fs.ForwardSlashNameSubstitution(c.cluster.Collections.ForwardSlashNameSubstitution) + sess.fs.Store(fs) + return fs, nil +} + +// Remove all expired session cache entries, then remove more entries +// until approximate remaining size <= maxsize/2 +func (c *cache) pruneSessions() { + now := time.Now() + var size int64 + keys := c.sessions.Keys() + for _, token := range keys { + ent, ok := c.sessions.Peek(token) + if !ok { + continue + } + s := ent.(*cachedSession) + if s.expire.Before(now) { + c.sessions.Remove(token) + continue + } + if fs, ok := s.fs.Load().(arvados.CustomFileSystem); ok { + size += fs.MemorySize() + } + } + // Remove tokens until reaching size limit, starting with the + // least frequently used entries (which Keys() returns last). + for i := len(keys) - 1; i >= 0; i-- { + token := keys[i] + if size <= c.cluster.Collections.WebDAVCache.MaxCollectionBytes/2 { + break + } + ent, ok := c.sessions.Peek(token) + if !ok { + continue + } + s := ent.(*cachedSession) + fs, _ := s.fs.Load().(arvados.CustomFileSystem) + if fs == nil { + continue + } + c.sessions.Remove(token) + size -= fs.MemorySize() + } +} + func (c *cache) Get(arv *arvadosclient.ArvadosClient, targetID string, forceReload bool) (*arvados.Collection, error) { c.setupOnce.Do(c.setup) c.metrics.requests.Inc() @@ -217,22 +343,21 @@ func (c *cache) Get(arv *arvadosclient.ArvadosClient, targetID string, forceRelo } if current.PortableDataHash == pdh { c.permissions.Add(permKey, &cachedPermission{ - expire: time.Now().Add(time.Duration(c.TTL)), + expire: time.Now().Add(time.Duration(c.config.TTL)), }) if pdh != targetID { c.pdhs.Add(targetID, &cachedPDH{ - expire: time.Now().Add(time.Duration(c.UUIDTTL)), + expire: time.Now().Add(time.Duration(c.config.UUIDTTL)), pdh: pdh, }) } return collection, err - } else { - // PDH changed, but now we know we have - // permission -- and maybe we already have the - // new PDH in the cache. - if coll := c.lookupCollection(arv.ApiToken + "\000" + current.PortableDataHash); coll != nil { - return coll, nil - } + } + // PDH changed, but now we know we have + // permission -- and maybe we already have the + // new PDH in the cache. + if coll := c.lookupCollection(arv.ApiToken + "\000" + current.PortableDataHash); coll != nil { + return coll, nil } } @@ -242,19 +367,19 @@ func (c *cache) Get(arv *arvadosclient.ArvadosClient, targetID string, forceRelo if err != nil { return nil, err } - exp := time.Now().Add(time.Duration(c.TTL)) + exp := time.Now().Add(time.Duration(c.config.TTL)) c.permissions.Add(permKey, &cachedPermission{ expire: exp, }) c.pdhs.Add(targetID, &cachedPDH{ - expire: time.Now().Add(time.Duration(c.UUIDTTL)), + expire: time.Now().Add(time.Duration(c.config.UUIDTTL)), pdh: collection.PortableDataHash, }) c.collections.Add(arv.ApiToken+"\000"+collection.PortableDataHash, &cachedCollection{ expire: exp, collection: collection, }) - if int64(len(collection.ManifestText)) > c.MaxCollectionBytes/int64(c.MaxCollectionEntries) { + if int64(len(collection.ManifestText)) > c.config.MaxCollectionBytes/int64(c.config.MaxCollectionEntries) { go c.pruneCollections() } return collection, nil @@ -291,7 +416,7 @@ func (c *cache) pruneCollections() { } } for i, k := range keys { - if size <= c.MaxCollectionBytes { + if size <= c.config.MaxCollectionBytes/2 { break } if expired[i] { @@ -303,8 +428,8 @@ func (c *cache) pruneCollections() { } } -// collectionBytes returns the approximate memory size of the -// collection cache. +// collectionBytes returns the approximate combined memory size of the +// collection cache and session filesystem cache. func (c *cache) collectionBytes() uint64 { var size uint64 for _, k := range c.collections.Keys() { @@ -314,6 +439,15 @@ func (c *cache) collectionBytes() uint64 { } size += uint64(len(v.(*cachedCollection).collection.ManifestText)) } + for _, token := range c.sessions.Keys() { + ent, ok := c.sessions.Peek(token) + if !ok { + continue + } + if fs, ok := ent.(*cachedSession).fs.Load().(arvados.CustomFileSystem); ok { + size += uint64(fs.MemorySize()) + } + } return size }