2960: Refactor keepstore into a streaming server.
[arvados.git] / services / keepstore / azure_blob_volume_test.go
index 48d58ee9bfc454e5b2972e6d36867a578c29e6bb..a543dfc245174d26f6d8a66fcfa47b6d54215d0e 100644 (file)
@@ -13,6 +13,7 @@ import (
        "encoding/xml"
        "flag"
        "fmt"
+       "io"
        "io/ioutil"
        "math/rand"
        "net"
@@ -87,7 +88,7 @@ func (h *azStubHandler) TouchWithDate(container, hash string, t time.Time) {
        blob.Mtime = t
 }
 
-func (h *azStubHandler) PutRaw(container, hash string, data []byte) {
+func (h *azStubHandler) BlockWriteRaw(container, hash string, data []byte) {
        h.Lock()
        defer h.Unlock()
        h.blobs[container+"|"+hash] = &azBlob{
@@ -365,14 +366,14 @@ func (d *azStubDialer) Dial(network, address string) (net.Conn, error) {
        return d.Dialer.Dial(network, address)
 }
 
-type TestableAzureBlobVolume struct {
+type testableAzureBlobVolume struct {
        *AzureBlobVolume
        azHandler *azStubHandler
        azStub    *httptest.Server
        t         TB
 }
 
-func (s *StubbedAzureBlobSuite) newTestableAzureBlobVolume(t TB, cluster *arvados.Cluster, volume arvados.Volume, metrics *volumeMetricsVecs) *TestableAzureBlobVolume {
+func (s *stubbedAzureBlobSuite) newTestableAzureBlobVolume(t TB, params newVolumeParams) *testableAzureBlobVolume {
        azHandler := newAzStubHandler(t.(*check.C))
        azStub := httptest.NewServer(azHandler)
 
@@ -404,16 +405,17 @@ func (s *StubbedAzureBlobSuite) newTestableAzureBlobVolume(t TB, cluster *arvado
                ListBlobsRetryDelay:  arvados.Duration(time.Millisecond),
                azClient:             azClient,
                container:            &azureContainer{ctr: bs.GetContainerReference(container)},
-               cluster:              cluster,
-               volume:               volume,
+               cluster:              params.Cluster,
+               volume:               params.ConfigVolume,
                logger:               ctxlog.TestLogger(t),
-               metrics:              metrics,
+               metrics:              params.MetricsVecs,
+               bufferPool:           params.BufferPool,
        }
        if err = v.check(); err != nil {
                t.Fatal(err)
        }
 
-       return &TestableAzureBlobVolume{
+       return &testableAzureBlobVolume{
                AzureBlobVolume: v,
                azHandler:       azHandler,
                azStub:          azStub,
@@ -421,48 +423,54 @@ func (s *StubbedAzureBlobSuite) newTestableAzureBlobVolume(t TB, cluster *arvado
        }
 }
 
-var _ = check.Suite(&StubbedAzureBlobSuite{})
+var _ = check.Suite(&stubbedAzureBlobSuite{})
 
-type StubbedAzureBlobSuite struct {
+type stubbedAzureBlobSuite struct {
        origHTTPTransport http.RoundTripper
 }
 
-func (s *StubbedAzureBlobSuite) SetUpTest(c *check.C) {
+func (s *stubbedAzureBlobSuite) SetUpSuite(c *check.C) {
        s.origHTTPTransport = http.DefaultTransport
        http.DefaultTransport = &http.Transport{
                Dial: (&azStubDialer{logger: ctxlog.TestLogger(c)}).Dial,
        }
 }
 
-func (s *StubbedAzureBlobSuite) TearDownTest(c *check.C) {
+func (s *stubbedAzureBlobSuite) TearDownSuite(c *check.C) {
        http.DefaultTransport = s.origHTTPTransport
 }
 
-func (s *StubbedAzureBlobSuite) TestAzureBlobVolumeWithGeneric(c *check.C) {
-       DoGenericVolumeTests(c, false, func(t TB, cluster *arvados.Cluster, volume arvados.Volume, logger logrus.FieldLogger, metrics *volumeMetricsVecs) TestableVolume {
-               return s.newTestableAzureBlobVolume(t, cluster, volume, metrics)
+func (s *stubbedAzureBlobSuite) TestAzureBlobVolumeWithGeneric(c *check.C) {
+       DoGenericVolumeTests(c, false, func(t TB, params newVolumeParams) TestableVolume {
+               return s.newTestableAzureBlobVolume(t, params)
        })
 }
 
-func (s *StubbedAzureBlobSuite) TestAzureBlobVolumeConcurrentRanges(c *check.C) {
+func (s *stubbedAzureBlobSuite) TestAzureBlobVolumeConcurrentRanges(c *check.C) {
        // Test (BlockSize mod azureMaxGetBytes)==0 and !=0 cases
-       for _, b := range []int{2 << 22, 2<<22 - 1} {
-               DoGenericVolumeTests(c, false, func(t TB, cluster *arvados.Cluster, volume arvados.Volume, logger logrus.FieldLogger, metrics *volumeMetricsVecs) TestableVolume {
-                       v := s.newTestableAzureBlobVolume(t, cluster, volume, metrics)
+       for _, b := range []int{2<<22 - 1, 2<<22 - 1} {
+               c.Logf("=== MaxGetBytes=%d", b)
+               DoGenericVolumeTests(c, false, func(t TB, params newVolumeParams) TestableVolume {
+                       v := s.newTestableAzureBlobVolume(t, params)
                        v.MaxGetBytes = b
                        return v
                })
        }
 }
 
-func (s *StubbedAzureBlobSuite) TestReadonlyAzureBlobVolumeWithGeneric(c *check.C) {
-       DoGenericVolumeTests(c, false, func(c TB, cluster *arvados.Cluster, volume arvados.Volume, logger logrus.FieldLogger, metrics *volumeMetricsVecs) TestableVolume {
-               return s.newTestableAzureBlobVolume(c, cluster, volume, metrics)
+func (s *stubbedAzureBlobSuite) TestReadonlyAzureBlobVolumeWithGeneric(c *check.C) {
+       DoGenericVolumeTests(c, false, func(c TB, params newVolumeParams) TestableVolume {
+               return s.newTestableAzureBlobVolume(c, params)
        })
 }
 
-func (s *StubbedAzureBlobSuite) TestAzureBlobVolumeRangeFenceposts(c *check.C) {
-       v := s.newTestableAzureBlobVolume(c, testCluster(c), arvados.Volume{Replication: 3}, newVolumeMetricsVecs(prometheus.NewRegistry()))
+func (s *stubbedAzureBlobSuite) TestAzureBlobVolumeRangeFenceposts(c *check.C) {
+       v := s.newTestableAzureBlobVolume(c, newVolumeParams{
+               Cluster:      testCluster(c),
+               ConfigVolume: arvados.Volume{Replication: 3},
+               MetricsVecs:  newVolumeMetricsVecs(prometheus.NewRegistry()),
+               BufferPool:   newBufferPool(ctxlog.TestLogger(c), 8, prometheus.NewRegistry()),
+       })
        defer v.Teardown()
 
        for _, size := range []int{
@@ -478,16 +486,16 @@ func (s *StubbedAzureBlobSuite) TestAzureBlobVolumeRangeFenceposts(c *check.C) {
                        data[i] = byte((i + 7) & 0xff)
                }
                hash := fmt.Sprintf("%x", md5.Sum(data))
-               err := v.Put(context.Background(), hash, data)
+               err := v.BlockWrite(context.Background(), hash, data)
                if err != nil {
                        c.Error(err)
                }
-               gotData := make([]byte, len(data))
-               gotLen, err := v.Get(context.Background(), hash, gotData)
+               gotData := bytes.NewBuffer(nil)
+               gotLen, err := v.BlockRead(context.Background(), hash, gotData)
                if err != nil {
                        c.Error(err)
                }
-               gotHash := fmt.Sprintf("%x", md5.Sum(gotData))
+               gotHash := fmt.Sprintf("%x", md5.Sum(gotData.Bytes()))
                if gotLen != size {
                        c.Errorf("length mismatch: got %d != %d", gotLen, size)
                }
@@ -497,8 +505,13 @@ func (s *StubbedAzureBlobSuite) TestAzureBlobVolumeRangeFenceposts(c *check.C) {
        }
 }
 
-func (s *StubbedAzureBlobSuite) TestAzureBlobVolumeCreateBlobRace(c *check.C) {
-       v := s.newTestableAzureBlobVolume(c, testCluster(c), arvados.Volume{Replication: 3}, newVolumeMetricsVecs(prometheus.NewRegistry()))
+func (s *stubbedAzureBlobSuite) TestAzureBlobVolumeCreateBlobRace(c *check.C) {
+       v := s.newTestableAzureBlobVolume(c, newVolumeParams{
+               Cluster:      testCluster(c),
+               ConfigVolume: arvados.Volume{Replication: 3},
+               MetricsVecs:  newVolumeMetricsVecs(prometheus.NewRegistry()),
+               BufferPool:   newBufferPool(ctxlog.TestLogger(c), 8, prometheus.NewRegistry()),
+       })
        defer v.Teardown()
 
        var wg sync.WaitGroup
@@ -508,42 +521,46 @@ func (s *StubbedAzureBlobSuite) TestAzureBlobVolumeCreateBlobRace(c *check.C) {
        wg.Add(1)
        go func() {
                defer wg.Done()
-               err := v.Put(context.Background(), TestHash, TestBlock)
+               err := v.BlockWrite(context.Background(), TestHash, TestBlock)
                if err != nil {
                        c.Error(err)
                }
        }()
-       continuePut := make(chan struct{})
-       // Wait for the stub's Put to create the empty blob
-       v.azHandler.race <- continuePut
+       continueBlockWrite := make(chan struct{})
+       // Wait for the stub's BlockWrite to create the empty blob
+       v.azHandler.race <- continueBlockWrite
        wg.Add(1)
        go func() {
                defer wg.Done()
-               buf := make([]byte, len(TestBlock))
-               _, err := v.Get(context.Background(), TestHash, buf)
+               _, err := v.BlockRead(context.Background(), TestHash, io.Discard)
                if err != nil {
                        c.Error(err)
                }
        }()
-       // Wait for the stub's Get to get the empty blob
+       // Wait for the stub's BlockRead to get the empty blob
        close(v.azHandler.race)
-       // Allow stub's Put to continue, so the real data is ready
-       // when the volume's Get retries
-       <-continuePut
-       // Wait for Get() and Put() to finish
+       // Allow stub's BlockWrite to continue, so the real data is ready
+       // when the volume's BlockRead retries
+       <-continueBlockWrite
+       // Wait for BlockRead() and BlockWrite() to finish
        wg.Wait()
 }
 
-func (s *StubbedAzureBlobSuite) TestAzureBlobVolumeCreateBlobRaceDeadline(c *check.C) {
-       v := s.newTestableAzureBlobVolume(c, testCluster(c), arvados.Volume{Replication: 3}, newVolumeMetricsVecs(prometheus.NewRegistry()))
+func (s *stubbedAzureBlobSuite) TestAzureBlobVolumeCreateBlobRaceDeadline(c *check.C) {
+       v := s.newTestableAzureBlobVolume(c, newVolumeParams{
+               Cluster:      testCluster(c),
+               ConfigVolume: arvados.Volume{Replication: 3},
+               MetricsVecs:  newVolumeMetricsVecs(prometheus.NewRegistry()),
+               BufferPool:   newBufferPool(ctxlog.TestLogger(c), 8, prometheus.NewRegistry()),
+       })
        v.AzureBlobVolume.WriteRaceInterval.Set("2s")
        v.AzureBlobVolume.WriteRacePollTime.Set("5ms")
        defer v.Teardown()
 
-       v.PutRaw(TestHash, nil)
+       v.BlockWriteRaw(TestHash, nil)
 
        buf := new(bytes.Buffer)
-       v.IndexTo("", buf)
+       v.Index(context.Background(), "", buf)
        if buf.Len() != 0 {
                c.Errorf("Index %+q should be empty", buf.Bytes())
        }
@@ -553,52 +570,50 @@ func (s *StubbedAzureBlobSuite) TestAzureBlobVolumeCreateBlobRaceDeadline(c *che
        allDone := make(chan struct{})
        go func() {
                defer close(allDone)
-               buf := make([]byte, BlockSize)
-               n, err := v.Get(context.Background(), TestHash, buf)
+               buf := bytes.NewBuffer(nil)
+               n, err := v.BlockRead(context.Background(), TestHash, buf)
                if err != nil {
                        c.Error(err)
                        return
                }
                if n != 0 {
-                       c.Errorf("Got %+q, expected empty buf", buf[:n])
+                       c.Errorf("Got %+q (n=%d), expected empty buf", buf.Bytes(), n)
                }
        }()
        select {
        case <-allDone:
        case <-time.After(time.Second):
-               c.Error("Get should have stopped waiting for race when block was 2s old")
+               c.Error("BlockRead should have stopped waiting for race when block was 2s old")
        }
 
        buf.Reset()
-       v.IndexTo("", buf)
+       v.Index(context.Background(), "", buf)
        if !bytes.HasPrefix(buf.Bytes(), []byte(TestHash+"+0")) {
                c.Errorf("Index %+q should have %+q", buf.Bytes(), TestHash+"+0")
        }
 }
 
-func (s *StubbedAzureBlobSuite) TestAzureBlobVolumeContextCancelGet(c *check.C) {
-       s.testAzureBlobVolumeContextCancel(c, func(ctx context.Context, v *TestableAzureBlobVolume) error {
-               v.PutRaw(TestHash, TestBlock)
-               _, err := v.Get(ctx, TestHash, make([]byte, BlockSize))
+func (s *stubbedAzureBlobSuite) TestAzureBlobVolumeContextCancelBlockRead(c *check.C) {
+       s.testAzureBlobVolumeContextCancel(c, func(ctx context.Context, v *testableAzureBlobVolume) error {
+               v.BlockWriteRaw(TestHash, TestBlock)
+               _, err := v.BlockRead(ctx, TestHash, io.Discard)
                return err
        })
 }
 
-func (s *StubbedAzureBlobSuite) TestAzureBlobVolumeContextCancelPut(c *check.C) {
-       s.testAzureBlobVolumeContextCancel(c, func(ctx context.Context, v *TestableAzureBlobVolume) error {
-               return v.Put(ctx, TestHash, make([]byte, BlockSize))
+func (s *stubbedAzureBlobSuite) TestAzureBlobVolumeContextCancelBlockWrite(c *check.C) {
+       s.testAzureBlobVolumeContextCancel(c, func(ctx context.Context, v *testableAzureBlobVolume) error {
+               return v.BlockWrite(ctx, TestHash, make([]byte, BlockSize))
        })
 }
 
-func (s *StubbedAzureBlobSuite) TestAzureBlobVolumeContextCancelCompare(c *check.C) {
-       s.testAzureBlobVolumeContextCancel(c, func(ctx context.Context, v *TestableAzureBlobVolume) error {
-               v.PutRaw(TestHash, TestBlock)
-               return v.Compare(ctx, TestHash, TestBlock2)
+func (s *stubbedAzureBlobSuite) testAzureBlobVolumeContextCancel(c *check.C, testFunc func(context.Context, *testableAzureBlobVolume) error) {
+       v := s.newTestableAzureBlobVolume(c, newVolumeParams{
+               Cluster:      testCluster(c),
+               ConfigVolume: arvados.Volume{Replication: 3},
+               MetricsVecs:  newVolumeMetricsVecs(prometheus.NewRegistry()),
+               BufferPool:   newBufferPool(ctxlog.TestLogger(c), 8, prometheus.NewRegistry()),
        })
-}
-
-func (s *StubbedAzureBlobSuite) testAzureBlobVolumeContextCancel(c *check.C, testFunc func(context.Context, *TestableAzureBlobVolume) error) {
-       v := s.newTestableAzureBlobVolume(c, testCluster(c), arvados.Volume{Replication: 3}, newVolumeMetricsVecs(prometheus.NewRegistry()))
        defer v.Teardown()
        v.azHandler.race = make(chan chan struct{})
 
@@ -633,8 +648,13 @@ func (s *StubbedAzureBlobSuite) testAzureBlobVolumeContextCancel(c *check.C, tes
        }()
 }
 
-func (s *StubbedAzureBlobSuite) TestStats(c *check.C) {
-       volume := s.newTestableAzureBlobVolume(c, testCluster(c), arvados.Volume{Replication: 3}, newVolumeMetricsVecs(prometheus.NewRegistry()))
+func (s *stubbedAzureBlobSuite) TestStats(c *check.C) {
+       volume := s.newTestableAzureBlobVolume(c, newVolumeParams{
+               Cluster:      testCluster(c),
+               ConfigVolume: arvados.Volume{Replication: 3},
+               MetricsVecs:  newVolumeMetricsVecs(prometheus.NewRegistry()),
+               BufferPool:   newBufferPool(ctxlog.TestLogger(c), 8, prometheus.NewRegistry()),
+       })
        defer volume.Teardown()
 
        stats := func() string {
@@ -647,38 +667,38 @@ func (s *StubbedAzureBlobSuite) TestStats(c *check.C) {
        c.Check(stats(), check.Matches, `.*"Errors":0,.*`)
 
        loc := "acbd18db4cc2f85cedef654fccc4a4d8"
-       _, err := volume.Get(context.Background(), loc, make([]byte, 3))
+       _, err := volume.BlockRead(context.Background(), loc, io.Discard)
        c.Check(err, check.NotNil)
        c.Check(stats(), check.Matches, `.*"Ops":[^0],.*`)
        c.Check(stats(), check.Matches, `.*"Errors":[^0],.*`)
        c.Check(stats(), check.Matches, `.*"storage\.AzureStorageServiceError 404 \(404 Not Found\)":[^0].*`)
        c.Check(stats(), check.Matches, `.*"InBytes":0,.*`)
 
-       err = volume.Put(context.Background(), loc, []byte("foo"))
+       err = volume.BlockWrite(context.Background(), loc, []byte("foo"))
        c.Check(err, check.IsNil)
        c.Check(stats(), check.Matches, `.*"OutBytes":3,.*`)
        c.Check(stats(), check.Matches, `.*"CreateOps":1,.*`)
 
-       _, err = volume.Get(context.Background(), loc, make([]byte, 3))
+       _, err = volume.BlockRead(context.Background(), loc, io.Discard)
        c.Check(err, check.IsNil)
-       _, err = volume.Get(context.Background(), loc, make([]byte, 3))
+       _, err = volume.BlockRead(context.Background(), loc, io.Discard)
        c.Check(err, check.IsNil)
        c.Check(stats(), check.Matches, `.*"InBytes":6,.*`)
 }
 
-func (v *TestableAzureBlobVolume) PutRaw(locator string, data []byte) {
-       v.azHandler.PutRaw(v.ContainerName, locator, data)
+func (v *testableAzureBlobVolume) BlockWriteRaw(locator string, data []byte) {
+       v.azHandler.BlockWriteRaw(v.ContainerName, locator, data)
 }
 
-func (v *TestableAzureBlobVolume) TouchWithDate(locator string, lastPut time.Time) {
-       v.azHandler.TouchWithDate(v.ContainerName, locator, lastPut)
+func (v *testableAzureBlobVolume) TouchWithDate(locator string, lastBlockWrite time.Time) {
+       v.azHandler.TouchWithDate(v.ContainerName, locator, lastBlockWrite)
 }
 
-func (v *TestableAzureBlobVolume) Teardown() {
+func (v *testableAzureBlobVolume) Teardown() {
        v.azStub.Close()
 }
 
-func (v *TestableAzureBlobVolume) ReadWriteOperationLabelValues() (r, w string) {
+func (v *testableAzureBlobVolume) ReadWriteOperationLabelValues() (r, w string) {
        return "get", "create"
 }