11809: Cache permission and collection lookups.
authorTom Clegg <tom@curoverse.com>
Tue, 6 Jun 2017 15:57:34 +0000 (11:57 -0400)
committerTom Clegg <tom@curoverse.com>
Tue, 6 Jun 2017 15:57:34 +0000 (11:57 -0400)
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom@curoverse.com>

services/keep-web/cache.go [new file with mode: 0644]
services/keep-web/handler.go
services/keep-web/handler_test.go
services/keep-web/main.go
services/keep-web/server_test.go
services/keep-web/usage.go

diff --git a/services/keep-web/cache.go b/services/keep-web/cache.go
new file mode 100644 (file)
index 0000000..e8bf399
--- /dev/null
@@ -0,0 +1,237 @@
+package main
+
+import (
+       "fmt"
+       "sync"
+       "sync/atomic"
+       "time"
+
+       "git.curoverse.com/arvados.git/sdk/go/arvados"
+       "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
+       "github.com/hashicorp/golang-lru"
+)
+
+// cache holds recently used collection records, UUID-to-PDH
+// mappings, and permission check results, so repeated requests for
+// the same target need fewer API calls. The exported fields are
+// configuration; the entry-count fields are read when setup runs on
+// the first call to Get.
+type cache struct {
+       // TTL is how long a newly added cache entry remains valid.
+       TTL               arvados.Duration
+       // CollectionEntries is the size of the collection cache.
+       CollectionEntries int
+       // CollectionBytes is the total manifest_text size that
+       // pruneCollections trims the collection cache down to.
+       CollectionBytes   int64
+       // PermissionEntries is the size of the permission cache.
+       PermissionEntries int
+       // UUIDEntries is the size of the UUID-to-PDH cache.
+       UUIDEntries       int
+
+       // stats holds counters that are updated with sync/atomic.
+       stats       cacheStats
+       // pdhs maps a target ID to a *cachedPDH.
+       pdhs        *lru.TwoQueueCache
+       // collections maps a portable data hash to a
+       // *cachedCollection.
+       collections *lru.TwoQueueCache
+       // permissions maps token + "\000" + target ID to a
+       // *cachedPermission.
+       permissions *lru.TwoQueueCache
+       // setupOnce guards the one-time call to setup made by Get.
+       setupOnce   sync.Once
+}
+
+// cacheStats is a set of event counters describing cache activity.
+type cacheStats struct {
+       // Requests counts calls to Get.
+       Requests       uint64
+       // CollectionHits counts lookups that found an unexpired
+       // collection record.
+       CollectionHits uint64
+       // PDHHits counts lookups that found an unexpired UUID-to-PDH
+       // mapping.
+       PDHHits        uint64
+       // PermissionHits counts lookups that found an unexpired
+       // permission entry.
+       PermissionHits uint64
+       // APICalls counts requests sent to the API server by Get.
+       APICalls       uint64
+}
+
+// cachedPDH is an entry in the UUID-to-PDH cache: the portable data
+// hash last seen for a target ID, and the time after which the entry
+// must no longer be used.
+type cachedPDH struct {
+       expire time.Time
+       pdh    string
+}
+
+// cachedCollection is an entry in the collection cache: a collection
+// record as returned by the API, and the time after which the entry
+// must no longer be used.
+type cachedCollection struct {
+       expire     time.Time
+       collection map[string]interface{}
+}
+
+// cachedPermission is an entry in the permission cache. Its presence
+// under a token + target ID key records that an API request for that
+// target succeeded with that token; expire is the time after which
+// the entry must no longer be used.
+type cachedPermission struct {
+       expire time.Time
+}
+
+func (c *cache) setup() {
+       var err error
+       c.pdhs, err = lru.New2Q(c.UUIDEntries)
+       if err != nil {
+               panic(err)
+       }
+       c.collections, err = lru.New2Q(c.CollectionEntries)
+       if err != nil {
+               panic(err)
+       }
+       c.permissions, err = lru.New2Q(c.PermissionEntries)
+       if err != nil {
+               panic(err)
+       }
+}
+
+// selectPDH is the request parameter set Get uses when it only needs
+// to learn a collection's current portable data hash, not the whole
+// record.
+var selectPDH = map[string]interface{}{
+       "select": []string{"portable_data_hash"},
+}
+
+// Stats returns a copy of the cache's counters. Each counter is
+// loaded atomically, one at a time.
+func (c *cache) Stats() cacheStats {
+       var snap cacheStats
+       snap.Requests = atomic.LoadUint64(&c.stats.Requests)
+       snap.CollectionHits = atomic.LoadUint64(&c.stats.CollectionHits)
+       snap.PDHHits = atomic.LoadUint64(&c.stats.PDHHits)
+       snap.PermissionHits = atomic.LoadUint64(&c.stats.PermissionHits)
+       snap.APICalls = atomic.LoadUint64(&c.stats.APICalls)
+       return snap
+}
+
+// Get returns the collection record for targetID (a UUID or a
+// portable data hash), using cached data where possible.
+//
+// A cached record is returned without contacting the API server only
+// when forceReload is false, an unexpired record is cached for the
+// target's PDH, and an unexpired permission entry exists for
+// arv.ApiToken and targetID. Otherwise the API server is asked either
+// for the target's current PDH (when a record is cached) or for the
+// whole record (when it is not), and the caches are updated from the
+// response.
+//
+// An error is returned if an API request fails, or if the response
+// lacks a string portable_data_hash or (for a full record) a string
+// manifest_text.
+//
+// The returned map may be the same one held by the cache, so callers
+// should treat it as read-only.
+func (c *cache) Get(arv *arvadosclient.ArvadosClient, targetID string, forceReload bool) (map[string]interface{}, error) {
+       c.setupOnce.Do(c.setup)
+
+       atomic.AddUint64(&c.stats.Requests, 1)
+
+       permOK := false
+       permKey := arv.ApiToken + "\000" + targetID
+       if ent, cached := c.permissions.Get(permKey); cached {
+               ent := ent.(*cachedPermission)
+               if ent.expire.Before(time.Now()) {
+                       c.permissions.Remove(permKey)
+               } else {
+                       permOK = true
+                       atomic.AddUint64(&c.stats.PermissionHits, 1)
+               }
+       }
+
+       var pdh string
+       if arvadosclient.PDHMatch(targetID) {
+               pdh = targetID
+       } else if ent, cached := c.pdhs.Get(targetID); cached {
+               ent := ent.(*cachedPDH)
+               if ent.expire.Before(time.Now()) {
+                       c.pdhs.Remove(targetID)
+               } else {
+                       pdh = ent.pdh
+                       atomic.AddUint64(&c.stats.PDHHits, 1)
+               }
+       }
+
+       collection := c.lookupCollection(pdh)
+
+       if collection != nil && permOK && !forceReload {
+               return collection, nil
+       }
+
+       if collection != nil {
+               // Ask API for current PDH for this targetID. Most
+               // likely, the cached PDH is still correct; if so,
+               // _and_ the current token has permission, we can
+               // use our cached manifest.
+               atomic.AddUint64(&c.stats.APICalls, 1)
+               var current map[string]interface{}
+               err := arv.Get("collections", targetID, selectPDH, &current)
+               if err != nil {
+                       return nil, err
+               }
+               checkPDH, ok := current["portable_data_hash"].(string)
+               if !ok {
+                       return nil, fmt.Errorf("API response for %q had no PDH", targetID)
+               }
+               if checkPDH == pdh {
+                       exp := time.Now().Add(time.Duration(c.TTL))
+                       c.permissions.Add(permKey, &cachedPermission{
+                               expire: exp,
+                       })
+                       if pdh != targetID {
+                               c.pdhs.Add(targetID, &cachedPDH{
+                                       expire: exp,
+                                       pdh:    pdh,
+                               })
+                       }
+                       return collection, nil
+               }
+               // PDH changed, but now we know we have permission --
+               // and maybe we already have the new PDH in the cache.
+               if coll := c.lookupCollection(checkPDH); coll != nil {
+                       return coll, nil
+               }
+       }
+
+       // Collection manifest is not cached.
+       //
+       // Fetch into a fresh map rather than into collection: at
+       // this point collection may still be the map stored in the
+       // cache under the old PDH, and a JSON decoder given a
+       // non-nil map writes into it instead of allocating a new
+       // one (NOTE(review): assumes arv.Get JSON-decodes into its
+       // output argument -- confirm). Reusing it would silently
+       // overwrite the old cache entry.
+       atomic.AddUint64(&c.stats.APICalls, 1)
+       var fetched map[string]interface{}
+       err := arv.Get("collections", targetID, nil, &fetched)
+       if err != nil {
+               return nil, err
+       }
+       fetchedPDH, ok := fetched["portable_data_hash"].(string)
+       if !ok {
+               return nil, fmt.Errorf("API response for %q had no PDH", targetID)
+       }
+       // Validate manifest_text before caching anything, so a
+       // malformed response results in an error instead of a panic
+       // and never leaves an unusable entry in the cache.
+       manifest, ok := fetched["manifest_text"].(string)
+       if !ok {
+               return nil, fmt.Errorf("API response for %q had no manifest_text", targetID)
+       }
+       exp := time.Now().Add(time.Duration(c.TTL))
+       c.permissions.Add(permKey, &cachedPermission{
+               expire: exp,
+       })
+       // When targetID is itself a PDH the mapping is never looked
+       // up, so don't spend a UUID cache slot on it.
+       if fetchedPDH != targetID {
+               c.pdhs.Add(targetID, &cachedPDH{
+                       expire: exp,
+                       pdh:    fetchedPDH,
+               })
+       }
+       c.collections.Add(fetchedPDH, &cachedCollection{
+               expire:     exp,
+               collection: fetched,
+       })
+       // Only a manifest larger than the average per-entry share
+       // of CollectionBytes triggers a prune.
+       if int64(len(manifest)) > c.CollectionBytes/int64(c.CollectionEntries) {
+               c.pruneCollections()
+       }
+       return fetched, nil
+}
+
+// pruneCollections checks the total bytes occupied by manifest_text
+// in the collection cache and removes entries, in the order reported
+// by the cache's Keys method, as needed to bring the total size down
+// to CollectionBytes. It also deletes all expired entries.
+//
+// An entry whose record has no string manifest_text is counted as
+// occupying zero bytes.
+//
+// pruneCollections does not aim to be perfectly correct when there is
+// concurrent cache activity.
+func (c *cache) pruneCollections() {
+       var size int64
+       now := time.Now()
+       keys := c.collections.Keys()
+       entsize := make([]int, len(keys))
+       expired := make([]bool, len(keys))
+       for i, k := range keys {
+               // Peek, unlike Get, is used so that measuring the
+               // entries does not itself count as using them.
+               v, ok := c.collections.Peek(k)
+               if !ok {
+                       // Entry disappeared since Keys() was called.
+                       continue
+               }
+               ent := v.(*cachedCollection)
+               // Comma-ok form: a record without a string
+               // manifest_text must not crash the prune.
+               manifest, _ := ent.collection["manifest_text"].(string)
+               n := len(manifest)
+               size += int64(n)
+               entsize[i] = n
+               expired[i] = ent.expire.Before(now)
+       }
+       for i, k := range keys {
+               if expired[i] {
+                       c.collections.Remove(k)
+                       size -= int64(entsize[i])
+               }
+       }
+       for i, k := range keys {
+               if size <= c.CollectionBytes {
+                       break
+               }
+               if expired[i] {
+                       // already removed this entry in the previous loop
+                       continue
+               }
+               c.collections.Remove(k)
+               size -= int64(entsize[i])
+       }
+}
+
+func (c *cache) lookupCollection(pdh string) map[string]interface{} {
+       if pdh == "" {
+               return nil
+       } else if ent, cached := c.collections.Get(pdh); !cached {
+               return nil
+       } else {
+               ent := ent.(*cachedCollection)
+               if ent.expire.Before(time.Now()) {
+                       c.collections.Remove(pdh)
+                       return nil
+               } else {
+                       atomic.AddUint64(&c.stats.CollectionHits, 1)
+                       return ent.collection
+               }
+       }
+}
index b7e39c6041d256188350d464da86ee39fb29bdeb..45cd27fc639f05e7dffde604f3c1417435801542 100644 (file)
@@ -275,11 +275,17 @@ func (h *handler) ServeHTTP(wOrig http.ResponseWriter, r *http.Request) {
                targetPath = targetPath[1:]
        }
 
+       forceReload := false
+       if cc := r.Header.Get("Cache-Control"); strings.Contains(cc, "no-cache") || strings.Contains(cc, "must-revalidate") {
+               forceReload = true
+       }
+
+       var collection map[string]interface{}
        tokenResult := make(map[string]int)
-       collection := make(map[string]interface{})
        found := false
        for _, arv.ApiToken = range tokens {
-               err := arv.Get("collections", targetID, nil, &collection)
+               var err error
+               collection, err = h.Config.Cache.Get(arv, targetID, forceReload)
                if err == nil {
                        // Success
                        found = true
index 57ac2190c4cfe9d3a75278cce1c38b3a282eff89..df0346ba315f420ce5aae5dd2d3a59c06e4e87fb 100644 (file)
@@ -19,7 +19,7 @@ var _ = check.Suite(&UnitSuite{})
 type UnitSuite struct{}
 
 func (s *UnitSuite) TestCORSPreflight(c *check.C) {
-       h := handler{Config: &Config{}}
+       h := handler{Config: DefaultConfig()}
        u, _ := url.Parse("http://keep-web.example/c=" + arvadostest.FooCollection + "/foo")
        req := &http.Request{
                Method:     "OPTIONS",
@@ -70,9 +70,9 @@ func (s *UnitSuite) TestInvalidUUID(c *check.C) {
                        RequestURI: u.RequestURI(),
                }
                resp := httptest.NewRecorder()
-               h := handler{Config: &Config{
-                       AnonymousTokens: []string{arvadostest.AnonymousToken},
-               }}
+               cfg := DefaultConfig()
+               cfg.AnonymousTokens = []string{arvadostest.AnonymousToken}
+               h := handler{Config: cfg}
                h.ServeHTTP(resp, req)
                c.Check(resp.Code, check.Equals, http.StatusNotFound)
        }
index 5f4cb5090468708ce02d34ec5f74d9baf80720a5..bcdcd8b6d7316c0f8fd9ecdc74f0e40d38d46679 100644 (file)
@@ -4,6 +4,7 @@ import (
        "flag"
        "log"
        "os"
+       "time"
 
        "git.curoverse.com/arvados.git/sdk/go/arvados"
        "git.curoverse.com/arvados.git/sdk/go/config"
@@ -24,6 +25,8 @@ type Config struct {
        AttachmentOnlyHost string
        TrustAllContent    bool
 
+       Cache cache
+
        // Hack to support old command line flag, which is a bool
        // meaning "get actual token from environment".
        deprecatedAllowAnonymous bool
@@ -33,6 +36,13 @@ type Config struct {
 func DefaultConfig() *Config {
        return &Config{
                Listen: ":80",
+               Cache: cache{
+                       TTL:               arvados.Duration(5 * time.Minute),
+                       CollectionEntries: 100,
+                       CollectionBytes:   100000000,
+                       PermissionEntries: 100,
+                       UUIDEntries:       100,
+               },
        }
 }
 
index 6441364e99fcc93d4da26f4c1f6fe150c740be7b..52fe459ec43ff13c422c1679017de68246a36d1d 100644 (file)
@@ -311,13 +311,13 @@ func (s *IntegrationSuite) TearDownSuite(c *check.C) {
 
 func (s *IntegrationSuite) SetUpTest(c *check.C) {
        arvadostest.ResetEnv()
-       s.testServer = &server{Config: &Config{
-               Client: arvados.Client{
-                       APIHost:  testAPIHost,
-                       Insecure: true,
-               },
-               Listen: "127.0.0.1:0",
-       }}
+       cfg := DefaultConfig()
+       cfg.Client = arvados.Client{
+               APIHost:  testAPIHost,
+               Insecure: true,
+       }
+       cfg.Listen = "127.0.0.1:0"
+       s.testServer = &server{Config: cfg}
        err := s.testServer.Start()
        c.Assert(err, check.Equals, nil)
 }
index a36bf584a7ac70beb25ae268a46a1bab5efda6a4..b854f5314c479fe8671caa29b773255b6eefe849 100644 (file)
@@ -67,5 +67,25 @@ TrustAllContent:
     Serve non-public content from a single origin. Dangerous: read
     docs before using!
 
+Cache.TTL:
+
+    Maximum time to cache collection data and permission checks.
+
+Cache.CollectionEntries:
+
+    Maximum number of collection cache entries.
+
+Cache.CollectionBytes:
+
+    Approximate memory limit for collection cache.
+
+Cache.PermissionEntries:
+
+    Maximum number of permission cache entries.
+
+Cache.UUIDEntries:
+
+    Maximum number of UUID cache entries.
+
 `, exampleConfigFile)
 }