github.com/go-ldap/ldap v3.0.3+incompatible/go.mod h1:qfd9rJvER9Q0/D/Sqn1DfHRoBp40uXYvFoEVrNEPqRc=
github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9GBnD5lWE=
github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V4qmtdjCk=
+github.com/go-sql-driver/mysql v1.4.0/go.mod h1:zAC/RDZ24gD3HViQzih4MyKcchzm+sOG5ZlKdlhCg5w=
github.com/go-sql-driver/mysql v1.5.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg=
github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY=
github.com/gogo/protobuf v1.1.1 h1:72R+M5VuhED/KujmZVcIquuo8mBgX4oVda//DQb3PXo=
golang.org/x/net v0.0.0-20190613194153-d28f0bde5980/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20190620200207-3b0461eec859 h1:R/3boaszxrf1GEUWTVDzSKVwLmSJpwZ1yqXm8j0v2QI=
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
+golang.org/x/net v0.0.0-20200202094626-16171245cfb2 h1:CCH4IOTTfewWjGOlSp+zGcjutRKlBEZQ6wTn8ozI/nI=
golang.org/x/net v0.0.0-20200202094626-16171245cfb2/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
# Use of this feature is not recommended if it can be avoided.
ForwardSlashNameSubstitution: ""
+ # Include "folder objects" in S3 ListObjects responses.
+ S3FolderObjects: true
+
# Managed collection properties. At creation time, if the client didn't
# provide the listed keys, they will be automatically populated following
# one of the following behaviors:
// exists.
var whitelist = map[string]bool{
// | sort -t'"' -k2,2
- "ClusterID": true,
"API": true,
"API.AsyncPermissionsUpdateInterval": false,
"API.DisabledAPIs": false,
+ "API.KeepServiceRequestTimeout": false,
"API.MaxConcurrentRequests": false,
"API.MaxIndexDatabaseRead": false,
"API.MaxItemsPerResponse": true,
"API.MaxRequestSize": true,
"API.RailsSessionSecretToken": false,
"API.RequestTimeout": true,
- "API.WebsocketClientEventQueue": false,
"API.SendTimeout": true,
+ "API.WebsocketClientEventQueue": false,
"API.WebsocketServerEventQueue": false,
- "API.KeepServiceRequestTimeout": false,
"AuditLogs": false,
"AuditLogs.MaxAge": false,
"AuditLogs.MaxDeleteBatch": false,
"AuditLogs.UnloggedAttributes": false,
+ "ClusterID": true,
"Collections": true,
+ "Collections.BalanceCollectionBatch": false,
+ "Collections.BalanceCollectionBuffers": false,
+ "Collections.BalancePeriod": false,
+ "Collections.BalanceTimeout": false,
+ "Collections.BlobDeleteConcurrency": false,
+ "Collections.BlobMissingReport": false,
+ "Collections.BlobReplicateConcurrency": false,
"Collections.BlobSigning": true,
"Collections.BlobSigningKey": false,
"Collections.BlobSigningTTL": true,
"Collections.BlobTrash": false,
- "Collections.BlobTrashLifetime": false,
- "Collections.BlobTrashConcurrency": false,
"Collections.BlobTrashCheckInterval": false,
- "Collections.BlobDeleteConcurrency": false,
- "Collections.BlobReplicateConcurrency": false,
+ "Collections.BlobTrashConcurrency": false,
+ "Collections.BlobTrashLifetime": false,
"Collections.CollectionVersioning": false,
"Collections.DefaultReplication": true,
"Collections.DefaultTrashLifetime": true,
"Collections.ManagedProperties.*": true,
"Collections.ManagedProperties.*.*": true,
"Collections.PreserveVersionIfIdle": true,
+ "Collections.S3FolderObjects": true,
"Collections.TrashSweepInterval": false,
"Collections.TrustAllContent": false,
"Collections.WebDAVCache": false,
- "Collections.BalanceCollectionBatch": false,
- "Collections.BalancePeriod": false,
- "Collections.BalanceTimeout": false,
- "Collections.BlobMissingReport": false,
- "Collections.BalanceCollectionBuffers": false,
"Containers": true,
"Containers.CloudVMs": false,
- "Containers.CrunchRunCommand": false,
"Containers.CrunchRunArgumentsList": false,
+ "Containers.CrunchRunCommand": false,
"Containers.DefaultKeepCacheRAM": true,
"Containers.DispatchPrivateKey": false,
"Containers.JobsAPI": true,
"Login.OpenIDConnect": true,
"Login.OpenIDConnect.ClientID": false,
"Login.OpenIDConnect.ClientSecret": false,
- "Login.OpenIDConnect.Enable": true,
- "Login.OpenIDConnect.Issuer": false,
"Login.OpenIDConnect.EmailClaim": false,
"Login.OpenIDConnect.EmailVerifiedClaim": false,
+ "Login.OpenIDConnect.Enable": true,
+ "Login.OpenIDConnect.Issuer": false,
"Login.OpenIDConnect.UsernameClaim": false,
"Login.PAM": true,
"Login.PAM.DefaultEmailDomain": false,
"Login.PAM.Enable": true,
"Login.PAM.Service": false,
+ "Login.RemoteTokenRefresh": true,
"Login.SSO": true,
"Login.SSO.Enable": true,
"Login.SSO.ProviderAppID": false,
"Login.SSO.ProviderAppSecret": false,
- "Login.RemoteTokenRefresh": true,
"Mail": true,
+ "Mail.EmailFrom": false,
+ "Mail.IssueReporterEmailFrom": false,
+ "Mail.IssueReporterEmailTo": false,
"Mail.MailchimpAPIKey": false,
"Mail.MailchimpListID": false,
"Mail.SendUserSetupNotificationEmail": false,
- "Mail.IssueReporterEmailFrom": false,
- "Mail.IssueReporterEmailTo": false,
"Mail.SupportEmailAddress": true,
- "Mail.EmailFrom": false,
"ManagementToken": false,
"PostgreSQL": false,
"RemoteClusters": true,
"SystemRootToken": false,
"TLS": false,
"Users": true,
- "Users.AnonymousUserToken": true,
"Users.AdminNotifierEmailFrom": false,
+ "Users.AnonymousUserToken": true,
"Users.AutoAdminFirstUser": false,
"Users.AutoAdminUserWithEmail": false,
"Users.AutoSetupNewUsers": false,
"Workbench.EnableGettingStartedPopup": true,
"Workbench.EnablePublicProjectsPage": true,
"Workbench.FileViewersConfigURL": true,
+ "Workbench.InactivePageHTML": true,
"Workbench.LogViewerMaxBytes": true,
"Workbench.MultiSiteSearch": true,
"Workbench.ProfilingEnabled": true,
"Workbench.ShowUserAgreementInline": true,
"Workbench.ShowUserNotifications": true,
"Workbench.SiteName": true,
+ "Workbench.SSHHelpHostSuffix": true,
+ "Workbench.SSHHelpPageHTML": true,
"Workbench.Theme": true,
"Workbench.UserProfileFormFields": true,
"Workbench.UserProfileFormFields.*": true,
"Workbench.UserProfileFormMessage": true,
"Workbench.VocabularyURL": true,
"Workbench.WelcomePageHTML": true,
- "Workbench.InactivePageHTML": true,
- "Workbench.SSHHelpPageHTML": true,
- "Workbench.SSHHelpHostSuffix": true,
}
func redactUnsafe(m map[string]interface{}, mPrefix, lookupPrefix string) error {
# Use of this feature is not recommended if it can be avoided.
ForwardSlashNameSubstitution: ""
+ # Include "folder objects" in S3 ListObjects responses.
+ S3FolderObjects: true
+
# Managed collection properties. At creation time, if the client didn't
# provide the listed keys, they will be automatically populated following
# one of the following behaviors:
TrashSweepInterval Duration
TrustAllContent bool
ForwardSlashNameSubstitution string
+ S3FolderObjects bool
BlobMissingReport string
BalancePeriod Duration
ErrPermission = os.ErrPermission
)
+// syncer is implemented by inodes/filesystems that can flush all of
+// their pending writes to the backend.
+type syncer interface {
+	Sync() error
+}
+
// A File is an *os.File-like interface for reading and writing files
// in a FileSystem.
type File interface {
return
}
+// Sync flushes pending writes on all child inodes, stopping at the
+// first error. It returns ErrInvalidOperation if any child does not
+// implement syncer.
+func (n *treenode) Sync() error {
+	n.RLock()
+	defer n.RUnlock()
+	for _, inode := range n.inodes {
+		syncer, ok := inode.(syncer)
+		if !ok {
+			return ErrInvalidOperation
+		}
+		err := syncer.Sync()
+		if err != nil {
+			return err
+		}
+	}
+	return nil
+}
+
type fileSystem struct {
root inode
fsBackend
}
func (fs *fileSystem) Sync() error {
- log.Printf("TODO: sync fileSystem")
- return ErrInvalidOperation
+ if syncer, ok := fs.root.(syncer); ok {
+ return syncer.Sync()
+ } else {
+ return ErrInvalidOperation
+ }
}
func (fs *fileSystem) Flush(string, bool) error {
}
}
+// The following inode methods delegate to the filesystem's root node,
+// so a collectionFileSystem can itself be used as a directory inode
+// in an enclosing (site) filesystem.
+func (fs *collectionFileSystem) Child(name string, replace func(inode) (inode, error)) (inode, error) {
+	return fs.rootnode().Child(name, replace)
+}
+
+func (fs *collectionFileSystem) FS() FileSystem {
+	return fs
+}
+
+func (fs *collectionFileSystem) FileInfo() os.FileInfo {
+	return fs.rootnode().FileInfo()
+}
+
+// IsDir is always true: the root of a collection is a directory.
+func (fs *collectionFileSystem) IsDir() bool {
+	return true
+}
+
+func (fs *collectionFileSystem) Lock() {
+	fs.rootnode().Lock()
+}
+
+func (fs *collectionFileSystem) Unlock() {
+	fs.rootnode().Unlock()
+}
+
+func (fs *collectionFileSystem) RLock() {
+	fs.rootnode().RLock()
+}
+
+func (fs *collectionFileSystem) RUnlock() {
+	fs.rootnode().RUnlock()
+}
+
+func (fs *collectionFileSystem) Parent() inode {
+	return fs.rootnode().Parent()
+}
+
+// Read and Write are invalid operations on a directory-like node.
+func (fs *collectionFileSystem) Read(_ []byte, ptr filenodePtr) (int, filenodePtr, error) {
+	return 0, ptr, ErrInvalidOperation
+}
+
+func (fs *collectionFileSystem) Write(_ []byte, ptr filenodePtr) (int, filenodePtr, error) {
+	return 0, ptr, ErrInvalidOperation
+}
+
+func (fs *collectionFileSystem) Readdir() ([]os.FileInfo, error) {
+	return fs.rootnode().Readdir()
+}
+
+func (fs *collectionFileSystem) SetParent(parent inode, name string) {
+	fs.rootnode().SetParent(parent, name)
+}
+
+// Truncate is invalid on a directory-like node.
+func (fs *collectionFileSystem) Truncate(int64) error {
+	return ErrInvalidOperation
+}
+
func (fs *collectionFileSystem) Sync() error {
if fs.uuid == "" {
return nil
import (
"bytes"
"crypto/md5"
- "crypto/sha1"
"errors"
"fmt"
"io"
blocks map[string][]byte
refreshable map[string]bool
onPut func(bufcopy []byte) // called from PutB, before acquiring lock
+ authToken string // client's auth token (used for signing locators)
+ sigkey string // blob signing key
+ sigttl time.Duration // blob signing ttl
sync.RWMutex
}
}
func (kcs *keepClientStub) PutB(p []byte) (string, int, error) {
- locator := fmt.Sprintf("%x+%d+A12345@abcde", md5.Sum(p), len(p))
+ locator := SignLocator(fmt.Sprintf("%x+%d", md5.Sum(p), len(p)), kcs.authToken, time.Now().Add(kcs.sigttl), kcs.sigttl, []byte(kcs.sigkey))
buf := make([]byte, len(p))
copy(buf, p)
if kcs.onPut != nil {
return locator, 1, nil
}
-var localOrRemoteSignature = regexp.MustCompile(`\+[AR][^+]*`)
+var reRemoteSignature = regexp.MustCompile(`\+[AR][^+]*`)
func (kcs *keepClientStub) LocalLocator(locator string) (string, error) {
+ if strings.Contains(locator, "+A") {
+ return locator, nil
+ }
kcs.Lock()
defer kcs.Unlock()
if strings.Contains(locator, "+R") {
return "", fmt.Errorf("kcs.refreshable[%q]==false", locator)
}
}
- fakeSig := fmt.Sprintf("+A%x@%x", sha1.Sum(nil), time.Now().Add(time.Hour*24*14).Unix())
- return localOrRemoteSignature.ReplaceAllLiteralString(locator, fakeSig), nil
+ locator = reRemoteSignature.ReplaceAllLiteralString(locator, "")
+ locator = SignLocator(locator, kcs.authToken, time.Now().Add(kcs.sigttl), kcs.sigttl, []byte(kcs.sigkey))
+ return locator, nil
}
type CollectionFSSuite struct {
s.kc = &keepClientStub{
blocks: map[string][]byte{
"3858f62230ac3c915f300c664312c63f": []byte("foobar"),
- }}
+ },
+ sigkey: fixtureBlobSigningKey,
+ sigttl: fixtureBlobSigningTTL,
+ authToken: fixtureActiveToken,
+ }
s.fs, err = s.coll.FileSystem(s.client, s.kc)
c.Assert(err, check.IsNil)
}
log.Printf("BUG: unhandled error: %s", err)
return placeholder
}
- cfs, err := coll.FileSystem(fs, fs)
+ newfs, err := coll.FileSystem(fs, fs)
if err != nil {
log.Printf("BUG: unhandled error: %s", err)
return placeholder
}
- root := cfs.rootnode()
- root.SetParent(parent, coll.Name)
- return root
+ cfs := newfs.(*collectionFileSystem)
+ cfs.SetParent(parent, coll.Name)
+ return cfs
}}
}
return dn.realinode().Child(name, replace)
}
+// Sync is a no-op if the real inode hasn't even been created yet.
+// Otherwise it delegates to the wrapped inode, which must implement
+// syncer (else ErrInvalidOperation).
+func (dn *deferrednode) Sync() error {
+	dn.mtx.Lock()
+	defer dn.mtx.Unlock()
+	if !dn.created {
+		return nil
+	} else if syncer, ok := dn.wrapped.(syncer); ok {
+		return syncer.Sync()
+	} else {
+		return ErrInvalidOperation
+	}
+}
+
func (dn *deferrednode) Truncate(size int64) error { return dn.realinode().Truncate(size) }
func (dn *deferrednode) SetParent(p inode, name string) { dn.realinode().SetParent(p, name) }
func (dn *deferrednode) IsDir() bool { return dn.currentinode().IsDir() }
//
// See (*customFileSystem)MountUsers for example usage.
type lookupnode struct {
- inode
+ treenode
loadOne func(parent inode, name string) (inode, error)
loadAll func(parent inode) ([]inode, error)
stale func(time.Time) bool
staleOne map[string]time.Time
}
+// Sync flushes pending writes for loaded children and, if successful,
+// triggers a reload on next lookup.
+func (ln *lookupnode) Sync() error {
+	err := ln.treenode.Sync()
+	if err != nil {
+		return err
+	}
+	// Zeroing the stale timestamps makes the next Child()/Readdir()
+	// call loadOne/loadAll again, so remote-side updates are seen.
+	ln.staleLock.Lock()
+	ln.staleAll = time.Time{}
+	ln.staleOne = nil
+	ln.staleLock.Unlock()
+	return nil
+}
+
func (ln *lookupnode) Readdir() ([]os.FileInfo, error) {
ln.staleLock.Lock()
defer ln.staleLock.Unlock()
return nil, err
}
for _, child := range all {
- _, err = ln.inode.Child(child.FileInfo().Name(), func(inode) (inode, error) {
+ _, err = ln.treenode.Child(child.FileInfo().Name(), func(inode) (inode, error) {
return child, nil
})
if err != nil {
// newer than ln.staleAll. Reclaim memory.
ln.staleOne = nil
}
- return ln.inode.Readdir()
+ return ln.treenode.Readdir()
}
+// Child rejects (with ErrInvalidArgument) calls to add/replace
+// children, instead calling loadOne when a non-existing child is
+// looked up.
func (ln *lookupnode) Child(name string, replace func(inode) (inode, error)) (inode, error) {
ln.staleLock.Lock()
defer ln.staleLock.Unlock()
checkTime := time.Now()
+ var existing inode
+ var err error
if ln.stale(ln.staleAll) && ln.stale(ln.staleOne[name]) {
- _, err := ln.inode.Child(name, func(inode) (inode, error) {
+ existing, err = ln.treenode.Child(name, func(inode) (inode, error) {
return ln.loadOne(ln, name)
})
- if err != nil {
- return nil, err
+ if err == nil && existing != nil {
+ if ln.staleOne == nil {
+ ln.staleOne = map[string]time.Time{name: checkTime}
+ } else {
+ ln.staleOne[name] = checkTime
+ }
}
- if ln.staleOne == nil {
- ln.staleOne = map[string]time.Time{name: checkTime}
- } else {
- ln.staleOne[name] = checkTime
+ } else {
+ existing, err = ln.treenode.Child(name, nil)
+ if err != nil && !os.IsNotExist(err) {
+ return existing, err
+ }
+ }
+ if replace != nil {
+ // Let the callback try to delete or replace the
+ // existing node; if it does, return
+ // ErrInvalidArgument.
+ if tryRepl, err := replace(existing); err != nil {
+ // Propagate error from callback
+ return existing, err
+ } else if tryRepl != existing {
+ return existing, ErrInvalidArgument
}
}
- return ln.inode.Child(name, replace)
+ // Return original error from ln.treenode.Child() (it might be
+ // ErrNotExist).
+ return existing, err
}
import (
"log"
- "os"
"strings"
)
// both "/" and the substitution string.
}
if len(contents.Items) == 0 {
- return nil, os.ErrNotExist
+ return nil, nil
}
coll := contents.Items[0]
err = wf.Close()
c.Check(err, check.IsNil)
+ err = project.Sync()
+ c.Check(err, check.IsNil)
+ _, err = s.fs.Open("/home/A Project/oob/test.txt")
+ c.Check(err, check.IsNil)
+
+ // Sync again to mark the project dir as stale, so the
+ // collection gets reloaded from the controller on next
+ // lookup.
+ err = project.Sync()
+ c.Check(err, check.IsNil)
+
+ // Ensure collection was flushed by Sync
+ var latest Collection
+ err = s.client.RequestAndDecode(&latest, "GET", "arvados/v1/collections/"+oob.UUID, nil, nil)
+ c.Check(latest.ManifestText, check.Matches, `.*:test.txt.*\n`)
+
// Delete test.txt behind s.fs's back by updating the
// collection record with an empty ManifestText.
err = s.client.RequestAndDecode(nil, "PATCH", "arvados/v1/collections/"+oob.UUID, nil, map[string]interface{}{
})
c.Assert(err, check.IsNil)
- err = project.Sync()
- c.Check(err, check.IsNil)
_, err = s.fs.Open("/home/A Project/oob/test.txt")
c.Check(err, check.NotNil)
_, err = s.fs.Open("/home/A Project/oob")
c.Assert(err, check.IsNil)
err = project.Sync()
- c.Check(err, check.IsNil)
+ c.Check(err, check.NotNil) // can't update the deleted collection
_, err = s.fs.Open("/home/A Project/oob")
- c.Check(err, check.NotNil)
+ c.Check(err, check.IsNil) // parent dir still has old collection -- didn't reload, because Sync failed
+}
+
+// TestProjectUnsupportedOperations checks that virtual directories in
+// the site filesystem reject direct file/dir creation with "invalid
+// argument", while opening an existing project dir still works.
+func (s *SiteFSSuite) TestProjectUnsupportedOperations(c *check.C) {
+	s.fs.MountByID("by_id")
+	s.fs.MountProject("home", "")
+
+	// Creating a file directly in a project dir is not supported.
+	_, err := s.fs.OpenFile("/home/A Project/newfilename", os.O_CREATE|os.O_RDWR, 0)
+	c.Check(err, check.ErrorMatches, "invalid argument")
+
+	err = s.fs.Mkdir("/home/A Project/newdirname", 0)
+	c.Check(err, check.ErrorMatches, "invalid argument")
+
+	err = s.fs.Mkdir("/by_id/newdirname", 0)
+	c.Check(err, check.ErrorMatches, "invalid argument")
+
+	err = s.fs.Mkdir("/by_id/"+fixtureAProjectUUID+"/newdirname", 0)
+	c.Check(err, check.ErrorMatches, "invalid argument")
+
+	// Plain open of an existing project dir still succeeds.
+	_, err = s.fs.OpenFile("/home/A Project", 0, 0)
+	c.Check(err, check.IsNil)
+}
thr: newThrottle(concurrentWriters),
},
}
- root.inode = &treenode{
+ root.treenode = treenode{
fs: fs,
parent: root,
fileinfo: fileinfo{
}
func (fs *customFileSystem) MountByID(mount string) {
- fs.root.inode.Child(mount, func(inode) (inode, error) {
+ fs.root.treenode.Child(mount, func(inode) (inode, error) {
return &vdirnode{
- inode: &treenode{
+ treenode: treenode{
fs: fs,
parent: fs.root,
inodes: make(map[string]inode),
}
func (fs *customFileSystem) MountProject(mount, uuid string) {
- fs.root.inode.Child(mount, func(inode) (inode, error) {
+ fs.root.treenode.Child(mount, func(inode) (inode, error) {
return fs.newProjectNode(fs.root, mount, uuid), nil
})
}
func (fs *customFileSystem) MountUsers(mount string) {
- fs.root.inode.Child(mount, func(inode) (inode, error) {
+ fs.root.treenode.Child(mount, func(inode) (inode, error) {
return &lookupnode{
stale: fs.Stale,
loadOne: fs.usersLoadOne,
loadAll: fs.usersLoadAll,
- inode: &treenode{
+ treenode: treenode{
fs: fs,
parent: fs.root,
inodes: make(map[string]inode),
}
func (fs *customFileSystem) Sync() error {
- fs.staleLock.Lock()
- defer fs.staleLock.Unlock()
- fs.staleThreshold = time.Now()
- return nil
+ return fs.root.Sync()
}
// Stale returns true if information obtained at time t should be
}
func (fs *customFileSystem) newNode(name string, perm os.FileMode, modTime time.Time) (node inode, err error) {
- return nil, ErrInvalidOperation
+ return nil, ErrInvalidArgument
}
func (fs *customFileSystem) mountByID(parent inode, id string) inode {
if err != nil {
return nil
}
- cfs, err := coll.FileSystem(fs, fs)
+ newfs, err := coll.FileSystem(fs, fs)
if err != nil {
return nil
}
- root := cfs.rootnode()
- root.SetParent(parent, id)
- return root
+ cfs := newfs.(*collectionFileSystem)
+ cfs.SetParent(parent, id)
+ return cfs
}
func (fs *customFileSystem) newProjectNode(root inode, name, uuid string) inode {
stale: fs.Stale,
loadOne: func(parent inode, name string) (inode, error) { return fs.projectsLoadOne(parent, uuid, name) },
loadAll: func(parent inode) ([]inode, error) { return fs.projectsLoadAll(parent, uuid) },
- inode: &treenode{
+ treenode: treenode{
fs: fs,
parent: root,
inodes: make(map[string]inode),
}
}
-// vdirnode wraps an inode by ignoring any requests to add/replace
-// children, and calling a create() func when a non-existing child is
-// looked up.
+// vdirnode wraps an inode by rejecting (with ErrInvalidArgument)
+// calls that add/replace children directly, instead calling a
+// create() func when a non-existing child is looked up.
//
// create() can return either a new node, which will be added to the
// treenode, or nil for ENOENT.
type vdirnode struct {
- inode
+ treenode
create func(parent inode, name string) inode
}
func (vn *vdirnode) Child(name string, replace func(inode) (inode, error)) (inode, error) {
- return vn.inode.Child(name, func(existing inode) (inode, error) {
+ return vn.treenode.Child(name, func(existing inode) (inode, error) {
if existing == nil && vn.create != nil {
existing = vn.create(vn, name)
if existing != nil {
existing.SetParent(vn, name)
- vn.inode.(*treenode).fileinfo.modTime = time.Now()
+ vn.treenode.fileinfo.modTime = time.Now()
}
}
if replace == nil {
import (
"net/http"
"os"
+ "time"
check "gopkg.in/check.v1"
)
fixtureFooCollectionPDH = "1f4b0bc7583c2a7f9102c395f4ffc5e3+45"
fixtureFooCollection = "zzzzz-4zz18-fy296fx3hot09f7"
fixtureNonexistentCollection = "zzzzz-4zz18-totallynotexist"
+ fixtureBlobSigningKey = "zfhgfenhffzltr9dixws36j1yhksjoll2grmku38mi7yxd66h5j4q9w4jzanezacp8s6q0ro3hxakfye02152hncy6zml2ed0uc"
+ fixtureBlobSigningTTL = 336 * time.Hour
)
var _ = check.Suite(&SiteFSSuite{})
s.kc = &keepClientStub{
blocks: map[string][]byte{
"3858f62230ac3c915f300c664312c63f": []byte("foobar"),
- }}
+ },
+ sigkey: fixtureBlobSigningKey,
+ sigttl: fixtureBlobSigningTTL,
+ authToken: fixtureActiveToken,
+ }
s.fs = s.client.SiteFileSystem(s.kc)
}
c.Check(names, check.DeepEquals, []string{"baz"})
_, err = s.fs.OpenFile("/by_id/"+fixtureNonexistentCollection, os.O_RDWR|os.O_CREATE, 0755)
- c.Check(err, check.Equals, ErrInvalidOperation)
+ c.Check(err, check.Equals, ErrInvalidArgument)
err = s.fs.Rename("/by_id/"+fixtureFooCollection, "/by_id/beep")
c.Check(err, check.Equals, ErrInvalidArgument)
err = s.fs.Rename("/by_id/"+fixtureFooCollection+"/foo", "/by_id/beep")
w.Header().Set("Access-Control-Expose-Headers", "Content-Range")
}
+ if h.serveS3(w, r) {
+ return
+ }
+
pathParts := strings.Split(r.URL.Path[1:], "/")
var stripParts int
}
}
+// getClients returns an ArvadosClient, a KeepClient, and an
+// arvados.Client, all authorized with the given token and tagged with
+// the given request ID. On success (err == nil) the caller must call
+// release() when finished, to return the ArvadosClient to the pool.
+// On error, all other results are nil.
+func (h *handler) getClients(reqID, token string) (arv *arvadosclient.ArvadosClient, kc *keepclient.KeepClient, client *arvados.Client, release func(), err error) {
+	arv = h.clientPool.Get()
+	if arv == nil {
+		// Return the pool's error: previously this returned a
+		// nil error, so callers' "if err != nil" checks passed
+		// and they went on to use a nil client.
+		return nil, nil, nil, nil, h.clientPool.Err()
+	}
+	arv.ApiToken = token
+	kc, err = keepclient.MakeKeepClient(arv)
+	if err != nil {
+		// Put the client back ourselves and return a nil
+		// release func, so a failed call can't lead to a
+		// double-release.
+		h.clientPool.Put(arv)
+		return nil, nil, nil, nil, err
+	}
+	release = func() { h.clientPool.Put(arv) }
+	kc.RequestID = reqID
+	client = (&arvados.Client{
+		APIHost:   arv.ApiServer,
+		AuthToken: arv.ApiToken,
+		Insecure:  arv.ApiInsecure,
+	}).WithRequestID(reqID)
+	return
+}
+
func (h *handler) serveSiteFS(w http.ResponseWriter, r *http.Request, tokens []string, credentialsOK, attachment bool) {
if len(tokens) == 0 {
w.Header().Add("WWW-Authenticate", "Basic realm=\"collections\"")
http.Error(w, errReadOnly.Error(), http.StatusMethodNotAllowed)
return
}
- arv := h.clientPool.Get()
- if arv == nil {
+ _, kc, client, release, err := h.getClients(r.Header.Get("X-Request-Id"), tokens[0])
+ if err != nil {
http.Error(w, "Pool failed: "+h.clientPool.Err().Error(), http.StatusInternalServerError)
return
}
- defer h.clientPool.Put(arv)
- arv.ApiToken = tokens[0]
+ defer release()
- kc, err := keepclient.MakeKeepClient(arv)
- if err != nil {
- http.Error(w, "error setting up keep client: "+err.Error(), http.StatusInternalServerError)
- return
- }
- kc.RequestID = r.Header.Get("X-Request-Id")
- client := (&arvados.Client{
- APIHost: arv.ApiServer,
- AuthToken: arv.ApiToken,
- Insecure: arv.ApiInsecure,
- }).WithRequestID(r.Header.Get("X-Request-Id"))
fs := client.SiteFileSystem(kc)
fs.ForwardSlashNameSubstitution(h.Config.cluster.Collections.ForwardSlashNameSubstitution)
f, err := fs.Open(r.URL.Path)
"git.arvados.org/arvados.git/sdk/go/arvados"
"github.com/coreos/go-systemd/daemon"
"github.com/ghodss/yaml"
+ "github.com/sirupsen/logrus"
log "github.com/sirupsen/logrus"
)
os.Setenv("ARVADOS_API_HOST", cfg.cluster.Services.Controller.ExternalURL.Host)
srv := &server{Config: cfg}
- if err := srv.Start(); err != nil {
+ if err := srv.Start(logrus.StandardLogger()); err != nil {
log.Fatal(err)
}
if _, err := daemon.SdNotify(false, "READY=1"); err != nil {
--- /dev/null
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package main
+
+import (
+ "encoding/xml"
+ "errors"
+ "fmt"
+ "io"
+ "net/http"
+ "os"
+ "path/filepath"
+ "sort"
+ "strconv"
+ "strings"
+
+ "git.arvados.org/arvados.git/sdk/go/arvados"
+ "git.arvados.org/arvados.git/sdk/go/ctxlog"
+ "github.com/AdRoll/goamz/s3"
+)
+
+const s3MaxKeys = 1000
+
+// serveS3 handles r and returns true if r is a request from an S3
+// client, otherwise it returns false.
+func (h *handler) serveS3(w http.ResponseWriter, r *http.Request) bool {
+ var token string
+ if auth := r.Header.Get("Authorization"); strings.HasPrefix(auth, "AWS ") {
+ split := strings.SplitN(auth[4:], ":", 2)
+ if len(split) < 2 {
+ w.WriteHeader(http.StatusUnauthorized)
+ return true
+ }
+ token = split[0]
+ } else if strings.HasPrefix(auth, "AWS4-HMAC-SHA256 ") {
+ for _, cmpt := range strings.Split(auth[17:], ",") {
+ cmpt = strings.TrimSpace(cmpt)
+ split := strings.SplitN(cmpt, "=", 2)
+ if len(split) == 2 && split[0] == "Credential" {
+ keyandscope := strings.Split(split[1], "/")
+ if len(keyandscope[0]) > 0 {
+ token = keyandscope[0]
+ break
+ }
+ }
+ }
+ if token == "" {
+ w.WriteHeader(http.StatusBadRequest)
+ fmt.Println(w, "invalid V4 signature")
+ return true
+ }
+ } else {
+ return false
+ }
+
+ _, kc, client, release, err := h.getClients(r.Header.Get("X-Request-Id"), token)
+ if err != nil {
+ http.Error(w, "Pool failed: "+h.clientPool.Err().Error(), http.StatusInternalServerError)
+ return true
+ }
+ defer release()
+
+ fs := client.SiteFileSystem(kc)
+ fs.ForwardSlashNameSubstitution(h.Config.cluster.Collections.ForwardSlashNameSubstitution)
+
+ objectNameGiven := strings.Count(strings.TrimSuffix(r.URL.Path, "/"), "/") > 1
+
+ switch {
+ case r.Method == http.MethodGet && !objectNameGiven:
+ // Path is "/{uuid}" or "/{uuid}/", has no object name
+ if _, ok := r.URL.Query()["versioning"]; ok {
+ // GetBucketVersioning
+ w.Header().Set("Content-Type", "application/xml")
+ io.WriteString(w, xml.Header)
+ fmt.Fprintln(w, `<VersioningConfiguration xmlns="http://s3.amazonaws.com/doc/2006-03-01/"/>`)
+ } else {
+ // ListObjects
+ h.s3list(w, r, fs)
+ }
+ return true
+ case r.Method == http.MethodGet || r.Method == http.MethodHead:
+ fspath := "/by_id" + r.URL.Path
+ fi, err := fs.Stat(fspath)
+ if r.Method == "HEAD" && !objectNameGiven {
+ // HeadBucket
+ if err == nil && fi.IsDir() {
+ w.WriteHeader(http.StatusOK)
+ } else if os.IsNotExist(err) {
+ w.WriteHeader(http.StatusNotFound)
+ } else {
+ http.Error(w, err.Error(), http.StatusBadGateway)
+ }
+ return true
+ }
+ if err == nil && fi.IsDir() && objectNameGiven && strings.HasSuffix(fspath, "/") && h.Config.cluster.Collections.S3FolderObjects {
+ w.Header().Set("Content-Type", "application/x-directory")
+ w.WriteHeader(http.StatusOK)
+ return true
+ }
+ if os.IsNotExist(err) ||
+ (err != nil && err.Error() == "not a directory") ||
+ (fi != nil && fi.IsDir()) {
+ http.Error(w, "not found", http.StatusNotFound)
+ return true
+ }
+ // shallow copy r, and change URL path
+ r := *r
+ r.URL.Path = fspath
+ http.FileServer(fs).ServeHTTP(w, &r)
+ return true
+ case r.Method == http.MethodPut:
+ if !objectNameGiven {
+ http.Error(w, "missing object name in PUT request", http.StatusBadRequest)
+ return true
+ }
+ fspath := "by_id" + r.URL.Path
+ var objectIsDir bool
+ if strings.HasSuffix(fspath, "/") {
+ if !h.Config.cluster.Collections.S3FolderObjects {
+ http.Error(w, "invalid object name: trailing slash", http.StatusBadRequest)
+ return true
+ }
+ n, err := r.Body.Read(make([]byte, 1))
+ if err != nil && err != io.EOF {
+ http.Error(w, fmt.Sprintf("error reading request body: %s", err), http.StatusInternalServerError)
+ return true
+ } else if n > 0 {
+ http.Error(w, "cannot create object with trailing '/' char unless content is empty", http.StatusBadRequest)
+ return true
+ } else if strings.SplitN(r.Header.Get("Content-Type"), ";", 2)[0] != "application/x-directory" {
+ http.Error(w, "cannot create object with trailing '/' char unless Content-Type is 'application/x-directory'", http.StatusBadRequest)
+ return true
+ }
+ // Given PUT "foo/bar/", we'll use "foo/bar/."
+ // in the "ensure parents exist" block below,
+ // and then we'll be done.
+ fspath += "."
+ objectIsDir = true
+ }
+ fi, err := fs.Stat(fspath)
+ if err != nil && err.Error() == "not a directory" {
+ // requested foo/bar, but foo is a file
+ http.Error(w, "object name conflicts with existing object", http.StatusBadRequest)
+ return true
+ }
+ if strings.HasSuffix(r.URL.Path, "/") && err == nil && !fi.IsDir() {
+ // requested foo/bar/, but foo/bar is a file
+ http.Error(w, "object name conflicts with existing object", http.StatusBadRequest)
+ return true
+ }
+ // create missing parent/intermediate directories, if any
+ for i, c := range fspath {
+ if i > 0 && c == '/' {
+ dir := fspath[:i]
+ if strings.HasSuffix(dir, "/") {
+ err = errors.New("invalid object name (consecutive '/' chars)")
+ http.Error(w, err.Error(), http.StatusBadRequest)
+ return true
+ }
+ err = fs.Mkdir(dir, 0755)
+ if err == arvados.ErrInvalidArgument {
+ // Cannot create a directory
+ // here.
+ err = fmt.Errorf("mkdir %q failed: %w", dir, err)
+ http.Error(w, err.Error(), http.StatusBadRequest)
+ return true
+ } else if err != nil && !os.IsExist(err) {
+ err = fmt.Errorf("mkdir %q failed: %w", dir, err)
+ http.Error(w, err.Error(), http.StatusInternalServerError)
+ return true
+ }
+ }
+ }
+ if !objectIsDir {
+ f, err := fs.OpenFile(fspath, os.O_WRONLY|os.O_TRUNC|os.O_CREATE, 0644)
+ if os.IsNotExist(err) {
+ f, err = fs.OpenFile(fspath, os.O_WRONLY|os.O_TRUNC|os.O_CREATE, 0644)
+ }
+ if err != nil {
+ err = fmt.Errorf("open %q failed: %w", r.URL.Path, err)
+ http.Error(w, err.Error(), http.StatusBadRequest)
+ return true
+ }
+ defer f.Close()
+ _, err = io.Copy(f, r.Body)
+ if err != nil {
+ err = fmt.Errorf("write to %q failed: %w", r.URL.Path, err)
+ http.Error(w, err.Error(), http.StatusBadGateway)
+ return true
+ }
+ err = f.Close()
+ if err != nil {
+ err = fmt.Errorf("write to %q failed: close: %w", r.URL.Path, err)
+ http.Error(w, err.Error(), http.StatusBadGateway)
+ return true
+ }
+ }
+ err = fs.Sync()
+ if err != nil {
+ err = fmt.Errorf("sync failed: %w", err)
+ http.Error(w, err.Error(), http.StatusInternalServerError)
+ return true
+ }
+ w.WriteHeader(http.StatusOK)
+ return true
+ default:
+ http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
+ return true
+ }
+}
+
+// Call fn on the given path (directory) and its contents, in
+// lexicographic order.
+//
+// If isRoot==true and path is not a directory, return nil.
+//
+// If fn returns filepath.SkipDir when called on a directory, don't
+// descend into that directory.
+func walkFS(fs arvados.CustomFileSystem, path string, isRoot bool, fn func(path string, fi os.FileInfo) error) error {
+	if isRoot {
+		// The root itself is passed to fn (children of
+		// non-root dirs are handled by the caller's loop).
+		fi, err := fs.Stat(path)
+		if os.IsNotExist(err) || (err == nil && !fi.IsDir()) {
+			return nil
+		} else if err != nil {
+			return err
+		}
+		err = fn(path, fi)
+		if err == filepath.SkipDir {
+			return nil
+		} else if err != nil {
+			return err
+		}
+	}
+	f, err := fs.Open(path)
+	if os.IsNotExist(err) && isRoot {
+		return nil
+	} else if err != nil {
+		return fmt.Errorf("open %q: %w", path, err)
+	}
+	defer f.Close()
+	// Avoid a double slash when building child paths below.
+	if path == "/" {
+		path = ""
+	}
+	fis, err := f.Readdir(-1)
+	if err != nil {
+		return err
+	}
+	// Readdir order is unspecified; sort for lexicographic traversal.
+	sort.Slice(fis, func(i, j int) bool { return fis[i].Name() < fis[j].Name() })
+	for _, fi := range fis {
+		err = fn(path+"/"+fi.Name(), fi)
+		if err == filepath.SkipDir {
+			continue
+		} else if err != nil {
+			return err
+		}
+		if fi.IsDir() {
+			err = walkFS(fs, path+"/"+fi.Name(), false, fn)
+			if err != nil {
+				return err
+			}
+		}
+	}
+	return nil
+}
+
+// errDone is a sentinel error: the s3list walk callback returns it to
+// stop the walk early without reporting a real error to the caller.
+var errDone = errors.New("done")
+
+// s3list handles an S3 ListObjects request for the bucket named in
+// r.URL.Path, using fs as the backing filesystem, and writes a
+// ListBucketResult XML document to w. It honors the standard S3
+// prefix, marker, delimiter, and max-keys request parameters.
+func (h *handler) s3list(w http.ResponseWriter, r *http.Request, fs arvados.CustomFileSystem) {
+	var params struct {
+		bucket    string
+		delimiter string
+		marker    string
+		maxKeys   int
+		prefix    string
+	}
+	params.bucket = strings.SplitN(r.URL.Path[1:], "/", 2)[0]
+	params.delimiter = r.FormValue("delimiter")
+	params.marker = r.FormValue("marker")
+	// Clamp max-keys to the server-side limit; unparseable or
+	// out-of-range values fall back to s3MaxKeys.
+	if mk, _ := strconv.ParseInt(r.FormValue("max-keys"), 10, 64); mk > 0 && mk < s3MaxKeys {
+		params.maxKeys = int(mk)
+	} else {
+		params.maxKeys = s3MaxKeys
+	}
+	params.prefix = r.FormValue("prefix")
+
+	bucketdir := "by_id/" + params.bucket
+	// walkpath is the directory (relative to bucketdir) we need
+	// to walk: the innermost directory that is guaranteed to
+	// contain all paths that have the requested prefix. Examples:
+	// prefix "foo/bar" => walkpath "foo"
+	// prefix "foo/bar/" => walkpath "foo/bar"
+	// prefix "foo" => walkpath ""
+	// prefix "" => walkpath ""
+	walkpath := params.prefix
+	if cut := strings.LastIndex(walkpath, "/"); cut >= 0 {
+		walkpath = walkpath[:cut]
+	} else {
+		walkpath = ""
+	}
+
+	resp := s3.ListResp{
+		Name:      strings.SplitN(r.URL.Path[1:], "/", 2)[0],
+		Prefix:    params.prefix,
+		Delimiter: params.delimiter,
+		Marker:    params.marker,
+		MaxKeys:   params.maxKeys,
+	}
+	// commonPrefixes accumulates the distinct rolled-up prefixes
+	// (only meaningful when a delimiter was given).
+	commonPrefixes := map[string]bool{}
+	err := walkFS(fs, strings.TrimSuffix(bucketdir+"/"+walkpath, "/"), true, func(path string, fi os.FileInfo) error {
+		if path == bucketdir {
+			return nil
+		}
+		// Make path relative to the bucket root; directories
+		// are reported with a trailing "/" and size 0.
+		path = path[len(bucketdir)+1:]
+		filesize := fi.Size()
+		if fi.IsDir() {
+			path += "/"
+			filesize = 0
+		}
+		if len(path) <= len(params.prefix) {
+			if path > params.prefix[:len(path)] {
+				// with prefix "foobar", walking "fooz" means we're done
+				return errDone
+			}
+			if path < params.prefix[:len(path)] {
+				// with prefix "foobar", walking "foobag" is pointless
+				return filepath.SkipDir
+			}
+			if fi.IsDir() && !strings.HasPrefix(params.prefix+"/", path) {
+				// with prefix "foo/bar", walking "fo"
+				// is pointless (but walking "foo" or
+				// "foo/bar" is necessary)
+				return filepath.SkipDir
+			}
+			if len(path) < len(params.prefix) {
+				// can't skip anything, and this entry
+				// isn't in the results, so just
+				// continue descent
+				return nil
+			}
+		} else {
+			if path[:len(params.prefix)] > params.prefix {
+				// with prefix "foobar", nothing we
+				// see after "foozzz" is relevant
+				return errDone
+			}
+		}
+		if path < params.marker || path < params.prefix {
+			// Not past the marker (or prefix) yet: not a
+			// result, but keep walking.
+			return nil
+		}
+		if fi.IsDir() && !h.Config.cluster.Collections.S3FolderObjects {
+			// Note we don't add anything to
+			// commonPrefixes here even if delimiter is
+			// "/". We descend into the directory, and
+			// return a commonPrefix only if we end up
+			// finding a regular file inside it.
+			return nil
+		}
+		if params.delimiter != "" {
+			idx := strings.Index(path[len(params.prefix):], params.delimiter)
+			if idx >= 0 {
+				// with prefix "foobar" and delimiter
+				// "z", when we hit "foobar/baz", we
+				// add "/baz" to commonPrefixes and
+				// stop descending.
+				commonPrefixes[path[:len(params.prefix)+idx+1]] = true
+				return filepath.SkipDir
+			}
+		}
+		if len(resp.Contents)+len(commonPrefixes) >= params.maxKeys {
+			resp.IsTruncated = true
+			if params.delimiter != "" {
+				// NextMarker is only reported when a
+				// delimiter was given, matching S3's
+				// documented ListObjects behavior.
+				resp.NextMarker = path
+			}
+			return errDone
+		}
+		resp.Contents = append(resp.Contents, s3.Key{
+			Key:          path,
+			LastModified: fi.ModTime().UTC().Format("2006-01-02T15:04:05.999") + "Z",
+			Size:         filesize,
+		})
+		return nil
+	})
+	if err != nil && err != errDone {
+		http.Error(w, err.Error(), http.StatusInternalServerError)
+		return
+	}
+	if params.delimiter != "" {
+		for prefix := range commonPrefixes {
+			resp.CommonPrefixes = append(resp.CommonPrefixes, prefix)
+			// NOTE(review): sorting inside the loop makes
+			// this O(n^2 log n); one sort.Strings call
+			// after the loop would give the same result.
+			sort.Strings(resp.CommonPrefixes)
+		}
+	}
+	// Wrap the response struct to attach the S3 XML namespace to
+	// the ListBucketResult element.
+	wrappedResp := struct {
+		XMLName string `xml:"http://s3.amazonaws.com/doc/2006-03-01/ ListBucketResult"`
+		s3.ListResp
+	}{"", resp}
+	w.Header().Set("Content-Type", "application/xml")
+	io.WriteString(w, xml.Header)
+	if err := xml.NewEncoder(w).Encode(wrappedResp); err != nil {
+		ctxlog.FromContext(r.Context()).WithError(err).Error("error writing xml response")
+	}
+}
--- /dev/null
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package main
+
+import (
+ "bytes"
+ "crypto/rand"
+ "fmt"
+ "io/ioutil"
+ "net/http"
+ "os"
+ "strings"
+ "sync"
+ "time"
+
+ "git.arvados.org/arvados.git/sdk/go/arvados"
+ "git.arvados.org/arvados.git/sdk/go/arvadosclient"
+ "git.arvados.org/arvados.git/sdk/go/arvadostest"
+ "git.arvados.org/arvados.git/sdk/go/keepclient"
+ "github.com/AdRoll/goamz/aws"
+ "github.com/AdRoll/goamz/s3"
+ check "gopkg.in/check.v1"
+)
+
+// s3stage holds the fixtures for one S3 API test run: Arvados/Keep
+// clients, a freshly created test project and collection, and goamz
+// buckets addressing each of them. Create with s3setup; clean up with
+// teardown.
+type s3stage struct {
+	arv        *arvados.Client
+	ac         *arvadosclient.ArvadosClient
+	kc         *keepclient.KeepClient
+	proj       arvados.Group
+	projbucket *s3.Bucket // bucket backed by the test project
+	coll       arvados.Collection
+	collbucket *s3.Bucket // bucket backed by the test collection
+}
+
+// s3setup creates a test project containing a test collection (with
+// "emptyfile", an "emptydir" directory, and "sailboat.txt"), plus S3
+// clients/buckets addressing both, and returns them as an s3stage.
+// Callers are expected to call stage.teardown when done.
+func (s *IntegrationSuite) s3setup(c *check.C) s3stage {
+	var proj arvados.Group
+	var coll arvados.Collection
+	arv := arvados.NewClientFromEnv()
+	arv.AuthToken = arvadostest.ActiveToken
+	err := arv.RequestAndDecode(&proj, "POST", "arvados/v1/groups", nil, map[string]interface{}{
+		"group": map[string]interface{}{
+			"group_class": "project",
+			"name":        "keep-web s3 test",
+		},
+		"ensure_unique_name": true,
+	})
+	c.Assert(err, check.IsNil)
+	err = arv.RequestAndDecode(&coll, "POST", "arvados/v1/collections", nil, map[string]interface{}{"collection": map[string]interface{}{
+		"owner_uuid":    proj.UUID,
+		"name":          "keep-web s3 test collection",
+		"manifest_text": ". d41d8cd98f00b204e9800998ecf8427e+0 0:0:emptyfile\n./emptydir d41d8cd98f00b204e9800998ecf8427e+0 0:0:.\n",
+	}})
+	c.Assert(err, check.IsNil)
+	ac, err := arvadosclient.New(arv)
+	c.Assert(err, check.IsNil)
+	kc, err := keepclient.MakeKeepClient(ac)
+	c.Assert(err, check.IsNil)
+	fs, err := coll.FileSystem(arv, kc)
+	c.Assert(err, check.IsNil)
+	// Add a small non-empty file so size reporting can be checked.
+	f, err := fs.OpenFile("sailboat.txt", os.O_CREATE|os.O_WRONLY, 0644)
+	c.Assert(err, check.IsNil)
+	_, err = f.Write([]byte("⛵\n"))
+	c.Assert(err, check.IsNil)
+	err = f.Close()
+	c.Assert(err, check.IsNil)
+	err = fs.Sync()
+	c.Assert(err, check.IsNil)
+	// Re-fetch the collection so stage.coll reflects the synced
+	// manifest.
+	err = arv.RequestAndDecode(&coll, "GET", "arvados/v1/collections/"+coll.UUID, nil, nil)
+	c.Assert(err, check.IsNil)
+
+	// S3 credentials: the Arvados token is supplied as both access
+	// key and secret key.
+	auth := aws.NewAuth(arvadostest.ActiveTokenV2, arvadostest.ActiveTokenV2, "", time.Now().Add(time.Hour))
+	region := aws.Region{
+		Name:       s.testServer.Addr,
+		S3Endpoint: "http://" + s.testServer.Addr,
+	}
+	client := s3.New(*auth, region)
+	return s3stage{
+		arv:  arv,
+		ac:   ac,
+		kc:   kc,
+		proj: proj,
+		projbucket: &s3.Bucket{
+			S3:   client,
+			Name: proj.UUID,
+		},
+		coll: coll,
+		collbucket: &s3.Bucket{
+			S3:   client,
+			Name: coll.UUID,
+		},
+	}
+}
+
+// teardown deletes the collection and project created by s3setup.
+// Either deletion is skipped if the corresponding UUID is empty.
+func (stage s3stage) teardown(c *check.C) {
+	if stage.coll.UUID != "" {
+		err := stage.arv.RequestAndDecode(&stage.coll, "DELETE", "arvados/v1/collections/"+stage.coll.UUID, nil, nil)
+		c.Check(err, check.IsNil)
+	}
+	if stage.proj.UUID != "" {
+		err := stage.arv.RequestAndDecode(&stage.proj, "DELETE", "arvados/v1/groups/"+stage.proj.UUID, nil, nil)
+		c.Check(err, check.IsNil)
+	}
+}
+
+// TestS3HeadBucket checks that HEAD on both the collection bucket and
+// the project bucket reports the bucket as existing.
+func (s *IntegrationSuite) TestS3HeadBucket(c *check.C) {
+	stage := s.s3setup(c)
+	defer stage.teardown(c)
+
+	for _, bucket := range []*s3.Bucket{stage.collbucket, stage.projbucket} {
+		c.Logf("bucket %s", bucket.Name)
+		exists, err := bucket.Exists("")
+		c.Check(err, check.IsNil)
+		c.Check(exists, check.Equals, true)
+	}
+}
+
+// TestS3CollectionGetObject runs the GetObject tests against a
+// collection bucket (object keys are paths within the collection).
+func (s *IntegrationSuite) TestS3CollectionGetObject(c *check.C) {
+	stage := s.s3setup(c)
+	defer stage.teardown(c)
+	s.testS3GetObject(c, stage.collbucket, "")
+}
+// TestS3ProjectGetObject runs the GetObject tests against a project
+// bucket, where object keys are prefixed with the collection name.
+func (s *IntegrationSuite) TestS3ProjectGetObject(c *check.C) {
+	stage := s.s3setup(c)
+	defer stage.teardown(c)
+	s.testS3GetObject(c, stage.projbucket, stage.coll.Name+"/")
+}
+// testS3GetObject exercises GetObject and HeadObject on an existing
+// empty file, a missing file, and a non-empty file ("sailboat.txt").
+func (s *IntegrationSuite) testS3GetObject(c *check.C, bucket *s3.Bucket, prefix string) {
+	rdr, err := bucket.GetReader(prefix + "emptyfile")
+	c.Assert(err, check.IsNil)
+	buf, err := ioutil.ReadAll(rdr)
+	c.Check(err, check.IsNil)
+	c.Check(len(buf), check.Equals, 0)
+	err = rdr.Close()
+	c.Check(err, check.IsNil)
+
+	// GetObject
+	rdr, err = bucket.GetReader(prefix + "missingfile")
+	c.Check(err, check.ErrorMatches, `404 Not Found`)
+
+	// HeadObject
+	exists, err := bucket.Exists(prefix + "missingfile")
+	c.Check(err, check.IsNil)
+	c.Check(exists, check.Equals, false)
+
+	// GetObject
+	rdr, err = bucket.GetReader(prefix + "sailboat.txt")
+	c.Assert(err, check.IsNil)
+	buf, err = ioutil.ReadAll(rdr)
+	c.Check(err, check.IsNil)
+	c.Check(buf, check.DeepEquals, []byte("⛵\n"))
+	err = rdr.Close()
+	c.Check(err, check.IsNil)
+
+	// HeadObject
+	exists, err = bucket.Exists(prefix + "sailboat.txt")
+	c.Check(err, check.IsNil)
+	c.Check(exists, check.Equals, true)
+}
+
+// TestS3CollectionPutObjectSuccess runs the PutObject success cases
+// against a collection bucket.
+func (s *IntegrationSuite) TestS3CollectionPutObjectSuccess(c *check.C) {
+	stage := s.s3setup(c)
+	defer stage.teardown(c)
+	s.testS3PutObjectSuccess(c, stage.collbucket, "")
+}
+// TestS3ProjectPutObjectSuccess runs the PutObject success cases
+// against a project bucket (keys prefixed with the collection name).
+func (s *IntegrationSuite) TestS3ProjectPutObjectSuccess(c *check.C) {
+	stage := s.s3setup(c)
+	defer stage.teardown(c)
+	s.testS3PutObjectSuccess(c, stage.projbucket, stage.coll.Name+"/")
+}
+// testS3PutObjectSuccess uploads objects of various sizes -- including
+// a zero-byte file and a "folder object" (trailing slash, zero size)
+// -- then reads each one back and compares contents.
+func (s *IntegrationSuite) testS3PutObjectSuccess(c *check.C, bucket *s3.Bucket, prefix string) {
+	for _, trial := range []struct {
+		path        string
+		size        int
+		contentType string
+	}{
+		{
+			path:        "newfile",
+			size:        128000000,
+			contentType: "application/octet-stream",
+		}, {
+			path:        "newdir/newfile",
+			size:        1 << 26,
+			contentType: "application/octet-stream",
+		}, {
+			path:        "newdir1/newdir2/newfile",
+			size:        0,
+			contentType: "application/octet-stream",
+		}, {
+			path:        "newdir1/newdir2/newdir3/",
+			size:        0,
+			contentType: "application/x-directory",
+		},
+	} {
+		c.Logf("=== %v", trial)
+
+		objname := prefix + trial.path
+
+		// Object must not exist before the PUT.
+		_, err := bucket.GetReader(objname)
+		c.Assert(err, check.ErrorMatches, `404 Not Found`)
+
+		buf := make([]byte, trial.size)
+		rand.Read(buf)
+
+		err = bucket.PutReader(objname, bytes.NewReader(buf), int64(len(buf)), trial.contentType, s3.Private, s3.Options{})
+		c.Check(err, check.IsNil)
+
+		rdr, err := bucket.GetReader(objname)
+		if strings.HasSuffix(trial.path, "/") && !s.testServer.Config.cluster.Collections.S3FolderObjects {
+			// With S3FolderObjects disabled, a folder
+			// object is not retrievable after PUT.
+			c.Check(err, check.NotNil)
+			continue
+		} else if !c.Check(err, check.IsNil) {
+			continue
+		}
+		buf2, err := ioutil.ReadAll(rdr)
+		c.Check(err, check.IsNil)
+		c.Check(buf2, check.HasLen, len(buf))
+		c.Check(bytes.Equal(buf, buf2), check.Equals, true)
+	}
+}
+
+// TestS3ProjectPutObjectNotSupported checks that PUT directly into a
+// project bucket (i.e. not inside an existing collection) is rejected
+// with 400, and that the object remains absent afterwards.
+func (s *IntegrationSuite) TestS3ProjectPutObjectNotSupported(c *check.C) {
+	stage := s.s3setup(c)
+	defer stage.teardown(c)
+	bucket := stage.projbucket
+
+	for _, trial := range []struct {
+		path        string
+		size        int
+		contentType string
+	}{
+		{
+			path:        "newfile",
+			size:        1234,
+			contentType: "application/octet-stream",
+		}, {
+			path:        "newdir/newfile",
+			size:        1234,
+			contentType: "application/octet-stream",
+		}, {
+			path:        "newdir2/",
+			size:        0,
+			contentType: "application/x-directory",
+		},
+	} {
+		c.Logf("=== %v", trial)
+
+		_, err := bucket.GetReader(trial.path)
+		c.Assert(err, check.ErrorMatches, `404 Not Found`)
+
+		buf := make([]byte, trial.size)
+		rand.Read(buf)
+
+		err = bucket.PutReader(trial.path, bytes.NewReader(buf), int64(len(buf)), trial.contentType, s3.Private, s3.Options{})
+		c.Check(err, check.ErrorMatches, `400 Bad Request`)
+
+		// The rejected PUT must not have created anything.
+		_, err = bucket.GetReader(trial.path)
+		c.Assert(err, check.ErrorMatches, `404 Not Found`)
+	}
+}
+
+// TestS3CollectionPutObjectFailure runs the invalid-PUT cases against
+// a collection bucket.
+func (s *IntegrationSuite) TestS3CollectionPutObjectFailure(c *check.C) {
+	stage := s.s3setup(c)
+	defer stage.teardown(c)
+	s.testS3PutObjectFailure(c, stage.collbucket, "")
+}
+// TestS3ProjectPutObjectFailure runs the invalid-PUT cases against a
+// project bucket (keys prefixed with the collection name).
+func (s *IntegrationSuite) TestS3ProjectPutObjectFailure(c *check.C) {
+	stage := s.s3setup(c)
+	defer stage.teardown(c)
+	s.testS3PutObjectFailure(c, stage.projbucket, stage.coll.Name+"/")
+}
+// testS3PutObjectFailure sends a batch of invalid object names
+// (conflicts with existing files/dirs, empty components, trailing
+// slashes with S3FolderObjects disabled) concurrently, expecting each
+// PUT to fail with 400 and to leave no object behind.
+func (s *IntegrationSuite) testS3PutObjectFailure(c *check.C, bucket *s3.Bucket, prefix string) {
+	// Folder objects are disabled here, so any trailing-slash PUT
+	// should fail.
+	s.testServer.Config.cluster.Collections.S3FolderObjects = false
+	var wg sync.WaitGroup
+	for _, trial := range []struct {
+		path string
+	}{
+		{
+			path: "emptyfile/newname", // emptyfile exists, see s3setup()
+		}, {
+			path: "emptyfile/", // emptyfile exists, see s3setup()
+		}, {
+			path: "emptydir", // dir already exists, see s3setup()
+		}, {
+			path: "emptydir/",
+		}, {
+			path: "emptydir//",
+		}, {
+			path: "newdir/",
+		}, {
+			path: "newdir//",
+		}, {
+			path: "/",
+		}, {
+			path: "//",
+		}, {
+			path: "foo//bar",
+		}, {
+			path: "",
+		},
+	} {
+		trial := trial // capture loop variable for the goroutine
+		wg.Add(1)
+		go func() {
+			defer wg.Done()
+			c.Logf("=== %v", trial)
+
+			objname := prefix + trial.path
+
+			buf := make([]byte, 1234)
+			rand.Read(buf)
+
+			err := bucket.PutReader(objname, bytes.NewReader(buf), int64(len(buf)), "application/octet-stream", s3.Private, s3.Options{})
+			if !c.Check(err, check.ErrorMatches, `400 Bad.*`, check.Commentf("PUT %q should fail", objname)) {
+				return
+			}
+
+			if objname != "" && objname != "/" {
+				_, err = bucket.GetReader(objname)
+				c.Check(err, check.ErrorMatches, `404 Not Found`, check.Commentf("GET %q should return 404", objname))
+			}
+		}()
+	}
+	wg.Wait()
+}
+
+// writeBigDirs adds dirs directories (dir0, dir1, ...) to the test
+// collection, each containing filesPerDir empty files (file0.txt,
+// ...), and syncs the resulting manifest.
+func (stage *s3stage) writeBigDirs(c *check.C, dirs int, filesPerDir int) {
+	fs, err := stage.coll.FileSystem(stage.arv, stage.kc)
+	c.Assert(err, check.IsNil)
+	for d := 0; d < dirs; d++ {
+		dir := fmt.Sprintf("dir%d", d)
+		c.Assert(fs.Mkdir(dir, 0755), check.IsNil)
+		for i := 0; i < filesPerDir; i++ {
+			f, err := fs.OpenFile(fmt.Sprintf("%s/file%d.txt", dir, i), os.O_CREATE|os.O_WRONLY, 0644)
+			c.Assert(err, check.IsNil)
+			c.Assert(f.Close(), check.IsNil)
+		}
+	}
+	c.Assert(fs.Sync(), check.IsNil)
+}
+
+// TestS3GetBucketVersioning checks that GET ?versioning returns an
+// empty (i.e. versioning-disabled) VersioningConfiguration document
+// for both bucket types.
+func (s *IntegrationSuite) TestS3GetBucketVersioning(c *check.C) {
+	stage := s.s3setup(c)
+	defer stage.teardown(c)
+	for _, bucket := range []*s3.Bucket{stage.collbucket, stage.projbucket} {
+		// NOTE(review): the error from http.NewRequest is
+		// never checked before req is used.
+		req, err := http.NewRequest("GET", bucket.URL("/"), nil)
+		req.Header.Set("Authorization", "AWS "+arvadostest.ActiveTokenV2+":none")
+		req.URL.RawQuery = "versioning"
+		resp, err := http.DefaultClient.Do(req)
+		c.Assert(err, check.IsNil)
+		c.Check(resp.Header.Get("Content-Type"), check.Equals, "application/xml")
+		buf, err := ioutil.ReadAll(resp.Body)
+		c.Assert(err, check.IsNil)
+		c.Check(string(buf), check.Equals, "<?xml version=\"1.0\" encoding=\"UTF-8\"?>\n<VersioningConfiguration xmlns=\"http://s3.amazonaws.com/doc/2006-03-01/\"/>\n")
+	}
+}
+
+// TestS3CollectionList checks ListObjects pagination over a large
+// collection, with S3FolderObjects both disabled and enabled.
+func (s *IntegrationSuite) TestS3CollectionList(c *check.C) {
+	stage := s.s3setup(c)
+	defer stage.teardown(c)
+
+	var markers int
+	// markers is the range index (0, then 1), which doubles as the
+	// number of synthetic "folder object" entries each directory
+	// contributes when S3FolderObjects is enabled.
+	for markers, s.testServer.Config.cluster.Collections.S3FolderObjects = range []bool{false, true} {
+		dirs := 2
+		filesPerDir := 1001
+		stage.writeBigDirs(c, dirs, filesPerDir)
+		// Total # objects is:
+		// 2 file entries from s3setup (emptyfile and sailboat.txt)
+		// +1 fake "directory" marker from s3setup (emptydir) (if enabled)
+		// +dirs fake "directory" marker from writeBigDirs (dir0/, dir1/) (if enabled)
+		// +filesPerDir*dirs file entries from writeBigDirs (dir0/file0.txt, etc.)
+		s.testS3List(c, stage.collbucket, "", 4000, markers+2+(filesPerDir+markers)*dirs)
+		s.testS3List(c, stage.collbucket, "", 131, markers+2+(filesPerDir+markers)*dirs)
+		s.testS3List(c, stage.collbucket, "dir0/", 71, filesPerDir+markers)
+	}
+}
+// testS3List pages through a listing with the given prefix and page
+// size, and checks page sizes, page count, sailboat.txt's size, the
+// truncation/NextMarker invariants, and the total number of keys.
+func (s *IntegrationSuite) testS3List(c *check.C, bucket *s3.Bucket, prefix string, pageSize, expectFiles int) {
+	c.Logf("testS3List: prefix=%q pageSize=%d S3FolderObjects=%v", prefix, pageSize, s.testServer.Config.cluster.Collections.S3FolderObjects)
+	// The server caps pages at 1000 keys regardless of the request.
+	expectPageSize := pageSize
+	if expectPageSize > 1000 {
+		expectPageSize = 1000
+	}
+	gotKeys := map[string]s3.Key{}
+	nextMarker := ""
+	pages := 0
+	for {
+		resp, err := bucket.List(prefix, "", nextMarker, pageSize)
+		if !c.Check(err, check.IsNil) {
+			break
+		}
+		c.Check(len(resp.Contents) <= expectPageSize, check.Equals, true)
+		if pages++; !c.Check(pages <= (expectFiles/expectPageSize)+1, check.Equals, true) {
+			break
+		}
+		for _, key := range resp.Contents {
+			gotKeys[key.Key] = key
+			if strings.Contains(key.Key, "sailboat.txt") {
+				// "⛵\n" is 4 bytes of UTF-8.
+				c.Check(key.Size, check.Equals, int64(4))
+			}
+		}
+		if !resp.IsTruncated {
+			c.Check(resp.NextMarker, check.Equals, "")
+			break
+		}
+		if !c.Check(resp.NextMarker, check.Not(check.Equals), "") {
+			break
+		}
+		nextMarker = resp.NextMarker
+	}
+	c.Check(len(gotKeys), check.Equals, expectFiles)
+}
+
+// TestS3CollectionListRollup runs the delimiter/prefix rollup tests
+// with S3FolderObjects disabled and then enabled.
+func (s *IntegrationSuite) TestS3CollectionListRollup(c *check.C) {
+	for _, s.testServer.Config.cluster.Collections.S3FolderObjects = range []bool{false, true} {
+		s.testS3CollectionListRollup(c)
+	}
+}
+
+// testS3CollectionListRollup builds a collection with two big
+// directories plus a few loose files, fetches the full flat listing,
+// then checks many prefix/delimiter/marker combinations against
+// expectations computed independently from that flat listing.
+func (s *IntegrationSuite) testS3CollectionListRollup(c *check.C) {
+	stage := s.s3setup(c)
+	defer stage.teardown(c)
+
+	dirs := 2
+	filesPerDir := 500
+	stage.writeBigDirs(c, dirs, filesPerDir)
+	// Add a file name that sorts between "dir0" and "dir1" style
+	// names via a direct S3 PUT.
+	err := stage.collbucket.PutReader("dingbats", &bytes.Buffer{}, 0, "application/octet-stream", s3.Private, s3.Options{})
+	c.Assert(err, check.IsNil)
+	// Collect the complete listing (no prefix/delimiter), paging
+	// through with markers; dedup adjacent repeats.
+	var allfiles []string
+	for marker := ""; ; {
+		resp, err := stage.collbucket.List("", "", marker, 20000)
+		c.Check(err, check.IsNil)
+		for _, key := range resp.Contents {
+			if len(allfiles) == 0 || allfiles[len(allfiles)-1] != key.Key {
+				allfiles = append(allfiles, key.Key)
+			}
+		}
+		marker = resp.NextMarker
+		if marker == "" {
+			break
+		}
+	}
+	// markers: folder-object entries per directory (0 or 1).
+	markers := 0
+	if s.testServer.Config.cluster.Collections.S3FolderObjects {
+		markers = 1
+	}
+	c.Check(allfiles, check.HasLen, dirs*(filesPerDir+markers)+3+markers)
+
+	// Every file inside a directory must be preceded by that
+	// directory's folder object (when folder objects are enabled).
+	gotDirMarker := map[string]bool{}
+	for _, name := range allfiles {
+		isDirMarker := strings.HasSuffix(name, "/")
+		if markers == 0 {
+			c.Check(isDirMarker, check.Equals, false, check.Commentf("name %q", name))
+		} else if isDirMarker {
+			gotDirMarker[name] = true
+		} else if i := strings.LastIndex(name, "/"); i >= 0 {
+			c.Check(gotDirMarker[name[:i+1]], check.Equals, true, check.Commentf("name %q", name))
+			gotDirMarker[name[:i+1]] = true // skip redundant complaints about this dir marker
+		}
+	}
+
+	for _, trial := range []struct {
+		prefix    string
+		delimiter string
+		marker    string
+	}{
+		{"", "", ""},
+		{"di", "/", ""},
+		{"di", "r", ""},
+		{"di", "n", ""},
+		{"dir0", "/", ""},
+		{"dir0/", "/", ""},
+		{"dir0/f", "/", ""},
+		{"dir0", "", ""},
+		{"dir0/", "", ""},
+		{"dir0/f", "", ""},
+		{"dir0", "/", "dir0/file14.txt"},       // no commonprefixes
+		{"", "", "dir0/file14.txt"},            // middle page, skip walking dir1
+		{"", "", "dir1/file14.txt"},            // middle page, skip walking dir0
+		{"", "", "dir1/file498.txt"},           // last page of results
+		{"dir1/file", "", "dir1/file498.txt"},  // last page of results, with prefix
+		{"dir1/file", "/", "dir1/file498.txt"}, // last page of results, with prefix + delimiter
+		{"dir1", "Z", "dir1/file498.txt"},      // delimiter "Z" never appears
+		{"dir2", "/", ""},                      // prefix "dir2" does not exist
+		{"", "/", ""},
+	} {
+		c.Logf("\n\n=== trial %+v markers=%d", trial, markers)
+
+		maxKeys := 20
+		resp, err := stage.collbucket.List(trial.prefix, trial.delimiter, trial.marker, maxKeys)
+		c.Check(err, check.IsNil)
+		if resp.IsTruncated && trial.delimiter == "" {
+			// goamz List method fills in the missing
+			// NextMarker field if resp.IsTruncated, so
+			// now we can't really tell whether it was
+			// sent by the server or by goamz. In cases
+			// where it should be empty but isn't, assume
+			// it's goamz's fault.
+			resp.NextMarker = ""
+		}
+
+		// Recompute the expected keys, common prefixes,
+		// truncation flag, and next marker from the flat
+		// listing collected above.
+		var expectKeys []string
+		var expectPrefixes []string
+		var expectNextMarker string
+		var expectTruncated bool
+		for _, key := range allfiles {
+			full := len(expectKeys)+len(expectPrefixes) >= maxKeys
+			if !strings.HasPrefix(key, trial.prefix) || key < trial.marker {
+				continue
+			} else if idx := strings.Index(key[len(trial.prefix):], trial.delimiter); trial.delimiter != "" && idx >= 0 {
+				prefix := key[:len(trial.prefix)+idx+1]
+				if len(expectPrefixes) > 0 && expectPrefixes[len(expectPrefixes)-1] == prefix {
+					// same prefix as previous key
+				} else if full {
+					expectNextMarker = key
+					expectTruncated = true
+				} else {
+					expectPrefixes = append(expectPrefixes, prefix)
+				}
+			} else if full {
+				if trial.delimiter != "" {
+					expectNextMarker = key
+				}
+				expectTruncated = true
+				break
+			} else {
+				expectKeys = append(expectKeys, key)
+			}
+		}
+
+		var gotKeys []string
+		for _, key := range resp.Contents {
+			gotKeys = append(gotKeys, key.Key)
+		}
+		var gotPrefixes []string
+		for _, prefix := range resp.CommonPrefixes {
+			gotPrefixes = append(gotPrefixes, prefix)
+		}
+		commentf := check.Commentf("trial %+v markers=%d", trial, markers)
+		c.Check(gotKeys, check.DeepEquals, expectKeys, commentf)
+		c.Check(gotPrefixes, check.DeepEquals, expectPrefixes, commentf)
+		c.Check(resp.NextMarker, check.Equals, expectNextMarker, commentf)
+		c.Check(resp.IsTruncated, check.Equals, expectTruncated, commentf)
+		c.Logf("=== trial %+v keys %q prefixes %q nextMarker %q", trial, gotKeys, gotPrefixes, resp.NextMarker)
+	}
+}
Config *Config
}
-func (srv *server) Start() error {
+func (srv *server) Start(logger *logrus.Logger) error {
h := &handler{Config: srv.Config}
reg := prometheus.NewRegistry()
h.Config.Cache.registry = reg
- ctx := ctxlog.Context(context.Background(), logrus.StandardLogger())
- mh := httpserver.Instrument(reg, nil, httpserver.HandlerWithContext(ctx, httpserver.AddRequestIDs(httpserver.LogRequests(h))))
+ ctx := ctxlog.Context(context.Background(), logger)
+ mh := httpserver.Instrument(reg, logger, httpserver.HandlerWithContext(ctx, httpserver.AddRequestIDs(httpserver.LogRequests(h))))
h.MetricsAPI = mh.ServeAPI(h.Config.cluster.ManagementToken, http.NotFoundHandler())
srv.Handler = mh
var listen arvados.URL
cfg.cluster.ManagementToken = arvadostest.ManagementToken
cfg.cluster.Users.AnonymousUserToken = arvadostest.AnonymousToken
s.testServer = &server{Config: cfg}
- err = s.testServer.Start()
+ err = s.testServer.Start(ctxlog.TestLogger(c))
c.Assert(err, check.Equals, nil)
}