Merge branch '21765-short-dialog'
[arvados.git] / services / keepstore / s3_volume_test.go
index acc1b11df32526c132d763d970915f9f30735437..df9fe726837f340b55c9f0144dcc59a87374718a 100644 (file)
@@ -2,7 +2,7 @@
 //
 // SPDX-License-Identifier: AGPL-3.0
 
-package main
+package keepstore
 
 import (
        "bytes"
@@ -10,64 +10,105 @@ import (
        "crypto/md5"
        "encoding/json"
        "fmt"
        "crypto/md5"
        "encoding/json"
        "fmt"
-       "io/ioutil"
+       "io"
        "net/http"
        "net/http/httptest"
        "os"
        "net/http"
        "net/http/httptest"
        "os"
+       "strings"
+       "sync/atomic"
        "time"
 
        "time"
 
-       "git.curoverse.com/arvados.git/sdk/go/arvados"
-       "github.com/AdRoll/goamz/s3"
-       "github.com/AdRoll/goamz/s3/s3test"
-       check "gopkg.in/check.v1"
-)
+       "git.arvados.org/arvados.git/sdk/go/arvados"
+       "git.arvados.org/arvados.git/sdk/go/ctxlog"
+
+       "github.com/aws/aws-sdk-go-v2/aws"
+       "github.com/aws/aws-sdk-go-v2/feature/s3/manager"
+       "github.com/aws/aws-sdk-go-v2/service/s3"
 
-const (
-       TestBucketName = "testbucket"
+       "github.com/johannesboyne/gofakes3"
+       "github.com/johannesboyne/gofakes3/backend/s3mem"
+       "github.com/prometheus/client_golang/prometheus"
+       "github.com/sirupsen/logrus"
+       check "gopkg.in/check.v1"
 )
 
-type fakeClock struct {
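+// s3fakeClock is a settable fake clock: Now returns the real time
+// until now is set, after which it returns *now, always in UTC.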
+type s3fakeClock struct {
        now *time.Time
 }
 
-func (c *fakeClock) Now() time.Time {
+func (c *s3fakeClock) Now() time.Time {
        if c.now == nil {
-               return time.Now()
+               return time.Now().UTC()
        }
-       return *c.now
+       return c.now.UTC()
 }
 
-func init() {
-       // Deleting isn't safe from races, but if it's turned on
-       // anyway we do expect it to pass the generic volume tests.
-       s3UnsafeDelete = true
+func (c *s3fakeClock) Since(t time.Time) time.Duration {
+       return c.Now().Sub(t)
 }
 
-var _ = check.Suite(&StubbedS3Suite{})
+var _ = check.Suite(&stubbedS3Suite{})
+
+var srv httptest.Server
 
-type StubbedS3Suite struct {
-       volumes []*TestableS3Volume
+type stubbedS3Suite struct {
+       s3server    *httptest.Server
+       s3fakeClock *s3fakeClock
+       metadata    *httptest.Server
+       cluster     *arvados.Cluster
+       volumes     []*testableS3Volume
+}
+
+func (s *stubbedS3Suite) SetUpTest(c *check.C) {
+       s.s3server = nil
+       s.s3fakeClock = &s3fakeClock{}
+       s.metadata = nil
+       s.cluster = testCluster(c)
+       s.cluster.Volumes = map[string]arvados.Volume{
+               "zzzzz-nyw5e-000000000000000": {Driver: "S3"},
+               "zzzzz-nyw5e-111111111111111": {Driver: "S3"},
+       }
+}
+
+func (s *stubbedS3Suite) TearDownTest(c *check.C) {
+       if s.s3server != nil {
+               s.s3server.Close()
+       }
 }
 
-func (s *StubbedS3Suite) TestGeneric(c *check.C) {
-       DoGenericVolumeTests(c, func(t TB) TestableVolume {
+func (s *stubbedS3Suite) TestGeneric(c *check.C) {
+       DoGenericVolumeTests(c, false, func(t TB, params newVolumeParams) TestableVolume {
                // Use a negative raceWindow so s3test's 1-second
                // timestamp precision doesn't confuse fixRace.
-               return s.newTestableVolume(c, -2*time.Second, false, 2)
+               return s.newTestableVolume(c, params, -2*time.Second)
        })
 }
 
-func (s *StubbedS3Suite) TestGenericReadOnly(c *check.C) {
-       DoGenericVolumeTests(c, func(t TB) TestableVolume {
-               return s.newTestableVolume(c, -2*time.Second, true, 2)
+func (s *stubbedS3Suite) TestGenericReadOnly(c *check.C) {
+       DoGenericVolumeTests(c, true, func(t TB, params newVolumeParams) TestableVolume {
+               return s.newTestableVolume(c, params, -2*time.Second)
        })
 }
 
-func (s *StubbedS3Suite) TestIndex(c *check.C) {
-       v := s.newTestableVolume(c, 0, false, 2)
+func (s *stubbedS3Suite) TestGenericWithPrefix(c *check.C) {
+       DoGenericVolumeTests(c, false, func(t TB, params newVolumeParams) TestableVolume {
+               v := s.newTestableVolume(c, params, -2*time.Second)
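+               // With PrefixLength = 3, blocks are stored under
+               // keys like "abc/abc123..." instead of flat keys.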
+               v.PrefixLength = 3
+               return v
+       })
+}
+
+func (s *stubbedS3Suite) TestIndex(c *check.C) {
+       v := s.newTestableVolume(c, newVolumeParams{
+               Cluster:      s.cluster,
+               ConfigVolume: arvados.Volume{Replication: 2},
+               MetricsVecs:  newVolumeMetricsVecs(prometheus.NewRegistry()),
+               BufferPool:   newBufferPool(ctxlog.TestLogger(c), 8, prometheus.NewRegistry()),
+       }, 0)
        v.IndexPageSize = 3
        for i := 0; i < 256; i++ {
-               v.PutRaw(fmt.Sprintf("%02x%030x", i, i), []byte{102, 111, 111})
+               err := v.blockWriteWithoutMD5Check(fmt.Sprintf("%02x%030x", i, i), []byte{102, 111, 111})
+               c.Assert(err, check.IsNil)
        }
        for _, spec := range []struct {
                prefix      string
@@ -79,7 +120,7 @@ func (s *StubbedS3Suite) TestIndex(c *check.C) {
                {"abc", 0},
        } {
                buf := new(bytes.Buffer)
                {"abc", 0},
        } {
                buf := new(bytes.Buffer)
-               err := v.IndexTo(spec.prefix, buf)
+               err := v.Index(context.Background(), spec.prefix, buf)
                c.Check(err, check.IsNil)
 
                idx := bytes.SplitAfter(buf.Bytes(), []byte{10})
@@ -88,8 +129,114 @@ func (s *StubbedS3Suite) TestIndex(c *check.C) {
        }
 }
 
-func (s *StubbedS3Suite) TestStats(c *check.C) {
-       v := s.newTestableVolume(c, 5*time.Minute, false, 2)
+func (s *stubbedS3Suite) TestSignature(c *check.C) {
+       var header http.Header
+       stub := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+               header = r.Header
+       }))
+       defer stub.Close()
+
+       // The aws-sdk-go-v2 driver only supports S3 V4 signatures. S3 V2 signatures have been
+       // deprecated since June 24, 2020. Cf. https://forums.aws.amazon.com/ann.jspa?annID=5816
+       vol := s3Volume{
+               S3VolumeDriverParameters: arvados.S3VolumeDriverParameters{
+                       AccessKeyID:     "xxx",
+                       SecretAccessKey: "xxx",
+                       Endpoint:        stub.URL,
+                       Region:          "test-region-1",
+                       Bucket:          "test-bucket-name",
+               },
+               cluster: s.cluster,
+               logger:  ctxlog.TestLogger(c),
+               metrics: newVolumeMetricsVecs(prometheus.NewRegistry()),
+       }
+       // Our test S3 server uses the older path-style addressing.
+       vol.usePathStyle = true
+       err := vol.check("")
+
+       c.Check(err, check.IsNil)
+       err = vol.BlockWrite(context.Background(), "acbd18db4cc2f85cedef654fccc4a4d8", []byte("foo"))
+       c.Check(err, check.IsNil)
+       c.Check(header.Get("Authorization"), check.Matches, `AWS4-HMAC-SHA256 .*`)
+}
+
+func (s *stubbedS3Suite) TestIAMRoleCredentials(c *check.C) {
+       var reqHeader http.Header
+       stub := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+               reqHeader = r.Header
+       }))
+       defer stub.Close()
+
+       retrievedMetadata := false
+       s.metadata = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+               retrievedMetadata = true
+               upd := time.Now().UTC().Add(-time.Hour).Format(time.RFC3339)
+               exp := time.Now().UTC().Add(time.Hour).Format(time.RFC3339)
+               c.Logf("metadata stub received request: %s %s", r.Method, r.URL.Path)
+               switch {
+               case r.URL.Path == "/latest/meta-data/iam/security-credentials/":
+                       io.WriteString(w, "testcredential\n")
+               case r.URL.Path == "/latest/api/token",
+                       r.URL.Path == "/latest/meta-data/iam/security-credentials/testcredential":
+                       // Literal example from
+                       // https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/iam-roles-for-amazon-ec2.html#instance-metadata-security-credentials
+                       // but with updated timestamps
+                       io.WriteString(w, `{"Code":"Success","LastUpdated":"`+upd+`","Type":"AWS-HMAC","AccessKeyId":"ASIAIOSFODNN7EXAMPLE","SecretAccessKey":"wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY","Token":"token","Expiration":"`+exp+`"}`)
+               default:
+                       w.WriteHeader(http.StatusNotFound)
+               }
+       }))
+       defer s.metadata.Close()
+
+       v := &s3Volume{
+               S3VolumeDriverParameters: arvados.S3VolumeDriverParameters{
+                       Endpoint: stub.URL,
+                       Region:   "test-region-1",
+                       Bucket:   "test-bucket-name",
+               },
+               cluster: s.cluster,
+               logger:  ctxlog.TestLogger(c),
+               metrics: newVolumeMetricsVecs(prometheus.NewRegistry()),
+       }
+       err := v.check(s.metadata.URL + "/latest")
+       c.Check(err, check.IsNil)
+       resp, err := v.bucket.svc.ListBuckets(context.Background(), &s3.ListBucketsInput{})
+       c.Check(err, check.IsNil)
+       c.Check(resp.Buckets, check.HasLen, 0)
+       c.Check(retrievedMetadata, check.Equals, true)
+       c.Check(reqHeader.Get("Authorization"), check.Matches, `AWS4-HMAC-SHA256 Credential=ASIAIOSFODNN7EXAMPLE/\d+/test-region-1/s3/aws4_request, SignedHeaders=.*`)
+
+       retrievedMetadata = false
+       s.metadata = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+               retrievedMetadata = true
+               c.Logf("metadata stub received request: %s %s", r.Method, r.URL.Path)
+               w.WriteHeader(http.StatusNotFound)
+       }))
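+       // With a metadata server that has no credentials to offer,
+       // bucket operations should fail.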
+       deadv := &s3Volume{
+               S3VolumeDriverParameters: arvados.S3VolumeDriverParameters{
+                       Endpoint: "http://localhost:9",
+                       Region:   "test-region-1",
+                       Bucket:   "test-bucket-name",
+               },
+               cluster: s.cluster,
+               logger:  ctxlog.TestLogger(c),
+               metrics: newVolumeMetricsVecs(prometheus.NewRegistry()),
+       }
+       err = deadv.check(s.metadata.URL + "/latest")
+       c.Check(err, check.IsNil)
+       _, err = deadv.bucket.svc.ListBuckets(context.Background(), &s3.ListBucketsInput{})
+       c.Check(err, check.ErrorMatches, `(?s).*failed to refresh cached credentials, no EC2 IMDS role found.*`)
+       c.Check(err, check.ErrorMatches, `(?s).*404.*`)
+       c.Check(retrievedMetadata, check.Equals, true)
+}
+
+func (s *stubbedS3Suite) TestStats(c *check.C) {
+       v := s.newTestableVolume(c, newVolumeParams{
+               Cluster:      s.cluster,
+               ConfigVolume: arvados.Volume{Replication: 2},
+               MetricsVecs:  newVolumeMetricsVecs(prometheus.NewRegistry()),
+               BufferPool:   newBufferPool(ctxlog.TestLogger(c), 8, prometheus.NewRegistry()),
+       }, 5*time.Minute)
        stats := func() string {
                buf, err := json.Marshal(v.InternalStats())
                c.Check(err, check.IsNil)
@@ -99,30 +246,35 @@ func (s *StubbedS3Suite) TestStats(c *check.C) {
        c.Check(stats(), check.Matches, `.*"Ops":0,.*`)
 
        loc := "acbd18db4cc2f85cedef654fccc4a4d8"
-       _, err := v.Get(context.Background(), loc, make([]byte, 3))
+       err := v.BlockRead(context.Background(), loc, brdiscard)
        c.Check(err, check.NotNil)
        c.Check(stats(), check.Matches, `.*"Ops":[^0],.*`)
-       c.Check(stats(), check.Matches, `.*"\*s3.Error 404 [^"]*":[^0].*`)
+       c.Check(stats(), check.Matches, `.*"\*smithy.OperationError 404 NoSuchKey":[^0].*`)
        c.Check(stats(), check.Matches, `.*"InBytes":0,.*`)
 
-       err = v.Put(context.Background(), loc, []byte("foo"))
+       err = v.BlockWrite(context.Background(), loc, []byte("foo"))
        c.Check(err, check.IsNil)
        c.Check(stats(), check.Matches, `.*"OutBytes":3,.*`)
        c.Check(stats(), check.Matches, `.*"PutOps":2,.*`)
 
-       _, err = v.Get(context.Background(), loc, make([]byte, 3))
+       err = v.BlockRead(context.Background(), loc, brdiscard)
        c.Check(err, check.IsNil)
-       _, err = v.Get(context.Background(), loc, make([]byte, 3))
+       err = v.BlockRead(context.Background(), loc, brdiscard)
        c.Check(err, check.IsNil)
        c.Check(stats(), check.Matches, `.*"InBytes":6,.*`)
 }
 
-type blockingHandler struct {
+type s3AWSBlockingHandler struct {
        requested chan *http.Request
        unblock   chan struct{}
 }
 
-func (h *blockingHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
+func (h *s3AWSBlockingHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
+       if r.Method == "PUT" && !strings.Contains(strings.Trim(r.URL.Path, "/"), "/") {
+               // Accept PutBucket ("PUT /bucketname/"), called by
+               // newTestableVolume
+               return
+       }
        if h.requested != nil {
                h.requested <- r
        }
@@ -132,44 +284,29 @@ func (h *blockingHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
        http.Error(w, "nothing here", http.StatusNotFound)
 }
 
        http.Error(w, "nothing here", http.StatusNotFound)
 }
 
-func (s *StubbedS3Suite) TestGetContextCancel(c *check.C) {
-       loc := "acbd18db4cc2f85cedef654fccc4a4d8"
-       buf := make([]byte, 3)
-
-       s.testContextCancel(c, func(ctx context.Context, v *TestableS3Volume) error {
-               _, err := v.Get(ctx, loc, buf)
-               return err
-       })
-}
-
-func (s *StubbedS3Suite) TestCompareContextCancel(c *check.C) {
-       loc := "acbd18db4cc2f85cedef654fccc4a4d8"
-       buf := []byte("bar")
-
-       s.testContextCancel(c, func(ctx context.Context, v *TestableS3Volume) error {
-               return v.Compare(ctx, loc, buf)
+func (s *stubbedS3Suite) TestGetContextCancel(c *check.C) {
+       s.testContextCancel(c, func(ctx context.Context, v *testableS3Volume) error {
+               return v.BlockRead(ctx, fooHash, brdiscard)
        })
 }
 
-func (s *StubbedS3Suite) TestPutContextCancel(c *check.C) {
-       loc := "acbd18db4cc2f85cedef654fccc4a4d8"
-       buf := []byte("foo")
-
-       s.testContextCancel(c, func(ctx context.Context, v *TestableS3Volume) error {
-               return v.Put(ctx, loc, buf)
+func (s *stubbedS3Suite) TestPutContextCancel(c *check.C) {
+       s.testContextCancel(c, func(ctx context.Context, v *testableS3Volume) error {
+               return v.BlockWrite(ctx, fooHash, []byte("foo"))
        })
 }
 
-func (s *StubbedS3Suite) testContextCancel(c *check.C, testFunc func(context.Context, *TestableS3Volume) error) {
-       handler := &blockingHandler{}
-       srv := httptest.NewServer(handler)
-       defer srv.Close()
+func (s *stubbedS3Suite) testContextCancel(c *check.C, testFunc func(context.Context, *testableS3Volume) error) {
+       handler := &s3AWSBlockingHandler{}
+       s.s3server = httptest.NewServer(handler)
+       defer s.s3server.Close()
 
-       v := s.newTestableVolume(c, 5*time.Minute, false, 2)
-       vol := *v.S3Volume
-       vol.Endpoint = srv.URL
-       v = &TestableS3Volume{S3Volume: &vol}
-       v.Start()
+       v := s.newTestableVolume(c, newVolumeParams{
+               Cluster:      s.cluster,
+               ConfigVolume: arvados.Volume{Replication: 2},
+               MetricsVecs:  newVolumeMetricsVecs(prometheus.NewRegistry()),
+               BufferPool:   newBufferPool(ctxlog.TestLogger(c), 8, prometheus.NewRegistry()),
+       }, 5*time.Minute)
 
        ctx, cancel := context.WithCancel(context.Background())
 
@@ -205,23 +342,38 @@ func (s *StubbedS3Suite) testContextCancel(c *check.C, testFunc func(context.Con
        }
 }
 
-func (s *StubbedS3Suite) TestBackendStates(c *check.C) {
-       defer func(tl, bs arvados.Duration) {
-               theConfig.TrashLifetime = tl
-               theConfig.BlobSignatureTTL = bs
-       }(theConfig.TrashLifetime, theConfig.BlobSignatureTTL)
-       theConfig.TrashLifetime.Set("1h")
-       theConfig.BlobSignatureTTL.Set("1h")
-
-       v := s.newTestableVolume(c, 5*time.Minute, false, 2)
+func (s *stubbedS3Suite) TestBackendStates(c *check.C) {
+       s.cluster.Collections.BlobTrashLifetime.Set("1h")
+       s.cluster.Collections.BlobSigningTTL.Set("1h")
+
+       v := s.newTestableVolume(c, newVolumeParams{
+               Cluster:      s.cluster,
+               ConfigVolume: arvados.Volume{Replication: 2},
+               Logger:       ctxlog.TestLogger(c),
+               MetricsVecs:  newVolumeMetricsVecs(prometheus.NewRegistry()),
+               BufferPool:   newBufferPool(ctxlog.TestLogger(c), 8, prometheus.NewRegistry()),
+       }, 5*time.Minute)
        var none time.Time
 
        putS3Obj := func(t time.Time, key string, data []byte) {
                if t == none {
                        return
                }
-               v.serverClock.now = &t
-               v.bucket.Put(key, data, "application/octet-stream", s3ACL, s3.Options{})
+               s.s3fakeClock.now = &t
+               uploader := manager.NewUploader(v.bucket.svc)
+               _, err := uploader.Upload(context.Background(), &s3.PutObjectInput{
+                       Bucket: aws.String(v.bucket.bucket),
+                       Key:    aws.String(key),
+                       Body:   bytes.NewReader(data),
+               })
+               if err != nil {
+                       panic(err)
+               }
+               s.s3fakeClock.now = nil
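+               // Sanity check: the stored object should now be
+               // visible via HEAD.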
+               _, err = v.head(key)
+               if err != nil {
+                       panic(err)
+               }
        }
 
        t0 := time.Now()
@@ -308,12 +460,12 @@ func (s *StubbedS3Suite) TestBackendStates(c *check.C) {
                        false, false, false, true, false, false,
                },
                {
-                       "Erroneously trashed during a race, detected before TrashLifetime",
+                       "Erroneously trashed during a race, detected before BlobTrashLifetime",
                        none, t0.Add(-30 * time.Minute), t0.Add(-29 * time.Minute),
                        true, false, true, true, true, false,
                },
                {
-                       "Erroneously trashed during a race, rescue during EmptyTrash despite reaching TrashLifetime",
+                       "Erroneously trashed during a race, rescue during EmptyTrash despite reaching BlobTrashLifetime",
                        none, t0.Add(-90 * time.Minute), t0.Add(-89 * time.Minute),
                        true, false, true, true, true, false,
                },
@@ -323,157 +475,224 @@ func (s *StubbedS3Suite) TestBackendStates(c *check.C) {
                        false, false, false, true, true, true,
                },
        } {
-               c.Log("Scenario: ", scenario.label)
-
-               // We have a few tests to run for each scenario, and
-               // the tests are expected to change state. By calling
-               // this setup func between tests, we (re)create the
-               // scenario as specified, using a new unique block
-               // locator to prevent interference from previous
-               // tests.
-
-               setupScenario := func() (string, []byte) {
-                       nextKey++
-                       blk := []byte(fmt.Sprintf("%d", nextKey))
-                       loc := fmt.Sprintf("%x", md5.Sum(blk))
-                       c.Log("\t", loc)
-                       putS3Obj(scenario.dataT, loc, blk)
-                       putS3Obj(scenario.recentT, "recent/"+loc, nil)
-                       putS3Obj(scenario.trashT, "trash/"+loc, blk)
-                       v.serverClock.now = &t0
-                       return loc, blk
-               }
-
-               // Check canGet
-               loc, blk := setupScenario()
-               buf := make([]byte, len(blk))
-               _, err := v.Get(context.Background(), loc, buf)
-               c.Check(err == nil, check.Equals, scenario.canGet)
-               if err != nil {
-                       c.Check(os.IsNotExist(err), check.Equals, true)
-               }
-
-               // Call Trash, then check canTrash and canGetAfterTrash
-               loc, blk = setupScenario()
-               err = v.Trash(loc)
-               c.Check(err == nil, check.Equals, scenario.canTrash)
-               _, err = v.Get(context.Background(), loc, buf)
-               c.Check(err == nil, check.Equals, scenario.canGetAfterTrash)
-               if err != nil {
-                       c.Check(os.IsNotExist(err), check.Equals, true)
-               }
-
-               // Call Untrash, then check canUntrash
-               loc, blk = setupScenario()
-               err = v.Untrash(loc)
-               c.Check(err == nil, check.Equals, scenario.canUntrash)
-               if scenario.dataT != none || scenario.trashT != none {
-                       // In all scenarios where the data exists, we
-                       // should be able to Get after Untrash --
-                       // regardless of timestamps, errors, race
-                       // conditions, etc.
-                       _, err = v.Get(context.Background(), loc, buf)
+               for _, prefixLength := range []int{0, 3} {
+                       v.PrefixLength = prefixLength
+                       c.Logf("Scenario: %q (prefixLength=%d)", scenario.label, prefixLength)
+
+                       // We have a few tests to run for each scenario, and
+                       // the tests are expected to change state. By calling
+                       // this setup func between tests, we (re)create the
+                       // scenario as specified, using a new unique block
+                       // locator to prevent interference from previous
+                       // tests.
+
+                       setupScenario := func() (string, []byte) {
+                               nextKey++
+                               blk := []byte(fmt.Sprintf("%d", nextKey))
+                               loc := fmt.Sprintf("%x", md5.Sum(blk))
+                               key := loc
+                               if prefixLength > 0 {
+                                       key = loc[:prefixLength] + "/" + loc
+                               }
+                               c.Log("\t", loc, "\t", key)
+                               putS3Obj(scenario.dataT, key, blk)
+                               putS3Obj(scenario.recentT, "recent/"+key, nil)
+                               putS3Obj(scenario.trashT, "trash/"+key, blk)
+                               v.s3fakeClock.now = &t0
+                               return loc, blk
+                       }
+
+                       // Check canGet
+                       loc, blk := setupScenario()
+                       err := v.BlockRead(context.Background(), loc, brdiscard)
+                       c.Check(err == nil, check.Equals, scenario.canGet, check.Commentf("err was %+v", err))
+                       if err != nil {
+                               c.Check(os.IsNotExist(err), check.Equals, true)
+                       }
+
+                       // Call Trash, then check canTrash and canGetAfterTrash
+                       loc, _ = setupScenario()
+                       err = v.BlockTrash(loc)
+                       c.Check(err == nil, check.Equals, scenario.canTrash)
+                       err = v.BlockRead(context.Background(), loc, brdiscard)
+                       c.Check(err == nil, check.Equals, scenario.canGetAfterTrash)
+                       if err != nil {
+                               c.Check(os.IsNotExist(err), check.Equals, true)
+                       }
+
+                       // Call Untrash, then check canUntrash
+                       loc, _ = setupScenario()
+                       err = v.BlockUntrash(loc)
+                       c.Check(err == nil, check.Equals, scenario.canUntrash)
+                       if scenario.dataT != none || scenario.trashT != none {
+                               // In all scenarios where the data exists, we
+                               // should be able to Get after Untrash --
+                               // regardless of timestamps, errors, race
+                               // conditions, etc.
+                               err = v.BlockRead(context.Background(), loc, brdiscard)
+                               c.Check(err, check.IsNil)
+                       }
+
+                       // Call EmptyTrash, then check haveTrashAfterEmpty and
+                       // freshAfterEmpty
+                       loc, _ = setupScenario()
+                       v.EmptyTrash()
+                       _, err = v.head("trash/" + v.key(loc))
+                       c.Check(err == nil, check.Equals, scenario.haveTrashAfterEmpty)
+                       if scenario.freshAfterEmpty {
+                               t, err := v.Mtime(loc)
+                               c.Check(err, check.IsNil)
+                               // new mtime must be current (with an
+                               // allowance for 1s timestamp precision)
+                               c.Check(t.After(t0.Add(-time.Second)), check.Equals, true)
+                       }
+
+                       // Check for current Mtime after Put (applies to all
+                       // scenarios)
+                       loc, blk = setupScenario()
+                       err = v.BlockWrite(context.Background(), loc, blk)
                        c.Check(err, check.IsNil)
-               }
-
-               // Call EmptyTrash, then check haveTrashAfterEmpty and
-               // freshAfterEmpty
-               loc, blk = setupScenario()
-               v.EmptyTrash()
-               _, err = v.bucket.Head("trash/"+loc, nil)
-               c.Check(err == nil, check.Equals, scenario.haveTrashAfterEmpty)
-               if scenario.freshAfterEmpty {
                        t, err := v.Mtime(loc)
                        c.Check(err, check.IsNil)
-                       // new mtime must be current (with an
-                       // allowance for 1s timestamp precision)
                        c.Check(t.After(t0.Add(-time.Second)), check.Equals, true)
                }
-
-               // Check for current Mtime after Put (applies to all
-               // scenarios)
-               loc, blk = setupScenario()
-               err = v.Put(context.Background(), loc, blk)
-               c.Check(err, check.IsNil)
-               t, err := v.Mtime(loc)
-               c.Check(err, check.IsNil)
-               c.Check(t.After(t0.Add(-time.Second)), check.Equals, true)
        }
 }
 
-type TestableS3Volume struct {
-       *S3Volume
-       server      *s3test.Server
+type testableS3Volume struct {
+       *s3Volume
+       server      *httptest.Server
        c           *check.C
-       serverClock *fakeClock
+       s3fakeClock *s3fakeClock
 }
 
-func (s *StubbedS3Suite) newTestableVolume(c *check.C, raceWindow time.Duration, readonly bool, replication int) *TestableS3Volume {
-       clock := &fakeClock{}
-       srv, err := s3test.NewServer(&s3test.Config{Clock: clock})
-       c.Assert(err, check.IsNil)
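+// gofakes3logger adapts a logrus logger to gofakes3's logging
+// interface.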
+type gofakes3logger struct {
+       logrus.FieldLogger
+}
+
+func (l gofakes3logger) Print(level gofakes3.LogLevel, v ...interface{}) {
+       switch level {
+       case gofakes3.LogErr:
+               l.Errorln(v...)
+       case gofakes3.LogWarn:
+               l.Warnln(v...)
+       case gofakes3.LogInfo:
+               l.Infoln(v...)
+       default:
+               panic("unknown level")
+       }
+}
+
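+// testBucketSerial gives each test volume its own bucket, so tests
+// sharing a gofakes3 server don't interfere with one another.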
+var testBucketSerial atomic.Int64
 
-       v := &TestableS3Volume{
-               S3Volume: &S3Volume{
-                       Bucket:             TestBucketName,
-                       Endpoint:           srv.URL(),
-                       Region:             "test-region-1",
-                       LocationConstraint: true,
-                       RaceWindow:         arvados.Duration(raceWindow),
-                       S3Replication:      replication,
-                       UnsafeDelete:       s3UnsafeDelete,
-                       ReadOnly:           readonly,
-                       IndexPageSize:      1000,
+func (s *stubbedS3Suite) newTestableVolume(c *check.C, params newVolumeParams, raceWindow time.Duration) *testableS3Volume {
+       if params.Logger == nil {
+               params.Logger = ctxlog.TestLogger(c)
+       }
+       if s.s3server == nil {
+               backend := s3mem.New(s3mem.WithTimeSource(s.s3fakeClock))
+               logger := ctxlog.TestLogger(c)
+               faker := gofakes3.New(backend,
+                       gofakes3.WithTimeSource(s.s3fakeClock),
+                       gofakes3.WithLogger(gofakes3logger{FieldLogger: logger}),
+                       gofakes3.WithTimeSkewLimit(0))
+               s.s3server = httptest.NewServer(faker.Server())
+       }
+       endpoint := s.s3server.URL
+       bucketName := fmt.Sprintf("testbucket%d", testBucketSerial.Add(1))
+
+       var metadataURL, accessKey, secretKey string
+       if s.metadata != nil {
+               metadataURL = s.metadata.URL
+       } else {
+               accessKey, secretKey = "xxx", "xxx"
+       }
+
+       v := &testableS3Volume{
+               s3Volume: &s3Volume{
+                       S3VolumeDriverParameters: arvados.S3VolumeDriverParameters{
+                               AccessKeyID:        accessKey,
+                               SecretAccessKey:    secretKey,
+                               Bucket:             bucketName,
+                               Endpoint:           endpoint,
+                               Region:             "test-region-1",
+                               LocationConstraint: true,
+                               UnsafeDelete:       true,
+                               IndexPageSize:      1000,
+                       },
+                       cluster:      params.Cluster,
+                       volume:       params.ConfigVolume,
+                       logger:       params.Logger,
+                       metrics:      params.MetricsVecs,
+                       bufferPool:   params.BufferPool,
+                       usePathStyle: true,
                },
                c:           c,
-               server:      srv,
-               serverClock: clock,
+               s3fakeClock: s.s3fakeClock,
+       }
+       c.Assert(v.s3Volume.check(metadataURL), check.IsNil)
+       // Create the test bucket for this volume
+       input := &s3.CreateBucketInput{
+               Bucket: aws.String(bucketName),
        }
-       v.Start()
-       err = v.bucket.PutBucket(s3.ACL("private"))
+       _, err := v.s3Volume.bucket.svc.CreateBucket(context.Background(), input)
        c.Assert(err, check.IsNil)
+       // We couldn't set RaceWindow until now because check()
+       // rejects negative values.
+       v.s3Volume.RaceWindow = arvados.Duration(raceWindow)
        return v
 }
 
-func (v *TestableS3Volume) Start() error {
-       tmp, err := ioutil.TempFile("", "keepstore")
-       v.c.Assert(err, check.IsNil)
-       defer os.Remove(tmp.Name())
-       _, err = tmp.Write([]byte("xxx\n"))
-       v.c.Assert(err, check.IsNil)
-       v.c.Assert(tmp.Close(), check.IsNil)
-
-       v.S3Volume.AccessKeyFile = tmp.Name()
-       v.S3Volume.SecretKeyFile = tmp.Name()
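+// blockWriteWithoutMD5Check writes a block (and its "recent/"
+// marker) directly to the bucket, bypassing the MD5 check that a
+// normal BlockWrite would perform.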
+func (v *testableS3Volume) blockWriteWithoutMD5Check(loc string, block []byte) error {
+       key := v.key(loc)
+       r := newCountingReader(bytes.NewReader(block), v.bucket.stats.TickOutBytes)
 
-       v.c.Assert(v.S3Volume.Start(), check.IsNil)
-       return nil
-}
+       uploader := manager.NewUploader(v.bucket.svc, func(u *manager.Uploader) {
+               u.PartSize = 5 * 1024 * 1024
+               u.Concurrency = 13
+       })
 
-// PutRaw skips the ContentMD5 test
-func (v *TestableS3Volume) PutRaw(loc string, block []byte) {
-       err := v.bucket.Put(loc, block, "application/octet-stream", s3ACL, s3.Options{})
-       if err != nil {
-               log.Printf("PutRaw: %s: %+v", loc, err)
-       }
-       err = v.bucket.Put("recent/"+loc, nil, "application/octet-stream", s3ACL, s3.Options{})
+       _, err := uploader.Upload(context.Background(), &s3.PutObjectInput{
+               Bucket: aws.String(v.bucket.bucket),
+               Key:    aws.String(key),
+               Body:   r,
+       })
        if err != nil {
-               log.Printf("PutRaw: recent/%s: %+v", loc, err)
+               return err
        }
+
+       empty := bytes.NewReader([]byte{})
+       _, err = uploader.Upload(context.Background(), &s3.PutObjectInput{
+               Bucket: aws.String(v.bucket.bucket),
+               Key:    aws.String("recent/" + key),
+               Body:   empty,
+       })
+       return err
 }
 
 // TouchWithDate turns back the clock while doing a Touch(). We assume
 // there are no other operations happening on the same s3test server
 // while we do this.
 }
 
 // TouchWithDate turns back the clock while doing a Touch(). We assume
 // there are no other operations happening on the same s3test server
 // while we do this.
-func (v *TestableS3Volume) TouchWithDate(locator string, lastPut time.Time) {
-       v.serverClock.now = &lastPut
-       err := v.bucket.Put("recent/"+locator, nil, "application/octet-stream", s3ACL, s3.Options{})
+func (v *testableS3Volume) TouchWithDate(loc string, lastPut time.Time) {
+       v.s3fakeClock.now = &lastPut
+
+       uploader := manager.NewUploader(v.bucket.svc)
+       empty := bytes.NewReader([]byte{})
+       _, err := uploader.Upload(context.Background(), &s3.PutObjectInput{
+               Bucket: aws.String(v.bucket.bucket),
+               Key:    aws.String("recent/" + v.key(loc)),
+               Body:   empty,
+       })
        if err != nil {
                panic(err)
        }
-       v.serverClock.now = nil
+
+       v.s3fakeClock.now = nil
+}
+
+func (v *testableS3Volume) Teardown() {
 }
 
-func (v *TestableS3Volume) Teardown() {
-       v.server.Quit()
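+// ReadWriteOperationLabelValues returns the operation labels used
+// in this driver's read/write metrics.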
+func (v *testableS3Volume) ReadWriteOperationLabelValues() (r, w string) {
+       return "get", "put"
 }