19362: Move non-S3 collection-addressed reqs to sitefs code path.
author Tom Clegg <tom@curii.com>
Mon, 12 Sep 2022 18:49:37 +0000 (14:49 -0400)
committer Tom Clegg <tom@curii.com>
Mon, 12 Sep 2022 18:49:37 +0000 (14:49 -0400)
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom@curii.com>

13 files changed:
sdk/go/arvados/fs_base.go
sdk/go/arvados/fs_collection.go
sdk/go/arvados/fs_collection_test.go
sdk/go/arvados/fs_site.go
sdk/go/arvadostest/fixtures.go
services/keep-web/cache.go
services/keep-web/cache_test.go
services/keep-web/handler.go
services/keep-web/handler_test.go
services/keep-web/s3.go
services/keep-web/server_test.go
services/keep-web/webdav.go
services/keepproxy/keepproxy_test.go

index 2ad4d1f859f1141035c04cb4180c5ef623d1fa04..b0096140cecec46bc50831a3e78abfcf1394c61d 100644 (file)
@@ -631,7 +631,15 @@ func (fs *fileSystem) Rename(oldname, newname string) error {
        }
        locked := map[sync.Locker]bool{}
        for i := len(needLock) - 1; i >= 0; i-- {
-               if n := needLock[i]; !locked[n] {
+               n := needLock[i]
+               if fs, ok := n.(FileSystem); ok {
+                       // Lock the fs's root dir directly, not
+                       // through the fs. Otherwise our "locked" map
+                       // would not reliably prevent double-locking
+                       // the fs's root dir.
+                       n = fs.rootnode()
+               }
+               if !locked[n] {
                        n.Lock()
                        defer n.Unlock()
                        locked[n] = true
index 26012e240603d0be43a1019346c4e946e2821790..256f513cf9f3e9a126bb918fe18a021ca7ca2e8b 100644 (file)
@@ -290,44 +290,70 @@ func (fs *collectionFileSystem) Truncate(int64) error {
        return ErrInvalidOperation
 }
 
-// Check for and incorporate upstream changes -- unless that has
-// already been done recently, in which case this func is a no-op.
-func (fs *collectionFileSystem) checkChangesOnServer() error {
+// Check for and incorporate upstream changes. If force==false, this
+// is a no-op except once every ttl/100 or so.
+//
+// Return value is true if new content was loaded from upstream and
+// any unsaved local changes have been discarded.
+func (fs *collectionFileSystem) checkChangesOnServer(force bool) (bool, error) {
        if fs.uuid == "" && fs.savedPDH.Load() == "" {
-               return nil
+               return false, nil
        }
 
-       // First try UUID if any, then last known PDH. Stop if all
-       // signatures are new enough.
-       checkingAll := false
-       for _, id := range []string{fs.uuid, fs.savedPDH.Load().(string)} {
-               if id == "" {
-                       continue
-               }
-
-               fs.lockCheckChanges.Lock()
-               if !checkingAll && fs.holdCheckChanges.After(time.Now()) {
-                       fs.lockCheckChanges.Unlock()
-                       return nil
-               }
-               remain, ttl := fs.signatureTimeLeft()
-               if remain > 0.01 && !checkingAll {
-                       fs.holdCheckChanges = time.Now().Add(ttl / 100)
-               }
+       fs.lockCheckChanges.Lock()
+       if !force && fs.holdCheckChanges.After(time.Now()) {
                fs.lockCheckChanges.Unlock()
+               return false, nil
+       }
+       remain, ttl := fs.signatureTimeLeft()
+       if remain > 0.01 {
+               fs.holdCheckChanges = time.Now().Add(ttl / 100)
+       }
+       fs.lockCheckChanges.Unlock()
 
-               if remain >= 0.5 {
-                       break
+       if !force && remain >= 0.5 {
+               // plenty of time left on current signatures
+               return false, nil
+       }
+
+       getparams := map[string]interface{}{"select": []string{"portable_data_hash", "manifest_text"}}
+       if fs.uuid != "" {
+               var coll Collection
+               err := fs.RequestAndDecode(&coll, "GET", "arvados/v1/collections/"+fs.uuid, nil, getparams)
+               if err != nil {
+                       return false, err
+               }
+               if coll.PortableDataHash != fs.savedPDH.Load().(string) {
+                       // collection has changed upstream since we
+                       // last loaded or saved. Refresh local data,
+                       // losing any unsaved local changes.
+                       newfs, err := coll.FileSystem(fs.fileSystem.fsBackend, fs.fileSystem.fsBackend)
+                       if err != nil {
+                               return false, err
+                       }
+                       snap, err := Snapshot(newfs, "/")
+                       if err != nil {
+                               return false, err
+                       }
+                       err = Splice(fs, "/", snap)
+                       if err != nil {
+                               return false, err
+                       }
+                       fs.savedPDH.Store(coll.PortableDataHash)
+                       return true, nil
                }
-               checkingAll = true
+               fs.updateSignatures(coll.ManifestText)
+               return false, nil
+       }
+       if pdh := fs.savedPDH.Load().(string); pdh != "" {
                var coll Collection
-               err := fs.RequestAndDecode(&coll, "GET", "arvados/v1/collections/"+id, nil, map[string]interface{}{"select": []string{"portable_data_hash", "manifest_text"}})
+               err := fs.RequestAndDecode(&coll, "GET", "arvados/v1/collections/"+pdh, nil, getparams)
                if err != nil {
-                       continue
+                       return false, err
                }
                fs.updateSignatures(coll.ManifestText)
        }
-       return nil
+       return false, nil
 }
 
 // Refresh signature on a single locator, if necessary. Assume caller
@@ -339,7 +365,7 @@ func (fs *collectionFileSystem) refreshSignature(locator string) string {
        if err != nil || exp.Sub(time.Now()) > time.Minute {
                // Synchronous update is not needed. Start an
                // asynchronous update if needed.
-               go fs.checkChangesOnServer()
+               go fs.checkChangesOnServer(false)
                return locator
        }
        var manifests string
@@ -368,11 +394,11 @@ func (fs *collectionFileSystem) refreshSignature(locator string) string {
 }
 
 func (fs *collectionFileSystem) Sync() error {
-       err := fs.checkChangesOnServer()
+       refreshed, err := fs.checkChangesOnServer(true)
        if err != nil {
                return err
        }
-       if fs.uuid == "" {
+       if refreshed || fs.uuid == "" {
                return nil
        }
        txt, err := fs.MarshalManifest(".")
@@ -403,7 +429,7 @@ func (fs *collectionFileSystem) Sync() error {
                "select": selectFields,
        })
        if err != nil {
-               return fmt.Errorf("sync failed: update %s: %s", fs.uuid, err)
+               return fmt.Errorf("sync failed: update %s: %w", fs.uuid, err)
        }
        fs.updateSignatures(coll.ManifestText)
        fs.savedPDH.Store(coll.PortableDataHash)
@@ -1163,10 +1189,11 @@ func (dn *dirnode) MemorySize() (size int64) {
                        size += 64
                        for _, seg := range node.segments {
                                switch seg := seg.(type) {
+                               case storedSegment:
+                                       size += int64(len(seg.locator)) + 40
                                case *memSegment:
-                                       size += int64(seg.Len())
+                                       size += int64(seg.Len()) + 8
                                }
-                               size += 64
                        }
                }
        }
index c2cac3c6ce2e963b36b7654729e56524ba9bc2db..da3166509c617283167a05de6448422ae49dd184 100644 (file)
@@ -1221,8 +1221,8 @@ func (s *CollectionFSSuite) TestFlushFullBlocksOnly(c *check.C) {
                        c.Assert(err, check.IsNil)
                }
        }
-       inodebytes := int64((nDirs*(67*2+1) + 1) * 64)
-       c.Check(fs.MemorySize(), check.Equals, int64(nDirs*67<<20)+inodebytes)
+       inodebytes := int64((nDirs*(67+1) + 1) * 64)
+       c.Check(fs.MemorySize(), check.Equals, int64(nDirs*67*(1<<20+8))+inodebytes)
        c.Check(flushed, check.Equals, int64(0))
 
        waitForFlush := func(expectUnflushed, expectFlushed int64) {
@@ -1233,27 +1233,29 @@ func (s *CollectionFSSuite) TestFlushFullBlocksOnly(c *check.C) {
        }
 
        // Nothing flushed yet
-       waitForFlush((nDirs*67)<<20+inodebytes, 0)
+       waitForFlush(nDirs*67*(1<<20+8)+inodebytes, 0)
 
        // Flushing a non-empty dir "/" is non-recursive and there are
        // no top-level files, so this has no effect
        fs.Flush("/", false)
-       waitForFlush((nDirs*67)<<20+inodebytes, 0)
+       waitForFlush(nDirs*67*(1<<20+8)+inodebytes, 0)
 
        // Flush the full block in dir0
        fs.Flush("dir0", false)
-       waitForFlush((nDirs*67-64)<<20+inodebytes, 64<<20)
+       bigloclen := int64(32 + 9 + 51 + 40) // md5 + "+" + "67xxxxxx" + "+Axxxxxx..." + 40 (see (*dirnode)MemorySize)
+       waitForFlush((nDirs*67-64)*(1<<20+8)+inodebytes+bigloclen*64, 64<<20)
 
        err = fs.Flush("dir-does-not-exist", false)
        c.Check(err, check.NotNil)
 
        // Flush full blocks in all dirs
        fs.Flush("", false)
-       waitForFlush(nDirs*3<<20+inodebytes, nDirs*64<<20)
+       waitForFlush(nDirs*3*(1<<20+8)+inodebytes+bigloclen*64*nDirs, nDirs*64<<20)
 
        // Flush non-full blocks, too
        fs.Flush("", true)
-       waitForFlush(inodebytes, nDirs*67<<20)
+       smallloclen := int64(32 + 8 + 51 + 40) // md5 + "+" + "3xxxxxx" + "+Axxxxxx..." + 40 (see (*dirnode)MemorySize)
+       waitForFlush(inodebytes+bigloclen*64*nDirs+smallloclen*3*nDirs, nDirs*67<<20)
 }
 
 // Even when writing lots of files/dirs from different goroutines, as
index bb2eee77925fd2c682c7d42e1e8e175c4f2f1489..eed49296e78c81a9e7d668d58cd51348f738b8d1 100644 (file)
@@ -5,6 +5,7 @@
 package arvados
 
 import (
+       "net/http"
        "os"
        "strings"
        "sync"
@@ -136,29 +137,43 @@ func (fs *customFileSystem) newNode(name string, perm os.FileMode, modTime time.
        return nil, ErrInvalidOperation
 }
 
-func (fs *customFileSystem) mountByID(parent inode, id string) inode {
+func (fs *customFileSystem) mountByID(parent inode, id string) (inode, error) {
        if strings.Contains(id, "-4zz18-") || pdhRegexp.MatchString(id) {
                return fs.mountCollection(parent, id)
        } else if strings.Contains(id, "-j7d0g-") {
-               return fs.newProjectNode(fs.root, id, id, nil)
+               return fs.newProjectNode(fs.root, id, id, nil), nil
        } else {
-               return nil
+               return nil, nil
        }
 }
 
-func (fs *customFileSystem) mountCollection(parent inode, id string) inode {
+func (fs *customFileSystem) mountCollection(parent inode, id string) (inode, error) {
        var coll Collection
        err := fs.RequestAndDecode(&coll, "GET", "arvados/v1/collections/"+id, nil, nil)
-       if err != nil {
-               return nil
+       if statusErr, ok := err.(interface{ HTTPStatus() int }); ok && statusErr.HTTPStatus() == http.StatusNotFound {
+               return nil, nil
+       } else if err != nil {
+               return nil, err
+       }
+       if len(id) != 27 {
+               // This means id is a PDH, and controller/railsapi
+               // returned one of (possibly) many collections with
+               // that PDH. We don't want to expose (e.g., through
+               // Sys()) the metadata from that collection -- only
+               // the fields that are common to all matching
+               // collections, i.e., PDH and manifest.
+               coll = Collection{
+                       PortableDataHash: coll.PortableDataHash,
+                       ManifestText:     coll.ManifestText,
+               }
        }
        newfs, err := coll.FileSystem(fs, fs)
        if err != nil {
-               return nil
+               return nil, err
        }
        cfs := newfs.(*collectionFileSystem)
        cfs.SetParent(parent, id)
-       return cfs
+       return cfs, nil
 }
 
 func (fs *customFileSystem) newProjectNode(root inode, name, uuid string, proj *Group) inode {
@@ -202,15 +217,19 @@ func (fs *customFileSystem) newProjectNode(root inode, name, uuid string, proj *
 // treenode, or nil for ENOENT.
 type vdirnode struct {
        treenode
-       create func(parent inode, name string) inode
+       create func(parent inode, name string) (inode, error)
 }
 
 func (vn *vdirnode) Child(name string, replace func(inode) (inode, error)) (inode, error) {
        return vn.treenode.Child(name, func(existing inode) (inode, error) {
                if existing == nil && vn.create != nil {
-                       existing = vn.create(vn, name)
-                       if existing != nil {
-                               existing.SetParent(vn, name)
+                       newnode, err := vn.create(vn, name)
+                       if err != nil {
+                               return nil, err
+                       }
+                       if newnode != nil {
+                               newnode.SetParent(vn, name)
+                               existing = newnode
                                vn.treenode.fileinfo.modTime = time.Now()
                        }
                }
index ec55725412c381714014a09931441855f9393466..ac12f7ae13e93405b37a6814ed4e16bbda2b911e 100644 (file)
@@ -32,6 +32,7 @@ const (
        HelloWorldPdh           = "55713e6a34081eb03609e7ad5fcad129+62"
 
        MultilevelCollection1                        = "zzzzz-4zz18-pyw8yp9g3pr7irn"
+       MultilevelCollection1PDH                     = "f9ddda46bb293b6847da984e3aa735db+290"
        StorageClassesDesiredDefaultConfirmedDefault = "zzzzz-4zz18-3t236wr12769tga"
        StorageClassesDesiredArchiveConfirmedDefault = "zzzzz-4zz18-3t236wr12769qqa"
        EmptyCollectionUUID                          = "zzzzz-4zz18-gs9ooj1h9sd5mde"
@@ -83,8 +84,11 @@ const (
        Repository2UUID = "zzzzz-s0uqq-382brsig8rp3667"
        Repository2Name = "active/foo2"
 
-       FooCollectionSharingTokenUUID = "zzzzz-gj3su-gf02tdm4g1z3e3u"
-       FooCollectionSharingToken     = "iknqgmunrhgsyfok8uzjlwun9iscwm3xacmzmg65fa1j1lpdss"
+       FooFileCollectionUUID             = "zzzzz-4zz18-znfnqtbbv4spc3w"
+       FooFileCollectionSharingTokenUUID = "zzzzz-gj3su-gf02tdm4g1z3e3u"
+       FooFileCollectionSharingToken     = "iknqgmunrhgsyfok8uzjlwun9iscwm3xacmzmg65fa1j1lpdss"
+       BarFileCollectionUUID             = "zzzzz-4zz18-ehbhgtheo8909or"
+       BarFileCollectionPDH              = "fa7aeb5140e2848d39b416daeef4ffc5+45"
 
        WorkflowWithDefinitionYAMLUUID = "zzzzz-7fd4e-validworkfloyml"
 
index d5fdc4997ecee3c67d1cc8ea4003c2c1e3f6cac4..3ba3e60abdad599bc5622e9cf5583f2422e81786 100644 (file)
@@ -5,6 +5,7 @@
 package keepweb
 
 import (
+       "net/http"
        "sync"
        "sync/atomic"
        "time"
@@ -20,74 +21,32 @@ import (
 const metricsUpdateInterval = time.Second / 10
 
 type cache struct {
-       cluster     *arvados.Cluster
-       logger      logrus.FieldLogger
-       registry    *prometheus.Registry
-       metrics     cacheMetrics
-       pdhs        *lru.TwoQueueCache
-       collections *lru.TwoQueueCache
-       sessions    *lru.TwoQueueCache
-       setupOnce   sync.Once
+       cluster   *arvados.Cluster
+       logger    logrus.FieldLogger
+       registry  *prometheus.Registry
+       metrics   cacheMetrics
+       sessions  *lru.TwoQueueCache
+       setupOnce sync.Once
 
-       chPruneSessions    chan struct{}
-       chPruneCollections chan struct{}
+       chPruneSessions chan struct{}
 }
 
 type cacheMetrics struct {
-       requests          prometheus.Counter
-       collectionBytes   prometheus.Gauge
-       collectionEntries prometheus.Gauge
-       sessionEntries    prometheus.Gauge
-       collectionHits    prometheus.Counter
-       pdhHits           prometheus.Counter
-       sessionHits       prometheus.Counter
-       sessionMisses     prometheus.Counter
-       apiCalls          prometheus.Counter
+       requests        prometheus.Counter
+       collectionBytes prometheus.Gauge
+       sessionEntries  prometheus.Gauge
+       sessionHits     prometheus.Counter
+       sessionMisses   prometheus.Counter
 }
 
 func (m *cacheMetrics) setup(reg *prometheus.Registry) {
-       m.requests = prometheus.NewCounter(prometheus.CounterOpts{
-               Namespace: "arvados",
-               Subsystem: "keepweb_collectioncache",
-               Name:      "requests",
-               Help:      "Number of targetID-to-manifest lookups handled.",
-       })
-       reg.MustRegister(m.requests)
-       m.collectionHits = prometheus.NewCounter(prometheus.CounterOpts{
-               Namespace: "arvados",
-               Subsystem: "keepweb_collectioncache",
-               Name:      "hits",
-               Help:      "Number of pdh-to-manifest cache hits.",
-       })
-       reg.MustRegister(m.collectionHits)
-       m.pdhHits = prometheus.NewCounter(prometheus.CounterOpts{
-               Namespace: "arvados",
-               Subsystem: "keepweb_collectioncache",
-               Name:      "pdh_hits",
-               Help:      "Number of uuid-to-pdh cache hits.",
-       })
-       reg.MustRegister(m.pdhHits)
-       m.apiCalls = prometheus.NewCounter(prometheus.CounterOpts{
-               Namespace: "arvados",
-               Subsystem: "keepweb_collectioncache",
-               Name:      "api_calls",
-               Help:      "Number of outgoing API calls made by cache.",
-       })
-       reg.MustRegister(m.apiCalls)
        m.collectionBytes = prometheus.NewGauge(prometheus.GaugeOpts{
                Namespace: "arvados",
                Subsystem: "keepweb_sessions",
-               Name:      "cached_collection_bytes",
-               Help:      "Total size of all cached manifests and sessions.",
+               Name:      "cached_session_bytes",
+               Help:      "Total size of all cached sessions.",
        })
        reg.MustRegister(m.collectionBytes)
-       m.collectionEntries = prometheus.NewGauge(prometheus.GaugeOpts{
-               Namespace: "arvados",
-               Subsystem: "keepweb_collectioncache",
-               Name:      "cached_manifests",
-               Help:      "Number of manifests in cache.",
-       })
-       reg.MustRegister(m.collectionEntries)
        m.sessionEntries = prometheus.NewGauge(prometheus.GaugeOpts{
                Namespace: "arvados",
                Subsystem: "keepweb_sessions",
@@ -111,21 +70,6 @@ func (m *cacheMetrics) setup(reg *prometheus.Registry) {
        reg.MustRegister(m.sessionMisses)
 }
 
-type cachedPDH struct {
-       expire  time.Time
-       refresh time.Time
-       pdh     string
-}
-
-type cachedCollection struct {
-       expire     time.Time
-       collection *arvados.Collection
-}
-
-type cachedPermission struct {
-       expire time.Time
-}
-
 type cachedSession struct {
        expire        time.Time
        fs            atomic.Value
@@ -137,14 +81,6 @@ type cachedSession struct {
 
 func (c *cache) setup() {
        var err error
-       c.pdhs, err = lru.New2Q(c.cluster.Collections.WebDAVCache.MaxUUIDEntries)
-       if err != nil {
-               panic(err)
-       }
-       c.collections, err = lru.New2Q(c.cluster.Collections.WebDAVCache.MaxCollectionEntries)
-       if err != nil {
-               panic(err)
-       }
        c.sessions, err = lru.New2Q(c.cluster.Collections.WebDAVCache.MaxSessions)
        if err != nil {
                panic(err)
@@ -160,12 +96,6 @@ func (c *cache) setup() {
                        c.updateGauges()
                }
        }()
-       c.chPruneCollections = make(chan struct{}, 1)
-       go func() {
-               for range c.chPruneCollections {
-                       c.pruneCollections()
-               }
-       }()
        c.chPruneSessions = make(chan struct{}, 1)
        go func() {
                for range c.chPruneSessions {
@@ -176,7 +106,6 @@ func (c *cache) setup() {
 
 func (c *cache) updateGauges() {
        c.metrics.collectionBytes.Set(float64(c.collectionBytes()))
-       c.metrics.collectionEntries.Set(float64(c.collections.Len()))
        c.metrics.sessionEntries.Set(float64(c.sessions.Len()))
 }
 
@@ -184,39 +113,6 @@ var selectPDH = map[string]interface{}{
        "select": []string{"portable_data_hash"},
 }
 
-// Update saves a modified version (fs) to an existing collection
-// (coll) and, if successful, updates the relevant cache entries so
-// subsequent calls to Get() reflect the modifications.
-func (c *cache) Update(client *arvados.Client, coll arvados.Collection, fs arvados.CollectionFileSystem) error {
-       c.setupOnce.Do(c.setup)
-
-       m, err := fs.MarshalManifest(".")
-       if err != nil || m == coll.ManifestText {
-               return err
-       }
-       coll.ManifestText = m
-       var updated arvados.Collection
-       err = client.RequestAndDecode(&updated, "PATCH", "arvados/v1/collections/"+coll.UUID, nil, map[string]interface{}{
-               "collection": map[string]string{
-                       "manifest_text": coll.ManifestText,
-               },
-       })
-       if err != nil {
-               c.pdhs.Remove(coll.UUID)
-               return err
-       }
-       c.collections.Add(client.AuthToken+"\000"+updated.PortableDataHash, &cachedCollection{
-               expire:     time.Now().Add(time.Duration(c.cluster.Collections.WebDAVCache.TTL)),
-               collection: &updated,
-       })
-       c.pdhs.Add(coll.UUID, &cachedPDH{
-               expire:  time.Now().Add(time.Duration(c.cluster.Collections.WebDAVCache.TTL)),
-               refresh: time.Now().Add(time.Duration(c.cluster.Collections.WebDAVCache.UUIDTTL)),
-               pdh:     updated.PortableDataHash,
-       })
-       return nil
-}
-
 // ResetSession unloads any potentially stale state. Should be called
 // after write operations, so subsequent reads don't return stale
 // data.
@@ -227,7 +123,7 @@ func (c *cache) ResetSession(token string) {
 
 // Get a long-lived CustomFileSystem suitable for doing a read operation
 // with the given token.
-func (c *cache) GetSession(token string) (arvados.CustomFileSystem, *cachedSession, error) {
+func (c *cache) GetSession(token string) (arvados.CustomFileSystem, *cachedSession, *arvados.User, error) {
        c.setupOnce.Do(c.setup)
        now := time.Now()
        ent, _ := c.sessions.Get(token)
@@ -241,12 +137,12 @@ func (c *cache) GetSession(token string) (arvados.CustomFileSystem, *cachedSessi
                var err error
                sess.client, err = arvados.NewClientFromConfig(c.cluster)
                if err != nil {
-                       return nil, nil, err
+                       return nil, nil, nil, err
                }
                sess.client.AuthToken = token
                sess.arvadosclient, err = arvadosclient.New(sess.client)
                if err != nil {
-                       return nil, nil, err
+                       return nil, nil, nil, err
                }
                sess.keepclient = keepclient.New(sess.arvadosclient)
                c.sessions.Add(token, sess)
@@ -260,14 +156,29 @@ func (c *cache) GetSession(token string) (arvados.CustomFileSystem, *cachedSessi
        case c.chPruneSessions <- struct{}{}:
        default:
        }
+
        fs, _ := sess.fs.Load().(arvados.CustomFileSystem)
-       if fs != nil && !expired {
-               return fs, sess, nil
+       if fs == nil || expired {
+               fs = sess.client.SiteFileSystem(sess.keepclient)
+               fs.ForwardSlashNameSubstitution(c.cluster.Collections.ForwardSlashNameSubstitution)
+               sess.fs.Store(fs)
        }
-       fs = sess.client.SiteFileSystem(sess.keepclient)
-       fs.ForwardSlashNameSubstitution(c.cluster.Collections.ForwardSlashNameSubstitution)
-       sess.fs.Store(fs)
-       return fs, sess, nil
+
+       user, _ := sess.user.Load().(*arvados.User)
+       if user == nil || expired {
+               user = new(arvados.User)
+               err := sess.client.RequestAndDecode(user, "GET", "/arvados/v1/users/current", nil, nil)
+               if statusErr, ok := err.(interface{ HTTPStatus() int }); ok && statusErr.HTTPStatus() == http.StatusForbidden {
+                       // token is OK, but "get user id" api is out
+                       // of scope -- return nil, signifying unknown
+                       // user
+               } else if err != nil {
+                       return nil, nil, nil, err
+               }
+               sess.user.Store(user)
+       }
+
+       return fs, sess, user, nil
 }
 
 // Remove all expired session cache entries, then remove more entries
@@ -294,7 +205,7 @@ func (c *cache) pruneSessions() {
        // least frequently used entries (which Keys() returns last).
        for i := len(keys) - 1; i >= 0; i-- {
                token := keys[i]
-               if size <= c.cluster.Collections.WebDAVCache.MaxCollectionBytes/2 {
+               if size <= c.cluster.Collections.WebDAVCache.MaxCollectionBytes {
                        break
                }
                ent, ok := c.sessions.Peek(token)
@@ -311,147 +222,10 @@ func (c *cache) pruneSessions() {
        }
 }
 
-func (c *cache) Get(arv *arvadosclient.ArvadosClient, targetID string, forceReload bool) (*arvados.Collection, error) {
-       c.setupOnce.Do(c.setup)
-       c.metrics.requests.Inc()
-
-       var pdhRefresh bool
-       var pdh string
-       if arvadosclient.PDHMatch(targetID) {
-               pdh = targetID
-       } else if ent, cached := c.pdhs.Get(targetID); cached {
-               ent := ent.(*cachedPDH)
-               if ent.expire.Before(time.Now()) {
-                       c.pdhs.Remove(targetID)
-               } else {
-                       pdh = ent.pdh
-                       pdhRefresh = forceReload || time.Now().After(ent.refresh)
-                       c.metrics.pdhHits.Inc()
-               }
-       }
-
-       if pdh == "" {
-               // UUID->PDH mapping is not cached, might as well get
-               // the whole collection record and be done (below).
-               c.logger.Debugf("cache(%s): have no pdh", targetID)
-       } else if cached := c.lookupCollection(arv.ApiToken + "\000" + pdh); cached == nil {
-               // PDH->manifest is not cached, might as well get the
-               // whole collection record (below).
-               c.logger.Debugf("cache(%s): have pdh %s but manifest is not cached", targetID, pdh)
-       } else if !pdhRefresh {
-               // We looked up UUID->PDH very recently, and we still
-               // have the manifest for that PDH.
-               c.logger.Debugf("cache(%s): have pdh %s and refresh not needed", targetID, pdh)
-               return cached, nil
-       } else {
-               // Get current PDH for this UUID (and confirm we still
-               // have read permission).  Most likely, the cached PDH
-               // is still correct, in which case we can use our
-               // cached manifest.
-               c.metrics.apiCalls.Inc()
-               var current arvados.Collection
-               err := arv.Get("collections", targetID, selectPDH, &current)
-               if err != nil {
-                       return nil, err
-               }
-               if current.PortableDataHash == pdh {
-                       // PDH has not changed, cached manifest is
-                       // correct.
-                       c.logger.Debugf("cache(%s): verified cached pdh %s is still correct", targetID, pdh)
-                       return cached, nil
-               }
-               if cached := c.lookupCollection(arv.ApiToken + "\000" + current.PortableDataHash); cached != nil {
-                       // PDH changed, and we already have the
-                       // manifest for that new PDH.
-                       c.logger.Debugf("cache(%s): cached pdh %s was stale, new pdh is %s and manifest is already in cache", targetID, pdh, current.PortableDataHash)
-                       return cached, nil
-               }
-       }
-
-       // Either UUID->PDH is not cached, or PDH->manifest is not
-       // cached.
-       var retrieved arvados.Collection
-       c.metrics.apiCalls.Inc()
-       err := arv.Get("collections", targetID, nil, &retrieved)
-       if err != nil {
-               return nil, err
-       }
-       c.logger.Debugf("cache(%s): retrieved manifest, caching with pdh %s", targetID, retrieved.PortableDataHash)
-       exp := time.Now().Add(time.Duration(c.cluster.Collections.WebDAVCache.TTL))
-       if targetID != retrieved.PortableDataHash {
-               c.pdhs.Add(targetID, &cachedPDH{
-                       expire:  exp,
-                       refresh: time.Now().Add(time.Duration(c.cluster.Collections.WebDAVCache.UUIDTTL)),
-                       pdh:     retrieved.PortableDataHash,
-               })
-       }
-       c.collections.Add(arv.ApiToken+"\000"+retrieved.PortableDataHash, &cachedCollection{
-               expire:     exp,
-               collection: &retrieved,
-       })
-       if int64(len(retrieved.ManifestText)) > c.cluster.Collections.WebDAVCache.MaxCollectionBytes/int64(c.cluster.Collections.WebDAVCache.MaxCollectionEntries) {
-               select {
-               case c.chPruneCollections <- struct{}{}:
-               default:
-               }
-       }
-       return &retrieved, nil
-}
-
-// pruneCollections checks the total bytes occupied by manifest_text
-// in the collection cache and removes old entries as needed to bring
-// the total size down to CollectionBytes. It also deletes all expired
-// entries.
-//
-// pruneCollections does not aim to be perfectly correct when there is
-// concurrent cache activity.
-func (c *cache) pruneCollections() {
-       var size int64
-       now := time.Now()
-       keys := c.collections.Keys()
-       entsize := make([]int, len(keys))
-       expired := make([]bool, len(keys))
-       for i, k := range keys {
-               v, ok := c.collections.Peek(k)
-               if !ok {
-                       continue
-               }
-               ent := v.(*cachedCollection)
-               n := len(ent.collection.ManifestText)
-               size += int64(n)
-               entsize[i] = n
-               expired[i] = ent.expire.Before(now)
-       }
-       for i, k := range keys {
-               if expired[i] {
-                       c.collections.Remove(k)
-                       size -= int64(entsize[i])
-               }
-       }
-       for i, k := range keys {
-               if size <= c.cluster.Collections.WebDAVCache.MaxCollectionBytes/2 {
-                       break
-               }
-               if expired[i] {
-                       // already removed this entry in the previous loop
-                       continue
-               }
-               c.collections.Remove(k)
-               size -= int64(entsize[i])
-       }
-}
-
 // collectionBytes returns the approximate combined memory size of the
 // collection cache and session filesystem cache.
 func (c *cache) collectionBytes() uint64 {
        var size uint64
-       for _, k := range c.collections.Keys() {
-               v, ok := c.collections.Peek(k)
-               if !ok {
-                       continue
-               }
-               size += uint64(len(v.(*cachedCollection).collection.ManifestText))
-       }
        for _, token := range c.sessions.Keys() {
                ent, ok := c.sessions.Peek(token)
                if !ok {
@@ -463,49 +237,3 @@ func (c *cache) collectionBytes() uint64 {
        }
        return size
 }
-
-func (c *cache) lookupCollection(key string) *arvados.Collection {
-       e, cached := c.collections.Get(key)
-       if !cached {
-               return nil
-       }
-       ent := e.(*cachedCollection)
-       if ent.expire.Before(time.Now()) {
-               c.collections.Remove(key)
-               return nil
-       }
-       c.metrics.collectionHits.Inc()
-       return ent.collection
-}
-
-func (c *cache) GetTokenUser(token string) (*arvados.User, error) {
-       // Get and cache user record associated with this
-       // token.  We need to know their UUID for logging, and
-       // whether they are an admin or not for certain
-       // permission checks.
-
-       // Get/create session entry
-       _, sess, err := c.GetSession(token)
-       if err != nil {
-               return nil, err
-       }
-
-       // See if the user is already set, and if so, return it
-       user, _ := sess.user.Load().(*arvados.User)
-       if user != nil {
-               return user, nil
-       }
-
-       // Fetch the user record
-       c.metrics.apiCalls.Inc()
-       var current arvados.User
-
-       err = sess.client.RequestAndDecode(&current, "GET", "/arvados/v1/users/current", nil, nil)
-       if err != nil {
-               return nil, err
-       }
-
-       // Stash the user record for next time
-       sess.user.Store(&current)
-       return &current, nil
-}
index 6b8f427171ef7813f8453c0b23ed968229295fb4..010e29a0b876105d49756d736adefd316878a12e 100644 (file)
@@ -6,17 +6,21 @@ package keepweb
 
 import (
        "bytes"
+       "net/http"
+       "net/http/httptest"
+       "regexp"
+       "strings"
+       "time"
 
        "git.arvados.org/arvados.git/sdk/go/arvados"
-       "git.arvados.org/arvados.git/sdk/go/arvadosclient"
        "git.arvados.org/arvados.git/sdk/go/arvadostest"
-       "git.arvados.org/arvados.git/sdk/go/ctxlog"
-       "github.com/prometheus/client_golang/prometheus"
        "github.com/prometheus/common/expfmt"
        "gopkg.in/check.v1"
 )
 
-func (s *UnitSuite) checkCacheMetrics(c *check.C, reg *prometheus.Registry, regs ...string) {
+func (s *IntegrationSuite) checkCacheMetrics(c *check.C, regs ...string) {
+       s.handler.Cache.updateGauges()
+       reg := s.handler.Cache.registry
        mfs, err := reg.Gather()
        c.Check(err, check.IsNil)
        buf := &bytes.Buffer{}
@@ -25,131 +29,139 @@ func (s *UnitSuite) checkCacheMetrics(c *check.C, reg *prometheus.Registry, regs
                c.Check(enc.Encode(mf), check.IsNil)
        }
        mm := buf.String()
+       // Remove comments to make the "value vs. regexp" failure
+       // output easier to read.
+       mm = regexp.MustCompile(`(?m)^#.*\n`).ReplaceAllString(mm, "")
        for _, reg := range regs {
-               c.Check(mm, check.Matches, `(?ms).*collectioncache_`+reg+`\n.*`)
+               c.Check(mm, check.Matches, `(?ms).*keepweb_sessions_`+reg+`\n.*`)
        }
 }
 
-func (s *UnitSuite) TestCache(c *check.C) {
-       arv, err := arvadosclient.MakeArvadosClient()
-       c.Assert(err, check.Equals, nil)
-
-       cache := &cache{
-               cluster:  s.cluster,
-               logger:   ctxlog.TestLogger(c),
-               registry: prometheus.NewRegistry(),
-       }
-
+// TestCache sends repeated GET requests through s.handler using two
+// different collection/token combinations, and after each phase
+// checks the cumulative keepweb_sessions_ "hits", "misses", and
+// "active" values reported by checkCacheMetrics.
+func (s *IntegrationSuite) TestCache(c *check.C) {
         // Hit the same collection 5 times using the same token. Only
         // the first req should cause an API call; the next 4 should
         // hit all caches.
-       arv.ApiToken = arvadostest.AdminToken
-       var coll *arvados.Collection
+       u := mustParseURL("http://" + arvadostest.FooCollection + ".keep-web.example/foo")
+       req := &http.Request{
+               Method:     "GET",
+               Host:       u.Host,
+               URL:        u,
+               RequestURI: u.RequestURI(),
+               Header: http.Header{
+                       "Authorization": {"Bearer " + arvadostest.ActiveToken},
+               },
+       }
         for i := 0; i < 5; i++ {
-               coll, err = cache.Get(arv, arvadostest.FooCollection, false)
-               c.Check(err, check.Equals, nil)
-               c.Assert(coll, check.NotNil)
-               c.Check(coll.PortableDataHash, check.Equals, arvadostest.FooCollectionPDH)
-               c.Check(coll.ManifestText[:2], check.Equals, ". ")
+               resp := httptest.NewRecorder()
+               s.handler.ServeHTTP(resp, req)
+               c.Check(resp.Code, check.Equals, http.StatusOK)
         }
-       s.checkCacheMetrics(c, cache.registry,
-               "requests 5",
+       // 5 requests with one token: 1 miss (first request) + 4
+       // hits, and 1 active session.
+       s.checkCacheMetrics(c,
                 "hits 4",
-               "pdh_hits 4",
-               "api_calls 1")
-
-       // Hit the same collection 2 more times, this time requesting
-       // it by PDH and using a different token. The first req should
-       // miss the permission cache and fetch the new manifest; the
-       // second should hit the Collection cache and skip the API
-       // lookup.
-       arv.ApiToken = arvadostest.ActiveToken
-
-       coll2, err := cache.Get(arv, arvadostest.FooCollectionPDH, false)
-       c.Check(err, check.Equals, nil)
-       c.Assert(coll2, check.NotNil)
-       c.Check(coll2.PortableDataHash, check.Equals, arvadostest.FooCollectionPDH)
-       c.Check(coll2.ManifestText[:2], check.Equals, ". ")
-       c.Check(coll2.ManifestText, check.Not(check.Equals), coll.ManifestText)
-
-       s.checkCacheMetrics(c, cache.registry,
-               "requests 6",
-               "hits 4",
-               "pdh_hits 4",
-               "api_calls 2")
-
-       coll2, err = cache.Get(arv, arvadostest.FooCollectionPDH, false)
-       c.Check(err, check.Equals, nil)
-       c.Assert(coll2, check.NotNil)
-       c.Check(coll2.PortableDataHash, check.Equals, arvadostest.FooCollectionPDH)
-       c.Check(coll2.ManifestText[:2], check.Equals, ". ")
-
-       s.checkCacheMetrics(c, cache.registry,
-               "requests 7",
-               "hits 5",
-               "pdh_hits 4",
-               "api_calls 2")
-
-       // Alternating between two collections N times should produce
-       // only 2 more API calls.
-       arv.ApiToken = arvadostest.AdminToken
-       for i := 0; i < 20; i++ {
-               var target string
-               if i%2 == 0 {
-                       target = arvadostest.HelloWorldCollection
-               } else {
-                       target = arvadostest.FooBarDirCollection
-               }
-               _, err := cache.Get(arv, target, false)
-               c.Check(err, check.Equals, nil)
+               "misses 1",
+               "active 1")
+
+       // Hit a shared collection 3 times using PDH, using a
+       // different token.
+       //
+       // The first "+" in the PDH is replaced with "-" to build the
+       // virtual host name.
+       u2 := mustParseURL("http://" + strings.Replace(arvadostest.BarFileCollectionPDH, "+", "-", 1) + ".keep-web.example/bar")
+       req2 := &http.Request{
+               Method:     "GET",
+               Host:       u2.Host,
+               URL:        u2,
+               RequestURI: u2.RequestURI(),
+               Header: http.Header{
+                       "Authorization": {"Bearer " + arvadostest.SpectatorToken},
+               },
         }
-       s.checkCacheMetrics(c, cache.registry,
-               "requests 27",
-               "hits 23",
-               "pdh_hits 22",
-               "api_calls 4")
-}
-
-func (s *UnitSuite) TestCacheForceReloadByPDH(c *check.C) {
-       arv, err := arvadosclient.MakeArvadosClient()
-       c.Assert(err, check.Equals, nil)
-
-       cache := &cache{
-               cluster:  s.cluster,
-               logger:   ctxlog.TestLogger(c),
-               registry: prometheus.NewRegistry(),
+       for i := 0; i < 3; i++ {
+               resp2 := httptest.NewRecorder()
+               s.handler.ServeHTTP(resp2, req2)
+               c.Check(resp2.Code, check.Equals, http.StatusOK)
         }
-
-       for _, forceReload := range []bool{false, true, false, true} {
-               _, err := cache.Get(arv, arvadostest.FooCollectionPDH, forceReload)
-               c.Check(err, check.Equals, nil)
+       // Cumulative totals: the second token adds 1 miss and 2
+       // hits (4+2 = 6 hits, 1+1 = 2 misses), and a second active
+       // session.
+       s.checkCacheMetrics(c,
+               "hits 6",
+               "misses 2",
+               "active 2")
+
+       // Alternating between two collections/tokens N times should
+       // use the existing sessions.
+       for i := 0; i < 7; i++ {
+               resp := httptest.NewRecorder()
+               s.handler.ServeHTTP(resp, req)
+               c.Check(resp.Code, check.Equals, http.StatusOK)
+
+               resp2 := httptest.NewRecorder()
+               s.handler.ServeHTTP(resp2, req2)
+               c.Check(resp2.Code, check.Equals, http.StatusOK)
         }
-
-       s.checkCacheMetrics(c, cache.registry,
-               "requests 4",
-               "hits 3",
-               "pdh_hits 0",
-               "api_calls 1")
+       // 7 iterations x 2 requests = 14 more hits (6+14 = 20), with
+       // no additional misses or sessions.
+       s.checkCacheMetrics(c,
+               "hits 20",
+               "misses 2",
+               "active 2")
 }
 
-func (s *UnitSuite) TestCacheForceReloadByUUID(c *check.C) {
-       arv, err := arvadosclient.MakeArvadosClient()
-       c.Assert(err, check.Equals, nil)
-
-       cache := &cache{
-               cluster:  s.cluster,
-               logger:   ctxlog.TestLogger(c),
-               registry: prometheus.NewRegistry(),
-       }
-
-       for _, forceReload := range []bool{false, true, false, true} {
-               _, err := cache.Get(arv, arvadostest.FooCollection, forceReload)
-               c.Check(err, check.Equals, nil)
-       }
+// TestForceReloadPDH requests a file by portable data hash before
+// and after a collection with that PDH is created, and checks that
+// requests sent with "Cache-Control: must-revalidate" see the new
+// collection.
+func (s *IntegrationSuite) TestForceReloadPDH(c *check.C) {
+       // The filename is derived from the current time (with ":"
+       // replaced by "."), presumably so the resulting manifest/PDH
+       // has not been seen by a previous test run.
+       filename := strings.Replace(time.Now().Format(time.RFC3339Nano), ":", ".", -1)
+       manifest := ". d41d8cd98f00b204e9800998ecf8427e+0 0:0:" + filename + "\n"
+       pdh := arvados.PortableDataHash(manifest)
+       client := arvados.NewClientFromEnv()
+       client.AuthToken = arvadostest.ActiveToken
+
+       // Before the collection is created, the request is expected
+       // to fail with 404.
+       _, resp := s.do("GET", "http://"+strings.Replace(pdh, "+", "-", 1)+".keep-web.example/"+filename, arvadostest.ActiveToken, nil)
+       c.Check(resp.Code, check.Equals, http.StatusNotFound)
+
+       // Create a collection with the prepared manifest, and
+       // confirm the server reports the PDH computed above.
+       var coll arvados.Collection
+       err := client.RequestAndDecode(&coll, "POST", "arvados/v1/collections", nil, map[string]interface{}{
+               "collection": map[string]string{
+                       "manifest_text": manifest,
+               },
+       })
+       c.Assert(err, check.IsNil)
+       defer client.RequestAndDecode(nil, "DELETE", "arvados/v1/collections/"+coll.UUID, nil, nil)
+       c.Assert(coll.PortableDataHash, check.Equals, pdh)
+
+       // With must-revalidate, the same URL is now expected to
+       // succeed. The token is passed in hdr rather than the token
+       // argument because s.do() panics if both are supplied.
+       _, resp = s.do("GET", "http://"+strings.Replace(pdh, "+", "-", 1)+".keep-web.example/"+filename, "", http.Header{
+               "Authorization": {"Bearer " + arvadostest.ActiveToken},
+               "Cache-Control": {"must-revalidate"},
+       })
+       c.Check(resp.Code, check.Equals, http.StatusOK)
+
+       // A file that is not in the manifest is still 404, even with
+       // must-revalidate.
+       _, resp = s.do("GET", "http://"+strings.Replace(pdh, "+", "-", 1)+".keep-web.example/missingfile", "", http.Header{
+               "Authorization": {"Bearer " + arvadostest.ActiveToken},
+               "Cache-Control": {"must-revalidate"},
+       })
+       c.Check(resp.Code, check.Equals, http.StatusNotFound)
+}
 
-       s.checkCacheMetrics(c, cache.registry,
-               "requests 4",
-               "hits 3",
-               "pdh_hits 3",
-               "api_calls 3")
+// TestForceReloadUUID creates a collection, adds a file to it via
+// the API after it has already been requested by UUID, and checks
+// that the added file is 404 on an ordinary request but 200 on a
+// request sent with "Cache-Control: must-revalidate".
+func (s *IntegrationSuite) TestForceReloadUUID(c *check.C) {
+       client := arvados.NewClientFromEnv()
+       client.AuthToken = arvadostest.ActiveToken
+       var coll arvados.Collection
+       err := client.RequestAndDecode(&coll, "POST", "arvados/v1/collections", nil, map[string]interface{}{
+               "collection": map[string]string{
+                       "manifest_text": ". d41d8cd98f00b204e9800998ecf8427e+0 0:0:oldfile\n",
+               },
+       })
+       c.Assert(err, check.IsNil)
+       defer client.RequestAndDecode(nil, "DELETE", "arvados/v1/collections/"+coll.UUID, nil, nil)
+
+       // Initial state: only "oldfile" exists in the collection.
+       _, resp := s.do("GET", "http://"+coll.UUID+".keep-web.example/newfile", arvadostest.ActiveToken, nil)
+       c.Check(resp.Code, check.Equals, http.StatusNotFound)
+       _, resp = s.do("GET", "http://"+coll.UUID+".keep-web.example/oldfile", arvadostest.ActiveToken, nil)
+       c.Check(resp.Code, check.Equals, http.StatusOK)
+       _, resp = s.do("GET", "http://"+coll.UUID+".keep-web.example/newfile", arvadostest.ActiveToken, nil)
+       c.Check(resp.Code, check.Equals, http.StatusNotFound)
+       // Add "newfile" to the collection through the API (not
+       // through keep-web).
+       err = client.RequestAndDecode(&coll, "PATCH", "arvados/v1/collections/"+coll.UUID, nil, map[string]interface{}{
+               "collection": map[string]string{
+                       "manifest_text": ". d41d8cd98f00b204e9800998ecf8427e+0 0:0:oldfile 0:0:newfile\n",
+               },
+       })
+       c.Assert(err, check.IsNil)
+       // Without a Cache-Control header, the test expects the
+       // handler to still answer from the previously loaded state.
+       _, resp = s.do("GET", "http://"+coll.UUID+".keep-web.example/newfile", arvadostest.ActiveToken, nil)
+       c.Check(resp.Code, check.Equals, http.StatusNotFound)
+       // With must-revalidate, the updated collection is expected
+       // to be visible. The token is passed in hdr rather than the
+       // token argument because s.do() panics if both are supplied.
+       _, resp = s.do("GET", "http://"+coll.UUID+".keep-web.example/newfile", "", http.Header{
+               "Authorization": {"Bearer " + arvadostest.ActiveToken},
+               "Cache-Control": {"must-revalidate"},
+       })
+       c.Check(resp.Code, check.Equals, http.StatusOK)
 }
index 3a1d9acde7e7d33208958e467931aef2b1474853..7ac8bc02dbe12a64b1dab4134141c3474bfd5ccb 100644 (file)
@@ -6,6 +6,7 @@ package keepweb
 
 import (
        "encoding/json"
+       "errors"
        "fmt"
        "html"
        "html/template"
@@ -39,8 +40,8 @@ type handler struct {
 
 var urlPDHDecoder = strings.NewReplacer(" ", "+", "-", "+")
 
-var notFoundMessage = "404 Not found\r\n\r\nThe requested path was not found, or you do not have permission to access it.\r"
-var unauthorizedMessage = "401 Unauthorized\r\n\r\nA valid Arvados token must be provided to access this resource.\r"
+var notFoundMessage = "Not Found"
+var unauthorizedMessage = "401 Unauthorized\r\n\r\nA valid Arvados token must be provided to access this resource.\r\n"
 
 // parseCollectionIDFromURL returns a UUID or PDH if s is a UUID or a
 // PDH (even if it is a PDH with "+" replaced by " " or "-");
@@ -73,9 +74,10 @@ func (h *handler) serveStatus(w http.ResponseWriter, r *http.Request) {
 
 // updateOnSuccess wraps httpserver.ResponseWriter. If the handler
 // sends an HTTP header indicating success, updateOnSuccess first
-// calls the provided update func. If the update func fails, a 500
-// response is sent, and the status code and body sent by the handler
-// are ignored (all response writes return the update error).
+// calls the provided update func. If the update func fails, an error
+// response is sent (using the error's HTTP status or 500 if none),
+// and the status code and body sent by the handler are ignored (all
+// response writes return the update error).
 type updateOnSuccess struct {
        httpserver.ResponseWriter
        logger     logrus.FieldLogger
@@ -100,10 +102,11 @@ func (uos *updateOnSuccess) WriteHeader(code int) {
                if code >= 200 && code < 400 {
                        if uos.err = uos.update(); uos.err != nil {
                                code := http.StatusInternalServerError
-                               if err, ok := uos.err.(*arvados.TransactionError); ok {
-                                       code = err.StatusCode
+                               var he interface{ HTTPStatus() int }
+                               if errors.As(uos.err, &he) {
+                                       code = he.HTTPStatus()
                                }
-                               uos.logger.WithError(uos.err).Errorf("update() returned error type %T, changing response to HTTP %d", uos.err, code)
+                               uos.logger.WithError(uos.err).Errorf("update() returned %T error, changing response to HTTP %d", uos.err, code)
                                http.Error(uos.ResponseWriter, uos.err.Error(), code)
                                return
                        }
@@ -369,30 +372,60 @@ func (h *handler) ServeHTTP(wOrig http.ResponseWriter, r *http.Request) {
        }
        defer h.clientPool.Put(arv)
 
-       var collection *arvados.Collection
+       dirOpenMode := os.O_RDONLY
+       if writeMethod[r.Method] {
+               dirOpenMode = os.O_RDWR
+       }
+
+       validToken := make(map[string]bool)
+       var token string
        var tokenUser *arvados.User
-       tokenResult := make(map[string]int)
-       for _, arv.ApiToken = range tokens {
-               var err error
-               collection, err = h.Cache.Get(arv, collectionID, forceReload)
-               if err == nil {
-                       // Success
-                       break
+       var sessionFS arvados.CustomFileSystem
+       var session *cachedSession
+       var collectionDir arvados.File
+       for _, token = range tokens {
+               var statusErr interface{ HTTPStatus() int }
+               fs, sess, user, err := h.Cache.GetSession(token)
+               if errors.As(err, &statusErr) && statusErr.HTTPStatus() == http.StatusUnauthorized {
+                       // bad token
+                       continue
+               } else if err != nil {
+                       http.Error(w, "cache error: "+err.Error(), http.StatusInternalServerError)
+                       return
+               }
+               f, err := fs.OpenFile("by_id/"+collectionID, dirOpenMode, 0)
+               if errors.As(err, &statusErr) && statusErr.HTTPStatus() == http.StatusForbidden {
+                       // collection id is outside token scope
+                       validToken[token] = true
+                       continue
                }
-               if srvErr, ok := err.(arvadosclient.APIServerError); ok {
-                       switch srvErr.HttpStatusCode {
-                       case 404, 401:
-                               // Token broken or insufficient to
-                               // retrieve collection
-                               tokenResult[arv.ApiToken] = srvErr.HttpStatusCode
-                               continue
+               validToken[token] = true
+               if os.IsNotExist(err) {
+                       // collection does not exist or is not
+                       // readable using this token
+                       continue
+               } else if err != nil {
+                       http.Error(w, err.Error(), http.StatusInternalServerError)
+                       return
+               }
+               defer f.Close()
+
+               collectionDir, sessionFS, session, tokenUser = f, fs, sess, user
+               break
+       }
+       // collectionDir is only assigned when one of the tokens
+       // successfully opened the collection. If none did, it is
+       // still nil here: skip the reload so the session == nil
+       // handling below can send the appropriate error response,
+       // instead of calling Sync() on a nil arvados.File.
+       if forceReload && collectionDir != nil {
+               err := collectionDir.Sync()
+               if err != nil {
+                       var statusErr interface{ HTTPStatus() int }
+                       if errors.As(err, &statusErr) {
+                               http.Error(w, err.Error(), statusErr.HTTPStatus())
+                       } else {
+                               http.Error(w, err.Error(), http.StatusInternalServerError)
                         }
+                       return
                 }
-               // Something more serious is wrong
-               http.Error(w, "cache error: "+err.Error(), http.StatusInternalServerError)
-               return
         }
-       if collection == nil {
+       if session == nil {
                if pathToken || !credentialsOK {
                        // Either the URL is a "secret sharing link"
                        // that didn't work out (and asking the client
@@ -403,9 +436,9 @@ func (h *handler) ServeHTTP(wOrig http.ResponseWriter, r *http.Request) {
                        return
                }
                for _, t := range reqTokens {
-                       if tokenResult[t] == 404 {
-                               // The client provided valid token(s), but the
-                               // collection was not found.
+                       if validToken[t] {
+                               // The client provided valid token(s),
+                               // but the collection was not found.
                                http.Error(w, notFoundMessage, http.StatusNotFound)
                                return
                        }
@@ -452,118 +485,104 @@ func (h *handler) ServeHTTP(wOrig http.ResponseWriter, r *http.Request) {
                return
        }
 
-       kc, err := keepclient.MakeKeepClient(arv)
-       if err != nil {
-               http.Error(w, "error setting up keep client: "+err.Error(), http.StatusInternalServerError)
-               return
-       }
-       kc.RequestID = r.Header.Get("X-Request-Id")
-
        var basename string
        if len(targetPath) > 0 {
                basename = targetPath[len(targetPath)-1]
        }
        applyContentDispositionHdr(w, r, basename, attachment)
 
-       client := (&arvados.Client{
-               APIHost:   arv.ApiServer,
-               AuthToken: arv.ApiToken,
-               Insecure:  arv.ApiInsecure,
-       }).WithRequestID(r.Header.Get("X-Request-Id"))
-
-       fs, err := collection.FileSystem(client, kc)
-       if err != nil {
-               http.Error(w, "error creating collection filesystem: "+err.Error(), http.StatusInternalServerError)
+       if arvadosclient.PDHMatch(collectionID) && writeMethod[r.Method] {
+               http.Error(w, errReadOnly.Error(), http.StatusMethodNotAllowed)
                return
        }
-
-       writefs, writeOK := fs.(arvados.CollectionFileSystem)
-       targetIsPDH := arvadosclient.PDHMatch(collectionID)
-       if (targetIsPDH || !writeOK) && writeMethod[r.Method] {
-               http.Error(w, errReadOnly.Error(), http.StatusMethodNotAllowed)
+       if !h.userPermittedToUploadOrDownload(r.Method, tokenUser) {
+               http.Error(w, "Not permitted", http.StatusForbidden)
                return
        }
+       h.logUploadOrDownload(r, session.arvadosclient, sessionFS, "by_id/"+collectionID+"/"+strings.Join(targetPath, "/"), nil, tokenUser)
 
-       // Check configured permission
-       _, sess, err := h.Cache.GetSession(arv.ApiToken)
-       tokenUser, err = h.Cache.GetTokenUser(arv.ApiToken)
-
-       if webdavMethod[r.Method] {
-               if !h.userPermittedToUploadOrDownload(r.Method, tokenUser) {
-                       http.Error(w, "Not permitted", http.StatusForbidden)
+       if writeMethod[r.Method] {
+               // Save the collection only if/when all
+               // webdav->filesystem operations succeed --
+               // and send a 500 error if the modified
+               // collection can't be saved.
+               //
+               // Perform the write in a separate sitefs, so
+               // concurrent read operations on the same
+               // collection see the previous saved
+               // state. After the write succeeds and the
+               // collection record is updated, we reset the
+               // session so the updates are visible in
+               // subsequent read requests.
+               client := session.client.WithRequestID(r.Header.Get("X-Request-Id"))
+               sessionFS = client.SiteFileSystem(session.keepclient)
+               writingDir, err := sessionFS.OpenFile("by_id/"+collectionID, os.O_RDONLY, 0)
+               if err != nil {
+                       http.Error(w, err.Error(), http.StatusInternalServerError)
                        return
                }
-               h.logUploadOrDownload(r, sess.arvadosclient, nil, strings.Join(targetPath, "/"), collection, tokenUser)
-
-               if writeMethod[r.Method] {
-                       // Save the collection only if/when all
-                       // webdav->filesystem operations succeed --
-                       // and send a 500 error if the modified
-                       // collection can't be saved.
-                       w = &updateOnSuccess{
-                               ResponseWriter: w,
-                               logger:         ctxlog.FromContext(r.Context()),
-                               update: func() error {
-                                       return h.Cache.Update(client, *collection, writefs)
-                               }}
-               }
-               h := webdav.Handler{
-                       Prefix: "/" + strings.Join(pathParts[:stripParts], "/"),
-                       FileSystem: &webdavFS{
-                               collfs:        fs,
-                               writing:       writeMethod[r.Method],
-                               alwaysReadEOF: r.Method == "PROPFIND",
-                       },
-                       LockSystem: h.webdavLS,
-                       Logger: func(_ *http.Request, err error) {
+               defer writingDir.Close()
+               w = &updateOnSuccess{
+                       ResponseWriter: w,
+                       logger:         ctxlog.FromContext(r.Context()),
+                       update: func() error {
+                               err := writingDir.Sync()
+                               var te arvados.TransactionError
+                               if errors.As(err, &te) {
+                                       err = te
+                               }
+                               if err != nil {
+                                       return err
+                               }
+                               // Sync the changes to the persistent
+                               // sessionfs for this token.
+                               snap, err := writingDir.Snapshot()
                                if err != nil {
-                                       ctxlog.FromContext(r.Context()).WithError(err).Error("error reported by webdav handler")
+                                       return err
                                }
-                       },
-               }
-               h.ServeHTTP(w, r)
-               return
+                               collectionDir.Splice(snap)
+                               return nil
+                       }}
        }
-
-       openPath := "/" + strings.Join(targetPath, "/")
-       f, err := fs.Open(openPath)
-       if os.IsNotExist(err) {
-               // Requested non-existent path
-               http.Error(w, notFoundMessage, http.StatusNotFound)
-               return
-       } else if err != nil {
-               // Some other (unexpected) error
-               http.Error(w, "open: "+err.Error(), http.StatusInternalServerError)
-               return
+       wh := webdav.Handler{
+               Prefix: "/" + strings.Join(pathParts[:stripParts], "/"),
+               FileSystem: &webdavFS{
+                       collfs:        sessionFS,
+                       prefix:        "by_id/" + collectionID + "/",
+                       writing:       writeMethod[r.Method],
+                       alwaysReadEOF: r.Method == "PROPFIND",
+               },
+               LockSystem: h.webdavLS,
+               Logger: func(r *http.Request, err error) {
+                       if err != nil {
+                               ctxlog.FromContext(r.Context()).WithError(err).Error("error reported by webdav handler")
+                       }
+               },
        }
-       defer f.Close()
-       if stat, err := f.Stat(); err != nil {
-               // Can't get Size/IsDir (shouldn't happen with a collectionFS!)
-               http.Error(w, "stat: "+err.Error(), http.StatusInternalServerError)
-       } else if stat.IsDir() && !strings.HasSuffix(r.URL.Path, "/") {
-               // If client requests ".../dirname", redirect to
-               // ".../dirname/". This way, relative links in the
-               // listing for "dirname" can always be "fnm", never
-               // "dirname/fnm".
-               h.seeOtherWithCookie(w, r, r.URL.Path+"/", credentialsOK)
-       } else if stat.IsDir() {
-               h.serveDirectory(w, r, collection.Name, fs, openPath, true)
-       } else {
-               if !h.userPermittedToUploadOrDownload(r.Method, tokenUser) {
-                       http.Error(w, "Not permitted", http.StatusForbidden)
+       if r.Method == http.MethodGet || r.Method == http.MethodHead {
+               targetfnm := "by_id/" + collectionID + "/" + strings.Join(pathParts[stripParts:], "/")
+               if fi, err := sessionFS.Stat(targetfnm); err == nil && fi.IsDir() {
+                       if !strings.HasSuffix(r.URL.Path, "/") {
+                               h.seeOtherWithCookie(w, r, r.URL.Path+"/", credentialsOK)
+                       } else {
+                               h.serveDirectory(w, r, fi.Name(), sessionFS, targetfnm, true)
+                       }
                        return
                }
-               h.logUploadOrDownload(r, sess.arvadosclient, nil, strings.Join(targetPath, "/"), collection, tokenUser)
-
-               http.ServeContent(w, r, basename, stat.ModTime(), f)
-               if wrote := int64(w.WroteBodyBytes()); wrote != stat.Size() && w.WroteStatus() == http.StatusOK {
-                       // If we wrote fewer bytes than expected, it's
-                       // too late to change the real response code
-                       // or send an error message to the client, but
-                       // at least we can try to put some useful
-                       // debugging info in the logs.
-                       n, err := f.Read(make([]byte, 1024))
-                       ctxlog.FromContext(r.Context()).Errorf("stat.Size()==%d but only wrote %d bytes; read(1024) returns %d, %v", stat.Size(), wrote, n, err)
+       }
+       wh.ServeHTTP(w, r)
+       if r.Method == http.MethodGet && w.WroteStatus() == http.StatusOK {
+               wrote := int64(w.WroteBodyBytes())
+               fnm := strings.Join(pathParts[stripParts:], "/")
+               fi, err := wh.FileSystem.Stat(r.Context(), fnm)
+               if err == nil && fi.Size() != wrote {
+                       var n int
+                       f, err := wh.FileSystem.OpenFile(r.Context(), fnm, os.O_RDONLY, 0)
+                       if err == nil {
+                               n, err = f.Read(make([]byte, 1024))
+                               f.Close()
+                       }
+                       ctxlog.FromContext(r.Context()).Errorf("stat.Size()==%d but only wrote %d bytes; read(1024) returns %d, %v", fi.Size(), wrote, n, err)
                }
        }
 }
@@ -601,12 +620,11 @@ func (h *handler) serveSiteFS(w http.ResponseWriter, r *http.Request, tokens []s
                return
        }
 
-       fs, sess, err := h.Cache.GetSession(tokens[0])
+       fs, sess, user, err := h.Cache.GetSession(tokens[0])
        if err != nil {
                http.Error(w, err.Error(), http.StatusInternalServerError)
                return
        }
-       fs.ForwardSlashNameSubstitution(h.Cluster.Collections.ForwardSlashNameSubstitution)
        f, err := fs.Open(r.URL.Path)
        if os.IsNotExist(err) {
                http.Error(w, err.Error(), http.StatusNotFound)
@@ -625,19 +643,17 @@ func (h *handler) serveSiteFS(w http.ResponseWriter, r *http.Request, tokens []s
                return
        }
 
-       tokenUser, err := h.Cache.GetTokenUser(tokens[0])
-       if !h.userPermittedToUploadOrDownload(r.Method, tokenUser) {
+       if !h.userPermittedToUploadOrDownload(r.Method, user) {
                http.Error(w, "Not permitted", http.StatusForbidden)
                return
        }
-       h.logUploadOrDownload(r, sess.arvadosclient, fs, r.URL.Path, nil, tokenUser)
+       h.logUploadOrDownload(r, sess.arvadosclient, fs, r.URL.Path, nil, user)
 
        if r.Method == "GET" {
                _, basename := filepath.Split(r.URL.Path)
                applyContentDispositionHdr(w, r, basename, attachment)
        }
        wh := webdav.Handler{
-               Prefix: "/",
                FileSystem: &webdavFS{
                        collfs:        fs,
                        writing:       writeMethod[r.Method],
@@ -960,9 +976,14 @@ func (h *handler) logUploadOrDownload(
 
 func (h *handler) determineCollection(fs arvados.CustomFileSystem, path string) (*arvados.Collection, string) {
        target := strings.TrimSuffix(path, "/")
-       for {
+       for cut := len(target); cut >= 0; cut = strings.LastIndexByte(target, '/') {
+               target = target[:cut]
                fi, err := fs.Stat(target)
-               if err != nil {
+               if os.IsNotExist(err) {
+                       // creating a new file/dir, or download
+                       // destined to fail
+                       continue
+               } else if err != nil {
                        return nil, ""
                }
                switch src := fi.Sys().(type) {
@@ -975,11 +996,6 @@ func (h *handler) determineCollection(fs arvados.CustomFileSystem, path string)
                                return nil, ""
                        }
                }
-               // Try parent
-               cut := strings.LastIndexByte(target, '/')
-               if cut < 0 {
-                       return nil, ""
-               }
-               target = target[:cut]
        }
+       return nil, ""
 }
index 768013185ae7d50dc450fecd342ebcdc343db059..0f7d5078790deab8eed80f9fadfd03b2d4d7ed2d 100644 (file)
@@ -366,6 +366,24 @@ func (s *IntegrationSuite) TestVhostPortMatch(c *check.C) {
        }
 }
 
+// do builds a request for urlstring with the given method and passes
+// it to s.doReq, returning whatever doReq returns.
+//
+// Credentials are given either as token (sent as an
+// "Authorization: Bearer ..." header) or as a ready-made hdr, never
+// both: a non-empty token together with a non-nil hdr panics. With
+// neither, the request carries an empty header set.
+func (s *IntegrationSuite) do(method string, urlstring string, token string, hdr http.Header) (*http.Request, *httptest.ResponseRecorder) {
+       target := mustParseURL(urlstring)
+       switch {
+       case hdr != nil && token != "":
+               panic("must not pass both token and hdr")
+       case hdr != nil:
+               // Caller-supplied headers are used as-is.
+       case token != "":
+               hdr = http.Header{"Authorization": {"Bearer " + token}}
+       default:
+               hdr = http.Header{}
+       }
+       req := &http.Request{
+               Method:     method,
+               Host:       target.Host,
+               URL:        target,
+               RequestURI: target.RequestURI(),
+               Header:     hdr,
+       }
+       return s.doReq(req)
+}
+
 func (s *IntegrationSuite) doReq(req *http.Request) (*http.Request, *httptest.ResponseRecorder) {
        resp := httptest.NewRecorder()
        s.handler.ServeHTTP(resp, req)
@@ -409,6 +427,26 @@ func (s *IntegrationSuite) TestSingleOriginSecretLink(c *check.C) {
        )
 }
 
+// TestCollectionSharingToken requests "foo" through a
+// "/c={collection}/t={token}/" URL using
+// arvadostest.FooFileCollectionSharingToken, expecting 200 with body
+// "foo" when the collection is FooFileCollectionUUID and 404 when it
+// is FooCollection.
+func (s *IntegrationSuite) TestCollectionSharingToken(c *check.C) {
+       for _, trial := range []struct {
+               collection string
+               status     int
+               body       string
+       }{
+               // Presumably the collection this sharing token was issued for
+               {arvadostest.FooFileCollectionUUID, http.StatusOK, "foo"},
+               // Same valid sharing token, but requesting a different collection
+               {arvadostest.FooCollection, http.StatusNotFound, notFoundMessage + "\n"},
+       } {
+               s.testVhostRedirectTokenToCookie(c, "GET",
+                       "example.com/c="+trial.collection+"/t="+arvadostest.FooFileCollectionSharingToken+"/foo",
+                       "",
+                       nil,
+                       "",
+                       trial.status,
+                       trial.body,
+               )
+       }
+}
+
 // Bad token in URL is 404 Not Found because it doesn't make sense to
 // retry the same URL with different authorization.
 func (s *IntegrationSuite) TestSingleOriginSecretLinkBadToken(c *check.C) {
@@ -1245,7 +1283,7 @@ func copyHeader(h http.Header) http.Header {
 }
 
 func (s *IntegrationSuite) checkUploadDownloadRequest(c *check.C, req *http.Request,
-       successCode int, direction string, perm bool, userUuid string, collectionUuid string, filepath string) {
+       successCode int, direction string, perm bool, userUuid, collectionUuid, collectionPDH, filepath string) {
 
        client := arvados.NewClientFromEnv()
        client.AuthToken = arvadostest.AdminToken
@@ -1258,6 +1296,7 @@ func (s *IntegrationSuite) checkUploadDownloadRequest(c *check.C, req *http.Requ
        c.Check(err, check.IsNil)
        c.Check(logentries.Items, check.HasLen, 1)
        lastLogId := logentries.Items[0].ID
+       c.Logf("lastLogId: %d", lastLogId)
 
        var logbuf bytes.Buffer
        logger := logrus.New()
@@ -1274,6 +1313,7 @@ func (s *IntegrationSuite) checkUploadDownloadRequest(c *check.C, req *http.Requ
                deadline := time.Now().Add(time.Second)
                for {
                        c.Assert(time.Now().After(deadline), check.Equals, false, check.Commentf("timed out waiting for log entry"))
+                       logentries = arvados.LogList{}
                        err = client.RequestAndDecode(&logentries, "GET", "arvados/v1/logs", nil,
                                arvados.ResourceListParams{
                                        Filters: []arvados.Filter{
@@ -1288,6 +1328,7 @@ func (s *IntegrationSuite) checkUploadDownloadRequest(c *check.C, req *http.Requ
                                logentries.Items[0].ID > lastLogId &&
                                logentries.Items[0].ObjectUUID == userUuid &&
                                logentries.Items[0].Properties["collection_uuid"] == collectionUuid &&
+                               (collectionPDH == "" || logentries.Items[0].Properties["portable_data_hash"] == collectionPDH) &&
                                logentries.Items[0].Properties["collection_file_path"] == filepath {
                                break
                        }
@@ -1321,7 +1362,7 @@ func (s *IntegrationSuite) TestDownloadLoggingPermission(c *check.C) {
                                },
                        }
                        s.checkUploadDownloadRequest(c, req, http.StatusOK, "download", adminperm,
-                               arvadostest.AdminUserUUID, arvadostest.FooCollection, "foo")
+                               arvadostest.AdminUserUUID, arvadostest.FooCollection, arvadostest.FooCollectionPDH, "foo")
 
                        // Test user permission
                        req = &http.Request{
@@ -1334,7 +1375,7 @@ func (s *IntegrationSuite) TestDownloadLoggingPermission(c *check.C) {
                                },
                        }
                        s.checkUploadDownloadRequest(c, req, http.StatusOK, "download", userperm,
-                               arvadostest.ActiveUserUUID, arvadostest.FooCollection, "foo")
+                               arvadostest.ActiveUserUUID, arvadostest.FooCollection, arvadostest.FooCollectionPDH, "foo")
                }
        }
 
@@ -1354,7 +1395,7 @@ func (s *IntegrationSuite) TestDownloadLoggingPermission(c *check.C) {
                        },
                }
                s.checkUploadDownloadRequest(c, req, http.StatusOK, "download", true,
-                       arvadostest.ActiveUserUUID, arvadostest.MultilevelCollection1, "dir1/subdir/file1")
+                       arvadostest.ActiveUserUUID, arvadostest.MultilevelCollection1, arvadostest.MultilevelCollection1PDH, "dir1/subdir/file1")
        }
 
        u = mustParseURL("http://" + strings.Replace(arvadostest.FooCollectionPDH, "+", "-", 1) + ".keep-web.example/foo")
@@ -1368,7 +1409,7 @@ func (s *IntegrationSuite) TestDownloadLoggingPermission(c *check.C) {
                },
        }
        s.checkUploadDownloadRequest(c, req, http.StatusOK, "download", true,
-               arvadostest.ActiveUserUUID, arvadostest.FooCollection, "foo")
+               arvadostest.ActiveUserUUID, "", arvadostest.FooCollectionPDH, "foo")
 }
 
 func (s *IntegrationSuite) TestUploadLoggingPermission(c *check.C) {
@@ -1408,7 +1449,7 @@ func (s *IntegrationSuite) TestUploadLoggingPermission(c *check.C) {
                                Body: io.NopCloser(bytes.NewReader([]byte("bar"))),
                        }
                        s.checkUploadDownloadRequest(c, req, http.StatusCreated, "upload", adminperm,
-                               arvadostest.AdminUserUUID, coll.UUID, "bar")
+                               arvadostest.AdminUserUUID, coll.UUID, "", "bar")
 
                        // Test user permission
                        req = &http.Request{
@@ -1422,7 +1463,7 @@ func (s *IntegrationSuite) TestUploadLoggingPermission(c *check.C) {
                                Body: io.NopCloser(bytes.NewReader([]byte("bar"))),
                        }
                        s.checkUploadDownloadRequest(c, req, http.StatusCreated, "upload", userperm,
-                               arvadostest.ActiveUserUUID, coll.UUID, "bar")
+                               arvadostest.ActiveUserUUID, coll.UUID, "", "bar")
                }
        }
 }
index 1f458f8e59ad2e2fa9139d4e388fe8554f70a420..381a1110b131d877c8d6d69f34f63fe0b43b77e8 100644 (file)
@@ -27,9 +27,7 @@ import (
        "time"
 
        "git.arvados.org/arvados.git/sdk/go/arvados"
-       "git.arvados.org/arvados.git/sdk/go/arvadosclient"
        "git.arvados.org/arvados.git/sdk/go/ctxlog"
-       "git.arvados.org/arvados.git/sdk/go/keepclient"
        "github.com/AdRoll/goamz/s3"
 )
 
@@ -312,33 +310,17 @@ func (h *handler) serveS3(w http.ResponseWriter, r *http.Request) bool {
                return false
        }
 
-       var err error
-       var fs arvados.CustomFileSystem
-       var arvclient *arvadosclient.ArvadosClient
+       fs, sess, tokenUser, err := h.Cache.GetSession(token)
+       if err != nil {
+               s3ErrorResponse(w, InternalError, err.Error(), r.URL.Path, http.StatusInternalServerError)
+               return true
+       }
        if r.Method == http.MethodGet || r.Method == http.MethodHead {
-               // Use a single session (cached FileSystem) across
-               // multiple read requests.
-               var sess *cachedSession
-               fs, sess, err = h.Cache.GetSession(token)
-               if err != nil {
-                       s3ErrorResponse(w, InternalError, err.Error(), r.URL.Path, http.StatusInternalServerError)
-                       return true
-               }
-               arvclient = sess.arvadosclient
-       } else {
                // Create a FileSystem for this request, to avoid
                // exposing incomplete write operations to concurrent
                // requests.
-               var kc *keepclient.KeepClient
-               var release func()
-               var client *arvados.Client
-               arvclient, kc, client, release, err = h.getClients(r.Header.Get("X-Request-Id"), token)
-               if err != nil {
-                       s3ErrorResponse(w, InternalError, err.Error(), r.URL.Path, http.StatusInternalServerError)
-                       return true
-               }
-               defer release()
-               fs = client.SiteFileSystem(kc)
+               client := sess.client.WithRequestID(r.Header.Get("X-Request-Id"))
+               fs = client.SiteFileSystem(sess.keepclient)
                fs.ForwardSlashNameSubstitution(h.Cluster.Collections.ForwardSlashNameSubstitution)
        }
 
@@ -418,12 +400,11 @@ func (h *handler) serveS3(w http.ResponseWriter, r *http.Request) bool {
                        return true
                }
 
-               tokenUser, err := h.Cache.GetTokenUser(token)
                if !h.userPermittedToUploadOrDownload(r.Method, tokenUser) {
                        http.Error(w, "Not permitted", http.StatusForbidden)
                        return true
                }
-               h.logUploadOrDownload(r, arvclient, fs, fspath, nil, tokenUser)
+               h.logUploadOrDownload(r, sess.arvadosclient, fs, fspath, nil, tokenUser)
 
                // shallow copy r, and change URL path
                r := *r
@@ -514,12 +495,11 @@ func (h *handler) serveS3(w http.ResponseWriter, r *http.Request) bool {
                        }
                        defer f.Close()
 
-                       tokenUser, err := h.Cache.GetTokenUser(token)
                        if !h.userPermittedToUploadOrDownload(r.Method, tokenUser) {
                                http.Error(w, "Not permitted", http.StatusForbidden)
                                return true
                        }
-                       h.logUploadOrDownload(r, arvclient, fs, fspath, nil, tokenUser)
+                       h.logUploadOrDownload(r, sess.arvadosclient, fs, fspath, nil, tokenUser)
 
                        _, err = io.Copy(f, r.Body)
                        if err != nil {
index 61c540808b640d6115a76e3efb70e10928b2dba3..1305416c40b28f287b79be1443b699328b7eb681 100644 (file)
@@ -29,6 +29,7 @@ import (
        "git.arvados.org/arvados.git/sdk/go/ctxlog"
        "git.arvados.org/arvados.git/sdk/go/httpserver"
        "git.arvados.org/arvados.git/sdk/go/keepclient"
+       "github.com/prometheus/client_golang/prometheus"
        check "gopkg.in/check.v1"
 )
 
@@ -49,17 +50,17 @@ func (s *IntegrationSuite) TestNoToken(c *check.C) {
        } {
                hdr, body, _ := s.runCurl(c, token, "collections.example.com", "/collections/"+arvadostest.FooCollection+"/foo")
                c.Check(hdr, check.Matches, `(?s)HTTP/1.1 404 Not Found\r\n.*`)
-               c.Check(body, check.Equals, notFoundMessage+"\n")
+               c.Check(strings.TrimSpace(body), check.Equals, notFoundMessage)
 
                if token != "" {
                        hdr, body, _ = s.runCurl(c, token, "collections.example.com", "/collections/download/"+arvadostest.FooCollection+"/"+token+"/foo")
                        c.Check(hdr, check.Matches, `(?s)HTTP/1.1 404 Not Found\r\n.*`)
-                       c.Check(body, check.Equals, notFoundMessage+"\n")
+                       c.Check(strings.TrimSpace(body), check.Equals, notFoundMessage)
                }
 
                hdr, body, _ = s.runCurl(c, token, "collections.example.com", "/bad-route")
                c.Check(hdr, check.Matches, `(?s)HTTP/1.1 404 Not Found\r\n.*`)
-               c.Check(body, check.Equals, notFoundMessage+"\n")
+               c.Check(strings.TrimSpace(body), check.Equals, notFoundMessage)
        }
 }
 
@@ -92,7 +93,7 @@ func (s *IntegrationSuite) Test404(c *check.C) {
                hdr, body, _ := s.runCurl(c, arvadostest.ActiveToken, "collections.example.com", uri)
                c.Check(hdr, check.Matches, "(?s)HTTP/1.1 404 Not Found\r\n.*")
                if len(body) > 0 {
-                       c.Check(body, check.Equals, notFoundMessage+"\n")
+                       c.Check(strings.TrimSpace(body), check.Equals, notFoundMessage)
                }
        }
 }
@@ -475,15 +476,7 @@ func (s *IntegrationSuite) TestMetrics(c *check.C) {
        c.Check(summaries["request_duration_seconds/get/200"].SampleCount, check.Equals, "3")
        c.Check(summaries["request_duration_seconds/get/404"].SampleCount, check.Equals, "1")
        c.Check(summaries["time_to_status_seconds/get/404"].SampleCount, check.Equals, "1")
-       c.Check(counters["arvados_keepweb_collectioncache_requests//"].Value, check.Equals, int64(2))
-       c.Check(counters["arvados_keepweb_collectioncache_api_calls//"].Value, check.Equals, int64(2))
-       c.Check(counters["arvados_keepweb_collectioncache_hits//"].Value, check.Equals, int64(1))
-       c.Check(counters["arvados_keepweb_collectioncache_pdh_hits//"].Value, check.Equals, int64(1))
-       c.Check(gauges["arvados_keepweb_collectioncache_cached_manifests//"].Value, check.Equals, float64(1))
-       // FooCollection's cached manifest size is 45 ("1f4b0....+45")
-       // plus one 51-byte blob signature; session fs counts 3 inodes
-       // * 64 bytes.
-       c.Check(gauges["arvados_keepweb_sessions_cached_collection_bytes//"].Value, check.Equals, float64(45+51+64*3))
+       c.Check(gauges["arvados_keepweb_sessions_cached_session_bytes//"].Value, check.Equals, float64(445))
 
        // If the Host header indicates a collection, /metrics.json
        // refers to a file in the collection -- the metrics handler
@@ -529,7 +522,7 @@ func (s *IntegrationSuite) SetUpTest(c *check.C) {
 
        ctx := ctxlog.Context(context.Background(), logger)
 
-       s.handler = newHandlerOrErrorHandler(ctx, cluster, cluster.SystemRootToken, nil).(*handler)
+       s.handler = newHandlerOrErrorHandler(ctx, cluster, cluster.SystemRootToken, prometheus.NewRegistry()).(*handler)
        s.testServer = httptest.NewUnstartedServer(
                httpserver.AddRequestIDs(
                        httpserver.LogRequests(
index 501c355a7388a53fe3b40fc3f082c63768665620..0039f04eeff369a5fb5de3132389762d16535161 100644 (file)
@@ -36,7 +36,10 @@ var (
 // existence automatically so sequences like "mkcol foo; put foo/bar"
 // work as expected.
 type webdavFS struct {
-       collfs  arvados.FileSystem
+       collfs arvados.FileSystem
+       // prefix works like fs.Sub: Stat(name) calls
+       // Stat(prefix+name) in the wrapped filesystem.
+       prefix  string
        writing bool
        // webdav PROPFIND reads the first few bytes of each file
        // whose filename extension isn't recognized, which is
@@ -56,7 +59,7 @@ func (fs *webdavFS) makeparents(name string) {
        }
        dir = dir[:len(dir)-1]
        fs.makeparents(dir)
-       fs.collfs.Mkdir(dir, 0755)
+       fs.collfs.Mkdir(fs.prefix+dir, 0755)
 }
 
 func (fs *webdavFS) Mkdir(ctx context.Context, name string, perm os.FileMode) error {
@@ -65,7 +68,7 @@ func (fs *webdavFS) Mkdir(ctx context.Context, name string, perm os.FileMode) er
        }
        name = strings.TrimRight(name, "/")
        fs.makeparents(name)
-       return fs.collfs.Mkdir(name, 0755)
+       return fs.collfs.Mkdir(fs.prefix+name, 0755)
 }
 
 func (fs *webdavFS) OpenFile(ctx context.Context, name string, flag int, perm os.FileMode) (f webdav.File, err error) {
@@ -73,7 +76,7 @@ func (fs *webdavFS) OpenFile(ctx context.Context, name string, flag int, perm os
        if writing {
                fs.makeparents(name)
        }
-       f, err = fs.collfs.OpenFile(name, flag, perm)
+       f, err = fs.collfs.OpenFile(fs.prefix+name, flag, perm)
        if !fs.writing {
                // webdav module returns 404 on all OpenFile errors,
                // but returns 405 Method Not Allowed if OpenFile()
@@ -93,7 +96,7 @@ func (fs *webdavFS) OpenFile(ctx context.Context, name string, flag int, perm os
 }
 
 // RemoveAll calls RemoveAll on the wrapped filesystem with fs.prefix
 // prepended to name, and returns its result. ctx is not used.
 func (fs *webdavFS) RemoveAll(ctx context.Context, name string) error {
-       return fs.collfs.RemoveAll(name)
+       return fs.collfs.RemoveAll(fs.prefix + name)
 }
 
 func (fs *webdavFS) Rename(ctx context.Context, oldName, newName string) error {
@@ -106,14 +109,14 @@ func (fs *webdavFS) Rename(ctx context.Context, oldName, newName string) error {
                newName = strings.TrimSuffix(newName, "/")
        }
        fs.makeparents(newName)
-       return fs.collfs.Rename(oldName, newName)
+       return fs.collfs.Rename(fs.prefix+oldName, fs.prefix+newName)
 }
 
 // Stat returns the wrapped filesystem's Stat result for
 // fs.prefix+name. When fs.writing is set, makeparents(name) runs
 // first, attempting to create name's parent directories; makeparents
 // returns nothing, so a failure there only shows up through the
 // Stat result. ctx is not used.
 func (fs *webdavFS) Stat(ctx context.Context, name string) (os.FileInfo, error) {
        if fs.writing {
                fs.makeparents(name)
        }
-       return fs.collfs.Stat(name)
+       return fs.collfs.Stat(fs.prefix + name)
 }
 
 type writeFailer struct {
index 8242f5b2b56868b23dfaecf4aefe814100e682ee..38231307bfc14755ec4b57721a81f9bc92ceef61 100644 (file)
@@ -696,7 +696,7 @@ func (s *ServerRequiredSuite) TestCollectionSharingToken(c *C) {
        defer srv.Close()
        hash, _, err := kc.PutB([]byte("shareddata"))
        c.Check(err, IsNil)
-       kc.Arvados.ApiToken = arvadostest.FooCollectionSharingToken
+       kc.Arvados.ApiToken = arvadostest.FooFileCollectionSharingToken
        rdr, _, _, err := kc.Get(hash)
        c.Assert(err, IsNil)
        data, err := ioutil.ReadAll(rdr)