9 "git.curoverse.com/arvados.git/sdk/go/arvados"
10 "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
11 "github.com/hashicorp/golang-lru"
// Fields of the cache type used by the methods below (the type's
// header and some of its fields are not visible in this excerpt).
//
// MaxCollectionEntries is the capacity passed to lru.New2Q for the
// collections cache.
16 MaxCollectionEntries int
// MaxCollectionBytes is the total manifest_text size that
// pruneCollections brings the collections cache back down to.
17 MaxCollectionBytes int64
// MaxPermissionEntries is the capacity passed to lru.New2Q for the
// permissions cache.
18 MaxPermissionEntries int
// pdhs maps a targetID to a *cachedPDH.
22 pdhs *lru.TwoQueueCache
// collections maps a portable data hash to a *cachedCollection.
23 collections *lru.TwoQueueCache
// permissions maps (API token + "\000" + targetID) to a
// *cachedPermission.
24 permissions *lru.TwoQueueCache
// cacheStats is the set of cache counters returned by (*cache).Stats.
// The struct tags give the JSON key used for each field.
28 type cacheStats struct {
// Requests is incremented once per call to Get.
29 Requests uint64 `json:"Cache.Requests"`
// CollectionBytes is the summed manifest_text length of all entries
// in the collections cache (see collectionBytes).
30 CollectionBytes uint64 `json:"Cache.CollectionBytes"`
// CollectionEntries is the current length of the collections cache.
31 CollectionEntries int `json:"Cache.CollectionEntries"`
// CollectionHits is incremented by lookupCollection.
32 CollectionHits uint64 `json:"Cache.CollectionHits"`
// PDHHits is incremented by Get in the pdhs cache lookup branch.
// Note that its JSON key is "Cache.UUIDHits", not "Cache.PDHHits".
33 PDHHits uint64 `json:"Cache.UUIDHits"`
// PermissionHits is incremented by Get in the permissions cache
// lookup branch.
34 PermissionHits uint64 `json:"Cache.PermissionHits"`
// APICalls is incremented before each arv.Get call made by Get.
35 APICalls uint64 `json:"Cache.APICalls"`
// cachedPDH is the value type stored in the pdhs cache, keyed by
// targetID. Get reads its expire field to decide whether the entry is
// still usable. (The field list is not visible in this excerpt.)
38 type cachedPDH struct {
// cachedCollection is the value type stored in the collections cache,
// keyed by portable data hash. Besides the collection record it has an
// expire field (not visible in this excerpt) that lookupCollection and
// pruneCollections compare against the current time.
43 type cachedCollection struct {
// collection is the collection record as returned by the API; its
// "manifest_text" entry is what the size accounting measures.
45 collection map[string]interface{}
// cachedPermission is the value type stored in the permissions cache,
// keyed by API token + "\000" + targetID. Get reads its expire field
// to decide whether the entry is still usable. (The field list is not
// visible in this excerpt.)
48 type cachedPermission struct {
// setup creates the three 2Q LRU caches. It is invoked through
// c.setupOnce by Stats and Get. How the errors returned by lru.New2Q
// are handled is not visible in this excerpt.
52 func (c *cache) setup() {
// targetID -> PDH cache, sized by MaxUUIDEntries.
54 c.pdhs, err = lru.New2Q(c.MaxUUIDEntries)
// PDH -> collection record cache, sized by MaxCollectionEntries.
58 c.collections, err = lru.New2Q(c.MaxCollectionEntries)
// (token, targetID) -> permission cache, sized by MaxPermissionEntries.
62 c.permissions, err = lru.New2Q(c.MaxPermissionEntries)
// selectPDH is the parameter map Get passes to arv.Get when it only
// needs the collection's current portable_data_hash rather than the
// whole record.
68 var selectPDH = map[string]interface{}{
69 "select": []string{"portable_data_hash"},
// Stats returns a snapshot of the cache counters. The hit/request/API
// counters are read with atomic loads; CollectionBytes and
// CollectionEntries are computed from the current contents of the
// collections cache.
72 func (c *cache) Stats() cacheStats {
// Run c.setup through setupOnce so the caches exist before
// c.collections is used below.
73 c.setupOnce.Do(c.setup)
75 Requests: atomic.LoadUint64(&c.stats.Requests),
// Walks every entry in the collections cache (see collectionBytes).
76 CollectionBytes: c.collectionBytes(),
77 CollectionEntries: c.collections.Len(),
78 CollectionHits: atomic.LoadUint64(&c.stats.CollectionHits),
79 PDHHits: atomic.LoadUint64(&c.stats.PDHHits),
80 PermissionHits: atomic.LoadUint64(&c.stats.PermissionHits),
81 APICalls: atomic.LoadUint64(&c.stats.APICalls),
// Get returns the collection record for targetID, consulting the
// permissions, pdhs and collections caches before falling back to API
// calls made through arv. Permissions are cached per token/target
// pair, PDHs per targetID, and collection records per PDH.
//
// NOTE(review): this excerpt omits several lines of the function, so
// the comments below describe only what the visible lines establish.
85 func (c *cache) Get(arv *arvadosclient.ArvadosClient, targetID string, forceReload bool) (map[string]interface{}, error) {
// Run c.setup through setupOnce so the caches exist before use.
86 c.setupOnce.Do(c.setup)
// Every call counts as one request in the stats.
88 atomic.AddUint64(&c.stats.Requests, 1)
// Permission cache key: API token and targetID joined by a NUL byte.
91 permKey := arv.ApiToken + "\000" + targetID
// Look up a cached permission for this token/target. The condition
// guarding this else-branch is not visible here (presumably
// forceReload — TODO confirm).
93 } else if ent, cached := c.permissions.Get(permKey); cached {
94 ent := ent.(*cachedPermission)
// Drop the permission entry once it has expired.
95 if ent.expire.Before(time.Now()) {
96 c.permissions.Remove(permKey)
99 atomic.AddUint64(&c.stats.PermissionHits, 1)
// A targetID that is itself a portable data hash gets its own branch;
// otherwise, unless forceReload is set, consult the targetID -> PDH
// cache.
104 if arvadosclient.PDHMatch(targetID) {
106 } else if forceReload {
107 } else if ent, cached := c.pdhs.Get(targetID); cached {
108 ent := ent.(*cachedPDH)
// Drop the PDH entry once it has expired.
109 if ent.expire.Before(time.Now()) {
110 c.pdhs.Remove(targetID)
113 atomic.AddUint64(&c.stats.PDHHits, 1)
117 var collection map[string]interface{}
// Check the collections cache for this PDH.
119 collection = c.lookupCollection(pdh)
// Cached record found and permOK set: return the cached record.
122 if collection != nil && permOK {
123 return collection, nil
124 } else if collection != nil {
125 // Ask API for current PDH for this targetID. Most
126 // likely, the cached PDH is still correct; if so,
127 // _and_ the current token has permission, we can
128 // use our cached manifest.
129 atomic.AddUint64(&c.stats.APICalls, 1)
130 var current map[string]interface{}
// selectPDH restricts the response to portable_data_hash.
// NOTE(review): "¤t" on the next line looks like a mis-decoded
// "&current" (HTML entity "&curren;" followed by "t") — confirm
// against the original file.
131 err := arv.Get("collections", targetID, selectPDH, ¤t)
// A response without a string portable_data_hash is an error.
135 if checkPDH, ok := current["portable_data_hash"].(string); !ok {
136 return nil, fmt.Errorf("API response for %q had no PDH", targetID)
137 } else if checkPDH == pdh {
// PDH unchanged: refresh the permission and PDH cache entries
// (exp is now + TTL) and return the cached collection.
138 exp := time.Now().Add(time.Duration(c.TTL))
139 c.permissions.Add(permKey, &cachedPermission{
143 c.pdhs.Add(targetID, &cachedPDH{
148 return collection, err
150 // PDH changed, but now we know we have
151 // permission -- and maybe we already have the
152 // new PDH in the cache.
153 if coll := c.lookupCollection(checkPDH); coll != nil {
159 // Collection manifest is not cached.
160 atomic.AddUint64(&c.stats.APICalls, 1)
// Fetch the full collection record (no select parameters).
161 err := arv.Get("collections", targetID, nil, &collection)
// As above, a response without a string portable_data_hash is an error.
165 pdh, ok := collection["portable_data_hash"].(string)
167 return nil, fmt.Errorf("API response for %q had no PDH", targetID)
// Record the result in all three caches; exp is now + TTL.
169 exp := time.Now().Add(time.Duration(c.TTL))
170 c.permissions.Add(permKey, &cachedPermission{
173 c.pdhs.Add(targetID, &cachedPDH{
177 c.collections.Add(pdh, &cachedCollection{
179 collection: collection,
// A manifest larger than the average per-entry byte budget
// (MaxCollectionBytes / MaxCollectionEntries) starts a prune in a
// separate goroutine.
// NOTE(review): the unchecked type assertion panics if manifest_text
// is missing or not a string, and the integer division panics if
// MaxCollectionEntries is 0.
181 if int64(len(collection["manifest_text"].(string))) > c.MaxCollectionBytes/int64(c.MaxCollectionEntries) {
182 go c.pruneCollections()
184 return collection, nil
187 // pruneCollections checks the total bytes occupied by manifest_text
188 // in the collection cache and removes old entries as needed to bring
189 // the total size down to MaxCollectionBytes. It also deletes all expired
192 // pruneCollections does not aim to be perfectly correct when there is
193 // concurrent cache activity.
194 func (c *cache) pruneCollections() {
// Work from a snapshot of the keys; entsize and expired are
// index-aligned with it.
197 keys := c.collections.Keys()
198 entsize := make([]int, len(keys))
199 expired := make([]bool, len(keys))
// First pass: record each entry's manifest size and whether it has
// expired. Peek is used rather than Get, presumably so that this
// inspection does not affect eviction order (see golang-lru docs).
200 for i, k := range keys {
201 v, ok := c.collections.Peek(k)
205 ent := v.(*cachedCollection)
206 n := len(ent.collection["manifest_text"].(string))
209 expired[i] = ent.expire.Before(now)
// Second pass: remove entries and subtract their size from the running
// total. The condition selecting entries is not visible in this
// excerpt (presumably expired[i] — TODO confirm).
211 for i, k := range keys {
213 c.collections.Remove(k)
214 size -= int64(entsize[i])
// Third pass: keep removing entries, in the order Keys() returned
// them, until the total is within MaxCollectionBytes.
217 for i, k := range keys {
218 if size <= c.MaxCollectionBytes {
222 // already removed this entry in the previous loop
225 c.collections.Remove(k)
226 size -= int64(entsize[i])
230 // collectionBytes returns the approximate memory size of the
232 func (c *cache) collectionBytes() uint64 {
// Sum len(manifest_text) over every entry currently in the
// collections cache, inspecting each one with Peek.
234 for _, k := range c.collections.Keys() {
235 v, ok := c.collections.Peek(k)
// NOTE(review): unchecked type assertions — this panics if an entry
// has no string manifest_text.
239 size += uint64(len(v.(*cachedCollection).collection["manifest_text"].(string)))
// lookupCollection returns the cached collection record for pdh from
// the collections cache. Callers treat a nil result as a cache miss;
// the lines that return on a miss or an expired entry are not visible
// in this excerpt.
244 func (c *cache) lookupCollection(pdh string) map[string]interface{} {
247 } else if ent, cached := c.collections.Get(pdh); !cached {
250 ent := ent.(*cachedCollection)
// Drop the entry once it has expired.
251 if ent.expire.Before(time.Now()) {
252 c.collections.Remove(pdh)
// Count the hit and hand back the cached record.
255 atomic.AddUint64(&c.stats.CollectionHits, 1)
256 return ent.collection