20726: Fix traversing projects/collections that precede page marker. (ref: 20726-s3list-pages)
author Tom Clegg <tom@curii.com>
Wed, 12 Jul 2023 04:19:08 +0000 (00:19 -0400)
committer Tom Clegg <tom@curii.com>
Wed, 12 Jul 2023 04:19:08 +0000 (00:19 -0400)
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom@curii.com>

services/keep-web/s3.go
services/keep-web/s3_test.go

index fd8764da8f61424b5ca8c5bbc5c7c97644238f89..8615ceaf08602822bea4d67d054e609eae2409c8 100644 (file)
@@ -790,6 +790,13 @@ func (h *handler) s3list(bucket string, w http.ResponseWriter, r *http.Request,
                params.marker = r.FormValue("marker")
        }
 
+       // startAfter is params.marker or params.startAfter, whichever
+       // comes last.
+       startAfter := params.startAfter
+       if startAfter < params.marker {
+               startAfter = params.marker
+       }
+
        bucketdir := "by_id/" + bucket
        // walkpath is the directory (relative to bucketdir) we need
        // to walk: the innermost directory that is guaranteed to
@@ -821,6 +828,7 @@ func (h *handler) s3list(bucket string, w http.ResponseWriter, r *http.Request,
        nextMarker := ""
 
        commonPrefixes := map[string]bool{}
+       full := false
        err := walkFS(fs, strings.TrimSuffix(bucketdir+"/"+walkpath, "/"), true, func(path string, fi os.FileInfo) error {
                if path == bucketdir {
                        return nil
@@ -831,36 +839,29 @@ func (h *handler) s3list(bucket string, w http.ResponseWriter, r *http.Request,
                        path += "/"
                        filesize = 0
                }
-               if len(path) <= len(params.prefix) {
-                       if path > params.prefix[:len(path)] {
-                               // with prefix "foobar", walking "fooz" means we're done
-                               return errDone
-                       }
-                       if path < params.prefix[:len(path)] {
-                               // with prefix "foobar", walking "foobag" is pointless
-                               return filepath.SkipDir
-                       }
-                       if fi.IsDir() && !strings.HasPrefix(params.prefix+"/", path) {
-                               // with prefix "foo/bar", walking "fo"
-                               // is pointless (but walking "foo" or
-                               // "foo/bar" is necessary)
-                               return filepath.SkipDir
-                       }
-                       if len(path) < len(params.prefix) {
-                               // can't skip anything, and this entry
-                               // isn't in the results, so just
-                               // continue descent
-                               return nil
-                       }
-               } else {
-                       if path[:len(params.prefix)] > params.prefix {
-                               // with prefix "foobar", nothing we
-                               // see after "foozzz" is relevant
-                               return errDone
-                       }
-               }
-               if path <= params.marker || path < params.prefix || path <= params.startAfter {
+               if strings.HasPrefix(params.prefix, path) && params.prefix != path {
+                       // Descend into subtree until we reach desired prefix
                        return nil
+               } else if path < params.prefix {
+                       // Not an ancestor or descendant of desired
+                       // prefix, therefore none of its descendants
+                       // can be either -- skip
+                       return filepath.SkipDir
+               } else if path > params.prefix && !strings.HasPrefix(path, params.prefix) {
+                       // We must have traversed everything under
+                       // desired prefix
+                       return errDone
+               } else if path == startAfter {
+                       // Skip startAfter itself, just descend into
+                       // subtree
+                       return nil
+               } else if strings.HasPrefix(startAfter, path) {
+                       // Descend into subtree in case it contains
+                       // something after startAfter
+                       return nil
+               } else if path < startAfter {
+                       // Skip ahead until we reach startAfter
+                       return filepath.SkipDir
                }
                if fi.IsDir() && !h.Cluster.Collections.S3FolderObjects {
                        // Note we don't add anything to
@@ -870,10 +871,6 @@ func (h *handler) s3list(bucket string, w http.ResponseWriter, r *http.Request,
                        // finding a regular file inside it.
                        return nil
                }
-               if len(resp.Contents)+len(commonPrefixes) >= params.maxKeys {
-                       resp.IsTruncated = true
-                       return errDone
-               }
                if params.delimiter != "" {
                        idx := strings.Index(path[len(params.prefix):], params.delimiter)
                        if idx >= 0 {
@@ -882,19 +879,32 @@ func (h *handler) s3list(bucket string, w http.ResponseWriter, r *http.Request,
                                // add "/baz" to commonPrefixes and
                                // stop descending.
                                prefix := path[:len(params.prefix)+idx+1]
-                               if prefix != params.marker {
+                               if prefix == startAfter {
+                                       return nil
+                               } else if prefix < startAfter && !strings.HasPrefix(startAfter, prefix) {
+                                       return nil
+                               } else if full {
+                                       resp.IsTruncated = true
+                                       return errDone
+                               } else {
                                        commonPrefixes[prefix] = true
                                        nextMarker = prefix
+                                       full = len(resp.Contents)+len(commonPrefixes) >= params.maxKeys
+                                       return filepath.SkipDir
                                }
-                               return filepath.SkipDir
                        }
                }
+               if full {
+                       resp.IsTruncated = true
+                       return errDone
+               }
                resp.Contents = append(resp.Contents, s3Key{
                        Key:          path,
                        LastModified: fi.ModTime().UTC().Format("2006-01-02T15:04:05.999") + "Z",
                        Size:         filesize,
                })
                nextMarker = path
+               full = len(resp.Contents)+len(commonPrefixes) >= params.maxKeys
                return nil
        })
        if err != nil && err != errDone {
index 556f8ba2c2661b8c06902ed20de29456062ee0bc..9f8650b8ede8a831df79ada8588be10e979a966a 100644 (file)
@@ -943,7 +943,8 @@ func (s *IntegrationSuite) testS3CollectionListRollup(c *check.C) {
                {"dir0", "", ""},
                {"dir0/", "", ""},
                {"dir0/f", "", ""},
-               {"dir0", "/", "dir0/file14.txt"},       // no commonprefixes
+               {"dir0", "/", "dir0/file14.txt"},       // one commonprefix, "dir0/"
+               {"dir0", "/", "dir0/zzzzfile.txt"},     // no commonprefixes
                {"", "", "dir0/file14.txt"},            // middle page, skip walking dir1
                {"", "", "dir1/file14.txt"},            // middle page, skip walking dir0
                {"", "", "dir1/file498.txt"},           // last page of results
@@ -1017,6 +1018,61 @@ func (s *IntegrationSuite) testS3CollectionListRollup(c *check.C) {
        }
 }
 
+func (s *IntegrationSuite) TestS3ListObjectsV2ManySubprojects(c *check.C) { // creates many subprojects, then pages through a delimited ListObjectsV2 listing
+       stage := s.s3setup(c)
+       defer stage.teardown(c)
+       projects := 50 // number of subprojects created under stage.subproj
+       collectionsPerProject := 2 // number of collections created in each new subproject
+       for i := 0; i < projects; i++ {
+               var subproj arvados.Group
+               err := stage.arv.RequestAndDecode(&subproj, "POST", "arvados/v1/groups", nil, map[string]interface{}{
+                       "group": map[string]interface{}{
+                               "owner_uuid":  stage.subproj.UUID,
+                               "group_class": "project",
+                               "name":        fmt.Sprintf("keep-web s3 test subproject %d", i),
+                       },
+               })
+               c.Assert(err, check.IsNil)
+               for j := 0; j < collectionsPerProject; j++ {
+                       err = stage.arv.RequestAndDecode(nil, "POST", "arvados/v1/collections", nil, map[string]interface{}{"collection": map[string]interface{}{
+                               "owner_uuid":    subproj.UUID,
+                               "name":          fmt.Sprintf("keep-web s3 test collection %d", j),
+                               "manifest_text": ". d41d8cd98f00b204e9800998ecf8427e+0 0:0:emptyfile\n./emptydir d41d8cd98f00b204e9800998ecf8427e+0 0:0:.\n",
+                       }})
+                       c.Assert(err, check.IsNil)
+               }
+       }
+       c.Logf("setup complete")
+
+       sess := aws_session.Must(aws_session.NewSession(&aws_aws.Config{
+               Region:           aws_aws.String("auto"),
+               Endpoint:         aws_aws.String(s.testServer.URL), // send requests to the test server rather than AWS
+               Credentials:      aws_credentials.NewStaticCredentials(url.QueryEscape(arvadostest.ActiveTokenV2), url.QueryEscape(arvadostest.ActiveTokenV2), ""), // the same escaped token is passed as the first two credential arguments
+               S3ForcePathStyle: aws_aws.Bool(true),
+       }))
+       client := aws_s3.New(sess)
+       ctx := context.Background()
+       params := aws_s3.ListObjectsV2Input{
+               Bucket:    aws_aws.String(stage.proj.UUID), // the project UUID is used as the bucket name
+               Delimiter: aws_aws.String("/"),
+               Prefix:    aws_aws.String("keep-web s3 test subproject/"),
+               MaxKeys:   aws_aws.Int64(int64(projects / 2)), // first page requests half as many keys as subprojects created
+       }
+       for page := 1; ; page++ { // keep requesting pages until one is not truncated or a request fails
+               t0 := time.Now()
+               result, err := client.ListObjectsV2WithContext(ctx, &params)
+               if !c.Check(err, check.IsNil) { // record the failure and stop paging
+                       break
+               }
+               c.Logf("got page %d in %v with len(Contents) == %d, len(CommonPrefixes) == %d", page, time.Since(t0), len(result.Contents), len(result.CommonPrefixes)) // timing and counts are only logged, not asserted
+               if !*result.IsTruncated {
+                       break
+               }
+               params.ContinuationToken = result.NextContinuationToken // continue from where this page ended
+               *params.MaxKeys = *params.MaxKeys/2 + 1 // vary the page size: 25, 13, 7, 4, 3, then 2 for every later page
+       }
+}
+
 func (s *IntegrationSuite) TestS3ListObjectsV2(c *check.C) {
        stage := s.s3setup(c)
        defer stage.teardown(c)