s3server *httptest.Server
metadata *httptest.Server
cluster *arvados.Cluster
- handler *handler
volumes []*TestableS3AWSVolume
}
"zzzzz-nyw5e-000000000000000": {Driver: "S3"},
"zzzzz-nyw5e-111111111111111": {Driver: "S3"},
}
- s.handler = &handler{}
}
func (s *StubbedS3AWSSuite) TestGeneric(c *check.C) {
- DoGenericVolumeTests(c, false, func(t TB, cluster *arvados.Cluster, volume arvados.Volume, logger logrus.FieldLogger, metrics *volumeMetricsVecs) TestableVolume {
+ DoGenericVolumeTests(c, false, func(t TB, params newVolumeParams) TestableVolume {
// Use a negative raceWindow so s3test's 1-second
// timestamp precision doesn't confuse fixRace.
- return s.newTestableVolume(c, cluster, volume, metrics, -2*time.Second)
+ return s.newTestableVolume(c, params, -2*time.Second)
})
}
func (s *StubbedS3AWSSuite) TestGenericReadOnly(c *check.C) {
- DoGenericVolumeTests(c, true, func(t TB, cluster *arvados.Cluster, volume arvados.Volume, logger logrus.FieldLogger, metrics *volumeMetricsVecs) TestableVolume {
- return s.newTestableVolume(c, cluster, volume, metrics, -2*time.Second)
+ DoGenericVolumeTests(c, true, func(t TB, params newVolumeParams) TestableVolume {
+ return s.newTestableVolume(c, params, -2*time.Second)
})
}
func (s *StubbedS3AWSSuite) TestGenericWithPrefix(c *check.C) {
- DoGenericVolumeTests(c, false, func(t TB, cluster *arvados.Cluster, volume arvados.Volume, logger logrus.FieldLogger, metrics *volumeMetricsVecs) TestableVolume {
- v := s.newTestableVolume(c, cluster, volume, metrics, -2*time.Second)
+ DoGenericVolumeTests(c, false, func(t TB, params newVolumeParams) TestableVolume {
+ v := s.newTestableVolume(c, params, -2*time.Second)
v.PrefixLength = 3
return v
})
}
func (s *StubbedS3AWSSuite) TestIndex(c *check.C) {
- v := s.newTestableVolume(c, s.cluster, arvados.Volume{Replication: 2}, newVolumeMetricsVecs(prometheus.NewRegistry()), 0)
+ v := s.newTestableVolume(c, newVolumeParams{
+ Cluster: s.cluster,
+ ConfigVolume: arvados.Volume{Replication: 2},
+ MetricsVecs: newVolumeMetricsVecs(prometheus.NewRegistry()),
+ BufferPool: newBufferPool(ctxlog.TestLogger(c), 8, prometheus.NewRegistry()),
+ }, 0)
v.IndexPageSize = 3
for i := 0; i < 256; i++ {
v.PutRaw(fmt.Sprintf("%02x%030x", i, i), []byte{102, 111, 111})
{"abc", 0},
} {
buf := new(bytes.Buffer)
- err := v.IndexTo(spec.prefix, buf)
+ err := v.Index(context.Background(), spec.prefix, buf)
c.Check(err, check.IsNil)
idx := bytes.SplitAfter(buf.Bytes(), []byte{10})
vol.bucket.svc.ForcePathStyle = true
c.Check(err, check.IsNil)
- err = vol.Put(context.Background(), "acbd18db4cc2f85cedef654fccc4a4d8", []byte("foo"))
+ err = vol.BlockWrite(context.Background(), "acbd18db4cc2f85cedef654fccc4a4d8", []byte("foo"))
c.Check(err, check.IsNil)
c.Check(header.Get("Authorization"), check.Matches, `AWS4-HMAC-SHA256 .*`)
}
}
func (s *StubbedS3AWSSuite) TestStats(c *check.C) {
- v := s.newTestableVolume(c, s.cluster, arvados.Volume{Replication: 2}, newVolumeMetricsVecs(prometheus.NewRegistry()), 5*time.Minute)
+ v := s.newTestableVolume(c, newVolumeParams{
+ Cluster: s.cluster,
+ ConfigVolume: arvados.Volume{Replication: 2},
+ MetricsVecs: newVolumeMetricsVecs(prometheus.NewRegistry()),
+ BufferPool: newBufferPool(ctxlog.TestLogger(c), 8, prometheus.NewRegistry()),
+ }, 5*time.Minute)
stats := func() string {
buf, err := json.Marshal(v.InternalStats())
c.Check(err, check.IsNil)
c.Check(stats(), check.Matches, `.*"Ops":0,.*`)
loc := "acbd18db4cc2f85cedef654fccc4a4d8"
- _, err := v.Get(context.Background(), loc, make([]byte, 3))
+ _, err := v.BlockRead(context.Background(), loc, io.Discard)
c.Check(err, check.NotNil)
c.Check(stats(), check.Matches, `.*"Ops":[^0],.*`)
c.Check(stats(), check.Matches, `.*"s3.requestFailure 404 NoSuchKey[^"]*":[^0].*`)
c.Check(stats(), check.Matches, `.*"InBytes":0,.*`)
- err = v.Put(context.Background(), loc, []byte("foo"))
+ err = v.BlockWrite(context.Background(), loc, []byte("foo"))
c.Check(err, check.IsNil)
c.Check(stats(), check.Matches, `.*"OutBytes":3,.*`)
c.Check(stats(), check.Matches, `.*"PutOps":2,.*`)
- _, err = v.Get(context.Background(), loc, make([]byte, 3))
+ _, err = v.BlockRead(context.Background(), loc, io.Discard)
c.Check(err, check.IsNil)
- _, err = v.Get(context.Background(), loc, make([]byte, 3))
+ _, err = v.BlockRead(context.Background(), loc, io.Discard)
c.Check(err, check.IsNil)
c.Check(stats(), check.Matches, `.*"InBytes":6,.*`)
}
}
func (s *StubbedS3AWSSuite) TestGetContextCancel(c *check.C) {
- loc := "acbd18db4cc2f85cedef654fccc4a4d8"
- buf := make([]byte, 3)
-
s.testContextCancel(c, func(ctx context.Context, v *TestableS3AWSVolume) error {
- _, err := v.Get(ctx, loc, buf)
+ _, err := v.BlockRead(ctx, fooHash, io.Discard)
return err
})
}
-func (s *StubbedS3AWSSuite) TestCompareContextCancel(c *check.C) {
- loc := "acbd18db4cc2f85cedef654fccc4a4d8"
- buf := []byte("bar")
-
- s.testContextCancel(c, func(ctx context.Context, v *TestableS3AWSVolume) error {
- return v.Compare(ctx, loc, buf)
- })
-}
-
func (s *StubbedS3AWSSuite) TestPutContextCancel(c *check.C) {
- loc := "acbd18db4cc2f85cedef654fccc4a4d8"
- buf := []byte("foo")
-
s.testContextCancel(c, func(ctx context.Context, v *TestableS3AWSVolume) error {
- return v.Put(ctx, loc, buf)
+ return v.BlockWrite(ctx, fooHash, []byte("foo"))
})
}
s.s3server = httptest.NewServer(handler)
defer s.s3server.Close()
- v := s.newTestableVolume(c, s.cluster, arvados.Volume{Replication: 2}, newVolumeMetricsVecs(prometheus.NewRegistry()), 5*time.Minute)
+ v := s.newTestableVolume(c, newVolumeParams{
+ Cluster: s.cluster,
+ ConfigVolume: arvados.Volume{Replication: 2},
+ MetricsVecs: newVolumeMetricsVecs(prometheus.NewRegistry()),
+ BufferPool: newBufferPool(ctxlog.TestLogger(c), 8, prometheus.NewRegistry()),
+ }, 5*time.Minute)
ctx, cancel := context.WithCancel(context.Background())
s.cluster.Collections.BlobTrashLifetime.Set("1h")
s.cluster.Collections.BlobSigningTTL.Set("1h")
- v := s.newTestableVolume(c, s.cluster, arvados.Volume{Replication: 2}, newVolumeMetricsVecs(prometheus.NewRegistry()), 5*time.Minute)
+ v := s.newTestableVolume(c, newVolumeParams{
+ Cluster: s.cluster,
+ ConfigVolume: arvados.Volume{Replication: 2},
+ Logger: ctxlog.TestLogger(c),
+ MetricsVecs: newVolumeMetricsVecs(prometheus.NewRegistry()),
+ BufferPool: newBufferPool(ctxlog.TestLogger(c), 8, prometheus.NewRegistry()),
+ }, 5*time.Minute)
var none time.Time
putS3Obj := func(t time.Time, key string, data []byte) {
// Check canGet
loc, blk := setupScenario()
- buf := make([]byte, len(blk))
- _, err := v.Get(context.Background(), loc, buf)
+ _, err := v.BlockRead(context.Background(), loc, io.Discard)
c.Check(err == nil, check.Equals, scenario.canGet)
if err != nil {
c.Check(os.IsNotExist(err), check.Equals, true)
// Call Trash, then check canTrash and canGetAfterTrash
loc, _ = setupScenario()
- err = v.Trash(loc)
+ err = v.BlockTrash(loc)
c.Check(err == nil, check.Equals, scenario.canTrash)
- _, err = v.Get(context.Background(), loc, buf)
+ _, err = v.BlockRead(context.Background(), loc, io.Discard)
c.Check(err == nil, check.Equals, scenario.canGetAfterTrash)
if err != nil {
c.Check(os.IsNotExist(err), check.Equals, true)
// Call Untrash, then check canUntrash
loc, _ = setupScenario()
- err = v.Untrash(loc)
+ err = v.BlockUntrash(loc)
c.Check(err == nil, check.Equals, scenario.canUntrash)
if scenario.dataT != none || scenario.trashT != none {
// In all scenarios where the data exists, we
// should be able to Get after Untrash --
// regardless of timestamps, errors, race
// conditions, etc.
- _, err = v.Get(context.Background(), loc, buf)
+ _, err = v.BlockRead(context.Background(), loc, io.Discard)
c.Check(err, check.IsNil)
}
// Check for current Mtime after Put (applies to all
// scenarios)
loc, blk = setupScenario()
- err = v.Put(context.Background(), loc, blk)
+ err = v.BlockWrite(context.Background(), loc, blk)
c.Check(err, check.IsNil)
t, err := v.Mtime(loc)
c.Check(err, check.IsNil)
}
}
-func (s *StubbedS3AWSSuite) newTestableVolume(c *check.C, cluster *arvados.Cluster, volume arvados.Volume, metrics *volumeMetricsVecs, raceWindow time.Duration) *TestableS3AWSVolume {
+func (s *StubbedS3AWSSuite) newTestableVolume(c *check.C, params newVolumeParams, raceWindow time.Duration) *TestableS3AWSVolume {
clock := &s3AWSFakeClock{}
// fake s3
UnsafeDelete: true,
IndexPageSize: 1000,
},
- cluster: cluster,
- volume: volume,
- logger: ctxlog.TestLogger(c),
- metrics: metrics,
+ cluster: params.Cluster,
+ volume: params.ConfigVolume,
+ logger: params.Logger,
+ metrics: params.MetricsVecs,
+ bufferPool: params.BufferPool,
},
c: c,
server: srv,
// PutRaw skips the ContentMD5 test
func (v *TestableS3AWSVolume) PutRaw(loc string, block []byte) {
key := v.key(loc)
- r := NewCountingReader(bytes.NewReader(block), v.bucket.stats.TickOutBytes)
+ r := newCountingReader(bytes.NewReader(block), v.bucket.stats.TickOutBytes)
uploader := s3manager.NewUploaderWithClient(v.bucket.svc, func(u *s3manager.Uploader) {
u.PartSize = 5 * 1024 * 1024