package main
import (
+ "encoding/xml"
"errors"
"fmt"
"io"
"net/http"
"os"
+ "path/filepath"
+ "sort"
+ "strconv"
"strings"
+
+ "git.arvados.org/arvados.git/sdk/go/arvados"
+ "git.arvados.org/arvados.git/sdk/go/ctxlog"
+ "git.arvados.org/arvados.git/tmp/GOPATH/src/github.com/AdRoll/goamz/s3"
)
// serveS3 handles r and returns true if r is a request from an S3
// client, otherwise it returns false.
}
defer release()
- r.URL.Path = "/by_id" + r.URL.Path
-
fs := client.SiteFileSystem(kc)
fs.ForwardSlashNameSubstitution(h.Config.cluster.Collections.ForwardSlashNameSubstitution)
- fi, err := fs.Stat(r.URL.Path)
- switch r.Method {
- case "GET":
+ switch {
+ case r.Method == "GET" && strings.Count(strings.TrimSuffix(r.URL.Path, "/"), "/") == 1:
+ // Path is "/{uuid}" or "/{uuid}/", has no object name
+ h.s3list(w, r, fs)
+ return true
+ case r.Method == "GET":
+ fspath := "/by_id" + r.URL.Path
+ fi, err := fs.Stat(fspath)
if os.IsNotExist(err) ||
(err != nil && err.Error() == "not a directory") ||
(fi != nil && fi.IsDir()) {
http.Error(w, "not found", http.StatusNotFound)
return true
}
- http.FileServer(fs).ServeHTTP(w, r)
+ // shallow copy r, and change URL path
+ r := *r
+ r.URL.Path = fspath
+ http.FileServer(fs).ServeHTTP(w, &r)
return true
- case "PUT":
+ case r.Method == "PUT":
if strings.HasSuffix(r.URL.Path, "/") {
http.Error(w, "invalid object name (trailing '/' char)", http.StatusBadRequest)
return true
}
+ fspath := "by_id" + r.URL.Path
+ _, err = fs.Stat(fspath)
if err != nil && err.Error() == "not a directory" {
// requested foo/bar, but foo is a file
http.Error(w, "object name conflicts with existing object", http.StatusBadRequest)
return true
}
- f, err := fs.OpenFile(r.URL.Path, os.O_WRONLY|os.O_TRUNC|os.O_CREATE, 0644)
+ f, err := fs.OpenFile(fspath, os.O_WRONLY|os.O_TRUNC|os.O_CREATE, 0644)
if os.IsNotExist(err) {
// create missing intermediate directories, then try again
- for i, c := range r.URL.Path {
+ for i, c := range fspath {
if i > 0 && c == '/' {
- dir := r.URL.Path[:i]
+ dir := fspath[:i]
if strings.HasSuffix(dir, "/") {
err = errors.New("invalid object name (consecutive '/' chars)")
http.Error(w, err.Error(), http.StatusBadRequest)
+ return true
}
+ // create the intermediate directory if it doesn't exist yet
+ if err = fs.Mkdir(dir, 0755); err != nil && !os.IsExist(err) {
+ err = fmt.Errorf("mkdir %q failed: %w", dir, err)
+ http.Error(w, err.Error(), http.StatusInternalServerError)
+ return true
+ }
}
}
- f, err = fs.OpenFile(r.URL.Path, os.O_WRONLY|os.O_TRUNC|os.O_CREATE, 0644)
+ f, err = fs.OpenFile(fspath, os.O_WRONLY|os.O_TRUNC|os.O_CREATE, 0644)
}
if err != nil {
err = fmt.Errorf("open %q failed: %w", r.URL.Path, err)
+ http.Error(w, err.Error(), http.StatusBadRequest)
return true
}
}
+
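+// walkFS calls fn for every entry below path, in lexicographic order,
+// recursing into subdirectories. If fn returns filepath.SkipDir for a
+// directory, walkFS skips that directory's contents.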
+func walkFS(fs arvados.CustomFileSystem, path string, fn func(path string, fi os.FileInfo) error) error {
+ f, err := fs.Open(path)
+ if err != nil {
+ return fmt.Errorf("open %q: %w", path, err)
+ }
+ defer f.Close()
+ if path == "/" {
+ path = ""
+ }
+ fis, err := f.Readdir(-1)
+ if err != nil {
+ return err
+ }
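+ // Visit entries in lexicographic order so results are deterministic
+ // and the marker-based pagination in s3list works.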
+ sort.Slice(fis, func(i, j int) bool { return fis[i].Name() < fis[j].Name() })
+ for _, fi := range fis {
+ err = fn(path+"/"+fi.Name(), fi)
+ if err == filepath.SkipDir {
+ continue
+ } else if err != nil {
+ return err
+ }
+ if fi.IsDir() {
+ err = walkFS(fs, path+"/"+fi.Name(), fn)
+ if err != nil {
+ return err
+ }
+ }
+ }
+ return nil
+}
+
+var errDone = errors.New("done")
+
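+// s3list serves an S3 ListObjects (list bucket, v1) response for the
+// collection or project named by the first segment of r.URL.Path.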
+func (h *handler) s3list(w http.ResponseWriter, r *http.Request, fs arvados.CustomFileSystem) {
+ var params struct {
+ bucket string
+ delimiter string
+ marker string
+ maxKeys int
+ prefix string
+ }
+ params.bucket = strings.SplitN(r.URL.Path[1:], "/", 2)[0]
+ params.delimiter = r.FormValue("delimiter")
+ params.marker = r.FormValue("marker")
+ if mk, _ := strconv.ParseInt(r.FormValue("max-keys"), 10, 64); mk > 0 {
+ params.maxKeys = int(mk)
+ } else {
+ params.maxKeys = 100
+ }
+ params.prefix = r.FormValue("prefix")
+
+ bucketdir := "by_id/" + params.bucket
+ // walkpath is the directory (relative to bucketdir) we need
+ // to walk: the innermost directory that is guaranteed to
+ // contain all paths that have the requested prefix. Examples:
+ // prefix "foo/bar" => walkpath "foo"
+ // prefix "foo/bar/" => walkpath "foo/bar"
+ // prefix "foo" => walkpath ""
+ // prefix "" => walkpath ""
+ walkpath := params.prefix
+ if !strings.HasSuffix(walkpath, "/") {
+ walkpath, _ = filepath.Split(walkpath)
+ }
+ walkpath = strings.TrimSuffix(walkpath, "/")
+
+ type commonPrefix struct {
+ Prefix string
+ }
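+ // serverListResponse wraps s3.ListResp, shadowing its CommonPrefixes
+ // field so each common prefix can be marshaled as its own
+ // <CommonPrefixes><Prefix>...</Prefix></CommonPrefixes> element, the
+ // layout S3 clients expect.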
+ type serverListResponse struct {
+ s3.ListResp
+ CommonPrefixes []commonPrefix
+ }
+ resp := serverListResponse{ListResp: s3.ListResp{
+ Name: params.bucket,
+ Prefix: params.prefix,
+ Delimiter: params.delimiter,
+ Marker: params.marker,
+ MaxKeys: params.maxKeys,
+ }}
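+ // Walk the narrowest directory that contains every key matching the
+ // prefix, filtering and paginating in the callback below.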
+ err := walkFS(fs, strings.TrimSuffix(bucketdir+"/"+walkpath, "/"), func(path string, fi os.FileInfo) error {
+ path = path[len(bucketdir)+1:]
+ if !strings.HasPrefix(path, params.prefix) {
+ return filepath.SkipDir
+ }
+ if fi.IsDir() {
+ return nil
+ }
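+ // Skip keys that sort before the marker. NextMarker (set below) is
+ // the first key that did not fit on the previous page, so a key
+ // equal to the marker is included.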
+ if path < params.marker {
+ return nil
+ }
+ // TODO: check delimiter, roll up common prefixes
+ if len(resp.Contents)+len(resp.CommonPrefixes) >= params.maxKeys {
+ resp.IsTruncated = true
+ if params.delimiter == "" {
+ resp.NextMarker = path
+ }
+ return errDone
+ }
+ resp.Contents = append(resp.Contents, s3.Key{
+ Key: path,
+ })
+ return nil
+ })
+ if err != nil && err != errDone {
+ http.Error(w, err.Error(), http.StatusInternalServerError)
+ return
+ }
+ if err := xml.NewEncoder(w).Encode(resp); err != nil {
+ ctxlog.FromContext(r.Context()).WithError(err).Error("error writing xml response")
+ }
+}
import (
"bytes"
"crypto/rand"
+ "fmt"
"io/ioutil"
"os"
"sync"
type s3stage struct {
arv *arvados.Client
+ ac *arvadosclient.ArvadosClient
+ kc *keepclient.KeepClient
proj arvados.Group
projbucket *s3.Bucket
coll arvados.Collection
c.Assert(err, check.IsNil)
err = fs.Sync()
c.Assert(err, check.IsNil)
+ err = arv.RequestAndDecode(&coll, "GET", "arvados/v1/collections/"+coll.UUID, nil, nil)
+ c.Assert(err, check.IsNil)
auth := aws.NewAuth(arvadostest.ActiveTokenV2, arvadostest.ActiveTokenV2, "", time.Now().Add(time.Hour))
region := aws.Region{
client := s3.New(*auth, region)
return s3stage{
arv: arv,
+ ac: ac,
+ kc: kc,
proj: proj,
projbucket: &s3.Bucket{
S3: client,
return
}
- _, err = bucket.GetReader(objname)
- c.Check(err, check.ErrorMatches, `404 Not Found`, check.Commentf("GET %q should return 404", objname))
+ if objname != "" && objname != "/" {
+ _, err = bucket.GetReader(objname)
+ c.Check(err, check.ErrorMatches, `404 Not Found`, check.Commentf("GET %q should return 404", objname))
+ }
}()
}
wg.Wait()
}
+
+func (s *IntegrationSuite) TestS3CollectionList(c *check.C) {
+ stage := s.s3setup(c)
+ defer stage.teardown(c)
+
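+ // 1001 files per directory: the smaller page sizes below force
+ // multi-page listings, while pageSize 4000 covers everything in a
+ // single page.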
+ filesPerDir := 1001
+
+ fs, err := stage.coll.FileSystem(stage.arv, stage.kc)
+ c.Assert(err, check.IsNil)
+ for _, dir := range []string{"dir1", "dir2"} {
+ c.Assert(fs.Mkdir(dir, 0755), check.IsNil)
+ for i := 0; i < filesPerDir; i++ {
+ f, err := fs.OpenFile(fmt.Sprintf("%s/file%d.txt", dir, i), os.O_CREATE|os.O_WRONLY, 0644)
+ c.Assert(err, check.IsNil)
+ c.Assert(f.Close(), check.IsNil)
+ }
+ }
+ c.Assert(fs.Sync(), check.IsNil)
+ s.testS3List(c, stage.collbucket, "", 4000, 2+filesPerDir*2)
+ s.testS3List(c, stage.collbucket, "", 131, 2+filesPerDir*2)
+ s.testS3List(c, stage.collbucket, "dir1/", 71, filesPerDir)
+}
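+
+// testS3List pages through bucket with the given prefix and page size,
+// verifying page-size limits and marker handling, and checking that
+// exactly expectFiles distinct keys are returned.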
+func (s *IntegrationSuite) testS3List(c *check.C, bucket *s3.Bucket, prefix string, pageSize, expectFiles int) {
+ gotKeys := map[string]s3.Key{}
+ nextMarker := ""
+ pages := 0
+ for {
+ resp, err := bucket.List(prefix, "", nextMarker, pageSize)
+ if !c.Check(err, check.IsNil) {
+ break
+ }
+ c.Check(len(resp.Contents) <= pageSize, check.Equals, true)
+ if pages++; !c.Check(pages <= (expectFiles/pageSize)+1, check.Equals, true) {
+ break
+ }
+ for _, key := range resp.Contents {
+ gotKeys[key.Key] = key
+ }
+ if !resp.IsTruncated {
+ c.Check(resp.NextMarker, check.Equals, "")
+ break
+ }
+ if !c.Check(resp.NextMarker, check.Not(check.Equals), "") {
+ break
+ }
+ nextMarker = resp.NextMarker
+ }
+ c.Check(len(gotKeys), check.Equals, expectFiles)
+}