//
// SPDX-License-Identifier: AGPL-3.0
-package main
+package keepstore
import (
"bytes"
blob.Mtime = t
}
-func (h *azStubHandler) PutRaw(container, hash string, data []byte) {
+func (h *azStubHandler) BlockWriteRaw(container, hash string, data []byte) {
h.Lock()
defer h.Unlock()
h.blobs[container+"|"+hash] = &azBlob{
rw.WriteHeader(http.StatusCreated)
case r.Method == "PUT" && r.Form.Get("comp") == "metadata":
// "Set Metadata Headers" API. We don't bother
- // stubbing "Get Metadata Headers": AzureBlobVolume
+ // stubbing "Get Metadata Headers": azureBlobVolume
// sets metadata headers only as a way to bump Etag
// and Last-Modified.
if !blobExists {
return d.Dialer.Dial(network, address)
}
-type TestableAzureBlobVolume struct {
- *AzureBlobVolume
+type testableAzureBlobVolume struct {
+ *azureBlobVolume
azHandler *azStubHandler
azStub *httptest.Server
t TB
}
-func (s *StubbedAzureBlobSuite) newTestableAzureBlobVolume(t TB, cluster *arvados.Cluster, volume arvados.Volume, metrics *volumeMetricsVecs) *TestableAzureBlobVolume {
+func (s *stubbedAzureBlobSuite) newTestableAzureBlobVolume(t TB, params newVolumeParams) *testableAzureBlobVolume {
azHandler := newAzStubHandler(t.(*check.C))
azStub := httptest.NewServer(azHandler)
azClient.Sender = &singleSender{}
bs := azClient.GetBlobService()
- v := &AzureBlobVolume{
+ v := &azureBlobVolume{
ContainerName: container,
WriteRaceInterval: arvados.Duration(time.Millisecond),
WriteRacePollTime: arvados.Duration(time.Nanosecond),
ListBlobsRetryDelay: arvados.Duration(time.Millisecond),
azClient: azClient,
container: &azureContainer{ctr: bs.GetContainerReference(container)},
- cluster: cluster,
- volume: volume,
+ cluster: params.Cluster,
+ volume: params.ConfigVolume,
logger: ctxlog.TestLogger(t),
- metrics: metrics,
+ metrics: params.MetricsVecs,
+ bufferPool: params.BufferPool,
}
if err = v.check(); err != nil {
t.Fatal(err)
}
- return &TestableAzureBlobVolume{
- AzureBlobVolume: v,
+ return &testableAzureBlobVolume{
+ azureBlobVolume: v,
azHandler: azHandler,
azStub: azStub,
t: t,
}
}
-var _ = check.Suite(&StubbedAzureBlobSuite{})
+var _ = check.Suite(&stubbedAzureBlobSuite{})
-type StubbedAzureBlobSuite struct {
+type stubbedAzureBlobSuite struct {
origHTTPTransport http.RoundTripper
}
-func (s *StubbedAzureBlobSuite) SetUpTest(c *check.C) {
+func (s *stubbedAzureBlobSuite) SetUpSuite(c *check.C) {
s.origHTTPTransport = http.DefaultTransport
http.DefaultTransport = &http.Transport{
Dial: (&azStubDialer{logger: ctxlog.TestLogger(c)}).Dial,
}
}
-func (s *StubbedAzureBlobSuite) TearDownTest(c *check.C) {
+func (s *stubbedAzureBlobSuite) TearDownSuite(c *check.C) {
http.DefaultTransport = s.origHTTPTransport
}
-func (s *StubbedAzureBlobSuite) TestAzureBlobVolumeWithGeneric(c *check.C) {
- DoGenericVolumeTests(c, false, func(t TB, cluster *arvados.Cluster, volume arvados.Volume, logger logrus.FieldLogger, metrics *volumeMetricsVecs) TestableVolume {
- return s.newTestableAzureBlobVolume(t, cluster, volume, metrics)
+func (s *stubbedAzureBlobSuite) TestAzureBlobVolumeWithGeneric(c *check.C) {
+ DoGenericVolumeTests(c, false, func(t TB, params newVolumeParams) TestableVolume {
+ return s.newTestableAzureBlobVolume(t, params)
})
}
-func (s *StubbedAzureBlobSuite) TestAzureBlobVolumeConcurrentRanges(c *check.C) {
+func (s *stubbedAzureBlobSuite) TestAzureBlobVolumeConcurrentRanges(c *check.C) {
// Test (BlockSize mod azureMaxGetBytes)==0 and !=0 cases
- for _, b := range []int{2 << 22, 2<<22 - 1} {
- DoGenericVolumeTests(c, false, func(t TB, cluster *arvados.Cluster, volume arvados.Volume, logger logrus.FieldLogger, metrics *volumeMetricsVecs) TestableVolume {
- v := s.newTestableAzureBlobVolume(t, cluster, volume, metrics)
+ for _, b := range []int{2<<22 - 1, 2<<22 - 1} {
+ c.Logf("=== MaxGetBytes=%d", b)
+ DoGenericVolumeTests(c, false, func(t TB, params newVolumeParams) TestableVolume {
+ v := s.newTestableAzureBlobVolume(t, params)
v.MaxGetBytes = b
return v
})
}
}
-func (s *StubbedAzureBlobSuite) TestReadonlyAzureBlobVolumeWithGeneric(c *check.C) {
- DoGenericVolumeTests(c, false, func(c TB, cluster *arvados.Cluster, volume arvados.Volume, logger logrus.FieldLogger, metrics *volumeMetricsVecs) TestableVolume {
- return s.newTestableAzureBlobVolume(c, cluster, volume, metrics)
+func (s *stubbedAzureBlobSuite) TestReadonlyAzureBlobVolumeWithGeneric(c *check.C) {
+ DoGenericVolumeTests(c, false, func(c TB, params newVolumeParams) TestableVolume {
+ return s.newTestableAzureBlobVolume(c, params)
})
}
-func (s *StubbedAzureBlobSuite) TestAzureBlobVolumeRangeFenceposts(c *check.C) {
- v := s.newTestableAzureBlobVolume(c, testCluster(c), arvados.Volume{Replication: 3}, newVolumeMetricsVecs(prometheus.NewRegistry()))
+func (s *stubbedAzureBlobSuite) TestAzureBlobVolumeRangeFenceposts(c *check.C) {
+ v := s.newTestableAzureBlobVolume(c, newVolumeParams{
+ Cluster: testCluster(c),
+ ConfigVolume: arvados.Volume{Replication: 3},
+ MetricsVecs: newVolumeMetricsVecs(prometheus.NewRegistry()),
+ BufferPool: newBufferPool(ctxlog.TestLogger(c), 8, prometheus.NewRegistry()),
+ })
defer v.Teardown()
for _, size := range []int{
data[i] = byte((i + 7) & 0xff)
}
hash := fmt.Sprintf("%x", md5.Sum(data))
- err := v.Put(context.Background(), hash, data)
+ err := v.BlockWrite(context.Background(), hash, data)
if err != nil {
c.Error(err)
}
- gotData := make([]byte, len(data))
- gotLen, err := v.Get(context.Background(), hash, gotData)
+ gotData := &brbuffer{}
+ err = v.BlockRead(context.Background(), hash, gotData)
if err != nil {
c.Error(err)
}
- gotHash := fmt.Sprintf("%x", md5.Sum(gotData))
- if gotLen != size {
- c.Errorf("length mismatch: got %d != %d", gotLen, size)
- }
+ gotHash := fmt.Sprintf("%x", md5.Sum(gotData.Bytes()))
+ c.Check(gotData.Len(), check.Equals, size)
if gotHash != hash {
c.Errorf("hash mismatch: got %s != %s", gotHash, hash)
}
}
}
-func (s *StubbedAzureBlobSuite) TestAzureBlobVolumeCreateBlobRace(c *check.C) {
- v := s.newTestableAzureBlobVolume(c, testCluster(c), arvados.Volume{Replication: 3}, newVolumeMetricsVecs(prometheus.NewRegistry()))
+func (s *stubbedAzureBlobSuite) TestAzureBlobVolumeCreateBlobRace(c *check.C) {
+ v := s.newTestableAzureBlobVolume(c, newVolumeParams{
+ Cluster: testCluster(c),
+ ConfigVolume: arvados.Volume{Replication: 3},
+ MetricsVecs: newVolumeMetricsVecs(prometheus.NewRegistry()),
+ BufferPool: newBufferPool(ctxlog.TestLogger(c), 8, prometheus.NewRegistry()),
+ })
defer v.Teardown()
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
- err := v.Put(context.Background(), TestHash, TestBlock)
+ err := v.BlockWrite(context.Background(), TestHash, TestBlock)
if err != nil {
c.Error(err)
}
}()
- continuePut := make(chan struct{})
- // Wait for the stub's Put to create the empty blob
- v.azHandler.race <- continuePut
+ continueBlockWrite := make(chan struct{})
+ // Wait for the stub's BlockWrite to create the empty blob
+ v.azHandler.race <- continueBlockWrite
wg.Add(1)
go func() {
defer wg.Done()
- buf := make([]byte, len(TestBlock))
- _, err := v.Get(context.Background(), TestHash, buf)
+ err := v.BlockRead(context.Background(), TestHash, brdiscard)
if err != nil {
c.Error(err)
}
}()
- // Wait for the stub's Get to get the empty blob
+ // Wait for the stub's BlockRead to get the empty blob
close(v.azHandler.race)
- // Allow stub's Put to continue, so the real data is ready
- // when the volume's Get retries
- <-continuePut
- // Wait for Get() and Put() to finish
+ // Allow stub's BlockWrite to continue, so the real data is ready
+ // when the volume's BlockRead retries
+ <-continueBlockWrite
+ // Wait for BlockRead() and BlockWrite() to finish
wg.Wait()
}
-func (s *StubbedAzureBlobSuite) TestAzureBlobVolumeCreateBlobRaceDeadline(c *check.C) {
- v := s.newTestableAzureBlobVolume(c, testCluster(c), arvados.Volume{Replication: 3}, newVolumeMetricsVecs(prometheus.NewRegistry()))
- v.AzureBlobVolume.WriteRaceInterval.Set("2s")
- v.AzureBlobVolume.WriteRacePollTime.Set("5ms")
+func (s *stubbedAzureBlobSuite) TestAzureBlobVolumeCreateBlobRaceDeadline(c *check.C) {
+ v := s.newTestableAzureBlobVolume(c, newVolumeParams{
+ Cluster: testCluster(c),
+ ConfigVolume: arvados.Volume{Replication: 3},
+ MetricsVecs: newVolumeMetricsVecs(prometheus.NewRegistry()),
+ BufferPool: newBufferPool(ctxlog.TestLogger(c), 8, prometheus.NewRegistry()),
+ })
+ v.azureBlobVolume.WriteRaceInterval.Set("2s")
+ v.azureBlobVolume.WriteRacePollTime.Set("5ms")
defer v.Teardown()
- v.PutRaw(TestHash, nil)
+ v.BlockWriteRaw(TestHash, nil)
buf := new(bytes.Buffer)
- v.IndexTo("", buf)
+ v.Index(context.Background(), "", buf)
if buf.Len() != 0 {
c.Errorf("Index %+q should be empty", buf.Bytes())
}
allDone := make(chan struct{})
go func() {
defer close(allDone)
- buf := make([]byte, BlockSize)
- n, err := v.Get(context.Background(), TestHash, buf)
+ buf := &brbuffer{}
+ err := v.BlockRead(context.Background(), TestHash, buf)
if err != nil {
c.Error(err)
return
}
- if n != 0 {
- c.Errorf("Got %+q, expected empty buf", buf[:n])
- }
+ c.Check(buf.String(), check.Equals, "")
}()
select {
case <-allDone:
case <-time.After(time.Second):
- c.Error("Get should have stopped waiting for race when block was 2s old")
+ c.Error("BlockRead should have stopped waiting for race when block was 2s old")
}
buf.Reset()
- v.IndexTo("", buf)
+ v.Index(context.Background(), "", buf)
if !bytes.HasPrefix(buf.Bytes(), []byte(TestHash+"+0")) {
c.Errorf("Index %+q should have %+q", buf.Bytes(), TestHash+"+0")
}
}
-func (s *StubbedAzureBlobSuite) TestAzureBlobVolumeContextCancelGet(c *check.C) {
- s.testAzureBlobVolumeContextCancel(c, func(ctx context.Context, v *TestableAzureBlobVolume) error {
- v.PutRaw(TestHash, TestBlock)
- _, err := v.Get(ctx, TestHash, make([]byte, BlockSize))
- return err
+func (s *stubbedAzureBlobSuite) TestAzureBlobVolumeContextCancelBlockRead(c *check.C) {
+ s.testAzureBlobVolumeContextCancel(c, func(ctx context.Context, v *testableAzureBlobVolume) error {
+ v.BlockWriteRaw(TestHash, TestBlock)
+ return v.BlockRead(ctx, TestHash, brdiscard)
})
}
-func (s *StubbedAzureBlobSuite) TestAzureBlobVolumeContextCancelPut(c *check.C) {
- s.testAzureBlobVolumeContextCancel(c, func(ctx context.Context, v *TestableAzureBlobVolume) error {
- return v.Put(ctx, TestHash, make([]byte, BlockSize))
+func (s *stubbedAzureBlobSuite) TestAzureBlobVolumeContextCancelBlockWrite(c *check.C) {
+ s.testAzureBlobVolumeContextCancel(c, func(ctx context.Context, v *testableAzureBlobVolume) error {
+ return v.BlockWrite(ctx, TestHash, make([]byte, BlockSize))
})
}
-func (s *StubbedAzureBlobSuite) TestAzureBlobVolumeContextCancelCompare(c *check.C) {
- s.testAzureBlobVolumeContextCancel(c, func(ctx context.Context, v *TestableAzureBlobVolume) error {
- v.PutRaw(TestHash, TestBlock)
- return v.Compare(ctx, TestHash, TestBlock2)
+func (s *stubbedAzureBlobSuite) testAzureBlobVolumeContextCancel(c *check.C, testFunc func(context.Context, *testableAzureBlobVolume) error) {
+ v := s.newTestableAzureBlobVolume(c, newVolumeParams{
+ Cluster: testCluster(c),
+ ConfigVolume: arvados.Volume{Replication: 3},
+ MetricsVecs: newVolumeMetricsVecs(prometheus.NewRegistry()),
+ BufferPool: newBufferPool(ctxlog.TestLogger(c), 8, prometheus.NewRegistry()),
})
-}
-
-func (s *StubbedAzureBlobSuite) testAzureBlobVolumeContextCancel(c *check.C, testFunc func(context.Context, *TestableAzureBlobVolume) error) {
- v := s.newTestableAzureBlobVolume(c, testCluster(c), arvados.Volume{Replication: 3}, newVolumeMetricsVecs(prometheus.NewRegistry()))
defer v.Teardown()
v.azHandler.race = make(chan chan struct{})
}()
}
-func (s *StubbedAzureBlobSuite) TestStats(c *check.C) {
- volume := s.newTestableAzureBlobVolume(c, testCluster(c), arvados.Volume{Replication: 3}, newVolumeMetricsVecs(prometheus.NewRegistry()))
+func (s *stubbedAzureBlobSuite) TestStats(c *check.C) {
+ volume := s.newTestableAzureBlobVolume(c, newVolumeParams{
+ Cluster: testCluster(c),
+ ConfigVolume: arvados.Volume{Replication: 3},
+ MetricsVecs: newVolumeMetricsVecs(prometheus.NewRegistry()),
+ BufferPool: newBufferPool(ctxlog.TestLogger(c), 8, prometheus.NewRegistry()),
+ })
defer volume.Teardown()
stats := func() string {
c.Check(stats(), check.Matches, `.*"Errors":0,.*`)
loc := "acbd18db4cc2f85cedef654fccc4a4d8"
- _, err := volume.Get(context.Background(), loc, make([]byte, 3))
+ err := volume.BlockRead(context.Background(), loc, brdiscard)
c.Check(err, check.NotNil)
c.Check(stats(), check.Matches, `.*"Ops":[^0],.*`)
c.Check(stats(), check.Matches, `.*"Errors":[^0],.*`)
c.Check(stats(), check.Matches, `.*"storage\.AzureStorageServiceError 404 \(404 Not Found\)":[^0].*`)
c.Check(stats(), check.Matches, `.*"InBytes":0,.*`)
- err = volume.Put(context.Background(), loc, []byte("foo"))
+ err = volume.BlockWrite(context.Background(), loc, []byte("foo"))
c.Check(err, check.IsNil)
c.Check(stats(), check.Matches, `.*"OutBytes":3,.*`)
c.Check(stats(), check.Matches, `.*"CreateOps":1,.*`)
- _, err = volume.Get(context.Background(), loc, make([]byte, 3))
+ err = volume.BlockRead(context.Background(), loc, brdiscard)
c.Check(err, check.IsNil)
- _, err = volume.Get(context.Background(), loc, make([]byte, 3))
+ err = volume.BlockRead(context.Background(), loc, brdiscard)
c.Check(err, check.IsNil)
c.Check(stats(), check.Matches, `.*"InBytes":6,.*`)
}
-func (v *TestableAzureBlobVolume) PutRaw(locator string, data []byte) {
- v.azHandler.PutRaw(v.ContainerName, locator, data)
+func (v *testableAzureBlobVolume) BlockWriteRaw(locator string, data []byte) {
+ v.azHandler.BlockWriteRaw(v.ContainerName, locator, data)
}
-func (v *TestableAzureBlobVolume) TouchWithDate(locator string, lastPut time.Time) {
- v.azHandler.TouchWithDate(v.ContainerName, locator, lastPut)
+func (v *testableAzureBlobVolume) TouchWithDate(locator string, lastBlockWrite time.Time) {
+ v.azHandler.TouchWithDate(v.ContainerName, locator, lastBlockWrite)
}
-func (v *TestableAzureBlobVolume) Teardown() {
+func (v *testableAzureBlobVolume) Teardown() {
v.azStub.Close()
}
-func (v *TestableAzureBlobVolume) ReadWriteOperationLabelValues() (r, w string) {
+func (v *testableAzureBlobVolume) ReadWriteOperationLabelValues() (r, w string) {
return "get", "create"
}