Refactor the multi-host salt install page.
[arvados.git] / services / keepstore / s3_volume_test.go
index 2736f00b743c791502f78886e716b521a0585eb1..a82098356859cb3cc481d20df453efb97e1726d0 100644
--- a/services/keepstore/s3_volume_test.go
+++ b/services/keepstore/s3_volume_test.go
@@ -2,7 +2,7 @@
 //
 // SPDX-License-Identifier: AGPL-3.0
 
-package main
+package keepstore
 
 import (
        "bytes"
@@ -76,6 +76,14 @@ func (s *StubbedS3Suite) TestGenericReadOnly(c *check.C) {
        })
 }
 
+func (s *StubbedS3Suite) TestGenericWithPrefix(c *check.C) {
+       DoGenericVolumeTests(c, false, func(t TB, cluster *arvados.Cluster, volume arvados.Volume, logger logrus.FieldLogger, metrics *volumeMetricsVecs) TestableVolume {
+               v := s.newTestableVolume(c, cluster, volume, metrics, -2*time.Second)
+               v.PrefixLength = 3
+               return v
+       })
+}
+
 func (s *StubbedS3Suite) TestIndex(c *check.C) {
        v := s.newTestableVolume(c, s.cluster, arvados.Volume{Replication: 2}, newVolumeMetricsVecs(prometheus.NewRegistry()), 0)
        v.IndexPageSize = 3
@@ -111,11 +119,11 @@ func (s *StubbedS3Suite) TestSignatureVersion(c *check.C) {
        // Default V4 signature
        vol := S3Volume{
                S3VolumeDriverParameters: arvados.S3VolumeDriverParameters{
-                       AccessKey: "xxx",
-                       SecretKey: "xxx",
-                       Endpoint:  stub.URL,
-                       Region:    "test-region-1",
-                       Bucket:    "test-bucket-name",
+                       AccessKeyID:     "xxx",
+                       SecretAccessKey: "xxx",
+                       Endpoint:        stub.URL,
+                       Region:          "test-region-1",
+                       Bucket:          "test-bucket-name",
                },
                cluster: s.cluster,
                logger:  ctxlog.TestLogger(c),
@@ -130,12 +138,12 @@ func (s *StubbedS3Suite) TestSignatureVersion(c *check.C) {
        // Force V2 signature
        vol = S3Volume{
                S3VolumeDriverParameters: arvados.S3VolumeDriverParameters{
-                       AccessKey:   "xxx",
-                       SecretKey:   "xxx",
-                       Endpoint:    stub.URL,
-                       Region:      "test-region-1",
-                       Bucket:      "test-bucket-name",
-                       V2Signature: true,
+                       AccessKeyID:     "xxx",
+                       SecretAccessKey: "xxx",
+                       Endpoint:        stub.URL,
+                       Region:          "test-region-1",
+                       Bucket:          "test-bucket-name",
+                       V2Signature:     true,
                },
                cluster: s.cluster,
                logger:  ctxlog.TestLogger(c),
@@ -160,8 +168,8 @@ func (s *StubbedS3Suite) TestIAMRoleCredentials(c *check.C) {
        defer s.metadata.Close()
 
        v := s.newTestableVolume(c, s.cluster, arvados.Volume{Replication: 2}, newVolumeMetricsVecs(prometheus.NewRegistry()), 5*time.Minute)
-       c.Check(v.AccessKey, check.Equals, "ASIAIOSFODNN7EXAMPLE")
-       c.Check(v.SecretKey, check.Equals, "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY")
+       c.Check(v.AccessKeyID, check.Equals, "ASIAIOSFODNN7EXAMPLE")
+       c.Check(v.SecretAccessKey, check.Equals, "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY")
        c.Check(v.bucket.bucket.S3.Auth.AccessKey, check.Equals, "ASIAIOSFODNN7EXAMPLE")
        c.Check(v.bucket.bucket.S3.Auth.SecretKey, check.Equals, "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY")
 
@@ -416,81 +424,88 @@ func (s *StubbedS3Suite) TestBackendStates(c *check.C) {
                        false, false, false, true, true, true,
                },
        } {
-               c.Log("Scenario: ", scenario.label)
-
-               // We have a few tests to run for each scenario, and
-               // the tests are expected to change state. By calling
-               // this setup func between tests, we (re)create the
-               // scenario as specified, using a new unique block
-               // locator to prevent interference from previous
-               // tests.
-
-               setupScenario := func() (string, []byte) {
-                       nextKey++
-                       blk := []byte(fmt.Sprintf("%d", nextKey))
-                       loc := fmt.Sprintf("%x", md5.Sum(blk))
-                       c.Log("\t", loc)
-                       putS3Obj(scenario.dataT, loc, blk)
-                       putS3Obj(scenario.recentT, "recent/"+loc, nil)
-                       putS3Obj(scenario.trashT, "trash/"+loc, blk)
-                       v.serverClock.now = &t0
-                       return loc, blk
-               }
-
-               // Check canGet
-               loc, blk := setupScenario()
-               buf := make([]byte, len(blk))
-               _, err := v.Get(context.Background(), loc, buf)
-               c.Check(err == nil, check.Equals, scenario.canGet)
-               if err != nil {
-                       c.Check(os.IsNotExist(err), check.Equals, true)
-               }
-
-               // Call Trash, then check canTrash and canGetAfterTrash
-               loc, _ = setupScenario()
-               err = v.Trash(loc)
-               c.Check(err == nil, check.Equals, scenario.canTrash)
-               _, err = v.Get(context.Background(), loc, buf)
-               c.Check(err == nil, check.Equals, scenario.canGetAfterTrash)
-               if err != nil {
-                       c.Check(os.IsNotExist(err), check.Equals, true)
-               }
-
-               // Call Untrash, then check canUntrash
-               loc, _ = setupScenario()
-               err = v.Untrash(loc)
-               c.Check(err == nil, check.Equals, scenario.canUntrash)
-               if scenario.dataT != none || scenario.trashT != none {
-                       // In all scenarios where the data exists, we
-                       // should be able to Get after Untrash --
-                       // regardless of timestamps, errors, race
-                       // conditions, etc.
+               for _, prefixLength := range []int{0, 3} {
+                       v.PrefixLength = prefixLength
+                       c.Logf("Scenario: %q (prefixLength=%d)", scenario.label, prefixLength)
+
+                       // We have a few tests to run for each scenario, and
+                       // the tests are expected to change state. By calling
+                       // this setup func between tests, we (re)create the
+                       // scenario as specified, using a new unique block
+                       // locator to prevent interference from previous
+                       // tests.
+
+                       setupScenario := func() (string, []byte) {
+                               nextKey++
+                               blk := []byte(fmt.Sprintf("%d", nextKey))
+                               loc := fmt.Sprintf("%x", md5.Sum(blk))
+                               key := loc
+                               if prefixLength > 0 {
+                                       key = loc[:prefixLength] + "/" + loc
+                               }
+                               c.Log("\t", loc)
+                               putS3Obj(scenario.dataT, key, blk)
+                               putS3Obj(scenario.recentT, "recent/"+key, nil)
+                               putS3Obj(scenario.trashT, "trash/"+key, blk)
+                               v.serverClock.now = &t0
+                               return loc, blk
+                       }
+
+                       // Check canGet
+                       loc, blk := setupScenario()
+                       buf := make([]byte, len(blk))
+                       _, err := v.Get(context.Background(), loc, buf)
+                       c.Check(err == nil, check.Equals, scenario.canGet)
+                       if err != nil {
+                               c.Check(os.IsNotExist(err), check.Equals, true)
+                       }
+
+                       // Call Trash, then check canTrash and canGetAfterTrash
+                       loc, _ = setupScenario()
+                       err = v.Trash(loc)
+                       c.Check(err == nil, check.Equals, scenario.canTrash)
                        _, err = v.Get(context.Background(), loc, buf)
+                       c.Check(err == nil, check.Equals, scenario.canGetAfterTrash)
+                       if err != nil {
+                               c.Check(os.IsNotExist(err), check.Equals, true)
+                       }
+
+                       // Call Untrash, then check canUntrash
+                       loc, _ = setupScenario()
+                       err = v.Untrash(loc)
+                       c.Check(err == nil, check.Equals, scenario.canUntrash)
+                       if scenario.dataT != none || scenario.trashT != none {
+                               // In all scenarios where the data exists, we
+                               // should be able to Get after Untrash --
+                               // regardless of timestamps, errors, race
+                               // conditions, etc.
+                               _, err = v.Get(context.Background(), loc, buf)
+                               c.Check(err, check.IsNil)
+                       }
+
+                       // Call EmptyTrash, then check haveTrashAfterEmpty and
+                       // freshAfterEmpty
+                       loc, _ = setupScenario()
+                       v.EmptyTrash()
+                       _, err = v.bucket.Head("trash/"+v.key(loc), nil)
+                       c.Check(err == nil, check.Equals, scenario.haveTrashAfterEmpty)
+                       if scenario.freshAfterEmpty {
+                               t, err := v.Mtime(loc)
+                               c.Check(err, check.IsNil)
+                               // new mtime must be current (with an
+                               // allowance for 1s timestamp precision)
+                               c.Check(t.After(t0.Add(-time.Second)), check.Equals, true)
+                       }
+
+                       // Check for current Mtime after Put (applies to all
+                       // scenarios)
+                       loc, blk = setupScenario()
+                       err = v.Put(context.Background(), loc, blk)
                        c.Check(err, check.IsNil)
-               }
-
-               // Call EmptyTrash, then check haveTrashAfterEmpty and
-               // freshAfterEmpty
-               loc, _ = setupScenario()
-               v.EmptyTrash()
-               _, err = v.bucket.Head("trash/"+loc, nil)
-               c.Check(err == nil, check.Equals, scenario.haveTrashAfterEmpty)
-               if scenario.freshAfterEmpty {
                        t, err := v.Mtime(loc)
                        c.Check(err, check.IsNil)
-                       // new mtime must be current (with an
-                       // allowance for 1s timestamp precision)
                        c.Check(t.After(t0.Add(-time.Second)), check.Equals, true)
                }
-
-               // Check for current Mtime after Put (applies to all
-               // scenarios)
-               loc, blk = setupScenario()
-               err = v.Put(context.Background(), loc, blk)
-               c.Check(err, check.IsNil)
-               t, err := v.Mtime(loc)
-               c.Check(err, check.IsNil)
-               c.Check(t.After(t0.Add(-time.Second)), check.Equals, true)
        }
 }
 
@@ -519,8 +534,8 @@ func (s *StubbedS3Suite) newTestableVolume(c *check.C, cluster *arvados.Cluster,
                S3Volume: &S3Volume{
                        S3VolumeDriverParameters: arvados.S3VolumeDriverParameters{
                                IAMRole:            iamRole,
-                               AccessKey:          accessKey,
-                               SecretKey:          secretKey,
+                               AccessKeyID:        accessKey,
+                               SecretAccessKey:    secretKey,
                                Bucket:             TestBucketName,
                                Endpoint:           endpoint,
                                Region:             "test-region-1",
@@ -547,13 +562,14 @@ func (s *StubbedS3Suite) newTestableVolume(c *check.C, cluster *arvados.Cluster,
 
 // PutRaw skips the ContentMD5 test
 func (v *TestableS3Volume) PutRaw(loc string, block []byte) {
-       err := v.bucket.Bucket().Put(loc, block, "application/octet-stream", s3ACL, s3.Options{})
+       key := v.key(loc)
+       err := v.bucket.Bucket().Put(key, block, "application/octet-stream", s3ACL, s3.Options{})
        if err != nil {
                v.logger.Printf("PutRaw: %s: %+v", loc, err)
        }
-       err = v.bucket.Bucket().Put("recent/"+loc, nil, "application/octet-stream", s3ACL, s3.Options{})
+       err = v.bucket.Bucket().Put("recent/"+key, nil, "application/octet-stream", s3ACL, s3.Options{})
        if err != nil {
-               v.logger.Printf("PutRaw: recent/%s: %+v", loc, err)
+               v.logger.Printf("PutRaw: recent/%s: %+v", key, err)
        }
 }
 
@@ -562,7 +578,7 @@ func (v *TestableS3Volume) PutRaw(loc string, block []byte) {
 // while we do this.
 func (v *TestableS3Volume) TouchWithDate(locator string, lastPut time.Time) {
        v.serverClock.now = &lastPut
-       err := v.bucket.Bucket().Put("recent/"+locator, nil, "application/octet-stream", s3ACL, s3.Options{})
+       err := v.bucket.Bucket().Put("recent/"+v.key(locator), nil, "application/octet-stream", s3ACL, s3.Options{})
        if err != nil {
                panic(err)
        }
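
The tests above rely on a key() helper on S3Volume, together with the new PrefixLength field, neither of which appears in this diff. Inferred from setupScenario (key = loc[:prefixLength] + "/" + loc) and from calls like v.bucket.Head("trash/"+v.key(loc), nil), the locator-to-object-key mapping presumably looks roughly like the sketch below; this is an illustration of the behavior the tests expect, not the actual code from s3_volume.go.

// key maps a Keep block locator to the S3 object key it is stored under.
// With PrefixLength == 0 the locator is used verbatim; otherwise the first
// PrefixLength hex characters of the locator are prepended as a
// directory-style prefix, e.g. "acbd18db..." becomes "acb/acbd18db..."
// when PrefixLength is 3. Sketch only -- assumed from the test expectations.
func (v *S3Volume) key(loc string) string {
	if v.PrefixLength > 0 && len(loc) > v.PrefixLength {
		return loc[:v.PrefixLength] + "/" + loc
	}
	return loc
}

Note that the "recent/" and "trash/" markers are keyed on the same prefixed form ("recent/"+key, "trash/"+key in setupScenario and PutRaw), so EmptyTrash and the recency checks behave the same whether or not a prefix is configured.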