s3server *httptest.Server
metadata *httptest.Server
cluster *arvados.Cluster
- handler *handler
- volumes []*TestableS3AWSVolume
+ volumes []*testableS3Volume
}
func (s *StubbedS3AWSSuite) SetUpTest(c *check.C) {
"zzzzz-nyw5e-000000000000000": {Driver: "S3"},
"zzzzz-nyw5e-111111111111111": {Driver: "S3"},
}
- s.handler = &handler{}
}
// TestGeneric runs the shared DoGenericVolumeTests suite (second
// argument false) against volumes built by newTestableVolume from the
// parameters the suite supplies.
func (s *StubbedS3AWSSuite) TestGeneric(c *check.C) {
- DoGenericVolumeTests(c, false, func(t TB, cluster *arvados.Cluster, volume arvados.Volume, logger logrus.FieldLogger, metrics *volumeMetricsVecs) TestableVolume {
+ DoGenericVolumeTests(c, false, func(t TB, params newVolumeParams) TestableVolume {
// Use a negative raceWindow so s3test's 1-second
// timestamp precision doesn't confuse fixRace.
- return s.newTestableVolume(c, cluster, volume, metrics, -2*time.Second)
+ return s.newTestableVolume(c, params, -2*time.Second)
})
}
// TestGenericReadOnly runs DoGenericVolumeTests with its second
// argument set to true (presumably the read-only flag -- confirm
// against DoGenericVolumeTests), using the same -2s race window as
// TestGeneric.
func (s *StubbedS3AWSSuite) TestGenericReadOnly(c *check.C) {
- DoGenericVolumeTests(c, true, func(t TB, cluster *arvados.Cluster, volume arvados.Volume, logger logrus.FieldLogger, metrics *volumeMetricsVecs) TestableVolume {
- return s.newTestableVolume(c, cluster, volume, metrics, -2*time.Second)
+ DoGenericVolumeTests(c, true, func(t TB, params newVolumeParams) TestableVolume {
+ return s.newTestableVolume(c, params, -2*time.Second)
})
}
// TestGenericWithPrefix runs DoGenericVolumeTests (second argument
// false) against a volume whose PrefixLength has been set to 3 after
// construction.
func (s *StubbedS3AWSSuite) TestGenericWithPrefix(c *check.C) {
- DoGenericVolumeTests(c, false, func(t TB, cluster *arvados.Cluster, volume arvados.Volume, logger logrus.FieldLogger, metrics *volumeMetricsVecs) TestableVolume {
- v := s.newTestableVolume(c, cluster, volume, metrics, -2*time.Second)
+ DoGenericVolumeTests(c, false, func(t TB, params newVolumeParams) TestableVolume {
+ v := s.newTestableVolume(c, params, -2*time.Second)
// Override the prefix length on the freshly built volume
// before handing it to the generic tests.
v.PrefixLength = 3
return v
})
}
func (s *StubbedS3AWSSuite) TestIndex(c *check.C) {
- v := s.newTestableVolume(c, s.cluster, arvados.Volume{Replication: 2}, newVolumeMetricsVecs(prometheus.NewRegistry()), 0)
+ v := s.newTestableVolume(c, newVolumeParams{
+ Cluster: s.cluster,
+ ConfigVolume: arvados.Volume{Replication: 2},
+ MetricsVecs: newVolumeMetricsVecs(prometheus.NewRegistry()),
+ BufferPool: newBufferPool(ctxlog.TestLogger(c), 8, prometheus.NewRegistry()),
+ }, 0)
v.IndexPageSize = 3
for i := 0; i < 256; i++ {
- v.PutRaw(fmt.Sprintf("%02x%030x", i, i), []byte{102, 111, 111})
+ err := v.blockWriteWithoutMD5Check(fmt.Sprintf("%02x%030x", i, i), []byte{102, 111, 111})
+ c.Assert(err, check.IsNil)
}
for _, spec := range []struct {
prefix string
{"abc", 0},
} {
buf := new(bytes.Buffer)
- err := v.IndexTo(spec.prefix, buf)
+ err := v.Index(context.Background(), spec.prefix, buf)
c.Check(err, check.IsNil)
idx := bytes.SplitAfter(buf.Bytes(), []byte{10})
// The aws-sdk-go-v2 driver only supports S3 V4 signatures. S3 v2 signatures are being phased out
// as of June 24, 2020. Cf. https://forums.aws.amazon.com/ann.jspa?annID=5816
- vol := S3AWSVolume{
+ vol := s3Volume{
S3VolumeDriverParameters: arvados.S3VolumeDriverParameters{
AccessKeyID: "xxx",
SecretAccessKey: "xxx",
vol.bucket.svc.ForcePathStyle = true
c.Check(err, check.IsNil)
- err = vol.Put(context.Background(), "acbd18db4cc2f85cedef654fccc4a4d8", []byte("foo"))
+ err = vol.BlockWrite(context.Background(), "acbd18db4cc2f85cedef654fccc4a4d8", []byte("foo"))
c.Check(err, check.IsNil)
c.Check(header.Get("Authorization"), check.Matches, `AWS4-HMAC-SHA256 .*`)
}
}))
defer s.metadata.Close()
- v := &S3AWSVolume{
+ v := &s3Volume{
S3VolumeDriverParameters: arvados.S3VolumeDriverParameters{
IAMRole: s.metadata.URL + "/latest/api/token",
Endpoint: "http://localhost:12345",
s.metadata = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusNotFound)
}))
- deadv := &S3AWSVolume{
+ deadv := &s3Volume{
S3VolumeDriverParameters: arvados.S3VolumeDriverParameters{
IAMRole: s.metadata.URL + "/fake-metadata/test-role",
Endpoint: "http://localhost:12345",
}
// TestStats checks the counters reported by v.InternalStats() after a
// failed read, one write, and two successful reads of the same block.
func (s *StubbedS3AWSSuite) TestStats(c *check.C) {
- v := s.newTestableVolume(c, s.cluster, arvados.Volume{Replication: 2}, newVolumeMetricsVecs(prometheus.NewRegistry()), 5*time.Minute)
+ v := s.newTestableVolume(c, newVolumeParams{
+ Cluster: s.cluster,
+ ConfigVolume: arvados.Volume{Replication: 2},
+ MetricsVecs: newVolumeMetricsVecs(prometheus.NewRegistry()),
+ BufferPool: newBufferPool(ctxlog.TestLogger(c), 8, prometheus.NewRegistry()),
+ }, 5*time.Minute)
// stats serializes v.InternalStats() as JSON so individual
// counters can be matched with regular expressions below.
stats := func() string {
buf, err := json.Marshal(v.InternalStats())
c.Check(err, check.IsNil)
// No operations have been counted on the new volume yet.
c.Check(stats(), check.Matches, `.*"Ops":0,.*`)
loc := "acbd18db4cc2f85cedef654fccc4a4d8"
// Reading a block that has not been written yet must fail, and
// the failure must show up as a 404 NoSuchKey error counter
// without any bytes received.
- _, err := v.Get(context.Background(), loc, make([]byte, 3))
+ _, err := v.BlockRead(context.Background(), loc, io.Discard)
c.Check(err, check.NotNil)
c.Check(stats(), check.Matches, `.*"Ops":[^0],.*`)
c.Check(stats(), check.Matches, `.*"s3.requestFailure 404 NoSuchKey[^"]*":[^0].*`)
c.Check(stats(), check.Matches, `.*"InBytes":0,.*`)
// Writing the 3-byte block "foo" is expected to count 3 outgoing
// bytes and 2 put operations (presumably the block object plus
// its "recent/" marker -- confirm against the driver).
- err = v.Put(context.Background(), loc, []byte("foo"))
+ err = v.BlockWrite(context.Background(), loc, []byte("foo"))
c.Check(err, check.IsNil)
c.Check(stats(), check.Matches, `.*"OutBytes":3,.*`)
c.Check(stats(), check.Matches, `.*"PutOps":2,.*`)
// Two successful reads of the 3-byte block should add up to 6
// incoming bytes.
- _, err = v.Get(context.Background(), loc, make([]byte, 3))
+ _, err = v.BlockRead(context.Background(), loc, io.Discard)
c.Check(err, check.IsNil)
- _, err = v.Get(context.Background(), loc, make([]byte, 3))
+ _, err = v.BlockRead(context.Background(), loc, io.Discard)
c.Check(err, check.IsNil)
c.Check(stats(), check.Matches, `.*"InBytes":6,.*`)
}
}
// TestGetContextCancel hands a BlockRead of fooHash (defined
// elsewhere) to the shared testContextCancel helper, which serves S3
// from an s3AWSBlockingHandler and supplies a cancellable context.
func (s *StubbedS3AWSSuite) TestGetContextCancel(c *check.C) {
- loc := "acbd18db4cc2f85cedef654fccc4a4d8"
- buf := make([]byte, 3)
-
- s.testContextCancel(c, func(ctx context.Context, v *TestableS3AWSVolume) error {
- _, err := v.Get(ctx, loc, buf)
+ s.testContextCancel(c, func(ctx context.Context, v *testableS3Volume) error {
// The data read is discarded; only the returned error is
// passed back to the helper.
+ _, err := v.BlockRead(ctx, fooHash, io.Discard)
return err
})
}
-func (s *StubbedS3AWSSuite) TestCompareContextCancel(c *check.C) {
- loc := "acbd18db4cc2f85cedef654fccc4a4d8"
- buf := []byte("bar")
-
- s.testContextCancel(c, func(ctx context.Context, v *TestableS3AWSVolume) error {
- return v.Compare(ctx, loc, buf)
- })
-}
-
// TestPutContextCancel hands a BlockWrite of "foo" under fooHash
// (defined elsewhere) to the shared testContextCancel helper, which
// serves S3 from an s3AWSBlockingHandler and supplies a cancellable
// context.
func (s *StubbedS3AWSSuite) TestPutContextCancel(c *check.C) {
- loc := "acbd18db4cc2f85cedef654fccc4a4d8"
- buf := []byte("foo")
-
- s.testContextCancel(c, func(ctx context.Context, v *TestableS3AWSVolume) error {
- return v.Put(ctx, loc, buf)
+ s.testContextCancel(c, func(ctx context.Context, v *testableS3Volume) error {
+ return v.BlockWrite(ctx, fooHash, []byte("foo"))
})
}
-func (s *StubbedS3AWSSuite) testContextCancel(c *check.C, testFunc func(context.Context, *TestableS3AWSVolume) error) {
+func (s *StubbedS3AWSSuite) testContextCancel(c *check.C, testFunc func(context.Context, *testableS3Volume) error) {
handler := &s3AWSBlockingHandler{}
s.s3server = httptest.NewServer(handler)
defer s.s3server.Close()
- v := s.newTestableVolume(c, s.cluster, arvados.Volume{Replication: 2}, newVolumeMetricsVecs(prometheus.NewRegistry()), 5*time.Minute)
+ v := s.newTestableVolume(c, newVolumeParams{
+ Cluster: s.cluster,
+ ConfigVolume: arvados.Volume{Replication: 2},
+ MetricsVecs: newVolumeMetricsVecs(prometheus.NewRegistry()),
+ BufferPool: newBufferPool(ctxlog.TestLogger(c), 8, prometheus.NewRegistry()),
+ }, 5*time.Minute)
ctx, cancel := context.WithCancel(context.Background())
s.cluster.Collections.BlobTrashLifetime.Set("1h")
s.cluster.Collections.BlobSigningTTL.Set("1h")
- v := s.newTestableVolume(c, s.cluster, arvados.Volume{Replication: 2}, newVolumeMetricsVecs(prometheus.NewRegistry()), 5*time.Minute)
+ v := s.newTestableVolume(c, newVolumeParams{
+ Cluster: s.cluster,
+ ConfigVolume: arvados.Volume{Replication: 2},
+ Logger: ctxlog.TestLogger(c),
+ MetricsVecs: newVolumeMetricsVecs(prometheus.NewRegistry()),
+ BufferPool: newBufferPool(ctxlog.TestLogger(c), 8, prometheus.NewRegistry()),
+ }, 5*time.Minute)
var none time.Time
putS3Obj := func(t time.Time, key string, data []byte) {
// Check canGet
loc, blk := setupScenario()
- buf := make([]byte, len(blk))
- _, err := v.Get(context.Background(), loc, buf)
+ _, err := v.BlockRead(context.Background(), loc, io.Discard)
c.Check(err == nil, check.Equals, scenario.canGet)
if err != nil {
c.Check(os.IsNotExist(err), check.Equals, true)
// Call Trash, then check canTrash and canGetAfterTrash
loc, _ = setupScenario()
- err = v.Trash(loc)
+ err = v.BlockTrash(loc)
c.Check(err == nil, check.Equals, scenario.canTrash)
- _, err = v.Get(context.Background(), loc, buf)
+ _, err = v.BlockRead(context.Background(), loc, io.Discard)
c.Check(err == nil, check.Equals, scenario.canGetAfterTrash)
if err != nil {
c.Check(os.IsNotExist(err), check.Equals, true)
// Call Untrash, then check canUntrash
loc, _ = setupScenario()
- err = v.Untrash(loc)
+ err = v.BlockUntrash(loc)
c.Check(err == nil, check.Equals, scenario.canUntrash)
if scenario.dataT != none || scenario.trashT != none {
// In all scenarios where the data exists, we
// should be able to Get after Untrash --
// regardless of timestamps, errors, race
// conditions, etc.
- _, err = v.Get(context.Background(), loc, buf)
+ _, err = v.BlockRead(context.Background(), loc, io.Discard)
c.Check(err, check.IsNil)
}
// Check for current Mtime after Put (applies to all
// scenarios)
loc, blk = setupScenario()
- err = v.Put(context.Background(), loc, blk)
+ err = v.BlockWrite(context.Background(), loc, blk)
c.Check(err, check.IsNil)
t, err := v.Mtime(loc)
c.Check(err, check.IsNil)
}
}
-type TestableS3AWSVolume struct {
- *S3AWSVolume
// testableS3Volume embeds an s3Volume and carries the extra handles
// the tests need alongside it.
+type testableS3Volume struct {
+ *s3Volume
// server is the HTTP test server this volume is associated with;
// Teardown closes it.
server *httptest.Server
// c is the check.C of the test that created the volume.
c *check.C
// serverClock is the fake clock whose "now" TouchWithDate
// overrides temporarily.
serverClock *s3AWSFakeClock
}
}
-func (s *StubbedS3AWSSuite) newTestableVolume(c *check.C, cluster *arvados.Cluster, volume arvados.Volume, metrics *volumeMetricsVecs, raceWindow time.Duration) *TestableS3AWSVolume {
// newTestableVolume builds a testableS3Volume from params, runs the
// volume's check(""), creates the test bucket on the server, and only
// then applies raceWindow (which may be negative). Any failure aborts
// the test via c.Assert.
+func (s *StubbedS3AWSSuite) newTestableVolume(c *check.C, params newVolumeParams, raceWindow time.Duration) *testableS3Volume {
clock := &s3AWSFakeClock{}
// fake s3
iamRole, accessKey, secretKey = s.metadata.URL+"/fake-metadata/test-role", "", ""
}
- v := &TestableS3AWSVolume{
- S3AWSVolume: &S3AWSVolume{
+ v := &testableS3Volume{
+ s3Volume: &s3Volume{
S3VolumeDriverParameters: arvados.S3VolumeDriverParameters{
IAMRole: iamRole,
AccessKeyID: accessKey,
UnsafeDelete: true,
IndexPageSize: 1000,
},
- cluster: cluster,
- volume: volume,
- logger: ctxlog.TestLogger(c),
- metrics: metrics,
+ cluster: params.Cluster,
+ volume: params.ConfigVolume,
// NOTE(review): the logger now comes from params.Logger rather
// than ctxlog.TestLogger(c). Callers in this file that leave
// Logger unset (TestIndex, TestStats, testContextCancel) pass
// its zero value here -- confirm that is intended.
+ logger: params.Logger,
+ metrics: params.MetricsVecs,
+ bufferPool: params.BufferPool,
},
c: c,
server: srv,
serverClock: clock,
}
- c.Assert(v.S3AWSVolume.check(""), check.IsNil)
+ c.Assert(v.s3Volume.check(""), check.IsNil)
// Our test S3 server uses the older 'Path Style'
- v.S3AWSVolume.bucket.svc.ForcePathStyle = true
+ v.s3Volume.bucket.svc.ForcePathStyle = true
// Create the testbucket
input := &s3.CreateBucketInput{
Bucket: aws.String(S3AWSTestBucketName),
}
- req := v.S3AWSVolume.bucket.svc.CreateBucketRequest(input)
+ req := v.s3Volume.bucket.svc.CreateBucketRequest(input)
_, err := req.Send(context.Background())
c.Assert(err, check.IsNil)
// We couldn't set RaceWindow until now because check()
// rejects negative values.
- v.S3AWSVolume.RaceWindow = arvados.Duration(raceWindow)
+ v.s3Volume.RaceWindow = arvados.Duration(raceWindow)
return v
}
-// PutRaw skips the ContentMD5 test
-func (v *TestableS3AWSVolume) PutRaw(loc string, block []byte) {
// blockWriteWithoutMD5Check uploads block under v.key(loc) and then
// uploads an empty object under "recent/"+key, using the s3manager
// uploader directly. As the name says (and as its predecessor PutRaw
// documented), this path skips the ContentMD5 test. Unlike PutRaw,
// which only logged failures, it returns the first upload error.
+func (v *testableS3Volume) blockWriteWithoutMD5Check(loc string, block []byte) error {
key := v.key(loc)
// Wrap the data in a counting reader so the bytes sent are
// reported through v.bucket.stats.TickOutBytes.
- r := NewCountingReader(bytes.NewReader(block), v.bucket.stats.TickOutBytes)
+ r := newCountingReader(bytes.NewReader(block), v.bucket.stats.TickOutBytes)
uploader := s3manager.NewUploaderWithClient(v.bucket.svc, func(u *s3manager.Uploader) {
u.PartSize = 5 * 1024 * 1024
Body: r,
})
if err != nil {
- v.logger.Printf("PutRaw: %s: %+v", key, err)
+ return err
}
// The "recent/" companion object is uploaded with an empty body.
empty := bytes.NewReader([]byte{})
Key: aws.String("recent/" + key),
Body: empty,
})
- if err != nil {
- v.logger.Printf("PutRaw: recent/%s: %+v", key, err)
- }
+ return err
}
// TouchWithDate turns back the clock while doing a Touch(). We assume
// there are no other operations happening on the same s3test server
// while we do this.
-func (v *TestableS3AWSVolume) TouchWithDate(loc string, lastPut time.Time) {
+func (v *testableS3Volume) TouchWithDate(loc string, lastPut time.Time) {
// Pin the fake server clock to lastPut for the duration of the
// upload below.
v.serverClock.now = &lastPut
uploader := s3manager.NewUploaderWithClient(v.bucket.svc)
// Clear the override on the fake clock again.
v.serverClock.now = nil
}
-func (v *TestableS3AWSVolume) Teardown() {
// Teardown closes the HTTP test server held in v.server.
+func (v *testableS3Volume) Teardown() {
v.server.Close()
}
-func (v *TestableS3AWSVolume) ReadWriteOperationLabelValues() (r, w string) {
// ReadWriteOperationLabelValues returns "get" as the read label value
// and "put" as the write label value (presumably the operation labels
// the generic volume tests look up in the metrics -- confirm against
// the caller).
+func (v *testableS3Volume) ReadWriteOperationLabelValues() (r, w string) {
return "get", "put"
}