20726: Fix ListObjects[V2] pages duplicating last item on next page.
authorTom Clegg <tom@curii.com>
Tue, 11 Jul 2023 19:25:22 +0000 (15:25 -0400)
committerTom Clegg <tom@curii.com>
Tue, 11 Jul 2023 19:25:22 +0000 (15:25 -0400)
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom@curii.com>

services/keep-web/s3.go
services/keep-web/s3_test.go

index 38428cdab1cb0a54775481ee9c45c977acab8af0..fd8764da8f61424b5ca8c5bbc5c7c97644238f89 100644 (file)
@@ -772,6 +772,9 @@ func (h *handler) s3list(bucket string, w http.ResponseWriter, r *http.Request,
                        http.Error(w, "invalid continuation token", http.StatusBadRequest)
                        return
                }
+               // marker and start-after perform the same function,
+               // but we keep them separate so we can repeat them
+               // back to the client in the response.
                params.marker = string(marker)
                params.startAfter = r.FormValue("start-after")
                switch r.FormValue("encoding-type") {
@@ -783,6 +786,7 @@ func (h *handler) s3list(bucket string, w http.ResponseWriter, r *http.Request,
                        return
                }
        } else {
+               // marker is functionally equivalent to start-after.
                params.marker = r.FormValue("marker")
        }
 
@@ -809,6 +813,11 @@ func (h *handler) s3list(bucket string, w http.ResponseWriter, r *http.Request,
                ContinuationToken: r.FormValue("continuation-token"),
                StartAfter:        params.startAfter,
        }
+
+       // nextMarker will be the last path we add to either
+       // resp.Contents or commonPrefixes.  It will be included in
+       // the response as NextMarker or NextContinuationToken if
+       // needed.
        nextMarker := ""
 
        commonPrefixes := map[string]bool{}
@@ -850,7 +859,7 @@ func (h *handler) s3list(bucket string, w http.ResponseWriter, r *http.Request,
                                return errDone
                        }
                }
-               if path < params.marker || path < params.prefix || path <= params.startAfter {
+               if path <= params.marker || path < params.prefix || path <= params.startAfter {
                        return nil
                }
                if fi.IsDir() && !h.Cluster.Collections.S3FolderObjects {
@@ -863,9 +872,6 @@ func (h *handler) s3list(bucket string, w http.ResponseWriter, r *http.Request,
                }
                if len(resp.Contents)+len(commonPrefixes) >= params.maxKeys {
                        resp.IsTruncated = true
-                       if params.delimiter != "" || params.v2 {
-                               nextMarker = path
-                       }
                        return errDone
                }
                if params.delimiter != "" {
@@ -875,7 +881,11 @@ func (h *handler) s3list(bucket string, w http.ResponseWriter, r *http.Request,
                                // "z", when we hit "foobar/baz", we
                                // add "/baz" to commonPrefixes and
                                // stop descending.
-                               commonPrefixes[path[:len(params.prefix)+idx+1]] = true
+                               prefix := path[:len(params.prefix)+idx+1]
+                               if prefix != params.marker {
+                                       commonPrefixes[prefix] = true
+                                       nextMarker = prefix
+                               }
                                return filepath.SkipDir
                        }
                }
@@ -884,12 +894,16 @@ func (h *handler) s3list(bucket string, w http.ResponseWriter, r *http.Request,
                        LastModified: fi.ModTime().UTC().Format("2006-01-02T15:04:05.999") + "Z",
                        Size:         filesize,
                })
+               nextMarker = path
                return nil
        })
        if err != nil && err != errDone {
                http.Error(w, err.Error(), http.StatusInternalServerError)
                return
        }
+       if params.delimiter == "" && !params.v2 || !resp.IsTruncated {
+               nextMarker = ""
+       }
        if params.delimiter != "" {
                resp.CommonPrefixes = make([]commonPrefix, 0, len(commonPrefixes))
                for prefix := range commonPrefixes {
index bfe8bb862ca89deac7015f4563adeb79c2d0f70b..556f8ba2c2661b8c06902ed20de29456062ee0bc 100644 (file)
@@ -851,6 +851,9 @@ func (s *IntegrationSuite) testS3List(c *check.C, bucket *s3.Bucket, prefix stri
                        break
                }
                for _, key := range resp.Contents {
+                       if _, dup := gotKeys[key.Key]; dup {
+                               c.Errorf("got duplicate key %q on page %d", key.Key, pages)
+                       }
                        gotKeys[key.Key] = key
                        if strings.Contains(key.Key, "sailboat.txt") {
                                c.Check(key.Size, check.Equals, int64(4))
@@ -971,28 +974,31 @@ func (s *IntegrationSuite) testS3CollectionListRollup(c *check.C) {
                var expectTruncated bool
                for _, key := range allfiles {
                        full := len(expectKeys)+len(expectPrefixes) >= maxKeys
-                       if !strings.HasPrefix(key, trial.prefix) || key < trial.marker {
+                       if !strings.HasPrefix(key, trial.prefix) || key <= trial.marker {
                                continue
                        } else if idx := strings.Index(key[len(trial.prefix):], trial.delimiter); trial.delimiter != "" && idx >= 0 {
                                prefix := key[:len(trial.prefix)+idx+1]
                                if len(expectPrefixes) > 0 && expectPrefixes[len(expectPrefixes)-1] == prefix {
                                        // same prefix as previous key
                                } else if full {
-                                       expectNextMarker = key
                                        expectTruncated = true
                                } else {
                                        expectPrefixes = append(expectPrefixes, prefix)
+                                       expectNextMarker = prefix
                                }
                        } else if full {
-                               if trial.delimiter != "" {
-                                       expectNextMarker = key
-                               }
                                expectTruncated = true
                                break
                        } else {
                                expectKeys = append(expectKeys, key)
+                               if trial.delimiter != "" {
+                                       expectNextMarker = key
+                               }
                        }
                }
+               if !expectTruncated {
+                       expectNextMarker = ""
+               }
 
                var gotKeys []string
                for _, key := range resp.Contents {