2960: Finish renaming s3aws_volume to s3_volume.
[arvados.git] / services / keepstore / s3_volume_test.go
index 2c5cdf5b99fa3255d03626933d280ac2e7e21a8a..d814949f447ce5f55f6ab581180397c6a40b6e21 100644 (file)
@@ -2,7 +2,7 @@
 //
 // SPDX-License-Identifier: AGPL-3.0
 
-package main
+package keepstore
 
 import (
        "bytes"
@@ -19,39 +19,49 @@ import (
 
        "git.arvados.org/arvados.git/sdk/go/arvados"
        "git.arvados.org/arvados.git/sdk/go/ctxlog"
-       "github.com/AdRoll/goamz/s3"
-       "github.com/AdRoll/goamz/s3/s3test"
+
+       "github.com/aws/aws-sdk-go-v2/aws"
+       "github.com/aws/aws-sdk-go-v2/service/s3"
+       "github.com/aws/aws-sdk-go-v2/service/s3/s3manager"
+
+       "github.com/johannesboyne/gofakes3"
+       "github.com/johannesboyne/gofakes3/backend/s3mem"
        "github.com/prometheus/client_golang/prometheus"
        "github.com/sirupsen/logrus"
        check "gopkg.in/check.v1"
 )
 
 const (
-       TestBucketName = "testbucket"
+       s3TestBucketName = "testbucket"
 )
 
-type fakeClock struct {
+type s3AWSFakeClock struct {
        now *time.Time
 }
 
-func (c *fakeClock) Now() time.Time {
+func (c *s3AWSFakeClock) Now() time.Time {
        if c.now == nil {
-               return time.Now()
+               return time.Now().UTC()
        }
-       return *c.now
+       return c.now.UTC()
 }
 
-var _ = check.Suite(&StubbedS3Suite{})
+func (c *s3AWSFakeClock) Since(t time.Time) time.Duration {
+       return c.Now().Sub(t)
+}
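
The Since method is here so the fake clock can also serve as the stub S3 server's time source; a sketch of the interface it needs to satisfy (paraphrased from the gofakes3 package, which consumes it below via s3mem.WithTimeSource / gofakes3.WithTimeSource):

    type TimeSource interface {
            Now() time.Time
            Since(t time.Time) time.Duration
    }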
+
+var _ = check.Suite(&stubbedS3Suite{})
+
+var srv httptest.Server
 
-type StubbedS3Suite struct {
+type stubbedS3Suite struct {
        s3server *httptest.Server
        metadata *httptest.Server
        cluster  *arvados.Cluster
-       handler  *handler
-       volumes  []*TestableS3Volume
+       volumes  []*testableS3Volume
 }
 
-func (s *StubbedS3Suite) SetUpTest(c *check.C) {
+func (s *stubbedS3Suite) SetUpTest(c *check.C) {
        s.s3server = nil
        s.metadata = nil
        s.cluster = testCluster(c)
@@ -59,28 +69,41 @@ func (s *StubbedS3Suite) SetUpTest(c *check.C) {
                "zzzzz-nyw5e-000000000000000": {Driver: "S3"},
                "zzzzz-nyw5e-111111111111111": {Driver: "S3"},
        }
-       s.handler = &handler{}
 }
 
-func (s *StubbedS3Suite) TestGeneric(c *check.C) {
-       DoGenericVolumeTests(c, false, func(t TB, cluster *arvados.Cluster, volume arvados.Volume, logger logrus.FieldLogger, metrics *volumeMetricsVecs) TestableVolume {
+func (s *stubbedS3Suite) TestGeneric(c *check.C) {
+       DoGenericVolumeTests(c, false, func(t TB, params newVolumeParams) TestableVolume {
                // Use a negative raceWindow so s3test's 1-second
                // timestamp precision doesn't confuse fixRace.
-               return s.newTestableVolume(c, cluster, volume, metrics, -2*time.Second)
+               return s.newTestableVolume(c, params, -2*time.Second)
        })
 }
 
-func (s *StubbedS3Suite) TestGenericReadOnly(c *check.C) {
-       DoGenericVolumeTests(c, true, func(t TB, cluster *arvados.Cluster, volume arvados.Volume, logger logrus.FieldLogger, metrics *volumeMetricsVecs) TestableVolume {
-               return s.newTestableVolume(c, cluster, volume, metrics, -2*time.Second)
+func (s *stubbedS3Suite) TestGenericReadOnly(c *check.C) {
+       DoGenericVolumeTests(c, true, func(t TB, params newVolumeParams) TestableVolume {
+               return s.newTestableVolume(c, params, -2*time.Second)
        })
 }
 
-func (s *StubbedS3Suite) TestIndex(c *check.C) {
-       v := s.newTestableVolume(c, s.cluster, arvados.Volume{Replication: 2}, newVolumeMetricsVecs(prometheus.NewRegistry()), 0)
+func (s *stubbedS3Suite) TestGenericWithPrefix(c *check.C) {
+       DoGenericVolumeTests(c, false, func(t TB, params newVolumeParams) TestableVolume {
+               v := s.newTestableVolume(c, params, -2*time.Second)
+               v.PrefixLength = 3
+               return v
+       })
+}
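
With PrefixLength set to 3, block keys are sharded under a three-character prefix directory; a minimal sketch of the mapping, mirroring the setupScenario key construction in TestBackendStates below:

    loc := "acbd18db4cc2f85cedef654fccc4a4d8" // block locator (MD5 of the data)
    key := loc[:3] + "/" + loc                // "acb/acbd18db4cc2f85cedef654fccc4a4d8"
    // companion objects follow the same key: "recent/"+key and "trash/"+key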
+
+func (s *stubbedS3Suite) TestIndex(c *check.C) {
+       v := s.newTestableVolume(c, newVolumeParams{
+               Cluster:      s.cluster,
+               ConfigVolume: arvados.Volume{Replication: 2},
+               MetricsVecs:  newVolumeMetricsVecs(prometheus.NewRegistry()),
+               BufferPool:   newBufferPool(ctxlog.TestLogger(c), 8, prometheus.NewRegistry()),
+       }, 0)
        v.IndexPageSize = 3
        for i := 0; i < 256; i++ {
-               v.PutRaw(fmt.Sprintf("%02x%030x", i, i), []byte{102, 111, 111})
+               err := v.blockWriteWithoutMD5Check(fmt.Sprintf("%02x%030x", i, i), []byte{102, 111, 111})
+               c.Assert(err, check.IsNil)
        }
        for _, spec := range []struct {
                prefix      string
@@ -92,7 +115,7 @@ func (s *StubbedS3Suite) TestIndex(c *check.C) {
                {"abc", 0},
        } {
                buf := new(bytes.Buffer)
-               err := v.IndexTo(spec.prefix, buf)
+               err := v.Index(context.Background(), spec.prefix, buf)
                c.Check(err, check.IsNil)
 
                idx := bytes.SplitAfter(buf.Bytes(), []byte{10})
@@ -101,7 +124,38 @@ func (s *StubbedS3Suite) TestIndex(c *check.C) {
        }
 }
 
-func (s *StubbedS3Suite) TestIAMRoleCredentials(c *check.C) {
+func (s *stubbedS3Suite) TestSignature(c *check.C) {
+       var header http.Header
+       stub := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+               header = r.Header
+       }))
+       defer stub.Close()
+
+       // The aws-sdk-go-v2 driver only supports S3 V4 signatures. S3 V2 signatures
+       // were phased out as of June 24, 2020. Cf. https://forums.aws.amazon.com/ann.jspa?annID=5816
+       vol := s3Volume{
+               S3VolumeDriverParameters: arvados.S3VolumeDriverParameters{
+                       AccessKeyID:     "xxx",
+                       SecretAccessKey: "xxx",
+                       Endpoint:        stub.URL,
+                       Region:          "test-region-1",
+                       Bucket:          "test-bucket-name",
+               },
+               cluster: s.cluster,
+               logger:  ctxlog.TestLogger(c),
+               metrics: newVolumeMetricsVecs(prometheus.NewRegistry()),
+       }
+       err := vol.check("")
+       c.Assert(err, check.IsNil)
+       // Our test S3 server uses the older 'Path Style'
+       vol.bucket.svc.ForcePathStyle = true
+
+       err = vol.BlockWrite(context.Background(), "acbd18db4cc2f85cedef654fccc4a4d8", []byte("foo"))
+       c.Check(err, check.IsNil)
+       c.Check(header.Get("Authorization"), check.Matches, `AWS4-HMAC-SHA256 .*`)
+}
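
The Authorization header asserted above has the standard V4 signature shape, e.g. (illustrative values only):

    // Authorization: AWS4-HMAC-SHA256 Credential=xxx/20200624/test-region-1/s3/aws4_request,
    //   SignedHeaders=host;x-amz-content-sha256;x-amz-date, Signature=<hex digest>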
+
+func (s *stubbedS3Suite) TestIAMRoleCredentials(c *check.C) {
        s.metadata = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
                upd := time.Now().UTC().Add(-time.Hour).Format(time.RFC3339)
                exp := time.Now().UTC().Add(time.Hour).Format(time.RFC3339)
@@ -112,31 +166,52 @@ func (s *StubbedS3Suite) TestIAMRoleCredentials(c *check.C) {
        }))
        defer s.metadata.Close()
 
-       v := s.newTestableVolume(c, s.cluster, arvados.Volume{Replication: 2}, newVolumeMetricsVecs(prometheus.NewRegistry()), 5*time.Minute)
-       c.Check(v.AccessKey, check.Equals, "ASIAIOSFODNN7EXAMPLE")
-       c.Check(v.SecretKey, check.Equals, "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY")
-       c.Check(v.bucket.bucket.S3.Auth.AccessKey, check.Equals, "ASIAIOSFODNN7EXAMPLE")
-       c.Check(v.bucket.bucket.S3.Auth.SecretKey, check.Equals, "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY")
+       v := &s3Volume{
+               S3VolumeDriverParameters: arvados.S3VolumeDriverParameters{
+                       IAMRole:  s.metadata.URL + "/latest/api/token",
+                       Endpoint: "http://localhost:12345",
+                       Region:   "test-region-1",
+                       Bucket:   "test-bucket-name",
+               },
+               cluster: s.cluster,
+               logger:  ctxlog.TestLogger(c),
+               metrics: newVolumeMetricsVecs(prometheus.NewRegistry()),
+       }
+       err := v.check(s.metadata.URL + "/latest")
+       c.Check(err, check.IsNil)
+       creds, err := v.bucket.svc.Client.Config.Credentials.Retrieve(context.Background())
+       c.Check(err, check.IsNil)
+       c.Check(creds.AccessKeyID, check.Equals, "ASIAIOSFODNN7EXAMPLE")
+       c.Check(creds.SecretAccessKey, check.Equals, "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY")
 
        s.metadata = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
                w.WriteHeader(http.StatusNotFound)
        }))
-       deadv := &S3Volume{
-               IAMRole:  s.metadata.URL + "/fake-metadata/test-role",
-               Endpoint: "http://localhost:12345",
-               Region:   "test-region-1",
-               Bucket:   "test-bucket-name",
-               cluster:  s.cluster,
-               logger:   ctxlog.TestLogger(c),
-               metrics:  newVolumeMetricsVecs(prometheus.NewRegistry()),
+       deadv := &s3Volume{
+               S3VolumeDriverParameters: arvados.S3VolumeDriverParameters{
+                       IAMRole:  s.metadata.URL + "/fake-metadata/test-role",
+                       Endpoint: "http://localhost:12345",
+                       Region:   "test-region-1",
+                       Bucket:   "test-bucket-name",
+               },
+               cluster: s.cluster,
+               logger:  ctxlog.TestLogger(c),
+               metrics: newVolumeMetricsVecs(prometheus.NewRegistry()),
        }
-       err := deadv.check()
-       c.Check(err, check.ErrorMatches, `.*/fake-metadata/test-role.*`)
-       c.Check(err, check.ErrorMatches, `.*404.*`)
+       err = deadv.check(s.metadata.URL + "/latest")
+       c.Check(err, check.IsNil)
+       _, err = deadv.bucket.svc.Client.Config.Credentials.Retrieve(context.Background())
+       c.Check(err, check.ErrorMatches, `(?s).*EC2RoleRequestError: no EC2 instance role found.*`)
+       c.Check(err, check.ErrorMatches, `(?s).*404.*`)
 }
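
The stub metadata server stands in for the EC2 instance-metadata service; for reference, the credentials document the SDK's role provider parses is shaped like this (illustrative, not the stub's exact response body):

    // {
    //   "Code": "Success",
    //   "Type": "AWS-HMAC",
    //   "AccessKeyId": "ASIAIOSFODNN7EXAMPLE",
    //   "SecretAccessKey": "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY",
    //   "Token": "...",
    //   "LastUpdated": "<RFC3339 timestamp>",
    //   "Expiration": "<RFC3339 timestamp>"
    // }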
 
-func (s *StubbedS3Suite) TestStats(c *check.C) {
-       v := s.newTestableVolume(c, s.cluster, arvados.Volume{Replication: 2}, newVolumeMetricsVecs(prometheus.NewRegistry()), 5*time.Minute)
+func (s *stubbedS3Suite) TestStats(c *check.C) {
+       v := s.newTestableVolume(c, newVolumeParams{
+               Cluster:      s.cluster,
+               ConfigVolume: arvados.Volume{Replication: 2},
+               MetricsVecs:  newVolumeMetricsVecs(prometheus.NewRegistry()),
+               BufferPool:   newBufferPool(ctxlog.TestLogger(c), 8, prometheus.NewRegistry()),
+       }, 5*time.Minute)
        stats := func() string {
                buf, err := json.Marshal(v.InternalStats())
                c.Check(err, check.IsNil)
@@ -146,30 +221,30 @@ func (s *StubbedS3Suite) TestStats(c *check.C) {
        c.Check(stats(), check.Matches, `.*"Ops":0,.*`)
 
        loc := "acbd18db4cc2f85cedef654fccc4a4d8"
-       _, err := v.Get(context.Background(), loc, make([]byte, 3))
+       _, err := v.BlockRead(context.Background(), loc, io.Discard)
        c.Check(err, check.NotNil)
        c.Check(stats(), check.Matches, `.*"Ops":[^0],.*`)
-       c.Check(stats(), check.Matches, `.*"\*s3.Error 404 [^"]*":[^0].*`)
+       c.Check(stats(), check.Matches, `.*"s3.requestFailure 404 NoSuchKey[^"]*":[^0].*`)
        c.Check(stats(), check.Matches, `.*"InBytes":0,.*`)
 
-       err = v.Put(context.Background(), loc, []byte("foo"))
+       err = v.BlockWrite(context.Background(), loc, []byte("foo"))
        c.Check(err, check.IsNil)
        c.Check(stats(), check.Matches, `.*"OutBytes":3,.*`)
        c.Check(stats(), check.Matches, `.*"PutOps":2,.*`)
 
-       _, err = v.Get(context.Background(), loc, make([]byte, 3))
+       _, err = v.BlockRead(context.Background(), loc, io.Discard)
        c.Check(err, check.IsNil)
-       _, err = v.Get(context.Background(), loc, make([]byte, 3))
+       _, err = v.BlockRead(context.Background(), loc, io.Discard)
        c.Check(err, check.IsNil)
        c.Check(stats(), check.Matches, `.*"InBytes":6,.*`)
 }
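
For reference, an abridged and illustrative InternalStats() document after the calls above, showing only the fields the regexps assert (the 404 counter key is derived from the wrapped aws-sdk-go-v2 error):

    // {"Ops":<nonzero>, "PutOps":2, "InBytes":6, "OutBytes":3,
    //  "s3.requestFailure 404 NoSuchKey ...":<nonzero>, ...}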
 
-type blockingHandler struct {
+type s3AWSBlockingHandler struct {
        requested chan *http.Request
        unblock   chan struct{}
 }
 
-func (h *blockingHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
+func (h *s3AWSBlockingHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
        if r.Method == "PUT" && !strings.Contains(strings.Trim(r.URL.Path, "/"), "/") {
                // Accept PutBucket ("PUT /bucketname/"), called by
                // newTestableVolume
@@ -184,40 +259,30 @@ func (h *blockingHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
        http.Error(w, "nothing here", http.StatusNotFound)
 }
 
-func (s *StubbedS3Suite) TestGetContextCancel(c *check.C) {
-       loc := "acbd18db4cc2f85cedef654fccc4a4d8"
-       buf := make([]byte, 3)
-
-       s.testContextCancel(c, func(ctx context.Context, v *TestableS3Volume) error {
-               _, err := v.Get(ctx, loc, buf)
+func (s *stubbedS3Suite) TestGetContextCancel(c *check.C) {
+       s.testContextCancel(c, func(ctx context.Context, v *testableS3Volume) error {
+               _, err := v.BlockRead(ctx, fooHash, io.Discard)
                return err
        })
 }
 
-func (s *StubbedS3Suite) TestCompareContextCancel(c *check.C) {
-       loc := "acbd18db4cc2f85cedef654fccc4a4d8"
-       buf := []byte("bar")
-
-       s.testContextCancel(c, func(ctx context.Context, v *TestableS3Volume) error {
-               return v.Compare(ctx, loc, buf)
-       })
-}
-
-func (s *StubbedS3Suite) TestPutContextCancel(c *check.C) {
-       loc := "acbd18db4cc2f85cedef654fccc4a4d8"
-       buf := []byte("foo")
-
-       s.testContextCancel(c, func(ctx context.Context, v *TestableS3Volume) error {
-               return v.Put(ctx, loc, buf)
+func (s *stubbedS3Suite) TestPutContextCancel(c *check.C) {
+       s.testContextCancel(c, func(ctx context.Context, v *testableS3Volume) error {
+               return v.BlockWrite(ctx, fooHash, []byte("foo"))
        })
 }
 
-func (s *StubbedS3Suite) testContextCancel(c *check.C, testFunc func(context.Context, *TestableS3Volume) error) {
-       handler := &blockingHandler{}
+func (s *stubbedS3Suite) testContextCancel(c *check.C, testFunc func(context.Context, *testableS3Volume) error) {
+       handler := &s3AWSBlockingHandler{}
        s.s3server = httptest.NewServer(handler)
        defer s.s3server.Close()
 
-       v := s.newTestableVolume(c, s.cluster, arvados.Volume{Replication: 2}, newVolumeMetricsVecs(prometheus.NewRegistry()), 5*time.Minute)
+       v := s.newTestableVolume(c, newVolumeParams{
+               Cluster:      s.cluster,
+               ConfigVolume: arvados.Volume{Replication: 2},
+               MetricsVecs:  newVolumeMetricsVecs(prometheus.NewRegistry()),
+               BufferPool:   newBufferPool(ctxlog.TestLogger(c), 8, prometheus.NewRegistry()),
+       }, 5*time.Minute)
 
        ctx, cancel := context.WithCancel(context.Background())
 
@@ -253,11 +318,17 @@ func (s *StubbedS3Suite) testContextCancel(c *check.C, testFunc func(context.Con
        }
 }
 
-func (s *StubbedS3Suite) TestBackendStates(c *check.C) {
+func (s *stubbedS3Suite) TestBackendStates(c *check.C) {
        s.cluster.Collections.BlobTrashLifetime.Set("1h")
        s.cluster.Collections.BlobSigningTTL.Set("1h")
 
-       v := s.newTestableVolume(c, s.cluster, arvados.Volume{Replication: 2}, newVolumeMetricsVecs(prometheus.NewRegistry()), 5*time.Minute)
+       v := s.newTestableVolume(c, newVolumeParams{
+               Cluster:      s.cluster,
+               ConfigVolume: arvados.Volume{Replication: 2},
+               Logger:       ctxlog.TestLogger(c),
+               MetricsVecs:  newVolumeMetricsVecs(prometheus.NewRegistry()),
+               BufferPool:   newBufferPool(ctxlog.TestLogger(c), 8, prometheus.NewRegistry()),
+       }, 5*time.Minute)
        var none time.Time
 
        putS3Obj := func(t time.Time, key string, data []byte) {
@@ -265,7 +336,20 @@ func (s *StubbedS3Suite) TestBackendStates(c *check.C) {
                        return
                }
                v.serverClock.now = &t
-               v.bucket.Bucket().Put(key, data, "application/octet-stream", s3ACL, s3.Options{})
+               uploader := s3manager.NewUploaderWithClient(v.bucket.svc)
+               _, err := uploader.UploadWithContext(context.Background(), &s3manager.UploadInput{
+                       Bucket: aws.String(v.bucket.bucket),
+                       Key:    aws.String(key),
+                       Body:   bytes.NewReader(data),
+               })
+               if err != nil {
+                       panic(err)
+               }
+               v.serverClock.now = nil
+               _, err = v.head(key)
+               if err != nil {
+                       panic(err)
+               }
        }
 
        t0 := time.Now()
@@ -367,96 +451,128 @@ func (s *StubbedS3Suite) TestBackendStates(c *check.C) {
                        false, false, false, true, true, true,
                },
        } {
-               c.Log("Scenario: ", scenario.label)
-
-               // We have a few tests to run for each scenario, and
-               // the tests are expected to change state. By calling
-               // this setup func between tests, we (re)create the
-               // scenario as specified, using a new unique block
-               // locator to prevent interference from previous
-               // tests.
-
-               setupScenario := func() (string, []byte) {
-                       nextKey++
-                       blk := []byte(fmt.Sprintf("%d", nextKey))
-                       loc := fmt.Sprintf("%x", md5.Sum(blk))
-                       c.Log("\t", loc)
-                       putS3Obj(scenario.dataT, loc, blk)
-                       putS3Obj(scenario.recentT, "recent/"+loc, nil)
-                       putS3Obj(scenario.trashT, "trash/"+loc, blk)
-                       v.serverClock.now = &t0
-                       return loc, blk
-               }
-
-               // Check canGet
-               loc, blk := setupScenario()
-               buf := make([]byte, len(blk))
-               _, err := v.Get(context.Background(), loc, buf)
-               c.Check(err == nil, check.Equals, scenario.canGet)
-               if err != nil {
-                       c.Check(os.IsNotExist(err), check.Equals, true)
-               }
-
-               // Call Trash, then check canTrash and canGetAfterTrash
-               loc, _ = setupScenario()
-               err = v.Trash(loc)
-               c.Check(err == nil, check.Equals, scenario.canTrash)
-               _, err = v.Get(context.Background(), loc, buf)
-               c.Check(err == nil, check.Equals, scenario.canGetAfterTrash)
-               if err != nil {
-                       c.Check(os.IsNotExist(err), check.Equals, true)
-               }
-
-               // Call Untrash, then check canUntrash
-               loc, _ = setupScenario()
-               err = v.Untrash(loc)
-               c.Check(err == nil, check.Equals, scenario.canUntrash)
-               if scenario.dataT != none || scenario.trashT != none {
-                       // In all scenarios where the data exists, we
-                       // should be able to Get after Untrash --
-                       // regardless of timestamps, errors, race
-                       // conditions, etc.
-                       _, err = v.Get(context.Background(), loc, buf)
+               for _, prefixLength := range []int{0, 3} {
+                       v.PrefixLength = prefixLength
+                       c.Logf("Scenario: %q (prefixLength=%d)", scenario.label, prefixLength)
+
+                       // We have a few tests to run for each scenario, and
+                       // the tests are expected to change state. By calling
+                       // this setup func between tests, we (re)create the
+                       // scenario as specified, using a new unique block
+                       // locator to prevent interference from previous
+                       // tests.
+
+                       setupScenario := func() (string, []byte) {
+                               nextKey++
+                               blk := []byte(fmt.Sprintf("%d", nextKey))
+                               loc := fmt.Sprintf("%x", md5.Sum(blk))
+                               key := loc
+                               if prefixLength > 0 {
+                                       key = loc[:prefixLength] + "/" + loc
+                               }
+                               c.Log("\t", loc, "\t", key)
+                               putS3Obj(scenario.dataT, key, blk)
+                               putS3Obj(scenario.recentT, "recent/"+key, nil)
+                               putS3Obj(scenario.trashT, "trash/"+key, blk)
+                               v.serverClock.now = &t0
+                               return loc, blk
+                       }
+
+                       // Check canGet
+                       loc, blk := setupScenario()
+                       _, err := v.BlockRead(context.Background(), loc, io.Discard)
+                       c.Check(err == nil, check.Equals, scenario.canGet)
+                       if err != nil {
+                               c.Check(os.IsNotExist(err), check.Equals, true)
+                       }
+
+                       // Call Trash, then check canTrash and canGetAfterTrash
+                       loc, _ = setupScenario()
+                       err = v.BlockTrash(loc)
+                       c.Check(err == nil, check.Equals, scenario.canTrash)
+                       _, err = v.BlockRead(context.Background(), loc, io.Discard)
+                       c.Check(err == nil, check.Equals, scenario.canGetAfterTrash)
+                       if err != nil {
+                               c.Check(os.IsNotExist(err), check.Equals, true)
+                       }
+
+                       // Call Untrash, then check canUntrash
+                       loc, _ = setupScenario()
+                       err = v.BlockUntrash(loc)
+                       c.Check(err == nil, check.Equals, scenario.canUntrash)
+                       if scenario.dataT != none || scenario.trashT != none {
+                               // In all scenarios where the data exists, we
+                               // should be able to Get after Untrash --
+                               // regardless of timestamps, errors, race
+                               // conditions, etc.
+                               _, err = v.BlockRead(context.Background(), loc, io.Discard)
+                               c.Check(err, check.IsNil)
+                       }
+
+                       // Call EmptyTrash, then check haveTrashAfterEmpty and
+                       // freshAfterEmpty
+                       loc, _ = setupScenario()
+                       v.EmptyTrash()
+                       _, err = v.head("trash/" + v.key(loc))
+                       c.Check(err == nil, check.Equals, scenario.haveTrashAfterEmpty)
+                       if scenario.freshAfterEmpty {
+                               t, err := v.Mtime(loc)
+                               c.Check(err, check.IsNil)
+                               // new mtime must be current (with an
+                               // allowance for 1s timestamp precision)
+                               c.Check(t.After(t0.Add(-time.Second)), check.Equals, true)
+                       }
+
+                       // Check for current Mtime after Put (applies to all
+                       // scenarios)
+                       loc, blk = setupScenario()
+                       err = v.BlockWrite(context.Background(), loc, blk)
                        c.Check(err, check.IsNil)
-               }
-
-               // Call EmptyTrash, then check haveTrashAfterEmpty and
-               // freshAfterEmpty
-               loc, _ = setupScenario()
-               v.EmptyTrash()
-               _, err = v.bucket.Head("trash/"+loc, nil)
-               c.Check(err == nil, check.Equals, scenario.haveTrashAfterEmpty)
-               if scenario.freshAfterEmpty {
                        t, err := v.Mtime(loc)
                        c.Check(err, check.IsNil)
-                       // new mtime must be current (with an
-                       // allowance for 1s timestamp precision)
                        c.Check(t.After(t0.Add(-time.Second)), check.Equals, true)
                }
-
-               // Check for current Mtime after Put (applies to all
-               // scenarios)
-               loc, blk = setupScenario()
-               err = v.Put(context.Background(), loc, blk)
-               c.Check(err, check.IsNil)
-               t, err := v.Mtime(loc)
-               c.Check(err, check.IsNil)
-               c.Check(t.After(t0.Add(-time.Second)), check.Equals, true)
        }
 }
 
-type TestableS3Volume struct {
-       *S3Volume
-       server      *s3test.Server
+type testableS3Volume struct {
+       *s3Volume
+       server      *httptest.Server
        c           *check.C
-       serverClock *fakeClock
+       serverClock *s3AWSFakeClock
 }
 
-func (s *StubbedS3Suite) newTestableVolume(c *check.C, cluster *arvados.Cluster, volume arvados.Volume, metrics *volumeMetricsVecs, raceWindow time.Duration) *TestableS3Volume {
-       clock := &fakeClock{}
-       srv, err := s3test.NewServer(&s3test.Config{Clock: clock})
-       c.Assert(err, check.IsNil)
-       endpoint := srv.URL()
+type LogrusLog struct {
+       log logrus.FieldLogger
+}
+
+func (l LogrusLog) Print(level gofakes3.LogLevel, v ...interface{}) {
+       switch level {
+       case gofakes3.LogErr:
+               l.log.Errorln(v...)
+       case gofakes3.LogWarn:
+               l.log.Warnln(v...)
+       case gofakes3.LogInfo:
+               l.log.Infoln(v...)
+       default:
+               panic("unknown level")
+       }
+}
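
LogrusLog adapts a logrus FieldLogger to the logging interface gofakes3.WithLogger expects (paraphrased from the gofakes3 package); the commented-out lines in newTestableVolume below show how it would be wired in:

    type Logger interface {
            Print(level LogLevel, v ...interface{})
    }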
+
+func (s *stubbedS3Suite) newTestableVolume(c *check.C, params newVolumeParams, raceWindow time.Duration) *testableS3Volume {
+       clock := &s3AWSFakeClock{}
+       // Stub S3 server: gofakes3 backed by an in-memory store, driven by the fake clock
+       backend := s3mem.New(s3mem.WithTimeSource(clock))
+
+       // To enable GoFakeS3 debug logging, pass logger to gofakes3.WithLogger()
+       /* logger := new(LogrusLog)
+       logger.log = ctxlog.FromContext(context.Background()) */
+       faker := gofakes3.New(backend, gofakes3.WithTimeSource(clock), gofakes3.WithLogger(nil), gofakes3.WithTimeSkewLimit(0))
+       srv := httptest.NewServer(faker.Server())
+
+       endpoint := srv.URL
        if s.s3server != nil {
                endpoint = s.s3server.URL
        }
@@ -466,62 +582,96 @@ func (s *StubbedS3Suite) newTestableVolume(c *check.C, cluster *arvados.Cluster,
                iamRole, accessKey, secretKey = s.metadata.URL+"/fake-metadata/test-role", "", ""
        }
 
-       v := &TestableS3Volume{
-               S3Volume: &S3Volume{
-                       AccessKey:          accessKey,
-                       SecretKey:          secretKey,
-                       IAMRole:            iamRole,
-                       Bucket:             TestBucketName,
-                       Endpoint:           endpoint,
-                       Region:             "test-region-1",
-                       LocationConstraint: true,
-                       UnsafeDelete:       true,
-                       IndexPageSize:      1000,
-                       cluster:            cluster,
-                       volume:             volume,
-                       logger:             ctxlog.TestLogger(c),
-                       metrics:            metrics,
+       v := &testableS3Volume{
+               s3Volume: &s3Volume{
+                       S3VolumeDriverParameters: arvados.S3VolumeDriverParameters{
+                               IAMRole:            iamRole,
+                               AccessKeyID:        accessKey,
+                               SecretAccessKey:    secretKey,
+                               Bucket:             s3TestBucketName,
+                               Endpoint:           endpoint,
+                               Region:             "test-region-1",
+                               LocationConstraint: true,
+                               UnsafeDelete:       true,
+                               IndexPageSize:      1000,
+                       },
+                       cluster:    params.Cluster,
+                       volume:     params.ConfigVolume,
+                       logger:     params.Logger,
+                       metrics:    params.MetricsVecs,
+                       bufferPool: params.BufferPool,
                },
                c:           c,
                server:      srv,
                serverClock: clock,
        }
-       c.Assert(v.S3Volume.check(), check.IsNil)
-       c.Assert(v.bucket.Bucket().PutBucket(s3.ACL("private")), check.IsNil)
+       c.Assert(v.s3Volume.check(""), check.IsNil)
+       // Our test S3 server uses the older 'Path Style'
+       v.s3Volume.bucket.svc.ForcePathStyle = true
+       // Create the testbucket
+       input := &s3.CreateBucketInput{
+               Bucket: aws.String(s3TestBucketName),
+       }
+       req := v.s3Volume.bucket.svc.CreateBucketRequest(input)
+       _, err := req.Send(context.Background())
+       c.Assert(err, check.IsNil)
        // We couldn't set RaceWindow until now because check()
        // rejects negative values.
-       v.S3Volume.RaceWindow = arvados.Duration(raceWindow)
+       v.s3Volume.RaceWindow = arvados.Duration(raceWindow)
        return v
 }
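
ForcePathStyle is needed because the stub server is addressed by bare host:port rather than by a bucket-name hostname; for reference, the two request styles:

    // Path style (what the test server expects):  GET http://127.0.0.1:<port>/testbucket/<key>
    // Virtual-hosted style (the SDK's default):   GET http://testbucket.<endpoint-host>/<key>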
 
-// PutRaw skips the ContentMD5 test
-func (v *TestableS3Volume) PutRaw(loc string, block []byte) {
-       err := v.bucket.Bucket().Put(loc, block, "application/octet-stream", s3ACL, s3.Options{})
-       if err != nil {
-               v.logger.Printf("PutRaw: %s: %+v", loc, err)
-       }
-       err = v.bucket.Bucket().Put("recent/"+loc, nil, "application/octet-stream", s3ACL, s3.Options{})
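+// blockWriteWithoutMD5Check stores a block (and its "recent/" marker) without
+// verifying that loc matches the content's MD5, so tests can write arbitrary
+// data under arbitrary locators.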
+func (v *testableS3Volume) blockWriteWithoutMD5Check(loc string, block []byte) error {
+       key := v.key(loc)
+       r := newCountingReader(bytes.NewReader(block), v.bucket.stats.TickOutBytes)
+
+       uploader := s3manager.NewUploaderWithClient(v.bucket.svc, func(u *s3manager.Uploader) {
+               u.PartSize = 5 * 1024 * 1024
+               u.Concurrency = 13
+       })
+
+       _, err := uploader.Upload(&s3manager.UploadInput{
+               Bucket: aws.String(v.bucket.bucket),
+               Key:    aws.String(key),
+               Body:   r,
+       })
        if err != nil {
-               v.logger.Printf("PutRaw: recent/%s: %+v", loc, err)
+               return err
        }
+
+       empty := bytes.NewReader([]byte{})
+       _, err = uploader.Upload(&s3manager.UploadInput{
+               Bucket: aws.String(v.bucket.bucket),
+               Key:    aws.String("recent/" + key),
+               Body:   empty,
+       })
+       return err
 }
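
Each stored block gets an empty "recent/"+key companion object; its timestamp is what TouchWithDate below manipulates, which lets tests adjust a block's apparent age without rewriting the data. A sketch of the resulting bucket contents for one block (assuming PrefixLength=3):

    // acb/acbd18db4cc2f85cedef654fccc4a4d8           block data
    // recent/acb/acbd18db4cc2f85cedef654fccc4a4d8    empty timestamp marker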
 
 // TouchWithDate turns back the clock while doing a Touch(). We assume
 // there are no other operations happening on the same s3test server
 // while we do this.
-func (v *TestableS3Volume) TouchWithDate(locator string, lastPut time.Time) {
+func (v *testableS3Volume) TouchWithDate(loc string, lastPut time.Time) {
        v.serverClock.now = &lastPut
-       err := v.bucket.Bucket().Put("recent/"+locator, nil, "application/octet-stream", s3ACL, s3.Options{})
+
+       uploader := s3manager.NewUploaderWithClient(v.bucket.svc)
+       empty := bytes.NewReader([]byte{})
+       _, err := uploader.UploadWithContext(context.Background(), &s3manager.UploadInput{
+               Bucket: aws.String(v.bucket.bucket),
+               Key:    aws.String("recent/" + v.key(loc)),
+               Body:   empty,
+       })
        if err != nil {
                panic(err)
        }
+
        v.serverClock.now = nil
 }
 
-func (v *TestableS3Volume) Teardown() {
-       v.server.Quit()
+func (v *testableS3Volume) Teardown() {
+       v.server.Close()
 }
 
-func (v *TestableS3Volume) ReadWriteOperationLabelValues() (r, w string) {
+func (v *testableS3Volume) ReadWriteOperationLabelValues() (r, w string) {
        return "get", "put"
 }