9 "git.curoverse.com/arvados.git/sdk/go/arvados"
10 "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
11 "github.com/hashicorp/golang-lru"
22 pdhs *lru.TwoQueueCache
23 collections *lru.TwoQueueCache
24 permissions *lru.TwoQueueCache
28 type cacheStats struct {
29 Requests uint64 `json:"Cache.Requests"`
30 CollectionBytes uint64 `json:"Cache.CollectionBytes"`
31 CollectionEntries int `json:"Cache.CollectionEntries"`
32 CollectionHits uint64 `json:"Cache.CollectionHits"`
33 PDHHits uint64 `json:"Cache.UUIDHits"`
34 PermissionHits uint64 `json:"Cache.PermissionHits"`
35 APICalls uint64 `json:"Cache.APICalls"`
38 type cachedPDH struct {
43 type cachedCollection struct {
45 collection map[string]interface{}
48 type cachedPermission struct {
52 func (c *cache) setup() {
54 c.pdhs, err = lru.New2Q(c.UUIDEntries)
58 c.collections, err = lru.New2Q(c.CollectionEntries)
62 c.permissions, err = lru.New2Q(c.PermissionEntries)
68 var selectPDH = map[string]interface{}{
69 "select": []string{"portable_data_hash"},
72 func (c *cache) Stats() cacheStats {
73 c.setupOnce.Do(c.setup)
75 Requests: atomic.LoadUint64(&c.stats.Requests),
76 CollectionBytes: c.collectionBytes(),
77 CollectionEntries: c.collections.Len(),
78 CollectionHits: atomic.LoadUint64(&c.stats.CollectionHits),
79 PDHHits: atomic.LoadUint64(&c.stats.PDHHits),
80 PermissionHits: atomic.LoadUint64(&c.stats.PermissionHits),
81 APICalls: atomic.LoadUint64(&c.stats.APICalls),
85 func (c *cache) Get(arv *arvadosclient.ArvadosClient, targetID string, forceReload bool) (map[string]interface{}, error) {
86 c.setupOnce.Do(c.setup)
88 atomic.AddUint64(&c.stats.Requests, 1)
91 permKey := arv.ApiToken + "\000" + targetID
92 if ent, cached := c.permissions.Get(permKey); cached {
93 ent := ent.(*cachedPermission)
94 if ent.expire.Before(time.Now()) {
95 c.permissions.Remove(permKey)
98 atomic.AddUint64(&c.stats.PermissionHits, 1)
103 if arvadosclient.PDHMatch(targetID) {
105 } else if ent, cached := c.pdhs.Get(targetID); cached {
106 ent := ent.(*cachedPDH)
107 if ent.expire.Before(time.Now()) {
108 c.pdhs.Remove(targetID)
111 atomic.AddUint64(&c.stats.PDHHits, 1)
115 collection := c.lookupCollection(pdh)
117 if collection != nil && permOK && !forceReload {
118 return collection, nil
121 if collection != nil {
122 // Ask API for current PDH for this targetID. Most
123 // likely, the cached PDH is still correct; if so,
124 // _and_ the current token has permission, we can
125 // use our cached manifest.
126 atomic.AddUint64(&c.stats.APICalls, 1)
127 var current map[string]interface{}
128 err := arv.Get("collections", targetID, selectPDH, ¤t)
132 if checkPDH, ok := current["portable_data_hash"].(string); !ok {
133 return nil, fmt.Errorf("API response for %q had no PDH", targetID)
134 } else if checkPDH == pdh {
135 exp := time.Now().Add(time.Duration(c.TTL))
136 c.permissions.Add(permKey, &cachedPermission{
140 c.pdhs.Add(targetID, &cachedPDH{
145 return collection, err
147 // PDH changed, but now we know we have
148 // permission -- and maybe we already have the
149 // new PDH in the cache.
150 if coll := c.lookupCollection(checkPDH); coll != nil {
156 // Collection manifest is not cached.
157 atomic.AddUint64(&c.stats.APICalls, 1)
158 err := arv.Get("collections", targetID, nil, &collection)
162 pdh, ok := collection["portable_data_hash"].(string)
164 return nil, fmt.Errorf("API response for %q had no PDH", targetID)
166 exp := time.Now().Add(time.Duration(c.TTL))
167 c.permissions.Add(permKey, &cachedPermission{
170 c.pdhs.Add(targetID, &cachedPDH{
174 c.collections.Add(pdh, &cachedCollection{
176 collection: collection,
178 if int64(len(collection["manifest_text"].(string))) > c.CollectionBytes/int64(c.CollectionEntries) {
181 return collection, nil
184 // pruneCollections checks the total bytes occupied by manifest_text
185 // in the collection cache and removes old entries as needed to bring
186 // the total size down to CollectionBytes. It also deletes all expired
189 // pruneCollections does not aim to be perfectly correct when there is
190 // concurrent cache activity.
191 func (c *cache) pruneCollections() {
194 keys := c.collections.Keys()
195 entsize := make([]int, len(keys))
196 expired := make([]bool, len(keys))
197 for i, k := range keys {
198 v, ok := c.collections.Peek(k)
202 ent := v.(*cachedCollection)
203 n := len(ent.collection["manifest_text"].(string))
206 expired[i] = ent.expire.Before(now)
208 for i, k := range keys {
210 c.collections.Remove(k)
211 size -= int64(entsize[i])
214 for i, k := range keys {
215 if size <= c.CollectionBytes {
219 // already removed this entry in the previous loop
222 c.collections.Remove(k)
223 size -= int64(entsize[i])
227 // collectionBytes returns the approximate memory size of the
229 func (c *cache) collectionBytes() uint64 {
231 for _, k := range c.collections.Keys() {
232 v, ok := c.collections.Peek(k)
236 size += uint64(len(v.(*cachedCollection).collection["manifest_text"].(string)))
241 func (c *cache) lookupCollection(pdh string) map[string]interface{} {
244 } else if ent, cached := c.collections.Get(pdh); !cached {
247 ent := ent.(*cachedCollection)
248 if ent.expire.Before(time.Now()) {
249 c.collections.Remove(pdh)
252 atomic.AddUint64(&c.stats.CollectionHits, 1)
253 return ent.collection