--- /dev/null
+package main
+
+import (
+ "fmt"
+ "sync"
+ "sync/atomic"
+ "time"
+
+ "git.curoverse.com/arvados.git/sdk/go/arvados"
+ "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
+ "github.com/hashicorp/golang-lru"
+)
+
+// cache remembers recently fetched collection records, the
+// UUID-to-PDH mappings that lead to them, and which API tokens have
+// recently been able to read which targets, so that repeated Get
+// calls can be answered with fewer API requests.
+//
+// The exported fields are configuration. The internal caches are
+// created lazily, exactly once, on the first call to Get.
+type cache struct {
+	// stats is deliberately the first field. Its uint64 counters
+	// are updated with sync/atomic, which on 32-bit platforms
+	// requires 64-bit alignment; only the first word of an
+	// allocated struct is guaranteed to have it. Keep it first.
+	stats cacheStats
+
+	// TTL is how long a newly added cache entry (permission, PDH
+	// mapping or collection) remains valid.
+	TTL arvados.Duration
+	// CollectionEntries is the capacity of the collection cache.
+	CollectionEntries int
+	// CollectionBytes is the total manifest_text size that
+	// pruneCollections tries to keep the collection cache under.
+	CollectionBytes int64
+	// PermissionEntries is the capacity of the permission cache.
+	PermissionEntries int
+	// UUIDEntries is the capacity of the UUID-to-PDH cache.
+	UUIDEntries int
+
+	pdhs        *lru.TwoQueueCache // targetID -> *cachedPDH
+	collections *lru.TwoQueueCache // PDH -> *cachedCollection
+	permissions *lru.TwoQueueCache // token + "\000" + targetID -> *cachedPermission
+	setupOnce   sync.Once          // guards the one-time call to setup
+}
+
+// cacheStats is a set of activity counters for a cache. The live
+// copy inside a cache is updated and read with sync/atomic; Stats
+// returns a snapshot of it.
+type cacheStats struct {
+ // Requests counts calls to Get.
+ Requests uint64
+ // CollectionHits counts lookups that found an unexpired entry in
+ // the collection cache.
+ CollectionHits uint64
+ // PDHHits counts Get calls that resolved the target to a PDH
+ // using an unexpired entry in the UUID-to-PDH cache.
+ PDHHits uint64
+ // PermissionHits counts Get calls that found an unexpired entry
+ // in the permission cache.
+ PermissionHits uint64
+ // APICalls counts requests sent to the API server by Get.
+ APICalls uint64
+}
+
+// cachedPDH is the value stored in the UUID-to-PDH cache, keyed by
+// target ID: the portable data hash the target resolved to, and the
+// time after which that answer must no longer be used.
+type cachedPDH struct {
+ expire time.Time
+ pdh string
+}
+
+// cachedCollection is the value stored in the collection cache,
+// keyed by portable data hash: the collection record as returned by
+// the API, and the time after which it must no longer be used.
+type cachedCollection struct {
+ expire time.Time
+ collection map[string]interface{}
+}
+
+// cachedPermission is the value stored in the permission cache,
+// keyed by API token and target ID. An unexpired entry records that
+// a recent API request for that target with that token succeeded.
+type cachedPermission struct {
+ expire time.Time
+}
+
+func (c *cache) setup() {
+ var err error
+ c.pdhs, err = lru.New2Q(c.UUIDEntries)
+ if err != nil {
+ panic(err)
+ }
+ c.collections, err = lru.New2Q(c.CollectionEntries)
+ if err != nil {
+ panic(err)
+ }
+ c.permissions, err = lru.New2Q(c.PermissionEntries)
+ if err != nil {
+ panic(err)
+ }
+}
+
+// selectPDH is the request parameter set Get uses when it only needs
+// to learn a collection's current portable_data_hash, not the whole
+// record.
+var selectPDH = map[string]interface{}{
+ "select": []string{"portable_data_hash"},
+}
+
+func (c *cache) Stats() cacheStats {
+ return cacheStats{
+ Requests: atomic.LoadUint64(&c.stats.Requests),
+ CollectionHits: atomic.LoadUint64(&c.stats.CollectionHits),
+ PDHHits: atomic.LoadUint64(&c.stats.PDHHits),
+ PermissionHits: atomic.LoadUint64(&c.stats.PermissionHits),
+ APICalls: atomic.LoadUint64(&c.stats.APICalls),
+ }
+}
+
+// Get returns the collection record for targetID (a collection UUID
+// or a portable data hash), using cached data where possible.
+//
+// A cached record is returned without contacting the API only when
+// forceReload is false, the record is cached and unexpired, and the
+// permission cache shows that arv's token was recently able to read
+// targetID. Otherwise the API is asked for the target's current PDH;
+// if that matches a cached record, the cached record is returned.
+// Failing that, the full record is fetched and all three caches are
+// updated.
+//
+// The returned map may be shared with the cache and with other
+// callers; callers must not modify it.
+//
+// An error is returned if an API request fails or if a response
+// lacks a string portable_data_hash.
+func (c *cache) Get(arv *arvadosclient.ArvadosClient, targetID string, forceReload bool) (map[string]interface{}, error) {
+	c.setupOnce.Do(c.setup)
+
+	atomic.AddUint64(&c.stats.Requests, 1)
+
+	// Has this token recently been allowed to read this target?
+	permOK := false
+	permKey := arv.ApiToken + "\000" + targetID
+	if ent, cached := c.permissions.Get(permKey); cached {
+		ent := ent.(*cachedPermission)
+		if ent.expire.Before(time.Now()) {
+			c.permissions.Remove(permKey)
+		} else {
+			permOK = true
+			atomic.AddUint64(&c.stats.PermissionHits, 1)
+		}
+	}
+
+	// Resolve the target to a PDH without an API call if we can.
+	var pdh string
+	if arvadosclient.PDHMatch(targetID) {
+		pdh = targetID
+	} else if ent, cached := c.pdhs.Get(targetID); cached {
+		ent := ent.(*cachedPDH)
+		if ent.expire.Before(time.Now()) {
+			c.pdhs.Remove(targetID)
+		} else {
+			pdh = ent.pdh
+			atomic.AddUint64(&c.stats.PDHHits, 1)
+		}
+	}
+
+	cachedColl := c.lookupCollection(pdh)
+	if cachedColl != nil && permOK && !forceReload {
+		return cachedColl, nil
+	}
+
+	if cachedColl != nil {
+		// Ask API for current PDH for this targetID. Most
+		// likely, the cached PDH is still correct; if so,
+		// _and_ the current token has permission, we can
+		// use our cached manifest.
+		atomic.AddUint64(&c.stats.APICalls, 1)
+		var current map[string]interface{}
+		if err := arv.Get("collections", targetID, selectPDH, &current); err != nil {
+			return nil, err
+		}
+		checkPDH, ok := current["portable_data_hash"].(string)
+		if !ok {
+			return nil, fmt.Errorf("API response for %q had no PDH", targetID)
+		}
+		if checkPDH == pdh {
+			exp := time.Now().Add(time.Duration(c.TTL))
+			c.permissions.Add(permKey, &cachedPermission{
+				expire: exp,
+			})
+			if pdh != targetID {
+				c.pdhs.Add(targetID, &cachedPDH{
+					expire: exp,
+					pdh:    pdh,
+				})
+			}
+			return cachedColl, nil
+		}
+		// PDH changed, but now we know we have permission --
+		// and maybe we already have the new PDH in the cache.
+		if coll := c.lookupCollection(checkPDH); coll != nil {
+			return coll, nil
+		}
+	}
+
+	// Collection manifest is not cached. Fetch into a fresh map:
+	// cachedColl may still be referenced by the cache entry for the
+	// old PDH and by other callers, and decoding a JSON object into
+	// a non-nil map writes into that existing map.
+	atomic.AddUint64(&c.stats.APICalls, 1)
+	var fetched map[string]interface{}
+	if err := arv.Get("collections", targetID, nil, &fetched); err != nil {
+		return nil, err
+	}
+	newPDH, ok := fetched["portable_data_hash"].(string)
+	if !ok {
+		return nil, fmt.Errorf("API response for %q had no PDH", targetID)
+	}
+	exp := time.Now().Add(time.Duration(c.TTL))
+	c.permissions.Add(permKey, &cachedPermission{
+		expire: exp,
+	})
+	c.pdhs.Add(targetID, &cachedPDH{
+		expire: exp,
+		pdh:    newPDH,
+	})
+	c.collections.Add(newPDH, &cachedCollection{
+		expire:     exp,
+		collection: fetched,
+	})
+	// Only an entry larger than the average per-entry byte budget
+	// can push the cache over CollectionBytes, so only then is a
+	// prune worth the cost. A record without a string manifest_text
+	// counts as empty rather than causing a panic.
+	manifest, _ := fetched["manifest_text"].(string)
+	if int64(len(manifest)) > c.CollectionBytes/int64(c.CollectionEntries) {
+		c.pruneCollections()
+	}
+	return fetched, nil
+}
+
+// pruneCollections checks the total bytes occupied by manifest_text
+// in the collection cache and removes old entries as needed to bring
+// the total size down to CollectionBytes. It also deletes all expired
+// entries.
+//
+// pruneCollections does not aim to be perfectly correct when there is
+// concurrent cache activity.
+func (c *cache) pruneCollections() {
+	// liveEntry is an unexpired cache entry and the size of its
+	// manifest_text.
+	type liveEntry struct {
+		key  interface{}
+		size int64
+	}
+
+	now := time.Now()
+	keys := c.collections.Keys()
+	live := make([]liveEntry, 0, len(keys))
+	var total int64
+
+	// First pass: drop expired entries and measure what remains.
+	// Peek is used so that inspecting entries does not change their
+	// position in the cache.
+	for _, k := range keys {
+		v, ok := c.collections.Peek(k)
+		if !ok {
+			// Evicted since Keys() was called.
+			continue
+		}
+		ent := v.(*cachedCollection)
+		if ent.expire.Before(now) {
+			c.collections.Remove(k)
+			continue
+		}
+		// A record without a string manifest_text counts as
+		// zero bytes instead of panicking.
+		manifest, _ := ent.collection["manifest_text"].(string)
+		size := int64(len(manifest))
+		total += size
+		live = append(live, liveEntry{key: k, size: size})
+	}
+
+	// Second pass: evict in the order reported by Keys() until the
+	// total fits in CollectionBytes.
+	for _, e := range live {
+		if total <= c.CollectionBytes {
+			break
+		}
+		c.collections.Remove(e.key)
+		total -= e.size
+	}
+}
+
+// lookupCollection returns the cached collection record for pdh, or
+// nil if pdh is empty, not cached, or cached but expired. An expired
+// entry is removed from the cache. A successful lookup is counted in
+// the CollectionHits statistic.
+func (c *cache) lookupCollection(pdh string) map[string]interface{} {
+	if pdh == "" {
+		return nil
+	}
+	v, found := c.collections.Get(pdh)
+	if !found {
+		return nil
+	}
+	entry := v.(*cachedCollection)
+	if entry.expire.Before(time.Now()) {
+		c.collections.Remove(pdh)
+		return nil
+	}
+	atomic.AddUint64(&c.stats.CollectionHits, 1)
+	return entry.collection
+}