Merge branch '19362-all-webdav-via-sitefs'
author    Tom Clegg <tom@curii.com>
Tue, 20 Sep 2022 20:09:01 +0000 (16:09 -0400)
committer Tom Clegg <tom@curii.com>
Tue, 20 Sep 2022 20:09:01 +0000 (16:09 -0400)
refs #19362

Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom@curii.com>

20 files changed:
doc/admin/upgrading.html.textile.liquid
doc/api/methods/collections.html.textile.liquid
lib/config/config.default.yml
lib/config/deprecated.go
lib/config/deprecated_test.go
sdk/go/arvados/config.go
sdk/go/arvados/fs_base.go
sdk/go/arvados/fs_collection.go
sdk/go/arvados/fs_collection_test.go
sdk/go/arvados/fs_site.go
sdk/go/arvadostest/fixtures.go
services/keep-web/cache.go
services/keep-web/cache_test.go
services/keep-web/handler.go
services/keep-web/handler_test.go
services/keep-web/main.go
services/keep-web/s3.go
services/keep-web/server_test.go
services/keep-web/webdav.go
services/keepproxy/keepproxy_test.go

diff --git a/doc/admin/upgrading.html.textile.liquid b/doc/admin/upgrading.html.textile.liquid
index d0dc7cbd87a51d1db2798f2d2b89edd5084c6b59..b034ba35d8a2306fc792f3eea5edd13d777c619e 100644
@@ -32,6 +32,12 @@ h2(#main). development main (as of 2022-08-09)
 
 "previous: Upgrading to 2.4.2":#v2_4_2
 
+h3. Renamed keep-web metrics and WebDAV configs
+
+Metrics previously reported by keep-web (@arvados_keepweb_collectioncache_requests@, @..._hits@, @..._pdh_hits@, @..._api_calls@, @..._cached_manifests@, and @arvados_keepweb_sessions_cached_collection_bytes@) have been replaced by a single gauge, @arvados_keepweb_sessions_cached_session_bytes@.
+
+The config entries @Collections.WebDAVCache.UUIDTTL@, @...MaxCollectionEntries@, and @...MaxUUIDEntries@ are no longer used, and should be removed from your config file.
+
 h2(#v2_4_2). v2.4.2 (2022-08-09)
 
 "previous: Upgrading to 2.4.1":#v2_4_1
diff --git a/doc/api/methods/collections.html.textile.liquid b/doc/api/methods/collections.html.textile.liquid
index a2a6a77e19f6350c1be3c770a1560f42c9cc6402..5871337b0ae8b14760d9d32292a27f1eed29ecc8 100644
@@ -88,7 +88,7 @@ table(table table-bordered table-condensed).
 
 h3. get
 
-Gets a Collection's metadata by UUID or portable data hash.  When making a request by portable data hash, attributes other than @portable_data_hash@ and @manifest_text@ are not returned, even when requested explicitly using the @select@ parameter.
+Gets a Collection's metadata by UUID or portable data hash.  When making a request by portable data hash, attributes other than @portable_data_hash@, @manifest_text@, and @trash_at@ are not returned, even when requested explicitly using the @select@ parameter.
 
 Arguments:
 
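
The SDK uses the same restricted field set when it refreshes a collection by portable data hash (see checkChangesOnServer in sdk/go/arvados/fs_collection.go below). A minimal sketch of such a request; the client and pdh values are assumptions:

    var coll arvados.Collection
    err := client.RequestAndDecode(&coll, "GET", "arvados/v1/collections/"+pdh, nil,
            map[string]interface{}{"select": []string{"portable_data_hash", "manifest_text"}})
    if err != nil {
            // handle lookup failure
    }
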
diff --git a/lib/config/config.default.yml b/lib/config/config.default.yml
index b23c6a12745088fc02c65cb670fc0a1936e30a45..444398bc3303d8ef3a1b50563933781649717b19 100644
@@ -611,21 +611,17 @@ Clusters:
         # Time to cache manifests, permission checks, and sessions.
         TTL: 300s
 
-        # Time to cache collection state.
-        UUIDTTL: 5s
-
         # Block cache entries. Each block consumes up to 64 MiB RAM.
         MaxBlockEntries: 20
 
-        # Collection cache entries.
-        MaxCollectionEntries: 1000
-
-        # Approximate memory limit (in bytes) for collection cache.
+        # Approximate memory limit (in bytes) for session cache.
+        #
+        # Note this applies to the in-memory representation of
+        # projects and collections -- metadata, block locators,
+        # filenames, etc. -- excluding cached file content, which is
+        # limited by MaxBlockEntries.
         MaxCollectionBytes: 100000000
 
-        # UUID cache entries.
-        MaxUUIDEntries: 1000
-
         # Persistent sessions.
         MaxSessions: 100
 
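
After this change the WebDAVCache section reduces to the four knobs above. Expressed with the trimmed Go type from sdk/go/arvados/config.go (also part of this diff), the shipped defaults correspond roughly to the following literal; this is an illustration of the defaults, not generated code:

    arvados.WebDAVCacheConfig{
            TTL:                arvados.Duration(300 * time.Second),
            MaxBlockEntries:    20,
            MaxCollectionBytes: 100000000,
            MaxSessions:        100,
    }
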
diff --git a/lib/config/deprecated.go b/lib/config/deprecated.go
index c0a7921b36fdef66112591b18d450d11383afb50..d5c09d67061115a44cb5c8b5ef2a195fa7652734 100644
@@ -494,18 +494,9 @@ func (ldr *Loader) loadOldKeepWebConfig(cfg *arvados.Config) error {
        if oc.Cache.TTL != nil {
                cluster.Collections.WebDAVCache.TTL = *oc.Cache.TTL
        }
-       if oc.Cache.UUIDTTL != nil {
-               cluster.Collections.WebDAVCache.UUIDTTL = *oc.Cache.UUIDTTL
-       }
-       if oc.Cache.MaxCollectionEntries != nil {
-               cluster.Collections.WebDAVCache.MaxCollectionEntries = *oc.Cache.MaxCollectionEntries
-       }
        if oc.Cache.MaxCollectionBytes != nil {
                cluster.Collections.WebDAVCache.MaxCollectionBytes = *oc.Cache.MaxCollectionBytes
        }
-       if oc.Cache.MaxUUIDEntries != nil {
-               cluster.Collections.WebDAVCache.MaxUUIDEntries = *oc.Cache.MaxUUIDEntries
-       }
        if oc.AnonymousTokens != nil {
                if len(*oc.AnonymousTokens) > 0 {
                        cluster.Users.AnonymousUserToken = (*oc.AnonymousTokens)[0]
diff --git a/lib/config/deprecated_test.go b/lib/config/deprecated_test.go
index 4206ef57717eebc494cd3593bdf2551cf2956178..f9b1d1661b1f3c16b745c7e7c6e9304f060e2c64 100644
@@ -199,10 +199,7 @@ func (s *LoadSuite) TestLegacyKeepWebConfig(c *check.C) {
        c.Check(cluster.SystemRootToken, check.Equals, "abcdefg")
 
        c.Check(cluster.Collections.WebDAVCache.TTL, check.Equals, arvados.Duration(60*time.Second))
-       c.Check(cluster.Collections.WebDAVCache.UUIDTTL, check.Equals, arvados.Duration(time.Second))
-       c.Check(cluster.Collections.WebDAVCache.MaxCollectionEntries, check.Equals, 42)
        c.Check(cluster.Collections.WebDAVCache.MaxCollectionBytes, check.Equals, int64(1234567890))
-       c.Check(cluster.Collections.WebDAVCache.MaxUUIDEntries, check.Equals, 100)
 
        c.Check(cluster.Services.WebDAVDownload.ExternalURL, check.Equals, arvados.URL{Host: "download.example.com", Path: "/"})
        c.Check(cluster.Services.WebDAVDownload.InternalURLs[arvados.URL{Host: ":80"}], check.NotNil)
diff --git a/sdk/go/arvados/config.go b/sdk/go/arvados/config.go
index 6d8f39dfb316fbaba1cf64f71ef5f5f778f91e8e..eb564cb6102f2f16249bd37e8ac0ecbfe5bf579c 100644
@@ -61,13 +61,10 @@ func (sc *Config) GetCluster(clusterID string) (*Cluster, error) {
 }
 
 type WebDAVCacheConfig struct {
-       TTL                  Duration
-       UUIDTTL              Duration
-       MaxBlockEntries      int
-       MaxCollectionEntries int
-       MaxCollectionBytes   int64
-       MaxUUIDEntries       int
-       MaxSessions          int
+       TTL                Duration
+       MaxBlockEntries    int
+       MaxCollectionBytes int64
+       MaxSessions        int
 }
 
 type UploadDownloadPermission struct {
diff --git a/sdk/go/arvados/fs_base.go b/sdk/go/arvados/fs_base.go
index 6da639edaf73c3731501246de556f1c7c1932e16..5569554ab883b2b9f31748bf983f592b43f96b65 100644
@@ -641,7 +641,15 @@ func (fs *fileSystem) Rename(oldname, newname string) error {
        }
        locked := map[sync.Locker]bool{}
        for i := len(needLock) - 1; i >= 0; i-- {
-               if n := needLock[i]; !locked[n] {
+               n := needLock[i]
+               if fs, ok := n.(FileSystem); ok {
+                       // Lock the fs's root dir directly, not
+                       // through the fs. Otherwise our "locked" map
+                       // would not reliably prevent double-locking
+                       // the fs's root dir.
+                       n = fs.rootnode()
+               }
+               if !locked[n] {
                        n.Lock()
                        defer n.Unlock()
                        locked[n] = true
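
The point of the new rootnode() step: needLock can contain both a mounted FileSystem and that filesystem's root inode, and those are distinct map keys even though locking the FileSystem ultimately locks the same root. A self-contained sketch of the pitfall, where wrapper is a hypothetical stand-in for a FileSystem that forwards Lock to its root:

    package main

    import (
            "fmt"
            "sync"
    )

    type wrapper struct{ root *sync.Mutex }

    func (w wrapper) Lock()   { w.root.Lock() }
    func (w wrapper) Unlock() { w.root.Unlock() }

    func main() {
            root := &sync.Mutex{}
            seen := map[sync.Locker]bool{}
            seen[root] = true
            // Same underlying mutex, different map key: a dedup map keyed on
            // the wrapper would lock root a second time and deadlock.
            fmt.Println(seen[wrapper{root}]) // false
    }

Resolving each FileSystem to fs.rootnode() before consulting the map gives both entries the same key, so the root is locked exactly once.
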
diff --git a/sdk/go/arvados/fs_collection.go b/sdk/go/arvados/fs_collection.go
index f09c60a57192a7e654c03a63407045ac302c6d65..a26c876b932304ab6fdfefbafe36145665cbac90 100644
@@ -290,44 +290,70 @@ func (fs *collectionFileSystem) Truncate(int64) error {
        return ErrInvalidOperation
 }
 
-// Check for and incorporate upstream changes -- unless that has
-// already been done recently, in which case this func is a no-op.
-func (fs *collectionFileSystem) checkChangesOnServer() error {
+// Check for and incorporate upstream changes. If force==false, this
+// is a no-op except once every ttl/100 or so.
+//
+// Return value is true if new content was loaded from upstream and
+// any unsaved local changes have been discarded.
+func (fs *collectionFileSystem) checkChangesOnServer(force bool) (bool, error) {
        if fs.uuid == "" && fs.savedPDH.Load() == "" {
-               return nil
+               return false, nil
        }
 
-       // First try UUID if any, then last known PDH. Stop if all
-       // signatures are new enough.
-       checkingAll := false
-       for _, id := range []string{fs.uuid, fs.savedPDH.Load().(string)} {
-               if id == "" {
-                       continue
-               }
-
-               fs.lockCheckChanges.Lock()
-               if !checkingAll && fs.holdCheckChanges.After(time.Now()) {
-                       fs.lockCheckChanges.Unlock()
-                       return nil
-               }
-               remain, ttl := fs.signatureTimeLeft()
-               if remain > 0.01 && !checkingAll {
-                       fs.holdCheckChanges = time.Now().Add(ttl / 100)
-               }
+       fs.lockCheckChanges.Lock()
+       if !force && fs.holdCheckChanges.After(time.Now()) {
                fs.lockCheckChanges.Unlock()
+               return false, nil
+       }
+       remain, ttl := fs.signatureTimeLeft()
+       if remain > 0.01 {
+               fs.holdCheckChanges = time.Now().Add(ttl / 100)
+       }
+       fs.lockCheckChanges.Unlock()
 
-               if remain >= 0.5 {
-                       break
+       if !force && remain >= 0.5 {
+               // plenty of time left on current signatures
+               return false, nil
+       }
+
+       getparams := map[string]interface{}{"select": []string{"portable_data_hash", "manifest_text"}}
+       if fs.uuid != "" {
+               var coll Collection
+               err := fs.RequestAndDecode(&coll, "GET", "arvados/v1/collections/"+fs.uuid, nil, getparams)
+               if err != nil {
+                       return false, err
                }
-               checkingAll = true
+               if coll.PortableDataHash != fs.savedPDH.Load().(string) {
+                       // collection has changed upstream since we
+                       // last loaded or saved. Refresh local data,
+                       // losing any unsaved local changes.
+                       newfs, err := coll.FileSystem(fs.fileSystem.fsBackend, fs.fileSystem.fsBackend)
+                       if err != nil {
+                               return false, err
+                       }
+                       snap, err := Snapshot(newfs, "/")
+                       if err != nil {
+                               return false, err
+                       }
+                       err = Splice(fs, "/", snap)
+                       if err != nil {
+                               return false, err
+                       }
+                       fs.savedPDH.Store(coll.PortableDataHash)
+                       return true, nil
+               }
+               fs.updateSignatures(coll.ManifestText)
+               return false, nil
+       }
+       if pdh := fs.savedPDH.Load().(string); pdh != "" {
                var coll Collection
-               err := fs.RequestAndDecode(&coll, "GET", "arvados/v1/collections/"+id, nil, map[string]interface{}{"select": []string{"portable_data_hash", "manifest_text"}})
+               err := fs.RequestAndDecode(&coll, "GET", "arvados/v1/collections/"+pdh, nil, getparams)
                if err != nil {
-                       continue
+                       return false, err
                }
                fs.updateSignatures(coll.ManifestText)
        }
-       return nil
+       return false, nil
 }
 
 // Refresh signature on a single locator, if necessary. Assume caller
@@ -339,7 +365,7 @@ func (fs *collectionFileSystem) refreshSignature(locator string) string {
        if err != nil || exp.Sub(time.Now()) > time.Minute {
                // Synchronous update is not needed. Start an
                // asynchronous update if needed.
-               go fs.checkChangesOnServer()
+               go fs.checkChangesOnServer(false)
                return locator
        }
        var manifests string
@@ -368,11 +394,11 @@ func (fs *collectionFileSystem) refreshSignature(locator string) string {
 }
 
 func (fs *collectionFileSystem) Sync() error {
-       err := fs.checkChangesOnServer()
+       refreshed, err := fs.checkChangesOnServer(true)
        if err != nil {
                return err
        }
-       if fs.uuid == "" {
+       if refreshed || fs.uuid == "" {
                return nil
        }
        txt, err := fs.MarshalManifest(".")
@@ -403,7 +429,7 @@ func (fs *collectionFileSystem) Sync() error {
                "select": selectFields,
        })
        if err != nil {
-               return fmt.Errorf("sync failed: update %s: %s", fs.uuid, err)
+               return fmt.Errorf("sync failed: update %s: %w", fs.uuid, err)
        }
        fs.updateSignatures(coll.ManifestText)
        fs.savedPDH.Store(coll.PortableDataHash)
@@ -579,10 +605,7 @@ func (fn *filenode) MemorySize() (size int64) {
        defer fn.RUnlock()
        size = 64
        for _, seg := range fn.segments {
-               size += 64
-               if seg, ok := seg.(*memSegment); ok {
-                       size += int64(seg.Len())
-               }
+               size += seg.memorySize()
        }
        return
 }
@@ -1629,6 +1652,7 @@ type segment interface {
        // Return a new segment with a subsection of the data from this
        // one. length<0 means length=Len()-off.
        Slice(off int, length int) segment
+       memorySize() int64
 }
 
 type memSegment struct {
@@ -1707,6 +1731,10 @@ func (me *memSegment) ReadAt(p []byte, off int64) (n int, err error) {
        return
 }
 
+func (me *memSegment) memorySize() int64 {
+       return 64 + int64(len(me.buf))
+}
+
 type storedSegment struct {
        kc      fsBackend
        locator string
@@ -1744,6 +1772,10 @@ func (se storedSegment) ReadAt(p []byte, off int64) (n int, err error) {
        return se.kc.ReadAt(se.locator, p, int(off)+se.offset)
 }
 
+func (se storedSegment) memorySize() int64 {
+       return 64 + int64(len(se.locator))
+}
+
 func canonicalName(name string) string {
        name = path.Clean("/" + name)
        if name == "/" || name == "./" {
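
Two behavioral notes on the collectionFileSystem changes above. First, Sync() now calls checkChangesOnServer(true); if the collection changed upstream since the last load or save, the local tree is replaced via Snapshot/Splice and the write-back is skipped, so unsaved local edits are deliberately dropped in that case. Second, memory accounting is now per segment: a filenode costs 64 bytes plus 64+len(buf) for each in-memory segment, or 64+len(locator) once a segment has been flushed to Keep. A worked sketch of the numbers, matching the test update that follows:

    const (
            memSegSize    = 64 + 1<<20      // memSegment holding a 1 MiB buffer
            locatorLen    = 32 + 9 + 51     // md5 hex + "+67108864" + "+A<sig>@<expiry>"
            storedSegSize = 64 + locatorLen // storedSegment for a full 64 MiB block = 156 bytes
    )
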
diff --git a/sdk/go/arvados/fs_collection_test.go b/sdk/go/arvados/fs_collection_test.go
index c2cac3c6ce2e963b36b7654729e56524ba9bc2db..73689e4eadf620f12babdb3ba61979e05995d387 100644
@@ -1209,11 +1209,12 @@ func (s *CollectionFSSuite) TestFlushFullBlocksOnly(c *check.C) {
        }
 
        nDirs := int64(8)
+       nFiles := int64(67)
        megabyte := make([]byte, 1<<20)
        for i := int64(0); i < nDirs; i++ {
                dir := fmt.Sprintf("dir%d", i)
                fs.Mkdir(dir, 0755)
-               for j := 0; j < 67; j++ {
+               for j := int64(0); j < nFiles; j++ {
                        f, err := fs.OpenFile(fmt.Sprintf("%s/file%d", dir, j), os.O_WRONLY|os.O_CREATE, 0)
                        c.Assert(err, check.IsNil)
                        defer f.Close()
@@ -1221,8 +1222,8 @@ func (s *CollectionFSSuite) TestFlushFullBlocksOnly(c *check.C) {
                        c.Assert(err, check.IsNil)
                }
        }
-       inodebytes := int64((nDirs*(67*2+1) + 1) * 64)
-       c.Check(fs.MemorySize(), check.Equals, int64(nDirs*67<<20)+inodebytes)
+       inodebytes := int64((nDirs*(nFiles+1) + 1) * 64)
+       c.Check(fs.MemorySize(), check.Equals, nDirs*nFiles*(1<<20+64)+inodebytes)
        c.Check(flushed, check.Equals, int64(0))
 
        waitForFlush := func(expectUnflushed, expectFlushed int64) {
@@ -1233,27 +1234,29 @@ func (s *CollectionFSSuite) TestFlushFullBlocksOnly(c *check.C) {
        }
 
        // Nothing flushed yet
-       waitForFlush((nDirs*67)<<20+inodebytes, 0)
+       waitForFlush(nDirs*nFiles*(1<<20+64)+inodebytes, 0)
 
        // Flushing a non-empty dir "/" is non-recursive and there are
        // no top-level files, so this has no effect
        fs.Flush("/", false)
-       waitForFlush((nDirs*67)<<20+inodebytes, 0)
+       waitForFlush(nDirs*nFiles*(1<<20+64)+inodebytes, 0)
 
        // Flush the full block in dir0
        fs.Flush("dir0", false)
-       waitForFlush((nDirs*67-64)<<20+inodebytes, 64<<20)
+       bigloclen := int64(32 + 9 + 51 + 64) // md5 + "+" + "67xxxxxx" + "+Axxxxxx..." + 64 (see (storedSegment)memorySize)
+       waitForFlush((nDirs*nFiles-64)*(1<<20+64)+inodebytes+bigloclen*64, 64<<20)
 
        err = fs.Flush("dir-does-not-exist", false)
        c.Check(err, check.NotNil)
 
        // Flush full blocks in all dirs
        fs.Flush("", false)
-       waitForFlush(nDirs*3<<20+inodebytes, nDirs*64<<20)
+       waitForFlush(nDirs*3*(1<<20+64)+inodebytes+bigloclen*64*nDirs, nDirs*64<<20)
 
        // Flush non-full blocks, too
        fs.Flush("", true)
-       waitForFlush(inodebytes, nDirs*67<<20)
+       smallloclen := int64(32 + 8 + 51 + 64) // md5 + "+" + "3xxxxxx" + "+Axxxxxx..." + 64 (see (storedSegment)memorySize)
+       waitForFlush(inodebytes+bigloclen*64*nDirs+smallloclen*3*nDirs, nDirs*67<<20)
 }
 
 // Even when writing lots of files/dirs from different goroutines, as
diff --git a/sdk/go/arvados/fs_site.go b/sdk/go/arvados/fs_site.go
index bb2eee77925fd2c682c7d42e1e8e175c4f2f1489..531d7968abab360df333619ce5f300f1a809209d 100644
@@ -5,6 +5,7 @@
 package arvados
 
 import (
+       "net/http"
        "os"
        "strings"
        "sync"
@@ -136,29 +137,43 @@ func (fs *customFileSystem) newNode(name string, perm os.FileMode, modTime time.
        return nil, ErrInvalidOperation
 }
 
-func (fs *customFileSystem) mountByID(parent inode, id string) inode {
+func (fs *customFileSystem) mountByID(parent inode, id string) (inode, error) {
        if strings.Contains(id, "-4zz18-") || pdhRegexp.MatchString(id) {
                return fs.mountCollection(parent, id)
        } else if strings.Contains(id, "-j7d0g-") {
-               return fs.newProjectNode(fs.root, id, id, nil)
+               return fs.newProjectNode(fs.root, id, id, nil), nil
        } else {
-               return nil
+               return nil, nil
        }
 }
 
-func (fs *customFileSystem) mountCollection(parent inode, id string) inode {
+func (fs *customFileSystem) mountCollection(parent inode, id string) (inode, error) {
        var coll Collection
        err := fs.RequestAndDecode(&coll, "GET", "arvados/v1/collections/"+id, nil, nil)
-       if err != nil {
-               return nil
+       if statusErr, ok := err.(interface{ HTTPStatus() int }); ok && statusErr.HTTPStatus() == http.StatusNotFound {
+               return nil, nil
+       } else if err != nil {
+               return nil, err
+       }
+       if len(id) != 27 {
+               // This means id is a PDH, and controller/railsapi
+               // returned one of (possibly) many collections with
+               // that PDH. Even if controller returns more fields
+               // besides PDH and manifest text (which are equal for
+               // all matching collections), we don't want to expose
+               // them (e.g., through Sys()).
+               coll = Collection{
+                       PortableDataHash: coll.PortableDataHash,
+                       ManifestText:     coll.ManifestText,
+               }
        }
        newfs, err := coll.FileSystem(fs, fs)
        if err != nil {
-               return nil
+               return nil, err
        }
        cfs := newfs.(*collectionFileSystem)
        cfs.SetParent(parent, id)
-       return cfs
+       return cfs, nil
 }
 
 func (fs *customFileSystem) newProjectNode(root inode, name, uuid string, proj *Group) inode {
@@ -202,15 +217,19 @@ func (fs *customFileSystem) newProjectNode(root inode, name, uuid string, proj *
 // treenode, or nil for ENOENT.
 type vdirnode struct {
        treenode
-       create func(parent inode, name string) inode
+       create func(parent inode, name string) (inode, error)
 }
 
 func (vn *vdirnode) Child(name string, replace func(inode) (inode, error)) (inode, error) {
        return vn.treenode.Child(name, func(existing inode) (inode, error) {
                if existing == nil && vn.create != nil {
-                       existing = vn.create(vn, name)
-                       if existing != nil {
-                               existing.SetParent(vn, name)
+                       newnode, err := vn.create(vn, name)
+                       if err != nil {
+                               return nil, err
+                       }
+                       if newnode != nil {
+                               newnode.SetParent(vn, name)
+                               existing = newnode
                                vn.treenode.fileinfo.modTime = time.Now()
                        }
                }
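
A pattern worth noting in the fs_site.go changes: API failures are now distinguished from "node does not exist" by inspecting the error's HTTP status, either with a type assertion on interface{ HTTPStatus() int } (mountCollection above) or with errors.As (handler.go below). A hypothetical helper capturing the idea; isNotFound does not appear in the diff:

    // isNotFound reports whether err carries an HTTP 404 status.
    func isNotFound(err error) bool {
            var he interface{ HTTPStatus() int }
            return errors.As(err, &he) && he.HTTPStatus() == http.StatusNotFound
    }

mountCollection maps the 404 case to (nil, nil), i.e. ENOENT for the mount point, while any other error is propagated instead of being silently treated as a missing directory.
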
diff --git a/sdk/go/arvadostest/fixtures.go b/sdk/go/arvadostest/fixtures.go
index ec55725412c381714014a09931441855f9393466..ac12f7ae13e93405b37a6814ed4e16bbda2b911e 100644
@@ -32,6 +32,7 @@ const (
        HelloWorldPdh           = "55713e6a34081eb03609e7ad5fcad129+62"
 
        MultilevelCollection1                        = "zzzzz-4zz18-pyw8yp9g3pr7irn"
+       MultilevelCollection1PDH                     = "f9ddda46bb293b6847da984e3aa735db+290"
        StorageClassesDesiredDefaultConfirmedDefault = "zzzzz-4zz18-3t236wr12769tga"
        StorageClassesDesiredArchiveConfirmedDefault = "zzzzz-4zz18-3t236wr12769qqa"
        EmptyCollectionUUID                          = "zzzzz-4zz18-gs9ooj1h9sd5mde"
@@ -83,8 +84,11 @@ const (
        Repository2UUID = "zzzzz-s0uqq-382brsig8rp3667"
        Repository2Name = "active/foo2"
 
-       FooCollectionSharingTokenUUID = "zzzzz-gj3su-gf02tdm4g1z3e3u"
-       FooCollectionSharingToken     = "iknqgmunrhgsyfok8uzjlwun9iscwm3xacmzmg65fa1j1lpdss"
+       FooFileCollectionUUID             = "zzzzz-4zz18-znfnqtbbv4spc3w"
+       FooFileCollectionSharingTokenUUID = "zzzzz-gj3su-gf02tdm4g1z3e3u"
+       FooFileCollectionSharingToken     = "iknqgmunrhgsyfok8uzjlwun9iscwm3xacmzmg65fa1j1lpdss"
+       BarFileCollectionUUID             = "zzzzz-4zz18-ehbhgtheo8909or"
+       BarFileCollectionPDH              = "fa7aeb5140e2848d39b416daeef4ffc5+45"
 
        WorkflowWithDefinitionYAMLUUID = "zzzzz-7fd4e-validworkfloyml"
 
diff --git a/services/keep-web/cache.go b/services/keep-web/cache.go
index 7ec8639abaa2cf1f99b22dd8d37cbb787475bcfe..db06d635092a3ee36daed2d3b72fe1b1b738a909 100644
@@ -5,6 +5,7 @@
 package keepweb
 
 import (
+       "net/http"
        "sync"
        "sync/atomic"
        "time"
@@ -20,74 +21,32 @@ import (
 const metricsUpdateInterval = time.Second / 10
 
 type cache struct {
-       cluster     *arvados.Cluster
-       logger      logrus.FieldLogger
-       registry    *prometheus.Registry
-       metrics     cacheMetrics
-       pdhs        *lru.TwoQueueCache
-       collections *lru.TwoQueueCache
-       sessions    *lru.TwoQueueCache
-       setupOnce   sync.Once
+       cluster   *arvados.Cluster
+       logger    logrus.FieldLogger
+       registry  *prometheus.Registry
+       metrics   cacheMetrics
+       sessions  *lru.TwoQueueCache
+       setupOnce sync.Once
 
-       chPruneSessions    chan struct{}
-       chPruneCollections chan struct{}
+       chPruneSessions chan struct{}
 }
 
 type cacheMetrics struct {
-       requests          prometheus.Counter
-       collectionBytes   prometheus.Gauge
-       collectionEntries prometheus.Gauge
-       sessionEntries    prometheus.Gauge
-       collectionHits    prometheus.Counter
-       pdhHits           prometheus.Counter
-       sessionHits       prometheus.Counter
-       sessionMisses     prometheus.Counter
-       apiCalls          prometheus.Counter
+       requests        prometheus.Counter
+       collectionBytes prometheus.Gauge
+       sessionEntries  prometheus.Gauge
+       sessionHits     prometheus.Counter
+       sessionMisses   prometheus.Counter
 }
 
 func (m *cacheMetrics) setup(reg *prometheus.Registry) {
-       m.requests = prometheus.NewCounter(prometheus.CounterOpts{
-               Namespace: "arvados",
-               Subsystem: "keepweb_collectioncache",
-               Name:      "requests",
-               Help:      "Number of targetID-to-manifest lookups handled.",
-       })
-       reg.MustRegister(m.requests)
-       m.collectionHits = prometheus.NewCounter(prometheus.CounterOpts{
-               Namespace: "arvados",
-               Subsystem: "keepweb_collectioncache",
-               Name:      "hits",
-               Help:      "Number of pdh-to-manifest cache hits.",
-       })
-       reg.MustRegister(m.collectionHits)
-       m.pdhHits = prometheus.NewCounter(prometheus.CounterOpts{
-               Namespace: "arvados",
-               Subsystem: "keepweb_collectioncache",
-               Name:      "pdh_hits",
-               Help:      "Number of uuid-to-pdh cache hits.",
-       })
-       reg.MustRegister(m.pdhHits)
-       m.apiCalls = prometheus.NewCounter(prometheus.CounterOpts{
-               Namespace: "arvados",
-               Subsystem: "keepweb_collectioncache",
-               Name:      "api_calls",
-               Help:      "Number of outgoing API calls made by cache.",
-       })
-       reg.MustRegister(m.apiCalls)
        m.collectionBytes = prometheus.NewGauge(prometheus.GaugeOpts{
                Namespace: "arvados",
                Subsystem: "keepweb_sessions",
-               Name:      "cached_collection_bytes",
-               Help:      "Total size of all cached manifests and sessions.",
+               Name:      "cached_session_bytes",
+               Help:      "Total size of all cached sessions.",
        })
        reg.MustRegister(m.collectionBytes)
-       m.collectionEntries = prometheus.NewGauge(prometheus.GaugeOpts{
-               Namespace: "arvados",
-               Subsystem: "keepweb_collectioncache",
-               Name:      "cached_manifests",
-               Help:      "Number of manifests in cache.",
-       })
-       reg.MustRegister(m.collectionEntries)
        m.sessionEntries = prometheus.NewGauge(prometheus.GaugeOpts{
                Namespace: "arvados",
                Subsystem: "keepweb_sessions",
@@ -111,21 +70,6 @@ func (m *cacheMetrics) setup(reg *prometheus.Registry) {
        reg.MustRegister(m.sessionMisses)
 }
 
-type cachedPDH struct {
-       expire  time.Time
-       refresh time.Time
-       pdh     string
-}
-
-type cachedCollection struct {
-       expire     time.Time
-       collection *arvados.Collection
-}
-
-type cachedPermission struct {
-       expire time.Time
-}
-
 type cachedSession struct {
        expire        time.Time
        fs            atomic.Value
@@ -137,14 +81,6 @@ type cachedSession struct {
 
 func (c *cache) setup() {
        var err error
-       c.pdhs, err = lru.New2Q(c.cluster.Collections.WebDAVCache.MaxUUIDEntries)
-       if err != nil {
-               panic(err)
-       }
-       c.collections, err = lru.New2Q(c.cluster.Collections.WebDAVCache.MaxCollectionEntries)
-       if err != nil {
-               panic(err)
-       }
        c.sessions, err = lru.New2Q(c.cluster.Collections.WebDAVCache.MaxSessions)
        if err != nil {
                panic(err)
@@ -160,12 +96,6 @@ func (c *cache) setup() {
                        c.updateGauges()
                }
        }()
-       c.chPruneCollections = make(chan struct{}, 1)
-       go func() {
-               for range c.chPruneCollections {
-                       c.pruneCollections()
-               }
-       }()
        c.chPruneSessions = make(chan struct{}, 1)
        go func() {
                for range c.chPruneSessions {
@@ -176,7 +106,6 @@ func (c *cache) setup() {
 
 func (c *cache) updateGauges() {
        c.metrics.collectionBytes.Set(float64(c.collectionBytes()))
-       c.metrics.collectionEntries.Set(float64(c.collections.Len()))
        c.metrics.sessionEntries.Set(float64(c.sessions.Len()))
 }
 
@@ -184,39 +113,6 @@ var selectPDH = map[string]interface{}{
        "select": []string{"portable_data_hash"},
 }
 
-// Update saves a modified version (fs) to an existing collection
-// (coll) and, if successful, updates the relevant cache entries so
-// subsequent calls to Get() reflect the modifications.
-func (c *cache) Update(client *arvados.Client, coll arvados.Collection, fs arvados.CollectionFileSystem) error {
-       c.setupOnce.Do(c.setup)
-
-       m, err := fs.MarshalManifest(".")
-       if err != nil || m == coll.ManifestText {
-               return err
-       }
-       coll.ManifestText = m
-       var updated arvados.Collection
-       err = client.RequestAndDecode(&updated, "PATCH", "arvados/v1/collections/"+coll.UUID, nil, map[string]interface{}{
-               "collection": map[string]string{
-                       "manifest_text": coll.ManifestText,
-               },
-       })
-       if err != nil {
-               c.pdhs.Remove(coll.UUID)
-               return err
-       }
-       c.collections.Add(client.AuthToken+"\000"+updated.PortableDataHash, &cachedCollection{
-               expire:     time.Now().Add(time.Duration(c.cluster.Collections.WebDAVCache.TTL)),
-               collection: &updated,
-       })
-       c.pdhs.Add(coll.UUID, &cachedPDH{
-               expire:  time.Now().Add(time.Duration(c.cluster.Collections.WebDAVCache.TTL)),
-               refresh: time.Now().Add(time.Duration(c.cluster.Collections.WebDAVCache.UUIDTTL)),
-               pdh:     updated.PortableDataHash,
-       })
-       return nil
-}
-
 // ResetSession unloads any potentially stale state. Should be called
 // after write operations, so subsequent reads don't return stale
 // data.
@@ -227,7 +123,7 @@ func (c *cache) ResetSession(token string) {
 
 // Get a long-lived CustomFileSystem suitable for doing a read operation
 // with the given token.
-func (c *cache) GetSession(token string) (arvados.CustomFileSystem, *cachedSession, error) {
+func (c *cache) GetSession(token string) (arvados.CustomFileSystem, *cachedSession, *arvados.User, error) {
        c.setupOnce.Do(c.setup)
        now := time.Now()
        ent, _ := c.sessions.Get(token)
@@ -241,12 +137,12 @@ func (c *cache) GetSession(token string) (arvados.CustomFileSystem, *cachedSessi
                var err error
                sess.client, err = arvados.NewClientFromConfig(c.cluster)
                if err != nil {
-                       return nil, nil, err
+                       return nil, nil, nil, err
                }
                sess.client.AuthToken = token
                sess.arvadosclient, err = arvadosclient.New(sess.client)
                if err != nil {
-                       return nil, nil, err
+                       return nil, nil, nil, err
                }
                sess.keepclient = keepclient.New(sess.arvadosclient)
                c.sessions.Add(token, sess)
@@ -260,14 +156,29 @@ func (c *cache) GetSession(token string) (arvados.CustomFileSystem, *cachedSessi
        case c.chPruneSessions <- struct{}{}:
        default:
        }
+
        fs, _ := sess.fs.Load().(arvados.CustomFileSystem)
-       if fs != nil && !expired {
-               return fs, sess, nil
+       if fs == nil || expired {
+               fs = sess.client.SiteFileSystem(sess.keepclient)
+               fs.ForwardSlashNameSubstitution(c.cluster.Collections.ForwardSlashNameSubstitution)
+               sess.fs.Store(fs)
        }
-       fs = sess.client.SiteFileSystem(sess.keepclient)
-       fs.ForwardSlashNameSubstitution(c.cluster.Collections.ForwardSlashNameSubstitution)
-       sess.fs.Store(fs)
-       return fs, sess, nil
+
+       user, _ := sess.user.Load().(*arvados.User)
+       if user == nil || expired {
+               user = new(arvados.User)
+               err := sess.client.RequestAndDecode(user, "GET", "/arvados/v1/users/current", nil, nil)
+               if statusErr, ok := err.(interface{ HTTPStatus() int }); ok && statusErr.HTTPStatus() == http.StatusForbidden {
+                       // token is OK, but "get user id" api is out
+                       // of scope -- return nil, signifying unknown
+                       // user
+               } else if err != nil {
+                       return nil, nil, nil, err
+               }
+               sess.user.Store(user)
+       }
+
+       return fs, sess, user, nil
 }
 
 // Remove all expired session cache entries, then remove more entries
@@ -294,7 +205,7 @@ func (c *cache) pruneSessions() {
        }
        // Remove tokens until reaching size limit, starting with the
        // least frequently used entries (which Keys() returns last).
-       for i := len(keys) - 1; i >= 0 && size > c.cluster.Collections.WebDAVCache.MaxCollectionBytes/2; i-- {
+       for i := len(keys) - 1; i >= 0 && size > c.cluster.Collections.WebDAVCache.MaxCollectionBytes; i-- {
                if sizes[i] > 0 {
                        c.sessions.Remove(keys[i])
                        size -= sizes[i]
@@ -302,147 +213,10 @@ func (c *cache) pruneSessions() {
        }
 }
 
-func (c *cache) Get(arv *arvadosclient.ArvadosClient, targetID string, forceReload bool) (*arvados.Collection, error) {
-       c.setupOnce.Do(c.setup)
-       c.metrics.requests.Inc()
-
-       var pdhRefresh bool
-       var pdh string
-       if arvadosclient.PDHMatch(targetID) {
-               pdh = targetID
-       } else if ent, cached := c.pdhs.Get(targetID); cached {
-               ent := ent.(*cachedPDH)
-               if ent.expire.Before(time.Now()) {
-                       c.pdhs.Remove(targetID)
-               } else {
-                       pdh = ent.pdh
-                       pdhRefresh = forceReload || time.Now().After(ent.refresh)
-                       c.metrics.pdhHits.Inc()
-               }
-       }
-
-       if pdh == "" {
-               // UUID->PDH mapping is not cached, might as well get
-               // the whole collection record and be done (below).
-               c.logger.Debugf("cache(%s): have no pdh", targetID)
-       } else if cached := c.lookupCollection(arv.ApiToken + "\000" + pdh); cached == nil {
-               // PDH->manifest is not cached, might as well get the
-               // whole collection record (below).
-               c.logger.Debugf("cache(%s): have pdh %s but manifest is not cached", targetID, pdh)
-       } else if !pdhRefresh {
-               // We looked up UUID->PDH very recently, and we still
-               // have the manifest for that PDH.
-               c.logger.Debugf("cache(%s): have pdh %s and refresh not needed", targetID, pdh)
-               return cached, nil
-       } else {
-               // Get current PDH for this UUID (and confirm we still
-               // have read permission).  Most likely, the cached PDH
-               // is still correct, in which case we can use our
-               // cached manifest.
-               c.metrics.apiCalls.Inc()
-               var current arvados.Collection
-               err := arv.Get("collections", targetID, selectPDH, &current)
-               if err != nil {
-                       return nil, err
-               }
-               if current.PortableDataHash == pdh {
-                       // PDH has not changed, cached manifest is
-                       // correct.
-                       c.logger.Debugf("cache(%s): verified cached pdh %s is still correct", targetID, pdh)
-                       return cached, nil
-               }
-               if cached := c.lookupCollection(arv.ApiToken + "\000" + current.PortableDataHash); cached != nil {
-                       // PDH changed, and we already have the
-                       // manifest for that new PDH.
-                       c.logger.Debugf("cache(%s): cached pdh %s was stale, new pdh is %s and manifest is already in cache", targetID, pdh, current.PortableDataHash)
-                       return cached, nil
-               }
-       }
-
-       // Either UUID->PDH is not cached, or PDH->manifest is not
-       // cached.
-       var retrieved arvados.Collection
-       c.metrics.apiCalls.Inc()
-       err := arv.Get("collections", targetID, nil, &retrieved)
-       if err != nil {
-               return nil, err
-       }
-       c.logger.Debugf("cache(%s): retrieved manifest, caching with pdh %s", targetID, retrieved.PortableDataHash)
-       exp := time.Now().Add(time.Duration(c.cluster.Collections.WebDAVCache.TTL))
-       if targetID != retrieved.PortableDataHash {
-               c.pdhs.Add(targetID, &cachedPDH{
-                       expire:  exp,
-                       refresh: time.Now().Add(time.Duration(c.cluster.Collections.WebDAVCache.UUIDTTL)),
-                       pdh:     retrieved.PortableDataHash,
-               })
-       }
-       c.collections.Add(arv.ApiToken+"\000"+retrieved.PortableDataHash, &cachedCollection{
-               expire:     exp,
-               collection: &retrieved,
-       })
-       if int64(len(retrieved.ManifestText)) > c.cluster.Collections.WebDAVCache.MaxCollectionBytes/int64(c.cluster.Collections.WebDAVCache.MaxCollectionEntries) {
-               select {
-               case c.chPruneCollections <- struct{}{}:
-               default:
-               }
-       }
-       return &retrieved, nil
-}
-
-// pruneCollections checks the total bytes occupied by manifest_text
-// in the collection cache and removes old entries as needed to bring
-// the total size down to CollectionBytes. It also deletes all expired
-// entries.
-//
-// pruneCollections does not aim to be perfectly correct when there is
-// concurrent cache activity.
-func (c *cache) pruneCollections() {
-       var size int64
-       now := time.Now()
-       keys := c.collections.Keys()
-       entsize := make([]int, len(keys))
-       expired := make([]bool, len(keys))
-       for i, k := range keys {
-               v, ok := c.collections.Peek(k)
-               if !ok {
-                       continue
-               }
-               ent := v.(*cachedCollection)
-               n := len(ent.collection.ManifestText)
-               size += int64(n)
-               entsize[i] = n
-               expired[i] = ent.expire.Before(now)
-       }
-       for i, k := range keys {
-               if expired[i] {
-                       c.collections.Remove(k)
-                       size -= int64(entsize[i])
-               }
-       }
-       for i, k := range keys {
-               if size <= c.cluster.Collections.WebDAVCache.MaxCollectionBytes/2 {
-                       break
-               }
-               if expired[i] {
-                       // already removed this entry in the previous loop
-                       continue
-               }
-               c.collections.Remove(k)
-               size -= int64(entsize[i])
-       }
-}
-
 // collectionBytes returns the approximate combined memory size of the
 // collection cache and session filesystem cache.
 func (c *cache) collectionBytes() uint64 {
        var size uint64
-       for _, k := range c.collections.Keys() {
-               v, ok := c.collections.Peek(k)
-               if !ok {
-                       continue
-               }
-               size += uint64(len(v.(*cachedCollection).collection.ManifestText))
-       }
        for _, token := range c.sessions.Keys() {
                ent, ok := c.sessions.Peek(token)
                if !ok {
@@ -454,49 +228,3 @@ func (c *cache) collectionBytes() uint64 {
        }
        return size
 }
-
-func (c *cache) lookupCollection(key string) *arvados.Collection {
-       e, cached := c.collections.Get(key)
-       if !cached {
-               return nil
-       }
-       ent := e.(*cachedCollection)
-       if ent.expire.Before(time.Now()) {
-               c.collections.Remove(key)
-               return nil
-       }
-       c.metrics.collectionHits.Inc()
-       return ent.collection
-}
-
-func (c *cache) GetTokenUser(token string) (*arvados.User, error) {
-       // Get and cache user record associated with this
-       // token.  We need to know their UUID for logging, and
-       // whether they are an admin or not for certain
-       // permission checks.
-
-       // Get/create session entry
-       _, sess, err := c.GetSession(token)
-       if err != nil {
-               return nil, err
-       }
-
-       // See if the user is already set, and if so, return it
-       user, _ := sess.user.Load().(*arvados.User)
-       if user != nil {
-               return user, nil
-       }
-
-       // Fetch the user record
-       c.metrics.apiCalls.Inc()
-       var current arvados.User
-
-       err = sess.client.RequestAndDecode(&current, "GET", "arvados/v1/users/current", nil, nil)
-       if err != nil {
-               return nil, err
-       }
-
-       // Stash the user record for next time
-       sess.user.Store(&current)
-       return &current, nil
-}
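
With Get, Update, GetTokenUser, and the collection/PDH caches removed, GetSession is the cache's single entry point: it returns the cached site filesystem, the session record, and the token's user in one call (a 403 from users/current yields an "unknown" user rather than an error, which keeps narrowly scoped tokens usable). A condensed sketch of the caller side, following the handler.go changes below; token, collectionID, w, and h come from that context, error branches are abbreviated, and the remaining variables are used further down in the real handler:

    fs, sess, tokenUser, err := h.Cache.GetSession(token)
    if err != nil {
            http.Error(w, "cache error: "+err.Error(), http.StatusInternalServerError)
            return
    }
    // sess keeps the site filesystem alive between requests; tokenUser may be
    // empty when the token cannot call users/current.
    collectionDir, err := fs.OpenFile("by_id/"+collectionID+"/", os.O_RDONLY, 0)
    if os.IsNotExist(err) {
            // collection does not exist, or is not readable with this token
    }

Note also that pruneSessions now prunes down to MaxCollectionBytes rather than MaxCollectionBytes/2, now that sessions are the only consumer of that budget.
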
diff --git a/services/keep-web/cache_test.go b/services/keep-web/cache_test.go
index 6b8f427171ef7813f8453c0b23ed968229295fb4..010e29a0b876105d49756d736adefd316878a12e 100644
@@ -6,17 +6,21 @@ package keepweb
 
 import (
        "bytes"
+       "net/http"
+       "net/http/httptest"
+       "regexp"
+       "strings"
+       "time"
 
        "git.arvados.org/arvados.git/sdk/go/arvados"
-       "git.arvados.org/arvados.git/sdk/go/arvadosclient"
        "git.arvados.org/arvados.git/sdk/go/arvadostest"
-       "git.arvados.org/arvados.git/sdk/go/ctxlog"
-       "github.com/prometheus/client_golang/prometheus"
        "github.com/prometheus/common/expfmt"
        "gopkg.in/check.v1"
 )
 
-func (s *UnitSuite) checkCacheMetrics(c *check.C, reg *prometheus.Registry, regs ...string) {
+func (s *IntegrationSuite) checkCacheMetrics(c *check.C, regs ...string) {
+       s.handler.Cache.updateGauges()
+       reg := s.handler.Cache.registry
        mfs, err := reg.Gather()
        c.Check(err, check.IsNil)
        buf := &bytes.Buffer{}
@@ -25,131 +29,139 @@ func (s *UnitSuite) checkCacheMetrics(c *check.C, reg *prometheus.Registry, regs
                c.Check(enc.Encode(mf), check.IsNil)
        }
        mm := buf.String()
+       // Remove comments to make the "value vs. regexp" failure
+       // output easier to read.
+       mm = regexp.MustCompile(`(?m)^#.*\n`).ReplaceAllString(mm, "")
        for _, reg := range regs {
-               c.Check(mm, check.Matches, `(?ms).*collectioncache_`+reg+`\n.*`)
+               c.Check(mm, check.Matches, `(?ms).*keepweb_sessions_`+reg+`\n.*`)
        }
 }
 
-func (s *UnitSuite) TestCache(c *check.C) {
-       arv, err := arvadosclient.MakeArvadosClient()
-       c.Assert(err, check.Equals, nil)
-
-       cache := &cache{
-               cluster:  s.cluster,
-               logger:   ctxlog.TestLogger(c),
-               registry: prometheus.NewRegistry(),
-       }
-
+func (s *IntegrationSuite) TestCache(c *check.C) {
        // Hit the same collection 5 times using the same token. Only
        // the first req should cause an API call; the next 4 should
        // hit all caches.
-       arv.ApiToken = arvadostest.AdminToken
-       var coll *arvados.Collection
+       u := mustParseURL("http://" + arvadostest.FooCollection + ".keep-web.example/foo")
+       req := &http.Request{
+               Method:     "GET",
+               Host:       u.Host,
+               URL:        u,
+               RequestURI: u.RequestURI(),
+               Header: http.Header{
+                       "Authorization": {"Bearer " + arvadostest.ActiveToken},
+               },
+       }
        for i := 0; i < 5; i++ {
-               coll, err = cache.Get(arv, arvadostest.FooCollection, false)
-               c.Check(err, check.Equals, nil)
-               c.Assert(coll, check.NotNil)
-               c.Check(coll.PortableDataHash, check.Equals, arvadostest.FooCollectionPDH)
-               c.Check(coll.ManifestText[:2], check.Equals, ". ")
+               resp := httptest.NewRecorder()
+               s.handler.ServeHTTP(resp, req)
+               c.Check(resp.Code, check.Equals, http.StatusOK)
        }
-       s.checkCacheMetrics(c, cache.registry,
-               "requests 5",
+       s.checkCacheMetrics(c,
                "hits 4",
-               "pdh_hits 4",
-               "api_calls 1")
-
-       // Hit the same collection 2 more times, this time requesting
-       // it by PDH and using a different token. The first req should
-       // miss the permission cache and fetch the new manifest; the
-       // second should hit the Collection cache and skip the API
-       // lookup.
-       arv.ApiToken = arvadostest.ActiveToken
-
-       coll2, err := cache.Get(arv, arvadostest.FooCollectionPDH, false)
-       c.Check(err, check.Equals, nil)
-       c.Assert(coll2, check.NotNil)
-       c.Check(coll2.PortableDataHash, check.Equals, arvadostest.FooCollectionPDH)
-       c.Check(coll2.ManifestText[:2], check.Equals, ". ")
-       c.Check(coll2.ManifestText, check.Not(check.Equals), coll.ManifestText)
-
-       s.checkCacheMetrics(c, cache.registry,
-               "requests 6",
-               "hits 4",
-               "pdh_hits 4",
-               "api_calls 2")
-
-       coll2, err = cache.Get(arv, arvadostest.FooCollectionPDH, false)
-       c.Check(err, check.Equals, nil)
-       c.Assert(coll2, check.NotNil)
-       c.Check(coll2.PortableDataHash, check.Equals, arvadostest.FooCollectionPDH)
-       c.Check(coll2.ManifestText[:2], check.Equals, ". ")
-
-       s.checkCacheMetrics(c, cache.registry,
-               "requests 7",
-               "hits 5",
-               "pdh_hits 4",
-               "api_calls 2")
-
-       // Alternating between two collections N times should produce
-       // only 2 more API calls.
-       arv.ApiToken = arvadostest.AdminToken
-       for i := 0; i < 20; i++ {
-               var target string
-               if i%2 == 0 {
-                       target = arvadostest.HelloWorldCollection
-               } else {
-                       target = arvadostest.FooBarDirCollection
-               }
-               _, err := cache.Get(arv, target, false)
-               c.Check(err, check.Equals, nil)
+               "misses 1",
+               "active 1")
+
+       // Hit a shared collection 3 times using PDH, using a
+       // different token.
+       u2 := mustParseURL("http://" + strings.Replace(arvadostest.BarFileCollectionPDH, "+", "-", 1) + ".keep-web.example/bar")
+       req2 := &http.Request{
+               Method:     "GET",
+               Host:       u2.Host,
+               URL:        u2,
+               RequestURI: u2.RequestURI(),
+               Header: http.Header{
+                       "Authorization": {"Bearer " + arvadostest.SpectatorToken},
+               },
        }
-       s.checkCacheMetrics(c, cache.registry,
-               "requests 27",
-               "hits 23",
-               "pdh_hits 22",
-               "api_calls 4")
-}
-
-func (s *UnitSuite) TestCacheForceReloadByPDH(c *check.C) {
-       arv, err := arvadosclient.MakeArvadosClient()
-       c.Assert(err, check.Equals, nil)
-
-       cache := &cache{
-               cluster:  s.cluster,
-               logger:   ctxlog.TestLogger(c),
-               registry: prometheus.NewRegistry(),
+       for i := 0; i < 3; i++ {
+               resp2 := httptest.NewRecorder()
+               s.handler.ServeHTTP(resp2, req2)
+               c.Check(resp2.Code, check.Equals, http.StatusOK)
        }
-
-       for _, forceReload := range []bool{false, true, false, true} {
-               _, err := cache.Get(arv, arvadostest.FooCollectionPDH, forceReload)
-               c.Check(err, check.Equals, nil)
+       s.checkCacheMetrics(c,
+               "hits 6",
+               "misses 2",
+               "active 2")
+
+       // Alternating between two collections/tokens N times should
+       // use the existing sessions.
+       for i := 0; i < 7; i++ {
+               resp := httptest.NewRecorder()
+               s.handler.ServeHTTP(resp, req)
+               c.Check(resp.Code, check.Equals, http.StatusOK)
+
+               resp2 := httptest.NewRecorder()
+               s.handler.ServeHTTP(resp2, req2)
+               c.Check(resp2.Code, check.Equals, http.StatusOK)
        }
-
-       s.checkCacheMetrics(c, cache.registry,
-               "requests 4",
-               "hits 3",
-               "pdh_hits 0",
-               "api_calls 1")
+       s.checkCacheMetrics(c,
+               "hits 20",
+               "misses 2",
+               "active 2")
 }
 
-func (s *UnitSuite) TestCacheForceReloadByUUID(c *check.C) {
-       arv, err := arvadosclient.MakeArvadosClient()
-       c.Assert(err, check.Equals, nil)
-
-       cache := &cache{
-               cluster:  s.cluster,
-               logger:   ctxlog.TestLogger(c),
-               registry: prometheus.NewRegistry(),
-       }
-
-       for _, forceReload := range []bool{false, true, false, true} {
-               _, err := cache.Get(arv, arvadostest.FooCollection, forceReload)
-               c.Check(err, check.Equals, nil)
-       }
+func (s *IntegrationSuite) TestForceReloadPDH(c *check.C) {
+       filename := strings.Replace(time.Now().Format(time.RFC3339Nano), ":", ".", -1)
+       manifest := ". d41d8cd98f00b204e9800998ecf8427e+0 0:0:" + filename + "\n"
+       pdh := arvados.PortableDataHash(manifest)
+       client := arvados.NewClientFromEnv()
+       client.AuthToken = arvadostest.ActiveToken
+
+       _, resp := s.do("GET", "http://"+strings.Replace(pdh, "+", "-", 1)+".keep-web.example/"+filename, arvadostest.ActiveToken, nil)
+       c.Check(resp.Code, check.Equals, http.StatusNotFound)
+
+       var coll arvados.Collection
+       err := client.RequestAndDecode(&coll, "POST", "arvados/v1/collections", nil, map[string]interface{}{
+               "collection": map[string]string{
+                       "manifest_text": manifest,
+               },
+       })
+       c.Assert(err, check.IsNil)
+       defer client.RequestAndDecode(nil, "DELETE", "arvados/v1/collections/"+coll.UUID, nil, nil)
+       c.Assert(coll.PortableDataHash, check.Equals, pdh)
+
+       _, resp = s.do("GET", "http://"+strings.Replace(pdh, "+", "-", 1)+".keep-web.example/"+filename, "", http.Header{
+               "Authorization": {"Bearer " + arvadostest.ActiveToken},
+               "Cache-Control": {"must-revalidate"},
+       })
+       c.Check(resp.Code, check.Equals, http.StatusOK)
+
+       _, resp = s.do("GET", "http://"+strings.Replace(pdh, "+", "-", 1)+".keep-web.example/missingfile", "", http.Header{
+               "Authorization": {"Bearer " + arvadostest.ActiveToken},
+               "Cache-Control": {"must-revalidate"},
+       })
+       c.Check(resp.Code, check.Equals, http.StatusNotFound)
+}
 
-       s.checkCacheMetrics(c, cache.registry,
-               "requests 4",
-               "hits 3",
-               "pdh_hits 3",
-               "api_calls 3")
+func (s *IntegrationSuite) TestForceReloadUUID(c *check.C) {
+       client := arvados.NewClientFromEnv()
+       client.AuthToken = arvadostest.ActiveToken
+       var coll arvados.Collection
+       err := client.RequestAndDecode(&coll, "POST", "arvados/v1/collections", nil, map[string]interface{}{
+               "collection": map[string]string{
+                       "manifest_text": ". d41d8cd98f00b204e9800998ecf8427e+0 0:0:oldfile\n",
+               },
+       })
+       c.Assert(err, check.IsNil)
+       defer client.RequestAndDecode(nil, "DELETE", "arvados/v1/collections/"+coll.UUID, nil, nil)
+
+       _, resp := s.do("GET", "http://"+coll.UUID+".keep-web.example/newfile", arvadostest.ActiveToken, nil)
+       c.Check(resp.Code, check.Equals, http.StatusNotFound)
+       _, resp = s.do("GET", "http://"+coll.UUID+".keep-web.example/oldfile", arvadostest.ActiveToken, nil)
+       c.Check(resp.Code, check.Equals, http.StatusOK)
+       _, resp = s.do("GET", "http://"+coll.UUID+".keep-web.example/newfile", arvadostest.ActiveToken, nil)
+       c.Check(resp.Code, check.Equals, http.StatusNotFound)
+       err = client.RequestAndDecode(&coll, "PATCH", "arvados/v1/collections/"+coll.UUID, nil, map[string]interface{}{
+               "collection": map[string]string{
+                       "manifest_text": ". d41d8cd98f00b204e9800998ecf8427e+0 0:0:oldfile 0:0:newfile\n",
+               },
+       })
+       c.Assert(err, check.IsNil)
+       _, resp = s.do("GET", "http://"+coll.UUID+".keep-web.example/newfile", arvadostest.ActiveToken, nil)
+       c.Check(resp.Code, check.Equals, http.StatusNotFound)
+       _, resp = s.do("GET", "http://"+coll.UUID+".keep-web.example/newfile", "", http.Header{
+               "Authorization": {"Bearer " + arvadostest.ActiveToken},
+               "Cache-Control": {"must-revalidate"},
+       })
+       c.Check(resp.Code, check.Equals, http.StatusOK)
 }
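
The two force-reload tests above exercise the client-controlled refresh path: a request carrying Cache-Control: must-revalidate (or no-cache) tells keep-web to revalidate its cached collection state before answering. A minimal client-side sketch; the host follows the test's placeholder domain, and collectionUUID, filename, and token are assumptions:

    req, err := http.NewRequest("GET", "http://"+collectionUUID+".keep-web.example/"+filename, nil)
    if err != nil {
            log.Fatal(err)
    }
    req.Header.Set("Authorization", "Bearer "+token)
    req.Header.Set("Cache-Control", "must-revalidate") // ask keep-web to refresh before answering
    resp, err := http.DefaultClient.Do(req)
    if err == nil {
            defer resp.Body.Close()
    }
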
diff --git a/services/keep-web/handler.go b/services/keep-web/handler.go
index 0c75ac56cfc4d69d5845ec332196942d5998cdf7..0e964e463248ff73ab5aaec0ec53d43581236a77 100644
@@ -14,12 +14,12 @@ import (
        "net/http"
        "net/url"
        "os"
-       "path/filepath"
        "sort"
        "strconv"
        "strings"
        "sync"
 
+       "git.arvados.org/arvados.git/lib/cmd"
        "git.arvados.org/arvados.git/sdk/go/arvados"
        "git.arvados.org/arvados.git/sdk/go/arvadosclient"
        "git.arvados.org/arvados.git/sdk/go/auth"
@@ -31,17 +31,16 @@ import (
 )
 
 type handler struct {
-       Cache      cache
-       Cluster    *arvados.Cluster
-       clientPool *arvadosclient.ClientPool
-       setupOnce  sync.Once
-       webdavLS   webdav.LockSystem
+       Cache     cache
+       Cluster   *arvados.Cluster
+       setupOnce sync.Once
+       webdavLS  webdav.LockSystem
 }
 
 var urlPDHDecoder = strings.NewReplacer(" ", "+", "-", "+")
 
-var notFoundMessage = "404 Not found\r\n\r\nThe requested path was not found, or you do not have permission to access it.\r"
-var unauthorizedMessage = "401 Unauthorized\r\n\r\nA valid Arvados token must be provided to access this resource.\r"
+var notFoundMessage = "Not Found"
+var unauthorizedMessage = "401 Unauthorized\r\n\r\nA valid Arvados token must be provided to access this resource.\r\n"
 
 // parseCollectionIDFromURL returns a UUID or PDH if s is a UUID or a
 // PDH (even if it is a PDH with "+" replaced by " " or "-");
@@ -57,10 +56,6 @@ func parseCollectionIDFromURL(s string) string {
 }
 
 func (h *handler) setup() {
-       // Errors will be handled at the client pool.
-       arv, _ := arvados.NewClientFromConfig(h.Cluster)
-       h.clientPool = arvadosclient.MakeClientPoolWith(arv)
-
        keepclient.DefaultBlockCache.MaxBlocks = h.Cluster.Collections.WebDAVCache.MaxBlockEntries
 
        // Even though we don't accept LOCK requests, every webdav
@@ -69,14 +64,15 @@ func (h *handler) setup() {
 }
 
 func (h *handler) serveStatus(w http.ResponseWriter, r *http.Request) {
-       json.NewEncoder(w).Encode(struct{ Version string }{version})
+       json.NewEncoder(w).Encode(struct{ Version string }{cmd.Version.String()})
 }
 
 // updateOnSuccess wraps httpserver.ResponseWriter. If the handler
 // sends an HTTP header indicating success, updateOnSuccess first
-// calls the provided update func. If the update func fails, a 500
-// response is sent, and the status code and body sent by the handler
-// are ignored (all response writes return the update error).
+// calls the provided update func. If the update func fails, an error
+// response is sent (using the error's HTTP status or 500 if none),
+// and the status code and body sent by the handler are ignored (all
+// response writes return the update error).
 type updateOnSuccess struct {
        httpserver.ResponseWriter
        logger     logrus.FieldLogger
@@ -101,10 +97,11 @@ func (uos *updateOnSuccess) WriteHeader(code int) {
                if code >= 200 && code < 400 {
                        if uos.err = uos.update(); uos.err != nil {
                                code := http.StatusInternalServerError
-                               if err, ok := uos.err.(*arvados.TransactionError); ok {
-                                       code = err.StatusCode
+                               var he interface{ HTTPStatus() int }
+                               if errors.As(uos.err, &he) {
+                                       code = he.HTTPStatus()
                                }
-                               uos.logger.WithError(uos.err).Errorf("update() returned error type %T, changing response to HTTP %d", uos.err, code)
+                               uos.logger.WithError(uos.err).Errorf("update() returned %T error, changing response to HTTP %d", uos.err, code)
                                http.Error(uos.ResponseWriter, uos.err.Error(), code)
                                return
                        }
@@ -272,11 +269,6 @@ func (h *handler) ServeHTTP(wOrig http.ResponseWriter, r *http.Request) {
                }
        }
 
-       if collectionID == "" && !useSiteFS {
-               http.Error(w, notFoundMessage, http.StatusNotFound)
-               return
-       }
-
        forceReload := false
        if cc := r.Header.Get("Cache-Control"); strings.Contains(cc, "no-cache") || strings.Contains(cc, "must-revalidate") {
                forceReload = true
@@ -317,11 +309,6 @@ func (h *handler) ServeHTTP(wOrig http.ResponseWriter, r *http.Request) {
                return
        }
 
-       if useSiteFS {
-               h.serveSiteFS(w, r, reqTokens, credentialsOK, attachment)
-               return
-       }
-
        targetPath := pathParts[stripParts:]
        if tokens == nil && len(targetPath) > 0 && strings.HasPrefix(targetPath[0], "t=") {
                // http://ID.example/t=TOKEN/PATH...
@@ -336,6 +323,25 @@ func (h *handler) ServeHTTP(wOrig http.ResponseWriter, r *http.Request) {
                stripParts++
        }
 
+       fsprefix := ""
+       if useSiteFS {
+               if writeMethod[r.Method] {
+                       http.Error(w, errReadOnly.Error(), http.StatusMethodNotAllowed)
+                       return
+               }
+               if len(reqTokens) == 0 {
+                       w.Header().Add("WWW-Authenticate", "Basic realm=\"collections\"")
+                       http.Error(w, unauthorizedMessage, http.StatusUnauthorized)
+                       return
+               }
+               tokens = reqTokens
+       } else if collectionID == "" {
+               http.Error(w, notFoundMessage, http.StatusNotFound)
+               return
+       } else {
+               fsprefix = "by_id/" + collectionID + "/"
+       }
+
        if tokens == nil {
                tokens = reqTokens
                if h.Cluster.Users.AnonymousUserToken != "" {
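
With collection requests now routed through the site filesystem under a by_id/&lt;uuid-or-PDH&gt;/ prefix, the webdav handler and the S3 endpoint share one path-resolution mechanism. A minimal sketch of reading a collection file the same way through the Go SDK, assuming ARVADOS_API_HOST and ARVADOS_API_TOKEN are set in the environment; the collection UUID and file name are placeholders:

    package main

    import (
        "fmt"
        "io"
        "os"

        "git.arvados.org/arvados.git/sdk/go/arvados"
        "git.arvados.org/arvados.git/sdk/go/arvadosclient"
        "git.arvados.org/arvados.git/sdk/go/keepclient"
    )

    func main() {
        // Placeholder collection UUID and file name, for illustration only.
        collectionID := "zzzzz-4zz18-xxxxxxxxxxxxxxx"

        client := arvados.NewClientFromEnv()
        arv, err := arvadosclient.New(client)
        if err != nil {
            panic(err)
        }
        kc, err := keepclient.MakeKeepClient(arv)
        if err != nil {
            panic(err)
        }

        // Same "by_id/<id>/<path>" layout keep-web now uses internally.
        fs := client.SiteFileSystem(kc)
        f, err := fs.OpenFile("by_id/"+collectionID+"/foo.txt", os.O_RDONLY, 0)
        if err != nil {
            panic(err)
        }
        defer f.Close()
        n, err := io.Copy(os.Stdout, f)
        fmt.Fprintf(os.Stderr, "copied %d bytes, err=%v\n", n, err)
    }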
@@ -363,37 +369,60 @@ func (h *handler) ServeHTTP(wOrig http.ResponseWriter, r *http.Request) {
                stripParts++
        }
 
-       arv := h.clientPool.Get()
-       if arv == nil {
-               http.Error(w, "client pool error: "+h.clientPool.Err().Error(), http.StatusInternalServerError)
-               return
+       dirOpenMode := os.O_RDONLY
+       if writeMethod[r.Method] {
+               dirOpenMode = os.O_RDWR
        }
-       defer h.clientPool.Put(arv)
 
-       var collection *arvados.Collection
+       validToken := make(map[string]bool)
+       var token string
        var tokenUser *arvados.User
-       tokenResult := make(map[string]int)
-       for _, arv.ApiToken = range tokens {
-               var err error
-               collection, err = h.Cache.Get(arv, collectionID, forceReload)
-               if err == nil {
-                       // Success
-                       break
+       var sessionFS arvados.CustomFileSystem
+       var session *cachedSession
+       var collectionDir arvados.File
+       for _, token = range tokens {
+               var statusErr interface{ HTTPStatus() int }
+               fs, sess, user, err := h.Cache.GetSession(token)
+               if errors.As(err, &statusErr) && statusErr.HTTPStatus() == http.StatusUnauthorized {
+                       // bad token
+                       continue
+               } else if err != nil {
+                       http.Error(w, "cache error: "+err.Error(), http.StatusInternalServerError)
+                       return
+               }
+               f, err := fs.OpenFile(fsprefix, dirOpenMode, 0)
+               if errors.As(err, &statusErr) && statusErr.HTTPStatus() == http.StatusForbidden {
+                       // collection id is outside token scope
+                       validToken[token] = true
+                       continue
+               }
+               validToken[token] = true
+               if os.IsNotExist(err) {
+                       // collection does not exist or is not
+                       // readable using this token
+                       continue
+               } else if err != nil {
+                       http.Error(w, err.Error(), http.StatusInternalServerError)
+                       return
                }
-               if srvErr, ok := err.(arvadosclient.APIServerError); ok {
-                       switch srvErr.HttpStatusCode {
-                       case 404, 401:
-                               // Token broken or insufficient to
-                               // retrieve collection
-                               tokenResult[arv.ApiToken] = srvErr.HttpStatusCode
-                               continue
+               defer f.Close()
+
+               collectionDir, sessionFS, session, tokenUser = f, fs, sess, user
+               break
+       }
+       if forceReload && collectionDir != nil {
+               err := collectionDir.Sync()
+               if err != nil {
+                       var statusErr interface{ HTTPStatus() int }
+                       if errors.As(err, &statusErr) {
+                               http.Error(w, err.Error(), statusErr.HTTPStatus())
+                       } else {
+                               http.Error(w, err.Error(), http.StatusInternalServerError)
                        }
+                       return
                }
-               // Something more serious is wrong
-               http.Error(w, "cache error: "+err.Error(), http.StatusInternalServerError)
-               return
        }
-       if collection == nil {
+       if session == nil {
                if pathToken || !credentialsOK {
                        // Either the URL is a "secret sharing link"
                        // that didn't work out (and asking the client
@@ -404,9 +433,9 @@ func (h *handler) ServeHTTP(wOrig http.ResponseWriter, r *http.Request) {
                        return
                }
                for _, t := range reqTokens {
-                       if tokenResult[t] == 404 {
-                               // The client provided valid token(s), but the
-                               // collection was not found.
+                       if validToken[t] {
+                               // The client provided valid token(s),
+                               // but the collection was not found.
                                http.Error(w, notFoundMessage, http.StatusNotFound)
                                return
                        }
@@ -453,215 +482,108 @@ func (h *handler) ServeHTTP(wOrig http.ResponseWriter, r *http.Request) {
                return
        }
 
-       kc, err := keepclient.MakeKeepClient(arv)
-       if err != nil {
-               http.Error(w, "error setting up keep client: "+err.Error(), http.StatusInternalServerError)
-               return
+       if r.Method == http.MethodGet || r.Method == http.MethodHead {
+               targetfnm := fsprefix + strings.Join(pathParts[stripParts:], "/")
+               if fi, err := sessionFS.Stat(targetfnm); err == nil && fi.IsDir() {
+                       if !strings.HasSuffix(r.URL.Path, "/") {
+                               h.seeOtherWithCookie(w, r, r.URL.Path+"/", credentialsOK)
+                       } else {
+                               h.serveDirectory(w, r, fi.Name(), sessionFS, targetfnm, !useSiteFS)
+                       }
+                       return
+               }
        }
-       kc.RequestID = r.Header.Get("X-Request-Id")
 
        var basename string
        if len(targetPath) > 0 {
                basename = targetPath[len(targetPath)-1]
        }
-       applyContentDispositionHdr(w, r, basename, attachment)
-
-       client := (&arvados.Client{
-               APIHost:   arv.ApiServer,
-               AuthToken: arv.ApiToken,
-               Insecure:  arv.ApiInsecure,
-       }).WithRequestID(r.Header.Get("X-Request-Id"))
-
-       fs, err := collection.FileSystem(client, kc)
-       if err != nil {
-               http.Error(w, "error creating collection filesystem: "+err.Error(), http.StatusInternalServerError)
-               return
-       }
-
-       writefs, writeOK := fs.(arvados.CollectionFileSystem)
-       targetIsPDH := arvadosclient.PDHMatch(collectionID)
-       if (targetIsPDH || !writeOK) && writeMethod[r.Method] {
+       if arvadosclient.PDHMatch(collectionID) && writeMethod[r.Method] {
                http.Error(w, errReadOnly.Error(), http.StatusMethodNotAllowed)
                return
        }
-
-       // Check configured permission
-       _, sess, err := h.Cache.GetSession(arv.ApiToken)
-       if err != nil {
-               http.Error(w, "session cache: "+err.Error(), http.StatusInternalServerError)
-       }
-       tokenUser, err = h.Cache.GetTokenUser(arv.ApiToken)
-       if e := (interface{ HTTPStatus() int })(nil); errors.As(err, &e) && e.HTTPStatus() == http.StatusForbidden {
-               // Ignore expected error looking up user record when
-               // using a scoped token that allows getting
-               // collections/X but not users/current
-       } else if err != nil {
-               http.Error(w, "user lookup: "+err.Error(), http.StatusInternalServerError)
+       if !h.userPermittedToUploadOrDownload(r.Method, tokenUser) {
+               http.Error(w, "Not permitted", http.StatusForbidden)
+               return
        }
+       h.logUploadOrDownload(r, session.arvadosclient, sessionFS, fsprefix+strings.Join(targetPath, "/"), nil, tokenUser)
 
-       if webdavMethod[r.Method] {
-               if !h.userPermittedToUploadOrDownload(r.Method, tokenUser) {
-                       http.Error(w, "Not permitted", http.StatusForbidden)
+       if writeMethod[r.Method] {
+               // Save the collection only if/when all
+               // webdav->filesystem operations succeed --
+               // and send a 500 error if the modified
+               // collection can't be saved.
+               //
+               // Perform the write in a separate sitefs, so
+               // concurrent read operations on the same
+               // collection see the previous saved
+               // state. After the write succeeds and the
+               // collection record is updated, the change is
+               // spliced into the cached session filesystem so
+               // it is visible in subsequent read requests.
+               client := session.client.WithRequestID(r.Header.Get("X-Request-Id"))
+               sessionFS = client.SiteFileSystem(session.keepclient)
+               writingDir, err := sessionFS.OpenFile(fsprefix, os.O_RDONLY, 0)
+               if err != nil {
+                       http.Error(w, err.Error(), http.StatusInternalServerError)
                        return
                }
-               h.logUploadOrDownload(r, sess.arvadosclient, nil, strings.Join(targetPath, "/"), collection, tokenUser)
-
-               if writeMethod[r.Method] {
-                       // Save the collection only if/when all
-                       // webdav->filesystem operations succeed --
-                       // and send a 500 error if the modified
-                       // collection can't be saved.
-                       w = &updateOnSuccess{
-                               ResponseWriter: w,
-                               logger:         ctxlog.FromContext(r.Context()),
-                               update: func() error {
-                                       return h.Cache.Update(client, *collection, writefs)
-                               }}
-               }
-               h := webdav.Handler{
-                       Prefix: "/" + strings.Join(pathParts[:stripParts], "/"),
-                       FileSystem: &webdavFS{
-                               collfs:        fs,
-                               writing:       writeMethod[r.Method],
-                               alwaysReadEOF: r.Method == "PROPFIND",
-                       },
-                       LockSystem: h.webdavLS,
-                       Logger: func(_ *http.Request, err error) {
+               defer writingDir.Close()
+               w = &updateOnSuccess{
+                       ResponseWriter: w,
+                       logger:         ctxlog.FromContext(r.Context()),
+                       update: func() error {
+                               err := writingDir.Sync()
+                               var te arvados.TransactionError
+                               if errors.As(err, &te) {
+                                       err = te
+                               }
                                if err != nil {
-                                       ctxlog.FromContext(r.Context()).WithError(err).Error("error reported by webdav handler")
+                                       return err
                                }
-                       },
-               }
-               h.ServeHTTP(w, r)
-               return
-       }
-
-       openPath := "/" + strings.Join(targetPath, "/")
-       f, err := fs.Open(openPath)
-       if os.IsNotExist(err) {
-               // Requested non-existent path
-               http.Error(w, notFoundMessage, http.StatusNotFound)
-               return
-       } else if err != nil {
-               // Some other (unexpected) error
-               http.Error(w, "open: "+err.Error(), http.StatusInternalServerError)
-               return
-       }
-       defer f.Close()
-       if stat, err := f.Stat(); err != nil {
-               // Can't get Size/IsDir (shouldn't happen with a collectionFS!)
-               http.Error(w, "stat: "+err.Error(), http.StatusInternalServerError)
-       } else if stat.IsDir() && !strings.HasSuffix(r.URL.Path, "/") {
-               // If client requests ".../dirname", redirect to
-               // ".../dirname/". This way, relative links in the
-               // listing for "dirname" can always be "fnm", never
-               // "dirname/fnm".
-               h.seeOtherWithCookie(w, r, r.URL.Path+"/", credentialsOK)
-       } else if stat.IsDir() {
-               h.serveDirectory(w, r, collection.Name, fs, openPath, true)
-       } else {
-               if !h.userPermittedToUploadOrDownload(r.Method, tokenUser) {
-                       http.Error(w, "Not permitted", http.StatusForbidden)
-                       return
-               }
-               h.logUploadOrDownload(r, sess.arvadosclient, nil, strings.Join(targetPath, "/"), collection, tokenUser)
-
-               http.ServeContent(w, r, basename, stat.ModTime(), f)
-               if wrote := int64(w.WroteBodyBytes()); wrote != stat.Size() && w.WroteStatus() == http.StatusOK {
-                       // If we wrote fewer bytes than expected, it's
-                       // too late to change the real response code
-                       // or send an error message to the client, but
-                       // at least we can try to put some useful
-                       // debugging info in the logs.
-                       n, err := f.Read(make([]byte, 1024))
-                       ctxlog.FromContext(r.Context()).Errorf("stat.Size()==%d but only wrote %d bytes; read(1024) returns %d, %v", stat.Size(), wrote, n, err)
-               }
-       }
-}
-
-func (h *handler) getClients(reqID, token string) (arv *arvadosclient.ArvadosClient, kc *keepclient.KeepClient, client *arvados.Client, release func(), err error) {
-       arv = h.clientPool.Get()
-       if arv == nil {
-               err = h.clientPool.Err()
-               return
-       }
-       release = func() { h.clientPool.Put(arv) }
-       arv.ApiToken = token
-       kc, err = keepclient.MakeKeepClient(arv)
-       if err != nil {
-               release()
-               return
-       }
-       kc.RequestID = reqID
-       client = (&arvados.Client{
-               APIHost:   arv.ApiServer,
-               AuthToken: arv.ApiToken,
-               Insecure:  arv.ApiInsecure,
-       }).WithRequestID(reqID)
-       return
-}
-
-func (h *handler) serveSiteFS(w http.ResponseWriter, r *http.Request, tokens []string, credentialsOK, attachment bool) {
-       if len(tokens) == 0 {
-               w.Header().Add("WWW-Authenticate", "Basic realm=\"collections\"")
-               http.Error(w, unauthorizedMessage, http.StatusUnauthorized)
-               return
-       }
-       if writeMethod[r.Method] {
-               http.Error(w, errReadOnly.Error(), http.StatusMethodNotAllowed)
-               return
-       }
-
-       fs, sess, err := h.Cache.GetSession(tokens[0])
-       if err != nil {
-               http.Error(w, err.Error(), http.StatusInternalServerError)
-               return
-       }
-       fs.ForwardSlashNameSubstitution(h.Cluster.Collections.ForwardSlashNameSubstitution)
-       f, err := fs.Open(r.URL.Path)
-       if os.IsNotExist(err) {
-               http.Error(w, err.Error(), http.StatusNotFound)
-               return
-       } else if err != nil {
-               http.Error(w, err.Error(), http.StatusInternalServerError)
-               return
-       }
-       defer f.Close()
-       if fi, err := f.Stat(); err == nil && fi.IsDir() && r.Method == "GET" {
-               if !strings.HasSuffix(r.URL.Path, "/") {
-                       h.seeOtherWithCookie(w, r, r.URL.Path+"/", credentialsOK)
-               } else {
-                       h.serveDirectory(w, r, fi.Name(), fs, r.URL.Path, false)
-               }
-               return
-       }
-
-       tokenUser, err := h.Cache.GetTokenUser(tokens[0])
-       if !h.userPermittedToUploadOrDownload(r.Method, tokenUser) {
-               http.Error(w, "Not permitted", http.StatusForbidden)
-               return
+                               // Sync the changes to the persistent
+                               // sessionfs for this token.
+                               snap, err := writingDir.Snapshot()
+                               if err != nil {
+                                       return err
+                               }
+                               collectionDir.Splice(snap)
+                               return nil
+                       }}
        }
-       h.logUploadOrDownload(r, sess.arvadosclient, fs, r.URL.Path, nil, tokenUser)
-
-       if r.Method == "GET" {
-               _, basename := filepath.Split(r.URL.Path)
+       if r.Method == http.MethodGet {
                applyContentDispositionHdr(w, r, basename, attachment)
        }
        wh := webdav.Handler{
-               Prefix: "/",
+               Prefix: "/" + strings.Join(pathParts[:stripParts], "/"),
                FileSystem: &webdavFS{
-                       collfs:        fs,
+                       collfs:        sessionFS,
+                       prefix:        fsprefix,
                        writing:       writeMethod[r.Method],
                        alwaysReadEOF: r.Method == "PROPFIND",
                },
                LockSystem: h.webdavLS,
-               Logger: func(_ *http.Request, err error) {
+               Logger: func(r *http.Request, err error) {
                        if err != nil {
                                ctxlog.FromContext(r.Context()).WithError(err).Error("error reported by webdav handler")
                        }
                },
        }
        wh.ServeHTTP(w, r)
+       if r.Method == http.MethodGet && w.WroteStatus() == http.StatusOK {
+               wrote := int64(w.WroteBodyBytes())
+               fnm := strings.Join(pathParts[stripParts:], "/")
+               fi, err := wh.FileSystem.Stat(r.Context(), fnm)
+               if err == nil && fi.Size() != wrote {
+                       var n int
+                       f, err := wh.FileSystem.OpenFile(r.Context(), fnm, os.O_RDONLY, 0)
+                       if err == nil {
+                               n, err = f.Read(make([]byte, 1024))
+                               f.Close()
+                       }
+                       ctxlog.FromContext(r.Context()).Errorf("stat.Size()==%d but only wrote %d bytes; read(1024) returns %d, %v", fi.Size(), wrote, n, err)
+               }
+       }
 }
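
The write path above mutates a scratch site filesystem and, once the modified collection has been saved, publishes the result into the cached session filesystem using the Snapshot/Splice methods this branch adds. A sketch of that save-then-publish step, assuming directory handles obtained from OpenFile as in the handler (the function name is illustrative):

    package keepwebsketch

    import "git.arvados.org/arvados.git/sdk/go/arvados"

    // saveAndPublish flushes changes made under writingDir (a directory
    // handle on a scratch sitefs) and splices the resulting snapshot into
    // cachedDir (the corresponding handle on the shared, cached sitefs),
    // so later read requests see the saved state.
    func saveAndPublish(writingDir, cachedDir arvados.File) error {
        if err := writingDir.Sync(); err != nil {
            return err
        }
        snap, err := writingDir.Snapshot()
        if err != nil {
            return err
        }
        // Mirror the handler: best-effort splice into the cached fs.
        cachedDir.Splice(snap)
        return nil
    }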
 
 var dirListingTemplate = `<!DOCTYPE HTML>
@@ -971,9 +893,14 @@ func (h *handler) logUploadOrDownload(
 
 func (h *handler) determineCollection(fs arvados.CustomFileSystem, path string) (*arvados.Collection, string) {
        target := strings.TrimSuffix(path, "/")
-       for {
+       for cut := len(target); cut >= 0; cut = strings.LastIndexByte(target, '/') {
+               target = target[:cut]
                fi, err := fs.Stat(target)
-               if err != nil {
+               if os.IsNotExist(err) {
+                       // creating a new file/dir, or download
+                       // destined to fail
+                       continue
+               } else if err != nil {
                        return nil, ""
                }
                switch src := fi.Sys().(type) {
@@ -986,11 +913,6 @@ func (h *handler) determineCollection(fs arvados.CustomFileSystem, path string)
                                return nil, ""
                        }
                }
-               // Try parent
-               cut := strings.LastIndexByte(target, '/')
-               if cut < 0 {
-                       return nil, ""
-               }
-               target = target[:cut]
        }
+       return nil, ""
 }
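
determineCollection now walks the target path from the longest prefix toward the root, skipping components that do not exist yet (for example a PUT that creates a new file), until it finds an inode backed by a collection. The prefixes it probes can be listed with a small standalone loop; the path below is a placeholder:

    package main

    import (
        "fmt"
        "strings"
    )

    func main() {
        // Same cut logic as determineCollection, printing each prefix
        // that would be Stat()ed, longest first.
        target := strings.TrimSuffix("by_id/zzzzz-4zz18-xxxxxxxxxxxxxxx/dir/sub/file.txt", "/")
        for cut := len(target); cut >= 0; cut = strings.LastIndexByte(target, '/') {
            target = target[:cut]
            fmt.Printf("stat %q\n", target)
        }
    }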
index 768013185ae7d50dc450fecd342ebcdc343db059..0f7d5078790deab8eed80f9fadfd03b2d4d7ed2d 100644 (file)
@@ -366,6 +366,24 @@ func (s *IntegrationSuite) TestVhostPortMatch(c *check.C) {
        }
 }
 
+func (s *IntegrationSuite) do(method string, urlstring string, token string, hdr http.Header) (*http.Request, *httptest.ResponseRecorder) {
+       u := mustParseURL(urlstring)
+       if hdr == nil && token != "" {
+               hdr = http.Header{"Authorization": {"Bearer " + token}}
+       } else if hdr == nil {
+               hdr = http.Header{}
+       } else if token != "" {
+               panic("must not pass both token and hdr")
+       }
+       return s.doReq(&http.Request{
+               Method:     method,
+               Host:       u.Host,
+               URL:        u,
+               RequestURI: u.RequestURI(),
+               Header:     hdr,
+       })
+}
+
 func (s *IntegrationSuite) doReq(req *http.Request) (*http.Request, *httptest.ResponseRecorder) {
        resp := httptest.NewRecorder()
        s.handler.ServeHTTP(resp, req)
@@ -409,6 +427,26 @@ func (s *IntegrationSuite) TestSingleOriginSecretLink(c *check.C) {
        )
 }
 
+func (s *IntegrationSuite) TestCollectionSharingToken(c *check.C) {
+       s.testVhostRedirectTokenToCookie(c, "GET",
+               "example.com/c="+arvadostest.FooFileCollectionUUID+"/t="+arvadostest.FooFileCollectionSharingToken+"/foo",
+               "",
+               nil,
+               "",
+               http.StatusOK,
+               "foo",
+       )
+       // Same valid sharing token, but requesting a different collection
+       s.testVhostRedirectTokenToCookie(c, "GET",
+               "example.com/c="+arvadostest.FooCollection+"/t="+arvadostest.FooFileCollectionSharingToken+"/foo",
+               "",
+               nil,
+               "",
+               http.StatusNotFound,
+               notFoundMessage+"\n",
+       )
+}
+
 // Bad token in URL is 404 Not Found because it doesn't make sense to
 // retry the same URL with different authorization.
 func (s *IntegrationSuite) TestSingleOriginSecretLinkBadToken(c *check.C) {
@@ -1245,7 +1283,7 @@ func copyHeader(h http.Header) http.Header {
 }
 
 func (s *IntegrationSuite) checkUploadDownloadRequest(c *check.C, req *http.Request,
-       successCode int, direction string, perm bool, userUuid string, collectionUuid string, filepath string) {
+       successCode int, direction string, perm bool, userUuid, collectionUuid, collectionPDH, filepath string) {
 
        client := arvados.NewClientFromEnv()
        client.AuthToken = arvadostest.AdminToken
@@ -1258,6 +1296,7 @@ func (s *IntegrationSuite) checkUploadDownloadRequest(c *check.C, req *http.Requ
        c.Check(err, check.IsNil)
        c.Check(logentries.Items, check.HasLen, 1)
        lastLogId := logentries.Items[0].ID
+       c.Logf("lastLogId: %d", lastLogId)
 
        var logbuf bytes.Buffer
        logger := logrus.New()
@@ -1274,6 +1313,7 @@ func (s *IntegrationSuite) checkUploadDownloadRequest(c *check.C, req *http.Requ
                deadline := time.Now().Add(time.Second)
                for {
                        c.Assert(time.Now().After(deadline), check.Equals, false, check.Commentf("timed out waiting for log entry"))
+                       logentries = arvados.LogList{}
                        err = client.RequestAndDecode(&logentries, "GET", "arvados/v1/logs", nil,
                                arvados.ResourceListParams{
                                        Filters: []arvados.Filter{
@@ -1288,6 +1328,7 @@ func (s *IntegrationSuite) checkUploadDownloadRequest(c *check.C, req *http.Requ
                                logentries.Items[0].ID > lastLogId &&
                                logentries.Items[0].ObjectUUID == userUuid &&
                                logentries.Items[0].Properties["collection_uuid"] == collectionUuid &&
+                               (collectionPDH == "" || logentries.Items[0].Properties["portable_data_hash"] == collectionPDH) &&
                                logentries.Items[0].Properties["collection_file_path"] == filepath {
                                break
                        }
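
These assertions poll the logs API for the audit entry keep-web writes for each transfer, now including the collection's portable data hash. The same records can be listed outside the test suite with the SDK; a minimal sketch, assuming ARVADOS_API_HOST and ARVADOS_API_TOKEN are set and that the cluster records file_download log events (the filter value is an assumption based on the tests above):

    package main

    import (
        "fmt"

        "git.arvados.org/arvados.git/sdk/go/arvados"
    )

    func main() {
        client := arvados.NewClientFromEnv()
        var logs arvados.LogList
        err := client.RequestAndDecode(&logs, "GET", "arvados/v1/logs", nil,
            arvados.ResourceListParams{
                Filters: []arvados.Filter{
                    {Attr: "event_type", Operator: "=", Operand: "file_download"},
                },
            })
        if err != nil {
            panic(err)
        }
        for _, ent := range logs.Items {
            fmt.Println(ent.ObjectUUID,
                ent.Properties["collection_uuid"],
                ent.Properties["portable_data_hash"],
                ent.Properties["collection_file_path"])
        }
    }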
@@ -1321,7 +1362,7 @@ func (s *IntegrationSuite) TestDownloadLoggingPermission(c *check.C) {
                                },
                        }
                        s.checkUploadDownloadRequest(c, req, http.StatusOK, "download", adminperm,
-                               arvadostest.AdminUserUUID, arvadostest.FooCollection, "foo")
+                               arvadostest.AdminUserUUID, arvadostest.FooCollection, arvadostest.FooCollectionPDH, "foo")
 
                        // Test user permission
                        req = &http.Request{
@@ -1334,7 +1375,7 @@ func (s *IntegrationSuite) TestDownloadLoggingPermission(c *check.C) {
                                },
                        }
                        s.checkUploadDownloadRequest(c, req, http.StatusOK, "download", userperm,
-                               arvadostest.ActiveUserUUID, arvadostest.FooCollection, "foo")
+                               arvadostest.ActiveUserUUID, arvadostest.FooCollection, arvadostest.FooCollectionPDH, "foo")
                }
        }
 
@@ -1354,7 +1395,7 @@ func (s *IntegrationSuite) TestDownloadLoggingPermission(c *check.C) {
                        },
                }
                s.checkUploadDownloadRequest(c, req, http.StatusOK, "download", true,
-                       arvadostest.ActiveUserUUID, arvadostest.MultilevelCollection1, "dir1/subdir/file1")
+                       arvadostest.ActiveUserUUID, arvadostest.MultilevelCollection1, arvadostest.MultilevelCollection1PDH, "dir1/subdir/file1")
        }
 
        u = mustParseURL("http://" + strings.Replace(arvadostest.FooCollectionPDH, "+", "-", 1) + ".keep-web.example/foo")
@@ -1368,7 +1409,7 @@ func (s *IntegrationSuite) TestDownloadLoggingPermission(c *check.C) {
                },
        }
        s.checkUploadDownloadRequest(c, req, http.StatusOK, "download", true,
-               arvadostest.ActiveUserUUID, arvadostest.FooCollection, "foo")
+               arvadostest.ActiveUserUUID, "", arvadostest.FooCollectionPDH, "foo")
 }
 
 func (s *IntegrationSuite) TestUploadLoggingPermission(c *check.C) {
@@ -1408,7 +1449,7 @@ func (s *IntegrationSuite) TestUploadLoggingPermission(c *check.C) {
                                Body: io.NopCloser(bytes.NewReader([]byte("bar"))),
                        }
                        s.checkUploadDownloadRequest(c, req, http.StatusCreated, "upload", adminperm,
-                               arvadostest.AdminUserUUID, coll.UUID, "bar")
+                               arvadostest.AdminUserUUID, coll.UUID, "", "bar")
 
                        // Test user permission
                        req = &http.Request{
@@ -1422,7 +1463,7 @@ func (s *IntegrationSuite) TestUploadLoggingPermission(c *check.C) {
                                Body: io.NopCloser(bytes.NewReader([]byte("bar"))),
                        }
                        s.checkUploadDownloadRequest(c, req, http.StatusCreated, "upload", userperm,
-                               arvadostest.ActiveUserUUID, coll.UUID, "bar")
+                               arvadostest.ActiveUserUUID, coll.UUID, "", "bar")
                }
        }
 }
index 7a23cd1fad06a551310fe9efaaa7030cf0d87d91..cd379dc6bd667df887b410edce26abd6eff209e7 100644 (file)
@@ -16,10 +16,6 @@ import (
        "github.com/prometheus/client_golang/prometheus"
 )
 
-var (
-       version = "dev"
-)
-
 var Command = service.Command(arvados.ServiceNameKeepweb, newHandlerOrErrorHandler)
 
 func newHandlerOrErrorHandler(ctx context.Context, cluster *arvados.Cluster, token string, reg *prometheus.Registry) service.Handler {
index 1f458f8e59ad2e2fa9139d4e388fe8554f70a420..381a1110b131d877c8d6d69f34f63fe0b43b77e8 100644 (file)
@@ -27,9 +27,7 @@ import (
        "time"
 
        "git.arvados.org/arvados.git/sdk/go/arvados"
-       "git.arvados.org/arvados.git/sdk/go/arvadosclient"
        "git.arvados.org/arvados.git/sdk/go/ctxlog"
-       "git.arvados.org/arvados.git/sdk/go/keepclient"
        "github.com/AdRoll/goamz/s3"
 )
 
@@ -312,33 +310,20 @@ func (h *handler) serveS3(w http.ResponseWriter, r *http.Request) bool {
                return false
        }
 
-       var err error
-       var fs arvados.CustomFileSystem
-       var arvclient *arvadosclient.ArvadosClient
+       fs, sess, tokenUser, err := h.Cache.GetSession(token)
+       if err != nil {
+               s3ErrorResponse(w, InternalError, err.Error(), r.URL.Path, http.StatusInternalServerError)
+               return true
+       }
        if r.Method == http.MethodGet || r.Method == http.MethodHead {
                // Use a single session (cached FileSystem) across
                // multiple read requests.
-               var sess *cachedSession
-               fs, sess, err = h.Cache.GetSession(token)
-               if err != nil {
-                       s3ErrorResponse(w, InternalError, err.Error(), r.URL.Path, http.StatusInternalServerError)
-                       return true
-               }
-               arvclient = sess.arvadosclient
        } else {
                // Create a FileSystem for this request, to avoid
                // exposing incomplete write operations to concurrent
                // requests.
-               var kc *keepclient.KeepClient
-               var release func()
-               var client *arvados.Client
-               arvclient, kc, client, release, err = h.getClients(r.Header.Get("X-Request-Id"), token)
-               if err != nil {
-                       s3ErrorResponse(w, InternalError, err.Error(), r.URL.Path, http.StatusInternalServerError)
-                       return true
-               }
-               defer release()
-               fs = client.SiteFileSystem(kc)
+               client := sess.client.WithRequestID(r.Header.Get("X-Request-Id"))
+               fs = client.SiteFileSystem(sess.keepclient)
                fs.ForwardSlashNameSubstitution(h.Cluster.Collections.ForwardSlashNameSubstitution)
        }
 
@@ -418,12 +400,11 @@ func (h *handler) serveS3(w http.ResponseWriter, r *http.Request) bool {
                        return true
                }
 
-               tokenUser, err := h.Cache.GetTokenUser(token)
                if !h.userPermittedToUploadOrDownload(r.Method, tokenUser) {
                        http.Error(w, "Not permitted", http.StatusForbidden)
                        return true
                }
-               h.logUploadOrDownload(r, arvclient, fs, fspath, nil, tokenUser)
+               h.logUploadOrDownload(r, sess.arvadosclient, fs, fspath, nil, tokenUser)
 
                // shallow copy r, and change URL path
                r := *r
@@ -514,12 +495,11 @@ func (h *handler) serveS3(w http.ResponseWriter, r *http.Request) bool {
                        }
                        defer f.Close()
 
-                       tokenUser, err := h.Cache.GetTokenUser(token)
                        if !h.userPermittedToUploadOrDownload(r.Method, tokenUser) {
                                http.Error(w, "Not permitted", http.StatusForbidden)
                                return true
                        }
-                       h.logUploadOrDownload(r, arvclient, fs, fspath, nil, tokenUser)
+                       h.logUploadOrDownload(r, sess.arvadosclient, fs, fspath, nil, tokenUser)
 
                        _, err = io.Copy(f, r.Body)
                        if err != nil {
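
serveS3 now gets the cached session filesystem, client, and token user from a single GetSession call; only write requests construct a scratch site filesystem of their own, so partially written objects are not exposed to concurrent requests. The choice reduces to roughly the following sketch, written against exported SDK types instead of keep-web's internal cachedSession (the function and parameter names are illustrative):

    package keepwebsketch

    import (
        "net/http"

        "git.arvados.org/arvados.git/sdk/go/arvados"
        "git.arvados.org/arvados.git/sdk/go/keepclient"
    )

    // filesystemFor returns the shared, cached sitefs for read requests,
    // and a fresh per-request sitefs for writes so that partial updates
    // stay private until they are saved.
    func filesystemFor(method, requestID string, cached arvados.CustomFileSystem,
        client *arvados.Client, kc *keepclient.KeepClient) arvados.CustomFileSystem {
        if method == http.MethodGet || method == http.MethodHead {
            return cached
        }
        return client.WithRequestID(requestID).SiteFileSystem(kc)
    }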
index 61c540808b640d6115a76e3efb70e10928b2dba3..30f755e1f56988f20bf0adc967032e5fcb89fd17 100644 (file)
@@ -29,6 +29,7 @@ import (
        "git.arvados.org/arvados.git/sdk/go/ctxlog"
        "git.arvados.org/arvados.git/sdk/go/httpserver"
        "git.arvados.org/arvados.git/sdk/go/keepclient"
+       "github.com/prometheus/client_golang/prometheus"
        check "gopkg.in/check.v1"
 )
 
@@ -49,17 +50,17 @@ func (s *IntegrationSuite) TestNoToken(c *check.C) {
        } {
                hdr, body, _ := s.runCurl(c, token, "collections.example.com", "/collections/"+arvadostest.FooCollection+"/foo")
                c.Check(hdr, check.Matches, `(?s)HTTP/1.1 404 Not Found\r\n.*`)
-               c.Check(body, check.Equals, notFoundMessage+"\n")
+               c.Check(strings.TrimSpace(body), check.Equals, notFoundMessage)
 
                if token != "" {
                        hdr, body, _ = s.runCurl(c, token, "collections.example.com", "/collections/download/"+arvadostest.FooCollection+"/"+token+"/foo")
                        c.Check(hdr, check.Matches, `(?s)HTTP/1.1 404 Not Found\r\n.*`)
-                       c.Check(body, check.Equals, notFoundMessage+"\n")
+                       c.Check(strings.TrimSpace(body), check.Equals, notFoundMessage)
                }
 
                hdr, body, _ = s.runCurl(c, token, "collections.example.com", "/bad-route")
                c.Check(hdr, check.Matches, `(?s)HTTP/1.1 404 Not Found\r\n.*`)
-               c.Check(body, check.Equals, notFoundMessage+"\n")
+               c.Check(strings.TrimSpace(body), check.Equals, notFoundMessage)
        }
 }
 
@@ -92,7 +93,7 @@ func (s *IntegrationSuite) Test404(c *check.C) {
                hdr, body, _ := s.runCurl(c, arvadostest.ActiveToken, "collections.example.com", uri)
                c.Check(hdr, check.Matches, "(?s)HTTP/1.1 404 Not Found\r\n.*")
                if len(body) > 0 {
-                       c.Check(body, check.Equals, notFoundMessage+"\n")
+                       c.Check(strings.TrimSpace(body), check.Equals, notFoundMessage)
                }
        }
 }
@@ -475,15 +476,7 @@ func (s *IntegrationSuite) TestMetrics(c *check.C) {
        c.Check(summaries["request_duration_seconds/get/200"].SampleCount, check.Equals, "3")
        c.Check(summaries["request_duration_seconds/get/404"].SampleCount, check.Equals, "1")
        c.Check(summaries["time_to_status_seconds/get/404"].SampleCount, check.Equals, "1")
-       c.Check(counters["arvados_keepweb_collectioncache_requests//"].Value, check.Equals, int64(2))
-       c.Check(counters["arvados_keepweb_collectioncache_api_calls//"].Value, check.Equals, int64(2))
-       c.Check(counters["arvados_keepweb_collectioncache_hits//"].Value, check.Equals, int64(1))
-       c.Check(counters["arvados_keepweb_collectioncache_pdh_hits//"].Value, check.Equals, int64(1))
-       c.Check(gauges["arvados_keepweb_collectioncache_cached_manifests//"].Value, check.Equals, float64(1))
-       // FooCollection's cached manifest size is 45 ("1f4b0....+45")
-       // plus one 51-byte blob signature; session fs counts 3 inodes
-       // * 64 bytes.
-       c.Check(gauges["arvados_keepweb_sessions_cached_collection_bytes//"].Value, check.Equals, float64(45+51+64*3))
+       c.Check(gauges["arvados_keepweb_sessions_cached_session_bytes//"].Value, check.Equals, float64(384))
 
        // If the Host header indicates a collection, /metrics.json
        // refers to a file in the collection -- the metrics handler
@@ -529,7 +522,7 @@ func (s *IntegrationSuite) SetUpTest(c *check.C) {
 
        ctx := ctxlog.Context(context.Background(), logger)
 
-       s.handler = newHandlerOrErrorHandler(ctx, cluster, cluster.SystemRootToken, nil).(*handler)
+       s.handler = newHandlerOrErrorHandler(ctx, cluster, cluster.SystemRootToken, prometheus.NewRegistry()).(*handler)
        s.testServer = httptest.NewUnstartedServer(
                httpserver.AddRequestIDs(
                        httpserver.LogRequests(
index 501c355a7388a53fe3b40fc3f082c63768665620..0039f04eeff369a5fb5de3132389762d16535161 100644 (file)
@@ -36,7 +36,10 @@ var (
 // existence automatically so sequences like "mkcol foo; put foo/bar"
 // work as expected.
 type webdavFS struct {
-       collfs  arvados.FileSystem
+       collfs arvados.FileSystem
+       // prefix works like fs.Sub: Stat(name) calls
+       // Stat(prefix+name) in the wrapped filesystem.
+       prefix  string
        writing bool
        // webdav PROPFIND reads the first few bytes of each file
        // whose filename extension isn't recognized, which is
@@ -56,7 +59,7 @@ func (fs *webdavFS) makeparents(name string) {
        }
        dir = dir[:len(dir)-1]
        fs.makeparents(dir)
-       fs.collfs.Mkdir(dir, 0755)
+       fs.collfs.Mkdir(fs.prefix+dir, 0755)
 }
 
 func (fs *webdavFS) Mkdir(ctx context.Context, name string, perm os.FileMode) error {
@@ -65,7 +68,7 @@ func (fs *webdavFS) Mkdir(ctx context.Context, name string, perm os.FileMode) er
        }
        name = strings.TrimRight(name, "/")
        fs.makeparents(name)
-       return fs.collfs.Mkdir(name, 0755)
+       return fs.collfs.Mkdir(fs.prefix+name, 0755)
 }
 
 func (fs *webdavFS) OpenFile(ctx context.Context, name string, flag int, perm os.FileMode) (f webdav.File, err error) {
@@ -73,7 +76,7 @@ func (fs *webdavFS) OpenFile(ctx context.Context, name string, flag int, perm os
        if writing {
                fs.makeparents(name)
        }
-       f, err = fs.collfs.OpenFile(name, flag, perm)
+       f, err = fs.collfs.OpenFile(fs.prefix+name, flag, perm)
        if !fs.writing {
                // webdav module returns 404 on all OpenFile errors,
                // but returns 405 Method Not Allowed if OpenFile()
@@ -93,7 +96,7 @@ func (fs *webdavFS) OpenFile(ctx context.Context, name string, flag int, perm os
 }
 
 func (fs *webdavFS) RemoveAll(ctx context.Context, name string) error {
-       return fs.collfs.RemoveAll(name)
+       return fs.collfs.RemoveAll(fs.prefix + name)
 }
 
 func (fs *webdavFS) Rename(ctx context.Context, oldName, newName string) error {
@@ -106,14 +109,14 @@ func (fs *webdavFS) Rename(ctx context.Context, oldName, newName string) error {
                newName = strings.TrimSuffix(newName, "/")
        }
        fs.makeparents(newName)
-       return fs.collfs.Rename(oldName, newName)
+       return fs.collfs.Rename(fs.prefix+oldName, fs.prefix+newName)
 }
 
 func (fs *webdavFS) Stat(ctx context.Context, name string) (os.FileInfo, error) {
        if fs.writing {
                fs.makeparents(name)
        }
-       return fs.collfs.Stat(name)
+       return fs.collfs.Stat(fs.prefix + name)
 }
 
 type writeFailer struct {
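
The new prefix field re-roots every webdavFS operation under a fixed directory of the wrapped filesystem, which is what lets one site filesystem back both the site-wide and single-collection vhosts. The effect is analogous to fs.Sub in the standard library; a small self-contained illustration with placeholder paths:

    package main

    import (
        "fmt"
        "io/fs"
        "testing/fstest"
    )

    func main() {
        // A toy filesystem standing in for the sitefs "by_id" tree.
        root := fstest.MapFS{
            "by_id/zzzzz-4zz18-xxxxxxxxxxxxxxx/foo.txt": &fstest.MapFile{Data: []byte("foo\n")},
        }
        // Like webdavFS.prefix: every lookup is rewritten under this directory.
        sub, err := fs.Sub(root, "by_id/zzzzz-4zz18-xxxxxxxxxxxxxxx")
        if err != nil {
            panic(err)
        }
        b, err := fs.ReadFile(sub, "foo.txt")
        if err != nil {
            panic(err)
        }
        fmt.Printf("%s", b)
    }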
index 2eaea278162c9c4d2fb308b30d27b38e8ea5b639..8c4a649f69d64d4872fb11efb7083403d87094da 100644 (file)
@@ -695,7 +695,7 @@ func (s *ServerRequiredSuite) TestCollectionSharingToken(c *C) {
        defer srv.Close()
        hash, _, err := kc.PutB([]byte("shareddata"))
        c.Check(err, IsNil)
-       kc.Arvados.ApiToken = arvadostest.FooCollectionSharingToken
+       kc.Arvados.ApiToken = arvadostest.FooFileCollectionSharingToken
        rdr, _, _, err := kc.Get(hash)
        c.Assert(err, IsNil)
        data, err := ioutil.ReadAll(rdr)