diff --git a/services/keepstore/s3_volume_test.go b/services/keepstore/s3_volume_test.go
index cb08e44ac8626134a03063f5426f74fdf29395d4..a82098356859cb3cc481d20df453efb97e1726d0 100644
--- a/services/keepstore/s3_volume_test.go
+++ b/services/keepstore/s3_volume_test.go
@@ -2,7 +2,7 @@
 //
 // SPDX-License-Identifier: AGPL-3.0
 
-package main
+package keepstore
 
 import (
        "bytes"
@@ -76,6 +76,14 @@ func (s *StubbedS3Suite) TestGenericReadOnly(c *check.C) {
        })
 }
 
+func (s *StubbedS3Suite) TestGenericWithPrefix(c *check.C) {
+       DoGenericVolumeTests(c, false, func(t TB, cluster *arvados.Cluster, volume arvados.Volume, logger logrus.FieldLogger, metrics *volumeMetricsVecs) TestableVolume {
+               v := s.newTestableVolume(c, cluster, volume, metrics, -2*time.Second)
+               v.PrefixLength = 3
+               return v
+       })
+}
+
 func (s *StubbedS3Suite) TestIndex(c *check.C) {
        v := s.newTestableVolume(c, s.cluster, arvados.Volume{Replication: 2}, newVolumeMetricsVecs(prometheus.NewRegistry()), 0)
        v.IndexPageSize = 3
@@ -416,81 +424,88 @@ func (s *StubbedS3Suite) TestBackendStates(c *check.C) {
                        false, false, false, true, true, true,
                },
        } {
-               c.Log("Scenario: ", scenario.label)
-
-               // We have a few tests to run for each scenario, and
-               // the tests are expected to change state. By calling
-               // this setup func between tests, we (re)create the
-               // scenario as specified, using a new unique block
-               // locator to prevent interference from previous
-               // tests.
-
-               setupScenario := func() (string, []byte) {
-                       nextKey++
-                       blk := []byte(fmt.Sprintf("%d", nextKey))
-                       loc := fmt.Sprintf("%x", md5.Sum(blk))
-                       c.Log("\t", loc)
-                       putS3Obj(scenario.dataT, loc, blk)
-                       putS3Obj(scenario.recentT, "recent/"+loc, nil)
-                       putS3Obj(scenario.trashT, "trash/"+loc, blk)
-                       v.serverClock.now = &t0
-                       return loc, blk
-               }
-
-               // Check canGet
-               loc, blk := setupScenario()
-               buf := make([]byte, len(blk))
-               _, err := v.Get(context.Background(), loc, buf)
-               c.Check(err == nil, check.Equals, scenario.canGet)
-               if err != nil {
-                       c.Check(os.IsNotExist(err), check.Equals, true)
-               }
-
-               // Call Trash, then check canTrash and canGetAfterTrash
-               loc, _ = setupScenario()
-               err = v.Trash(loc)
-               c.Check(err == nil, check.Equals, scenario.canTrash)
-               _, err = v.Get(context.Background(), loc, buf)
-               c.Check(err == nil, check.Equals, scenario.canGetAfterTrash)
-               if err != nil {
-                       c.Check(os.IsNotExist(err), check.Equals, true)
-               }
-
-               // Call Untrash, then check canUntrash
-               loc, _ = setupScenario()
-               err = v.Untrash(loc)
-               c.Check(err == nil, check.Equals, scenario.canUntrash)
-               if scenario.dataT != none || scenario.trashT != none {
-                       // In all scenarios where the data exists, we
-                       // should be able to Get after Untrash --
-                       // regardless of timestamps, errors, race
-                       // conditions, etc.
+               for _, prefixLength := range []int{0, 3} {
+                       v.PrefixLength = prefixLength
+                       c.Logf("Scenario: %q (prefixLength=%d)", scenario.label, prefixLength)
+
+                       // We have a few tests to run for each scenario, and
+                       // the tests are expected to change state. By calling
+                       // this setup func between tests, we (re)create the
+                       // scenario as specified, using a new unique block
+                       // locator to prevent interference from previous
+                       // tests.
+
+                       setupScenario := func() (string, []byte) {
+                               nextKey++
+                               blk := []byte(fmt.Sprintf("%d", nextKey))
+                               loc := fmt.Sprintf("%x", md5.Sum(blk))
+                               key := loc
+                               if prefixLength > 0 {
+                                       key = loc[:prefixLength] + "/" + loc
+                               }
+                               c.Log("\t", loc)
+                               putS3Obj(scenario.dataT, key, blk)
+                               putS3Obj(scenario.recentT, "recent/"+key, nil)
+                               putS3Obj(scenario.trashT, "trash/"+key, blk)
+                               v.serverClock.now = &t0
+                               return loc, blk
+                       }
+
+                       // Check canGet
+                       loc, blk := setupScenario()
+                       buf := make([]byte, len(blk))
+                       _, err := v.Get(context.Background(), loc, buf)
+                       c.Check(err == nil, check.Equals, scenario.canGet)
+                       if err != nil {
+                               c.Check(os.IsNotExist(err), check.Equals, true)
+                       }
+
+                       // Call Trash, then check canTrash and canGetAfterTrash
+                       loc, _ = setupScenario()
+                       err = v.Trash(loc)
+                       c.Check(err == nil, check.Equals, scenario.canTrash)
                        _, err = v.Get(context.Background(), loc, buf)
+                       c.Check(err == nil, check.Equals, scenario.canGetAfterTrash)
+                       if err != nil {
+                               c.Check(os.IsNotExist(err), check.Equals, true)
+                       }
+
+                       // Call Untrash, then check canUntrash
+                       loc, _ = setupScenario()
+                       err = v.Untrash(loc)
+                       c.Check(err == nil, check.Equals, scenario.canUntrash)
+                       if scenario.dataT != none || scenario.trashT != none {
+                               // In all scenarios where the data exists, we
+                               // should be able to Get after Untrash --
+                               // regardless of timestamps, errors, race
+                               // conditions, etc.
+                               _, err = v.Get(context.Background(), loc, buf)
+                               c.Check(err, check.IsNil)
+                       }
+
+                       // Call EmptyTrash, then check haveTrashAfterEmpty and
+                       // freshAfterEmpty
+                       loc, _ = setupScenario()
+                       v.EmptyTrash()
+                       _, err = v.bucket.Head("trash/"+v.key(loc), nil)
+                       c.Check(err == nil, check.Equals, scenario.haveTrashAfterEmpty)
+                       if scenario.freshAfterEmpty {
+                               t, err := v.Mtime(loc)
+                               c.Check(err, check.IsNil)
+                               // new mtime must be current (with an
+                               // allowance for 1s timestamp precision)
+                               c.Check(t.After(t0.Add(-time.Second)), check.Equals, true)
+                       }
+
+                       // Check for current Mtime after Put (applies to all
+                       // scenarios)
+                       loc, blk = setupScenario()
+                       err = v.Put(context.Background(), loc, blk)
                        c.Check(err, check.IsNil)
-               }
-
-               // Call EmptyTrash, then check haveTrashAfterEmpty and
-               // freshAfterEmpty
-               loc, _ = setupScenario()
-               v.EmptyTrash()
-               _, err = v.bucket.Head("trash/"+loc, nil)
-               c.Check(err == nil, check.Equals, scenario.haveTrashAfterEmpty)
-               if scenario.freshAfterEmpty {
                        t, err := v.Mtime(loc)
                        c.Check(err, check.IsNil)
-                       // new mtime must be current (with an
-                       // allowance for 1s timestamp precision)
                        c.Check(t.After(t0.Add(-time.Second)), check.Equals, true)
                }
-
-               // Check for current Mtime after Put (applies to all
-               // scenarios)
-               loc, blk = setupScenario()
-               err = v.Put(context.Background(), loc, blk)
-               c.Check(err, check.IsNil)
-               t, err := v.Mtime(loc)
-               c.Check(err, check.IsNil)
-               c.Check(t.After(t0.Add(-time.Second)), check.Equals, true)
        }
 }
 
@@ -547,13 +562,14 @@ func (s *StubbedS3Suite) newTestableVolume(c *check.C, cluster *arvados.Cluster,
 
 // PutRaw skips the ContentMD5 test
 func (v *TestableS3Volume) PutRaw(loc string, block []byte) {
-       err := v.bucket.Bucket().Put(loc, block, "application/octet-stream", s3ACL, s3.Options{})
+       key := v.key(loc)
+       err := v.bucket.Bucket().Put(key, block, "application/octet-stream", s3ACL, s3.Options{})
        if err != nil {
                v.logger.Printf("PutRaw: %s: %+v", loc, err)
        }
-       err = v.bucket.Bucket().Put("recent/"+loc, nil, "application/octet-stream", s3ACL, s3.Options{})
+       err = v.bucket.Bucket().Put("recent/"+key, nil, "application/octet-stream", s3ACL, s3.Options{})
        if err != nil {
-               v.logger.Printf("PutRaw: recent/%s: %+v", loc, err)
+               v.logger.Printf("PutRaw: recent/%s: %+v", key, err)
        }
 }
 
@@ -562,7 +578,7 @@ func (v *TestableS3Volume) PutRaw(loc string, block []byte) {
 // while we do this.
 func (v *TestableS3Volume) TouchWithDate(locator string, lastPut time.Time) {
        v.serverClock.now = &lastPut
-       err := v.bucket.Bucket().Put("recent/"+locator, nil, "application/octet-stream", s3ACL, s3.Options{})
+       err := v.bucket.Bucket().Put("recent/"+v.key(locator), nil, "application/octet-stream", s3ACL, s3.Options{})
        if err != nil {
                panic(err)
        }
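
For reference, the object-key layout these tests exercise follows the same scheme that setupScenario builds by hand: with a non-zero prefix length, a locator's first few hex characters become a directory-style prefix, so a locator like "acbd18db4cc2f85cedef654fccc4a4d8" is stored under "acb/acbd18db4cc2f85cedef654fccc4a4d8"; with PrefixLength 0 the locator itself is the key. The sketch below is illustrative only: the helper name keyForLocator and its signature are hypothetical, whereas the production code exposes this mapping as v.key(loc) driven by the volume's PrefixLength field, and its exact bounds checks may differ.

package keepstore

// keyForLocator mirrors the key layout the tests construct in
// setupScenario (loc[:prefixLength] + "/" + loc). Hypothetical helper,
// not the actual (*S3Volume).key implementation.
func keyForLocator(loc string, prefixLength int) string {
	if prefixLength > 0 && prefixLength <= len(loc) {
		return loc[:prefixLength] + "/" + loc
	}
	return loc
}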