// Copyright (C) The Arvados Authors. All rights reserved.
//
// SPDX-License-Identifier: AGPL-3.0
-package main
+package keepstore
import (
"bytes"
"net/http/httptest"
"os"
"strings"
+ "sync/atomic"
"time"
"git.arvados.org/arvados.git/sdk/go/arvados"
"git.arvados.org/arvados.git/sdk/go/ctxlog"
- "github.com/AdRoll/goamz/s3"
- "github.com/AdRoll/goamz/s3/s3test"
+
+ "github.com/aws/aws-sdk-go-v2/aws"
+ "github.com/aws/aws-sdk-go-v2/feature/s3/manager"
+ "github.com/aws/aws-sdk-go-v2/service/s3"
+
+ "github.com/johannesboyne/gofakes3"
+ "github.com/johannesboyne/gofakes3/backend/s3mem"
"github.com/prometheus/client_golang/prometheus"
"github.com/sirupsen/logrus"
check "gopkg.in/check.v1"
)
-const (
- TestBucketName = "testbucket"
-)
-
-type fakeClock struct {
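+// s3fakeClock is the time source handed to the gofakes3 backend:
+// when now is set, the stub server stamps new objects with that
+// time instead of the real time, letting tests construct arbitrary
+// data/recent/trash timestamp states.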
+type s3fakeClock struct {
now *time.Time
}
-func (c *fakeClock) Now() time.Time {
+func (c *s3fakeClock) Now() time.Time {
if c.now == nil {
- return time.Now()
+ return time.Now().UTC()
}
- return *c.now
+ return c.now.UTC()
+}
+
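+// Since is needed, in addition to Now, to satisfy gofakes3's
+// TimeSource interface.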
+func (c *s3fakeClock) Since(t time.Time) time.Duration {
+ return c.Now().Sub(t)
}
-var _ = check.Suite(&StubbedS3Suite{})
+var _ = check.Suite(&stubbedS3Suite{})
+
-type StubbedS3Suite struct {
- s3server *httptest.Server
- metadata *httptest.Server
- cluster *arvados.Cluster
- handler *handler
- volumes []*TestableS3Volume
+type stubbedS3Suite struct {
+ s3server *httptest.Server
+ s3fakeClock *s3fakeClock
+ metadata *httptest.Server
+ cluster *arvados.Cluster
+ volumes []*testableS3Volume
}
-func (s *StubbedS3Suite) SetUpTest(c *check.C) {
+func (s *stubbedS3Suite) SetUpTest(c *check.C) {
s.s3server = nil
+ s.s3fakeClock = &s3fakeClock{}
s.metadata = nil
s.cluster = testCluster(c)
s.cluster.Volumes = map[string]arvados.Volume{
"zzzzz-nyw5e-000000000000000": {Driver: "S3"},
"zzzzz-nyw5e-111111111111111": {Driver: "S3"},
}
- s.handler = &handler{}
}
-func (s *StubbedS3Suite) TestGeneric(c *check.C) {
- DoGenericVolumeTests(c, false, func(t TB, cluster *arvados.Cluster, volume arvados.Volume, logger logrus.FieldLogger, metrics *volumeMetricsVecs) TestableVolume {
+func (s *stubbedS3Suite) TearDownTest(c *check.C) {
+ if s.s3server != nil {
+ s.s3server.Close()
+ }
+}
+
+func (s *stubbedS3Suite) TestGeneric(c *check.C) {
+ DoGenericVolumeTests(c, false, func(t TB, params newVolumeParams) TestableVolume {
- // Use a negative raceWindow so s3test's 1-second
+ // Use a negative raceWindow so the stub server's 1-second
// timestamp precision doesn't confuse fixRace.
- return s.newTestableVolume(c, cluster, volume, metrics, -2*time.Second)
+ return s.newTestableVolume(c, params, -2*time.Second)
})
}
-func (s *StubbedS3Suite) TestGenericReadOnly(c *check.C) {
- DoGenericVolumeTests(c, true, func(t TB, cluster *arvados.Cluster, volume arvados.Volume, logger logrus.FieldLogger, metrics *volumeMetricsVecs) TestableVolume {
- return s.newTestableVolume(c, cluster, volume, metrics, -2*time.Second)
+func (s *stubbedS3Suite) TestGenericReadOnly(c *check.C) {
+ DoGenericVolumeTests(c, true, func(t TB, params newVolumeParams) TestableVolume {
+ return s.newTestableVolume(c, params, -2*time.Second)
})
}
-func (s *StubbedS3Suite) TestIndex(c *check.C) {
- v := s.newTestableVolume(c, s.cluster, arvados.Volume{Replication: 2}, newVolumeMetricsVecs(prometheus.NewRegistry()), 0)
+func (s *stubbedS3Suite) TestGenericWithPrefix(c *check.C) {
+ DoGenericVolumeTests(c, false, func(t TB, params newVolumeParams) TestableVolume {
+ v := s.newTestableVolume(c, params, -2*time.Second)
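+ // Exercise the layout where blocks are stored under a
+ // 3-hex-digit prefix ("abc/abc123..." rather than "abc123...").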
+ v.PrefixLength = 3
+ return v
+ })
+}
+
+func (s *stubbedS3Suite) TestIndex(c *check.C) {
+ v := s.newTestableVolume(c, newVolumeParams{
+ Cluster: s.cluster,
+ ConfigVolume: arvados.Volume{Replication: 2},
+ MetricsVecs: newVolumeMetricsVecs(prometheus.NewRegistry()),
+ BufferPool: newBufferPool(ctxlog.TestLogger(c), 8, prometheus.NewRegistry()),
+ }, 0)
v.IndexPageSize = 3
for i := 0; i < 256; i++ {
- v.PutRaw(fmt.Sprintf("%02x%030x", i, i), []byte{102, 111, 111})
+ err := v.blockWriteWithoutMD5Check(fmt.Sprintf("%02x%030x", i, i), []byte{102, 111, 111})
+ c.Assert(err, check.IsNil)
}
for _, spec := range []struct {
prefix      string
expectMatch int
}{
{"", 256},
{"c", 16},
{"bc", 1},
{"abc", 0},
} {
buf := new(bytes.Buffer)
- err := v.IndexTo(spec.prefix, buf)
+ err := v.Index(context.Background(), spec.prefix, buf)
c.Check(err, check.IsNil)
idx := bytes.SplitAfter(buf.Bytes(), []byte{10})
c.Check(len(idx), check.Equals, spec.expectMatch+1)
c.Check(len(idx[len(idx)-1]), check.Equals, 0)
}
}
-func (s *StubbedS3Suite) TestIAMRoleCredentials(c *check.C) {
+func (s *stubbedS3Suite) TestSignature(c *check.C) {
+ var header http.Header
+ stub := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+ header = r.Header
+ }))
+ defer stub.Close()
+
+ // The aws-sdk-go-v2 driver only supports S3 V4 signatures. S3 V2 signatures are being
+ // phased out as of June 24, 2020. Cf. https://forums.aws.amazon.com/ann.jspa?annID=5816
+ vol := s3Volume{
+ S3VolumeDriverParameters: arvados.S3VolumeDriverParameters{
+ AccessKeyID: "xxx",
+ SecretAccessKey: "xxx",
+ Endpoint: stub.URL,
+ Region: "test-region-1",
+ Bucket: "test-bucket-name",
+ },
+ cluster: s.cluster,
+ logger: ctxlog.TestLogger(c),
+ metrics: newVolumeMetricsVecs(prometheus.NewRegistry()),
+ }
+ // Our test S3 server uses the older 'Path Style'
+ vol.usePathStyle = true
+ err := vol.check("")
+ c.Check(err, check.IsNil)
+ err = vol.BlockWrite(context.Background(), "acbd18db4cc2f85cedef654fccc4a4d8", []byte("foo"))
+ c.Check(err, check.IsNil)
+ c.Check(header.Get("Authorization"), check.Matches, `AWS4-HMAC-SHA256 .*`)
+}
+
+func (s *stubbedS3Suite) TestIAMRoleCredentials(c *check.C) {
+ var reqHeader http.Header
+ stub := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+ reqHeader = r.Header
+ }))
+ defer stub.Close()
+
+ retrievedMetadata := false
s.metadata = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+ retrievedMetadata = true
upd := time.Now().UTC().Add(-time.Hour).Format(time.RFC3339)
exp := time.Now().UTC().Add(time.Hour).Format(time.RFC3339)
- // Literal example from
- // https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/iam-roles-for-amazon-ec2.html#instance-metadata-security-credentials
- // but with updated timestamps
- io.WriteString(w, `{"Code":"Success","LastUpdated":"`+upd+`","Type":"AWS-HMAC","AccessKeyId":"ASIAIOSFODNN7EXAMPLE","SecretAccessKey":"wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY","Token":"token","Expiration":"`+exp+`"}`)
+ c.Logf("metadata stub received request: %s %s", r.Method, r.URL.Path)
+ switch {
+ case r.URL.Path == "/latest/meta-data/iam/security-credentials/":
+ io.WriteString(w, "testcredential\n")
+ case r.URL.Path == "/latest/api/token",
+ r.URL.Path == "/latest/meta-data/iam/security-credentials/testcredential":
+ // Literal example from
+ // https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/iam-roles-for-amazon-ec2.html#instance-metadata-security-credentials
+ // but with updated timestamps
+ io.WriteString(w, `{"Code":"Success","LastUpdated":"`+upd+`","Type":"AWS-HMAC","AccessKeyId":"ASIAIOSFODNN7EXAMPLE","SecretAccessKey":"wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY","Token":"token","Expiration":"`+exp+`"}`)
+ default:
+ w.WriteHeader(http.StatusNotFound)
+ }
}))
defer s.metadata.Close()
- v := s.newTestableVolume(c, s.cluster, arvados.Volume{Replication: 2}, newVolumeMetricsVecs(prometheus.NewRegistry()), 5*time.Minute)
- c.Check(v.AccessKey, check.Equals, "ASIAIOSFODNN7EXAMPLE")
- c.Check(v.SecretKey, check.Equals, "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY")
- c.Check(v.bucket.bucket.S3.Auth.AccessKey, check.Equals, "ASIAIOSFODNN7EXAMPLE")
- c.Check(v.bucket.bucket.S3.Auth.SecretKey, check.Equals, "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY")
+ v := &s3Volume{
+ S3VolumeDriverParameters: arvados.S3VolumeDriverParameters{
+ Endpoint: stub.URL,
+ Region: "test-region-1",
+ Bucket: "test-bucket-name",
+ },
+ cluster: s.cluster,
+ logger: ctxlog.TestLogger(c),
+ metrics: newVolumeMetricsVecs(prometheus.NewRegistry()),
+ }
+ err := v.check(s.metadata.URL + "/latest")
+ c.Check(err, check.IsNil)
+ resp, err := v.bucket.svc.ListBuckets(context.Background(), &s3.ListBucketsInput{})
+ c.Check(err, check.IsNil)
+ c.Check(resp.Buckets, check.HasLen, 0)
+ c.Check(retrievedMetadata, check.Equals, true)
+ c.Check(reqHeader.Get("Authorization"), check.Matches, `AWS4-HMAC-SHA256 Credential=ASIAIOSFODNN7EXAMPLE/\d+/test-region-1/s3/aws4_request, SignedHeaders=.*`)
+ retrievedMetadata = false
s.metadata = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+ retrievedMetadata = true
+ c.Logf("metadata stub received request: %s %s", r.Method, r.URL.Path)
w.WriteHeader(http.StatusNotFound)
}))
- deadv := &S3Volume{
- IAMRole: s.metadata.URL + "/fake-metadata/test-role",
- Endpoint: "http://localhost:12345",
- Region: "test-region-1",
- Bucket: "test-bucket-name",
- cluster: s.cluster,
- logger: ctxlog.TestLogger(c),
- metrics: newVolumeMetricsVecs(prometheus.NewRegistry()),
+ deadv := &s3Volume{
+ S3VolumeDriverParameters: arvados.S3VolumeDriverParameters{
+ Endpoint: "http://localhost:9",
+ Region: "test-region-1",
+ Bucket: "test-bucket-name",
+ },
+ cluster: s.cluster,
+ logger: ctxlog.TestLogger(c),
+ metrics: newVolumeMetricsVecs(prometheus.NewRegistry()),
}
- err := deadv.check()
- c.Check(err, check.ErrorMatches, `.*/fake-metadata/test-role.*`)
- c.Check(err, check.ErrorMatches, `.*404.*`)
+ err = deadv.check(s.metadata.URL + "/latest")
+ c.Check(err, check.IsNil)
+ _, err = deadv.bucket.svc.ListBuckets(context.Background(), &s3.ListBucketsInput{})
+ c.Check(err, check.ErrorMatches, `(?s).*failed to refresh cached credentials, no EC2 IMDS role found.*`)
+ c.Check(err, check.ErrorMatches, `(?s).*404.*`)
+ c.Check(retrievedMetadata, check.Equals, true)
}
-func (s *StubbedS3Suite) TestStats(c *check.C) {
- v := s.newTestableVolume(c, s.cluster, arvados.Volume{Replication: 2}, newVolumeMetricsVecs(prometheus.NewRegistry()), 5*time.Minute)
+func (s *stubbedS3Suite) TestStats(c *check.C) {
+ v := s.newTestableVolume(c, newVolumeParams{
+ Cluster: s.cluster,
+ ConfigVolume: arvados.Volume{Replication: 2},
+ MetricsVecs: newVolumeMetricsVecs(prometheus.NewRegistry()),
+ BufferPool: newBufferPool(ctxlog.TestLogger(c), 8, prometheus.NewRegistry()),
+ }, 5*time.Minute)
stats := func() string {
buf, err := json.Marshal(v.InternalStats())
c.Check(err, check.IsNil)
return string(buf)
}
c.Check(stats(), check.Matches, `.*"Ops":0,.*`)
loc := "acbd18db4cc2f85cedef654fccc4a4d8"
- _, err := v.Get(context.Background(), loc, make([]byte, 3))
+ err := v.BlockRead(context.Background(), loc, brdiscard)
c.Check(err, check.NotNil)
c.Check(stats(), check.Matches, `.*"Ops":[^0],.*`)
- c.Check(stats(), check.Matches, `.*"\*s3.Error 404 [^"]*":[^0].*`)
+ c.Check(stats(), check.Matches, `.*"\*smithy.OperationError 404 NoSuchKey":[^0].*`)
c.Check(stats(), check.Matches, `.*"InBytes":0,.*`)
- err = v.Put(context.Background(), loc, []byte("foo"))
+ err = v.BlockWrite(context.Background(), loc, []byte("foo"))
c.Check(err, check.IsNil)
c.Check(stats(), check.Matches, `.*"OutBytes":3,.*`)
c.Check(stats(), check.Matches, `.*"PutOps":2,.*`)
- _, err = v.Get(context.Background(), loc, make([]byte, 3))
+ err = v.BlockRead(context.Background(), loc, brdiscard)
c.Check(err, check.IsNil)
- _, err = v.Get(context.Background(), loc, make([]byte, 3))
+ err = v.BlockRead(context.Background(), loc, brdiscard)
c.Check(err, check.IsNil)
c.Check(stats(), check.Matches, `.*"InBytes":6,.*`)
}
-type blockingHandler struct {
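+// s3AWSBlockingHandler is a stub S3 server handler that reports each
+// incoming request on the requested channel (if non-nil) and then
+// blocks until unblock is closed (if non-nil), so a test can hold a
+// request in flight while it cancels the request's context.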
+type s3AWSBlockingHandler struct {
requested chan *http.Request
unblock chan struct{}
}
-func (h *blockingHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
+func (h *s3AWSBlockingHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
if r.Method == "PUT" && !strings.Contains(strings.Trim(r.URL.Path, "/"), "/") {
// Accept PutBucket ("PUT /bucketname/"), called by
// newTestableVolume
return
}
if h.requested != nil {
h.requested <- r
}
if h.unblock != nil {
<-h.unblock
}
http.Error(w, "nothing here", http.StatusNotFound)
}
-func (s *StubbedS3Suite) TestGetContextCancel(c *check.C) {
- loc := "acbd18db4cc2f85cedef654fccc4a4d8"
- buf := make([]byte, 3)
-
- s.testContextCancel(c, func(ctx context.Context, v *TestableS3Volume) error {
- _, err := v.Get(ctx, loc, buf)
- return err
- })
-}
-
-func (s *StubbedS3Suite) TestCompareContextCancel(c *check.C) {
- loc := "acbd18db4cc2f85cedef654fccc4a4d8"
- buf := []byte("bar")
-
- s.testContextCancel(c, func(ctx context.Context, v *TestableS3Volume) error {
- return v.Compare(ctx, loc, buf)
+func (s *stubbedS3Suite) TestGetContextCancel(c *check.C) {
+ s.testContextCancel(c, func(ctx context.Context, v *testableS3Volume) error {
+ return v.BlockRead(ctx, fooHash, brdiscard)
})
}
-func (s *StubbedS3Suite) TestPutContextCancel(c *check.C) {
- loc := "acbd18db4cc2f85cedef654fccc4a4d8"
- buf := []byte("foo")
-
- s.testContextCancel(c, func(ctx context.Context, v *TestableS3Volume) error {
- return v.Put(ctx, loc, buf)
+func (s *stubbedS3Suite) TestPutContextCancel(c *check.C) {
+ s.testContextCancel(c, func(ctx context.Context, v *testableS3Volume) error {
+ return v.BlockWrite(ctx, fooHash, []byte("foo"))
})
}
-func (s *StubbedS3Suite) testContextCancel(c *check.C, testFunc func(context.Context, *TestableS3Volume) error) {
- handler := &blockingHandler{}
+func (s *stubbedS3Suite) testContextCancel(c *check.C, testFunc func(context.Context, *testableS3Volume) error) {
+ handler := &s3AWSBlockingHandler{}
s.s3server = httptest.NewServer(handler)
defer s.s3server.Close()
- v := s.newTestableVolume(c, s.cluster, arvados.Volume{Replication: 2}, newVolumeMetricsVecs(prometheus.NewRegistry()), 5*time.Minute)
+ v := s.newTestableVolume(c, newVolumeParams{
+ Cluster: s.cluster,
+ ConfigVolume: arvados.Volume{Replication: 2},
+ MetricsVecs: newVolumeMetricsVecs(prometheus.NewRegistry()),
+ BufferPool: newBufferPool(ctxlog.TestLogger(c), 8, prometheus.NewRegistry()),
+ }, 5*time.Minute)
ctx, cancel := context.WithCancel(context.Background())
handler.requested = make(chan *http.Request)
handler.unblock = make(chan struct{})
defer close(handler.unblock)
done := make(chan struct{})
go func() {
defer close(done)
c.Check(testFunc(ctx, v), check.Equals, context.Canceled)
}()
// Wait for the stub server to receive a request, then cancel the
// context; the volume operation should return context.Canceled.
select {
case <-handler.requested:
case <-time.After(10 * time.Second):
c.Fatal("timed out waiting for test func to call the handler")
}
cancel()
select {
case <-done:
case <-time.After(10 * time.Second):
c.Fatal("timed out waiting for test func to return")
}
}
-func (s *StubbedS3Suite) TestBackendStates(c *check.C) {
+func (s *stubbedS3Suite) TestBackendStates(c *check.C) {
s.cluster.Collections.BlobTrashLifetime.Set("1h")
s.cluster.Collections.BlobSigningTTL.Set("1h")
- v := s.newTestableVolume(c, s.cluster, arvados.Volume{Replication: 2}, newVolumeMetricsVecs(prometheus.NewRegistry()), 5*time.Minute)
+ v := s.newTestableVolume(c, newVolumeParams{
+ Cluster: s.cluster,
+ ConfigVolume: arvados.Volume{Replication: 2},
+ Logger: ctxlog.TestLogger(c),
+ MetricsVecs: newVolumeMetricsVecs(prometheus.NewRegistry()),
+ BufferPool: newBufferPool(ctxlog.TestLogger(c), 8, prometheus.NewRegistry()),
+ }, 5*time.Minute)
var none time.Time
putS3Obj := func(t time.Time, key string, data []byte) {
if t == none {
return
}
- v.serverClock.now = &t
- v.bucket.Bucket().Put(key, data, "application/octet-stream", s3ACL, s3.Options{})
+ s.s3fakeClock.now = &t
+ uploader := manager.NewUploader(v.bucket.svc)
+ _, err := uploader.Upload(context.Background(), &s3.PutObjectInput{
+ Bucket: aws.String(v.bucket.bucket),
+ Key: aws.String(key),
+ Body: bytes.NewReader(data),
+ })
+ if err != nil {
+ panic(err)
+ }
+ s.s3fakeClock.now = nil
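+ // Sanity check: the object we just stored should now be
+ // retrievable via the volume's own client.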
+ _, err = v.head(key)
+ if err != nil {
+ panic(err)
+ }
}
t0 := time.Now()
false, false, false, true, true, true,
},
} {
- c.Log("Scenario: ", scenario.label)
-
- // We have a few tests to run for each scenario, and
- // the tests are expected to change state. By calling
- // this setup func between tests, we (re)create the
- // scenario as specified, using a new unique block
- // locator to prevent interference from previous
- // tests.
-
- setupScenario := func() (string, []byte) {
- nextKey++
- blk := []byte(fmt.Sprintf("%d", nextKey))
- loc := fmt.Sprintf("%x", md5.Sum(blk))
- c.Log("\t", loc)
- putS3Obj(scenario.dataT, loc, blk)
- putS3Obj(scenario.recentT, "recent/"+loc, nil)
- putS3Obj(scenario.trashT, "trash/"+loc, blk)
- v.serverClock.now = &t0
- return loc, blk
- }
-
- // Check canGet
- loc, blk := setupScenario()
- buf := make([]byte, len(blk))
- _, err := v.Get(context.Background(), loc, buf)
- c.Check(err == nil, check.Equals, scenario.canGet)
- if err != nil {
- c.Check(os.IsNotExist(err), check.Equals, true)
- }
-
- // Call Trash, then check canTrash and canGetAfterTrash
- loc, _ = setupScenario()
- err = v.Trash(loc)
- c.Check(err == nil, check.Equals, scenario.canTrash)
- _, err = v.Get(context.Background(), loc, buf)
- c.Check(err == nil, check.Equals, scenario.canGetAfterTrash)
- if err != nil {
- c.Check(os.IsNotExist(err), check.Equals, true)
- }
-
- // Call Untrash, then check canUntrash
- loc, _ = setupScenario()
- err = v.Untrash(loc)
- c.Check(err == nil, check.Equals, scenario.canUntrash)
- if scenario.dataT != none || scenario.trashT != none {
- // In all scenarios where the data exists, we
- // should be able to Get after Untrash --
- // regardless of timestamps, errors, race
- // conditions, etc.
- _, err = v.Get(context.Background(), loc, buf)
+ for _, prefixLength := range []int{0, 3} {
+ v.PrefixLength = prefixLength
+ c.Logf("Scenario: %q (prefixLength=%d)", scenario.label, prefixLength)
+
+ // We have a few tests to run for each scenario, and
+ // the tests are expected to change state. By calling
+ // this setup func between tests, we (re)create the
+ // scenario as specified, using a new unique block
+ // locator to prevent interference from previous
+ // tests.
+
+ setupScenario := func() (string, []byte) {
+ nextKey++
+ blk := []byte(fmt.Sprintf("%d", nextKey))
+ loc := fmt.Sprintf("%x", md5.Sum(blk))
+ key := loc
+ if prefixLength > 0 {
+ key = loc[:prefixLength] + "/" + loc
+ }
+ c.Log("\t", loc, "\t", key)
+ putS3Obj(scenario.dataT, key, blk)
+ putS3Obj(scenario.recentT, "recent/"+key, nil)
+ putS3Obj(scenario.trashT, "trash/"+key, blk)
+ v.s3fakeClock.now = &t0
+ return loc, blk
+ }
+
+ // Check canGet
+ loc, blk := setupScenario()
+ err := v.BlockRead(context.Background(), loc, brdiscard)
+ c.Check(err == nil, check.Equals, scenario.canGet, check.Commentf("err was %+v", err))
+ if err != nil {
+ c.Check(os.IsNotExist(err), check.Equals, true)
+ }
+
+ // Call Trash, then check canTrash and canGetAfterTrash
+ loc, _ = setupScenario()
+ err = v.BlockTrash(loc)
+ c.Check(err == nil, check.Equals, scenario.canTrash)
+ err = v.BlockRead(context.Background(), loc, brdiscard)
+ c.Check(err == nil, check.Equals, scenario.canGetAfterTrash)
+ if err != nil {
+ c.Check(os.IsNotExist(err), check.Equals, true)
+ }
+
+ // Call Untrash, then check canUntrash
+ loc, _ = setupScenario()
+ err = v.BlockUntrash(loc)
+ c.Check(err == nil, check.Equals, scenario.canUntrash)
+ if scenario.dataT != none || scenario.trashT != none {
+ // In all scenarios where the data exists, we
+ // should be able to Get after Untrash --
+ // regardless of timestamps, errors, race
+ // conditions, etc.
+ err = v.BlockRead(context.Background(), loc, brdiscard)
+ c.Check(err, check.IsNil)
+ }
+
+ // Call EmptyTrash, then check haveTrashAfterEmpty and
+ // freshAfterEmpty
+ loc, _ = setupScenario()
+ v.EmptyTrash()
+ _, err = v.head("trash/" + v.key(loc))
+ c.Check(err == nil, check.Equals, scenario.haveTrashAfterEmpty)
+ if scenario.freshAfterEmpty {
+ t, err := v.Mtime(loc)
+ c.Check(err, check.IsNil)
+ // new mtime must be current (with an
+ // allowance for 1s timestamp precision)
+ c.Check(t.After(t0.Add(-time.Second)), check.Equals, true)
+ }
+
+ // Check for current Mtime after Put (applies to all
+ // scenarios)
+ loc, blk = setupScenario()
+ err = v.BlockWrite(context.Background(), loc, blk)
c.Check(err, check.IsNil)
- }
-
- // Call EmptyTrash, then check haveTrashAfterEmpty and
- // freshAfterEmpty
- loc, _ = setupScenario()
- v.EmptyTrash()
- _, err = v.bucket.Head("trash/"+loc, nil)
- c.Check(err == nil, check.Equals, scenario.haveTrashAfterEmpty)
- if scenario.freshAfterEmpty {
t, err := v.Mtime(loc)
c.Check(err, check.IsNil)
- // new mtime must be current (with an
- // allowance for 1s timestamp precision)
c.Check(t.After(t0.Add(-time.Second)), check.Equals, true)
}
-
- // Check for current Mtime after Put (applies to all
- // scenarios)
- loc, blk = setupScenario()
- err = v.Put(context.Background(), loc, blk)
- c.Check(err, check.IsNil)
- t, err := v.Mtime(loc)
- c.Check(err, check.IsNil)
- c.Check(t.After(t0.Add(-time.Second)), check.Equals, true)
}
}
-type TestableS3Volume struct {
- *S3Volume
- server *s3test.Server
+type testableS3Volume struct {
+ *s3Volume
+ server *httptest.Server
c *check.C
- serverClock *fakeClock
+ s3fakeClock *s3fakeClock
}
-func (s *StubbedS3Suite) newTestableVolume(c *check.C, cluster *arvados.Cluster, volume arvados.Volume, metrics *volumeMetricsVecs, raceWindow time.Duration) *TestableS3Volume {
- clock := &fakeClock{}
- srv, err := s3test.NewServer(&s3test.Config{Clock: clock})
- c.Assert(err, check.IsNil)
- endpoint := srv.URL()
- if s.s3server != nil {
- endpoint = s.s3server.URL
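+// gofakes3logger adapts a logrus.FieldLogger to the gofakes3.Logger
+// interface so the fake S3 server logs through the test logger.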
+type gofakes3logger struct {
+ logrus.FieldLogger
+}
+
+func (l gofakes3logger) Print(level gofakes3.LogLevel, v ...interface{}) {
+ switch level {
+ case gofakes3.LogErr:
+ l.Errorln(v...)
+ case gofakes3.LogWarn:
+ l.Warnln(v...)
+ case gofakes3.LogInfo:
+ l.Infoln(v...)
+ default:
+ panic("unknown level")
+ }
+}
+
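+// testBucketSerial generates a distinct bucket name for each test
+// volume, so volumes sharing one stub server don't see each other's
+// objects.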
+var testBucketSerial atomic.Int64
+
+func (s *stubbedS3Suite) newTestableVolume(c *check.C, params newVolumeParams, raceWindow time.Duration) *testableS3Volume {
+ if params.Logger == nil {
+ params.Logger = ctxlog.TestLogger(c)
+ }
+ if s.s3server == nil {
+ backend := s3mem.New(s3mem.WithTimeSource(s.s3fakeClock))
+ logger := ctxlog.TestLogger(c)
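+ // WithTimeSkewLimit(0) disables gofakes3's RequestTimeTooSkewed
+ // check, which would otherwise reject requests when s3fakeClock
+ // diverges from real time.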
+ faker := gofakes3.New(backend,
+ gofakes3.WithTimeSource(s.s3fakeClock),
+ gofakes3.WithLogger(gofakes3logger{FieldLogger: logger}),
+ gofakes3.WithTimeSkewLimit(0))
+ s.s3server = httptest.NewServer(faker.Server())
}
+ endpoint := s.s3server.URL
+ bucketName := fmt.Sprintf("testbucket%d", testBucketSerial.Add(1))
- iamRole, accessKey, secretKey := "", "xxx", "xxx"
+ var metadataURL, accessKey, secretKey string
if s.metadata != nil {
- iamRole, accessKey, secretKey = s.metadata.URL+"/fake-metadata/test-role", "", ""
+ metadataURL = s.metadata.URL
+ } else {
+ accessKey, secretKey = "xxx", "xxx"
}
- v := &TestableS3Volume{
- S3Volume: &S3Volume{
- AccessKey: accessKey,
- SecretKey: secretKey,
- IAMRole: iamRole,
- Bucket: TestBucketName,
- Endpoint: endpoint,
- Region: "test-region-1",
- LocationConstraint: true,
- UnsafeDelete: true,
- IndexPageSize: 1000,
- cluster: cluster,
- volume: volume,
- logger: ctxlog.TestLogger(c),
- metrics: metrics,
+ v := &testableS3Volume{
+ s3Volume: &s3Volume{
+ S3VolumeDriverParameters: arvados.S3VolumeDriverParameters{
+ AccessKeyID: accessKey,
+ SecretAccessKey: secretKey,
+ Bucket: bucketName,
+ Endpoint: endpoint,
+ Region: "test-region-1",
+ LocationConstraint: true,
+ UnsafeDelete: true,
+ IndexPageSize: 1000,
+ },
+ cluster: params.Cluster,
+ volume: params.ConfigVolume,
+ logger: params.Logger,
+ metrics: params.MetricsVecs,
+ bufferPool: params.BufferPool,
+ usePathStyle: true,
},
c: c,
- server: srv,
- serverClock: clock,
+ s3fakeClock: s.s3fakeClock,
+ }
+ c.Assert(v.s3Volume.check(metadataURL), check.IsNil)
+ // Create the test bucket
+ input := &s3.CreateBucketInput{
+ Bucket: aws.String(bucketName),
}
- c.Assert(v.S3Volume.check(), check.IsNil)
- c.Assert(v.bucket.Bucket().PutBucket(s3.ACL("private")), check.IsNil)
+ _, err := v.s3Volume.bucket.svc.CreateBucket(context.Background(), input)
+ c.Assert(err, check.IsNil)
// We couldn't set RaceWindow until now because check()
// rejects negative values.
- v.S3Volume.RaceWindow = arvados.Duration(raceWindow)
+ v.s3Volume.RaceWindow = arvados.Duration(raceWindow)
return v
}
-// PutRaw skips the ContentMD5 test
-func (v *TestableS3Volume) PutRaw(loc string, block []byte) {
- err := v.bucket.Bucket().Put(loc, block, "application/octet-stream", s3ACL, s3.Options{})
- if err != nil {
- v.logger.Printf("PutRaw: %s: %+v", loc, err)
- }
- err = v.bucket.Bucket().Put("recent/"+loc, nil, "application/octet-stream", s3ACL, s3.Options{})
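+// blockWriteWithoutMD5Check stores a block (and its "recent/"
+// marker) directly via the S3 uploader, bypassing the MD5 check
+// that BlockWrite performs, so tests can store blocks whose content
+// doesn't match the locator.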
+func (v *testableS3Volume) blockWriteWithoutMD5Check(loc string, block []byte) error {
+ key := v.key(loc)
+ r := newCountingReader(bytes.NewReader(block), v.bucket.stats.TickOutBytes)
+
+ uploader := manager.NewUploader(v.bucket.svc, func(u *manager.Uploader) {
+ u.PartSize = 5 * 1024 * 1024
+ u.Concurrency = 13
+ })
+
+ _, err := uploader.Upload(context.Background(), &s3.PutObjectInput{
+ Bucket: aws.String(v.bucket.bucket),
+ Key: aws.String(key),
+ Body: r,
+ })
if err != nil {
- v.logger.Printf("PutRaw: recent/%s: %+v", loc, err)
+ return err
}
+
+ empty := bytes.NewReader([]byte{})
+ _, err = uploader.Upload(context.Background(), &s3.PutObjectInput{
+ Bucket: aws.String(v.bucket.bucket),
+ Key: aws.String("recent/" + key),
+ Body: empty,
+ })
+ return err
}
// TouchWithDate turns back the clock while doing a Touch(). We assume
- // there are no other operations happening on the same s3test server
+ // there are no other operations happening on the same stub server
// while we do this.
-func (v *TestableS3Volume) TouchWithDate(locator string, lastPut time.Time) {
- v.serverClock.now = &lastPut
- err := v.bucket.Bucket().Put("recent/"+locator, nil, "application/octet-stream", s3ACL, s3.Options{})
+func (v *testableS3Volume) TouchWithDate(loc string, lastPut time.Time) {
+ v.s3fakeClock.now = &lastPut
+
+ uploader := manager.NewUploader(v.bucket.svc)
+ empty := bytes.NewReader([]byte{})
+ _, err := uploader.Upload(context.Background(), &s3.PutObjectInput{
+ Bucket: aws.String(v.bucket.bucket),
+ Key: aws.String("recent/" + v.key(loc)),
+ Body: empty,
+ })
if err != nil {
panic(err)
}
- v.serverClock.now = nil
+
+ v.s3fakeClock.now = nil
}
-func (v *TestableS3Volume) Teardown() {
- v.server.Quit()
+func (v *testableS3Volume) Teardown() {
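+ // The stub server is shared by all volumes in a test and is
+ // closed by TearDownTest, so there is nothing to clean up here.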
}
-func (v *TestableS3Volume) ReadWriteOperationLabelValues() (r, w string) {
+func (v *testableS3Volume) ReadWriteOperationLabelValues() (r, w string) {
return "get", "put"
}