9 "git.curoverse.com/arvados.git/sdk/go/arvados"
10 "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
11 "github.com/hashicorp/golang-lru"
16 MaxCollectionEntries int
17 MaxCollectionBytes int64
18 MaxPermissionEntries int
22 pdhs *lru.TwoQueueCache
23 collections *lru.TwoQueueCache
24 permissions *lru.TwoQueueCache
28 type cacheStats struct {
29 Requests uint64 `json:"Cache.Requests"`
30 CollectionBytes uint64 `json:"Cache.CollectionBytes"`
31 CollectionEntries int `json:"Cache.CollectionEntries"`
32 CollectionHits uint64 `json:"Cache.CollectionHits"`
33 PDHHits uint64 `json:"Cache.UUIDHits"`
34 PermissionHits uint64 `json:"Cache.PermissionHits"`
35 APICalls uint64 `json:"Cache.APICalls"`
38 type cachedPDH struct {
43 type cachedCollection struct {
45 collection map[string]interface{}
48 type cachedPermission struct {
52 func (c *cache) setup() {
54 c.pdhs, err = lru.New2Q(c.MaxUUIDEntries)
58 c.collections, err = lru.New2Q(c.MaxCollectionEntries)
62 c.permissions, err = lru.New2Q(c.MaxPermissionEntries)
68 var selectPDH = map[string]interface{}{
69 "select": []string{"portable_data_hash"},
72 func (c *cache) Stats() cacheStats {
73 c.setupOnce.Do(c.setup)
75 Requests: atomic.LoadUint64(&c.stats.Requests),
76 CollectionBytes: c.collectionBytes(),
77 CollectionEntries: c.collections.Len(),
78 CollectionHits: atomic.LoadUint64(&c.stats.CollectionHits),
79 PDHHits: atomic.LoadUint64(&c.stats.PDHHits),
80 PermissionHits: atomic.LoadUint64(&c.stats.PermissionHits),
81 APICalls: atomic.LoadUint64(&c.stats.APICalls),
85 func (c *cache) Get(arv *arvadosclient.ArvadosClient, targetID string, forceReload bool) (map[string]interface{}, error) {
86 c.setupOnce.Do(c.setup)
88 atomic.AddUint64(&c.stats.Requests, 1)
91 permKey := arv.ApiToken + "\000" + targetID
93 } else if ent, cached := c.permissions.Get(permKey); cached {
94 ent := ent.(*cachedPermission)
95 if ent.expire.Before(time.Now()) {
96 c.permissions.Remove(permKey)
99 atomic.AddUint64(&c.stats.PermissionHits, 1)
104 if arvadosclient.PDHMatch(targetID) {
106 } else if ent, cached := c.pdhs.Get(targetID); cached {
107 ent := ent.(*cachedPDH)
108 if ent.expire.Before(time.Now()) {
109 c.pdhs.Remove(targetID)
112 atomic.AddUint64(&c.stats.PDHHits, 1)
116 var collection map[string]interface{}
118 collection = c.lookupCollection(pdh)
121 if collection != nil && permOK {
122 return collection, nil
123 } else if collection != nil {
124 // Ask API for current PDH for this targetID. Most
125 // likely, the cached PDH is still correct; if so,
126 // _and_ the current token has permission, we can
127 // use our cached manifest.
128 atomic.AddUint64(&c.stats.APICalls, 1)
129 var current map[string]interface{}
130 err := arv.Get("collections", targetID, selectPDH, ¤t)
134 if checkPDH, ok := current["portable_data_hash"].(string); !ok {
135 return nil, fmt.Errorf("API response for %q had no PDH", targetID)
136 } else if checkPDH == pdh {
137 exp := time.Now().Add(time.Duration(c.TTL))
138 c.permissions.Add(permKey, &cachedPermission{
142 c.pdhs.Add(targetID, &cachedPDH{
147 return collection, err
149 // PDH changed, but now we know we have
150 // permission -- and maybe we already have the
151 // new PDH in the cache.
152 if coll := c.lookupCollection(checkPDH); coll != nil {
158 // Collection manifest is not cached.
159 atomic.AddUint64(&c.stats.APICalls, 1)
160 err := arv.Get("collections", targetID, nil, &collection)
164 pdh, ok := collection["portable_data_hash"].(string)
166 return nil, fmt.Errorf("API response for %q had no PDH", targetID)
168 exp := time.Now().Add(time.Duration(c.TTL))
169 c.permissions.Add(permKey, &cachedPermission{
172 c.pdhs.Add(targetID, &cachedPDH{
176 c.collections.Add(pdh, &cachedCollection{
178 collection: collection,
180 if int64(len(collection["manifest_text"].(string))) > c.MaxCollectionBytes/int64(c.MaxCollectionEntries) {
181 go c.pruneCollections()
183 return collection, nil
186 // pruneCollections checks the total bytes occupied by manifest_text
187 // in the collection cache and removes old entries as needed to bring
188 // the total size down to MaxCollectionBytes. It also deletes all expired
191 // pruneCollections does not aim to be perfectly correct when there is
192 // concurrent cache activity.
193 func (c *cache) pruneCollections() {
196 keys := c.collections.Keys()
197 entsize := make([]int, len(keys))
198 expired := make([]bool, len(keys))
199 for i, k := range keys {
200 v, ok := c.collections.Peek(k)
204 ent := v.(*cachedCollection)
205 n := len(ent.collection["manifest_text"].(string))
208 expired[i] = ent.expire.Before(now)
210 for i, k := range keys {
212 c.collections.Remove(k)
213 size -= int64(entsize[i])
216 for i, k := range keys {
217 if size <= c.MaxCollectionBytes {
221 // already removed this entry in the previous loop
224 c.collections.Remove(k)
225 size -= int64(entsize[i])
229 // collectionBytes returns the approximate memory size of the
231 func (c *cache) collectionBytes() uint64 {
233 for _, k := range c.collections.Keys() {
234 v, ok := c.collections.Peek(k)
238 size += uint64(len(v.(*cachedCollection).collection["manifest_text"].(string)))
243 func (c *cache) lookupCollection(pdh string) map[string]interface{} {
246 } else if ent, cached := c.collections.Get(pdh); !cached {
249 ent := ent.(*cachedCollection)
250 if ent.expire.Before(time.Now()) {
251 c.collections.Remove(pdh)
254 atomic.AddUint64(&c.stats.CollectionHits, 1)
255 return ent.collection