9 "git.curoverse.com/arvados.git/sdk/go/arvados"
10 "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
11 "github.com/hashicorp/golang-lru"
22 pdhs *lru.TwoQueueCache
23 collections *lru.TwoQueueCache
24 permissions *lru.TwoQueueCache
28 type cacheStats struct {
36 type cachedPDH struct {
41 type cachedCollection struct {
43 collection map[string]interface{}
46 type cachedPermission struct {
50 func (c *cache) setup() {
52 c.pdhs, err = lru.New2Q(c.UUIDEntries)
56 c.collections, err = lru.New2Q(c.CollectionEntries)
60 c.permissions, err = lru.New2Q(c.PermissionEntries)
66 var selectPDH = map[string]interface{}{
67 "select": []string{"portable_data_hash"},
70 func (c *cache) Stats() cacheStats {
72 Requests: atomic.LoadUint64(&c.stats.Requests),
73 CollectionHits: atomic.LoadUint64(&c.stats.CollectionHits),
74 PDHHits: atomic.LoadUint64(&c.stats.PDHHits),
75 PermissionHits: atomic.LoadUint64(&c.stats.PermissionHits),
76 APICalls: atomic.LoadUint64(&c.stats.APICalls),
80 func (c *cache) Get(arv *arvadosclient.ArvadosClient, targetID string, forceReload bool) (map[string]interface{}, error) {
81 c.setupOnce.Do(c.setup)
83 atomic.AddUint64(&c.stats.Requests, 1)
86 permKey := arv.ApiToken + "\000" + targetID
87 if ent, cached := c.permissions.Get(permKey); cached {
88 ent := ent.(*cachedPermission)
89 if ent.expire.Before(time.Now()) {
90 c.permissions.Remove(permKey)
93 atomic.AddUint64(&c.stats.PermissionHits, 1)
98 if arvadosclient.PDHMatch(targetID) {
100 } else if ent, cached := c.pdhs.Get(targetID); cached {
101 ent := ent.(*cachedPDH)
102 if ent.expire.Before(time.Now()) {
103 c.pdhs.Remove(targetID)
106 atomic.AddUint64(&c.stats.PDHHits, 1)
110 collection := c.lookupCollection(pdh)
112 if collection != nil && permOK && !forceReload {
113 return collection, nil
116 if collection != nil {
117 // Ask API for current PDH for this targetID. Most
118 // likely, the cached PDH is still correct; if so,
119 // _and_ the current token has permission, we can
120 // use our cached manifest.
121 atomic.AddUint64(&c.stats.APICalls, 1)
122 var current map[string]interface{}
123 err := arv.Get("collections", targetID, selectPDH, ¤t)
127 if checkPDH, ok := current["portable_data_hash"].(string); !ok {
128 return nil, fmt.Errorf("API response for %q had no PDH", targetID)
129 } else if checkPDH == pdh {
130 exp := time.Now().Add(time.Duration(c.TTL))
131 c.permissions.Add(permKey, &cachedPermission{
135 c.pdhs.Add(targetID, &cachedPDH{
140 return collection, err
142 // PDH changed, but now we know we have
143 // permission -- and maybe we already have the
144 // new PDH in the cache.
145 if coll := c.lookupCollection(checkPDH); coll != nil {
151 // Collection manifest is not cached.
152 atomic.AddUint64(&c.stats.APICalls, 1)
153 err := arv.Get("collections", targetID, nil, &collection)
157 pdh, ok := collection["portable_data_hash"].(string)
159 return nil, fmt.Errorf("API response for %q had no PDH", targetID)
161 exp := time.Now().Add(time.Duration(c.TTL))
162 c.permissions.Add(permKey, &cachedPermission{
165 c.pdhs.Add(targetID, &cachedPDH{
169 c.collections.Add(pdh, &cachedCollection{
171 collection: collection,
173 if int64(len(collection["manifest_text"].(string))) > c.CollectionBytes/int64(c.CollectionEntries) {
176 return collection, nil
179 // pruneCollections checks the total bytes occupied by manifest_text
180 // in the collection cache and removes old entries as needed to bring
181 // the total size down to CollectionBytes. It also deletes all expired
184 // pruneCollections does not aim to be perfectly correct when there is
185 // concurrent cache activity.
186 func (c *cache) pruneCollections() {
189 keys := c.collections.Keys()
190 entsize := make([]int, len(keys))
191 expired := make([]bool, len(keys))
192 for i, k := range keys {
193 v, ok := c.collections.Peek(k)
197 ent := v.(*cachedCollection)
198 n := len(ent.collection["manifest_text"].(string))
201 expired[i] = ent.expire.Before(now)
203 for i, k := range keys {
205 c.collections.Remove(k)
206 size -= int64(entsize[i])
209 for i, k := range keys {
210 if size <= c.CollectionBytes {
214 // already removed this entry in the previous loop
217 c.collections.Remove(k)
218 size -= int64(entsize[i])
222 func (c *cache) lookupCollection(pdh string) map[string]interface{} {
225 } else if ent, cached := c.collections.Get(pdh); !cached {
228 ent := ent.(*cachedCollection)
229 if ent.expire.Before(time.Now()) {
230 c.collections.Remove(pdh)
233 atomic.AddUint64(&c.stats.CollectionHits, 1)
234 return ent.collection