"git.arvados.org/arvados.git/sdk/go/keepclient"
lru "github.com/hashicorp/golang-lru"
"github.com/prometheus/client_golang/prometheus"
+ "github.com/sirupsen/logrus"
)
const metricsUpdateInterval = time.Second / 10
type cache struct {
cluster *arvados.Cluster
config *arvados.WebDAVCacheConfig // TODO: use cluster.Collections.WebDAV instead
+ logger logrus.FieldLogger // destination for the Debugf messages emitted during cache lookups
registry *prometheus.Registry
metrics cacheMetrics
pdhs *lru.TwoQueueCache
collections *lru.TwoQueueCache
- permissions *lru.TwoQueueCache
sessions *lru.TwoQueueCache
setupOnce sync.Once
+
+ chPruneSessions chan struct{} // capacity-1 signal; a goroutine started in setup runs pruneSessions per value; senders use a non-blocking send
+ chPruneCollections chan struct{} // capacity-1 signal; a goroutine started in setup runs pruneCollections per value; senders use a non-blocking send
}
type cacheMetrics struct {
sessionEntries prometheus.Gauge
collectionHits prometheus.Counter
pdhHits prometheus.Counter
- permissionHits prometheus.Counter
sessionHits prometheus.Counter
sessionMisses prometheus.Counter
apiCalls prometheus.Counter
Help: "Number of uuid-to-pdh cache hits.",
})
reg.MustRegister(m.pdhHits)
- m.permissionHits = prometheus.NewCounter(prometheus.CounterOpts{
- Namespace: "arvados",
- Subsystem: "keepweb_collectioncache",
- Name: "permission_hits",
- Help: "Number of targetID-to-permission cache hits.",
- })
- reg.MustRegister(m.permissionHits)
m.apiCalls = prometheus.NewCounter(prometheus.CounterOpts{
Namespace: "arvados",
Subsystem: "keepweb_collectioncache",
}
type cachedPDH struct {
- expire time.Time
- pdh string
+ expire time.Time // set to now + config.TTL when the entry is stored
+ refresh time.Time // set to now + config.UUIDTTL; a lookup after this time re-checks the current PDH with the API
+ pdh string // portable data hash recorded for the UUID this entry is keyed by
}
type cachedCollection struct {
if err != nil {
panic(err)
}
- c.permissions, err = lru.New2Q(c.config.MaxPermissionEntries)
- if err != nil {
- panic(err)
- }
c.sessions, err = lru.New2Q(c.config.MaxSessions)
if err != nil {
panic(err)
c.updateGauges()
}
}()
+ c.chPruneCollections = make(chan struct{}, 1)
+ go func() {
+ for range c.chPruneCollections {
+ c.pruneCollections()
+ }
+ }()
+ c.chPruneSessions = make(chan struct{}, 1)
+ go func() {
+ for range c.chPruneSessions {
+ c.pruneSessions()
+ }
+ }()
}
func (c *cache) updateGauges() {
}
coll.ManifestText = m
var updated arvados.Collection
- defer c.pdhs.Remove(coll.UUID)
err = client.RequestAndDecode(&updated, "PATCH", "arvados/v1/collections/"+coll.UUID, nil, map[string]interface{}{
"collection": map[string]string{
"manifest_text": coll.ManifestText,
},
})
- if err == nil {
- c.collections.Add(client.AuthToken+"\000"+updated.PortableDataHash, &cachedCollection{
- expire: time.Now().Add(time.Duration(c.config.TTL)),
- collection: &updated,
- })
+ if err != nil {
+ c.pdhs.Remove(coll.UUID)
+ return err
}
- return err
+ c.collections.Add(client.AuthToken+"\000"+updated.PortableDataHash, &cachedCollection{
+ expire: time.Now().Add(time.Duration(c.config.TTL)),
+ collection: &updated,
+ })
+ c.pdhs.Add(coll.UUID, &cachedPDH{
+ expire: time.Now().Add(time.Duration(c.config.TTL)),
+ refresh: time.Now().Add(time.Duration(c.config.UUIDTTL)),
+ pdh: updated.PortableDataHash,
+ })
+ return nil
}
// ResetSession unloads any potentially stale state. Should be called
} else {
c.metrics.sessionHits.Inc()
}
- go c.pruneSessions()
+ select {
+ case c.chPruneSessions <- struct{}{}:
+ default:
+ }
fs, _ := sess.fs.Load().(arvados.CustomFileSystem)
if fs != nil && !expired {
return fs, sess, nil
c.setupOnce.Do(c.setup)
c.metrics.requests.Inc()
- permOK := false
- permKey := arv.ApiToken + "\000" + targetID
- if forceReload {
- } else if ent, cached := c.permissions.Get(permKey); cached {
- ent := ent.(*cachedPermission)
- if ent.expire.Before(time.Now()) {
- c.permissions.Remove(permKey)
- } else {
- permOK = true
- c.metrics.permissionHits.Inc()
- }
- }
-
+ var pdhRefresh bool
var pdh string
if arvadosclient.PDHMatch(targetID) {
pdh = targetID
c.pdhs.Remove(targetID)
} else {
pdh = ent.pdh
+ pdhRefresh = forceReload || time.Now().After(ent.refresh)
c.metrics.pdhHits.Inc()
}
}
- var collection *arvados.Collection
- if pdh != "" {
- collection = c.lookupCollection(arv.ApiToken + "\000" + pdh)
- }
-
- if collection != nil && permOK {
- return collection, nil
- } else if collection != nil {
- // Ask API for current PDH for this targetID. Most
- // likely, the cached PDH is still correct; if so,
- // _and_ the current token has permission, we can
- // use our cached manifest.
+ if pdh == "" {
+ // UUID->PDH mapping is not cached, might as well get
+ // the whole collection record and be done (below).
+ c.logger.Debugf("cache(%s): have no pdh", targetID)
+ } else if cached := c.lookupCollection(arv.ApiToken + "\000" + pdh); cached == nil {
+ // PDH->manifest is not cached, might as well get the
+ // whole collection record (below).
+ c.logger.Debugf("cache(%s): have pdh %s but manifest is not cached", targetID, pdh)
+ } else if !pdhRefresh {
+ // We looked up UUID->PDH very recently, and we still
+ // have the manifest for that PDH.
+ c.logger.Debugf("cache(%s): have pdh %s and refresh not needed", targetID, pdh)
+ return cached, nil
+ } else {
+ // Get current PDH for this UUID (and confirm we still
+ // have read permission). Most likely, the cached PDH
+ // is still correct, in which case we can use our
+ // cached manifest.
c.metrics.apiCalls.Inc()
var current arvados.Collection
err := arv.Get("collections", targetID, selectPDH, &current)
return nil, err
}
if current.PortableDataHash == pdh {
- c.permissions.Add(permKey, &cachedPermission{
- expire: time.Now().Add(time.Duration(c.config.TTL)),
- })
- if pdh != targetID {
- c.pdhs.Add(targetID, &cachedPDH{
- expire: time.Now().Add(time.Duration(c.config.UUIDTTL)),
- pdh: pdh,
- })
- }
- return collection, err
+ // PDH has not changed, cached manifest is
+ // correct.
+ c.logger.Debugf("cache(%s): verified cached pdh %s is still correct", targetID, pdh)
+ return cached, nil
}
- // PDH changed, but now we know we have
- // permission -- and maybe we already have the
- // new PDH in the cache.
- if coll := c.lookupCollection(arv.ApiToken + "\000" + current.PortableDataHash); coll != nil {
- return coll, nil
+ if cached := c.lookupCollection(arv.ApiToken + "\000" + current.PortableDataHash); cached != nil {
+ // PDH changed, and we already have the
+ // manifest for that new PDH.
+ c.logger.Debugf("cache(%s): cached pdh %s was stale, new pdh is %s and manifest is already in cache", targetID, pdh, current.PortableDataHash)
+ return cached, nil
}
}
- // Collection manifest is not cached.
+ // Either UUID->PDH is not cached, or PDH->manifest is not
+ // cached.
+ var retrieved arvados.Collection
c.metrics.apiCalls.Inc()
- err := arv.Get("collections", targetID, nil, &collection)
+ err := arv.Get("collections", targetID, nil, &retrieved)
if err != nil {
return nil, err
}
+ c.logger.Debugf("cache(%s): retrieved manifest, caching with pdh %s", targetID, retrieved.PortableDataHash)
exp := time.Now().Add(time.Duration(c.config.TTL))
- c.permissions.Add(permKey, &cachedPermission{
- expire: exp,
- })
- c.pdhs.Add(targetID, &cachedPDH{
- expire: time.Now().Add(time.Duration(c.config.UUIDTTL)),
- pdh: collection.PortableDataHash,
- })
- c.collections.Add(arv.ApiToken+"\000"+collection.PortableDataHash, &cachedCollection{
+ if targetID != retrieved.PortableDataHash {
+ c.pdhs.Add(targetID, &cachedPDH{
+ expire: exp,
+ refresh: time.Now().Add(time.Duration(c.config.UUIDTTL)),
+ pdh: retrieved.PortableDataHash,
+ })
+ }
+ c.collections.Add(arv.ApiToken+"\000"+retrieved.PortableDataHash, &cachedCollection{
expire: exp,
- collection: collection,
+ collection: &retrieved,
})
- if int64(len(collection.ManifestText)) > c.config.MaxCollectionBytes/int64(c.config.MaxCollectionEntries) {
- go c.pruneCollections()
+ if int64(len(retrieved.ManifestText)) > c.config.MaxCollectionBytes/int64(c.config.MaxCollectionEntries) {
+ select {
+ case c.chPruneCollections <- struct{}{}:
+ default:
+ }
}
- return collection, nil
+ return &retrieved, nil
}
// pruneCollections checks the total bytes occupied by manifest_text