X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/a15a137c593d24649e2960471d7273acad695186..7d91fe636e1ce09697fdff28b43e4020df041f17:/services/keep-web/cache.go diff --git a/services/keep-web/cache.go b/services/keep-web/cache.go index 26f6627424..2ff2136ed7 100644 --- a/services/keep-web/cache.go +++ b/services/keep-web/cache.go @@ -1,38 +1,91 @@ +// Copyright (C) The Arvados Authors. All rights reserved. +// +// SPDX-License-Identifier: AGPL-3.0 + package main import ( - "fmt" "sync" - "sync/atomic" "time" - "git.curoverse.com/arvados.git/sdk/go/arvados" - "git.curoverse.com/arvados.git/sdk/go/arvadosclient" + "git.arvados.org/arvados.git/sdk/go/arvados" + "git.arvados.org/arvados.git/sdk/go/arvadosclient" "github.com/hashicorp/golang-lru" + "github.com/prometheus/client_golang/prometheus" ) -type cache struct { - TTL arvados.Duration - CollectionEntries int - CollectionBytes int64 - PermissionEntries int - UUIDEntries int +const metricsUpdateInterval = time.Second / 10 - stats cacheStats +type cache struct { + config *arvados.WebDAVCacheConfig + registry *prometheus.Registry + metrics cacheMetrics pdhs *lru.TwoQueueCache collections *lru.TwoQueueCache permissions *lru.TwoQueueCache setupOnce sync.Once } -type cacheStats struct { - Requests uint64 `json:"Cache.Requests"` - CollectionBytes uint64 `json:"Cache.CollectionBytes"` - CollectionEntries int `json:"Cache.CollectionEntries"` - CollectionHits uint64 `json:"Cache.CollectionHits"` - PDHHits uint64 `json:"Cache.UUIDHits"` - PermissionHits uint64 `json:"Cache.PermissionHits"` - APICalls uint64 `json:"Cache.APICalls"` +type cacheMetrics struct { + requests prometheus.Counter + collectionBytes prometheus.Gauge + collectionEntries prometheus.Gauge + collectionHits prometheus.Counter + pdhHits prometheus.Counter + permissionHits prometheus.Counter + apiCalls prometheus.Counter +} + +func (m *cacheMetrics) setup(reg *prometheus.Registry) { + m.requests = prometheus.NewCounter(prometheus.CounterOpts{ + Namespace: "arvados", + Subsystem: "keepweb_collectioncache", + Name: "requests", + Help: "Number of targetID-to-manifest lookups handled.", + }) + reg.MustRegister(m.requests) + m.collectionHits = prometheus.NewCounter(prometheus.CounterOpts{ + Namespace: "arvados", + Subsystem: "keepweb_collectioncache", + Name: "hits", + Help: "Number of pdh-to-manifest cache hits.", + }) + reg.MustRegister(m.collectionHits) + m.pdhHits = prometheus.NewCounter(prometheus.CounterOpts{ + Namespace: "arvados", + Subsystem: "keepweb_collectioncache", + Name: "pdh_hits", + Help: "Number of uuid-to-pdh cache hits.", + }) + reg.MustRegister(m.pdhHits) + m.permissionHits = prometheus.NewCounter(prometheus.CounterOpts{ + Namespace: "arvados", + Subsystem: "keepweb_collectioncache", + Name: "permission_hits", + Help: "Number of targetID-to-permission cache hits.", + }) + reg.MustRegister(m.permissionHits) + m.apiCalls = prometheus.NewCounter(prometheus.CounterOpts{ + Namespace: "arvados", + Subsystem: "keepweb_collectioncache", + Name: "api_calls", + Help: "Number of outgoing API calls made by cache.", + }) + reg.MustRegister(m.apiCalls) + m.collectionBytes = prometheus.NewGauge(prometheus.GaugeOpts{ + Namespace: "arvados", + Subsystem: "keepweb_collectioncache", + Name: "cached_manifest_bytes", + Help: "Total size of all manifests in cache.", + }) + reg.MustRegister(m.collectionBytes) + m.collectionEntries = prometheus.NewGauge(prometheus.GaugeOpts{ + Namespace: "arvados", + Subsystem: "keepweb_collectioncache", + Name: "cached_manifests", + Help: "Number of manifests in cache.", + }) + reg.MustRegister(m.collectionEntries) } type cachedPDH struct { @@ -42,7 +95,7 @@ type cachedPDH struct { type cachedCollection struct { expire time.Time - collection map[string]interface{} + collection *arvados.Collection } type cachedPermission struct { @@ -51,51 +104,81 @@ type cachedPermission struct { func (c *cache) setup() { var err error - c.pdhs, err = lru.New2Q(c.UUIDEntries) + c.pdhs, err = lru.New2Q(c.config.MaxUUIDEntries) if err != nil { panic(err) } - c.collections, err = lru.New2Q(c.CollectionEntries) + c.collections, err = lru.New2Q(c.config.MaxCollectionEntries) if err != nil { panic(err) } - c.permissions, err = lru.New2Q(c.PermissionEntries) + c.permissions, err = lru.New2Q(c.config.MaxPermissionEntries) if err != nil { panic(err) } + + reg := c.registry + if reg == nil { + reg = prometheus.NewRegistry() + } + c.metrics.setup(reg) + go func() { + for range time.Tick(metricsUpdateInterval) { + c.updateGauges() + } + }() +} + +func (c *cache) updateGauges() { + c.metrics.collectionBytes.Set(float64(c.collectionBytes())) + c.metrics.collectionEntries.Set(float64(c.collections.Len())) } var selectPDH = map[string]interface{}{ "select": []string{"portable_data_hash"}, } -func (c *cache) Stats() cacheStats { +// Update saves a modified version (fs) to an existing collection +// (coll) and, if successful, updates the relevant cache entries so +// subsequent calls to Get() reflect the modifications. +func (c *cache) Update(client *arvados.Client, coll arvados.Collection, fs arvados.CollectionFileSystem) error { c.setupOnce.Do(c.setup) - return cacheStats{ - Requests: atomic.LoadUint64(&c.stats.Requests), - CollectionBytes: c.collectionBytes(), - CollectionEntries: c.collections.Len(), - CollectionHits: atomic.LoadUint64(&c.stats.CollectionHits), - PDHHits: atomic.LoadUint64(&c.stats.PDHHits), - PermissionHits: atomic.LoadUint64(&c.stats.PermissionHits), - APICalls: atomic.LoadUint64(&c.stats.APICalls), + + if m, err := fs.MarshalManifest("."); err != nil || m == coll.ManifestText { + return err + } else { + coll.ManifestText = m + } + var updated arvados.Collection + defer c.pdhs.Remove(coll.UUID) + err := client.RequestAndDecode(&updated, "PATCH", "arvados/v1/collections/"+coll.UUID, nil, map[string]interface{}{ + "collection": map[string]string{ + "manifest_text": coll.ManifestText, + }, + }) + if err == nil { + c.collections.Add(client.AuthToken+"\000"+coll.PortableDataHash, &cachedCollection{ + expire: time.Now().Add(time.Duration(c.config.TTL)), + collection: &updated, + }) } + return err } -func (c *cache) Get(arv *arvadosclient.ArvadosClient, targetID string, forceReload bool) (map[string]interface{}, error) { +func (c *cache) Get(arv *arvadosclient.ArvadosClient, targetID string, forceReload bool) (*arvados.Collection, error) { c.setupOnce.Do(c.setup) - - atomic.AddUint64(&c.stats.Requests, 1) + c.metrics.requests.Inc() permOK := false permKey := arv.ApiToken + "\000" + targetID - if ent, cached := c.permissions.Get(permKey); cached { + if forceReload { + } else if ent, cached := c.permissions.Get(permKey); cached { ent := ent.(*cachedPermission) if ent.expire.Before(time.Now()) { c.permissions.Remove(permKey) } else { permOK = true - atomic.AddUint64(&c.stats.PermissionHits, 1) + c.metrics.permissionHits.Inc() } } @@ -108,37 +191,35 @@ func (c *cache) Get(arv *arvadosclient.ArvadosClient, targetID string, forceRelo c.pdhs.Remove(targetID) } else { pdh = ent.pdh - atomic.AddUint64(&c.stats.PDHHits, 1) + c.metrics.pdhHits.Inc() } } - collection := c.lookupCollection(pdh) - - if collection != nil && permOK && !forceReload { - return collection, nil + var collection *arvados.Collection + if pdh != "" { + collection = c.lookupCollection(arv.ApiToken + "\000" + pdh) } - if collection != nil { + if collection != nil && permOK { + return collection, nil + } else if collection != nil { // Ask API for current PDH for this targetID. Most // likely, the cached PDH is still correct; if so, // _and_ the current token has permission, we can // use our cached manifest. - atomic.AddUint64(&c.stats.APICalls, 1) - var current map[string]interface{} + c.metrics.apiCalls.Inc() + var current arvados.Collection err := arv.Get("collections", targetID, selectPDH, ¤t) if err != nil { return nil, err } - if checkPDH, ok := current["portable_data_hash"].(string); !ok { - return nil, fmt.Errorf("API response for %q had no PDH", targetID) - } else if checkPDH == pdh { - exp := time.Now().Add(time.Duration(c.TTL)) + if current.PortableDataHash == pdh { c.permissions.Add(permKey, &cachedPermission{ - expire: exp, + expire: time.Now().Add(time.Duration(c.config.TTL)), }) if pdh != targetID { c.pdhs.Add(targetID, &cachedPDH{ - expire: exp, + expire: time.Now().Add(time.Duration(c.config.UUIDTTL)), pdh: pdh, }) } @@ -147,36 +228,32 @@ func (c *cache) Get(arv *arvadosclient.ArvadosClient, targetID string, forceRelo // PDH changed, but now we know we have // permission -- and maybe we already have the // new PDH in the cache. - if coll := c.lookupCollection(checkPDH); coll != nil { + if coll := c.lookupCollection(arv.ApiToken + "\000" + current.PortableDataHash); coll != nil { return coll, nil } } } // Collection manifest is not cached. - atomic.AddUint64(&c.stats.APICalls, 1) + c.metrics.apiCalls.Inc() err := arv.Get("collections", targetID, nil, &collection) if err != nil { return nil, err } - pdh, ok := collection["portable_data_hash"].(string) - if !ok { - return nil, fmt.Errorf("API response for %q had no PDH", targetID) - } - exp := time.Now().Add(time.Duration(c.TTL)) + exp := time.Now().Add(time.Duration(c.config.TTL)) c.permissions.Add(permKey, &cachedPermission{ expire: exp, }) c.pdhs.Add(targetID, &cachedPDH{ - expire: exp, - pdh: pdh, + expire: time.Now().Add(time.Duration(c.config.UUIDTTL)), + pdh: collection.PortableDataHash, }) - c.collections.Add(pdh, &cachedCollection{ + c.collections.Add(arv.ApiToken+"\000"+collection.PortableDataHash, &cachedCollection{ expire: exp, collection: collection, }) - if int64(len(collection["manifest_text"].(string))) > c.CollectionBytes/int64(c.CollectionEntries) { - c.pruneCollections() + if int64(len(collection.ManifestText)) > c.config.MaxCollectionBytes/int64(c.config.MaxCollectionEntries) { + go c.pruneCollections() } return collection, nil } @@ -200,7 +277,7 @@ func (c *cache) pruneCollections() { continue } ent := v.(*cachedCollection) - n := len(ent.collection["manifest_text"].(string)) + n := len(ent.collection.ManifestText) size += int64(n) entsize[i] = n expired[i] = ent.expire.Before(now) @@ -212,7 +289,7 @@ func (c *cache) pruneCollections() { } } for i, k := range keys { - if size <= c.CollectionBytes { + if size <= c.config.MaxCollectionBytes { break } if expired[i] { @@ -233,24 +310,21 @@ func (c *cache) collectionBytes() uint64 { if !ok { continue } - size += uint64(len(v.(*cachedCollection).collection["manifest_text"].(string))) + size += uint64(len(v.(*cachedCollection).collection.ManifestText)) } return size } -func (c *cache) lookupCollection(pdh string) map[string]interface{} { - if pdh == "" { +func (c *cache) lookupCollection(key string) *arvados.Collection { + e, cached := c.collections.Get(key) + if !cached { return nil - } else if ent, cached := c.collections.Get(pdh); !cached { + } + ent := e.(*cachedCollection) + if ent.expire.Before(time.Now()) { + c.collections.Remove(key) return nil - } else { - ent := ent.(*cachedCollection) - if ent.expire.Before(time.Now()) { - c.collections.Remove(pdh) - return nil - } else { - atomic.AddUint64(&c.stats.CollectionHits, 1) - return ent.collection - } } + c.metrics.collectionHits.Inc() + return ent.collection }