X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/f3b8d03f7063b162355bccfd71aeb2b8b67bbdbb..39f6e9f70f683237d9488faac1c549ca19ac9dae:/services/keepstore/azure_blob_volume_test.go diff --git a/services/keepstore/azure_blob_volume_test.go b/services/keepstore/azure_blob_volume_test.go index 48d58ee9bf..c629c9dc15 100644 --- a/services/keepstore/azure_blob_volume_test.go +++ b/services/keepstore/azure_blob_volume_test.go @@ -13,6 +13,7 @@ import ( "encoding/xml" "flag" "fmt" + "io" "io/ioutil" "math/rand" "net" @@ -87,7 +88,7 @@ func (h *azStubHandler) TouchWithDate(container, hash string, t time.Time) { blob.Mtime = t } -func (h *azStubHandler) PutRaw(container, hash string, data []byte) { +func (h *azStubHandler) BlockWriteRaw(container, hash string, data []byte) { h.Lock() defer h.Unlock() h.blobs[container+"|"+hash] = &azBlob{ @@ -221,7 +222,7 @@ func (h *azStubHandler) ServeHTTP(rw http.ResponseWriter, r *http.Request) { rw.WriteHeader(http.StatusCreated) case r.Method == "PUT" && r.Form.Get("comp") == "metadata": // "Set Metadata Headers" API. We don't bother - // stubbing "Get Metadata Headers": AzureBlobVolume + // stubbing "Get Metadata Headers": azureBlobVolume // sets metadata headers only as a way to bump Etag // and Last-Modified. if !blobExists { @@ -365,14 +366,14 @@ func (d *azStubDialer) Dial(network, address string) (net.Conn, error) { return d.Dialer.Dial(network, address) } -type TestableAzureBlobVolume struct { - *AzureBlobVolume +type testableAzureBlobVolume struct { + *azureBlobVolume azHandler *azStubHandler azStub *httptest.Server t TB } -func (s *StubbedAzureBlobSuite) newTestableAzureBlobVolume(t TB, cluster *arvados.Cluster, volume arvados.Volume, metrics *volumeMetricsVecs) *TestableAzureBlobVolume { +func (s *stubbedAzureBlobSuite) newTestableAzureBlobVolume(t TB, params newVolumeParams) *testableAzureBlobVolume { azHandler := newAzStubHandler(t.(*check.C)) azStub := httptest.NewServer(azHandler) @@ -396,7 +397,7 @@ func (s *StubbedAzureBlobSuite) newTestableAzureBlobVolume(t TB, cluster *arvado azClient.Sender = &singleSender{} bs := azClient.GetBlobService() - v := &AzureBlobVolume{ + v := &azureBlobVolume{ ContainerName: container, WriteRaceInterval: arvados.Duration(time.Millisecond), WriteRacePollTime: arvados.Duration(time.Nanosecond), @@ -404,65 +405,72 @@ func (s *StubbedAzureBlobSuite) newTestableAzureBlobVolume(t TB, cluster *arvado ListBlobsRetryDelay: arvados.Duration(time.Millisecond), azClient: azClient, container: &azureContainer{ctr: bs.GetContainerReference(container)}, - cluster: cluster, - volume: volume, + cluster: params.Cluster, + volume: params.ConfigVolume, logger: ctxlog.TestLogger(t), - metrics: metrics, + metrics: params.MetricsVecs, + bufferPool: params.BufferPool, } if err = v.check(); err != nil { t.Fatal(err) } - return &TestableAzureBlobVolume{ - AzureBlobVolume: v, + return &testableAzureBlobVolume{ + azureBlobVolume: v, azHandler: azHandler, azStub: azStub, t: t, } } -var _ = check.Suite(&StubbedAzureBlobSuite{}) +var _ = check.Suite(&stubbedAzureBlobSuite{}) -type StubbedAzureBlobSuite struct { +type stubbedAzureBlobSuite struct { origHTTPTransport http.RoundTripper } -func (s *StubbedAzureBlobSuite) SetUpTest(c *check.C) { +func (s *stubbedAzureBlobSuite) SetUpSuite(c *check.C) { s.origHTTPTransport = http.DefaultTransport http.DefaultTransport = &http.Transport{ Dial: (&azStubDialer{logger: ctxlog.TestLogger(c)}).Dial, } } -func (s *StubbedAzureBlobSuite) TearDownTest(c *check.C) { +func (s *stubbedAzureBlobSuite) TearDownSuite(c *check.C) { http.DefaultTransport = s.origHTTPTransport } -func (s *StubbedAzureBlobSuite) TestAzureBlobVolumeWithGeneric(c *check.C) { - DoGenericVolumeTests(c, false, func(t TB, cluster *arvados.Cluster, volume arvados.Volume, logger logrus.FieldLogger, metrics *volumeMetricsVecs) TestableVolume { - return s.newTestableAzureBlobVolume(t, cluster, volume, metrics) +func (s *stubbedAzureBlobSuite) TestAzureBlobVolumeWithGeneric(c *check.C) { + DoGenericVolumeTests(c, false, func(t TB, params newVolumeParams) TestableVolume { + return s.newTestableAzureBlobVolume(t, params) }) } -func (s *StubbedAzureBlobSuite) TestAzureBlobVolumeConcurrentRanges(c *check.C) { +func (s *stubbedAzureBlobSuite) TestAzureBlobVolumeConcurrentRanges(c *check.C) { // Test (BlockSize mod azureMaxGetBytes)==0 and !=0 cases - for _, b := range []int{2 << 22, 2<<22 - 1} { - DoGenericVolumeTests(c, false, func(t TB, cluster *arvados.Cluster, volume arvados.Volume, logger logrus.FieldLogger, metrics *volumeMetricsVecs) TestableVolume { - v := s.newTestableAzureBlobVolume(t, cluster, volume, metrics) + for _, b := range []int{2<<22 - 1, 2<<22 - 1} { + c.Logf("=== MaxGetBytes=%d", b) + DoGenericVolumeTests(c, false, func(t TB, params newVolumeParams) TestableVolume { + v := s.newTestableAzureBlobVolume(t, params) v.MaxGetBytes = b return v }) } } -func (s *StubbedAzureBlobSuite) TestReadonlyAzureBlobVolumeWithGeneric(c *check.C) { - DoGenericVolumeTests(c, false, func(c TB, cluster *arvados.Cluster, volume arvados.Volume, logger logrus.FieldLogger, metrics *volumeMetricsVecs) TestableVolume { - return s.newTestableAzureBlobVolume(c, cluster, volume, metrics) +func (s *stubbedAzureBlobSuite) TestReadonlyAzureBlobVolumeWithGeneric(c *check.C) { + DoGenericVolumeTests(c, false, func(c TB, params newVolumeParams) TestableVolume { + return s.newTestableAzureBlobVolume(c, params) }) } -func (s *StubbedAzureBlobSuite) TestAzureBlobVolumeRangeFenceposts(c *check.C) { - v := s.newTestableAzureBlobVolume(c, testCluster(c), arvados.Volume{Replication: 3}, newVolumeMetricsVecs(prometheus.NewRegistry())) +func (s *stubbedAzureBlobSuite) TestAzureBlobVolumeRangeFenceposts(c *check.C) { + v := s.newTestableAzureBlobVolume(c, newVolumeParams{ + Cluster: testCluster(c), + ConfigVolume: arvados.Volume{Replication: 3}, + MetricsVecs: newVolumeMetricsVecs(prometheus.NewRegistry()), + BufferPool: newBufferPool(ctxlog.TestLogger(c), 8, prometheus.NewRegistry()), + }) defer v.Teardown() for _, size := range []int{ @@ -478,16 +486,16 @@ func (s *StubbedAzureBlobSuite) TestAzureBlobVolumeRangeFenceposts(c *check.C) { data[i] = byte((i + 7) & 0xff) } hash := fmt.Sprintf("%x", md5.Sum(data)) - err := v.Put(context.Background(), hash, data) + err := v.BlockWrite(context.Background(), hash, data) if err != nil { c.Error(err) } - gotData := make([]byte, len(data)) - gotLen, err := v.Get(context.Background(), hash, gotData) + gotData := bytes.NewBuffer(nil) + gotLen, err := v.BlockRead(context.Background(), hash, gotData) if err != nil { c.Error(err) } - gotHash := fmt.Sprintf("%x", md5.Sum(gotData)) + gotHash := fmt.Sprintf("%x", md5.Sum(gotData.Bytes())) if gotLen != size { c.Errorf("length mismatch: got %d != %d", gotLen, size) } @@ -497,8 +505,13 @@ func (s *StubbedAzureBlobSuite) TestAzureBlobVolumeRangeFenceposts(c *check.C) { } } -func (s *StubbedAzureBlobSuite) TestAzureBlobVolumeCreateBlobRace(c *check.C) { - v := s.newTestableAzureBlobVolume(c, testCluster(c), arvados.Volume{Replication: 3}, newVolumeMetricsVecs(prometheus.NewRegistry())) +func (s *stubbedAzureBlobSuite) TestAzureBlobVolumeCreateBlobRace(c *check.C) { + v := s.newTestableAzureBlobVolume(c, newVolumeParams{ + Cluster: testCluster(c), + ConfigVolume: arvados.Volume{Replication: 3}, + MetricsVecs: newVolumeMetricsVecs(prometheus.NewRegistry()), + BufferPool: newBufferPool(ctxlog.TestLogger(c), 8, prometheus.NewRegistry()), + }) defer v.Teardown() var wg sync.WaitGroup @@ -508,42 +521,46 @@ func (s *StubbedAzureBlobSuite) TestAzureBlobVolumeCreateBlobRace(c *check.C) { wg.Add(1) go func() { defer wg.Done() - err := v.Put(context.Background(), TestHash, TestBlock) + err := v.BlockWrite(context.Background(), TestHash, TestBlock) if err != nil { c.Error(err) } }() - continuePut := make(chan struct{}) - // Wait for the stub's Put to create the empty blob - v.azHandler.race <- continuePut + continueBlockWrite := make(chan struct{}) + // Wait for the stub's BlockWrite to create the empty blob + v.azHandler.race <- continueBlockWrite wg.Add(1) go func() { defer wg.Done() - buf := make([]byte, len(TestBlock)) - _, err := v.Get(context.Background(), TestHash, buf) + _, err := v.BlockRead(context.Background(), TestHash, io.Discard) if err != nil { c.Error(err) } }() - // Wait for the stub's Get to get the empty blob + // Wait for the stub's BlockRead to get the empty blob close(v.azHandler.race) - // Allow stub's Put to continue, so the real data is ready - // when the volume's Get retries - <-continuePut - // Wait for Get() and Put() to finish + // Allow stub's BlockWrite to continue, so the real data is ready + // when the volume's BlockRead retries + <-continueBlockWrite + // Wait for BlockRead() and BlockWrite() to finish wg.Wait() } -func (s *StubbedAzureBlobSuite) TestAzureBlobVolumeCreateBlobRaceDeadline(c *check.C) { - v := s.newTestableAzureBlobVolume(c, testCluster(c), arvados.Volume{Replication: 3}, newVolumeMetricsVecs(prometheus.NewRegistry())) - v.AzureBlobVolume.WriteRaceInterval.Set("2s") - v.AzureBlobVolume.WriteRacePollTime.Set("5ms") +func (s *stubbedAzureBlobSuite) TestAzureBlobVolumeCreateBlobRaceDeadline(c *check.C) { + v := s.newTestableAzureBlobVolume(c, newVolumeParams{ + Cluster: testCluster(c), + ConfigVolume: arvados.Volume{Replication: 3}, + MetricsVecs: newVolumeMetricsVecs(prometheus.NewRegistry()), + BufferPool: newBufferPool(ctxlog.TestLogger(c), 8, prometheus.NewRegistry()), + }) + v.azureBlobVolume.WriteRaceInterval.Set("2s") + v.azureBlobVolume.WriteRacePollTime.Set("5ms") defer v.Teardown() - v.PutRaw(TestHash, nil) + v.BlockWriteRaw(TestHash, nil) buf := new(bytes.Buffer) - v.IndexTo("", buf) + v.Index(context.Background(), "", buf) if buf.Len() != 0 { c.Errorf("Index %+q should be empty", buf.Bytes()) } @@ -553,52 +570,50 @@ func (s *StubbedAzureBlobSuite) TestAzureBlobVolumeCreateBlobRaceDeadline(c *che allDone := make(chan struct{}) go func() { defer close(allDone) - buf := make([]byte, BlockSize) - n, err := v.Get(context.Background(), TestHash, buf) + buf := bytes.NewBuffer(nil) + n, err := v.BlockRead(context.Background(), TestHash, buf) if err != nil { c.Error(err) return } if n != 0 { - c.Errorf("Got %+q, expected empty buf", buf[:n]) + c.Errorf("Got %+q (n=%d), expected empty buf", buf.Bytes(), n) } }() select { case <-allDone: case <-time.After(time.Second): - c.Error("Get should have stopped waiting for race when block was 2s old") + c.Error("BlockRead should have stopped waiting for race when block was 2s old") } buf.Reset() - v.IndexTo("", buf) + v.Index(context.Background(), "", buf) if !bytes.HasPrefix(buf.Bytes(), []byte(TestHash+"+0")) { c.Errorf("Index %+q should have %+q", buf.Bytes(), TestHash+"+0") } } -func (s *StubbedAzureBlobSuite) TestAzureBlobVolumeContextCancelGet(c *check.C) { - s.testAzureBlobVolumeContextCancel(c, func(ctx context.Context, v *TestableAzureBlobVolume) error { - v.PutRaw(TestHash, TestBlock) - _, err := v.Get(ctx, TestHash, make([]byte, BlockSize)) +func (s *stubbedAzureBlobSuite) TestAzureBlobVolumeContextCancelBlockRead(c *check.C) { + s.testAzureBlobVolumeContextCancel(c, func(ctx context.Context, v *testableAzureBlobVolume) error { + v.BlockWriteRaw(TestHash, TestBlock) + _, err := v.BlockRead(ctx, TestHash, io.Discard) return err }) } -func (s *StubbedAzureBlobSuite) TestAzureBlobVolumeContextCancelPut(c *check.C) { - s.testAzureBlobVolumeContextCancel(c, func(ctx context.Context, v *TestableAzureBlobVolume) error { - return v.Put(ctx, TestHash, make([]byte, BlockSize)) +func (s *stubbedAzureBlobSuite) TestAzureBlobVolumeContextCancelBlockWrite(c *check.C) { + s.testAzureBlobVolumeContextCancel(c, func(ctx context.Context, v *testableAzureBlobVolume) error { + return v.BlockWrite(ctx, TestHash, make([]byte, BlockSize)) }) } -func (s *StubbedAzureBlobSuite) TestAzureBlobVolumeContextCancelCompare(c *check.C) { - s.testAzureBlobVolumeContextCancel(c, func(ctx context.Context, v *TestableAzureBlobVolume) error { - v.PutRaw(TestHash, TestBlock) - return v.Compare(ctx, TestHash, TestBlock2) +func (s *stubbedAzureBlobSuite) testAzureBlobVolumeContextCancel(c *check.C, testFunc func(context.Context, *testableAzureBlobVolume) error) { + v := s.newTestableAzureBlobVolume(c, newVolumeParams{ + Cluster: testCluster(c), + ConfigVolume: arvados.Volume{Replication: 3}, + MetricsVecs: newVolumeMetricsVecs(prometheus.NewRegistry()), + BufferPool: newBufferPool(ctxlog.TestLogger(c), 8, prometheus.NewRegistry()), }) -} - -func (s *StubbedAzureBlobSuite) testAzureBlobVolumeContextCancel(c *check.C, testFunc func(context.Context, *TestableAzureBlobVolume) error) { - v := s.newTestableAzureBlobVolume(c, testCluster(c), arvados.Volume{Replication: 3}, newVolumeMetricsVecs(prometheus.NewRegistry())) defer v.Teardown() v.azHandler.race = make(chan chan struct{}) @@ -633,8 +648,13 @@ func (s *StubbedAzureBlobSuite) testAzureBlobVolumeContextCancel(c *check.C, tes }() } -func (s *StubbedAzureBlobSuite) TestStats(c *check.C) { - volume := s.newTestableAzureBlobVolume(c, testCluster(c), arvados.Volume{Replication: 3}, newVolumeMetricsVecs(prometheus.NewRegistry())) +func (s *stubbedAzureBlobSuite) TestStats(c *check.C) { + volume := s.newTestableAzureBlobVolume(c, newVolumeParams{ + Cluster: testCluster(c), + ConfigVolume: arvados.Volume{Replication: 3}, + MetricsVecs: newVolumeMetricsVecs(prometheus.NewRegistry()), + BufferPool: newBufferPool(ctxlog.TestLogger(c), 8, prometheus.NewRegistry()), + }) defer volume.Teardown() stats := func() string { @@ -647,38 +667,38 @@ func (s *StubbedAzureBlobSuite) TestStats(c *check.C) { c.Check(stats(), check.Matches, `.*"Errors":0,.*`) loc := "acbd18db4cc2f85cedef654fccc4a4d8" - _, err := volume.Get(context.Background(), loc, make([]byte, 3)) + _, err := volume.BlockRead(context.Background(), loc, io.Discard) c.Check(err, check.NotNil) c.Check(stats(), check.Matches, `.*"Ops":[^0],.*`) c.Check(stats(), check.Matches, `.*"Errors":[^0],.*`) c.Check(stats(), check.Matches, `.*"storage\.AzureStorageServiceError 404 \(404 Not Found\)":[^0].*`) c.Check(stats(), check.Matches, `.*"InBytes":0,.*`) - err = volume.Put(context.Background(), loc, []byte("foo")) + err = volume.BlockWrite(context.Background(), loc, []byte("foo")) c.Check(err, check.IsNil) c.Check(stats(), check.Matches, `.*"OutBytes":3,.*`) c.Check(stats(), check.Matches, `.*"CreateOps":1,.*`) - _, err = volume.Get(context.Background(), loc, make([]byte, 3)) + _, err = volume.BlockRead(context.Background(), loc, io.Discard) c.Check(err, check.IsNil) - _, err = volume.Get(context.Background(), loc, make([]byte, 3)) + _, err = volume.BlockRead(context.Background(), loc, io.Discard) c.Check(err, check.IsNil) c.Check(stats(), check.Matches, `.*"InBytes":6,.*`) } -func (v *TestableAzureBlobVolume) PutRaw(locator string, data []byte) { - v.azHandler.PutRaw(v.ContainerName, locator, data) +func (v *testableAzureBlobVolume) BlockWriteRaw(locator string, data []byte) { + v.azHandler.BlockWriteRaw(v.ContainerName, locator, data) } -func (v *TestableAzureBlobVolume) TouchWithDate(locator string, lastPut time.Time) { - v.azHandler.TouchWithDate(v.ContainerName, locator, lastPut) +func (v *testableAzureBlobVolume) TouchWithDate(locator string, lastBlockWrite time.Time) { + v.azHandler.TouchWithDate(v.ContainerName, locator, lastBlockWrite) } -func (v *TestableAzureBlobVolume) Teardown() { +func (v *testableAzureBlobVolume) Teardown() { v.azStub.Close() } -func (v *TestableAzureBlobVolume) ReadWriteOperationLabelValues() (r, w string) { +func (v *testableAzureBlobVolume) ReadWriteOperationLabelValues() (r, w string) { return "get", "create" }