From: Tom Clegg
Date: Fri, 17 Jul 2020 20:06:18 +0000 (-0400)
Subject: 16535: Add ListObjects API.
X-Git-Tag: 2.1.0~129^2~22
X-Git-Url: https://git.arvados.org/arvados.git/commitdiff_plain/d94095c1457273a49907d00d06a7d802ba979509

16535: Add ListObjects API.

Arvados-DCO-1.1-Signed-off-by: Tom Clegg
---

diff --git a/services/keep-web/s3.go b/services/keep-web/s3.go
index 5043c65ec5..55d85c69ad 100644
--- a/services/keep-web/s3.go
+++ b/services/keep-web/s3.go
@@ -5,12 +5,20 @@
 package main
 
 import (
+	"encoding/xml"
 	"errors"
 	"fmt"
 	"io"
 	"net/http"
 	"os"
+	"path/filepath"
+	"sort"
+	"strconv"
 	"strings"
+
+	"git.arvados.org/arvados.git/sdk/go/arvados"
+	"git.arvados.org/arvados.git/sdk/go/ctxlog"
+	"git.arvados.org/arvados.git/tmp/GOPATH/src/github.com/AdRoll/goamz/s3"
 )
 
 // serveS3 handles r and returns true if r is a request from an S3
@@ -39,38 +47,46 @@ func (h *handler) serveS3(w http.ResponseWriter, r *http.Request) bool {
 	}
 	defer release()
 
-	r.URL.Path = "/by_id" + r.URL.Path
-
 	fs := client.SiteFileSystem(kc)
 	fs.ForwardSlashNameSubstitution(h.Config.cluster.Collections.ForwardSlashNameSubstitution)
 
-	fi, err := fs.Stat(r.URL.Path)
-	switch r.Method {
-	case "GET":
+	switch {
+	case r.Method == "GET" && strings.Count(strings.TrimSuffix(r.URL.Path, "/"), "/") == 1:
+		// Path is "/{uuid}" or "/{uuid}/", has no object name
+		h.s3list(w, r, fs)
+		return true
+	case r.Method == "GET":
+		fspath := "/by_id" + r.URL.Path
+		fi, err := fs.Stat(fspath)
 		if os.IsNotExist(err) ||
 			(err != nil && err.Error() == "not a directory") ||
 			(fi != nil && fi.IsDir()) {
 			http.Error(w, "not found", http.StatusNotFound)
 			return true
 		}
-		http.FileServer(fs).ServeHTTP(w, r)
+		// shallow copy r, and change URL path
+		r := *r
+		r.URL.Path = fspath
+		http.FileServer(fs).ServeHTTP(w, &r)
 		return true
-	case "PUT":
+	case r.Method == "PUT":
 		if strings.HasSuffix(r.URL.Path, "/") {
 			http.Error(w, "invalid object name (trailing '/' char)", http.StatusBadRequest)
 			return true
 		}
+		fspath := "by_id" + r.URL.Path
+		_, err = fs.Stat(fspath)
 		if err != nil && err.Error() == "not a directory" {
 			// requested foo/bar, but foo is a file
 			http.Error(w, "object name conflicts with existing object", http.StatusBadRequest)
 			return true
 		}
-		f, err := fs.OpenFile(r.URL.Path, os.O_WRONLY|os.O_TRUNC|os.O_CREATE, 0644)
+		f, err := fs.OpenFile(fspath, os.O_WRONLY|os.O_TRUNC|os.O_CREATE, 0644)
 		if os.IsNotExist(err) {
 			// create missing intermediate directories, then try again
-			for i, c := range r.URL.Path {
+			for i, c := range fspath {
 				if i > 0 && c == '/' {
-					dir := r.URL.Path[:i]
+					dir := fspath[:i]
 					if strings.HasSuffix(dir, "/") {
 						err = errors.New("invalid object name (consecutive '/' chars)")
 						http.Error(w, err.Error(), http.StatusBadRequest)
@@ -84,7 +100,7 @@ func (h *handler) serveS3(w http.ResponseWriter, r *http.Request) bool {
 					}
 				}
 			}
-			f, err = fs.OpenFile(r.URL.Path, os.O_WRONLY|os.O_TRUNC|os.O_CREATE, 0644)
+			f, err = fs.OpenFile(fspath, os.O_WRONLY|os.O_TRUNC|os.O_CREATE, 0644)
 		}
 		if err != nil {
 			err = fmt.Errorf("open %q failed: %w", r.URL.Path, err)
@@ -117,3 +133,115 @@ func (h *handler) serveS3(w http.ResponseWriter, r *http.Request) bool {
 		return true
 	}
 }
+
+func walkFS(fs arvados.CustomFileSystem, path string, fn func(path string, fi os.FileInfo) error) error {
+	f, err := fs.Open(path)
+	if err != nil {
+		return fmt.Errorf("open %q: %w", path, err)
+	}
+	defer f.Close()
+	if path == "/" {
+		path = ""
+	}
+	fis, err := f.Readdir(-1)
+	if err != nil {
+		return err
+	}
+	sort.Slice(fis, func(i, j int) bool { return fis[i].Name() < fis[j].Name() })
+	for _, fi := range fis {
+		err = fn(path+"/"+fi.Name(), fi)
+		if err == filepath.SkipDir {
+			continue
+		} else if err != nil {
+			return err
+		}
+		if fi.IsDir() {
+			err = walkFS(fs, path+"/"+fi.Name(), fn)
+			if err != nil {
+				return err
+			}
+		}
+	}
+	return nil
+}
+
+var errDone = errors.New("done")
+
+func (h *handler) s3list(w http.ResponseWriter, r *http.Request, fs arvados.CustomFileSystem) {
+	var params struct {
+		bucket    string
+		delimiter string
+		marker    string
+		maxKeys   int
+		prefix    string
+	}
+	params.bucket = strings.SplitN(r.URL.Path[1:], "/", 2)[0]
+	params.delimiter = r.FormValue("delimiter")
+	params.marker = r.FormValue("marker")
+	if mk, _ := strconv.ParseInt(r.FormValue("max-keys"), 10, 64); mk > 0 {
+		params.maxKeys = int(mk)
+	} else {
+		params.maxKeys = 100
+	}
+	params.prefix = r.FormValue("prefix")
+
+	bucketdir := "by_id/" + params.bucket
+	// walkpath is the directory (relative to bucketdir) we need
+	// to walk: the innermost directory that is guaranteed to
+	// contain all paths that have the requested prefix. Examples:
+	// prefix "foo/bar"  => walkpath "foo"
+	// prefix "foo/bar/" => walkpath "foo/bar"
+	// prefix "foo"      => walkpath ""
+	// prefix ""         => walkpath ""
+	walkpath := params.prefix
+	if !strings.HasSuffix(walkpath, "/") {
+		walkpath, _ = filepath.Split(walkpath)
+	}
+	walkpath = strings.TrimSuffix(walkpath, "/")
+
+	type commonPrefix struct {
+		Prefix string
+	}
+	type serverListResponse struct {
+		s3.ListResp
+		CommonPrefixes []commonPrefix
+	}
+	resp := serverListResponse{ListResp: s3.ListResp{
+		Name:      strings.SplitN(r.URL.Path[1:], "/", 2)[0],
+		Prefix:    params.prefix,
+		Delimiter: params.delimiter,
+		Marker:    params.marker,
+		MaxKeys:   params.maxKeys,
+	}}
+	err := walkFS(fs, strings.TrimSuffix(bucketdir+"/"+walkpath, "/"), func(path string, fi os.FileInfo) error {
+		path = path[len(bucketdir)+1:]
+		if !strings.HasPrefix(path, params.prefix) {
+			return filepath.SkipDir
+		}
+		if fi.IsDir() {
+			return nil
+		}
+		if path < params.marker {
+			return nil
+		}
+		// TODO: check delimiter, roll up common prefixes
+		if len(resp.Contents)+len(resp.CommonPrefixes) >= params.maxKeys {
+			resp.IsTruncated = true
+			if params.delimiter == "" {
+				resp.NextMarker = path
+			}
+			return errDone
+		}
+		resp.ListResp.Contents = append(resp.ListResp.Contents, s3.Key{
+			Key: path,
+		})
+		return nil
+	})
+	if err != nil && err != errDone {
+		http.Error(w, err.Error(), http.StatusInternalServerError)
+		return
+	}
+	if err := xml.NewEncoder(w).Encode(resp); err != nil {
+		ctxlog.FromContext(r.Context()).WithError(err).Error("error writing xml response")
+	}
+}
diff --git a/services/keep-web/s3_test.go b/services/keep-web/s3_test.go
index c4c216c8b9..5fab3607f6 100644
--- a/services/keep-web/s3_test.go
+++ b/services/keep-web/s3_test.go
@@ -7,6 +7,7 @@ package main
 import (
 	"bytes"
 	"crypto/rand"
+	"fmt"
 	"io/ioutil"
 	"os"
 	"sync"
@@ -23,6 +24,8 @@ import (
 
 type s3stage struct {
 	arv        *arvados.Client
+	ac         *arvadosclient.ArvadosClient
+	kc         *keepclient.KeepClient
 	proj       arvados.Group
 	projbucket *s3.Bucket
 	coll       arvados.Collection
@@ -62,6 +65,8 @@ func (s *IntegrationSuite) s3setup(c *check.C) s3stage {
 	c.Assert(err, check.IsNil)
 	err = fs.Sync()
 	c.Assert(err, check.IsNil)
+	err = arv.RequestAndDecode(&coll, "GET", "arvados/v1/collections/"+coll.UUID, nil, nil)
+	c.Assert(err, check.IsNil)
 
 	auth := aws.NewAuth(arvadostest.ActiveTokenV2, arvadostest.ActiveTokenV2, "", time.Now().Add(time.Hour))
 	region := aws.Region{
@@ -71,6 +76,8 @@ func (s *IntegrationSuite) s3setup(c *check.C) s3stage {
 	client := s3.New(*auth, region)
 	return s3stage{
 		arv: arv,
+		ac:  ac,
+		kc:  kc,
 		proj: proj,
 		projbucket: &s3.Bucket{
 			S3: client,
@@ -227,9 +234,60 @@ func (s *IntegrationSuite) testS3PutObjectFailure(c *check.C, bucket *s3.Bucket,
 				return
 			}
 
-			_, err = bucket.GetReader(objname)
-			c.Check(err, check.ErrorMatches, `404 Not Found`, check.Commentf("GET %q should return 404", objname))
+			if objname != "" && objname != "/" {
+				_, err = bucket.GetReader(objname)
+				c.Check(err, check.ErrorMatches, `404 Not Found`, check.Commentf("GET %q should return 404", objname))
+			}
 		}()
 	}
 	wg.Wait()
 }
+
+func (s *IntegrationSuite) TestS3CollectionList(c *check.C) {
+	stage := s.s3setup(c)
+	defer stage.teardown(c)
+
+	filesPerDir := 1001
+
+	fs, err := stage.coll.FileSystem(stage.arv, stage.kc)
+	c.Assert(err, check.IsNil)
+	for _, dir := range []string{"dir1", "dir2"} {
+		c.Assert(fs.Mkdir(dir, 0755), check.IsNil)
+		for i := 0; i < filesPerDir; i++ {
+			f, err := fs.OpenFile(fmt.Sprintf("%s/file%d.txt", dir, i), os.O_CREATE|os.O_WRONLY, 0644)
+			c.Assert(err, check.IsNil)
+			c.Assert(f.Close(), check.IsNil)
+		}
+	}
+	c.Assert(fs.Sync(), check.IsNil)
+	s.testS3List(c, stage.collbucket, "", 4000, 2+filesPerDir*2)
+	s.testS3List(c, stage.collbucket, "", 131, 2+filesPerDir*2)
+	s.testS3List(c, stage.collbucket, "dir1/", 71, filesPerDir)
+}
+func (s *IntegrationSuite) testS3List(c *check.C, bucket *s3.Bucket, prefix string, pageSize, expectFiles int) {
+	gotKeys := map[string]s3.Key{}
+	nextMarker := ""
+	pages := 0
+	for {
+		resp, err := bucket.List(prefix, "", nextMarker, pageSize)
+		if !c.Check(err, check.IsNil) {
+			break
+		}
+		c.Check(len(resp.Contents) <= pageSize, check.Equals, true)
+		if pages++; !c.Check(pages <= (expectFiles/pageSize)+1, check.Equals, true) {
+			break
+		}
+		for _, key := range resp.Contents {
+			gotKeys[key.Key] = key
+		}
+		if !resp.IsTruncated {
+			c.Check(resp.NextMarker, check.Equals, "")
+			break
+		}
+		if !c.Check(resp.NextMarker, check.Not(check.Equals), "") {
+			break
+		}
+		nextMarker = resp.NextMarker
+	}
+	c.Check(len(gotKeys), check.Equals, expectFiles)
+}
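
Usage note (illustrative sketch, not part of the commit above): the new listing endpoint can be driven with the same goamz client the test uses. The sketch below mirrors the paging loop in testS3List: it requests up to 1000 keys per page and follows NextMarker while IsTruncated is set. The token, endpoint URL, region name, prefix, and collection UUID are placeholders, and the goamz import path may differ from the vendored path used in s3.go.

package main

import (
	"fmt"
	"log"
	"time"

	"github.com/AdRoll/goamz/aws"
	"github.com/AdRoll/goamz/s3"
)

func main() {
	// As in s3_test.go, an Arvados API token serves as both access key and
	// secret key. All values below are placeholders.
	token := "your-arvados-api-token"
	auth := aws.NewAuth(token, token, "", time.Now().Add(time.Hour))
	region := aws.Region{Name: "zzzzz", S3Endpoint: "https://collections.example.com"}
	client := s3.New(*auth, region)
	// The bucket name is a collection (or project) UUID, matching the
	// "/{uuid}" path handled by s3list.
	bucket := &s3.Bucket{S3: client, Name: "zzzzz-4zz18-xxxxxxxxxxxxxxx"}

	marker := ""
	for {
		// prefix "dir1/", no delimiter, continue from marker, up to 1000 keys
		resp, err := bucket.List("dir1/", "", marker, 1000)
		if err != nil {
			log.Fatal(err)
		}
		for _, key := range resp.Contents {
			fmt.Println(key.Key)
		}
		if !resp.IsTruncated {
			break
		}
		marker = resp.NextMarker
	}
}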