Merge branch '16535-s3'
author Tom Clegg <tom@tomclegg.ca>
Tue, 18 Aug 2020 20:47:15 +0000 (16:47 -0400)
committer Tom Clegg <tom@tomclegg.ca>
Tue, 18 Aug 2020 20:47:15 +0000 (16:47 -0400)
refs #16535

Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom@tomclegg.ca>

20 files changed:
go.sum
lib/config/config.default.yml
lib/config/export.go
lib/config/generated_config.go
sdk/go/arvados/config.go
sdk/go/arvados/fs_base.go
sdk/go/arvados/fs_collection.go
sdk/go/arvados/fs_collection_test.go
sdk/go/arvados/fs_deferred.go
sdk/go/arvados/fs_lookup.go
sdk/go/arvados/fs_project.go
sdk/go/arvados/fs_project_test.go
sdk/go/arvados/fs_site.go
sdk/go/arvados/fs_site_test.go
services/keep-web/handler.go
services/keep-web/main.go
services/keep-web/s3.go [new file with mode: 0644]
services/keep-web/s3_test.go [new file with mode: 0644]
services/keep-web/server.go
services/keep-web/server_test.go

diff --git a/go.sum b/go.sum
index 2565964e7d45121d76e59e0c7b5e21743beaaa28..ac5c03fc83726ca016a177a31087829eae395749 100644 (file)
--- a/go.sum
+++ b/go.sum
@@ -78,6 +78,7 @@ github.com/go-ldap/ldap v3.0.3+incompatible h1:HTeSZO8hWMS1Rgb2Ziku6b8a7qRIZZMHj
 github.com/go-ldap/ldap v3.0.3+incompatible/go.mod h1:qfd9rJvER9Q0/D/Sqn1DfHRoBp40uXYvFoEVrNEPqRc=
 github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9GBnD5lWE=
 github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V4qmtdjCk=
+github.com/go-sql-driver/mysql v1.4.0/go.mod h1:zAC/RDZ24gD3HViQzih4MyKcchzm+sOG5ZlKdlhCg5w=
 github.com/go-sql-driver/mysql v1.5.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg=
 github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY=
 github.com/gogo/protobuf v1.1.1 h1:72R+M5VuhED/KujmZVcIquuo8mBgX4oVda//DQb3PXo=
@@ -226,6 +227,7 @@ golang.org/x/net v0.0.0-20190613194153-d28f0bde5980 h1:dfGZHvZk057jK2MCeWus/TowK
 golang.org/x/net v0.0.0-20190613194153-d28f0bde5980/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
 golang.org/x/net v0.0.0-20190620200207-3b0461eec859 h1:R/3boaszxrf1GEUWTVDzSKVwLmSJpwZ1yqXm8j0v2QI=
 golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
+golang.org/x/net v0.0.0-20200202094626-16171245cfb2 h1:CCH4IOTTfewWjGOlSp+zGcjutRKlBEZQ6wTn8ozI/nI=
 golang.org/x/net v0.0.0-20200202094626-16171245cfb2/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
 golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
 golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
index 270d4045b5aaa395ee9b5c749763a601f35e1b6b..cc871087c31f0f13405430683553c7256e77e819 100644 (file)
@@ -483,6 +483,9 @@ Clusters:
       # Use of this feature is not recommended, if it can be avoided.
       ForwardSlashNameSubstitution: ""
 
+      # Include "folder objects" in S3 ListObjects responses.
+      S3FolderObjects: true
+
       # Managed collection properties. At creation time, if the client didn't
       # provide the listed keys, they will be automatically populated following
       # one of the following behaviors:
index d6b02b750de122582e35a5aa34b508861106ac40..f15a2996197804c24face6be71fc4f97afb558f4 100644 (file)
@@ -59,10 +59,10 @@ func ExportJSON(w io.Writer, cluster *arvados.Cluster) error {
 // exists.
 var whitelist = map[string]bool{
        // | sort -t'"' -k2,2
-       "ClusterID":                                    true,
        "API":                                          true,
        "API.AsyncPermissionsUpdateInterval":           false,
        "API.DisabledAPIs":                             false,
+       "API.KeepServiceRequestTimeout":                false,
        "API.MaxConcurrentRequests":                    false,
        "API.MaxIndexDatabaseRead":                     false,
        "API.MaxItemsPerResponse":                      true,
@@ -71,24 +71,29 @@ var whitelist = map[string]bool{
        "API.MaxRequestSize":                           true,
        "API.RailsSessionSecretToken":                  false,
        "API.RequestTimeout":                           true,
-       "API.WebsocketClientEventQueue":                false,
        "API.SendTimeout":                              true,
+       "API.WebsocketClientEventQueue":                false,
        "API.WebsocketServerEventQueue":                false,
-       "API.KeepServiceRequestTimeout":                false,
        "AuditLogs":                                    false,
        "AuditLogs.MaxAge":                             false,
        "AuditLogs.MaxDeleteBatch":                     false,
        "AuditLogs.UnloggedAttributes":                 false,
+       "ClusterID":                                    true,
        "Collections":                                  true,
+       "Collections.BalanceCollectionBatch":           false,
+       "Collections.BalanceCollectionBuffers":         false,
+       "Collections.BalancePeriod":                    false,
+       "Collections.BalanceTimeout":                   false,
+       "Collections.BlobDeleteConcurrency":            false,
+       "Collections.BlobMissingReport":                false,
+       "Collections.BlobReplicateConcurrency":         false,
        "Collections.BlobSigning":                      true,
        "Collections.BlobSigningKey":                   false,
        "Collections.BlobSigningTTL":                   true,
        "Collections.BlobTrash":                        false,
-       "Collections.BlobTrashLifetime":                false,
-       "Collections.BlobTrashConcurrency":             false,
        "Collections.BlobTrashCheckInterval":           false,
-       "Collections.BlobDeleteConcurrency":            false,
-       "Collections.BlobReplicateConcurrency":         false,
+       "Collections.BlobTrashConcurrency":             false,
+       "Collections.BlobTrashLifetime":                false,
        "Collections.CollectionVersioning":             false,
        "Collections.DefaultReplication":               true,
        "Collections.DefaultTrashLifetime":             true,
@@ -97,18 +102,14 @@ var whitelist = map[string]bool{
        "Collections.ManagedProperties.*":              true,
        "Collections.ManagedProperties.*.*":            true,
        "Collections.PreserveVersionIfIdle":            true,
+       "Collections.S3FolderObjects":                  true,
        "Collections.TrashSweepInterval":               false,
        "Collections.TrustAllContent":                  false,
        "Collections.WebDAVCache":                      false,
-       "Collections.BalanceCollectionBatch":           false,
-       "Collections.BalancePeriod":                    false,
-       "Collections.BalanceTimeout":                   false,
-       "Collections.BlobMissingReport":                false,
-       "Collections.BalanceCollectionBuffers":         false,
        "Containers":                                   true,
        "Containers.CloudVMs":                          false,
-       "Containers.CrunchRunCommand":                  false,
        "Containers.CrunchRunArgumentsList":            false,
+       "Containers.CrunchRunCommand":                  false,
        "Containers.DefaultKeepCacheRAM":               true,
        "Containers.DispatchPrivateKey":                false,
        "Containers.JobsAPI":                           true,
@@ -155,28 +156,28 @@ var whitelist = map[string]bool{
        "Login.OpenIDConnect":                          true,
        "Login.OpenIDConnect.ClientID":                 false,
        "Login.OpenIDConnect.ClientSecret":             false,
-       "Login.OpenIDConnect.Enable":                   true,
-       "Login.OpenIDConnect.Issuer":                   false,
        "Login.OpenIDConnect.EmailClaim":               false,
        "Login.OpenIDConnect.EmailVerifiedClaim":       false,
+       "Login.OpenIDConnect.Enable":                   true,
+       "Login.OpenIDConnect.Issuer":                   false,
        "Login.OpenIDConnect.UsernameClaim":            false,
        "Login.PAM":                                    true,
        "Login.PAM.DefaultEmailDomain":                 false,
        "Login.PAM.Enable":                             true,
        "Login.PAM.Service":                            false,
+       "Login.RemoteTokenRefresh":                     true,
        "Login.SSO":                                    true,
        "Login.SSO.Enable":                             true,
        "Login.SSO.ProviderAppID":                      false,
        "Login.SSO.ProviderAppSecret":                  false,
-       "Login.RemoteTokenRefresh":                     true,
        "Mail":                                         true,
+       "Mail.EmailFrom":                               false,
+       "Mail.IssueReporterEmailFrom":                  false,
+       "Mail.IssueReporterEmailTo":                    false,
        "Mail.MailchimpAPIKey":                         false,
        "Mail.MailchimpListID":                         false,
        "Mail.SendUserSetupNotificationEmail":          false,
-       "Mail.IssueReporterEmailFrom":                  false,
-       "Mail.IssueReporterEmailTo":                    false,
        "Mail.SupportEmailAddress":                     true,
-       "Mail.EmailFrom":                               false,
        "ManagementToken":                              false,
        "PostgreSQL":                                   false,
        "RemoteClusters":                               true,
@@ -194,8 +195,8 @@ var whitelist = map[string]bool{
        "SystemRootToken":                              false,
        "TLS":                                          false,
        "Users":                                        true,
-       "Users.AnonymousUserToken":                     true,
        "Users.AdminNotifierEmailFrom":                 false,
+       "Users.AnonymousUserToken":                     true,
        "Users.AutoAdminFirstUser":                     false,
        "Users.AutoAdminUserWithEmail":                 false,
        "Users.AutoSetupNewUsers":                      false,
@@ -232,6 +233,7 @@ var whitelist = map[string]bool{
        "Workbench.EnableGettingStartedPopup":          true,
        "Workbench.EnablePublicProjectsPage":           true,
        "Workbench.FileViewersConfigURL":               true,
+       "Workbench.InactivePageHTML":                   true,
        "Workbench.LogViewerMaxBytes":                  true,
        "Workbench.MultiSiteSearch":                    true,
        "Workbench.ProfilingEnabled":                   true,
@@ -243,6 +245,8 @@ var whitelist = map[string]bool{
        "Workbench.ShowUserAgreementInline":            true,
        "Workbench.ShowUserNotifications":              true,
        "Workbench.SiteName":                           true,
+       "Workbench.SSHHelpHostSuffix":                  true,
+       "Workbench.SSHHelpPageHTML":                    true,
        "Workbench.Theme":                              true,
        "Workbench.UserProfileFormFields":              true,
        "Workbench.UserProfileFormFields.*":            true,
@@ -251,9 +255,6 @@ var whitelist = map[string]bool{
        "Workbench.UserProfileFormMessage":             true,
        "Workbench.VocabularyURL":                      true,
        "Workbench.WelcomePageHTML":                    true,
-       "Workbench.InactivePageHTML":                   true,
-       "Workbench.SSHHelpPageHTML":                    true,
-       "Workbench.SSHHelpHostSuffix":                  true,
 }
 
 func redactUnsafe(m map[string]interface{}, mPrefix, lookupPrefix string) error {
index 0241673aa550e5c783a04d875e92bd588eab091f..0374ff7c7b14dd9188b58ae149e98f8fdafb4d94 100644 (file)
@@ -489,6 +489,9 @@ Clusters:
       # Use of this feature is not recommended, if it can be avoided.
       ForwardSlashNameSubstitution: ""
 
+      # Include "folder objects" in S3 ListObjects responses.
+      S3FolderObjects: true
+
       # Managed collection properties. At creation time, if the client didn't
       # provide the listed keys, they will be automatically populated following
       # one of the following behaviors:
index c21addbba99284e5ad3e634e24e75e5deac9558e..41c20c8db2ee71cf4c4a024e7d1d73b72878a098 100644 (file)
@@ -121,6 +121,7 @@ type Cluster struct {
                TrashSweepInterval           Duration
                TrustAllContent              bool
                ForwardSlashNameSubstitution string
+               S3FolderObjects              bool
 
                BlobMissingReport        string
                BalancePeriod            Duration
index d06aba3695adc37f3d74057de2568778bcd9f9c9..5e57fed3beab3281b1d498936edb4eed813398ec 100644 (file)
@@ -31,6 +31,10 @@ var (
        ErrPermission        = os.ErrPermission
 )
 
+type syncer interface {
+       Sync() error
+}
+
 // A File is an *os.File-like interface for reading and writing files
 // in a FileSystem.
 type File interface {
@@ -299,6 +303,22 @@ func (n *treenode) Readdir() (fi []os.FileInfo, err error) {
        return
 }
 
+func (n *treenode) Sync() error {
+       n.RLock()
+       defer n.RUnlock()
+       for _, inode := range n.inodes {
+               syncer, ok := inode.(syncer)
+               if !ok {
+                       return ErrInvalidOperation
+               }
+               err := syncer.Sync()
+               if err != nil {
+                       return err
+               }
+       }
+       return nil
+}
+
 type fileSystem struct {
        root inode
        fsBackend
@@ -576,8 +596,11 @@ func (fs *fileSystem) remove(name string, recursive bool) error {
 }
 
 func (fs *fileSystem) Sync() error {
-       log.Printf("TODO: sync fileSystem")
-       return ErrInvalidOperation
+       if syncer, ok := fs.root.(syncer); ok {
+               return syncer.Sync()
+       } else {
+               return ErrInvalidOperation
+       }
 }
 
 func (fs *fileSystem) Flush(string, bool) error {
index 37bd494914df507dc9fc193576dc9e0afcc98ea9..0edc48162b1a031b8487ac9b38997c0a4b6f3f4d 100644 (file)
@@ -121,6 +121,62 @@ func (fs *collectionFileSystem) newNode(name string, perm os.FileMode, modTime t
        }
 }
 
+func (fs *collectionFileSystem) Child(name string, replace func(inode) (inode, error)) (inode, error) {
+       return fs.rootnode().Child(name, replace)
+}
+
+func (fs *collectionFileSystem) FS() FileSystem {
+       return fs
+}
+
+func (fs *collectionFileSystem) FileInfo() os.FileInfo {
+       return fs.rootnode().FileInfo()
+}
+
+func (fs *collectionFileSystem) IsDir() bool {
+       return true
+}
+
+func (fs *collectionFileSystem) Lock() {
+       fs.rootnode().Lock()
+}
+
+func (fs *collectionFileSystem) Unlock() {
+       fs.rootnode().Unlock()
+}
+
+func (fs *collectionFileSystem) RLock() {
+       fs.rootnode().RLock()
+}
+
+func (fs *collectionFileSystem) RUnlock() {
+       fs.rootnode().RUnlock()
+}
+
+func (fs *collectionFileSystem) Parent() inode {
+       return fs.rootnode().Parent()
+}
+
+func (fs *collectionFileSystem) Read(_ []byte, ptr filenodePtr) (int, filenodePtr, error) {
+       return 0, ptr, ErrInvalidOperation
+}
+
+func (fs *collectionFileSystem) Write(_ []byte, ptr filenodePtr) (int, filenodePtr, error) {
+       return 0, ptr, ErrInvalidOperation
+}
+
+func (fs *collectionFileSystem) Readdir() ([]os.FileInfo, error) {
+       return fs.rootnode().Readdir()
+}
+
+func (fs *collectionFileSystem) SetParent(parent inode, name string) {
+       fs.rootnode().SetParent(parent, name)
+}
+
+func (fs *collectionFileSystem) Truncate(int64) error {
+       return ErrInvalidOperation
+}
+
 func (fs *collectionFileSystem) Sync() error {
        if fs.uuid == "" {
                return nil
index f01369a885ece3b7c315c832b114caaf77715862..59a6a6ba825e57928e9348c17d971988fa24fc94 100644 (file)
@@ -7,7 +7,6 @@ package arvados
 import (
        "bytes"
        "crypto/md5"
-       "crypto/sha1"
        "errors"
        "fmt"
        "io"
@@ -33,6 +32,9 @@ type keepClientStub struct {
        blocks      map[string][]byte
        refreshable map[string]bool
        onPut       func(bufcopy []byte) // called from PutB, before acquiring lock
+       authToken   string               // client's auth token (used for signing locators)
+       sigkey      string               // blob signing key
+       sigttl      time.Duration        // blob signing ttl
        sync.RWMutex
 }
 
@@ -49,7 +51,7 @@ func (kcs *keepClientStub) ReadAt(locator string, p []byte, off int) (int, error
 }
 
 func (kcs *keepClientStub) PutB(p []byte) (string, int, error) {
-       locator := fmt.Sprintf("%x+%d+A12345@abcde", md5.Sum(p), len(p))
+       locator := SignLocator(fmt.Sprintf("%x+%d", md5.Sum(p), len(p)), kcs.authToken, time.Now().Add(kcs.sigttl), kcs.sigttl, []byte(kcs.sigkey))
        buf := make([]byte, len(p))
        copy(buf, p)
        if kcs.onPut != nil {
@@ -61,9 +63,12 @@ func (kcs *keepClientStub) PutB(p []byte) (string, int, error) {
        return locator, 1, nil
 }
 
-var localOrRemoteSignature = regexp.MustCompile(`\+[AR][^+]*`)
+var reRemoteSignature = regexp.MustCompile(`\+[AR][^+]*`)
 
 func (kcs *keepClientStub) LocalLocator(locator string) (string, error) {
+       if strings.Contains(locator, "+A") {
+               return locator, nil
+       }
        kcs.Lock()
        defer kcs.Unlock()
        if strings.Contains(locator, "+R") {
@@ -74,8 +79,9 @@ func (kcs *keepClientStub) LocalLocator(locator string) (string, error) {
                        return "", fmt.Errorf("kcs.refreshable[%q]==false", locator)
                }
        }
-       fakeSig := fmt.Sprintf("+A%x@%x", sha1.Sum(nil), time.Now().Add(time.Hour*24*14).Unix())
-       return localOrRemoteSignature.ReplaceAllLiteralString(locator, fakeSig), nil
+       locator = reRemoteSignature.ReplaceAllLiteralString(locator, "")
+       locator = SignLocator(locator, kcs.authToken, time.Now().Add(kcs.sigttl), kcs.sigttl, []byte(kcs.sigkey))
+       return locator, nil
 }
 
 type CollectionFSSuite struct {
@@ -92,7 +98,11 @@ func (s *CollectionFSSuite) SetUpTest(c *check.C) {
        s.kc = &keepClientStub{
                blocks: map[string][]byte{
                        "3858f62230ac3c915f300c664312c63f": []byte("foobar"),
-               }}
+               },
+               sigkey:    fixtureBlobSigningKey,
+               sigttl:    fixtureBlobSigningTTL,
+               authToken: fixtureActiveToken,
+       }
        s.fs, err = s.coll.FileSystem(s.client, s.kc)
        c.Assert(err, check.IsNil)
 }
index 439eaec7c2a5dbde49f2fd2851551238a22166ec..254b90c812a337de96cb34da01b767dbe7adcc5a 100644 (file)
@@ -32,14 +32,14 @@ func deferredCollectionFS(fs FileSystem, parent inode, coll Collection) inode {
                        log.Printf("BUG: unhandled error: %s", err)
                        return placeholder
                }
-               cfs, err := coll.FileSystem(fs, fs)
+               newfs, err := coll.FileSystem(fs, fs)
                if err != nil {
                        log.Printf("BUG: unhandled error: %s", err)
                        return placeholder
                }
-               root := cfs.rootnode()
-               root.SetParent(parent, coll.Name)
-               return root
+               cfs := newfs.(*collectionFileSystem)
+               cfs.SetParent(parent, coll.Name)
+               return cfs
        }}
 }
 
@@ -87,6 +87,19 @@ func (dn *deferrednode) Child(name string, replace func(inode) (inode, error)) (
        return dn.realinode().Child(name, replace)
 }
 
+// Sync is a no-op if the real inode hasn't even been created yet.
+func (dn *deferrednode) Sync() error {
+       dn.mtx.Lock()
+       defer dn.mtx.Unlock()
+       if !dn.created {
+               return nil
+       } else if syncer, ok := dn.wrapped.(syncer); ok {
+               return syncer.Sync()
+       } else {
+               return ErrInvalidOperation
+       }
+}
+
 func (dn *deferrednode) Truncate(size int64) error       { return dn.realinode().Truncate(size) }
 func (dn *deferrednode) SetParent(p inode, name string)  { dn.realinode().SetParent(p, name) }
 func (dn *deferrednode) IsDir() bool                     { return dn.currentinode().IsDir() }
index 42322a14a9adda155c75f225ef43e2cdfd96615c..56b5953234784424e51676a90b4c148661cb8c4d 100644 (file)
@@ -15,7 +15,7 @@ import (
 //
 // See (*customFileSystem)MountUsers for example usage.
 type lookupnode struct {
-       inode
+       treenode
        loadOne func(parent inode, name string) (inode, error)
        loadAll func(parent inode) ([]inode, error)
        stale   func(time.Time) bool
@@ -26,6 +26,20 @@ type lookupnode struct {
        staleOne  map[string]time.Time
 }
 
+// Sync flushes pending writes for loaded children and, if successful,
+// triggers a reload on next lookup.
+func (ln *lookupnode) Sync() error {
+       err := ln.treenode.Sync()
+       if err != nil {
+               return err
+       }
+       ln.staleLock.Lock()
+       ln.staleAll = time.Time{}
+       ln.staleOne = nil
+       ln.staleLock.Unlock()
+       return nil
+}
+
 func (ln *lookupnode) Readdir() ([]os.FileInfo, error) {
        ln.staleLock.Lock()
        defer ln.staleLock.Unlock()
@@ -36,7 +50,7 @@ func (ln *lookupnode) Readdir() ([]os.FileInfo, error) {
                        return nil, err
                }
                for _, child := range all {
-                       _, err = ln.inode.Child(child.FileInfo().Name(), func(inode) (inode, error) {
+                       _, err = ln.treenode.Child(child.FileInfo().Name(), func(inode) (inode, error) {
                                return child, nil
                        })
                        if err != nil {
@@ -49,25 +63,47 @@ func (ln *lookupnode) Readdir() ([]os.FileInfo, error) {
                // newer than ln.staleAll. Reclaim memory.
                ln.staleOne = nil
        }
-       return ln.inode.Readdir()
+       return ln.treenode.Readdir()
 }
 
+// Child rejects (with ErrInvalidArgument) calls to add/replace
+// children, instead calling loadOne when a non-existing child is
+// looked up.
 func (ln *lookupnode) Child(name string, replace func(inode) (inode, error)) (inode, error) {
        ln.staleLock.Lock()
        defer ln.staleLock.Unlock()
        checkTime := time.Now()
+       var existing inode
+       var err error
        if ln.stale(ln.staleAll) && ln.stale(ln.staleOne[name]) {
-               _, err := ln.inode.Child(name, func(inode) (inode, error) {
+               existing, err = ln.treenode.Child(name, func(inode) (inode, error) {
                        return ln.loadOne(ln, name)
                })
-               if err != nil {
-                       return nil, err
+               if err == nil && existing != nil {
+                       if ln.staleOne == nil {
+                               ln.staleOne = map[string]time.Time{name: checkTime}
+                       } else {
+                               ln.staleOne[name] = checkTime
+                       }
                }
-               if ln.staleOne == nil {
-                       ln.staleOne = map[string]time.Time{name: checkTime}
-               } else {
-                       ln.staleOne[name] = checkTime
+       } else {
+               existing, err = ln.treenode.Child(name, nil)
+               if err != nil && !os.IsNotExist(err) {
+                       return existing, err
+               }
+       }
+       if replace != nil {
+               // Let the callback try to delete or replace the
+               // existing node; if it does, return
+               // ErrInvalidArgument.
+               if tryRepl, err := replace(existing); err != nil {
+                       // Propagate error from callback
+                       return existing, err
+               } else if tryRepl != existing {
+                       return existing, ErrInvalidArgument
                }
        }
-       return ln.inode.Child(name, replace)
+       // Return original error from ln.treenode.Child() (it might be
+       // ErrNotExist).
+       return existing, err
 }
index c5eb03360a3877b577168611a8e579329b6abfa8..bf6391a74e4455ea076f82ac49173cb36e825a48 100644 (file)
@@ -6,7 +6,6 @@ package arvados
 
 import (
        "log"
-       "os"
        "strings"
 )
 
@@ -57,7 +56,7 @@ func (fs *customFileSystem) projectsLoadOne(parent inode, uuid, name string) (in
                // both "/" and the substitution string.
        }
        if len(contents.Items) == 0 {
-               return nil, os.ErrNotExist
+               return nil, nil
        }
        coll := contents.Items[0]
 
index 61d82c7fa9f4e442d6492ba8fc0f285df76bd5f2..cb2e54bda261ed590b39b59a90235f6b991787a3 100644 (file)
@@ -200,6 +200,22 @@ func (s *SiteFSSuite) TestProjectUpdatedByOther(c *check.C) {
        err = wf.Close()
        c.Check(err, check.IsNil)
 
+       err = project.Sync()
+       c.Check(err, check.IsNil)
+       _, err = s.fs.Open("/home/A Project/oob/test.txt")
+       c.Check(err, check.IsNil)
+
+       // Sync again to mark the project dir as stale, so the
+       // collection gets reloaded from the controller on next
+       // lookup.
+       err = project.Sync()
+       c.Check(err, check.IsNil)
+
+       // Ensure collection was flushed by Sync
+       var latest Collection
+       err = s.client.RequestAndDecode(&latest, "GET", "arvados/v1/collections/"+oob.UUID, nil, nil)
+       c.Check(latest.ManifestText, check.Matches, `.*:test.txt.*\n`)
+
        // Delete test.txt behind s.fs's back by updating the
        // collection record with an empty ManifestText.
        err = s.client.RequestAndDecode(nil, "PATCH", "arvados/v1/collections/"+oob.UUID, nil, map[string]interface{}{
@@ -210,8 +226,6 @@ func (s *SiteFSSuite) TestProjectUpdatedByOther(c *check.C) {
        })
        c.Assert(err, check.IsNil)
 
-       err = project.Sync()
-       c.Check(err, check.IsNil)
        _, err = s.fs.Open("/home/A Project/oob/test.txt")
        c.Check(err, check.NotNil)
        _, err = s.fs.Open("/home/A Project/oob")
@@ -221,7 +235,27 @@ func (s *SiteFSSuite) TestProjectUpdatedByOther(c *check.C) {
        c.Assert(err, check.IsNil)
 
        err = project.Sync()
-       c.Check(err, check.IsNil)
+       c.Check(err, check.NotNil) // can't update the deleted collection
        _, err = s.fs.Open("/home/A Project/oob")
-       c.Check(err, check.NotNil)
+       c.Check(err, check.IsNil) // parent dir still has old collection -- didn't reload, because Sync failed
+}
+
+func (s *SiteFSSuite) TestProjectUnsupportedOperations(c *check.C) {
+       s.fs.MountByID("by_id")
+       s.fs.MountProject("home", "")
+
+       _, err := s.fs.OpenFile("/home/A Project/newfilename", os.O_CREATE|os.O_RDWR, 0)
+       c.Check(err, check.ErrorMatches, "invalid argument")
+
+       err = s.fs.Mkdir("/home/A Project/newdirname", 0)
+       c.Check(err, check.ErrorMatches, "invalid argument")
+
+       err = s.fs.Mkdir("/by_id/newdirname", 0)
+       c.Check(err, check.ErrorMatches, "invalid argument")
+
+       err = s.fs.Mkdir("/by_id/"+fixtureAProjectUUID+"/newdirname", 0)
+       c.Check(err, check.ErrorMatches, "invalid argument")
+
+       _, err = s.fs.OpenFile("/home/A Project", 0, 0)
+       c.Check(err, check.IsNil)
 }
index 7826d335c81fa93cfe54bf39a81a66335a65d336..900893aa36420e7c9d2008fff31b36d4bd03e0bf 100644 (file)
@@ -40,7 +40,7 @@ func (c *Client) CustomFileSystem(kc keepClient) CustomFileSystem {
                        thr:       newThrottle(concurrentWriters),
                },
        }
-       root.inode = &treenode{
+       root.treenode = treenode{
                fs:     fs,
                parent: root,
                fileinfo: fileinfo{
@@ -54,9 +54,9 @@ func (c *Client) CustomFileSystem(kc keepClient) CustomFileSystem {
 }
 
 func (fs *customFileSystem) MountByID(mount string) {
-       fs.root.inode.Child(mount, func(inode) (inode, error) {
+       fs.root.treenode.Child(mount, func(inode) (inode, error) {
                return &vdirnode{
-                       inode: &treenode{
+                       treenode: treenode{
                                fs:     fs,
                                parent: fs.root,
                                inodes: make(map[string]inode),
@@ -72,18 +72,18 @@ func (fs *customFileSystem) MountByID(mount string) {
 }
 
 func (fs *customFileSystem) MountProject(mount, uuid string) {
-       fs.root.inode.Child(mount, func(inode) (inode, error) {
+       fs.root.treenode.Child(mount, func(inode) (inode, error) {
                return fs.newProjectNode(fs.root, mount, uuid), nil
        })
 }
 
 func (fs *customFileSystem) MountUsers(mount string) {
-       fs.root.inode.Child(mount, func(inode) (inode, error) {
+       fs.root.treenode.Child(mount, func(inode) (inode, error) {
                return &lookupnode{
                        stale:   fs.Stale,
                        loadOne: fs.usersLoadOne,
                        loadAll: fs.usersLoadAll,
-                       inode: &treenode{
+                       treenode: treenode{
                                fs:     fs,
                                parent: fs.root,
                                inodes: make(map[string]inode),
@@ -115,10 +115,7 @@ func (c *Client) SiteFileSystem(kc keepClient) CustomFileSystem {
 }
 
 func (fs *customFileSystem) Sync() error {
-       fs.staleLock.Lock()
-       defer fs.staleLock.Unlock()
-       fs.staleThreshold = time.Now()
-       return nil
+       return fs.root.Sync()
 }
 
 // Stale returns true if information obtained at time t should be
@@ -130,7 +127,7 @@ func (fs *customFileSystem) Stale(t time.Time) bool {
 }
 
 func (fs *customFileSystem) newNode(name string, perm os.FileMode, modTime time.Time) (node inode, err error) {
-       return nil, ErrInvalidOperation
+       return nil, ErrInvalidArgument
 }
 
 func (fs *customFileSystem) mountByID(parent inode, id string) inode {
@@ -149,13 +146,13 @@ func (fs *customFileSystem) mountCollection(parent inode, id string) inode {
        if err != nil {
                return nil
        }
-       cfs, err := coll.FileSystem(fs, fs)
+       newfs, err := coll.FileSystem(fs, fs)
        if err != nil {
                return nil
        }
-       root := cfs.rootnode()
-       root.SetParent(parent, id)
-       return root
+       cfs := newfs.(*collectionFileSystem)
+       cfs.SetParent(parent, id)
+       return cfs
 }
 
 func (fs *customFileSystem) newProjectNode(root inode, name, uuid string) inode {
@@ -163,7 +160,7 @@ func (fs *customFileSystem) newProjectNode(root inode, name, uuid string) inode
                stale:   fs.Stale,
                loadOne: func(parent inode, name string) (inode, error) { return fs.projectsLoadOne(parent, uuid, name) },
                loadAll: func(parent inode) ([]inode, error) { return fs.projectsLoadAll(parent, uuid) },
-               inode: &treenode{
+               treenode: treenode{
                        fs:     fs,
                        parent: root,
                        inodes: make(map[string]inode),
@@ -176,24 +173,24 @@ func (fs *customFileSystem) newProjectNode(root inode, name, uuid string) inode
        }
 }
 
-// vdirnode wraps an inode by ignoring any requests to add/replace
-// children, and calling a create() func when a non-existing child is
-// looked up.
+// vdirnode wraps an inode by rejecting (with ErrInvalidArgument)
+// calls that add/replace children directly, instead calling a
+// create() func when a non-existing child is looked up.
 //
 // create() can return either a new node, which will be added to the
 // treenode, or nil for ENOENT.
 type vdirnode struct {
-       inode
+       treenode
        create func(parent inode, name string) inode
 }
 
 func (vn *vdirnode) Child(name string, replace func(inode) (inode, error)) (inode, error) {
-       return vn.inode.Child(name, func(existing inode) (inode, error) {
+       return vn.treenode.Child(name, func(existing inode) (inode, error) {
                if existing == nil && vn.create != nil {
                        existing = vn.create(vn, name)
                        if existing != nil {
                                existing.SetParent(vn, name)
-                               vn.inode.(*treenode).fileinfo.modTime = time.Now()
+                               vn.treenode.fileinfo.modTime = time.Now()
                        }
                }
                if replace == nil {
index 80cc03df37b88ad82ad246db6d4d7bce68dd68a2..778b12015a6f3964be7db301f30cd8ca5db1a971 100644 (file)
@@ -7,6 +7,7 @@ package arvados
 import (
        "net/http"
        "os"
+       "time"
 
        check "gopkg.in/check.v1"
 )
@@ -22,6 +23,8 @@ const (
        fixtureFooCollectionPDH        = "1f4b0bc7583c2a7f9102c395f4ffc5e3+45"
        fixtureFooCollection           = "zzzzz-4zz18-fy296fx3hot09f7"
        fixtureNonexistentCollection   = "zzzzz-4zz18-totallynotexist"
+       fixtureBlobSigningKey          = "zfhgfenhffzltr9dixws36j1yhksjoll2grmku38mi7yxd66h5j4q9w4jzanezacp8s6q0ro3hxakfye02152hncy6zml2ed0uc"
+       fixtureBlobSigningTTL          = 336 * time.Hour
 )
 
 var _ = check.Suite(&SiteFSSuite{})
@@ -41,7 +44,11 @@ func (s *SiteFSSuite) SetUpTest(c *check.C) {
        s.kc = &keepClientStub{
                blocks: map[string][]byte{
                        "3858f62230ac3c915f300c664312c63f": []byte("foobar"),
-               }}
+               },
+               sigkey:    fixtureBlobSigningKey,
+               sigttl:    fixtureBlobSigningTTL,
+               authToken: fixtureActiveToken,
+       }
        s.fs = s.client.SiteFileSystem(s.kc)
 }
 
@@ -98,7 +105,7 @@ func (s *SiteFSSuite) TestByUUIDAndPDH(c *check.C) {
        c.Check(names, check.DeepEquals, []string{"baz"})
 
        _, err = s.fs.OpenFile("/by_id/"+fixtureNonexistentCollection, os.O_RDWR|os.O_CREATE, 0755)
-       c.Check(err, check.Equals, ErrInvalidOperation)
+       c.Check(err, check.Equals, ErrInvalidArgument)
        err = s.fs.Rename("/by_id/"+fixtureFooCollection, "/by_id/beep")
        c.Check(err, check.Equals, ErrInvalidArgument)
        err = s.fs.Rename("/by_id/"+fixtureFooCollection+"/foo", "/by_id/beep")
index 643ca4f587f51bc9b353ab29b4a82869d96578a8..915924e28863c8e5de97af724c60249583c23406 100644 (file)
@@ -227,6 +227,10 @@ func (h *handler) ServeHTTP(wOrig http.ResponseWriter, r *http.Request) {
                w.Header().Set("Access-Control-Expose-Headers", "Content-Range")
        }
 
+       if h.serveS3(w, r) {
+               return
+       }
+
        pathParts := strings.Split(r.URL.Path[1:], "/")
 
        var stripParts int
@@ -509,6 +513,27 @@ func (h *handler) ServeHTTP(wOrig http.ResponseWriter, r *http.Request) {
        }
 }
 
+func (h *handler) getClients(reqID, token string) (arv *arvadosclient.ArvadosClient, kc *keepclient.KeepClient, client *arvados.Client, release func(), err error) {
+       arv = h.clientPool.Get()
+       if arv == nil {
+               return nil, nil, nil, nil, err
+       }
+       release = func() { h.clientPool.Put(arv) }
+       arv.ApiToken = token
+       kc, err = keepclient.MakeKeepClient(arv)
+       if err != nil {
+               release()
+               return
+       }
+       kc.RequestID = reqID
+       client = (&arvados.Client{
+               APIHost:   arv.ApiServer,
+               AuthToken: arv.ApiToken,
+               Insecure:  arv.ApiInsecure,
+       }).WithRequestID(reqID)
+       return
+}
+
 func (h *handler) serveSiteFS(w http.ResponseWriter, r *http.Request, tokens []string, credentialsOK, attachment bool) {
        if len(tokens) == 0 {
                w.Header().Add("WWW-Authenticate", "Basic realm=\"collections\"")
@@ -519,25 +544,13 @@ func (h *handler) serveSiteFS(w http.ResponseWriter, r *http.Request, tokens []s
                http.Error(w, errReadOnly.Error(), http.StatusMethodNotAllowed)
                return
        }
-       arv := h.clientPool.Get()
-       if arv == nil {
+       _, kc, client, release, err := h.getClients(r.Header.Get("X-Request-Id"), tokens[0])
+       if err != nil {
                http.Error(w, "Pool failed: "+h.clientPool.Err().Error(), http.StatusInternalServerError)
                return
        }
-       defer h.clientPool.Put(arv)
-       arv.ApiToken = tokens[0]
+       defer release()
 
-       kc, err := keepclient.MakeKeepClient(arv)
-       if err != nil {
-               http.Error(w, "error setting up keep client: "+err.Error(), http.StatusInternalServerError)
-               return
-       }
-       kc.RequestID = r.Header.Get("X-Request-Id")
-       client := (&arvados.Client{
-               APIHost:   arv.ApiServer,
-               AuthToken: arv.ApiToken,
-               Insecure:  arv.ApiInsecure,
-       }).WithRequestID(r.Header.Get("X-Request-Id"))
        fs := client.SiteFileSystem(kc)
        fs.ForwardSlashNameSubstitution(h.Config.cluster.Collections.ForwardSlashNameSubstitution)
        f, err := fs.Open(r.URL.Path)
index e4028842f0c6b9390715a93c836846f2d9ba753b..647eab1653294311644bdce91faa367bd0ec1832 100644 (file)
@@ -14,6 +14,7 @@ import (
        "git.arvados.org/arvados.git/sdk/go/arvados"
        "github.com/coreos/go-systemd/daemon"
        "github.com/ghodss/yaml"
+       "github.com/sirupsen/logrus"
        log "github.com/sirupsen/logrus"
 )
 
@@ -111,7 +112,7 @@ func main() {
 
        os.Setenv("ARVADOS_API_HOST", cfg.cluster.Services.Controller.ExternalURL.Host)
        srv := &server{Config: cfg}
-       if err := srv.Start(); err != nil {
+       if err := srv.Start(logrus.StandardLogger()); err != nil {
                log.Fatal(err)
        }
        if _, err := daemon.SdNotify(false, "READY=1"); err != nil {
diff --git a/services/keep-web/s3.go b/services/keep-web/s3.go
new file mode 100644 (file)
index 0000000..12e294d
--- /dev/null
@@ -0,0 +1,404 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package main
+
+import (
+       "encoding/xml"
+       "errors"
+       "fmt"
+       "io"
+       "net/http"
+       "os"
+       "path/filepath"
+       "sort"
+       "strconv"
+       "strings"
+
+       "git.arvados.org/arvados.git/sdk/go/arvados"
+       "git.arvados.org/arvados.git/sdk/go/ctxlog"
+       "github.com/AdRoll/goamz/s3"
+)
+
+const s3MaxKeys = 1000
+
+// serveS3 handles r and returns true if r is a request from an S3
+// client, otherwise it returns false.
+func (h *handler) serveS3(w http.ResponseWriter, r *http.Request) bool {
+       var token string
+       if auth := r.Header.Get("Authorization"); strings.HasPrefix(auth, "AWS ") {
+               split := strings.SplitN(auth[4:], ":", 2)
+               if len(split) < 2 {
+                       w.WriteHeader(http.StatusUnauthorized)
+                       return true
+               }
+               token = split[0]
+       } else if strings.HasPrefix(auth, "AWS4-HMAC-SHA256 ") {
+               for _, cmpt := range strings.Split(auth[17:], ",") {
+                       cmpt = strings.TrimSpace(cmpt)
+                       split := strings.SplitN(cmpt, "=", 2)
+                       if len(split) == 2 && split[0] == "Credential" {
+                               keyandscope := strings.Split(split[1], "/")
+                               if len(keyandscope[0]) > 0 {
+                                       token = keyandscope[0]
+                                       break
+                               }
+                       }
+               }
+               if token == "" {
+                       w.WriteHeader(http.StatusBadRequest)
+                       // Fprintln, not Println: the message must go to the
+                       // HTTP response body, not the server's stdout.
+                       fmt.Fprintln(w, "invalid V4 signature")
+                       return true
+               }
+       } else {
+               return false
+       }
+
+       _, kc, client, release, err := h.getClients(r.Header.Get("X-Request-Id"), token)
+       if err != nil {
+               http.Error(w, "Pool failed: "+h.clientPool.Err().Error(), http.StatusInternalServerError)
+               return true
+       }
+       defer release()
+
+       fs := client.SiteFileSystem(kc)
+       fs.ForwardSlashNameSubstitution(h.Config.cluster.Collections.ForwardSlashNameSubstitution)
+
+       objectNameGiven := strings.Count(strings.TrimSuffix(r.URL.Path, "/"), "/") > 1
+
+       switch {
+       case r.Method == http.MethodGet && !objectNameGiven:
+               // Path is "/{uuid}" or "/{uuid}/", has no object name
+               if _, ok := r.URL.Query()["versioning"]; ok {
+                       // GetBucketVersioning
+                       w.Header().Set("Content-Type", "application/xml")
+                       io.WriteString(w, xml.Header)
+                       fmt.Fprintln(w, `<VersioningConfiguration xmlns="http://s3.amazonaws.com/doc/2006-03-01/"/>`)
+               } else {
+                       // ListObjects
+                       h.s3list(w, r, fs)
+               }
+               return true
+       case r.Method == http.MethodGet || r.Method == http.MethodHead:
+               fspath := "/by_id" + r.URL.Path
+               fi, err := fs.Stat(fspath)
+               if r.Method == "HEAD" && !objectNameGiven {
+                       // HeadBucket
+                       if err == nil && fi.IsDir() {
+                               w.WriteHeader(http.StatusOK)
+                       } else if os.IsNotExist(err) {
+                               w.WriteHeader(http.StatusNotFound)
+                       } else {
+                               http.Error(w, err.Error(), http.StatusBadGateway)
+                       }
+                       return true
+               }
+               if err == nil && fi.IsDir() && objectNameGiven && strings.HasSuffix(fspath, "/") && h.Config.cluster.Collections.S3FolderObjects {
+                       w.Header().Set("Content-Type", "application/x-directory")
+                       w.WriteHeader(http.StatusOK)
+                       return true
+               }
+               if os.IsNotExist(err) ||
+                       (err != nil && err.Error() == "not a directory") ||
+                       (fi != nil && fi.IsDir()) {
+                       http.Error(w, "not found", http.StatusNotFound)
+                       return true
+               }
+               // shallow copy r, and change URL path
+               r := *r
+               r.URL.Path = fspath
+               http.FileServer(fs).ServeHTTP(w, &r)
+               return true
+       case r.Method == http.MethodPut:
+               if !objectNameGiven {
+                       http.Error(w, "missing object name in PUT request", http.StatusBadRequest)
+                       return true
+               }
+               fspath := "by_id" + r.URL.Path
+               var objectIsDir bool
+               if strings.HasSuffix(fspath, "/") {
+                       if !h.Config.cluster.Collections.S3FolderObjects {
+                               http.Error(w, "invalid object name: trailing slash", http.StatusBadRequest)
+                               return true
+                       }
+                       n, err := r.Body.Read(make([]byte, 1))
+                       if err != nil && err != io.EOF {
+                               http.Error(w, fmt.Sprintf("error reading request body: %s", err), http.StatusInternalServerError)
+                               return true
+                       } else if n > 0 {
+                               http.Error(w, "cannot create object with trailing '/' char unless content is empty", http.StatusBadRequest)
+                               return true
+                       } else if strings.SplitN(r.Header.Get("Content-Type"), ";", 2)[0] != "application/x-directory" {
+                               http.Error(w, "cannot create object with trailing '/' char unless Content-Type is 'application/x-directory'", http.StatusBadRequest)
+                               return true
+                       }
+                       // Given PUT "foo/bar/", we'll use "foo/bar/."
+                       // in the "ensure parents exist" block below,
+                       // and then we'll be done.
+                       fspath += "."
+                       objectIsDir = true
+               }
+               fi, err := fs.Stat(fspath)
+               if err != nil && err.Error() == "not a directory" {
+                       // requested foo/bar, but foo is a file
+                       http.Error(w, "object name conflicts with existing object", http.StatusBadRequest)
+                       return true
+               }
+               if strings.HasSuffix(r.URL.Path, "/") && err == nil && !fi.IsDir() {
+                       // requested foo/bar/, but foo/bar is a file
+                       http.Error(w, "object name conflicts with existing object", http.StatusBadRequest)
+                       return true
+               }
+               // create missing parent/intermediate directories, if any
+               for i, c := range fspath {
+                       if i > 0 && c == '/' {
+                               dir := fspath[:i]
+                               if strings.HasSuffix(dir, "/") {
+                                       err = errors.New("invalid object name (consecutive '/' chars)")
+                                       http.Error(w, err.Error(), http.StatusBadRequest)
+                                       return true
+                               }
+                               err = fs.Mkdir(dir, 0755)
+                               if err == arvados.ErrInvalidArgument {
+                                       // Cannot create a directory
+                                       // here.
+                                       err = fmt.Errorf("mkdir %q failed: %w", dir, err)
+                                       http.Error(w, err.Error(), http.StatusBadRequest)
+                                       return true
+                               } else if err != nil && !os.IsExist(err) {
+                                       err = fmt.Errorf("mkdir %q failed: %w", dir, err)
+                                       http.Error(w, err.Error(), http.StatusInternalServerError)
+                                       return true
+                               }
+                       }
+               }
+               if !objectIsDir {
+                       f, err := fs.OpenFile(fspath, os.O_WRONLY|os.O_TRUNC|os.O_CREATE, 0644)
+                       if os.IsNotExist(err) {
+                               f, err = fs.OpenFile(fspath, os.O_WRONLY|os.O_TRUNC|os.O_CREATE, 0644)
+                       }
+                       if err != nil {
+                               err = fmt.Errorf("open %q failed: %w", r.URL.Path, err)
+                               http.Error(w, err.Error(), http.StatusBadRequest)
+                               return true
+                       }
+                       defer f.Close()
+                       _, err = io.Copy(f, r.Body)
+                       if err != nil {
+                               err = fmt.Errorf("write to %q failed: %w", r.URL.Path, err)
+                               http.Error(w, err.Error(), http.StatusBadGateway)
+                               return true
+                       }
+                       err = f.Close()
+                       if err != nil {
+                               err = fmt.Errorf("write to %q failed: close: %w", r.URL.Path, err)
+                               http.Error(w, err.Error(), http.StatusBadGateway)
+                               return true
+                       }
+               }
+               err = fs.Sync()
+               if err != nil {
+                       err = fmt.Errorf("sync failed: %w", err)
+                       http.Error(w, err.Error(), http.StatusInternalServerError)
+                       return true
+               }
+               w.WriteHeader(http.StatusOK)
+               return true
+       default:
+               http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
+               return true
+       }
+}
+
+// Call fn on the given path (directory) and its contents, in
+// lexicographic order.
+//
+// If isRoot==true and path is not a directory, return nil.
+//
+// If fn returns filepath.SkipDir when called on a directory, don't
+// descend into that directory.
+func walkFS(fs arvados.CustomFileSystem, path string, isRoot bool, fn func(path string, fi os.FileInfo) error) error {
+       if isRoot {
+               fi, err := fs.Stat(path)
+               if os.IsNotExist(err) || (err == nil && !fi.IsDir()) {
+                       return nil
+               } else if err != nil {
+                       return err
+               }
+               err = fn(path, fi)
+               if err == filepath.SkipDir {
+                       return nil
+               } else if err != nil {
+                       return err
+               }
+       }
+       f, err := fs.Open(path)
+       if os.IsNotExist(err) && isRoot {
+               return nil
+       } else if err != nil {
+               return fmt.Errorf("open %q: %w", path, err)
+       }
+       defer f.Close()
+       if path == "/" {
+               path = ""
+       }
+       fis, err := f.Readdir(-1)
+       if err != nil {
+               return err
+       }
+       sort.Slice(fis, func(i, j int) bool { return fis[i].Name() < fis[j].Name() })
+       for _, fi := range fis {
+               err = fn(path+"/"+fi.Name(), fi)
+               if err == filepath.SkipDir {
+                       continue
+               } else if err != nil {
+                       return err
+               }
+               if fi.IsDir() {
+                       err = walkFS(fs, path+"/"+fi.Name(), false, fn)
+                       if err != nil {
+                               return err
+                       }
+               }
+       }
+       return nil
+}
+
+var errDone = errors.New("done")
+
+func (h *handler) s3list(w http.ResponseWriter, r *http.Request, fs arvados.CustomFileSystem) {
+       var params struct {
+               bucket    string
+               delimiter string
+               marker    string
+               maxKeys   int
+               prefix    string
+       }
+       params.bucket = strings.SplitN(r.URL.Path[1:], "/", 2)[0]
+       params.delimiter = r.FormValue("delimiter")
+       params.marker = r.FormValue("marker")
+       if mk, _ := strconv.ParseInt(r.FormValue("max-keys"), 10, 64); mk > 0 && mk < s3MaxKeys {
+               params.maxKeys = int(mk)
+       } else {
+               params.maxKeys = s3MaxKeys
+       }
+       params.prefix = r.FormValue("prefix")
+
+       bucketdir := "by_id/" + params.bucket
+       // walkpath is the directory (relative to bucketdir) we need
+       // to walk: the innermost directory that is guaranteed to
+       // contain all paths that have the requested prefix. Examples:
+       // prefix "foo/bar"  => walkpath "foo"
+       // prefix "foo/bar/" => walkpath "foo/bar"
+       // prefix "foo"      => walkpath ""
+       // prefix ""         => walkpath ""
+       walkpath := params.prefix
+       if cut := strings.LastIndex(walkpath, "/"); cut >= 0 {
+               walkpath = walkpath[:cut]
+       } else {
+               walkpath = ""
+       }
+
+       resp := s3.ListResp{
+               Name:      strings.SplitN(r.URL.Path[1:], "/", 2)[0],
+               Prefix:    params.prefix,
+               Delimiter: params.delimiter,
+               Marker:    params.marker,
+               MaxKeys:   params.maxKeys,
+       }
+       commonPrefixes := map[string]bool{}
+       err := walkFS(fs, strings.TrimSuffix(bucketdir+"/"+walkpath, "/"), true, func(path string, fi os.FileInfo) error {
+               if path == bucketdir {
+                       return nil
+               }
+               path = path[len(bucketdir)+1:]
+               filesize := fi.Size()
+               if fi.IsDir() {
+                       path += "/"
+                       filesize = 0
+               }
+               if len(path) <= len(params.prefix) {
+                       if path > params.prefix[:len(path)] {
+                               // with prefix "foobar", walking "fooz" means we're done
+                               return errDone
+                       }
+                       if path < params.prefix[:len(path)] {
+                               // with prefix "foobar", walking "foobag" is pointless
+                               return filepath.SkipDir
+                       }
+                       if fi.IsDir() && !strings.HasPrefix(params.prefix+"/", path) {
+                               // with prefix "foo/bar", walking "fo"
+                               // is pointless (but walking "foo" or
+                               // "foo/bar" is necessary)
+                               return filepath.SkipDir
+                       }
+                       if len(path) < len(params.prefix) {
+                               // can't skip anything, and this entry
+                               // isn't in the results, so just
+                               // continue descent
+                               return nil
+                       }
+               } else {
+                       if path[:len(params.prefix)] > params.prefix {
+                               // with prefix "foobar", nothing we
+                               // see after "foozzz" is relevant
+                               return errDone
+                       }
+               }
+               if path < params.marker || path < params.prefix {
+                       return nil
+               }
+               if fi.IsDir() && !h.Config.cluster.Collections.S3FolderObjects {
+                       // Note we don't add anything to
+                       // commonPrefixes here even if delimiter is
+                       // "/". We descend into the directory, and
+                       // return a commonPrefix only if we end up
+                       // finding a regular file inside it.
+                       return nil
+               }
+               if params.delimiter != "" {
+                       idx := strings.Index(path[len(params.prefix):], params.delimiter)
+                       if idx >= 0 {
+                               // with prefix "foobar" and delimiter
+                               // "z", when we hit "foobar/baz", we
+                               // add "/baz" to commonPrefixes and
+                               // stop descending.
+                               commonPrefixes[path[:len(params.prefix)+idx+1]] = true
+                               return filepath.SkipDir
+                       }
+               }
+               if len(resp.Contents)+len(commonPrefixes) >= params.maxKeys {
+                       resp.IsTruncated = true
+                       if params.delimiter != "" {
+                               resp.NextMarker = path
+                       }
+                       return errDone
+               }
+               resp.Contents = append(resp.Contents, s3.Key{
+                       Key:          path,
+                       LastModified: fi.ModTime().UTC().Format("2006-01-02T15:04:05.999") + "Z",
+                       Size:         filesize,
+               })
+               return nil
+       })
+       if err != nil && err != errDone {
+               http.Error(w, err.Error(), http.StatusInternalServerError)
+               return
+       }
+       if params.delimiter != "" {
+               for prefix := range commonPrefixes {
+                       resp.CommonPrefixes = append(resp.CommonPrefixes, prefix)
+               }
+               // Sort once after collecting all prefixes (map iteration
+               // order is random), not on every append.
+               sort.Strings(resp.CommonPrefixes)
+       }
+       wrappedResp := struct {
+               XMLName string `xml:"http://s3.amazonaws.com/doc/2006-03-01/ ListBucketResult"`
+               s3.ListResp
+       }{"", resp}
+       w.Header().Set("Content-Type", "application/xml")
+       io.WriteString(w, xml.Header)
+       if err := xml.NewEncoder(w).Encode(wrappedResp); err != nil {
+               ctxlog.FromContext(r.Context()).WithError(err).Error("error writing xml response")
+       }
+}
diff --git a/services/keep-web/s3_test.go b/services/keep-web/s3_test.go
new file mode 100644 (file)
index 0000000..73553ff
--- /dev/null
@@ -0,0 +1,546 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package main
+
+import (
+       "bytes"
+       "crypto/rand"
+       "fmt"
+       "io/ioutil"
+       "net/http"
+       "os"
+       "strings"
+       "sync"
+       "time"
+
+       "git.arvados.org/arvados.git/sdk/go/arvados"
+       "git.arvados.org/arvados.git/sdk/go/arvadosclient"
+       "git.arvados.org/arvados.git/sdk/go/arvadostest"
+       "git.arvados.org/arvados.git/sdk/go/keepclient"
+       "github.com/AdRoll/goamz/aws"
+       "github.com/AdRoll/goamz/s3"
+       check "gopkg.in/check.v1"
+)
+
+type s3stage struct {
+       arv        *arvados.Client
+       ac         *arvadosclient.ArvadosClient
+       kc         *keepclient.KeepClient
+       proj       arvados.Group
+       projbucket *s3.Bucket
+       coll       arvados.Collection
+       collbucket *s3.Bucket
+}
+
+func (s *IntegrationSuite) s3setup(c *check.C) s3stage {
+       var proj arvados.Group
+       var coll arvados.Collection
+       arv := arvados.NewClientFromEnv()
+       arv.AuthToken = arvadostest.ActiveToken
+       err := arv.RequestAndDecode(&proj, "POST", "arvados/v1/groups", nil, map[string]interface{}{
+               "group": map[string]interface{}{
+                       "group_class": "project",
+                       "name":        "keep-web s3 test",
+               },
+               "ensure_unique_name": true,
+       })
+       c.Assert(err, check.IsNil)
+       err = arv.RequestAndDecode(&coll, "POST", "arvados/v1/collections", nil, map[string]interface{}{"collection": map[string]interface{}{
+               "owner_uuid":    proj.UUID,
+               "name":          "keep-web s3 test collection",
+               "manifest_text": ". d41d8cd98f00b204e9800998ecf8427e+0 0:0:emptyfile\n./emptydir d41d8cd98f00b204e9800998ecf8427e+0 0:0:.\n",
+       }})
+       c.Assert(err, check.IsNil)
+       ac, err := arvadosclient.New(arv)
+       c.Assert(err, check.IsNil)
+       kc, err := keepclient.MakeKeepClient(ac)
+       c.Assert(err, check.IsNil)
+       fs, err := coll.FileSystem(arv, kc)
+       c.Assert(err, check.IsNil)
+       f, err := fs.OpenFile("sailboat.txt", os.O_CREATE|os.O_WRONLY, 0644)
+       c.Assert(err, check.IsNil)
+       _, err = f.Write([]byte("⛵\n"))
+       c.Assert(err, check.IsNil)
+       err = f.Close()
+       c.Assert(err, check.IsNil)
+       err = fs.Sync()
+       c.Assert(err, check.IsNil)
+       err = arv.RequestAndDecode(&coll, "GET", "arvados/v1/collections/"+coll.UUID, nil, nil)
+       c.Assert(err, check.IsNil)
+
+       auth := aws.NewAuth(arvadostest.ActiveTokenV2, arvadostest.ActiveTokenV2, "", time.Now().Add(time.Hour))
+       region := aws.Region{
+               Name:       s.testServer.Addr,
+               S3Endpoint: "http://" + s.testServer.Addr,
+       }
+       client := s3.New(*auth, region)
+       return s3stage{
+               arv:  arv,
+               ac:   ac,
+               kc:   kc,
+               proj: proj,
+               projbucket: &s3.Bucket{
+                       S3:   client,
+                       Name: proj.UUID,
+               },
+               coll: coll,
+               collbucket: &s3.Bucket{
+                       S3:   client,
+                       Name: coll.UUID,
+               },
+       }
+}
+
+func (stage s3stage) teardown(c *check.C) {
+       if stage.coll.UUID != "" {
+               err := stage.arv.RequestAndDecode(&stage.coll, "DELETE", "arvados/v1/collections/"+stage.coll.UUID, nil, nil)
+               c.Check(err, check.IsNil)
+       }
+       if stage.proj.UUID != "" {
+               err := stage.arv.RequestAndDecode(&stage.proj, "DELETE", "arvados/v1/groups/"+stage.proj.UUID, nil, nil)
+               c.Check(err, check.IsNil)
+       }
+}
+
+func (s *IntegrationSuite) TestS3HeadBucket(c *check.C) {
+       stage := s.s3setup(c)
+       defer stage.teardown(c)
+
+       for _, bucket := range []*s3.Bucket{stage.collbucket, stage.projbucket} {
+               c.Logf("bucket %s", bucket.Name)
+               exists, err := bucket.Exists("")
+               c.Check(err, check.IsNil)
+               c.Check(exists, check.Equals, true)
+       }
+}
+
+func (s *IntegrationSuite) TestS3CollectionGetObject(c *check.C) {
+       stage := s.s3setup(c)
+       defer stage.teardown(c)
+       s.testS3GetObject(c, stage.collbucket, "")
+}
+func (s *IntegrationSuite) TestS3ProjectGetObject(c *check.C) {
+       stage := s.s3setup(c)
+       defer stage.teardown(c)
+       s.testS3GetObject(c, stage.projbucket, stage.coll.Name+"/")
+}
+func (s *IntegrationSuite) testS3GetObject(c *check.C, bucket *s3.Bucket, prefix string) {
+       rdr, err := bucket.GetReader(prefix + "emptyfile")
+       c.Assert(err, check.IsNil)
+       buf, err := ioutil.ReadAll(rdr)
+       c.Check(err, check.IsNil)
+       c.Check(len(buf), check.Equals, 0)
+       err = rdr.Close()
+       c.Check(err, check.IsNil)
+
+       // GetObject
+       rdr, err = bucket.GetReader(prefix + "missingfile")
+       c.Check(err, check.ErrorMatches, `404 Not Found`)
+
+       // HeadObject
+       exists, err := bucket.Exists(prefix + "missingfile")
+       c.Check(err, check.IsNil)
+       c.Check(exists, check.Equals, false)
+
+       // GetObject
+       rdr, err = bucket.GetReader(prefix + "sailboat.txt")
+       c.Assert(err, check.IsNil)
+       buf, err = ioutil.ReadAll(rdr)
+       c.Check(err, check.IsNil)
+       c.Check(buf, check.DeepEquals, []byte("⛵\n"))
+       err = rdr.Close()
+       c.Check(err, check.IsNil)
+
+       // HeadObject
+       exists, err = bucket.Exists(prefix + "sailboat.txt")
+       c.Check(err, check.IsNil)
+       c.Check(exists, check.Equals, true)
+}
+
+func (s *IntegrationSuite) TestS3CollectionPutObjectSuccess(c *check.C) {
+       stage := s.s3setup(c)
+       defer stage.teardown(c)
+       s.testS3PutObjectSuccess(c, stage.collbucket, "")
+}
+func (s *IntegrationSuite) TestS3ProjectPutObjectSuccess(c *check.C) {
+       stage := s.s3setup(c)
+       defer stage.teardown(c)
+       s.testS3PutObjectSuccess(c, stage.projbucket, stage.coll.Name+"/")
+}
+func (s *IntegrationSuite) testS3PutObjectSuccess(c *check.C, bucket *s3.Bucket, prefix string) {
+       for _, trial := range []struct {
+               path        string
+               size        int
+               contentType string
+       }{
+               {
+                       path:        "newfile",
+                       size:        128000000,
+                       contentType: "application/octet-stream",
+               }, {
+                       path:        "newdir/newfile",
+                       size:        1 << 26,
+                       contentType: "application/octet-stream",
+               }, {
+                       path:        "newdir1/newdir2/newfile",
+                       size:        0,
+                       contentType: "application/octet-stream",
+               }, {
+                       path:        "newdir1/newdir2/newdir3/",
+                       size:        0,
+                       contentType: "application/x-directory",
+               },
+       } {
+               c.Logf("=== %v", trial)
+
+               objname := prefix + trial.path
+
+               _, err := bucket.GetReader(objname)
+               c.Assert(err, check.ErrorMatches, `404 Not Found`)
+
+               buf := make([]byte, trial.size)
+               rand.Read(buf)
+
+               err = bucket.PutReader(objname, bytes.NewReader(buf), int64(len(buf)), trial.contentType, s3.Private, s3.Options{})
+               c.Check(err, check.IsNil)
+
+               rdr, err := bucket.GetReader(objname)
+               if strings.HasSuffix(trial.path, "/") && !s.testServer.Config.cluster.Collections.S3FolderObjects {
+                       c.Check(err, check.NotNil)
+                       continue
+               } else if !c.Check(err, check.IsNil) {
+                       continue
+               }
+               buf2, err := ioutil.ReadAll(rdr)
+               c.Check(err, check.IsNil)
+               c.Check(buf2, check.HasLen, len(buf))
+               c.Check(bytes.Equal(buf, buf2), check.Equals, true)
+       }
+}
+
+// TestS3ProjectPutObjectNotSupported checks that PUT requests
+// addressed directly to a project bucket (not inside a collection)
+// are rejected with 400 Bad Request and leave no object behind.
+func (s *IntegrationSuite) TestS3ProjectPutObjectNotSupported(c *check.C) {
+	stage := s.s3setup(c)
+	defer stage.teardown(c)
+	bucket := stage.projbucket
+
+	for _, trial := range []struct {
+		path        string
+		size        int
+		contentType string
+	}{
+		{
+			path:        "newfile",
+			size:        1234,
+			contentType: "application/octet-stream",
+		}, {
+			path:        "newdir/newfile",
+			size:        1234,
+			contentType: "application/octet-stream",
+		}, {
+			path:        "newdir2/",
+			size:        0,
+			contentType: "application/x-directory",
+		},
+	} {
+		c.Logf("=== %v", trial)
+
+		// Confirm the object does not exist before the PUT attempt.
+		_, err := bucket.GetReader(trial.path)
+		c.Assert(err, check.ErrorMatches, `404 Not Found`)
+
+		buf := make([]byte, trial.size)
+		rand.Read(buf)
+
+		err = bucket.PutReader(trial.path, bytes.NewReader(buf), int64(len(buf)), trial.contentType, s3.Private, s3.Options{})
+		c.Check(err, check.ErrorMatches, `400 Bad Request`)
+
+		// The rejected PUT must not have created anything.
+		_, err = bucket.GetReader(trial.path)
+		c.Assert(err, check.ErrorMatches, `404 Not Found`)
+	}
+}
+
+func (s *IntegrationSuite) TestS3CollectionPutObjectFailure(c *check.C) {
+       stage := s.s3setup(c)
+       defer stage.teardown(c)
+       s.testS3PutObjectFailure(c, stage.collbucket, "")
+}
+func (s *IntegrationSuite) TestS3ProjectPutObjectFailure(c *check.C) {
+       stage := s.s3setup(c)
+       defer stage.teardown(c)
+       s.testS3PutObjectFailure(c, stage.projbucket, stage.coll.Name+"/")
+}
+// testS3PutObjectFailure checks that invalid object names (conflicts
+// with existing files/dirs, empty components, bare "/" paths) are
+// rejected with 400, and that the failed PUT leaves nothing behind.
+// Trials run concurrently to exercise the server under parallel
+// invalid requests.
+func (s *IntegrationSuite) testS3PutObjectFailure(c *check.C, bucket *s3.Bucket, prefix string) {
+	// Disable folder objects so trailing-slash names are invalid too.
+	s.testServer.Config.cluster.Collections.S3FolderObjects = false
+	var wg sync.WaitGroup
+	for _, trial := range []struct {
+		path string
+	}{
+		{
+			path: "emptyfile/newname", // emptyfile exists, see s3setup()
+		}, {
+			path: "emptyfile/", // emptyfile exists, see s3setup()
+		}, {
+			path: "emptydir", // dir already exists, see s3setup()
+		}, {
+			path: "emptydir/",
+		}, {
+			path: "emptydir//",
+		}, {
+			path: "newdir/",
+		}, {
+			path: "newdir//",
+		}, {
+			path: "/",
+		}, {
+			path: "//",
+		}, {
+			path: "foo//bar",
+		}, {
+			path: "",
+		},
+	} {
+		trial := trial // shadow loop variable for the goroutine closure
+		wg.Add(1)
+		go func() {
+			defer wg.Done()
+			c.Logf("=== %v", trial)
+
+			objname := prefix + trial.path
+
+			buf := make([]byte, 1234)
+			rand.Read(buf)
+
+			err := bucket.PutReader(objname, bytes.NewReader(buf), int64(len(buf)), "application/octet-stream", s3.Private, s3.Options{})
+			if !c.Check(err, check.ErrorMatches, `400 Bad.*`, check.Commentf("PUT %q should fail", objname)) {
+				return
+			}
+
+			// "" and "/" resolve to the bucket root, so a GET on
+			// them would not be a meaningful 404 check.
+			if objname != "" && objname != "/" {
+				_, err = bucket.GetReader(objname)
+				c.Check(err, check.ErrorMatches, `404 Not Found`, check.Commentf("GET %q should return 404", objname))
+			}
+		}()
+	}
+	wg.Wait()
+}
+
+func (stage *s3stage) writeBigDirs(c *check.C, dirs int, filesPerDir int) {
+       fs, err := stage.coll.FileSystem(stage.arv, stage.kc)
+       c.Assert(err, check.IsNil)
+       for d := 0; d < dirs; d++ {
+               dir := fmt.Sprintf("dir%d", d)
+               c.Assert(fs.Mkdir(dir, 0755), check.IsNil)
+               for i := 0; i < filesPerDir; i++ {
+                       f, err := fs.OpenFile(fmt.Sprintf("%s/file%d.txt", dir, i), os.O_CREATE|os.O_WRONLY, 0644)
+                       c.Assert(err, check.IsNil)
+                       c.Assert(f.Close(), check.IsNil)
+               }
+       }
+       c.Assert(fs.Sync(), check.IsNil)
+}
+
+func (s *IntegrationSuite) TestS3GetBucketVersioning(c *check.C) {
+       stage := s.s3setup(c)
+       defer stage.teardown(c)
+       for _, bucket := range []*s3.Bucket{stage.collbucket, stage.projbucket} {
+               req, err := http.NewRequest("GET", bucket.URL("/"), nil)
+               req.Header.Set("Authorization", "AWS "+arvadostest.ActiveTokenV2+":none")
+               req.URL.RawQuery = "versioning"
+               resp, err := http.DefaultClient.Do(req)
+               c.Assert(err, check.IsNil)
+               c.Check(resp.Header.Get("Content-Type"), check.Equals, "application/xml")
+               buf, err := ioutil.ReadAll(resp.Body)
+               c.Assert(err, check.IsNil)
+               c.Check(string(buf), check.Equals, "<?xml version=\"1.0\" encoding=\"UTF-8\"?>\n<VersioningConfiguration xmlns=\"http://s3.amazonaws.com/doc/2006-03-01/\"/>\n")
+       }
+}
+
+// TestS3CollectionList checks bucket listing with several page sizes,
+// with S3FolderObjects both disabled (markers==0) and enabled
+// (markers==1).
+func (s *IntegrationSuite) TestS3CollectionList(c *check.C) {
+	var markers int
+	// Range-assignment idiom: markers gets the slice index (0 or 1)
+	// while the config field gets the corresponding bool, so markers
+	// doubles as the per-directory marker-object count.
+	for markers, s.testServer.Config.cluster.Collections.S3FolderObjects = range []bool{false, true} {
+		dirs := 2
+		filesPerDir := 1001
+		stage.writeBigDirs(c, dirs, filesPerDir)
+		// Total # objects is:
+		//                 2 file entries from s3setup (emptyfile and sailboat.txt)
+		//                +1 fake "directory" marker from s3setup (emptydir) (if enabled)
+		//             +dirs fake "directory" marker from writeBigDirs (dir0/, dir1/) (if enabled)
+		// +filesPerDir*dirs file entries from writeBigDirs (dir0/file0.txt, etc.)
+		s.testS3List(c, stage.collbucket, "", 4000, markers+2+(filesPerDir+markers)*dirs)
+		s.testS3List(c, stage.collbucket, "", 131, markers+2+(filesPerDir+markers)*dirs)
+		s.testS3List(c, stage.collbucket, "dir0/", 71, filesPerDir+markers)
+	}
+}
+// testS3List pages through a bucket listing with the given prefix and
+// page size, checking page-size limits, truncation/NextMarker
+// consistency, and that exactly expectFiles distinct keys are seen.
+func (s *IntegrationSuite) testS3List(c *check.C, bucket *s3.Bucket, prefix string, pageSize, expectFiles int) {
+	c.Logf("testS3List: prefix=%q pageSize=%d S3FolderObjects=%v", prefix, pageSize, s.testServer.Config.cluster.Collections.S3FolderObjects)
+	// S3 caps max-keys at 1000 regardless of the requested page size.
+	expectPageSize := pageSize
+	if expectPageSize > 1000 {
+		expectPageSize = 1000
+	}
+	gotKeys := map[string]s3.Key{}
+	nextMarker := ""
+	pages := 0
+	for {
+		resp, err := bucket.List(prefix, "", nextMarker, pageSize)
+		if !c.Check(err, check.IsNil) {
+			break
+		}
+		c.Check(len(resp.Contents) <= expectPageSize, check.Equals, true)
+		// Guard against infinite paging: no more pages than needed
+		// to cover expectFiles, plus one.
+		if pages++; !c.Check(pages <= (expectFiles/expectPageSize)+1, check.Equals, true) {
+			break
+		}
+		for _, key := range resp.Contents {
+			gotKeys[key.Key] = key
+			if strings.Contains(key.Key, "sailboat.txt") {
+				// Fixture file from s3setup() has known size 4.
+				c.Check(key.Size, check.Equals, int64(4))
+			}
+		}
+		if !resp.IsTruncated {
+			// Final page must not advertise a continuation marker.
+			c.Check(resp.NextMarker, check.Equals, "")
+			break
+		}
+		if !c.Check(resp.NextMarker, check.Not(check.Equals), "") {
+			break
+		}
+		nextMarker = resp.NextMarker
+	}
+	c.Check(len(gotKeys), check.Equals, expectFiles)
+}
+
+func (s *IntegrationSuite) TestS3CollectionListRollup(c *check.C) {
+       for _, s.testServer.Config.cluster.Collections.S3FolderObjects = range []bool{false, true} {
+               s.testS3CollectionListRollup(c)
+       }
+}
+
+// testS3CollectionListRollup checks prefix/delimiter/marker handling
+// in bucket listings: it first collects the full sorted key list,
+// then, for many (prefix, delimiter, marker) combinations, simulates
+// the expected keys/CommonPrefixes/NextMarker/IsTruncated locally and
+// compares with the server's response.
+func (s *IntegrationSuite) testS3CollectionListRollup(c *check.C) {
+	stage := s.s3setup(c)
+	defer stage.teardown(c)
+
+	dirs := 2
+	filesPerDir := 500
+	stage.writeBigDirs(c, dirs, filesPerDir)
+	err := stage.collbucket.PutReader("dingbats", &bytes.Buffer{}, 0, "application/octet-stream", s3.Private, s3.Options{})
+	c.Assert(err, check.IsNil)
+	// Gather every key in the bucket (deduplicating consecutive
+	// repeats) by paging with a large page size.
+	var allfiles []string
+	for marker := ""; ; {
+		resp, err := stage.collbucket.List("", "", marker, 20000)
+		c.Check(err, check.IsNil)
+		for _, key := range resp.Contents {
+			if len(allfiles) == 0 || allfiles[len(allfiles)-1] != key.Key {
+				allfiles = append(allfiles, key.Key)
+			}
+		}
+		marker = resp.NextMarker
+		if marker == "" {
+			break
+		}
+	}
+	// markers==1 when folder placeholder objects are enabled, so each
+	// directory contributes one extra "dirname/" key.
+	markers := 0
+	if s.testServer.Config.cluster.Collections.S3FolderObjects {
+		markers = 1
+	}
+	// +3: emptyfile, sailboat.txt (s3setup) and dingbats (above);
+	// +markers: the emptydir placeholder from s3setup.
+	c.Check(allfiles, check.HasLen, dirs*(filesPerDir+markers)+3+markers)
+
+	// With folder objects enabled, every file inside a directory must
+	// be preceded (in listing order) by that directory's marker.
+	gotDirMarker := map[string]bool{}
+	for _, name := range allfiles {
+		isDirMarker := strings.HasSuffix(name, "/")
+		if markers == 0 {
+			c.Check(isDirMarker, check.Equals, false, check.Commentf("name %q", name))
+		} else if isDirMarker {
+			gotDirMarker[name] = true
+		} else if i := strings.LastIndex(name, "/"); i >= 0 {
+			c.Check(gotDirMarker[name[:i+1]], check.Equals, true, check.Commentf("name %q", name))
+			gotDirMarker[name[:i+1]] = true // skip redundant complaints about this dir marker
+		}
+	}
+
+	for _, trial := range []struct {
+		prefix    string
+		delimiter string
+		marker    string
+	}{
+		{"", "", ""},
+		{"di", "/", ""},
+		{"di", "r", ""},
+		{"di", "n", ""},
+		{"dir0", "/", ""},
+		{"dir0/", "/", ""},
+		{"dir0/f", "/", ""},
+		{"dir0", "", ""},
+		{"dir0/", "", ""},
+		{"dir0/f", "", ""},
+		{"dir0", "/", "dir0/file14.txt"},       // no commonprefixes
+		{"", "", "dir0/file14.txt"},            // middle page, skip walking dir1
+		{"", "", "dir1/file14.txt"},            // middle page, skip walking dir0
+		{"", "", "dir1/file498.txt"},           // last page of results
+		{"dir1/file", "", "dir1/file498.txt"},  // last page of results, with prefix
+		{"dir1/file", "/", "dir1/file498.txt"}, // last page of results, with prefix + delimiter
+		{"dir1", "Z", "dir1/file498.txt"},      // delimiter "Z" never appears
+		{"dir2", "/", ""},                      // prefix "dir2" does not exist
+		{"", "/", ""},
+	} {
+		c.Logf("\n\n=== trial %+v markers=%d", trial, markers)
+
+		maxKeys := 20
+		resp, err := stage.collbucket.List(trial.prefix, trial.delimiter, trial.marker, maxKeys)
+		c.Check(err, check.IsNil)
+		if resp.IsTruncated && trial.delimiter == "" {
+			// goamz List method fills in the missing
+			// NextMarker field if resp.IsTruncated, so
+			// now we can't really tell whether it was
+			// sent by the server or by goamz. In cases
+			// where it should be empty but isn't, assume
+			// it's goamz's fault.
+			resp.NextMarker = ""
+		}
+
+		// Simulate the expected listing locally from allfiles.
+		var expectKeys []string
+		var expectPrefixes []string
+		var expectNextMarker string
+		var expectTruncated bool
+		for _, key := range allfiles {
+			// "full" means the page (keys + rolled-up prefixes)
+			// has already reached maxKeys entries.
+			full := len(expectKeys)+len(expectPrefixes) >= maxKeys
+			if !strings.HasPrefix(key, trial.prefix) || key < trial.marker {
+				continue
+			} else if idx := strings.Index(key[len(trial.prefix):], trial.delimiter); trial.delimiter != "" && idx >= 0 {
+				// Key contains the delimiter after the prefix:
+				// it rolls up into a CommonPrefixes entry.
+				prefix := key[:len(trial.prefix)+idx+1]
+				if len(expectPrefixes) > 0 && expectPrefixes[len(expectPrefixes)-1] == prefix {
+					// same prefix as previous key
+				} else if full {
+					expectNextMarker = key
+					expectTruncated = true
+				} else {
+					expectPrefixes = append(expectPrefixes, prefix)
+				}
+			} else if full {
+				// NextMarker is only sent by the server when a
+				// delimiter is in use (see goamz note above).
+				if trial.delimiter != "" {
+					expectNextMarker = key
+				}
+				expectTruncated = true
+				break
+			} else {
+				expectKeys = append(expectKeys, key)
+			}
+		}
+
+		var gotKeys []string
+		for _, key := range resp.Contents {
+			gotKeys = append(gotKeys, key.Key)
+		}
+		var gotPrefixes []string
+		for _, prefix := range resp.CommonPrefixes {
+			gotPrefixes = append(gotPrefixes, prefix)
+		}
+		commentf := check.Commentf("trial %+v markers=%d", trial, markers)
+		c.Check(gotKeys, check.DeepEquals, expectKeys, commentf)
+		c.Check(gotPrefixes, check.DeepEquals, expectPrefixes, commentf)
+		c.Check(resp.NextMarker, check.Equals, expectNextMarker, commentf)
+		c.Check(resp.IsTruncated, check.Equals, expectTruncated, commentf)
+		c.Logf("=== trial %+v keys %q prefixes %q nextMarker %q", trial, gotKeys, gotPrefixes, resp.NextMarker)
+	}
+}
index 46dc3d30179343e29edea208588103fe947c3c08..8f623c627d067f843f2746a2b8b64248006b1a18 100644 (file)
@@ -20,12 +20,12 @@ type server struct {
        Config *Config
 }
 
-func (srv *server) Start() error {
+func (srv *server) Start(logger *logrus.Logger) error {
        h := &handler{Config: srv.Config}
        reg := prometheus.NewRegistry()
        h.Config.Cache.registry = reg
-       ctx := ctxlog.Context(context.Background(), logrus.StandardLogger())
-       mh := httpserver.Instrument(reg, nil, httpserver.HandlerWithContext(ctx, httpserver.AddRequestIDs(httpserver.LogRequests(h))))
+       ctx := ctxlog.Context(context.Background(), logger)
+       mh := httpserver.Instrument(reg, logger, httpserver.HandlerWithContext(ctx, httpserver.AddRequestIDs(httpserver.LogRequests(h))))
        h.MetricsAPI = mh.ServeAPI(h.Config.cluster.ManagementToken, http.NotFoundHandler())
        srv.Handler = mh
        var listen arvados.URL
index bca7ff49fa0820c8affd818dd6c8b26ceca81755..c37852a128bbaa9571ebf1527f3f9f6b6cee41ae 100644 (file)
@@ -442,7 +442,7 @@ func (s *IntegrationSuite) SetUpTest(c *check.C) {
        cfg.cluster.ManagementToken = arvadostest.ManagementToken
        cfg.cluster.Users.AnonymousUserToken = arvadostest.AnonymousToken
        s.testServer = &server{Config: cfg}
-       err = s.testServer.Start()
+       err = s.testServer.Start(ctxlog.TestLogger(c))
        c.Assert(err, check.Equals, nil)
 }