Refactor the multi-host salt install page.
[arvados.git] / services / keepstore / s3_volume_test.go
index 2c5cdf5b99fa3255d03626933d280ac2e7e21a8a..a82098356859cb3cc481d20df453efb97e1726d0 100644 (file)
@@ -2,7 +2,7 @@
 //
 // SPDX-License-Identifier: AGPL-3.0
 
-package main
+package keepstore
 
 import (
        "bytes"
@@ -76,6 +76,14 @@ func (s *StubbedS3Suite) TestGenericReadOnly(c *check.C) {
        })
 }
 
+func (s *StubbedS3Suite) TestGenericWithPrefix(c *check.C) {
+       DoGenericVolumeTests(c, false, func(t TB, cluster *arvados.Cluster, volume arvados.Volume, logger logrus.FieldLogger, metrics *volumeMetricsVecs) TestableVolume { // factory passed to DoGenericVolumeTests; its t and logger arguments are unused here
+               v := s.newTestableVolume(c, cluster, volume, metrics, -2*time.Second)
+               v.PrefixLength = 3 // run the generic volume tests with PrefixLength set; presumably keys become loc[:3]+"/"+loc as in TestBackendStates' setup -- confirm against S3Volume.key
+               return v
+       })
+}
+
 func (s *StubbedS3Suite) TestIndex(c *check.C) {
        v := s.newTestableVolume(c, s.cluster, arvados.Volume{Replication: 2}, newVolumeMetricsVecs(prometheus.NewRegistry()), 0)
        v.IndexPageSize = 3
@@ -101,6 +109,53 @@ func (s *StubbedS3Suite) TestIndex(c *check.C) {
        }
 }
 
+func (s *StubbedS3Suite) TestSignatureVersion(c *check.C) {
+       var header http.Header // headers of the most recent request received by the stub server
+       stub := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+               header = r.Header // NOTE(review): written on the server goroutine and read by the test without explicit locking -- confirm this is race-detector clean
+       }))
+       defer stub.Close()
+
+       // Default V4 signature: with V2Signature unset, Put should send an "AWS4-HMAC-SHA256 ..." Authorization header
+       vol := S3Volume{
+               S3VolumeDriverParameters: arvados.S3VolumeDriverParameters{
+                       AccessKeyID:     "xxx",
+                       SecretAccessKey: "xxx",
+                       Endpoint:        stub.URL,
+                       Region:          "test-region-1",
+                       Bucket:          "test-bucket-name",
+               },
+               cluster: s.cluster,
+               logger:  ctxlog.TestLogger(c),
+               metrics: newVolumeMetricsVecs(prometheus.NewRegistry()),
+       }
+       err := vol.check()
+       c.Check(err, check.IsNil)
+       err = vol.Put(context.Background(), "acbd18db4cc2f85cedef654fccc4a4d8", []byte("foo")) // locator is the MD5 hex digest of "foo"
+       c.Check(err, check.IsNil)
+       c.Check(header.Get("Authorization"), check.Matches, `AWS4-HMAC-SHA256 .*`)
+
+       // Force V2 signature: V2Signature=true should yield an "AWS <AccessKeyID>:..." Authorization header
+       vol = S3Volume{
+               S3VolumeDriverParameters: arvados.S3VolumeDriverParameters{
+                       AccessKeyID:     "xxx",
+                       SecretAccessKey: "xxx",
+                       Endpoint:        stub.URL,
+                       Region:          "test-region-1",
+                       Bucket:          "test-bucket-name",
+                       V2Signature:     true,
+               },
+               cluster: s.cluster,
+               logger:  ctxlog.TestLogger(c),
+               metrics: newVolumeMetricsVecs(prometheus.NewRegistry()),
+       }
+       err = vol.check()
+       c.Check(err, check.IsNil)
+       err = vol.Put(context.Background(), "acbd18db4cc2f85cedef654fccc4a4d8", []byte("foo"))
+       c.Check(err, check.IsNil)
+       c.Check(header.Get("Authorization"), check.Matches, `AWS xxx:.*`) // "xxx" is the AccessKeyID configured above
+}
+
 func (s *StubbedS3Suite) TestIAMRoleCredentials(c *check.C) {
        s.metadata = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
                upd := time.Now().UTC().Add(-time.Hour).Format(time.RFC3339)
@@ -113,8 +168,8 @@ func (s *StubbedS3Suite) TestIAMRoleCredentials(c *check.C) {
        defer s.metadata.Close()
 
        v := s.newTestableVolume(c, s.cluster, arvados.Volume{Replication: 2}, newVolumeMetricsVecs(prometheus.NewRegistry()), 5*time.Minute)
-       c.Check(v.AccessKey, check.Equals, "ASIAIOSFODNN7EXAMPLE")
-       c.Check(v.SecretKey, check.Equals, "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY")
+       c.Check(v.AccessKeyID, check.Equals, "ASIAIOSFODNN7EXAMPLE")
+       c.Check(v.SecretAccessKey, check.Equals, "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY")
        c.Check(v.bucket.bucket.S3.Auth.AccessKey, check.Equals, "ASIAIOSFODNN7EXAMPLE")
        c.Check(v.bucket.bucket.S3.Auth.SecretKey, check.Equals, "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY")
 
@@ -122,13 +177,15 @@ func (s *StubbedS3Suite) TestIAMRoleCredentials(c *check.C) {
                w.WriteHeader(http.StatusNotFound)
        }))
        deadv := &S3Volume{
-               IAMRole:  s.metadata.URL + "/fake-metadata/test-role",
-               Endpoint: "http://localhost:12345",
-               Region:   "test-region-1",
-               Bucket:   "test-bucket-name",
-               cluster:  s.cluster,
-               logger:   ctxlog.TestLogger(c),
-               metrics:  newVolumeMetricsVecs(prometheus.NewRegistry()),
+               S3VolumeDriverParameters: arvados.S3VolumeDriverParameters{
+                       IAMRole:  s.metadata.URL + "/fake-metadata/test-role",
+                       Endpoint: "http://localhost:12345",
+                       Region:   "test-region-1",
+                       Bucket:   "test-bucket-name",
+               },
+               cluster: s.cluster,
+               logger:  ctxlog.TestLogger(c),
+               metrics: newVolumeMetricsVecs(prometheus.NewRegistry()),
        }
        err := deadv.check()
        c.Check(err, check.ErrorMatches, `.*/fake-metadata/test-role.*`)
@@ -367,81 +424,88 @@ func (s *StubbedS3Suite) TestBackendStates(c *check.C) {
                        false, false, false, true, true, true,
                },
        } {
-               c.Log("Scenario: ", scenario.label)
-
-               // We have a few tests to run for each scenario, and
-               // the tests are expected to change state. By calling
-               // this setup func between tests, we (re)create the
-               // scenario as specified, using a new unique block
-               // locator to prevent interference from previous
-               // tests.
-
-               setupScenario := func() (string, []byte) {
-                       nextKey++
-                       blk := []byte(fmt.Sprintf("%d", nextKey))
-                       loc := fmt.Sprintf("%x", md5.Sum(blk))
-                       c.Log("\t", loc)
-                       putS3Obj(scenario.dataT, loc, blk)
-                       putS3Obj(scenario.recentT, "recent/"+loc, nil)
-                       putS3Obj(scenario.trashT, "trash/"+loc, blk)
-                       v.serverClock.now = &t0
-                       return loc, blk
-               }
-
-               // Check canGet
-               loc, blk := setupScenario()
-               buf := make([]byte, len(blk))
-               _, err := v.Get(context.Background(), loc, buf)
-               c.Check(err == nil, check.Equals, scenario.canGet)
-               if err != nil {
-                       c.Check(os.IsNotExist(err), check.Equals, true)
-               }
-
-               // Call Trash, then check canTrash and canGetAfterTrash
-               loc, _ = setupScenario()
-               err = v.Trash(loc)
-               c.Check(err == nil, check.Equals, scenario.canTrash)
-               _, err = v.Get(context.Background(), loc, buf)
-               c.Check(err == nil, check.Equals, scenario.canGetAfterTrash)
-               if err != nil {
-                       c.Check(os.IsNotExist(err), check.Equals, true)
-               }
-
-               // Call Untrash, then check canUntrash
-               loc, _ = setupScenario()
-               err = v.Untrash(loc)
-               c.Check(err == nil, check.Equals, scenario.canUntrash)
-               if scenario.dataT != none || scenario.trashT != none {
-                       // In all scenarios where the data exists, we
-                       // should be able to Get after Untrash --
-                       // regardless of timestamps, errors, race
-                       // conditions, etc.
+               for _, prefixLength := range []int{0, 3} {
+                       v.PrefixLength = prefixLength
+                       c.Logf("Scenario: %q (prefixLength=%d)", scenario.label, prefixLength)
+
+                       // We have a few tests to run for each scenario, and
+                       // the tests are expected to change state. By calling
+                       // this setup func between tests, we (re)create the
+                       // scenario as specified, using a new unique block
+                       // locator to prevent interference from previous
+                       // tests.
+
+                       setupScenario := func() (string, []byte) {
+                               nextKey++
+                               blk := []byte(fmt.Sprintf("%d", nextKey))
+                               loc := fmt.Sprintf("%x", md5.Sum(blk))
+                               key := loc
+                               if prefixLength > 0 {
+                                       key = loc[:prefixLength] + "/" + loc
+                               }
+                               c.Log("\t", loc)
+                               putS3Obj(scenario.dataT, key, blk)
+                               putS3Obj(scenario.recentT, "recent/"+key, nil)
+                               putS3Obj(scenario.trashT, "trash/"+key, blk)
+                               v.serverClock.now = &t0
+                               return loc, blk
+                       }
+
+                       // Check canGet
+                       loc, blk := setupScenario()
+                       buf := make([]byte, len(blk))
+                       _, err := v.Get(context.Background(), loc, buf)
+                       c.Check(err == nil, check.Equals, scenario.canGet)
+                       if err != nil {
+                               c.Check(os.IsNotExist(err), check.Equals, true)
+                       }
+
+                       // Call Trash, then check canTrash and canGetAfterTrash
+                       loc, _ = setupScenario()
+                       err = v.Trash(loc)
+                       c.Check(err == nil, check.Equals, scenario.canTrash)
                        _, err = v.Get(context.Background(), loc, buf)
+                       c.Check(err == nil, check.Equals, scenario.canGetAfterTrash)
+                       if err != nil {
+                               c.Check(os.IsNotExist(err), check.Equals, true)
+                       }
+
+                       // Call Untrash, then check canUntrash
+                       loc, _ = setupScenario()
+                       err = v.Untrash(loc)
+                       c.Check(err == nil, check.Equals, scenario.canUntrash)
+                       if scenario.dataT != none || scenario.trashT != none {
+                               // In all scenarios where the data exists, we
+                               // should be able to Get after Untrash --
+                               // regardless of timestamps, errors, race
+                               // conditions, etc.
+                               _, err = v.Get(context.Background(), loc, buf)
+                               c.Check(err, check.IsNil)
+                       }
+
+                       // Call EmptyTrash, then check haveTrashAfterEmpty and
+                       // freshAfterEmpty
+                       loc, _ = setupScenario()
+                       v.EmptyTrash()
+                       _, err = v.bucket.Head("trash/"+v.key(loc), nil)
+                       c.Check(err == nil, check.Equals, scenario.haveTrashAfterEmpty)
+                       if scenario.freshAfterEmpty {
+                               t, err := v.Mtime(loc)
+                               c.Check(err, check.IsNil)
+                               // new mtime must be current (with an
+                               // allowance for 1s timestamp precision)
+                               c.Check(t.After(t0.Add(-time.Second)), check.Equals, true)
+                       }
+
+                       // Check for current Mtime after Put (applies to all
+                       // scenarios)
+                       loc, blk = setupScenario()
+                       err = v.Put(context.Background(), loc, blk)
                        c.Check(err, check.IsNil)
-               }
-
-               // Call EmptyTrash, then check haveTrashAfterEmpty and
-               // freshAfterEmpty
-               loc, _ = setupScenario()
-               v.EmptyTrash()
-               _, err = v.bucket.Head("trash/"+loc, nil)
-               c.Check(err == nil, check.Equals, scenario.haveTrashAfterEmpty)
-               if scenario.freshAfterEmpty {
                        t, err := v.Mtime(loc)
                        c.Check(err, check.IsNil)
-                       // new mtime must be current (with an
-                       // allowance for 1s timestamp precision)
                        c.Check(t.After(t0.Add(-time.Second)), check.Equals, true)
                }
-
-               // Check for current Mtime after Put (applies to all
-               // scenarios)
-               loc, blk = setupScenario()
-               err = v.Put(context.Background(), loc, blk)
-               c.Check(err, check.IsNil)
-               t, err := v.Mtime(loc)
-               c.Check(err, check.IsNil)
-               c.Check(t.After(t0.Add(-time.Second)), check.Equals, true)
        }
 }
 
@@ -468,19 +532,21 @@ func (s *StubbedS3Suite) newTestableVolume(c *check.C, cluster *arvados.Cluster,
 
        v := &TestableS3Volume{
                S3Volume: &S3Volume{
-                       AccessKey:          accessKey,
-                       SecretKey:          secretKey,
-                       IAMRole:            iamRole,
-                       Bucket:             TestBucketName,
-                       Endpoint:           endpoint,
-                       Region:             "test-region-1",
-                       LocationConstraint: true,
-                       UnsafeDelete:       true,
-                       IndexPageSize:      1000,
-                       cluster:            cluster,
-                       volume:             volume,
-                       logger:             ctxlog.TestLogger(c),
-                       metrics:            metrics,
+                       S3VolumeDriverParameters: arvados.S3VolumeDriverParameters{
+                               IAMRole:            iamRole,
+                               AccessKeyID:        accessKey,
+                               SecretAccessKey:    secretKey,
+                               Bucket:             TestBucketName,
+                               Endpoint:           endpoint,
+                               Region:             "test-region-1",
+                               LocationConstraint: true,
+                               UnsafeDelete:       true,
+                               IndexPageSize:      1000,
+                       },
+                       cluster: cluster,
+                       volume:  volume,
+                       logger:  ctxlog.TestLogger(c),
+                       metrics: metrics,
                },
                c:           c,
                server:      srv,
@@ -496,13 +562,14 @@ func (s *StubbedS3Suite) newTestableVolume(c *check.C, cluster *arvados.Cluster,
 
 // PutRaw skips the ContentMD5 test
 func (v *TestableS3Volume) PutRaw(loc string, block []byte) {
-       err := v.bucket.Bucket().Put(loc, block, "application/octet-stream", s3ACL, s3.Options{})
+       key := v.key(loc) // bucket key for loc; both the data object and its "recent/" marker are written under it
+       err := v.bucket.Bucket().Put(key, block, "application/octet-stream", s3ACL, s3.Options{}) // failures are only logged; PutRaw has no error return
        if err != nil {
                v.logger.Printf("PutRaw: %s: %+v", loc, err)
        }
-       err = v.bucket.Bucket().Put("recent/"+loc, nil, "application/octet-stream", s3ACL, s3.Options{})
+       err = v.bucket.Bucket().Put("recent/"+key, nil, "application/octet-stream", s3ACL, s3.Options{}) // empty-bodied marker object
        if err != nil {
-               v.logger.Printf("PutRaw: recent/%s: %+v", loc, err)
+               v.logger.Printf("PutRaw: recent/%s: %+v", key, err) // report the bucket key actually written, not the bare locator
        }
 }
 
@@ -511,7 +578,7 @@ func (v *TestableS3Volume) PutRaw(loc string, block []byte) {
 // while we do this.
 func (v *TestableS3Volume) TouchWithDate(locator string, lastPut time.Time) {
        v.serverClock.now = &lastPut
-       err := v.bucket.Bucket().Put("recent/"+locator, nil, "application/octet-stream", s3ACL, s3.Options{})
+       err := v.bucket.Bucket().Put("recent/"+v.key(locator), nil, "application/octet-stream", s3ACL, s3.Options{})
        if err != nil {
                panic(err)
        }