Merge branch '16535-s3'
[arvados.git] / services / keep-web / s3_test.go
index 349be7777f2ce21e33d5dbcd82107c74adacd13f..b82f1efd7818b1fd26f5bbe6ffad5cc9f5fff5ec 100644 (file)
@@ -9,6 +9,7 @@ import (
        "crypto/rand"
        "fmt"
        "io/ioutil"
+       "net/http"
        "os"
        "strings"
        "sync"
@@ -97,6 +98,22 @@ func (stage s3stage) teardown(c *check.C) {
                err := stage.arv.RequestAndDecode(&stage.coll, "DELETE", "arvados/v1/collections/"+stage.coll.UUID, nil, nil)
                c.Check(err, check.IsNil)
        }
+       if stage.proj.UUID != "" {
+               err := stage.arv.RequestAndDecode(&stage.proj, "DELETE", "arvados/v1/groups/"+stage.proj.UUID, nil, nil)
+               c.Check(err, check.IsNil)
+       }
+}
+
+// TestS3HeadBucket verifies that bucket.Exists("") reports true,
+// without error, for both the collection and the project bucket.
+func (s *IntegrationSuite) TestS3HeadBucket(c *check.C) {
+       stage := s.s3setup(c)
+       defer stage.teardown(c)
+
+       buckets := []*s3.Bucket{stage.collbucket, stage.projbucket}
+       for i := range buckets {
+               b := buckets[i]
+               c.Logf("bucket %s", b.Name)
+               found, err := b.Exists("")
+               c.Check(err, check.IsNil)
+               c.Check(found, check.Equals, true)
+       }
 }
 
 func (s *IntegrationSuite) TestS3CollectionGetObject(c *check.C) {
@@ -118,9 +135,16 @@ func (s *IntegrationSuite) testS3GetObject(c *check.C, bucket *s3.Bucket, prefix
        err = rdr.Close()
        c.Check(err, check.IsNil)
 
+       // GetObject
        rdr, err = bucket.GetReader(prefix + "missingfile")
        c.Check(err, check.ErrorMatches, `404 Not Found`)
 
+       // HeadObject
+       exists, err := bucket.Exists(prefix + "missingfile")
+       c.Check(err, check.IsNil)
+       c.Check(exists, check.Equals, false)
+
+       // GetObject
        rdr, err = bucket.GetReader(prefix + "sailboat.txt")
        c.Assert(err, check.IsNil)
        buf, err = ioutil.ReadAll(rdr)
@@ -128,6 +152,11 @@ func (s *IntegrationSuite) testS3GetObject(c *check.C, bucket *s3.Bucket, prefix
        c.Check(buf, check.DeepEquals, []byte("⛵\n"))
        err = rdr.Close()
        c.Check(err, check.IsNil)
+
+       // HeadObject
+       exists, err = bucket.Exists(prefix + "sailboat.txt")
+       c.Check(err, check.IsNil)
+       c.Check(exists, check.Equals, true)
 }
 
 func (s *IntegrationSuite) TestS3CollectionPutObjectSuccess(c *check.C) {
@@ -142,18 +171,26 @@ func (s *IntegrationSuite) TestS3ProjectPutObjectSuccess(c *check.C) {
 }
 func (s *IntegrationSuite) testS3PutObjectSuccess(c *check.C, bucket *s3.Bucket, prefix string) {
        for _, trial := range []struct {
-               path string
-               size int
+               path        string
+               size        int
+               contentType string
        }{
                {
-                       path: "newfile",
-                       size: 128000000,
+                       path:        "newfile",
+                       size:        128000000,
+                       contentType: "application/octet-stream",
                }, {
-                       path: "newdir/newfile",
-                       size: 1 << 26,
+                       path:        "newdir/newfile",
+                       size:        1 << 26,
+                       contentType: "application/octet-stream",
                }, {
-                       path: "newdir1/newdir2/newfile",
-                       size: 0,
+                       path:        "newdir1/newdir2/newfile",
+                       size:        0,
+                       contentType: "application/octet-stream",
+               }, {
+                       path:        "newdir1/newdir2/newdir3/",
+                       size:        0,
+                       contentType: "application/x-directory",
                },
        } {
                c.Logf("=== %v", trial)
@@ -166,11 +203,14 @@ func (s *IntegrationSuite) testS3PutObjectSuccess(c *check.C, bucket *s3.Bucket,
                buf := make([]byte, trial.size)
                rand.Read(buf)
 
-               err = bucket.PutReader(objname, bytes.NewReader(buf), int64(len(buf)), "application/octet-stream", s3.Private, s3.Options{})
+               err = bucket.PutReader(objname, bytes.NewReader(buf), int64(len(buf)), trial.contentType, s3.Private, s3.Options{})
                c.Check(err, check.IsNil)
 
                rdr, err := bucket.GetReader(objname)
-               if !c.Check(err, check.IsNil) {
+               if strings.HasSuffix(trial.path, "/") && !s.testServer.Config.cluster.Collections.S3FolderObjects {
+                       c.Check(err, check.NotNil)
+                       continue
+               } else if !c.Check(err, check.IsNil) {
                        continue
                }
                buf2, err := ioutil.ReadAll(rdr)
@@ -180,6 +220,83 @@ func (s *IntegrationSuite) testS3PutObjectSuccess(c *check.C, bucket *s3.Bucket,
        }
 }
 
+// TestS3ProjectPutObjectNotSupported verifies that PutObject
+// requests addressed to the project bucket are rejected with
+// "400 Bad Request" and leave the target path nonexistent.
+func (s *IntegrationSuite) TestS3ProjectPutObjectNotSupported(c *check.C) {
+       stage := s.s3setup(c)
+       defer stage.teardown(c)
+       bucket := stage.projbucket
+
+       trials := []struct {
+               path        string
+               size        int
+               contentType string
+       }{
+               {path: "newfile", size: 1234, contentType: "application/octet-stream"},
+               {path: "newdir/newfile", size: 1234, contentType: "application/octet-stream"},
+               {path: "newdir2/", size: 0, contentType: "application/x-directory"},
+       }
+       for _, trial := range trials {
+               c.Logf("=== %v", trial)
+
+               // The path must not exist before the attempt...
+               _, err := bucket.GetReader(trial.path)
+               c.Assert(err, check.ErrorMatches, `404 Not Found`)
+
+               data := make([]byte, trial.size)
+               rand.Read(data)
+
+               // ...the upload itself must be refused...
+               err = bucket.PutReader(trial.path, bytes.NewReader(data), int64(trial.size), trial.contentType, s3.Private, s3.Options{})
+               c.Check(err, check.ErrorMatches, `400 Bad Request`)
+
+               // ...and the path must still not exist afterwards.
+               _, err = bucket.GetReader(trial.path)
+               c.Assert(err, check.ErrorMatches, `404 Not Found`)
+       }
+}
+
+// TestS3CollectionDeleteObject runs the shared DeleteObject checks
+// against the collection bucket, using object names with no prefix.
+func (s *IntegrationSuite) TestS3CollectionDeleteObject(c *check.C) {
+       stage := s.s3setup(c)
+       defer stage.teardown(c)
+       bucket, prefix := stage.collbucket, ""
+       s.testS3DeleteObject(c, bucket, prefix)
+}
+// TestS3ProjectDeleteObject runs the shared DeleteObject checks
+// against the project bucket, addressing objects under the staged
+// collection's name.
+func (s *IntegrationSuite) TestS3ProjectDeleteObject(c *check.C) {
+       stage := s.s3setup(c)
+       defer stage.teardown(c)
+       prefix := stage.coll.Name + "/"
+       s.testS3DeleteObject(c, stage.projbucket, prefix)
+}
+// testS3DeleteObject enables S3FolderObjects, then deletes a series
+// of object names (each prefixed with prefix) from bucket. Deleting
+// "/" must fail; every other delete must succeed, after which
+// reading the object must fail.
+func (s *IntegrationSuite) testS3DeleteObject(c *check.C, bucket *s3.Bucket, prefix string) {
+       s.testServer.Config.cluster.Collections.S3FolderObjects = true
+       names := []string{
+               "/",
+               "nonexistentfile",
+               "emptyfile",
+               "sailboat.txt",
+               "sailboat.txt/",
+               "emptydir",
+               "emptydir/",
+       }
+       for _, name := range names {
+               objname := prefix + name
+               comment := check.Commentf("objname %q", objname)
+
+               err := bucket.Del(objname)
+               if name == "/" {
+                       c.Check(err, check.NotNil)
+                       continue
+               }
+               c.Check(err, check.IsNil, comment)
+               _, err = bucket.GetReader(objname)
+               c.Check(err, check.NotNil, comment)
+       }
+}
+
 func (s *IntegrationSuite) TestS3CollectionPutObjectFailure(c *check.C) {
        stage := s.s3setup(c)
        defer stage.teardown(c)
@@ -191,6 +308,7 @@ func (s *IntegrationSuite) TestS3ProjectPutObjectFailure(c *check.C) {
        s.testS3PutObjectFailure(c, stage.projbucket, stage.coll.Name+"/")
 }
 func (s *IntegrationSuite) testS3PutObjectFailure(c *check.C, bucket *s3.Bucket, prefix string) {
+       s.testServer.Config.cluster.Collections.S3FolderObjects = false
        var wg sync.WaitGroup
        for _, trial := range []struct {
                path string
@@ -259,17 +377,44 @@ func (stage *s3stage) writeBigDirs(c *check.C, dirs int, filesPerDir int) {
        c.Assert(fs.Sync(), check.IsNil)
 }
 
+// TestS3GetBucketVersioning sends a GET request with the
+// "versioning" query to the root URL of the collection bucket and
+// the project bucket, and checks that each response is an XML
+// document containing an empty VersioningConfiguration element.
+func (s *IntegrationSuite) TestS3GetBucketVersioning(c *check.C) {
+       stage := s.s3setup(c)
+       defer stage.teardown(c)
+       for _, bucket := range []*s3.Bucket{stage.collbucket, stage.projbucket} {
+               req, err := http.NewRequest("GET", bucket.URL("/"), nil)
+               // Assert, not Check: req is nil on error and is
+               // dereferenced right below.
+               c.Assert(err, check.IsNil)
+               req.Header.Set("Authorization", "AWS "+arvadostest.ActiveTokenV2+":none")
+               req.URL.RawQuery = "versioning"
+               resp, err := http.DefaultClient.Do(req)
+               c.Assert(err, check.IsNil)
+               c.Check(resp.Header.Get("Content-Type"), check.Equals, "application/xml")
+               buf, err := ioutil.ReadAll(resp.Body)
+               // Close explicitly rather than with defer, so each
+               // iteration releases its connection before the next
+               // request. Close before asserting, because a failed
+               // Assert aborts the test.
+               resp.Body.Close()
+               c.Assert(err, check.IsNil)
+               c.Check(string(buf), check.Equals, "<?xml version=\"1.0\" encoding=\"UTF-8\"?>\n<VersioningConfiguration xmlns=\"http://s3.amazonaws.com/doc/2006-03-01/\"/>\n")
+       }
+}
+
 func (s *IntegrationSuite) TestS3CollectionList(c *check.C) {
        stage := s.s3setup(c)
        defer stage.teardown(c)
 
-       filesPerDir := 1001
-       stage.writeBigDirs(c, 2, filesPerDir)
-       s.testS3List(c, stage.collbucket, "", 4000, 2+filesPerDir*2)
-       s.testS3List(c, stage.collbucket, "", 131, 2+filesPerDir*2)
-       s.testS3List(c, stage.collbucket, "dir0/", 71, filesPerDir)
+       var markers int // range index below: 0 with S3FolderObjects off, 1 with it on
+       for markers, s.testServer.Config.cluster.Collections.S3FolderObjects = range []bool{false, true} {
+               dirs := 2
+               filesPerDir := 1001
+               stage.writeBigDirs(c, dirs, filesPerDir)
+               // Total # objects is:
+               //                 2 file entries from s3setup (emptyfile and sailboat.txt)
+               //                +1 fake "directory" marker from s3setup (emptydir) (if enabled)
+               //             +dirs fake "directory" marker from writeBigDirs (dir0/, dir1/) (if enabled)
+               // +filesPerDir*dirs file entries from writeBigDirs (dir0/file0.txt, etc.)
+               s.testS3List(c, stage.collbucket, "", 4000, markers+2+(filesPerDir+markers)*dirs)
+               s.testS3List(c, stage.collbucket, "", 131, markers+2+(filesPerDir+markers)*dirs)
+               s.testS3List(c, stage.collbucket, "dir0/", 71, filesPerDir+markers)
+       }
 }
 func (s *IntegrationSuite) testS3List(c *check.C, bucket *s3.Bucket, prefix string, pageSize, expectFiles int) {
+       c.Logf("testS3List: prefix=%q pageSize=%d S3FolderObjects=%v", prefix, pageSize, s.testServer.Config.cluster.Collections.S3FolderObjects)
        expectPageSize := pageSize
        if expectPageSize > 1000 {
                expectPageSize = 1000
@@ -288,6 +433,9 @@ func (s *IntegrationSuite) testS3List(c *check.C, bucket *s3.Bucket, prefix stri
                }
                for _, key := range resp.Contents {
                        gotKeys[key.Key] = key
+                       if strings.Contains(key.Key, "sailboat.txt") {
+                               c.Check(key.Size, check.Equals, int64(4))
+                       }
                }
                if !resp.IsTruncated {
                        c.Check(resp.NextMarker, check.Equals, "")
@@ -302,6 +450,12 @@ func (s *IntegrationSuite) testS3List(c *check.C, bucket *s3.Bucket, prefix stri
 }
 
 func (s *IntegrationSuite) TestS3CollectionListRollup(c *check.C) {
+       for _, s.testServer.Config.cluster.Collections.S3FolderObjects = range []bool{false, true} {
+               s.testS3CollectionListRollup(c)
+       }
+}
+
+func (s *IntegrationSuite) testS3CollectionListRollup(c *check.C) {
        stage := s.s3setup(c)
        defer stage.teardown(c)
 
@@ -324,17 +478,40 @@ func (s *IntegrationSuite) TestS3CollectionListRollup(c *check.C) {
                        break
                }
        }
-       c.Check(allfiles, check.HasLen, dirs*filesPerDir+3)
+       markers := 0
+       if s.testServer.Config.cluster.Collections.S3FolderObjects {
+               markers = 1
+       }
+       c.Check(allfiles, check.HasLen, dirs*(filesPerDir+markers)+3+markers)
+
+       gotDirMarker := map[string]bool{}
+       for _, name := range allfiles {
+               isDirMarker := strings.HasSuffix(name, "/")
+               if markers == 0 {
+                       c.Check(isDirMarker, check.Equals, false, check.Commentf("name %q", name))
+               } else if isDirMarker {
+                       gotDirMarker[name] = true
+               } else if i := strings.LastIndex(name, "/"); i >= 0 {
+                       c.Check(gotDirMarker[name[:i+1]], check.Equals, true, check.Commentf("name %q", name))
+                       gotDirMarker[name[:i+1]] = true // skip redundant complaints about this dir marker
+               }
+       }
 
        for _, trial := range []struct {
                prefix    string
                delimiter string
                marker    string
        }{
+               {"", "", ""},
                {"di", "/", ""},
                {"di", "r", ""},
                {"di", "n", ""},
                {"dir0", "/", ""},
+               {"dir0/", "/", ""},
+               {"dir0/f", "/", ""},
+               {"dir0", "", ""},
+               {"dir0/", "", ""},
+               {"dir0/f", "", ""},
                {"dir0", "/", "dir0/file14.txt"},       // no commonprefixes
                {"", "", "dir0/file14.txt"},            // middle page, skip walking dir1
                {"", "", "dir1/file14.txt"},            // middle page, skip walking dir0
@@ -345,7 +522,7 @@ func (s *IntegrationSuite) TestS3CollectionListRollup(c *check.C) {
                {"dir2", "/", ""},                      // prefix "dir2" does not exist
                {"", "/", ""},
        } {
-               c.Logf("\n\n=== trial %+v", trial)
+               c.Logf("\n\n=== trial %+v markers=%d", trial, markers)
 
                maxKeys := 20
                resp, err := stage.collbucket.List(trial.prefix, trial.delimiter, trial.marker, maxKeys)
@@ -397,10 +574,11 @@ func (s *IntegrationSuite) TestS3CollectionListRollup(c *check.C) {
                for _, prefix := range resp.CommonPrefixes {
                        gotPrefixes = append(gotPrefixes, prefix)
                }
-               c.Check(gotKeys, check.DeepEquals, expectKeys)
-               c.Check(gotPrefixes, check.DeepEquals, expectPrefixes)
-               c.Check(resp.NextMarker, check.Equals, expectNextMarker)
-               c.Check(resp.IsTruncated, check.Equals, expectTruncated)
+               commentf := check.Commentf("trial %+v markers=%d", trial, markers)
+               c.Check(gotKeys, check.DeepEquals, expectKeys, commentf)
+               c.Check(gotPrefixes, check.DeepEquals, expectPrefixes, commentf)
+               c.Check(resp.NextMarker, check.Equals, expectNextMarker, commentf)
+               c.Check(resp.IsTruncated, check.Equals, expectTruncated, commentf)
                c.Logf("=== trial %+v keys %q prefixes %q nextMarker %q", trial, gotKeys, gotPrefixes, resp.NextMarker)
        }
 }