8 "git.curoverse.com/arvados.git/sdk/go/arvados"
9 "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
10 "github.com/hashicorp/golang-lru"
15 MaxCollectionEntries int
16 MaxCollectionBytes int64
17 MaxPermissionEntries int
21 pdhs *lru.TwoQueueCache
22 collections *lru.TwoQueueCache
23 permissions *lru.TwoQueueCache
// cacheStats is the counter snapshot returned by (*cache).Stats. Every
// field is serialized to JSON under a "Cache."-prefixed key.
27 type cacheStats struct {
// Requests is the counter that Get increments.
28 Requests uint64 `json:"Cache.Requests"`
// CollectionBytes is the summed manifest text length of the cached
// collections, as computed by collectionBytes.
29 CollectionBytes uint64 `json:"Cache.CollectionBytes"`
// CollectionEntries is the current length of the collections cache.
30 CollectionEntries int `json:"Cache.CollectionEntries"`
// CollectionHits counts unexpired entries found by lookupCollection.
31 CollectionHits uint64 `json:"Cache.CollectionHits"`
// PDHHits counts unexpired entries found in the pdhs cache by Get.
// NOTE(review): the JSON key is "Cache.UUIDHits", not "Cache.PDHHits";
// presumably kept for compatibility with existing consumers -- confirm.
32 PDHHits uint64 `json:"Cache.UUIDHits"`
// PermissionHits counts unexpired entries found in the permissions
// cache by Get.
33 PermissionHits uint64 `json:"Cache.PermissionHits"`
// APICalls is incremented by Get before each arv.Get request.
34 APICalls uint64 `json:"Cache.APICalls"`
// cachedPDH is the entry type stored in the pdhs cache, keyed by the
// targetID passed to Get. Code in this file reads its expire and pdh
// fields (the field declarations are not visible in this excerpt).
37 type cachedPDH struct {
// cachedCollection is the entry type stored in the collections cache,
// keyed by portable data hash. Besides the collection pointer below,
// code in this file reads an expire field (its declaration is not
// visible in this excerpt).
42 type cachedCollection struct {
// collection is the record handed back to callers on a cache hit.
44 collection *arvados.Collection
// cachedPermission is the entry type stored in the permissions cache,
// keyed by API token + "\000" + targetID. Get reads its expire field
// (the field declarations are not visible in this excerpt).
47 type cachedPermission struct {
// setup allocates the three 2Q LRU caches (pdhs, collections and
// permissions), sized by MaxUUIDEntries, MaxCollectionEntries and
// MaxPermissionEntries respectively. Stats and Get invoke it through
// c.setupOnce.Do (presumably a sync.Once -- confirm against the struct
// definition).
// NOTE(review): the handling of err after each lru.New2Q call is not
// visible in this excerpt.
51 func (c *cache) setup() {
53 c.pdhs, err = lru.New2Q(c.MaxUUIDEntries)
57 c.collections, err = lru.New2Q(c.MaxCollectionEntries)
61 c.permissions, err = lru.New2Q(c.MaxPermissionEntries)
// selectPDH is the parameter map Get passes to arv.Get when it only
// needs to learn a collection's current portable data hash: its
// "select" list names the single field "portable_data_hash".
67 var selectPDH = map[string]interface{}{
68 "select": []string{"portable_data_hash"},
// Stats returns a snapshot of the cache counters. The request, hit and
// API-call counters are read with atomic loads; CollectionBytes and
// CollectionEntries are computed from the collections cache at call
// time.
71 func (c *cache) Stats() cacheStats {
// Make sure the LRU caches exist before c.collections is used below.
72 c.setupOnce.Do(c.setup)
74 Requests: atomic.LoadUint64(&c.stats.Requests),
// Walks every cached collection; see collectionBytes.
75 CollectionBytes: c.collectionBytes(),
76 CollectionEntries: c.collections.Len(),
77 CollectionHits: atomic.LoadUint64(&c.stats.CollectionHits),
78 PDHHits: atomic.LoadUint64(&c.stats.PDHHits),
79 PermissionHits: atomic.LoadUint64(&c.stats.PermissionHits),
80 APICalls: atomic.LoadUint64(&c.stats.APICalls),
// Get returns the collection identified by targetID, consulting the
// permissions, pdhs and collections caches before asking the API
// server through arv.
//
// arv supplies the API token (part of the permission cache key) and
// performs any API requests. forceReload takes part in the cache
// lookup branches below. The returned error comes from the arv.Get
// calls; the error checks themselves are not visible in this excerpt.
84 func (c *cache) Get(arv *arvadosclient.ArvadosClient, targetID string, forceReload bool) (*arvados.Collection, error) {
85 c.setupOnce.Do(c.setup)
87 atomic.AddUint64(&c.stats.Requests, 1)
// Permission entries are per token and per target: the key is the API
// token and the target ID joined by a NUL byte.
90 permKey := arv.ApiToken + "\000" + targetID
92 } else if ent, cached := c.permissions.Get(permKey); cached {
93 ent := ent.(*cachedPermission)
// An expired permission entry is evicted rather than counted as a hit.
94 if ent.expire.Before(time.Now()) {
95 c.permissions.Remove(permKey)
98 atomic.AddUint64(&c.stats.PermissionHits, 1)
// Resolve the portable data hash to look up: targetID is first tested
// with PDHMatch, and otherwise (unless forceReload is set) the pdhs
// cache is consulted.
103 if arvadosclient.PDHMatch(targetID) {
105 } else if forceReload {
106 } else if ent, cached := c.pdhs.Get(targetID); cached {
107 ent := ent.(*cachedPDH)
// An expired PDH entry is evicted rather than counted as a hit.
108 if ent.expire.Before(time.Now()) {
109 c.pdhs.Remove(targetID)
112 atomic.AddUint64(&c.stats.PDHHits, 1)
116 var collection *arvados.Collection
118 collection = c.lookupCollection(pdh)
// Fast path: the collection is cached and permission is already known.
121 if collection != nil && permOK {
122 return collection, nil
123 } else if collection != nil {
124 // Ask API for current PDH for this targetID. Most
125 // likely, the cached PDH is still correct; if so,
126 // _and_ the current token has permission, we can
127 // use our cached manifest.
128 atomic.AddUint64(&c.stats.APICalls, 1)
129 var current arvados.Collection
// NOTE(review): "¤t" below looks like a mis-encoded "&current" (the
// address of the variable declared on the previous line) -- confirm
// against the original file.
130 err := arv.Get("collections", targetID, selectPDH, ¤t)
134 if current.PortableDataHash == pdh {
// The cached PDH is still current: refresh the permission and PDH
// entries with an expiry of now + c.TTL and reuse the cached collection.
135 exp := time.Now().Add(time.Duration(c.TTL))
136 c.permissions.Add(permKey, &cachedPermission{
140 c.pdhs.Add(targetID, &cachedPDH{
145 return collection, err
147 // PDH changed, but now we know we have
148 // permission -- and maybe we already have the
149 // new PDH in the cache.
150 if coll := c.lookupCollection(current.PortableDataHash); coll != nil {
156 // Collection manifest is not cached.
157 atomic.AddUint64(&c.stats.APICalls, 1)
// A nil parameter map requests the record without the selectPDH field
// restriction.
158 err := arv.Get("collections", targetID, nil, &collection)
// Record the fresh result in all three caches.
162 exp := time.Now().Add(time.Duration(c.TTL))
163 c.permissions.Add(permKey, &cachedPermission{
166 c.pdhs.Add(targetID, &cachedPDH{
168 pdh: collection.PortableDataHash,
170 c.collections.Add(collection.PortableDataHash, &cachedCollection{
172 collection: collection,
// Only a manifest larger than the average per-entry byte budget
// (MaxCollectionBytes / MaxCollectionEntries) triggers a prune, which
// runs in its own goroutine so this request does not wait for it.
174 if int64(len(collection.ManifestText)) > c.MaxCollectionBytes/int64(c.MaxCollectionEntries) {
175 go c.pruneCollections()
177 return collection, nil
180 // pruneCollections checks the total bytes occupied by manifest_text
181 // in the collection cache and removes old entries as needed to bring
182 // the total size down to MaxCollectionBytes. It also deletes all expired
185 // pruneCollections does not aim to be perfectly correct when there is
186 // concurrent cache activity.
187 func (c *cache) pruneCollections() {
// Work from a snapshot of the keys. entsize and expired are parallel
// to keys, so index i describes keys[i] in every loop below.
190 keys := c.collections.Keys()
191 entsize := make([]int, len(keys))
192 expired := make([]bool, len(keys))
// Pass 1: record each entry's manifest length and whether it has
// expired. Entries are read with Peek rather than Get.
193 for i, k := range keys {
194 v, ok := c.collections.Peek(k)
198 ent := v.(*cachedCollection)
199 n := len(ent.collection.ManifestText)
202 expired[i] = ent.expire.Before(now)
// Pass 2: remove entries and deduct their size from the running total.
// NOTE(review): the guarding condition is not visible in this excerpt;
// presumably it selects the entries flagged in expired -- confirm.
204 for i, k := range keys {
206 c.collections.Remove(k)
207 size -= int64(entsize[i])
// Pass 3: keep removing entries, in keys order, until the running
// total fits within MaxCollectionBytes.
210 for i, k := range keys {
211 if size <= c.MaxCollectionBytes {
215 // already removed this entry in the previous loop
218 c.collections.Remove(k)
219 size -= int64(entsize[i])
223 // collectionBytes returns the approximate memory size of the
225 func (c *cache) collectionBytes() uint64 {
// Sum the manifest text length of every entry in the current key
// snapshot. Entries are read with Peek rather than Get.
227 for _, k := range c.collections.Keys() {
228 v, ok := c.collections.Peek(k)
// Only the manifest text length is counted; other fields of the
// collection record are not included in the total.
232 size += uint64(len(v.(*cachedCollection).collection.ManifestText))
// lookupCollection looks up pdh in the collections cache. An unexpired
// entry counts as a CollectionHit and its collection is returned; an
// expired entry is removed from the cache instead. Get treats a nil
// result as "not cached".
// NOTE(review): the return statements for the miss and expired
// branches are not visible in this excerpt.
237 func (c *cache) lookupCollection(pdh string) *arvados.Collection {
240 } else if ent, cached := c.collections.Get(pdh); !cached {
243 ent := ent.(*cachedCollection)
// Evict an expired entry rather than serve it.
244 if ent.expire.Before(time.Now()) {
245 c.collections.Remove(pdh)
248 atomic.AddUint64(&c.stats.CollectionHits, 1)
249 return ent.collection