package main import ( "fmt" "sync" "sync/atomic" "time" "git.curoverse.com/arvados.git/sdk/go/arvados" "git.curoverse.com/arvados.git/sdk/go/arvadosclient" "github.com/hashicorp/golang-lru" ) type cache struct { TTL arvados.Duration CollectionEntries int CollectionBytes int64 PermissionEntries int UUIDEntries int stats cacheStats pdhs *lru.TwoQueueCache collections *lru.TwoQueueCache permissions *lru.TwoQueueCache setupOnce sync.Once } type cacheStats struct { Requests uint64 CollectionHits uint64 PDHHits uint64 PermissionHits uint64 APICalls uint64 } type cachedPDH struct { expire time.Time pdh string } type cachedCollection struct { expire time.Time collection map[string]interface{} } type cachedPermission struct { expire time.Time } func (c *cache) setup() { var err error c.pdhs, err = lru.New2Q(c.UUIDEntries) if err != nil { panic(err) } c.collections, err = lru.New2Q(c.CollectionEntries) if err != nil { panic(err) } c.permissions, err = lru.New2Q(c.PermissionEntries) if err != nil { panic(err) } } var selectPDH = map[string]interface{}{ "select": []string{"portable_data_hash"}, } func (c *cache) Stats() cacheStats { return cacheStats{ Requests: atomic.LoadUint64(&c.stats.Requests), CollectionHits: atomic.LoadUint64(&c.stats.CollectionHits), PDHHits: atomic.LoadUint64(&c.stats.PDHHits), PermissionHits: atomic.LoadUint64(&c.stats.PermissionHits), APICalls: atomic.LoadUint64(&c.stats.APICalls), } } func (c *cache) Get(arv *arvadosclient.ArvadosClient, targetID string, forceReload bool) (map[string]interface{}, error) { c.setupOnce.Do(c.setup) atomic.AddUint64(&c.stats.Requests, 1) permOK := false permKey := arv.ApiToken + "\000" + targetID if ent, cached := c.permissions.Get(permKey); cached { ent := ent.(*cachedPermission) if ent.expire.Before(time.Now()) { c.permissions.Remove(permKey) } else { permOK = true atomic.AddUint64(&c.stats.PermissionHits, 1) } } var pdh string if arvadosclient.PDHMatch(targetID) { pdh = targetID } else if ent, cached := c.pdhs.Get(targetID); cached { ent := ent.(*cachedPDH) if ent.expire.Before(time.Now()) { c.pdhs.Remove(targetID) } else { pdh = ent.pdh atomic.AddUint64(&c.stats.PDHHits, 1) } } collection := c.lookupCollection(pdh) if collection != nil && permOK && !forceReload { return collection, nil } if collection != nil { // Ask API for current PDH for this targetID. Most // likely, the cached PDH is still correct; if so, // _and_ the current token has permission, we can // use our cached manifest. atomic.AddUint64(&c.stats.APICalls, 1) var current map[string]interface{} err := arv.Get("collections", targetID, selectPDH, ¤t) if err != nil { return nil, err } if checkPDH, ok := current["portable_data_hash"].(string); !ok { return nil, fmt.Errorf("API response for %q had no PDH", targetID) } else if checkPDH == pdh { exp := time.Now().Add(time.Duration(c.TTL)) c.permissions.Add(permKey, &cachedPermission{ expire: exp, }) if pdh != targetID { c.pdhs.Add(targetID, &cachedPDH{ expire: exp, pdh: pdh, }) } return collection, err } else { // PDH changed, but now we know we have // permission -- and maybe we already have the // new PDH in the cache. if coll := c.lookupCollection(checkPDH); coll != nil { return coll, nil } } } // Collection manifest is not cached. atomic.AddUint64(&c.stats.APICalls, 1) err := arv.Get("collections", targetID, nil, &collection) if err != nil { return nil, err } pdh, ok := collection["portable_data_hash"].(string) if !ok { return nil, fmt.Errorf("API response for %q had no PDH", targetID) } exp := time.Now().Add(time.Duration(c.TTL)) c.permissions.Add(permKey, &cachedPermission{ expire: exp, }) c.pdhs.Add(targetID, &cachedPDH{ expire: exp, pdh: pdh, }) c.collections.Add(pdh, &cachedCollection{ expire: exp, collection: collection, }) if int64(len(collection["manifest_text"].(string))) > c.CollectionBytes/int64(c.CollectionEntries) { c.pruneCollections() } return collection, nil } // pruneCollections checks the total bytes occupied by manifest_text // in the collection cache and removes old entries as needed to bring // the total size down to CollectionBytes. It also deletes all expired // entries. // // pruneCollections does not aim to be perfectly correct when there is // concurrent cache activity. func (c *cache) pruneCollections() { var size int64 now := time.Now() keys := c.collections.Keys() entsize := make([]int, len(keys)) expired := make([]bool, len(keys)) for i, k := range keys { v, ok := c.collections.Peek(k) if !ok { continue } ent := v.(*cachedCollection) n := len(ent.collection["manifest_text"].(string)) size += int64(n) entsize[i] = n expired[i] = ent.expire.Before(now) } for i, k := range keys { if expired[i] { c.collections.Remove(k) size -= int64(entsize[i]) } } for i, k := range keys { if size <= c.CollectionBytes { break } if expired[i] { // already removed this entry in the previous loop continue } c.collections.Remove(k) size -= int64(entsize[i]) } } func (c *cache) lookupCollection(pdh string) map[string]interface{} { if pdh == "" { return nil } else if ent, cached := c.collections.Get(pdh); !cached { return nil } else { ent := ent.(*cachedCollection) if ent.expire.Before(time.Now()) { c.collections.Remove(pdh) return nil } else { atomic.AddUint64(&c.stats.CollectionHits, 1) return ent.collection } } }