16535: Add ListObjects API.
author: Tom Clegg <tom@tomclegg.ca>
Fri, 17 Jul 2020 20:06:18 +0000 (16:06 -0400)
committer: Tom Clegg <tom@tomclegg.ca>
Fri, 17 Jul 2020 20:06:18 +0000 (16:06 -0400)
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom@tomclegg.ca>

services/keep-web/s3.go
services/keep-web/s3_test.go

index 5043c65ec54e23c66e301b87f8f99ce7aeada255..55d85c69ad0eb92300e98898f71512859c0ddc80 100644 (file)
@@ -5,12 +5,20 @@
 package main
 
 import (
+       "encoding/xml"
        "errors"
        "fmt"
        "io"
        "net/http"
        "os"
+       "path/filepath"
+       "sort"
+       "strconv"
        "strings"
+
+       "git.arvados.org/arvados.git/sdk/go/arvados"
+       "git.arvados.org/arvados.git/sdk/go/ctxlog"
+       "git.arvados.org/arvados.git/tmp/GOPATH/src/github.com/AdRoll/goamz/s3"
 )
 
 // serveS3 handles r and returns true if r is a request from an S3
@@ -39,38 +47,46 @@ func (h *handler) serveS3(w http.ResponseWriter, r *http.Request) bool {
        }
        defer release()
 
-       r.URL.Path = "/by_id" + r.URL.Path
-
        fs := client.SiteFileSystem(kc)
        fs.ForwardSlashNameSubstitution(h.Config.cluster.Collections.ForwardSlashNameSubstitution)
 
-       fi, err := fs.Stat(r.URL.Path)
-       switch r.Method {
-       case "GET":
+       switch {
+       case r.Method == "GET" && strings.Count(strings.TrimSuffix(r.URL.Path, "/"), "/") == 1:
+               // Path is "/{uuid}" or "/{uuid}/", has no object name
+               h.s3list(w, r, fs)
+               return true
+       case r.Method == "GET":
+               fspath := "/by_id" + r.URL.Path
+               fi, err := fs.Stat(fspath)
                if os.IsNotExist(err) ||
                        (err != nil && err.Error() == "not a directory") ||
                        (fi != nil && fi.IsDir()) {
                        http.Error(w, "not found", http.StatusNotFound)
                        return true
                }
-               http.FileServer(fs).ServeHTTP(w, r)
+               // shallow copy r, and change URL path
+               r := *r
+               r.URL.Path = fspath
+               http.FileServer(fs).ServeHTTP(w, &r)
                return true
-       case "PUT":
+       case r.Method == "PUT":
                if strings.HasSuffix(r.URL.Path, "/") {
                        http.Error(w, "invalid object name (trailing '/' char)", http.StatusBadRequest)
                        return true
                }
+               fspath := "by_id" + r.URL.Path
+               _, err = fs.Stat(fspath)
                if err != nil && err.Error() == "not a directory" {
                        // requested foo/bar, but foo is a file
                        http.Error(w, "object name conflicts with existing object", http.StatusBadRequest)
                        return true
                }
-               f, err := fs.OpenFile(r.URL.Path, os.O_WRONLY|os.O_TRUNC|os.O_CREATE, 0644)
+               f, err := fs.OpenFile(fspath, os.O_WRONLY|os.O_TRUNC|os.O_CREATE, 0644)
                if os.IsNotExist(err) {
                        // create missing intermediate directories, then try again
-                       for i, c := range r.URL.Path {
+                       for i, c := range fspath {
                                if i > 0 && c == '/' {
-                                       dir := r.URL.Path[:i]
+                                       dir := fspath[:i]
                                        if strings.HasSuffix(dir, "/") {
                                                err = errors.New("invalid object name (consecutive '/' chars)")
                                                http.Error(w, err.Error(), http.StatusBadRequest)
@@ -84,7 +100,7 @@ func (h *handler) serveS3(w http.ResponseWriter, r *http.Request) bool {
                                        }
                                }
                        }
-                       f, err = fs.OpenFile(r.URL.Path, os.O_WRONLY|os.O_TRUNC|os.O_CREATE, 0644)
+                       f, err = fs.OpenFile(fspath, os.O_WRONLY|os.O_TRUNC|os.O_CREATE, 0644)
                }
                if err != nil {
                        err = fmt.Errorf("open %q failed: %w", r.URL.Path, err)
@@ -117,3 +133,115 @@ func (h *handler) serveS3(w http.ResponseWriter, r *http.Request) bool {
                return true
        }
 }
+
+// walkFS calls fn for every entry in the directory tree rooted at
+// path, visiting entries in lexical order by name within each
+// directory. fn may return filepath.SkipDir to skip descending into a
+// directory (or, for a file, to simply skip it); any other non-nil
+// error aborts the walk and is returned to the caller.
+func walkFS(fs arvados.CustomFileSystem, path string, fn func(path string, fi os.FileInfo) error) error {
+       f, err := fs.Open(path)
+       if err != nil {
+               return fmt.Errorf("open %q: %w", path, err)
+       }
+       defer f.Close()
+       if path == "/" {
+               // Avoid building "//name" paths below.
+               path = ""
+       }
+       fis, err := f.Readdir(-1)
+       if err != nil {
+               return err
+       }
+       // Readdir order is unspecified; sort so callers see a
+       // deterministic, S3-style lexical listing.
+       sort.Slice(fis, func(i, j int) bool { return fis[i].Name() < fis[j].Name() })
+       for _, fi := range fis {
+               err = fn(path+"/"+fi.Name(), fi)
+               if err == filepath.SkipDir {
+                       // Skip this entry/subtree, continue with siblings.
+                       continue
+               } else if err != nil {
+                       return err
+               }
+               if fi.IsDir() {
+                       err = walkFS(fs, path+"/"+fi.Name(), fn)
+                       if err != nil {
+                               return err
+                       }
+               }
+       }
+       return nil
+}
+
+// errDone is a sentinel used to stop walkFS early once a listing page
+// is full; it is never returned to the client.
+var errDone = errors.New("done")
+
+// s3list serves an S3 ListObjects request for the bucket named by the
+// first path component, writing an XML ListBucketResult. Pagination
+// uses marker/NextMarker; delimiter roll-up into CommonPrefixes is
+// not implemented yet (see TODO below).
+func (h *handler) s3list(w http.ResponseWriter, r *http.Request, fs arvados.CustomFileSystem) {
+       var params struct {
+               bucket    string
+               delimiter string
+               marker    string
+               maxKeys   int
+               prefix    string
+       }
+       // Bucket is the first path component of "/{uuid}/...".
+       params.bucket = strings.SplitN(r.URL.Path[1:], "/", 2)[0]
+       params.delimiter = r.FormValue("delimiter")
+       params.marker = r.FormValue("marker")
+       // Default page size is 100 keys. NOTE(review): AWS S3 defaults
+       // to 1000 and caps max-keys at 1000; no upper cap is applied
+       // here — confirm whether unbounded max-keys is intended.
+       if mk, _ := strconv.ParseInt(r.FormValue("max-keys"), 10, 64); mk > 0 {
+               params.maxKeys = int(mk)
+       } else {
+               params.maxKeys = 100
+       }
+       params.prefix = r.FormValue("prefix")
+
+       bucketdir := "by_id/" + params.bucket
+       // walkpath is the directory (relative to bucketdir) we need
+       // to walk: the innermost directory that is guaranteed to
+       // contain all paths that have the requested prefix. Examples:
+       // prefix "foo/bar"  => walkpath "foo"
+       // prefix "foo/bar/" => walkpath "foo/bar"
+       // prefix "foo"      => walkpath ""
+       // prefix ""         => walkpath ""
+       walkpath := params.prefix
+       if !strings.HasSuffix(walkpath, "/") {
+               walkpath, _ = filepath.Split(walkpath)
+       }
+       walkpath = strings.TrimSuffix(walkpath, "/")
+
+       type commonPrefix struct {
+               Prefix string
+       }
+       // Wrap goamz's ListResp so CommonPrefixes marshals as the
+       // nested <CommonPrefixes><Prefix> elements S3 clients expect.
+       type serverListResponse struct {
+               s3.ListResp
+               CommonPrefixes []commonPrefix
+       }
+       resp := serverListResponse{ListResp: s3.ListResp{
+               Name:      strings.SplitN(r.URL.Path[1:], "/", 2)[0],
+               Prefix:    params.prefix,
+               Delimiter: params.delimiter,
+               Marker:    params.marker,
+               MaxKeys:   params.maxKeys,
+       }}
+       err := walkFS(fs, strings.TrimSuffix(bucketdir+"/"+walkpath, "/"), func(path string, fi os.FileInfo) error {
+               // Make path relative to the bucket root.
+               path = path[len(bucketdir)+1:]
+               if !strings.HasPrefix(path, params.prefix) {
+                       // Outside the requested prefix: skip this
+                       // file, or this whole subtree if it's a dir.
+                       return filepath.SkipDir
+               }
+               if fi.IsDir() {
+                       // Directories themselves are not S3 keys.
+                       return nil
+               }
+               // NOTE(review): "<" (not "<=") means the key equal to
+               // the marker IS included. AWS S3 starts listing
+               // strictly after the marker. This is internally
+               // consistent — NextMarker below names the first
+               // *unreturned* key — but differs from standard S3
+               // marker semantics; confirm intended.
+               if path < params.marker {
+                       return nil
+               }
+               // TODO: check delimiter, roll up common prefixes
+               if len(resp.Contents)+len(resp.CommonPrefixes) >= params.maxKeys {
+                       // Page full: mark truncated and stop walking.
+                       resp.IsTruncated = true
+                       if params.delimiter == "" {
+                               resp.NextMarker = path
+                       }
+                       return errDone
+               }
+               resp.ListResp.Contents = append(resp.ListResp.Contents, s3.Key{
+                       Key: path,
+               })
+               return nil
+       })
+       if err != nil && err != errDone {
+               http.Error(w, err.Error(), http.StatusInternalServerError)
+               return
+       }
+       if err := xml.NewEncoder(w).Encode(resp); err != nil {
+               ctxlog.FromContext(r.Context()).WithError(err).Error("error writing xml response")
+       }
+}
index c4c216c8b9c647aae6fc676a79b43f76b43e70e6..5fab3607f653e53058f400b8ef1c97880d7c413e 100644 (file)
@@ -7,6 +7,7 @@ package main
 import (
        "bytes"
        "crypto/rand"
+       "fmt"
        "io/ioutil"
        "os"
        "sync"
@@ -23,6 +24,8 @@ import (
 
 type s3stage struct {
        arv        *arvados.Client
+       ac         *arvadosclient.ArvadosClient
+       kc         *keepclient.KeepClient
        proj       arvados.Group
        projbucket *s3.Bucket
        coll       arvados.Collection
@@ -62,6 +65,8 @@ func (s *IntegrationSuite) s3setup(c *check.C) s3stage {
        c.Assert(err, check.IsNil)
        err = fs.Sync()
        c.Assert(err, check.IsNil)
+       err = arv.RequestAndDecode(&coll, "GET", "arvados/v1/collections/"+coll.UUID, nil, nil)
+       c.Assert(err, check.IsNil)
 
        auth := aws.NewAuth(arvadostest.ActiveTokenV2, arvadostest.ActiveTokenV2, "", time.Now().Add(time.Hour))
        region := aws.Region{
@@ -71,6 +76,8 @@ func (s *IntegrationSuite) s3setup(c *check.C) s3stage {
        client := s3.New(*auth, region)
        return s3stage{
                arv:  arv,
+               ac:   ac,
+               kc:   kc,
                proj: proj,
                projbucket: &s3.Bucket{
                        S3:   client,
@@ -227,9 +234,60 @@ func (s *IntegrationSuite) testS3PutObjectFailure(c *check.C, bucket *s3.Bucket,
                                return
                        }
 
-                       _, err = bucket.GetReader(objname)
-                       c.Check(err, check.ErrorMatches, `404 Not Found`, check.Commentf("GET %q should return 404", objname))
+                       if objname != "" && objname != "/" {
+                               _, err = bucket.GetReader(objname)
+                               c.Check(err, check.ErrorMatches, `404 Not Found`, check.Commentf("GET %q should return 404", objname))
+                       }
                }()
        }
        wg.Wait()
 }
+
+// TestS3CollectionList populates a collection with two directories of
+// 1001 files each, then exercises ListObjects pagination at several
+// page sizes, with and without a prefix.
+func (s *IntegrationSuite) TestS3CollectionList(c *check.C) {
+       stage := s.s3setup(c)
+       defer stage.teardown(c)
+
+       // 1001 per dir ensures multiple pages at the smaller page
+       // sizes used below.
+       filesPerDir := 1001
+
+       fs, err := stage.coll.FileSystem(stage.arv, stage.kc)
+       c.Assert(err, check.IsNil)
+       for _, dir := range []string{"dir1", "dir2"} {
+               c.Assert(fs.Mkdir(dir, 0755), check.IsNil)
+               for i := 0; i < filesPerDir; i++ {
+                       f, err := fs.OpenFile(fmt.Sprintf("%s/file%d.txt", dir, i), os.O_CREATE|os.O_WRONLY, 0644)
+                       c.Assert(err, check.IsNil)
+                       c.Assert(f.Close(), check.IsNil)
+               }
+       }
+       c.Assert(fs.Sync(), check.IsNil)
+       // "2+" accounts for files already present in the collection —
+       // presumably created by s3setup (not visible here; verify).
+       s.testS3List(c, stage.collbucket, "", 4000, 2+filesPerDir*2)
+       s.testS3List(c, stage.collbucket, "", 131, 2+filesPerDir*2)
+       s.testS3List(c, stage.collbucket, "dir1/", 71, filesPerDir)
+}
+// testS3List pages through bucket contents under prefix using the
+// given page size, checking that: each page respects pageSize, the
+// page count never exceeds the minimum needed (+1), IsTruncated and
+// NextMarker are consistent, and exactly expectFiles distinct keys
+// are returned in total.
+func (s *IntegrationSuite) testS3List(c *check.C, bucket *s3.Bucket, prefix string, pageSize, expectFiles int) {
+       gotKeys := map[string]s3.Key{}
+       nextMarker := ""
+       pages := 0
+       for {
+               resp, err := bucket.List(prefix, "", nextMarker, pageSize)
+               if !c.Check(err, check.IsNil) {
+                       break
+               }
+               c.Check(len(resp.Contents) <= pageSize, check.Equals, true)
+               // Guard against an infinite loop if pagination stalls.
+               if pages++; !c.Check(pages <= (expectFiles/pageSize)+1, check.Equals, true) {
+                       break
+               }
+               // Collect into a map so duplicate keys across pages
+               // would surface as a short final count.
+               for _, key := range resp.Contents {
+                       gotKeys[key.Key] = key
+               }
+               if !resp.IsTruncated {
+                       // Final page must not advertise a NextMarker.
+                       c.Check(resp.NextMarker, check.Equals, "")
+                       break
+               }
+               if !c.Check(resp.NextMarker, check.Not(check.Equals), "") {
+                       break
+               }
+               nextMarker = resp.NextMarker
+       }
+       c.Check(len(gotKeys), check.Equals, expectFiles)
+}