From 6b0d4ac8df4b5b4255eb56b1d76865f06089ca2a Mon Sep 17 00:00:00 2001 From: Tom Clegg Date: Mon, 12 Sep 2022 14:49:37 -0400 Subject: [PATCH] 19362: Move non-S3 collection-addressed reqs to sitefs code path. Arvados-DCO-1.1-Signed-off-by: Tom Clegg --- sdk/go/arvados/fs_base.go | 10 +- sdk/go/arvados/fs_collection.go | 93 ++++--- sdk/go/arvados/fs_collection_test.go | 16 +- sdk/go/arvados/fs_site.go | 43 +++- sdk/go/arvadostest/fixtures.go | 8 +- services/keep-web/cache.go | 352 +++------------------------ services/keep-web/cache_test.go | 242 +++++++++--------- services/keep-web/handler.go | 290 +++++++++++----------- services/keep-web/handler_test.go | 55 ++++- services/keep-web/s3.go | 38 +-- services/keep-web/server_test.go | 21 +- services/keep-web/webdav.go | 17 +- services/keepproxy/keepproxy_test.go | 2 +- 13 files changed, 510 insertions(+), 677 deletions(-) diff --git a/sdk/go/arvados/fs_base.go b/sdk/go/arvados/fs_base.go index 2ad4d1f859..b0096140ce 100644 --- a/sdk/go/arvados/fs_base.go +++ b/sdk/go/arvados/fs_base.go @@ -631,7 +631,15 @@ func (fs *fileSystem) Rename(oldname, newname string) error { } locked := map[sync.Locker]bool{} for i := len(needLock) - 1; i >= 0; i-- { - if n := needLock[i]; !locked[n] { + n := needLock[i] + if fs, ok := n.(FileSystem); ok { + // Lock the fs's root dir directly, not + // through the fs. Otherwise our "locked" map + // would not reliably prevent double-locking + // the fs's root dir. + n = fs.rootnode() + } + if !locked[n] { n.Lock() defer n.Unlock() locked[n] = true diff --git a/sdk/go/arvados/fs_collection.go b/sdk/go/arvados/fs_collection.go index 26012e2406..256f513cf9 100644 --- a/sdk/go/arvados/fs_collection.go +++ b/sdk/go/arvados/fs_collection.go @@ -290,44 +290,70 @@ func (fs *collectionFileSystem) Truncate(int64) error { return ErrInvalidOperation } -// Check for and incorporate upstream changes -- unless that has -// already been done recently, in which case this func is a no-op. -func (fs *collectionFileSystem) checkChangesOnServer() error { +// Check for and incorporate upstream changes. If force==false, this +// is a no-op except once every ttl/100 or so. +// +// Return value is true if new content was loaded from upstream and +// any unsaved local changes have been discarded. +func (fs *collectionFileSystem) checkChangesOnServer(force bool) (bool, error) { if fs.uuid == "" && fs.savedPDH.Load() == "" { - return nil + return false, nil } - // First try UUID if any, then last known PDH. Stop if all - // signatures are new enough. 
- checkingAll := false - for _, id := range []string{fs.uuid, fs.savedPDH.Load().(string)} { - if id == "" { - continue - } - - fs.lockCheckChanges.Lock() - if !checkingAll && fs.holdCheckChanges.After(time.Now()) { - fs.lockCheckChanges.Unlock() - return nil - } - remain, ttl := fs.signatureTimeLeft() - if remain > 0.01 && !checkingAll { - fs.holdCheckChanges = time.Now().Add(ttl / 100) - } + fs.lockCheckChanges.Lock() + if !force && fs.holdCheckChanges.After(time.Now()) { fs.lockCheckChanges.Unlock() + return false, nil + } + remain, ttl := fs.signatureTimeLeft() + if remain > 0.01 { + fs.holdCheckChanges = time.Now().Add(ttl / 100) + } + fs.lockCheckChanges.Unlock() - if remain >= 0.5 { - break + if !force && remain >= 0.5 { + // plenty of time left on current signatures + return false, nil + } + + getparams := map[string]interface{}{"select": []string{"portable_data_hash", "manifest_text"}} + if fs.uuid != "" { + var coll Collection + err := fs.RequestAndDecode(&coll, "GET", "arvados/v1/collections/"+fs.uuid, nil, getparams) + if err != nil { + return false, err + } + if coll.PortableDataHash != fs.savedPDH.Load().(string) { + // collection has changed upstream since we + // last loaded or saved. Refresh local data, + // losing any unsaved local changes. + newfs, err := coll.FileSystem(fs.fileSystem.fsBackend, fs.fileSystem.fsBackend) + if err != nil { + return false, err + } + snap, err := Snapshot(newfs, "/") + if err != nil { + return false, err + } + err = Splice(fs, "/", snap) + if err != nil { + return false, err + } + fs.savedPDH.Store(coll.PortableDataHash) + return true, nil } - checkingAll = true + fs.updateSignatures(coll.ManifestText) + return false, nil + } + if pdh := fs.savedPDH.Load().(string); pdh != "" { var coll Collection - err := fs.RequestAndDecode(&coll, "GET", "arvados/v1/collections/"+id, nil, map[string]interface{}{"select": []string{"portable_data_hash", "manifest_text"}}) + err := fs.RequestAndDecode(&coll, "GET", "arvados/v1/collections/"+pdh, nil, getparams) if err != nil { - continue + return false, err } fs.updateSignatures(coll.ManifestText) } - return nil + return false, nil } // Refresh signature on a single locator, if necessary. Assume caller @@ -339,7 +365,7 @@ func (fs *collectionFileSystem) refreshSignature(locator string) string { if err != nil || exp.Sub(time.Now()) > time.Minute { // Synchronous update is not needed. Start an // asynchronous update if needed. 
- go fs.checkChangesOnServer() + go fs.checkChangesOnServer(false) return locator } var manifests string @@ -368,11 +394,11 @@ func (fs *collectionFileSystem) refreshSignature(locator string) string { } func (fs *collectionFileSystem) Sync() error { - err := fs.checkChangesOnServer() + refreshed, err := fs.checkChangesOnServer(true) if err != nil { return err } - if fs.uuid == "" { + if refreshed || fs.uuid == "" { return nil } txt, err := fs.MarshalManifest(".") @@ -403,7 +429,7 @@ func (fs *collectionFileSystem) Sync() error { "select": selectFields, }) if err != nil { - return fmt.Errorf("sync failed: update %s: %s", fs.uuid, err) + return fmt.Errorf("sync failed: update %s: %w", fs.uuid, err) } fs.updateSignatures(coll.ManifestText) fs.savedPDH.Store(coll.PortableDataHash) @@ -1163,10 +1189,11 @@ func (dn *dirnode) MemorySize() (size int64) { size += 64 for _, seg := range node.segments { switch seg := seg.(type) { + case storedSegment: + size += int64(len(seg.locator)) + 40 case *memSegment: - size += int64(seg.Len()) + size += int64(seg.Len()) + 8 } - size += 64 } } } diff --git a/sdk/go/arvados/fs_collection_test.go b/sdk/go/arvados/fs_collection_test.go index c2cac3c6ce..da3166509c 100644 --- a/sdk/go/arvados/fs_collection_test.go +++ b/sdk/go/arvados/fs_collection_test.go @@ -1221,8 +1221,8 @@ func (s *CollectionFSSuite) TestFlushFullBlocksOnly(c *check.C) { c.Assert(err, check.IsNil) } } - inodebytes := int64((nDirs*(67*2+1) + 1) * 64) - c.Check(fs.MemorySize(), check.Equals, int64(nDirs*67<<20)+inodebytes) + inodebytes := int64((nDirs*(67+1) + 1) * 64) + c.Check(fs.MemorySize(), check.Equals, int64(nDirs*67*(1<<20+8))+inodebytes) c.Check(flushed, check.Equals, int64(0)) waitForFlush := func(expectUnflushed, expectFlushed int64) { @@ -1233,27 +1233,29 @@ func (s *CollectionFSSuite) TestFlushFullBlocksOnly(c *check.C) { } // Nothing flushed yet - waitForFlush((nDirs*67)<<20+inodebytes, 0) + waitForFlush(nDirs*67*(1<<20+8)+inodebytes, 0) // Flushing a non-empty dir "/" is non-recursive and there are // no top-level files, so this has no effect fs.Flush("/", false) - waitForFlush((nDirs*67)<<20+inodebytes, 0) + waitForFlush(nDirs*67*(1<<20+8)+inodebytes, 0) // Flush the full block in dir0 fs.Flush("dir0", false) - waitForFlush((nDirs*67-64)<<20+inodebytes, 64<<20) + bigloclen := int64(32 + 9 + 51 + 40) // md5 + "+" + "67xxxxxx" + "+Axxxxxx..." + 40 (see (*dirnode)MemorySize) + waitForFlush((nDirs*67-64)*(1<<20+8)+inodebytes+bigloclen*64, 64<<20) err = fs.Flush("dir-does-not-exist", false) c.Check(err, check.NotNil) // Flush full blocks in all dirs fs.Flush("", false) - waitForFlush(nDirs*3<<20+inodebytes, nDirs*64<<20) + waitForFlush(nDirs*3*(1<<20+8)+inodebytes+bigloclen*64*nDirs, nDirs*64<<20) // Flush non-full blocks, too fs.Flush("", true) - waitForFlush(inodebytes, nDirs*67<<20) + smallloclen := int64(32 + 8 + 51 + 40) // md5 + "+" + "3xxxxxx" + "+Axxxxxx..." + 40 (see (*dirnode)MemorySize) + waitForFlush(inodebytes+bigloclen*64*nDirs+smallloclen*3*nDirs, nDirs*67<<20) } // Even when writing lots of files/dirs from different goroutines, as diff --git a/sdk/go/arvados/fs_site.go b/sdk/go/arvados/fs_site.go index bb2eee7792..eed49296e7 100644 --- a/sdk/go/arvados/fs_site.go +++ b/sdk/go/arvados/fs_site.go @@ -5,6 +5,7 @@ package arvados import ( + "net/http" "os" "strings" "sync" @@ -136,29 +137,43 @@ func (fs *customFileSystem) newNode(name string, perm os.FileMode, modTime time. 
return nil, ErrInvalidOperation } -func (fs *customFileSystem) mountByID(parent inode, id string) inode { +func (fs *customFileSystem) mountByID(parent inode, id string) (inode, error) { if strings.Contains(id, "-4zz18-") || pdhRegexp.MatchString(id) { return fs.mountCollection(parent, id) } else if strings.Contains(id, "-j7d0g-") { - return fs.newProjectNode(fs.root, id, id, nil) + return fs.newProjectNode(fs.root, id, id, nil), nil } else { - return nil + return nil, nil } } -func (fs *customFileSystem) mountCollection(parent inode, id string) inode { +func (fs *customFileSystem) mountCollection(parent inode, id string) (inode, error) { var coll Collection err := fs.RequestAndDecode(&coll, "GET", "arvados/v1/collections/"+id, nil, nil) - if err != nil { - return nil + if statusErr, ok := err.(interface{ HTTPStatus() int }); ok && statusErr.HTTPStatus() == http.StatusNotFound { + return nil, nil + } else if err != nil { + return nil, err + } + if len(id) != 27 { + // This means id is a PDH, and controller/railsapi + // returned one of (possibly) many collections with + // that PDH. We don't want to expose (e.g., through + // Sys()) the metadata from that collection -- only + // the fields that are common to all matching + // collections, i.e., PDH and manifest. + coll = Collection{ + PortableDataHash: coll.PortableDataHash, + ManifestText: coll.ManifestText, + } } newfs, err := coll.FileSystem(fs, fs) if err != nil { - return nil + return nil, err } cfs := newfs.(*collectionFileSystem) cfs.SetParent(parent, id) - return cfs + return cfs, nil } func (fs *customFileSystem) newProjectNode(root inode, name, uuid string, proj *Group) inode { @@ -202,15 +217,19 @@ func (fs *customFileSystem) newProjectNode(root inode, name, uuid string, proj * // treenode, or nil for ENOENT. 
type vdirnode struct { treenode - create func(parent inode, name string) inode + create func(parent inode, name string) (inode, error) } func (vn *vdirnode) Child(name string, replace func(inode) (inode, error)) (inode, error) { return vn.treenode.Child(name, func(existing inode) (inode, error) { if existing == nil && vn.create != nil { - existing = vn.create(vn, name) - if existing != nil { - existing.SetParent(vn, name) + newnode, err := vn.create(vn, name) + if err != nil { + return nil, err + } + if newnode != nil { + newnode.SetParent(vn, name) + existing = newnode vn.treenode.fileinfo.modTime = time.Now() } } diff --git a/sdk/go/arvadostest/fixtures.go b/sdk/go/arvadostest/fixtures.go index ec55725412..ac12f7ae13 100644 --- a/sdk/go/arvadostest/fixtures.go +++ b/sdk/go/arvadostest/fixtures.go @@ -32,6 +32,7 @@ const ( HelloWorldPdh = "55713e6a34081eb03609e7ad5fcad129+62" MultilevelCollection1 = "zzzzz-4zz18-pyw8yp9g3pr7irn" + MultilevelCollection1PDH = "f9ddda46bb293b6847da984e3aa735db+290" StorageClassesDesiredDefaultConfirmedDefault = "zzzzz-4zz18-3t236wr12769tga" StorageClassesDesiredArchiveConfirmedDefault = "zzzzz-4zz18-3t236wr12769qqa" EmptyCollectionUUID = "zzzzz-4zz18-gs9ooj1h9sd5mde" @@ -83,8 +84,11 @@ const ( Repository2UUID = "zzzzz-s0uqq-382brsig8rp3667" Repository2Name = "active/foo2" - FooCollectionSharingTokenUUID = "zzzzz-gj3su-gf02tdm4g1z3e3u" - FooCollectionSharingToken = "iknqgmunrhgsyfok8uzjlwun9iscwm3xacmzmg65fa1j1lpdss" + FooFileCollectionUUID = "zzzzz-4zz18-znfnqtbbv4spc3w" + FooFileCollectionSharingTokenUUID = "zzzzz-gj3su-gf02tdm4g1z3e3u" + FooFileCollectionSharingToken = "iknqgmunrhgsyfok8uzjlwun9iscwm3xacmzmg65fa1j1lpdss" + BarFileCollectionUUID = "zzzzz-4zz18-ehbhgtheo8909or" + BarFileCollectionPDH = "fa7aeb5140e2848d39b416daeef4ffc5+45" WorkflowWithDefinitionYAMLUUID = "zzzzz-7fd4e-validworkfloyml" diff --git a/services/keep-web/cache.go b/services/keep-web/cache.go index d5fdc4997e..3ba3e60abd 100644 --- a/services/keep-web/cache.go +++ b/services/keep-web/cache.go @@ -5,6 +5,7 @@ package keepweb import ( + "net/http" "sync" "sync/atomic" "time" @@ -20,74 +21,32 @@ import ( const metricsUpdateInterval = time.Second / 10 type cache struct { - cluster *arvados.Cluster - logger logrus.FieldLogger - registry *prometheus.Registry - metrics cacheMetrics - pdhs *lru.TwoQueueCache - collections *lru.TwoQueueCache - sessions *lru.TwoQueueCache - setupOnce sync.Once + cluster *arvados.Cluster + logger logrus.FieldLogger + registry *prometheus.Registry + metrics cacheMetrics + sessions *lru.TwoQueueCache + setupOnce sync.Once - chPruneSessions chan struct{} - chPruneCollections chan struct{} + chPruneSessions chan struct{} } type cacheMetrics struct { - requests prometheus.Counter - collectionBytes prometheus.Gauge - collectionEntries prometheus.Gauge - sessionEntries prometheus.Gauge - collectionHits prometheus.Counter - pdhHits prometheus.Counter - sessionHits prometheus.Counter - sessionMisses prometheus.Counter - apiCalls prometheus.Counter + requests prometheus.Counter + collectionBytes prometheus.Gauge + sessionEntries prometheus.Gauge + sessionHits prometheus.Counter + sessionMisses prometheus.Counter } func (m *cacheMetrics) setup(reg *prometheus.Registry) { - m.requests = prometheus.NewCounter(prometheus.CounterOpts{ - Namespace: "arvados", - Subsystem: "keepweb_collectioncache", - Name: "requests", - Help: "Number of targetID-to-manifest lookups handled.", - }) - reg.MustRegister(m.requests) - m.collectionHits = 
prometheus.NewCounter(prometheus.CounterOpts{ - Namespace: "arvados", - Subsystem: "keepweb_collectioncache", - Name: "hits", - Help: "Number of pdh-to-manifest cache hits.", - }) - reg.MustRegister(m.collectionHits) - m.pdhHits = prometheus.NewCounter(prometheus.CounterOpts{ - Namespace: "arvados", - Subsystem: "keepweb_collectioncache", - Name: "pdh_hits", - Help: "Number of uuid-to-pdh cache hits.", - }) - reg.MustRegister(m.pdhHits) - m.apiCalls = prometheus.NewCounter(prometheus.CounterOpts{ - Namespace: "arvados", - Subsystem: "keepweb_collectioncache", - Name: "api_calls", - Help: "Number of outgoing API calls made by cache.", - }) - reg.MustRegister(m.apiCalls) m.collectionBytes = prometheus.NewGauge(prometheus.GaugeOpts{ Namespace: "arvados", Subsystem: "keepweb_sessions", - Name: "cached_collection_bytes", - Help: "Total size of all cached manifests and sessions.", + Name: "cached_session_bytes", + Help: "Total size of all cached sessions.", }) reg.MustRegister(m.collectionBytes) - m.collectionEntries = prometheus.NewGauge(prometheus.GaugeOpts{ - Namespace: "arvados", - Subsystem: "keepweb_collectioncache", - Name: "cached_manifests", - Help: "Number of manifests in cache.", - }) - reg.MustRegister(m.collectionEntries) m.sessionEntries = prometheus.NewGauge(prometheus.GaugeOpts{ Namespace: "arvados", Subsystem: "keepweb_sessions", @@ -111,21 +70,6 @@ func (m *cacheMetrics) setup(reg *prometheus.Registry) { reg.MustRegister(m.sessionMisses) } -type cachedPDH struct { - expire time.Time - refresh time.Time - pdh string -} - -type cachedCollection struct { - expire time.Time - collection *arvados.Collection -} - -type cachedPermission struct { - expire time.Time -} - type cachedSession struct { expire time.Time fs atomic.Value @@ -137,14 +81,6 @@ type cachedSession struct { func (c *cache) setup() { var err error - c.pdhs, err = lru.New2Q(c.cluster.Collections.WebDAVCache.MaxUUIDEntries) - if err != nil { - panic(err) - } - c.collections, err = lru.New2Q(c.cluster.Collections.WebDAVCache.MaxCollectionEntries) - if err != nil { - panic(err) - } c.sessions, err = lru.New2Q(c.cluster.Collections.WebDAVCache.MaxSessions) if err != nil { panic(err) @@ -160,12 +96,6 @@ func (c *cache) setup() { c.updateGauges() } }() - c.chPruneCollections = make(chan struct{}, 1) - go func() { - for range c.chPruneCollections { - c.pruneCollections() - } - }() c.chPruneSessions = make(chan struct{}, 1) go func() { for range c.chPruneSessions { @@ -176,7 +106,6 @@ func (c *cache) setup() { func (c *cache) updateGauges() { c.metrics.collectionBytes.Set(float64(c.collectionBytes())) - c.metrics.collectionEntries.Set(float64(c.collections.Len())) c.metrics.sessionEntries.Set(float64(c.sessions.Len())) } @@ -184,39 +113,6 @@ var selectPDH = map[string]interface{}{ "select": []string{"portable_data_hash"}, } -// Update saves a modified version (fs) to an existing collection -// (coll) and, if successful, updates the relevant cache entries so -// subsequent calls to Get() reflect the modifications. 
-func (c *cache) Update(client *arvados.Client, coll arvados.Collection, fs arvados.CollectionFileSystem) error { - c.setupOnce.Do(c.setup) - - m, err := fs.MarshalManifest(".") - if err != nil || m == coll.ManifestText { - return err - } - coll.ManifestText = m - var updated arvados.Collection - err = client.RequestAndDecode(&updated, "PATCH", "arvados/v1/collections/"+coll.UUID, nil, map[string]interface{}{ - "collection": map[string]string{ - "manifest_text": coll.ManifestText, - }, - }) - if err != nil { - c.pdhs.Remove(coll.UUID) - return err - } - c.collections.Add(client.AuthToken+"\000"+updated.PortableDataHash, &cachedCollection{ - expire: time.Now().Add(time.Duration(c.cluster.Collections.WebDAVCache.TTL)), - collection: &updated, - }) - c.pdhs.Add(coll.UUID, &cachedPDH{ - expire: time.Now().Add(time.Duration(c.cluster.Collections.WebDAVCache.TTL)), - refresh: time.Now().Add(time.Duration(c.cluster.Collections.WebDAVCache.UUIDTTL)), - pdh: updated.PortableDataHash, - }) - return nil -} - // ResetSession unloads any potentially stale state. Should be called // after write operations, so subsequent reads don't return stale // data. @@ -227,7 +123,7 @@ func (c *cache) ResetSession(token string) { // Get a long-lived CustomFileSystem suitable for doing a read operation // with the given token. -func (c *cache) GetSession(token string) (arvados.CustomFileSystem, *cachedSession, error) { +func (c *cache) GetSession(token string) (arvados.CustomFileSystem, *cachedSession, *arvados.User, error) { c.setupOnce.Do(c.setup) now := time.Now() ent, _ := c.sessions.Get(token) @@ -241,12 +137,12 @@ func (c *cache) GetSession(token string) (arvados.CustomFileSystem, *cachedSessi var err error sess.client, err = arvados.NewClientFromConfig(c.cluster) if err != nil { - return nil, nil, err + return nil, nil, nil, err } sess.client.AuthToken = token sess.arvadosclient, err = arvadosclient.New(sess.client) if err != nil { - return nil, nil, err + return nil, nil, nil, err } sess.keepclient = keepclient.New(sess.arvadosclient) c.sessions.Add(token, sess) @@ -260,14 +156,29 @@ func (c *cache) GetSession(token string) (arvados.CustomFileSystem, *cachedSessi case c.chPruneSessions <- struct{}{}: default: } + fs, _ := sess.fs.Load().(arvados.CustomFileSystem) - if fs != nil && !expired { - return fs, sess, nil + if fs == nil || expired { + fs = sess.client.SiteFileSystem(sess.keepclient) + fs.ForwardSlashNameSubstitution(c.cluster.Collections.ForwardSlashNameSubstitution) + sess.fs.Store(fs) } - fs = sess.client.SiteFileSystem(sess.keepclient) - fs.ForwardSlashNameSubstitution(c.cluster.Collections.ForwardSlashNameSubstitution) - sess.fs.Store(fs) - return fs, sess, nil + + user, _ := sess.user.Load().(*arvados.User) + if user == nil || expired { + user = new(arvados.User) + err := sess.client.RequestAndDecode(user, "GET", "/arvados/v1/users/current", nil, nil) + if statusErr, ok := err.(interface{ HTTPStatus() int }); ok && statusErr.HTTPStatus() == http.StatusForbidden { + // token is OK, but "get user id" api is out + // of scope -- return nil, signifying unknown + // user + } else if err != nil { + return nil, nil, nil, err + } + sess.user.Store(user) + } + + return fs, sess, user, nil } // Remove all expired session cache entries, then remove more entries @@ -294,7 +205,7 @@ func (c *cache) pruneSessions() { // least frequently used entries (which Keys() returns last). 
for i := len(keys) - 1; i >= 0; i-- { token := keys[i] - if size <= c.cluster.Collections.WebDAVCache.MaxCollectionBytes/2 { + if size <= c.cluster.Collections.WebDAVCache.MaxCollectionBytes { break } ent, ok := c.sessions.Peek(token) @@ -311,147 +222,10 @@ func (c *cache) pruneSessions() { } } -func (c *cache) Get(arv *arvadosclient.ArvadosClient, targetID string, forceReload bool) (*arvados.Collection, error) { - c.setupOnce.Do(c.setup) - c.metrics.requests.Inc() - - var pdhRefresh bool - var pdh string - if arvadosclient.PDHMatch(targetID) { - pdh = targetID - } else if ent, cached := c.pdhs.Get(targetID); cached { - ent := ent.(*cachedPDH) - if ent.expire.Before(time.Now()) { - c.pdhs.Remove(targetID) - } else { - pdh = ent.pdh - pdhRefresh = forceReload || time.Now().After(ent.refresh) - c.metrics.pdhHits.Inc() - } - } - - if pdh == "" { - // UUID->PDH mapping is not cached, might as well get - // the whole collection record and be done (below). - c.logger.Debugf("cache(%s): have no pdh", targetID) - } else if cached := c.lookupCollection(arv.ApiToken + "\000" + pdh); cached == nil { - // PDH->manifest is not cached, might as well get the - // whole collection record (below). - c.logger.Debugf("cache(%s): have pdh %s but manifest is not cached", targetID, pdh) - } else if !pdhRefresh { - // We looked up UUID->PDH very recently, and we still - // have the manifest for that PDH. - c.logger.Debugf("cache(%s): have pdh %s and refresh not needed", targetID, pdh) - return cached, nil - } else { - // Get current PDH for this UUID (and confirm we still - // have read permission). Most likely, the cached PDH - // is still correct, in which case we can use our - // cached manifest. - c.metrics.apiCalls.Inc() - var current arvados.Collection - err := arv.Get("collections", targetID, selectPDH, ¤t) - if err != nil { - return nil, err - } - if current.PortableDataHash == pdh { - // PDH has not changed, cached manifest is - // correct. - c.logger.Debugf("cache(%s): verified cached pdh %s is still correct", targetID, pdh) - return cached, nil - } - if cached := c.lookupCollection(arv.ApiToken + "\000" + current.PortableDataHash); cached != nil { - // PDH changed, and we already have the - // manifest for that new PDH. - c.logger.Debugf("cache(%s): cached pdh %s was stale, new pdh is %s and manifest is already in cache", targetID, pdh, current.PortableDataHash) - return cached, nil - } - } - - // Either UUID->PDH is not cached, or PDH->manifest is not - // cached. 
- var retrieved arvados.Collection - c.metrics.apiCalls.Inc() - err := arv.Get("collections", targetID, nil, &retrieved) - if err != nil { - return nil, err - } - c.logger.Debugf("cache(%s): retrieved manifest, caching with pdh %s", targetID, retrieved.PortableDataHash) - exp := time.Now().Add(time.Duration(c.cluster.Collections.WebDAVCache.TTL)) - if targetID != retrieved.PortableDataHash { - c.pdhs.Add(targetID, &cachedPDH{ - expire: exp, - refresh: time.Now().Add(time.Duration(c.cluster.Collections.WebDAVCache.UUIDTTL)), - pdh: retrieved.PortableDataHash, - }) - } - c.collections.Add(arv.ApiToken+"\000"+retrieved.PortableDataHash, &cachedCollection{ - expire: exp, - collection: &retrieved, - }) - if int64(len(retrieved.ManifestText)) > c.cluster.Collections.WebDAVCache.MaxCollectionBytes/int64(c.cluster.Collections.WebDAVCache.MaxCollectionEntries) { - select { - case c.chPruneCollections <- struct{}{}: - default: - } - } - return &retrieved, nil -} - -// pruneCollections checks the total bytes occupied by manifest_text -// in the collection cache and removes old entries as needed to bring -// the total size down to CollectionBytes. It also deletes all expired -// entries. -// -// pruneCollections does not aim to be perfectly correct when there is -// concurrent cache activity. -func (c *cache) pruneCollections() { - var size int64 - now := time.Now() - keys := c.collections.Keys() - entsize := make([]int, len(keys)) - expired := make([]bool, len(keys)) - for i, k := range keys { - v, ok := c.collections.Peek(k) - if !ok { - continue - } - ent := v.(*cachedCollection) - n := len(ent.collection.ManifestText) - size += int64(n) - entsize[i] = n - expired[i] = ent.expire.Before(now) - } - for i, k := range keys { - if expired[i] { - c.collections.Remove(k) - size -= int64(entsize[i]) - } - } - for i, k := range keys { - if size <= c.cluster.Collections.WebDAVCache.MaxCollectionBytes/2 { - break - } - if expired[i] { - // already removed this entry in the previous loop - continue - } - c.collections.Remove(k) - size -= int64(entsize[i]) - } -} - // collectionBytes returns the approximate combined memory size of the // collection cache and session filesystem cache. func (c *cache) collectionBytes() uint64 { var size uint64 - for _, k := range c.collections.Keys() { - v, ok := c.collections.Peek(k) - if !ok { - continue - } - size += uint64(len(v.(*cachedCollection).collection.ManifestText)) - } for _, token := range c.sessions.Keys() { ent, ok := c.sessions.Peek(token) if !ok { @@ -463,49 +237,3 @@ func (c *cache) collectionBytes() uint64 { } return size } - -func (c *cache) lookupCollection(key string) *arvados.Collection { - e, cached := c.collections.Get(key) - if !cached { - return nil - } - ent := e.(*cachedCollection) - if ent.expire.Before(time.Now()) { - c.collections.Remove(key) - return nil - } - c.metrics.collectionHits.Inc() - return ent.collection -} - -func (c *cache) GetTokenUser(token string) (*arvados.User, error) { - // Get and cache user record associated with this - // token. We need to know their UUID for logging, and - // whether they are an admin or not for certain - // permission checks. 
- - // Get/create session entry - _, sess, err := c.GetSession(token) - if err != nil { - return nil, err - } - - // See if the user is already set, and if so, return it - user, _ := sess.user.Load().(*arvados.User) - if user != nil { - return user, nil - } - - // Fetch the user record - c.metrics.apiCalls.Inc() - var current arvados.User - - err = sess.client.RequestAndDecode(¤t, "GET", "/arvados/v1/users/current", nil, nil) - if err != nil { - return nil, err - } - - // Stash the user record for next time - sess.user.Store(¤t) - return ¤t, nil -} diff --git a/services/keep-web/cache_test.go b/services/keep-web/cache_test.go index 6b8f427171..010e29a0b8 100644 --- a/services/keep-web/cache_test.go +++ b/services/keep-web/cache_test.go @@ -6,17 +6,21 @@ package keepweb import ( "bytes" + "net/http" + "net/http/httptest" + "regexp" + "strings" + "time" "git.arvados.org/arvados.git/sdk/go/arvados" - "git.arvados.org/arvados.git/sdk/go/arvadosclient" "git.arvados.org/arvados.git/sdk/go/arvadostest" - "git.arvados.org/arvados.git/sdk/go/ctxlog" - "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/expfmt" "gopkg.in/check.v1" ) -func (s *UnitSuite) checkCacheMetrics(c *check.C, reg *prometheus.Registry, regs ...string) { +func (s *IntegrationSuite) checkCacheMetrics(c *check.C, regs ...string) { + s.handler.Cache.updateGauges() + reg := s.handler.Cache.registry mfs, err := reg.Gather() c.Check(err, check.IsNil) buf := &bytes.Buffer{} @@ -25,131 +29,139 @@ func (s *UnitSuite) checkCacheMetrics(c *check.C, reg *prometheus.Registry, regs c.Check(enc.Encode(mf), check.IsNil) } mm := buf.String() + // Remove comments to make the "value vs. regexp" failure + // output easier to read. + mm = regexp.MustCompile(`(?m)^#.*\n`).ReplaceAllString(mm, "") for _, reg := range regs { - c.Check(mm, check.Matches, `(?ms).*collectioncache_`+reg+`\n.*`) + c.Check(mm, check.Matches, `(?ms).*keepweb_sessions_`+reg+`\n.*`) } } -func (s *UnitSuite) TestCache(c *check.C) { - arv, err := arvadosclient.MakeArvadosClient() - c.Assert(err, check.Equals, nil) - - cache := &cache{ - cluster: s.cluster, - logger: ctxlog.TestLogger(c), - registry: prometheus.NewRegistry(), - } - +func (s *IntegrationSuite) TestCache(c *check.C) { // Hit the same collection 5 times using the same token. Only // the first req should cause an API call; the next 4 should // hit all caches. - arv.ApiToken = arvadostest.AdminToken - var coll *arvados.Collection + u := mustParseURL("http://" + arvadostest.FooCollection + ".keep-web.example/foo") + req := &http.Request{ + Method: "GET", + Host: u.Host, + URL: u, + RequestURI: u.RequestURI(), + Header: http.Header{ + "Authorization": {"Bearer " + arvadostest.ActiveToken}, + }, + } for i := 0; i < 5; i++ { - coll, err = cache.Get(arv, arvadostest.FooCollection, false) - c.Check(err, check.Equals, nil) - c.Assert(coll, check.NotNil) - c.Check(coll.PortableDataHash, check.Equals, arvadostest.FooCollectionPDH) - c.Check(coll.ManifestText[:2], check.Equals, ". ") + resp := httptest.NewRecorder() + s.handler.ServeHTTP(resp, req) + c.Check(resp.Code, check.Equals, http.StatusOK) } - s.checkCacheMetrics(c, cache.registry, - "requests 5", + s.checkCacheMetrics(c, "hits 4", - "pdh_hits 4", - "api_calls 1") - - // Hit the same collection 2 more times, this time requesting - // it by PDH and using a different token. The first req should - // miss the permission cache and fetch the new manifest; the - // second should hit the Collection cache and skip the API - // lookup. 
- arv.ApiToken = arvadostest.ActiveToken - - coll2, err := cache.Get(arv, arvadostest.FooCollectionPDH, false) - c.Check(err, check.Equals, nil) - c.Assert(coll2, check.NotNil) - c.Check(coll2.PortableDataHash, check.Equals, arvadostest.FooCollectionPDH) - c.Check(coll2.ManifestText[:2], check.Equals, ". ") - c.Check(coll2.ManifestText, check.Not(check.Equals), coll.ManifestText) - - s.checkCacheMetrics(c, cache.registry, - "requests 6", - "hits 4", - "pdh_hits 4", - "api_calls 2") - - coll2, err = cache.Get(arv, arvadostest.FooCollectionPDH, false) - c.Check(err, check.Equals, nil) - c.Assert(coll2, check.NotNil) - c.Check(coll2.PortableDataHash, check.Equals, arvadostest.FooCollectionPDH) - c.Check(coll2.ManifestText[:2], check.Equals, ". ") - - s.checkCacheMetrics(c, cache.registry, - "requests 7", - "hits 5", - "pdh_hits 4", - "api_calls 2") - - // Alternating between two collections N times should produce - // only 2 more API calls. - arv.ApiToken = arvadostest.AdminToken - for i := 0; i < 20; i++ { - var target string - if i%2 == 0 { - target = arvadostest.HelloWorldCollection - } else { - target = arvadostest.FooBarDirCollection - } - _, err := cache.Get(arv, target, false) - c.Check(err, check.Equals, nil) + "misses 1", + "active 1") + + // Hit a shared collection 3 times using PDH, using a + // different token. + u2 := mustParseURL("http://" + strings.Replace(arvadostest.BarFileCollectionPDH, "+", "-", 1) + ".keep-web.example/bar") + req2 := &http.Request{ + Method: "GET", + Host: u2.Host, + URL: u2, + RequestURI: u2.RequestURI(), + Header: http.Header{ + "Authorization": {"Bearer " + arvadostest.SpectatorToken}, + }, } - s.checkCacheMetrics(c, cache.registry, - "requests 27", - "hits 23", - "pdh_hits 22", - "api_calls 4") -} - -func (s *UnitSuite) TestCacheForceReloadByPDH(c *check.C) { - arv, err := arvadosclient.MakeArvadosClient() - c.Assert(err, check.Equals, nil) - - cache := &cache{ - cluster: s.cluster, - logger: ctxlog.TestLogger(c), - registry: prometheus.NewRegistry(), + for i := 0; i < 3; i++ { + resp2 := httptest.NewRecorder() + s.handler.ServeHTTP(resp2, req2) + c.Check(resp2.Code, check.Equals, http.StatusOK) } - - for _, forceReload := range []bool{false, true, false, true} { - _, err := cache.Get(arv, arvadostest.FooCollectionPDH, forceReload) - c.Check(err, check.Equals, nil) + s.checkCacheMetrics(c, + "hits 6", + "misses 2", + "active 2") + + // Alternating between two collections/tokens N times should + // use the existing sessions. + for i := 0; i < 7; i++ { + resp := httptest.NewRecorder() + s.handler.ServeHTTP(resp, req) + c.Check(resp.Code, check.Equals, http.StatusOK) + + resp2 := httptest.NewRecorder() + s.handler.ServeHTTP(resp2, req2) + c.Check(resp2.Code, check.Equals, http.StatusOK) } - - s.checkCacheMetrics(c, cache.registry, - "requests 4", - "hits 3", - "pdh_hits 0", - "api_calls 1") + s.checkCacheMetrics(c, + "hits 20", + "misses 2", + "active 2") } -func (s *UnitSuite) TestCacheForceReloadByUUID(c *check.C) { - arv, err := arvadosclient.MakeArvadosClient() - c.Assert(err, check.Equals, nil) - - cache := &cache{ - cluster: s.cluster, - logger: ctxlog.TestLogger(c), - registry: prometheus.NewRegistry(), - } - - for _, forceReload := range []bool{false, true, false, true} { - _, err := cache.Get(arv, arvadostest.FooCollection, forceReload) - c.Check(err, check.Equals, nil) - } +func (s *IntegrationSuite) TestForceReloadPDH(c *check.C) { + filename := strings.Replace(time.Now().Format(time.RFC3339Nano), ":", ".", -1) + manifest := ". 
d41d8cd98f00b204e9800998ecf8427e+0 0:0:" + filename + "\n" + pdh := arvados.PortableDataHash(manifest) + client := arvados.NewClientFromEnv() + client.AuthToken = arvadostest.ActiveToken + + _, resp := s.do("GET", "http://"+strings.Replace(pdh, "+", "-", 1)+".keep-web.example/"+filename, arvadostest.ActiveToken, nil) + c.Check(resp.Code, check.Equals, http.StatusNotFound) + + var coll arvados.Collection + err := client.RequestAndDecode(&coll, "POST", "arvados/v1/collections", nil, map[string]interface{}{ + "collection": map[string]string{ + "manifest_text": manifest, + }, + }) + c.Assert(err, check.IsNil) + defer client.RequestAndDecode(nil, "DELETE", "arvados/v1/collections/"+coll.UUID, nil, nil) + c.Assert(coll.PortableDataHash, check.Equals, pdh) + + _, resp = s.do("GET", "http://"+strings.Replace(pdh, "+", "-", 1)+".keep-web.example/"+filename, "", http.Header{ + "Authorization": {"Bearer " + arvadostest.ActiveToken}, + "Cache-Control": {"must-revalidate"}, + }) + c.Check(resp.Code, check.Equals, http.StatusOK) + + _, resp = s.do("GET", "http://"+strings.Replace(pdh, "+", "-", 1)+".keep-web.example/missingfile", "", http.Header{ + "Authorization": {"Bearer " + arvadostest.ActiveToken}, + "Cache-Control": {"must-revalidate"}, + }) + c.Check(resp.Code, check.Equals, http.StatusNotFound) +} - s.checkCacheMetrics(c, cache.registry, - "requests 4", - "hits 3", - "pdh_hits 3", - "api_calls 3") +func (s *IntegrationSuite) TestForceReloadUUID(c *check.C) { + client := arvados.NewClientFromEnv() + client.AuthToken = arvadostest.ActiveToken + var coll arvados.Collection + err := client.RequestAndDecode(&coll, "POST", "arvados/v1/collections", nil, map[string]interface{}{ + "collection": map[string]string{ + "manifest_text": ". d41d8cd98f00b204e9800998ecf8427e+0 0:0:oldfile\n", + }, + }) + c.Assert(err, check.IsNil) + defer client.RequestAndDecode(nil, "DELETE", "arvados/v1/collections/"+coll.UUID, nil, nil) + + _, resp := s.do("GET", "http://"+coll.UUID+".keep-web.example/newfile", arvadostest.ActiveToken, nil) + c.Check(resp.Code, check.Equals, http.StatusNotFound) + _, resp = s.do("GET", "http://"+coll.UUID+".keep-web.example/oldfile", arvadostest.ActiveToken, nil) + c.Check(resp.Code, check.Equals, http.StatusOK) + _, resp = s.do("GET", "http://"+coll.UUID+".keep-web.example/newfile", arvadostest.ActiveToken, nil) + c.Check(resp.Code, check.Equals, http.StatusNotFound) + err = client.RequestAndDecode(&coll, "PATCH", "arvados/v1/collections/"+coll.UUID, nil, map[string]interface{}{ + "collection": map[string]string{ + "manifest_text": ". 
d41d8cd98f00b204e9800998ecf8427e+0 0:0:oldfile 0:0:newfile\n", + }, + }) + c.Assert(err, check.IsNil) + _, resp = s.do("GET", "http://"+coll.UUID+".keep-web.example/newfile", arvadostest.ActiveToken, nil) + c.Check(resp.Code, check.Equals, http.StatusNotFound) + _, resp = s.do("GET", "http://"+coll.UUID+".keep-web.example/newfile", "", http.Header{ + "Authorization": {"Bearer " + arvadostest.ActiveToken}, + "Cache-Control": {"must-revalidate"}, + }) + c.Check(resp.Code, check.Equals, http.StatusOK) } diff --git a/services/keep-web/handler.go b/services/keep-web/handler.go index 3a1d9acde7..7ac8bc02db 100644 --- a/services/keep-web/handler.go +++ b/services/keep-web/handler.go @@ -6,6 +6,7 @@ package keepweb import ( "encoding/json" + "errors" "fmt" "html" "html/template" @@ -39,8 +40,8 @@ type handler struct { var urlPDHDecoder = strings.NewReplacer(" ", "+", "-", "+") -var notFoundMessage = "404 Not found\r\n\r\nThe requested path was not found, or you do not have permission to access it.\r" -var unauthorizedMessage = "401 Unauthorized\r\n\r\nA valid Arvados token must be provided to access this resource.\r" +var notFoundMessage = "Not Found" +var unauthorizedMessage = "401 Unauthorized\r\n\r\nA valid Arvados token must be provided to access this resource.\r\n" // parseCollectionIDFromURL returns a UUID or PDH if s is a UUID or a // PDH (even if it is a PDH with "+" replaced by " " or "-"); @@ -73,9 +74,10 @@ func (h *handler) serveStatus(w http.ResponseWriter, r *http.Request) { // updateOnSuccess wraps httpserver.ResponseWriter. If the handler // sends an HTTP header indicating success, updateOnSuccess first -// calls the provided update func. If the update func fails, a 500 -// response is sent, and the status code and body sent by the handler -// are ignored (all response writes return the update error). +// calls the provided update func. If the update func fails, an error +// response is sent (using the error's HTTP status or 500 if none), +// and the status code and body sent by the handler are ignored (all +// response writes return the update error). 
type updateOnSuccess struct { httpserver.ResponseWriter logger logrus.FieldLogger @@ -100,10 +102,11 @@ func (uos *updateOnSuccess) WriteHeader(code int) { if code >= 200 && code < 400 { if uos.err = uos.update(); uos.err != nil { code := http.StatusInternalServerError - if err, ok := uos.err.(*arvados.TransactionError); ok { - code = err.StatusCode + var he interface{ HTTPStatus() int } + if errors.As(uos.err, &he) { + code = he.HTTPStatus() } - uos.logger.WithError(uos.err).Errorf("update() returned error type %T, changing response to HTTP %d", uos.err, code) + uos.logger.WithError(uos.err).Errorf("update() returned %T error, changing response to HTTP %d", uos.err, code) http.Error(uos.ResponseWriter, uos.err.Error(), code) return } @@ -369,30 +372,60 @@ func (h *handler) ServeHTTP(wOrig http.ResponseWriter, r *http.Request) { } defer h.clientPool.Put(arv) - var collection *arvados.Collection + dirOpenMode := os.O_RDONLY + if writeMethod[r.Method] { + dirOpenMode = os.O_RDWR + } + + validToken := make(map[string]bool) + var token string var tokenUser *arvados.User - tokenResult := make(map[string]int) - for _, arv.ApiToken = range tokens { - var err error - collection, err = h.Cache.Get(arv, collectionID, forceReload) - if err == nil { - // Success - break + var sessionFS arvados.CustomFileSystem + var session *cachedSession + var collectionDir arvados.File + for _, token = range tokens { + var statusErr interface{ HTTPStatus() int } + fs, sess, user, err := h.Cache.GetSession(token) + if errors.As(err, &statusErr) && statusErr.HTTPStatus() == http.StatusUnauthorized { + // bad token + continue + } else if err != nil { + http.Error(w, "cache error: "+err.Error(), http.StatusInternalServerError) + return + } + f, err := fs.OpenFile("by_id/"+collectionID, dirOpenMode, 0) + if errors.As(err, &statusErr) && statusErr.HTTPStatus() == http.StatusForbidden { + // collection id is outside token scope + validToken[token] = true + continue } - if srvErr, ok := err.(arvadosclient.APIServerError); ok { - switch srvErr.HttpStatusCode { - case 404, 401: - // Token broken or insufficient to - // retrieve collection - tokenResult[arv.ApiToken] = srvErr.HttpStatusCode - continue + validToken[token] = true + if os.IsNotExist(err) { + // collection does not exist or is not + // readable using this token + continue + } else if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + defer f.Close() + + collectionDir, sessionFS, session, tokenUser = f, fs, sess, user + break + } + if forceReload { + err := collectionDir.Sync() + if err != nil { + var statusErr interface{ HTTPStatus() int } + if errors.As(err, &statusErr) { + http.Error(w, err.Error(), statusErr.HTTPStatus()) + } else { + http.Error(w, err.Error(), http.StatusInternalServerError) } + return } - // Something more serious is wrong - http.Error(w, "cache error: "+err.Error(), http.StatusInternalServerError) - return } - if collection == nil { + if session == nil { if pathToken || !credentialsOK { // Either the URL is a "secret sharing link" // that didn't work out (and asking the client @@ -403,9 +436,9 @@ func (h *handler) ServeHTTP(wOrig http.ResponseWriter, r *http.Request) { return } for _, t := range reqTokens { - if tokenResult[t] == 404 { - // The client provided valid token(s), but the - // collection was not found. + if validToken[t] { + // The client provided valid token(s), + // but the collection was not found. 
http.Error(w, notFoundMessage, http.StatusNotFound) return } @@ -452,118 +485,104 @@ func (h *handler) ServeHTTP(wOrig http.ResponseWriter, r *http.Request) { return } - kc, err := keepclient.MakeKeepClient(arv) - if err != nil { - http.Error(w, "error setting up keep client: "+err.Error(), http.StatusInternalServerError) - return - } - kc.RequestID = r.Header.Get("X-Request-Id") - var basename string if len(targetPath) > 0 { basename = targetPath[len(targetPath)-1] } applyContentDispositionHdr(w, r, basename, attachment) - client := (&arvados.Client{ - APIHost: arv.ApiServer, - AuthToken: arv.ApiToken, - Insecure: arv.ApiInsecure, - }).WithRequestID(r.Header.Get("X-Request-Id")) - - fs, err := collection.FileSystem(client, kc) - if err != nil { - http.Error(w, "error creating collection filesystem: "+err.Error(), http.StatusInternalServerError) + if arvadosclient.PDHMatch(collectionID) && writeMethod[r.Method] { + http.Error(w, errReadOnly.Error(), http.StatusMethodNotAllowed) return } - - writefs, writeOK := fs.(arvados.CollectionFileSystem) - targetIsPDH := arvadosclient.PDHMatch(collectionID) - if (targetIsPDH || !writeOK) && writeMethod[r.Method] { - http.Error(w, errReadOnly.Error(), http.StatusMethodNotAllowed) + if !h.userPermittedToUploadOrDownload(r.Method, tokenUser) { + http.Error(w, "Not permitted", http.StatusForbidden) return } + h.logUploadOrDownload(r, session.arvadosclient, sessionFS, "by_id/"+collectionID+"/"+strings.Join(targetPath, "/"), nil, tokenUser) - // Check configured permission - _, sess, err := h.Cache.GetSession(arv.ApiToken) - tokenUser, err = h.Cache.GetTokenUser(arv.ApiToken) - - if webdavMethod[r.Method] { - if !h.userPermittedToUploadOrDownload(r.Method, tokenUser) { - http.Error(w, "Not permitted", http.StatusForbidden) + if writeMethod[r.Method] { + // Save the collection only if/when all + // webdav->filesystem operations succeed -- + // and send a 500 error if the modified + // collection can't be saved. + // + // Perform the write in a separate sitefs, so + // concurrent read operations on the same + // collection see the previous saved + // state. After the write succeeds and the + // collection record is updated, we reset the + // session so the updates are visible in + // subsequent read requests. + client := session.client.WithRequestID(r.Header.Get("X-Request-Id")) + sessionFS = client.SiteFileSystem(session.keepclient) + writingDir, err := sessionFS.OpenFile("by_id/"+collectionID, os.O_RDONLY, 0) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) return } - h.logUploadOrDownload(r, sess.arvadosclient, nil, strings.Join(targetPath, "/"), collection, tokenUser) - - if writeMethod[r.Method] { - // Save the collection only if/when all - // webdav->filesystem operations succeed -- - // and send a 500 error if the modified - // collection can't be saved. 
- w = &updateOnSuccess{ - ResponseWriter: w, - logger: ctxlog.FromContext(r.Context()), - update: func() error { - return h.Cache.Update(client, *collection, writefs) - }} - } - h := webdav.Handler{ - Prefix: "/" + strings.Join(pathParts[:stripParts], "/"), - FileSystem: &webdavFS{ - collfs: fs, - writing: writeMethod[r.Method], - alwaysReadEOF: r.Method == "PROPFIND", - }, - LockSystem: h.webdavLS, - Logger: func(_ *http.Request, err error) { + defer writingDir.Close() + w = &updateOnSuccess{ + ResponseWriter: w, + logger: ctxlog.FromContext(r.Context()), + update: func() error { + err := writingDir.Sync() + var te arvados.TransactionError + if errors.As(err, &te) { + err = te + } + if err != nil { + return err + } + // Sync the changes to the persistent + // sessionfs for this token. + snap, err := writingDir.Snapshot() if err != nil { - ctxlog.FromContext(r.Context()).WithError(err).Error("error reported by webdav handler") + return err } - }, - } - h.ServeHTTP(w, r) - return + collectionDir.Splice(snap) + return nil + }} } - - openPath := "/" + strings.Join(targetPath, "/") - f, err := fs.Open(openPath) - if os.IsNotExist(err) { - // Requested non-existent path - http.Error(w, notFoundMessage, http.StatusNotFound) - return - } else if err != nil { - // Some other (unexpected) error - http.Error(w, "open: "+err.Error(), http.StatusInternalServerError) - return + wh := webdav.Handler{ + Prefix: "/" + strings.Join(pathParts[:stripParts], "/"), + FileSystem: &webdavFS{ + collfs: sessionFS, + prefix: "by_id/" + collectionID + "/", + writing: writeMethod[r.Method], + alwaysReadEOF: r.Method == "PROPFIND", + }, + LockSystem: h.webdavLS, + Logger: func(r *http.Request, err error) { + if err != nil { + ctxlog.FromContext(r.Context()).WithError(err).Error("error reported by webdav handler") + } + }, } - defer f.Close() - if stat, err := f.Stat(); err != nil { - // Can't get Size/IsDir (shouldn't happen with a collectionFS!) - http.Error(w, "stat: "+err.Error(), http.StatusInternalServerError) - } else if stat.IsDir() && !strings.HasSuffix(r.URL.Path, "/") { - // If client requests ".../dirname", redirect to - // ".../dirname/". This way, relative links in the - // listing for "dirname" can always be "fnm", never - // "dirname/fnm". - h.seeOtherWithCookie(w, r, r.URL.Path+"/", credentialsOK) - } else if stat.IsDir() { - h.serveDirectory(w, r, collection.Name, fs, openPath, true) - } else { - if !h.userPermittedToUploadOrDownload(r.Method, tokenUser) { - http.Error(w, "Not permitted", http.StatusForbidden) + if r.Method == http.MethodGet || r.Method == http.MethodHead { + targetfnm := "by_id/" + collectionID + "/" + strings.Join(pathParts[stripParts:], "/") + if fi, err := sessionFS.Stat(targetfnm); err == nil && fi.IsDir() { + if !strings.HasSuffix(r.URL.Path, "/") { + h.seeOtherWithCookie(w, r, r.URL.Path+"/", credentialsOK) + } else { + h.serveDirectory(w, r, fi.Name(), sessionFS, targetfnm, true) + } return } - h.logUploadOrDownload(r, sess.arvadosclient, nil, strings.Join(targetPath, "/"), collection, tokenUser) - - http.ServeContent(w, r, basename, stat.ModTime(), f) - if wrote := int64(w.WroteBodyBytes()); wrote != stat.Size() && w.WroteStatus() == http.StatusOK { - // If we wrote fewer bytes than expected, it's - // too late to change the real response code - // or send an error message to the client, but - // at least we can try to put some useful - // debugging info in the logs. 
- n, err := f.Read(make([]byte, 1024)) - ctxlog.FromContext(r.Context()).Errorf("stat.Size()==%d but only wrote %d bytes; read(1024) returns %d, %v", stat.Size(), wrote, n, err) + } + wh.ServeHTTP(w, r) + if r.Method == http.MethodGet && w.WroteStatus() == http.StatusOK { + wrote := int64(w.WroteBodyBytes()) + fnm := strings.Join(pathParts[stripParts:], "/") + fi, err := wh.FileSystem.Stat(r.Context(), fnm) + if err == nil && fi.Size() != wrote { + var n int + f, err := wh.FileSystem.OpenFile(r.Context(), fnm, os.O_RDONLY, 0) + if err == nil { + n, err = f.Read(make([]byte, 1024)) + f.Close() + } + ctxlog.FromContext(r.Context()).Errorf("stat.Size()==%d but only wrote %d bytes; read(1024) returns %d, %v", fi.Size(), wrote, n, err) } } } @@ -601,12 +620,11 @@ func (h *handler) serveSiteFS(w http.ResponseWriter, r *http.Request, tokens []s return } - fs, sess, err := h.Cache.GetSession(tokens[0]) + fs, sess, user, err := h.Cache.GetSession(tokens[0]) if err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return } - fs.ForwardSlashNameSubstitution(h.Cluster.Collections.ForwardSlashNameSubstitution) f, err := fs.Open(r.URL.Path) if os.IsNotExist(err) { http.Error(w, err.Error(), http.StatusNotFound) @@ -625,19 +643,17 @@ func (h *handler) serveSiteFS(w http.ResponseWriter, r *http.Request, tokens []s return } - tokenUser, err := h.Cache.GetTokenUser(tokens[0]) - if !h.userPermittedToUploadOrDownload(r.Method, tokenUser) { + if !h.userPermittedToUploadOrDownload(r.Method, user) { http.Error(w, "Not permitted", http.StatusForbidden) return } - h.logUploadOrDownload(r, sess.arvadosclient, fs, r.URL.Path, nil, tokenUser) + h.logUploadOrDownload(r, sess.arvadosclient, fs, r.URL.Path, nil, user) if r.Method == "GET" { _, basename := filepath.Split(r.URL.Path) applyContentDispositionHdr(w, r, basename, attachment) } wh := webdav.Handler{ - Prefix: "/", FileSystem: &webdavFS{ collfs: fs, writing: writeMethod[r.Method], @@ -960,9 +976,14 @@ func (h *handler) logUploadOrDownload( func (h *handler) determineCollection(fs arvados.CustomFileSystem, path string) (*arvados.Collection, string) { target := strings.TrimSuffix(path, "/") - for { + for cut := len(target); cut >= 0; cut = strings.LastIndexByte(target, '/') { + target = target[:cut] fi, err := fs.Stat(target) - if err != nil { + if os.IsNotExist(err) { + // creating a new file/dir, or download + // destined to fail + continue + } else if err != nil { return nil, "" } switch src := fi.Sys().(type) { @@ -975,11 +996,6 @@ func (h *handler) determineCollection(fs arvados.CustomFileSystem, path string) return nil, "" } } - // Try parent - cut := strings.LastIndexByte(target, '/') - if cut < 0 { - return nil, "" - } - target = target[:cut] } + return nil, "" } diff --git a/services/keep-web/handler_test.go b/services/keep-web/handler_test.go index 768013185a..0f7d507879 100644 --- a/services/keep-web/handler_test.go +++ b/services/keep-web/handler_test.go @@ -366,6 +366,24 @@ func (s *IntegrationSuite) TestVhostPortMatch(c *check.C) { } } +func (s *IntegrationSuite) do(method string, urlstring string, token string, hdr http.Header) (*http.Request, *httptest.ResponseRecorder) { + u := mustParseURL(urlstring) + if hdr == nil && token != "" { + hdr = http.Header{"Authorization": {"Bearer " + token}} + } else if hdr == nil { + hdr = http.Header{} + } else if token != "" { + panic("must not pass both token and hdr") + } + return s.doReq(&http.Request{ + Method: method, + Host: u.Host, + URL: u, + RequestURI: u.RequestURI(), + Header: hdr, 
+ }) +} + func (s *IntegrationSuite) doReq(req *http.Request) (*http.Request, *httptest.ResponseRecorder) { resp := httptest.NewRecorder() s.handler.ServeHTTP(resp, req) @@ -409,6 +427,26 @@ func (s *IntegrationSuite) TestSingleOriginSecretLink(c *check.C) { ) } +func (s *IntegrationSuite) TestCollectionSharingToken(c *check.C) { + s.testVhostRedirectTokenToCookie(c, "GET", + "example.com/c="+arvadostest.FooFileCollectionUUID+"/t="+arvadostest.FooFileCollectionSharingToken+"/foo", + "", + nil, + "", + http.StatusOK, + "foo", + ) + // Same valid sharing token, but requesting a different collection + s.testVhostRedirectTokenToCookie(c, "GET", + "example.com/c="+arvadostest.FooCollection+"/t="+arvadostest.FooFileCollectionSharingToken+"/foo", + "", + nil, + "", + http.StatusNotFound, + notFoundMessage+"\n", + ) +} + // Bad token in URL is 404 Not Found because it doesn't make sense to // retry the same URL with different authorization. func (s *IntegrationSuite) TestSingleOriginSecretLinkBadToken(c *check.C) { @@ -1245,7 +1283,7 @@ func copyHeader(h http.Header) http.Header { } func (s *IntegrationSuite) checkUploadDownloadRequest(c *check.C, req *http.Request, - successCode int, direction string, perm bool, userUuid string, collectionUuid string, filepath string) { + successCode int, direction string, perm bool, userUuid, collectionUuid, collectionPDH, filepath string) { client := arvados.NewClientFromEnv() client.AuthToken = arvadostest.AdminToken @@ -1258,6 +1296,7 @@ func (s *IntegrationSuite) checkUploadDownloadRequest(c *check.C, req *http.Requ c.Check(err, check.IsNil) c.Check(logentries.Items, check.HasLen, 1) lastLogId := logentries.Items[0].ID + c.Logf("lastLogId: %d", lastLogId) var logbuf bytes.Buffer logger := logrus.New() @@ -1274,6 +1313,7 @@ func (s *IntegrationSuite) checkUploadDownloadRequest(c *check.C, req *http.Requ deadline := time.Now().Add(time.Second) for { c.Assert(time.Now().After(deadline), check.Equals, false, check.Commentf("timed out waiting for log entry")) + logentries = arvados.LogList{} err = client.RequestAndDecode(&logentries, "GET", "arvados/v1/logs", nil, arvados.ResourceListParams{ Filters: []arvados.Filter{ @@ -1288,6 +1328,7 @@ func (s *IntegrationSuite) checkUploadDownloadRequest(c *check.C, req *http.Requ logentries.Items[0].ID > lastLogId && logentries.Items[0].ObjectUUID == userUuid && logentries.Items[0].Properties["collection_uuid"] == collectionUuid && + (collectionPDH == "" || logentries.Items[0].Properties["portable_data_hash"] == collectionPDH) && logentries.Items[0].Properties["collection_file_path"] == filepath { break } @@ -1321,7 +1362,7 @@ func (s *IntegrationSuite) TestDownloadLoggingPermission(c *check.C) { }, } s.checkUploadDownloadRequest(c, req, http.StatusOK, "download", adminperm, - arvadostest.AdminUserUUID, arvadostest.FooCollection, "foo") + arvadostest.AdminUserUUID, arvadostest.FooCollection, arvadostest.FooCollectionPDH, "foo") // Test user permission req = &http.Request{ @@ -1334,7 +1375,7 @@ func (s *IntegrationSuite) TestDownloadLoggingPermission(c *check.C) { }, } s.checkUploadDownloadRequest(c, req, http.StatusOK, "download", userperm, - arvadostest.ActiveUserUUID, arvadostest.FooCollection, "foo") + arvadostest.ActiveUserUUID, arvadostest.FooCollection, arvadostest.FooCollectionPDH, "foo") } } @@ -1354,7 +1395,7 @@ func (s *IntegrationSuite) TestDownloadLoggingPermission(c *check.C) { }, } s.checkUploadDownloadRequest(c, req, http.StatusOK, "download", true, - arvadostest.ActiveUserUUID, 
arvadostest.MultilevelCollection1, "dir1/subdir/file1") + arvadostest.ActiveUserUUID, arvadostest.MultilevelCollection1, arvadostest.MultilevelCollection1PDH, "dir1/subdir/file1") } u = mustParseURL("http://" + strings.Replace(arvadostest.FooCollectionPDH, "+", "-", 1) + ".keep-web.example/foo") @@ -1368,7 +1409,7 @@ func (s *IntegrationSuite) TestDownloadLoggingPermission(c *check.C) { }, } s.checkUploadDownloadRequest(c, req, http.StatusOK, "download", true, - arvadostest.ActiveUserUUID, arvadostest.FooCollection, "foo") + arvadostest.ActiveUserUUID, "", arvadostest.FooCollectionPDH, "foo") } func (s *IntegrationSuite) TestUploadLoggingPermission(c *check.C) { @@ -1408,7 +1449,7 @@ func (s *IntegrationSuite) TestUploadLoggingPermission(c *check.C) { Body: io.NopCloser(bytes.NewReader([]byte("bar"))), } s.checkUploadDownloadRequest(c, req, http.StatusCreated, "upload", adminperm, - arvadostest.AdminUserUUID, coll.UUID, "bar") + arvadostest.AdminUserUUID, coll.UUID, "", "bar") // Test user permission req = &http.Request{ @@ -1422,7 +1463,7 @@ func (s *IntegrationSuite) TestUploadLoggingPermission(c *check.C) { Body: io.NopCloser(bytes.NewReader([]byte("bar"))), } s.checkUploadDownloadRequest(c, req, http.StatusCreated, "upload", userperm, - arvadostest.ActiveUserUUID, coll.UUID, "bar") + arvadostest.ActiveUserUUID, coll.UUID, "", "bar") } } } diff --git a/services/keep-web/s3.go b/services/keep-web/s3.go index 1f458f8e59..381a1110b1 100644 --- a/services/keep-web/s3.go +++ b/services/keep-web/s3.go @@ -27,9 +27,7 @@ import ( "time" "git.arvados.org/arvados.git/sdk/go/arvados" - "git.arvados.org/arvados.git/sdk/go/arvadosclient" "git.arvados.org/arvados.git/sdk/go/ctxlog" - "git.arvados.org/arvados.git/sdk/go/keepclient" "github.com/AdRoll/goamz/s3" ) @@ -312,33 +310,17 @@ func (h *handler) serveS3(w http.ResponseWriter, r *http.Request) bool { return false } - var err error - var fs arvados.CustomFileSystem - var arvclient *arvadosclient.ArvadosClient + fs, sess, tokenUser, err := h.Cache.GetSession(token) + if err != nil { + s3ErrorResponse(w, InternalError, err.Error(), r.URL.Path, http.StatusInternalServerError) + return true + } if r.Method == http.MethodGet || r.Method == http.MethodHead { - // Use a single session (cached FileSystem) across - // multiple read requests. - var sess *cachedSession - fs, sess, err = h.Cache.GetSession(token) - if err != nil { - s3ErrorResponse(w, InternalError, err.Error(), r.URL.Path, http.StatusInternalServerError) - return true - } - arvclient = sess.arvadosclient - } else { // Create a FileSystem for this request, to avoid // exposing incomplete write operations to concurrent // requests. 
- var kc *keepclient.KeepClient - var release func() - var client *arvados.Client - arvclient, kc, client, release, err = h.getClients(r.Header.Get("X-Request-Id"), token) - if err != nil { - s3ErrorResponse(w, InternalError, err.Error(), r.URL.Path, http.StatusInternalServerError) - return true - } - defer release() - fs = client.SiteFileSystem(kc) + client := sess.client.WithRequestID(r.Header.Get("X-Request-Id")) + fs = client.SiteFileSystem(sess.keepclient) fs.ForwardSlashNameSubstitution(h.Cluster.Collections.ForwardSlashNameSubstitution) } @@ -418,12 +400,11 @@ func (h *handler) serveS3(w http.ResponseWriter, r *http.Request) bool { return true } - tokenUser, err := h.Cache.GetTokenUser(token) if !h.userPermittedToUploadOrDownload(r.Method, tokenUser) { http.Error(w, "Not permitted", http.StatusForbidden) return true } - h.logUploadOrDownload(r, arvclient, fs, fspath, nil, tokenUser) + h.logUploadOrDownload(r, sess.arvadosclient, fs, fspath, nil, tokenUser) // shallow copy r, and change URL path r := *r @@ -514,12 +495,11 @@ func (h *handler) serveS3(w http.ResponseWriter, r *http.Request) bool { } defer f.Close() - tokenUser, err := h.Cache.GetTokenUser(token) if !h.userPermittedToUploadOrDownload(r.Method, tokenUser) { http.Error(w, "Not permitted", http.StatusForbidden) return true } - h.logUploadOrDownload(r, arvclient, fs, fspath, nil, tokenUser) + h.logUploadOrDownload(r, sess.arvadosclient, fs, fspath, nil, tokenUser) _, err = io.Copy(f, r.Body) if err != nil { diff --git a/services/keep-web/server_test.go b/services/keep-web/server_test.go index 61c540808b..1305416c40 100644 --- a/services/keep-web/server_test.go +++ b/services/keep-web/server_test.go @@ -29,6 +29,7 @@ import ( "git.arvados.org/arvados.git/sdk/go/ctxlog" "git.arvados.org/arvados.git/sdk/go/httpserver" "git.arvados.org/arvados.git/sdk/go/keepclient" + "github.com/prometheus/client_golang/prometheus" check "gopkg.in/check.v1" ) @@ -49,17 +50,17 @@ func (s *IntegrationSuite) TestNoToken(c *check.C) { } { hdr, body, _ := s.runCurl(c, token, "collections.example.com", "/collections/"+arvadostest.FooCollection+"/foo") c.Check(hdr, check.Matches, `(?s)HTTP/1.1 404 Not Found\r\n.*`) - c.Check(body, check.Equals, notFoundMessage+"\n") + c.Check(strings.TrimSpace(body), check.Equals, notFoundMessage) if token != "" { hdr, body, _ = s.runCurl(c, token, "collections.example.com", "/collections/download/"+arvadostest.FooCollection+"/"+token+"/foo") c.Check(hdr, check.Matches, `(?s)HTTP/1.1 404 Not Found\r\n.*`) - c.Check(body, check.Equals, notFoundMessage+"\n") + c.Check(strings.TrimSpace(body), check.Equals, notFoundMessage) } hdr, body, _ = s.runCurl(c, token, "collections.example.com", "/bad-route") c.Check(hdr, check.Matches, `(?s)HTTP/1.1 404 Not Found\r\n.*`) - c.Check(body, check.Equals, notFoundMessage+"\n") + c.Check(strings.TrimSpace(body), check.Equals, notFoundMessage) } } @@ -92,7 +93,7 @@ func (s *IntegrationSuite) Test404(c *check.C) { hdr, body, _ := s.runCurl(c, arvadostest.ActiveToken, "collections.example.com", uri) c.Check(hdr, check.Matches, "(?s)HTTP/1.1 404 Not Found\r\n.*") if len(body) > 0 { - c.Check(body, check.Equals, notFoundMessage+"\n") + c.Check(strings.TrimSpace(body), check.Equals, notFoundMessage) } } } @@ -475,15 +476,7 @@ func (s *IntegrationSuite) TestMetrics(c *check.C) { c.Check(summaries["request_duration_seconds/get/200"].SampleCount, check.Equals, "3") c.Check(summaries["request_duration_seconds/get/404"].SampleCount, check.Equals, "1") 
c.Check(summaries["time_to_status_seconds/get/404"].SampleCount, check.Equals, "1") - c.Check(counters["arvados_keepweb_collectioncache_requests//"].Value, check.Equals, int64(2)) - c.Check(counters["arvados_keepweb_collectioncache_api_calls//"].Value, check.Equals, int64(2)) - c.Check(counters["arvados_keepweb_collectioncache_hits//"].Value, check.Equals, int64(1)) - c.Check(counters["arvados_keepweb_collectioncache_pdh_hits//"].Value, check.Equals, int64(1)) - c.Check(gauges["arvados_keepweb_collectioncache_cached_manifests//"].Value, check.Equals, float64(1)) - // FooCollection's cached manifest size is 45 ("1f4b0....+45") - // plus one 51-byte blob signature; session fs counts 3 inodes - // * 64 bytes. - c.Check(gauges["arvados_keepweb_sessions_cached_collection_bytes//"].Value, check.Equals, float64(45+51+64*3)) + c.Check(gauges["arvados_keepweb_sessions_cached_session_bytes//"].Value, check.Equals, float64(445)) // If the Host header indicates a collection, /metrics.json // refers to a file in the collection -- the metrics handler @@ -529,7 +522,7 @@ func (s *IntegrationSuite) SetUpTest(c *check.C) { ctx := ctxlog.Context(context.Background(), logger) - s.handler = newHandlerOrErrorHandler(ctx, cluster, cluster.SystemRootToken, nil).(*handler) + s.handler = newHandlerOrErrorHandler(ctx, cluster, cluster.SystemRootToken, prometheus.NewRegistry()).(*handler) s.testServer = httptest.NewUnstartedServer( httpserver.AddRequestIDs( httpserver.LogRequests( diff --git a/services/keep-web/webdav.go b/services/keep-web/webdav.go index 501c355a73..0039f04eef 100644 --- a/services/keep-web/webdav.go +++ b/services/keep-web/webdav.go @@ -36,7 +36,10 @@ var ( // existence automatically so sequences like "mkcol foo; put foo/bar" // work as expected. type webdavFS struct { - collfs arvados.FileSystem + collfs arvados.FileSystem + // prefix works like fs.Sub: Stat(name) calls + // Stat(prefix+name) in the wrapped filesystem. 
+ prefix string writing bool // webdav PROPFIND reads the first few bytes of each file // whose filename extension isn't recognized, which is @@ -56,7 +59,7 @@ func (fs *webdavFS) makeparents(name string) { } dir = dir[:len(dir)-1] fs.makeparents(dir) - fs.collfs.Mkdir(dir, 0755) + fs.collfs.Mkdir(fs.prefix+dir, 0755) } func (fs *webdavFS) Mkdir(ctx context.Context, name string, perm os.FileMode) error { @@ -65,7 +68,7 @@ func (fs *webdavFS) Mkdir(ctx context.Context, name string, perm os.FileMode) er } name = strings.TrimRight(name, "/") fs.makeparents(name) - return fs.collfs.Mkdir(name, 0755) + return fs.collfs.Mkdir(fs.prefix+name, 0755) } func (fs *webdavFS) OpenFile(ctx context.Context, name string, flag int, perm os.FileMode) (f webdav.File, err error) { @@ -73,7 +76,7 @@ func (fs *webdavFS) OpenFile(ctx context.Context, name string, flag int, perm os if writing { fs.makeparents(name) } - f, err = fs.collfs.OpenFile(name, flag, perm) + f, err = fs.collfs.OpenFile(fs.prefix+name, flag, perm) if !fs.writing { // webdav module returns 404 on all OpenFile errors, // but returns 405 Method Not Allowed if OpenFile() @@ -93,7 +96,7 @@ func (fs *webdavFS) OpenFile(ctx context.Context, name string, flag int, perm os } func (fs *webdavFS) RemoveAll(ctx context.Context, name string) error { - return fs.collfs.RemoveAll(name) + return fs.collfs.RemoveAll(fs.prefix + name) } func (fs *webdavFS) Rename(ctx context.Context, oldName, newName string) error { @@ -106,14 +109,14 @@ func (fs *webdavFS) Rename(ctx context.Context, oldName, newName string) error { newName = strings.TrimSuffix(newName, "/") } fs.makeparents(newName) - return fs.collfs.Rename(oldName, newName) + return fs.collfs.Rename(fs.prefix+oldName, fs.prefix+newName) } func (fs *webdavFS) Stat(ctx context.Context, name string) (os.FileInfo, error) { if fs.writing { fs.makeparents(name) } - return fs.collfs.Stat(name) + return fs.collfs.Stat(fs.prefix + name) } type writeFailer struct { diff --git a/services/keepproxy/keepproxy_test.go b/services/keepproxy/keepproxy_test.go index 8242f5b2b5..38231307bf 100644 --- a/services/keepproxy/keepproxy_test.go +++ b/services/keepproxy/keepproxy_test.go @@ -696,7 +696,7 @@ func (s *ServerRequiredSuite) TestCollectionSharingToken(c *C) { defer srv.Close() hash, _, err := kc.PutB([]byte("shareddata")) c.Check(err, IsNil) - kc.Arvados.ApiToken = arvadostest.FooCollectionSharingToken + kc.Arvados.ApiToken = arvadostest.FooFileCollectionSharingToken rdr, _, _, err := kc.Get(hash) c.Assert(err, IsNil) data, err := ioutil.ReadAll(rdr) -- 2.30.2
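
Note on the webdavFS change above: the new `prefix` field follows the same pattern as `fs.Sub` in Go's standard `io/fs` package -- every name the webdav layer passes in is resolved as `prefix+name` in the wrapped filesystem, presumably so one cached sitefs session can serve requests addressed to many collections. The following is a minimal sketch of that pattern using only the standard library; it is not part of this patch, and the UUID-style directory and file names are illustrative, not Arvados fixtures.

// Sketch only -- not from this patch. Illustrates the "prefix" idea used by
// webdavFS: resolve every name under a fixed directory of the wrapped
// filesystem, the same way fs.Sub does in the standard library.
package main

import (
	"fmt"
	"io/fs"
	"testing/fstest"
)

func main() {
	// In-memory stand-in for a site filesystem; the UUID-style directory
	// name is illustrative, not a real Arvados fixture.
	sitefs := fstest.MapFS{
		"by_id/zzzzz-4zz18-xxxxxxxxxxxxxxx/dir1/file1": &fstest.MapFile{Data: []byte("hello\n")},
	}

	// fs.Sub returns a view rooted at the prefix: Stat("dir1/file1") on the
	// view becomes Stat(prefix+"/dir1/file1") on sitefs, analogous to the
	// way the new webdavFS.prefix field is prepended on each call.
	collview, err := fs.Sub(sitefs, "by_id/zzzzz-4zz18-xxxxxxxxxxxxxxx")
	if err != nil {
		panic(err)
	}

	fi, err := fs.Stat(collview, "dir1/file1")
	if err != nil {
		panic(err)
	}
	fmt.Printf("%s: %d bytes\n", fi.Name(), fi.Size()) // file1: 6 bytes
}

In keep-web the wrapped object is an arvados.FileSystem rather than an io/fs.FS, but the path-prefixing contract is the same, which keeps the webdav handlers free of per-call path joins.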