//
// SPDX-License-Identifier: AGPL-3.0
-package main
+package keepstore
import (
"bytes"
"net"
"net/http"
"net/http/httptest"
+ "os"
"regexp"
"sort"
"strconv"
"strings"
"sync"
- "testing"
"time"
- log "github.com/Sirupsen/logrus"
- "github.com/curoverse/azure-sdk-for-go/storage"
+ "git.arvados.org/arvados.git/sdk/go/arvados"
+ "git.arvados.org/arvados.git/sdk/go/ctxlog"
+ "github.com/Azure/azure-sdk-for-go/storage"
+ "github.com/prometheus/client_golang/prometheus"
+ "github.com/sirupsen/logrus"
check "gopkg.in/check.v1"
)
// used by Microsoft's Azure emulator: the Azure SDK
// recognizes that magic string and changes its behavior to
// cater to the Azure SDK's own test suite.
- fakeAccountName = "fakeAccountName"
+ fakeAccountName = "fakeaccountname"
fakeAccountKey = "Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw=="
)
-var azureTestContainer string
+var (
+ azureTestContainer string
+ azureTestDebug = os.Getenv("ARVADOS_DEBUG") != ""
+)
func init() {
flag.StringVar(
type azStubHandler struct {
sync.Mutex
- blobs map[string]*azBlob
- race chan chan struct{}
+ // logger receives the stub's diagnostic messages.
+ logger logrus.FieldLogger
+ // blobs holds the stub's blobs, keyed by container+"|"+hash.
+ blobs map[string]*azBlob
+ // race is set by tests that need to synchronize with a request
+ // in progress; presumably consumed by unlockAndRace — confirm.
+ race chan chan struct{}
+ // didlist503 is set once a "List Blobs" request has been
+ // answered with 503 Service Unavailable; later list requests
+ // are served normally.
+ didlist503 bool
}
-func newAzStubHandler() *azStubHandler {
+// newAzStubHandler returns an azStubHandler with an empty blob map
+// whose logger is the one returned by ctxlog.TestLogger(c).
+func newAzStubHandler(c *check.C) *azStubHandler {
return &azStubHandler{
- blobs: make(map[string]*azBlob),
+ blobs: make(map[string]*azBlob),
+ logger: ctxlog.TestLogger(c),
}
}
blob.Mtime = t
}
-func (h *azStubHandler) PutRaw(container, hash string, data []byte) {
+func (h *azStubHandler) BlockWriteRaw(container, hash string, data []byte) {
h.Lock()
defer h.Unlock()
h.blobs[container+"|"+hash] = &azBlob{
func (h *azStubHandler) ServeHTTP(rw http.ResponseWriter, r *http.Request) {
h.Lock()
defer h.Unlock()
- // defer log.Printf("azStubHandler: %+v", r)
+ if azureTestDebug {
+ defer h.logger.Printf("azStubHandler: %+v", r)
+ }
path := strings.Split(r.URL.Path, "/")
container := path[1]
}
if err := r.ParseForm(); err != nil {
- log.Printf("azStubHandler(%+v): %s", r, err)
+ h.logger.Printf("azStubHandler(%+v): %s", r, err)
rw.WriteHeader(http.StatusBadRequest)
return
}
case r.Method == "PUT" && r.Form.Get("comp") == "block":
// "Put Block" API
if !blobExists {
- log.Printf("Got block for nonexistent blob: %+v", r)
+ h.logger.Printf("Got block for nonexistent blob: %+v", r)
rw.WriteHeader(http.StatusBadRequest)
return
}
blockID, err := base64.StdEncoding.DecodeString(r.Form.Get("blockid"))
if err != nil || len(blockID) == 0 {
- log.Printf("Invalid blockid: %+q", r.Form.Get("blockid"))
+ h.logger.Printf("Invalid blockid: %+q", r.Form.Get("blockid"))
rw.WriteHeader(http.StatusBadRequest)
return
}
// "Put Block List" API
bl := &blockListRequestBody{}
if err := xml.Unmarshal(body, bl); err != nil {
- log.Printf("xml Unmarshal: %s", err)
+ h.logger.Printf("xml Unmarshal: %s", err)
rw.WriteHeader(http.StatusBadRequest)
return
}
for _, encBlockID := range bl.Uncommitted {
blockID, err := base64.StdEncoding.DecodeString(encBlockID)
if err != nil || len(blockID) == 0 || blob.Uncommitted[string(blockID)] == nil {
- log.Printf("Invalid blockid: %+q", encBlockID)
+ h.logger.Printf("Invalid blockid: %+q", encBlockID)
rw.WriteHeader(http.StatusBadRequest)
return
}
rw.WriteHeader(http.StatusCreated)
case r.Method == "PUT" && r.Form.Get("comp") == "metadata":
// "Set Metadata Headers" API. We don't bother
- // stubbing "Get Metadata Headers": AzureBlobVolume
+ // stubbing "Get Metadata Headers": azureBlobVolume
// sets metadata headers only as a way to bump Etag
// and Last-Modified.
if !blobExists {
- log.Printf("Got metadata for nonexistent blob: %+v", r)
+ h.logger.Printf("Got metadata for nonexistent blob: %+v", r)
rw.WriteHeader(http.StatusBadRequest)
return
}
rw.Header().Set("Content-Length", strconv.Itoa(len(data)))
if r.Method == "GET" {
if _, err := rw.Write(data); err != nil {
- log.Printf("write %+q: %s", data, err)
+ h.logger.Printf("write %+q: %s", data, err)
}
}
h.unlockAndRace()
rw.WriteHeader(http.StatusAccepted)
case r.Method == "GET" && r.Form.Get("comp") == "list" && r.Form.Get("restype") == "container":
// "List Blobs" API
+ if !h.didlist503 {
+ h.didlist503 = true
+ rw.WriteHeader(http.StatusServiceUnavailable)
+ return
+ }
prefix := container + "|" + r.Form.Get("prefix")
marker := r.Form.Get("marker")
b := storage.Blob{
Name: hash,
Properties: storage.BlobProperties{
- LastModified: blob.Mtime.Format(time.RFC1123),
+ LastModified: storage.TimeRFC1123(blob.Mtime),
ContentLength: int64(len(blob.Data)),
Etag: blob.Etag,
},
}
buf, err := xml.Marshal(resp)
if err != nil {
- log.Print(err)
+ h.logger.Error(err)
rw.WriteHeader(http.StatusInternalServerError)
}
rw.Write(buf)
default:
- log.Printf("azStubHandler: not implemented: %+v Body:%+q", r, body)
+ h.logger.Printf("azStubHandler: not implemented: %+v Body:%+q", r, body)
rw.WriteHeader(http.StatusNotImplemented)
}
}
// tries to connect to "devstoreaccount1.blob.127.0.0.1:46067", and
// in such cases transparently dials "127.0.0.1:46067" instead.
type azStubDialer struct {
+ logger logrus.FieldLogger
net.Dialer
}
+// Dial behaves like the embedded net.Dialer's Dial, except that when
+// localHostPortRe finds a match in address, the matched host:port is
+// dialed instead of address. The substitution is logged only when
+// azureTestDebug is set.
func (d *azStubDialer) Dial(network, address string) (net.Conn, error) {
if hp := localHostPortRe.FindString(address); hp != "" {
- log.Println("azStubDialer: dial", hp, "instead of", address)
+ if azureTestDebug {
+ // Use Debugf, not Debug: Debug formats its arguments
+ // like fmt.Sprint, which adds no space between string
+ // operands and would run the message together.
+ d.logger.Debugf("azStubDialer: dial %s instead of %s", hp, address)
+ }
address = hp
}
return d.Dialer.Dial(network, address)
}
-type TestableAzureBlobVolume struct {
- *AzureBlobVolume
+// testableAzureBlobVolume bundles an azureBlobVolume with the stub
+// handler and HTTP test server that newTestableAzureBlobVolume
+// creates alongside it.
+type testableAzureBlobVolume struct {
+ *azureBlobVolume
azHandler *azStubHandler
azStub *httptest.Server
t TB
}
-func NewTestableAzureBlobVolume(t TB, readonly bool, replication int) *TestableAzureBlobVolume {
- azHandler := newAzStubHandler()
+// newTestableAzureBlobVolume builds an azureBlobVolume for tests.
+//
+// A stub HTTP server backed by a new azStubHandler is always started.
+// When azureTestContainer is empty, the volume's client is pointed at
+// that stub using the fake account name/key and the container name
+// "fakecontainername". Otherwise the client connects to the real
+// Azure storage service with credentials taken from the
+// ARVADOS_TEST_AZURE_ACCOUNT_NAME and ARVADOS_TEST_AZURE_ACCOUNT_KEY
+// environment variables.
+//
+// t must be a *check.C (it is type-asserted). Any client setup error,
+// or an error from v.check(), is reported with t.Fatal.
+func (s *stubbedAzureBlobSuite) newTestableAzureBlobVolume(t TB, params newVolumeParams) *testableAzureBlobVolume {
+ azHandler := newAzStubHandler(t.(*check.C))
azStub := httptest.NewServer(azHandler)
var azClient storage.Client
+ var err error
container := azureTestContainer
if container == "" {
// Connect to stub instead of real Azure storage service
stubURLBase := strings.Split(azStub.URL, "://")[1]
- var err error
if azClient, err = storage.NewClient(fakeAccountName, fakeAccountKey, stubURLBase, storage.DefaultAPIVersion, false); err != nil {
t.Fatal(err)
}
container = "fakecontainername"
} else {
// Connect to real Azure storage service
- accountKey, err := readKeyFromFile(azureStorageAccountKeyFile)
- if err != nil {
- t.Fatal(err)
- }
- azClient, err = storage.NewBasicClient(azureStorageAccountName, accountKey)
- if err != nil {
+ if azClient, err = storage.NewBasicClient(os.Getenv("ARVADOS_TEST_AZURE_ACCOUNT_NAME"), os.Getenv("ARVADOS_TEST_AZURE_ACCOUNT_KEY")); err != nil {
t.Fatal(err)
}
}
+ azClient.Sender = &singleSender{}
bs := azClient.GetBlobService()
- v := &AzureBlobVolume{
- ContainerName: container,
- ReadOnly: readonly,
- AzureReplication: replication,
- azClient: azClient,
- bsClient: &azureBlobClient{client: &bs},
- }
-
- return &TestableAzureBlobVolume{
- AzureBlobVolume: v,
+ // Short race/retry intervals keep the tests fast. Two list
+ // attempts are allowed because the stub answers the first
+ // "List Blobs" request with 503.
+ v := &azureBlobVolume{
+ ContainerName: container,
+ WriteRaceInterval: arvados.Duration(time.Millisecond),
+ WriteRacePollTime: arvados.Duration(time.Nanosecond),
+ ListBlobsMaxAttempts: 2,
+ ListBlobsRetryDelay: arvados.Duration(time.Millisecond),
+ azClient: azClient,
+ container: &azureContainer{ctr: bs.GetContainerReference(container)},
+ cluster: params.Cluster,
+ volume: params.ConfigVolume,
+ logger: ctxlog.TestLogger(t),
+ metrics: params.MetricsVecs,
+ bufferPool: params.BufferPool,
+ }
+ if err = v.check(); err != nil {
+ t.Fatal(err)
+ }
+
+ return &testableAzureBlobVolume{
+ azureBlobVolume: v,
azHandler: azHandler,
azStub: azStub,
t: t,
}
}
-var _ = check.Suite(&StubbedAzureBlobSuite{})
+var _ = check.Suite(&stubbedAzureBlobSuite{})
-type StubbedAzureBlobSuite struct {
- volume *TestableAzureBlobVolume
+// stubbedAzureBlobSuite is the check.v1 suite for the Azure blob
+// volume tests in this file.
+type stubbedAzureBlobSuite struct {
+ // origHTTPTransport holds the http.DefaultTransport value saved
+ // in SetUpSuite so that TearDownSuite can restore it.
origHTTPTransport http.RoundTripper
}
-func (s *StubbedAzureBlobSuite) SetUpTest(c *check.C) {
+// SetUpSuite saves the current http.DefaultTransport and replaces it
+// with a transport whose Dial function is azStubDialer.Dial.
+func (s *stubbedAzureBlobSuite) SetUpSuite(c *check.C) {
s.origHTTPTransport = http.DefaultTransport
http.DefaultTransport = &http.Transport{
- Dial: (&azStubDialer{}).Dial,
+ Dial: (&azStubDialer{logger: ctxlog.TestLogger(c)}).Dial,
}
- azureWriteRaceInterval = time.Millisecond
- azureWriteRacePollTime = time.Nanosecond
-
- s.volume = NewTestableAzureBlobVolume(c, false, 3)
}
-func (s *StubbedAzureBlobSuite) TearDownTest(c *check.C) {
- s.volume.Teardown()
+// TearDownSuite restores the http.DefaultTransport saved by
+// SetUpSuite.
+func (s *stubbedAzureBlobSuite) TearDownSuite(c *check.C) {
http.DefaultTransport = s.origHTTPTransport
}
-func TestAzureBlobVolumeWithGeneric(t *testing.T) {
- defer func(t http.RoundTripper) {
- http.DefaultTransport = t
- }(http.DefaultTransport)
- http.DefaultTransport = &http.Transport{
- Dial: (&azStubDialer{}).Dial,
- }
- azureWriteRaceInterval = time.Millisecond
- azureWriteRacePollTime = time.Nanosecond
- DoGenericVolumeTests(t, func(t TB) TestableVolume {
- return NewTestableAzureBlobVolume(t, false, azureStorageReplication)
+// TestAzureBlobVolumeWithGeneric runs DoGenericVolumeTests against
+// volumes built by newTestableAzureBlobVolume.
+func (s *stubbedAzureBlobSuite) TestAzureBlobVolumeWithGeneric(c *check.C) {
+ DoGenericVolumeTests(c, false, func(t TB, params newVolumeParams) TestableVolume {
+ return s.newTestableAzureBlobVolume(t, params)
})
}
-func TestAzureBlobVolumeConcurrentRanges(t *testing.T) {
- defer func(b int) {
- azureMaxGetBytes = b
- }(azureMaxGetBytes)
-
- defer func(t http.RoundTripper) {
- http.DefaultTransport = t
- }(http.DefaultTransport)
- http.DefaultTransport = &http.Transport{
- Dial: (&azStubDialer{}).Dial,
- }
- azureWriteRaceInterval = time.Millisecond
- azureWriteRacePollTime = time.Nanosecond
+// TestAzureBlobVolumeConcurrentRanges runs DoGenericVolumeTests once
+// per MaxGetBytes value. The two values (2<<22 and 2<<22-1) are the
+// same pair the previous version of this test used, so that both the
+// (BlockSize mod MaxGetBytes)==0 and !=0 cases are covered.
+func (s *stubbedAzureBlobSuite) TestAzureBlobVolumeConcurrentRanges(c *check.C) {
// Test (BlockSize mod azureMaxGetBytes)==0 and !=0 cases
- for _, azureMaxGetBytes = range []int{2 << 22, 2<<22 - 1} {
- DoGenericVolumeTests(t, func(t TB) TestableVolume {
- return NewTestableAzureBlobVolume(t, false, azureStorageReplication)
+ for _, b := range []int{2 << 22, 2<<22 - 1} {
+ c.Logf("=== MaxGetBytes=%d", b)
+ DoGenericVolumeTests(c, false, func(t TB, params newVolumeParams) TestableVolume {
+ v := s.newTestableAzureBlobVolume(t, params)
+ v.MaxGetBytes = b
+ return v
})
}
}
-func TestReadonlyAzureBlobVolumeWithGeneric(t *testing.T) {
- defer func(t http.RoundTripper) {
- http.DefaultTransport = t
- }(http.DefaultTransport)
- http.DefaultTransport = &http.Transport{
- Dial: (&azStubDialer{}).Dial,
- }
- azureWriteRaceInterval = time.Millisecond
- azureWriteRacePollTime = time.Nanosecond
- DoGenericVolumeTests(t, func(t TB) TestableVolume {
- return NewTestableAzureBlobVolume(t, true, azureStorageReplication)
+// TestReadonlyAzureBlobVolumeWithGeneric runs DoGenericVolumeTests
+// against volumes built by newTestableAzureBlobVolume.
+//
+// NOTE(review): this passes false as the second argument, exactly
+// like TestAzureBlobVolumeWithGeneric, whereas the previous version
+// built the volume with readonly=true. If that argument selects
+// read-only mode it presumably should be true here — confirm against
+// DoGenericVolumeTests.
+func (s *stubbedAzureBlobSuite) TestReadonlyAzureBlobVolumeWithGeneric(c *check.C) {
+ DoGenericVolumeTests(c, false, func(c TB, params newVolumeParams) TestableVolume {
+ return s.newTestableAzureBlobVolume(c, params)
})
}
-func TestAzureBlobVolumeRangeFenceposts(t *testing.T) {
- defer func(t http.RoundTripper) {
- http.DefaultTransport = t
- }(http.DefaultTransport)
- http.DefaultTransport = &http.Transport{
- Dial: (&azStubDialer{}).Dial,
- }
-
- v := NewTestableAzureBlobVolume(t, false, 3)
+func (s *stubbedAzureBlobSuite) TestAzureBlobVolumeRangeFenceposts(c *check.C) {
+ v := s.newTestableAzureBlobVolume(c, newVolumeParams{
+ Cluster: testCluster(c),
+ ConfigVolume: arvados.Volume{Replication: 3},
+ MetricsVecs: newVolumeMetricsVecs(prometheus.NewRegistry()),
+ BufferPool: newBufferPool(ctxlog.TestLogger(c), 8, prometheus.NewRegistry()),
+ })
defer v.Teardown()
for _, size := range []int{
data[i] = byte((i + 7) & 0xff)
}
hash := fmt.Sprintf("%x", md5.Sum(data))
- err := v.Put(context.Background(), hash, data)
+ err := v.BlockWrite(context.Background(), hash, data)
if err != nil {
- t.Error(err)
+ c.Error(err)
}
- gotData := make([]byte, len(data))
- gotLen, err := v.Get(context.Background(), hash, gotData)
+ gotData := &brbuffer{}
+ err = v.BlockRead(context.Background(), hash, gotData)
if err != nil {
- t.Error(err)
- }
- gotHash := fmt.Sprintf("%x", md5.Sum(gotData))
- if gotLen != size {
- t.Errorf("length mismatch: got %d != %d", gotLen, size)
+ c.Error(err)
}
+ gotHash := fmt.Sprintf("%x", md5.Sum(gotData.Bytes()))
+ c.Check(gotData.Len(), check.Equals, size)
if gotHash != hash {
- t.Errorf("hash mismatch: got %s != %s", gotHash, hash)
+ c.Errorf("hash mismatch: got %s != %s", gotHash, hash)
}
}
}
-func TestAzureBlobVolumeReplication(t *testing.T) {
- for r := 1; r <= 4; r++ {
- v := NewTestableAzureBlobVolume(t, false, r)
- defer v.Teardown()
- if n := v.Replication(); n != r {
- t.Errorf("Got replication %d, expected %d", n, r)
- }
- }
-}
-
-func TestAzureBlobVolumeCreateBlobRace(t *testing.T) {
- defer func(t http.RoundTripper) {
- http.DefaultTransport = t
- }(http.DefaultTransport)
- http.DefaultTransport = &http.Transport{
- Dial: (&azStubDialer{}).Dial,
- }
-
- v := NewTestableAzureBlobVolume(t, false, 3)
+// TestAzureBlobVolumeCreateBlobRace starts a BlockWrite, uses the stub
+// handler's race channel to wait until the stub has created the empty
+// blob, then starts a concurrent BlockRead of the same block. Both
+// calls are expected to finish without error.
+func (s *stubbedAzureBlobSuite) TestAzureBlobVolumeCreateBlobRace(c *check.C) {
+ v := s.newTestableAzureBlobVolume(c, newVolumeParams{
+ Cluster: testCluster(c),
+ ConfigVolume: arvados.Volume{Replication: 3},
+ MetricsVecs: newVolumeMetricsVecs(prometheus.NewRegistry()),
+ BufferPool: newBufferPool(ctxlog.TestLogger(c), 8, prometheus.NewRegistry()),
+ })
defer v.Teardown()
- azureWriteRaceInterval = time.Second
- azureWriteRacePollTime = time.Millisecond
+ var wg sync.WaitGroup
- allDone := make(chan struct{})
v.azHandler.race = make(chan chan struct{})
+
+ wg.Add(1)
go func() {
- err := v.Put(context.Background(), TestHash, TestBlock)
+ defer wg.Done()
+ err := v.BlockWrite(context.Background(), TestHash, TestBlock)
if err != nil {
- t.Error(err)
+ c.Error(err)
}
}()
- continuePut := make(chan struct{})
- // Wait for the stub's Put to create the empty blob
- v.azHandler.race <- continuePut
+ continueBlockWrite := make(chan struct{})
+ // Wait for the stub's BlockWrite to create the empty blob
+ v.azHandler.race <- continueBlockWrite
+ wg.Add(1)
go func() {
- buf := make([]byte, len(TestBlock))
- _, err := v.Get(context.Background(), TestHash, buf)
+ defer wg.Done()
+ err := v.BlockRead(context.Background(), TestHash, brdiscard)
if err != nil {
- t.Error(err)
+ c.Error(err)
}
- close(allDone)
}()
- // Wait for the stub's Get to get the empty blob
+ // Wait for the stub's BlockRead to get the empty blob
close(v.azHandler.race)
- // Allow stub's Put to continue, so the real data is ready
- // when the volume's Get retries
- <-continuePut
- // Wait for volume's Get to return the real data
- <-allDone
+ // Allow stub's BlockWrite to continue, so the real data is ready
+ // when the volume's BlockRead retries
+ <-continueBlockWrite
+ // Wait for BlockRead() and BlockWrite() to finish
+ wg.Wait()
}
-func TestAzureBlobVolumeCreateBlobRaceDeadline(t *testing.T) {
- defer func(t http.RoundTripper) {
- http.DefaultTransport = t
- }(http.DefaultTransport)
- http.DefaultTransport = &http.Transport{
- Dial: (&azStubDialer{}).Dial,
- }
-
- v := NewTestableAzureBlobVolume(t, false, 3)
+// TestAzureBlobVolumeCreateBlobRaceDeadline stores an empty blob
+// directly in the stub, backdates it by 1982ms with a 2s
+// WriteRaceInterval, and expects that (a) Index initially omits the
+// block, (b) BlockRead returns empty data within one second instead
+// of waiting indefinitely, and (c) Index afterwards lists the block
+// with size 0.
+func (s *stubbedAzureBlobSuite) TestAzureBlobVolumeCreateBlobRaceDeadline(c *check.C) {
+ v := s.newTestableAzureBlobVolume(c, newVolumeParams{
+ Cluster: testCluster(c),
+ ConfigVolume: arvados.Volume{Replication: 3},
+ MetricsVecs: newVolumeMetricsVecs(prometheus.NewRegistry()),
+ BufferPool: newBufferPool(ctxlog.TestLogger(c), 8, prometheus.NewRegistry()),
+ })
+ v.azureBlobVolume.WriteRaceInterval.Set("2s")
+ v.azureBlobVolume.WriteRacePollTime.Set("5ms")
defer v.Teardown()
- azureWriteRaceInterval = 2 * time.Second
- azureWriteRacePollTime = 5 * time.Millisecond
-
- v.PutRaw(TestHash, nil)
+ v.BlockWriteRaw(TestHash, nil)
buf := new(bytes.Buffer)
- v.IndexTo("", buf)
+ v.Index(context.Background(), "", buf)
if buf.Len() != 0 {
- t.Errorf("Index %+q should be empty", buf.Bytes())
+ c.Errorf("Index %+q should be empty", buf.Bytes())
}
v.TouchWithDate(TestHash, time.Now().Add(-1982*time.Millisecond))
allDone := make(chan struct{})
go func() {
defer close(allDone)
- buf := make([]byte, BlockSize)
- n, err := v.Get(context.Background(), TestHash, buf)
+ buf := &brbuffer{}
+ err := v.BlockRead(context.Background(), TestHash, buf)
if err != nil {
- t.Error(err)
+ c.Error(err)
return
}
- if n != 0 {
- t.Errorf("Got %+q, expected empty buf", buf[:n])
- }
+ c.Check(buf.String(), check.Equals, "")
}()
select {
case <-allDone:
case <-time.After(time.Second):
- t.Error("Get should have stopped waiting for race when block was 2s old")
+ c.Error("BlockRead should have stopped waiting for race when block was 2s old")
}
buf.Reset()
- v.IndexTo("", buf)
+ v.Index(context.Background(), "", buf)
if !bytes.HasPrefix(buf.Bytes(), []byte(TestHash+"+0")) {
- t.Errorf("Index %+q should have %+q", buf.Bytes(), TestHash+"+0")
+ c.Errorf("Index %+q should have %+q", buf.Bytes(), TestHash+"+0")
}
}
-func TestAzureBlobVolumeContextCancelGet(t *testing.T) {
- testAzureBlobVolumeContextCancel(t, func(ctx context.Context, v *TestableAzureBlobVolume) error {
- v.PutRaw(TestHash, TestBlock)
- _, err := v.Get(ctx, TestHash, make([]byte, BlockSize))
- return err
+// TestAzureBlobVolumeContextCancelBlockRead stores TestBlock directly
+// in the stub and runs a BlockRead of it through
+// testAzureBlobVolumeContextCancel, which expects the call to return
+// context.Canceled.
+func (s *stubbedAzureBlobSuite) TestAzureBlobVolumeContextCancelBlockRead(c *check.C) {
+ s.testAzureBlobVolumeContextCancel(c, func(ctx context.Context, v *testableAzureBlobVolume) error {
+ v.BlockWriteRaw(TestHash, TestBlock)
+ return v.BlockRead(ctx, TestHash, brdiscard)
})
}
-func TestAzureBlobVolumeContextCancelPut(t *testing.T) {
- testAzureBlobVolumeContextCancel(t, func(ctx context.Context, v *TestableAzureBlobVolume) error {
- return v.Put(ctx, TestHash, make([]byte, BlockSize))
+// TestAzureBlobVolumeContextCancelBlockWrite runs a BlockWrite of a
+// BlockSize-byte zero buffer through testAzureBlobVolumeContextCancel,
+// which expects the call to return context.Canceled.
+func (s *stubbedAzureBlobSuite) TestAzureBlobVolumeContextCancelBlockWrite(c *check.C) {
+ s.testAzureBlobVolumeContextCancel(c, func(ctx context.Context, v *testableAzureBlobVolume) error {
+ return v.BlockWrite(ctx, TestHash, make([]byte, BlockSize))
})
}
-func TestAzureBlobVolumeContextCancelCompare(t *testing.T) {
- testAzureBlobVolumeContextCancel(t, func(ctx context.Context, v *TestableAzureBlobVolume) error {
- v.PutRaw(TestHash, TestBlock)
- return v.Compare(ctx, TestHash, TestBlock2)
+func (s *stubbedAzureBlobSuite) testAzureBlobVolumeContextCancel(c *check.C, testFunc func(context.Context, *testableAzureBlobVolume) error) {
+ v := s.newTestableAzureBlobVolume(c, newVolumeParams{
+ Cluster: testCluster(c),
+ ConfigVolume: arvados.Volume{Replication: 3},
+ MetricsVecs: newVolumeMetricsVecs(prometheus.NewRegistry()),
+ BufferPool: newBufferPool(ctxlog.TestLogger(c), 8, prometheus.NewRegistry()),
})
-}
-
-func testAzureBlobVolumeContextCancel(t *testing.T, testFunc func(context.Context, *TestableAzureBlobVolume) error) {
- defer func(t http.RoundTripper) {
- http.DefaultTransport = t
- }(http.DefaultTransport)
- http.DefaultTransport = &http.Transport{
- Dial: (&azStubDialer{}).Dial,
- }
-
- v := NewTestableAzureBlobVolume(t, false, 3)
defer v.Teardown()
v.azHandler.race = make(chan chan struct{})
defer close(allDone)
err := testFunc(ctx, v)
if err != context.Canceled {
- t.Errorf("got %T %q, expected %q", err, err, context.Canceled)
+ c.Errorf("got %T %q, expected %q", err, err, context.Canceled)
}
}()
releaseHandler := make(chan struct{})
select {
case <-allDone:
- t.Error("testFunc finished without waiting for v.azHandler.race")
+ c.Error("testFunc finished without waiting for v.azHandler.race")
case <-time.After(10 * time.Second):
- t.Error("timed out waiting to enter handler")
+ c.Error("timed out waiting to enter handler")
case v.azHandler.race <- releaseHandler:
}
select {
case <-time.After(10 * time.Second):
- t.Error("timed out waiting to cancel")
+ c.Error("timed out waiting to cancel")
case <-allDone:
}
}()
}
-func (s *StubbedAzureBlobSuite) TestStats(c *check.C) {
+// TestStats checks the JSON form of the volume's InternalStats after
+// each step: zero errors initially; nonzero Ops/Errors and a 404 error
+// count (with InBytes still 0) after reading a missing block;
+// OutBytes 3 and CreateOps 1 after writing "foo"; and InBytes 6 after
+// reading the block back twice.
+func (s *stubbedAzureBlobSuite) TestStats(c *check.C) {
+ volume := s.newTestableAzureBlobVolume(c, newVolumeParams{
+ Cluster: testCluster(c),
+ ConfigVolume: arvados.Volume{Replication: 3},
+ MetricsVecs: newVolumeMetricsVecs(prometheus.NewRegistry()),
+ BufferPool: newBufferPool(ctxlog.TestLogger(c), 8, prometheus.NewRegistry()),
+ })
+ defer volume.Teardown()
+
stats := func() string {
- buf, err := json.Marshal(s.volume.InternalStats())
+ buf, err := json.Marshal(volume.InternalStats())
c.Check(err, check.IsNil)
return string(buf)
}
c.Check(stats(), check.Matches, `.*"Errors":0,.*`)
loc := "acbd18db4cc2f85cedef654fccc4a4d8"
- _, err := s.volume.Get(context.Background(), loc, make([]byte, 3))
+ err := volume.BlockRead(context.Background(), loc, brdiscard)
c.Check(err, check.NotNil)
c.Check(stats(), check.Matches, `.*"Ops":[^0],.*`)
c.Check(stats(), check.Matches, `.*"Errors":[^0],.*`)
c.Check(stats(), check.Matches, `.*"storage\.AzureStorageServiceError 404 \(404 Not Found\)":[^0].*`)
c.Check(stats(), check.Matches, `.*"InBytes":0,.*`)
- err = s.volume.Put(context.Background(), loc, []byte("foo"))
+ err = volume.BlockWrite(context.Background(), loc, []byte("foo"))
c.Check(err, check.IsNil)
c.Check(stats(), check.Matches, `.*"OutBytes":3,.*`)
c.Check(stats(), check.Matches, `.*"CreateOps":1,.*`)
- _, err = s.volume.Get(context.Background(), loc, make([]byte, 3))
+ err = volume.BlockRead(context.Background(), loc, brdiscard)
c.Check(err, check.IsNil)
- _, err = s.volume.Get(context.Background(), loc, make([]byte, 3))
+ err = volume.BlockRead(context.Background(), loc, brdiscard)
c.Check(err, check.IsNil)
c.Check(stats(), check.Matches, `.*"InBytes":6,.*`)
}
-func (v *TestableAzureBlobVolume) PutRaw(locator string, data []byte) {
- v.azHandler.PutRaw(v.ContainerName, locator, data)
+// BlockWriteRaw stores data for locator directly in the stub handler,
+// under this volume's container name, without going through the
+// volume's own write path.
+func (v *testableAzureBlobVolume) BlockWriteRaw(locator string, data []byte) {
+ v.azHandler.BlockWriteRaw(v.ContainerName, locator, data)
}
-func (v *TestableAzureBlobVolume) TouchWithDate(locator string, lastPut time.Time) {
- v.azHandler.TouchWithDate(v.ContainerName, locator, lastPut)
+// TouchWithDate delegates to the stub handler's TouchWithDate for
+// locator in this volume's container, passing lastBlockWrite as the
+// timestamp.
+func (v *testableAzureBlobVolume) TouchWithDate(locator string, lastBlockWrite time.Time) {
+ v.azHandler.TouchWithDate(v.ContainerName, locator, lastBlockWrite)
}
-func (v *TestableAzureBlobVolume) Teardown() {
+// Teardown shuts down the stub HTTP server created for this volume.
+func (v *testableAzureBlobVolume) Teardown() {
v.azStub.Close()
}
+// ReadWriteOperationLabelValues returns "get" as the read label value
+// and "create" as the write label value. Presumably these are the
+// operation labels the generic volume tests look for in this
+// volume's metrics — confirm against the caller.
+func (v *testableAzureBlobVolume) ReadWriteOperationLabelValues() (r, w string) {
+ return "get", "create"
+}
+
+// makeEtag returns rand.Int63() formatted as a "0x"-prefixed
+// lowercase hexadecimal string.
func makeEtag() string {
return fmt.Sprintf("0x%x", rand.Int63())
}