20726: Fix ListObjects[V2] pages duplicating last item on next page.
[arvados.git] / services / keep-web / s3_test.go
index 14dfa62dba6c0272628f86930d59d33e44fe3b8c..556f8ba2c2661b8c06902ed20de29456062ee0bc 100644 (file)
@@ -17,6 +17,7 @@ import (
        "net/url"
        "os"
        "os/exec"
+       "sort"
        "strings"
        "sync"
        "time"
@@ -83,6 +84,10 @@ func (s *IntegrationSuite) s3setup(c *check.C) s3stage {
                        "object":   map[string]interface{}{"key": map[string]interface{}{"key2": "value⛵"}},
                        "nonascii": "⛵",
                        "newline":  "foo\r\nX-Bad: header",
+                       // This key cannot be expressed as a MIME
+                       // header key, so it will be silently skipped
+                       // (see "Inject" in PropertiesAsMetadata test)
+                       "a: a\r\nInject": "bogus",
                },
        }})
        c.Assert(err, check.IsNil)
@@ -281,6 +286,7 @@ func (s *IntegrationSuite) TestS3PropertiesAsMetadata(c *check.C) {
        rdr.Close()
        c.Check(content, check.HasLen, 4)
        s.checkMetaEquals(c, hdr, expectCollectionTags)
+       c.Check(hdr["Inject"], check.IsNil)
 
        c.Log("HEAD bucket with metadata from collection")
        resp, err = stage.collbucket.Head("/", nil)
@@ -311,14 +317,14 @@ func (s *IntegrationSuite) TestS3PropertiesAsMetadata(c *check.C) {
 func (s *IntegrationSuite) TestS3CollectionPutObjectSuccess(c *check.C) {
        stage := s.s3setup(c)
        defer stage.teardown(c)
-       s.testS3PutObjectSuccess(c, stage.collbucket, "")
+       s.testS3PutObjectSuccess(c, stage.collbucket, "", stage.coll.UUID)
 }
 func (s *IntegrationSuite) TestS3ProjectPutObjectSuccess(c *check.C) {
        stage := s.s3setup(c)
        defer stage.teardown(c)
-       s.testS3PutObjectSuccess(c, stage.projbucket, stage.coll.Name+"/")
+       s.testS3PutObjectSuccess(c, stage.projbucket, stage.coll.Name+"/", stage.coll.UUID)
 }
-func (s *IntegrationSuite) testS3PutObjectSuccess(c *check.C, bucket *s3.Bucket, prefix string) {
+func (s *IntegrationSuite) testS3PutObjectSuccess(c *check.C, bucket *s3.Bucket, prefix string, collUUID string) {
        for _, trial := range []struct {
                path        string
                size        int
@@ -362,7 +368,7 @@ func (s *IntegrationSuite) testS3PutObjectSuccess(c *check.C, bucket *s3.Bucket,
                if !c.Check(err, check.NotNil) {
                        continue
                }
-               c.Check(err.(*s3.Error).StatusCode, check.Equals, 404)
+               c.Check(err.(*s3.Error).StatusCode, check.Equals, http.StatusNotFound)
                c.Check(err.(*s3.Error).Code, check.Equals, `NoSuchKey`)
                if !c.Check(err, check.ErrorMatches, `The specified key does not exist.`) {
                        continue
@@ -385,6 +391,14 @@ func (s *IntegrationSuite) testS3PutObjectSuccess(c *check.C, bucket *s3.Bucket,
                c.Check(err, check.IsNil)
                c.Check(buf2, check.HasLen, len(buf))
                c.Check(bytes.Equal(buf, buf2), check.Equals, true)
+
+               // Check that the change is immediately visible via
+               // a (non-S3) WebDAV request.
+               _, resp := s.do("GET", "http://"+collUUID+".keep-web.example/"+trial.path, arvadostest.ActiveTokenV2, nil)
+               c.Check(resp.Code, check.Equals, http.StatusOK)
+               if !strings.HasSuffix(trial.path, "/") {
+                       c.Check(resp.Body.Len(), check.Equals, trial.size)
+               }
        }
 }
 
@@ -804,8 +818,8 @@ func (s *IntegrationSuite) TestS3CollectionList(c *check.C) {
 
        var markers int
        for markers, s.handler.Cluster.Collections.S3FolderObjects = range []bool{false, true} {
-               dirs := 2
-               filesPerDir := 1001
+               dirs := 2000
+               filesPerDir := 2
                stage.writeBigDirs(c, dirs, filesPerDir)
                // Total # objects is:
                //                 2 file entries from s3setup (emptyfile and sailboat.txt)
@@ -814,6 +828,7 @@ func (s *IntegrationSuite) TestS3CollectionList(c *check.C) {
                // +filesPerDir*dirs file entries from writeBigDirs (dir0/file0.txt, etc.)
                s.testS3List(c, stage.collbucket, "", 4000, markers+2+(filesPerDir+markers)*dirs)
                s.testS3List(c, stage.collbucket, "", 131, markers+2+(filesPerDir+markers)*dirs)
+               s.testS3List(c, stage.collbucket, "", 51, markers+2+(filesPerDir+markers)*dirs)
                s.testS3List(c, stage.collbucket, "dir0/", 71, filesPerDir+markers)
        }
 }
@@ -836,6 +851,9 @@ func (s *IntegrationSuite) testS3List(c *check.C, bucket *s3.Bucket, prefix stri
                        break
                }
                for _, key := range resp.Contents {
+                       if _, dup := gotKeys[key.Key]; dup {
+                               c.Errorf("got duplicate key %q on page %d", key.Key, pages)
+                       }
                        gotKeys[key.Key] = key
                        if strings.Contains(key.Key, "sailboat.txt") {
                                c.Check(key.Size, check.Equals, int64(4))
@@ -850,7 +868,16 @@ func (s *IntegrationSuite) testS3List(c *check.C, bucket *s3.Bucket, prefix stri
                }
                nextMarker = resp.NextMarker
        }
-       c.Check(len(gotKeys), check.Equals, expectFiles)
+       if !c.Check(len(gotKeys), check.Equals, expectFiles) {
+               var sorted []string
+               for k := range gotKeys {
+                       sorted = append(sorted, k)
+               }
+               sort.Strings(sorted)
+               for _, k := range sorted {
+                       c.Logf("got %s", k)
+               }
+       }
 }
 
 func (s *IntegrationSuite) TestS3CollectionListRollup(c *check.C) {
@@ -947,28 +974,31 @@ func (s *IntegrationSuite) testS3CollectionListRollup(c *check.C) {
                var expectTruncated bool
                for _, key := range allfiles {
                        full := len(expectKeys)+len(expectPrefixes) >= maxKeys
-                       if !strings.HasPrefix(key, trial.prefix) || key < trial.marker {
+                       if !strings.HasPrefix(key, trial.prefix) || key <= trial.marker {
                                continue
                        } else if idx := strings.Index(key[len(trial.prefix):], trial.delimiter); trial.delimiter != "" && idx >= 0 {
                                prefix := key[:len(trial.prefix)+idx+1]
                                if len(expectPrefixes) > 0 && expectPrefixes[len(expectPrefixes)-1] == prefix {
                                        // same prefix as previous key
                                } else if full {
-                                       expectNextMarker = key
                                        expectTruncated = true
                                } else {
                                        expectPrefixes = append(expectPrefixes, prefix)
+                                       expectNextMarker = prefix
                                }
                        } else if full {
-                               if trial.delimiter != "" {
-                                       expectNextMarker = key
-                               }
                                expectTruncated = true
                                break
                        } else {
                                expectKeys = append(expectKeys, key)
+                               if trial.delimiter != "" {
+                                       expectNextMarker = key
+                               }
                        }
                }
+               if !expectTruncated {
+                       expectNextMarker = ""
+               }
 
                var gotKeys []string
                for _, key := range resp.Contents {