X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/6599088b45103087b4be743fd51a8330e694e57f..39f6e9f70f683237d9488faac1c549ca19ac9dae:/services/keepstore/azure_blob_volume_test.go diff --git a/services/keepstore/azure_blob_volume_test.go b/services/keepstore/azure_blob_volume_test.go index 4b015a9962..c629c9dc15 100644 --- a/services/keepstore/azure_blob_volume_test.go +++ b/services/keepstore/azure_blob_volume_test.go @@ -1,4 +1,8 @@ -package main +// Copyright (C) The Arvados Authors. All rights reserved. +// +// SPDX-License-Identifier: AGPL-3.0 + +package keepstore import ( "bytes" @@ -9,21 +13,25 @@ import ( "encoding/xml" "flag" "fmt" + "io" "io/ioutil" "math/rand" "net" "net/http" "net/http/httptest" + "os" "regexp" "sort" "strconv" "strings" "sync" - "testing" "time" - log "github.com/Sirupsen/logrus" - "github.com/curoverse/azure-sdk-for-go/storage" + "git.arvados.org/arvados.git/sdk/go/arvados" + "git.arvados.org/arvados.git/sdk/go/ctxlog" + "github.com/Azure/azure-sdk-for-go/storage" + "github.com/prometheus/client_golang/prometheus" + "github.com/sirupsen/logrus" check "gopkg.in/check.v1" ) @@ -32,11 +40,14 @@ const ( // used by Microsoft's Azure emulator: the Azure SDK // recognizes that magic string and changes its behavior to // cater to the Azure SDK's own test suite. - fakeAccountName = "fakeAccountName" + fakeAccountName = "fakeaccountname" fakeAccountKey = "Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==" ) -var azureTestContainer string +var ( + azureTestContainer string + azureTestDebug = os.Getenv("ARVADOS_DEBUG") != "" +) func init() { flag.StringVar( @@ -56,13 +67,16 @@ type azBlob struct { type azStubHandler struct { sync.Mutex - blobs map[string]*azBlob - race chan chan struct{} + logger logrus.FieldLogger + blobs map[string]*azBlob + race chan chan struct{} + didlist503 bool } -func newAzStubHandler() *azStubHandler { +func newAzStubHandler(c *check.C) *azStubHandler { return &azStubHandler{ - blobs: make(map[string]*azBlob), + blobs: make(map[string]*azBlob), + logger: ctxlog.TestLogger(c), } } @@ -74,7 +88,7 @@ func (h *azStubHandler) TouchWithDate(container, hash string, t time.Time) { blob.Mtime = t } -func (h *azStubHandler) PutRaw(container, hash string, data []byte) { +func (h *azStubHandler) BlockWriteRaw(container, hash string, data []byte) { h.Lock() defer h.Unlock() h.blobs[container+"|"+hash] = &azBlob{ @@ -105,7 +119,9 @@ var rangeRegexp = regexp.MustCompile(`^bytes=(\d+)-(\d+)$`) func (h *azStubHandler) ServeHTTP(rw http.ResponseWriter, r *http.Request) { h.Lock() defer h.Unlock() - // defer log.Printf("azStubHandler: %+v", r) + if azureTestDebug { + defer h.logger.Printf("azStubHandler: %+v", r) + } path := strings.Split(r.URL.Path, "/") container := path[1] @@ -115,11 +131,16 @@ func (h *azStubHandler) ServeHTTP(rw http.ResponseWriter, r *http.Request) { } if err := r.ParseForm(); err != nil { - log.Printf("azStubHandler(%+v): %s", r, err) + h.logger.Printf("azStubHandler(%+v): %s", r, err) rw.WriteHeader(http.StatusBadRequest) return } + if (r.Method == "PUT" || r.Method == "POST") && r.Header.Get("Content-Length") == "" { + rw.WriteHeader(http.StatusLengthRequired) + return + } + body, err := ioutil.ReadAll(r.Body) if err != nil { return @@ -166,13 +187,13 @@ func (h *azStubHandler) ServeHTTP(rw http.ResponseWriter, r *http.Request) { case r.Method == "PUT" && r.Form.Get("comp") == "block": // "Put Block" API if !blobExists { - log.Printf("Got block for nonexistent blob: %+v", r) + 
h.logger.Printf("Got block for nonexistent blob: %+v", r) rw.WriteHeader(http.StatusBadRequest) return } blockID, err := base64.StdEncoding.DecodeString(r.Form.Get("blockid")) if err != nil || len(blockID) == 0 { - log.Printf("Invalid blockid: %+q", r.Form.Get("blockid")) + h.logger.Printf("Invalid blockid: %+q", r.Form.Get("blockid")) rw.WriteHeader(http.StatusBadRequest) return } @@ -182,14 +203,14 @@ func (h *azStubHandler) ServeHTTP(rw http.ResponseWriter, r *http.Request) { // "Put Block List" API bl := &blockListRequestBody{} if err := xml.Unmarshal(body, bl); err != nil { - log.Printf("xml Unmarshal: %s", err) + h.logger.Printf("xml Unmarshal: %s", err) rw.WriteHeader(http.StatusBadRequest) return } for _, encBlockID := range bl.Uncommitted { blockID, err := base64.StdEncoding.DecodeString(encBlockID) if err != nil || len(blockID) == 0 || blob.Uncommitted[string(blockID)] == nil { - log.Printf("Invalid blockid: %+q", encBlockID) + h.logger.Printf("Invalid blockid: %+q", encBlockID) rw.WriteHeader(http.StatusBadRequest) return } @@ -201,11 +222,11 @@ func (h *azStubHandler) ServeHTTP(rw http.ResponseWriter, r *http.Request) { rw.WriteHeader(http.StatusCreated) case r.Method == "PUT" && r.Form.Get("comp") == "metadata": // "Set Metadata Headers" API. We don't bother - // stubbing "Get Metadata Headers": AzureBlobVolume + // stubbing "Get Metadata Headers": azureBlobVolume // sets metadata headers only as a way to bump Etag // and Last-Modified. if !blobExists { - log.Printf("Got metadata for nonexistent blob: %+v", r) + h.logger.Printf("Got metadata for nonexistent blob: %+v", r) rw.WriteHeader(http.StatusBadRequest) return } @@ -251,7 +272,7 @@ func (h *azStubHandler) ServeHTTP(rw http.ResponseWriter, r *http.Request) { rw.Header().Set("Content-Length", strconv.Itoa(len(data))) if r.Method == "GET" { if _, err := rw.Write(data); err != nil { - log.Printf("write %+q: %s", data, err) + h.logger.Printf("write %+q: %s", data, err) } } h.unlockAndRace() @@ -265,6 +286,11 @@ func (h *azStubHandler) ServeHTTP(rw http.ResponseWriter, r *http.Request) { rw.WriteHeader(http.StatusAccepted) case r.Method == "GET" && r.Form.Get("comp") == "list" && r.Form.Get("restype") == "container": // "List Blobs" API + if !h.didlist503 { + h.didlist503 = true + rw.WriteHeader(http.StatusServiceUnavailable) + return + } prefix := container + "|" + r.Form.Get("prefix") marker := r.Form.Get("marker") @@ -299,7 +325,7 @@ func (h *azStubHandler) ServeHTTP(rw http.ResponseWriter, r *http.Request) { b := storage.Blob{ Name: hash, Properties: storage.BlobProperties{ - LastModified: blob.Mtime.Format(time.RFC1123), + LastModified: storage.TimeRFC1123(blob.Mtime), ContentLength: int64(len(blob.Data)), Etag: blob.Etag, }, @@ -310,12 +336,12 @@ func (h *azStubHandler) ServeHTTP(rw http.ResponseWriter, r *http.Request) { } buf, err := xml.Marshal(resp) if err != nil { - log.Print(err) + h.logger.Error(err) rw.WriteHeader(http.StatusInternalServerError) } rw.Write(buf) default: - log.Printf("azStubHandler: not implemented: %+v Body:%+q", r, body) + h.logger.Printf("azStubHandler: not implemented: %+v Body:%+q", r, body) rw.WriteHeader(http.StatusNotImplemented) } } @@ -324,6 +350,7 @@ func (h *azStubHandler) ServeHTTP(rw http.ResponseWriter, r *http.Request) { // tries to connect to "devstoreaccount1.blob.127.0.0.1:46067", and // in such cases transparently dials "127.0.0.1:46067" instead. 
type azStubDialer struct { + logger logrus.FieldLogger net.Dialer } @@ -331,144 +358,119 @@ var localHostPortRe = regexp.MustCompile(`(127\.0\.0\.1|localhost|\[::1\]):\d+`) func (d *azStubDialer) Dial(network, address string) (net.Conn, error) { if hp := localHostPortRe.FindString(address); hp != "" { - log.Println("azStubDialer: dial", hp, "instead of", address) + if azureTestDebug { + d.logger.Debug("azStubDialer: dial", hp, "instead of", address) + } address = hp } return d.Dialer.Dial(network, address) } -type TestableAzureBlobVolume struct { - *AzureBlobVolume +type testableAzureBlobVolume struct { + *azureBlobVolume azHandler *azStubHandler azStub *httptest.Server t TB } -func NewTestableAzureBlobVolume(t TB, readonly bool, replication int) *TestableAzureBlobVolume { - azHandler := newAzStubHandler() +func (s *stubbedAzureBlobSuite) newTestableAzureBlobVolume(t TB, params newVolumeParams) *testableAzureBlobVolume { + azHandler := newAzStubHandler(t.(*check.C)) azStub := httptest.NewServer(azHandler) var azClient storage.Client + var err error container := azureTestContainer if container == "" { // Connect to stub instead of real Azure storage service stubURLBase := strings.Split(azStub.URL, "://")[1] - var err error if azClient, err = storage.NewClient(fakeAccountName, fakeAccountKey, stubURLBase, storage.DefaultAPIVersion, false); err != nil { t.Fatal(err) } container = "fakecontainername" } else { // Connect to real Azure storage service - accountKey, err := readKeyFromFile(azureStorageAccountKeyFile) - if err != nil { - t.Fatal(err) - } - azClient, err = storage.NewBasicClient(azureStorageAccountName, accountKey) - if err != nil { + if azClient, err = storage.NewBasicClient(os.Getenv("ARVADOS_TEST_AZURE_ACCOUNT_NAME"), os.Getenv("ARVADOS_TEST_AZURE_ACCOUNT_KEY")); err != nil { t.Fatal(err) } } + azClient.Sender = &singleSender{} bs := azClient.GetBlobService() - v := &AzureBlobVolume{ - ContainerName: container, - ReadOnly: readonly, - AzureReplication: replication, - azClient: azClient, - bsClient: &azureBlobClient{client: &bs}, + v := &azureBlobVolume{ + ContainerName: container, + WriteRaceInterval: arvados.Duration(time.Millisecond), + WriteRacePollTime: arvados.Duration(time.Nanosecond), + ListBlobsMaxAttempts: 2, + ListBlobsRetryDelay: arvados.Duration(time.Millisecond), + azClient: azClient, + container: &azureContainer{ctr: bs.GetContainerReference(container)}, + cluster: params.Cluster, + volume: params.ConfigVolume, + logger: ctxlog.TestLogger(t), + metrics: params.MetricsVecs, + bufferPool: params.BufferPool, + } + if err = v.check(); err != nil { + t.Fatal(err) } - return &TestableAzureBlobVolume{ - AzureBlobVolume: v, + return &testableAzureBlobVolume{ + azureBlobVolume: v, azHandler: azHandler, azStub: azStub, t: t, } } -var _ = check.Suite(&StubbedAzureBlobSuite{}) +var _ = check.Suite(&stubbedAzureBlobSuite{}) -type StubbedAzureBlobSuite struct { - volume *TestableAzureBlobVolume +type stubbedAzureBlobSuite struct { origHTTPTransport http.RoundTripper } -func (s *StubbedAzureBlobSuite) SetUpTest(c *check.C) { +func (s *stubbedAzureBlobSuite) SetUpSuite(c *check.C) { s.origHTTPTransport = http.DefaultTransport http.DefaultTransport = &http.Transport{ - Dial: (&azStubDialer{}).Dial, + Dial: (&azStubDialer{logger: ctxlog.TestLogger(c)}).Dial, } - azureWriteRaceInterval = time.Millisecond - azureWriteRacePollTime = time.Nanosecond - - s.volume = NewTestableAzureBlobVolume(c, false, 3) } -func (s *StubbedAzureBlobSuite) TearDownTest(c *check.C) { - s.volume.Teardown() 
+func (s *stubbedAzureBlobSuite) TearDownSuite(c *check.C) {
 	http.DefaultTransport = s.origHTTPTransport
 }
 
-func TestAzureBlobVolumeWithGeneric(t *testing.T) {
-	defer func(t http.RoundTripper) {
-		http.DefaultTransport = t
-	}(http.DefaultTransport)
-	http.DefaultTransport = &http.Transport{
-		Dial: (&azStubDialer{}).Dial,
-	}
-	azureWriteRaceInterval = time.Millisecond
-	azureWriteRacePollTime = time.Nanosecond
-	DoGenericVolumeTests(t, func(t TB) TestableVolume {
-		return NewTestableAzureBlobVolume(t, false, azureStorageReplication)
+func (s *stubbedAzureBlobSuite) TestAzureBlobVolumeWithGeneric(c *check.C) {
+	DoGenericVolumeTests(c, false, func(t TB, params newVolumeParams) TestableVolume {
+		return s.newTestableAzureBlobVolume(t, params)
 	})
 }
 
-func TestAzureBlobVolumeConcurrentRanges(t *testing.T) {
-	defer func(b int) {
-		azureMaxGetBytes = b
-	}(azureMaxGetBytes)
-
-	defer func(t http.RoundTripper) {
-		http.DefaultTransport = t
-	}(http.DefaultTransport)
-	http.DefaultTransport = &http.Transport{
-		Dial: (&azStubDialer{}).Dial,
-	}
-	azureWriteRaceInterval = time.Millisecond
-	azureWriteRacePollTime = time.Nanosecond
+func (s *stubbedAzureBlobSuite) TestAzureBlobVolumeConcurrentRanges(c *check.C) {
 	// Test (BlockSize mod azureMaxGetBytes)==0 and !=0 cases
-	for _, azureMaxGetBytes = range []int{2 << 22, 2<<22 - 1} {
-		DoGenericVolumeTests(t, func(t TB) TestableVolume {
-			return NewTestableAzureBlobVolume(t, false, azureStorageReplication)
+	for _, b := range []int{2 << 22, 2<<22 - 1} {
+		c.Logf("=== MaxGetBytes=%d", b)
+		DoGenericVolumeTests(c, false, func(t TB, params newVolumeParams) TestableVolume {
+			v := s.newTestableAzureBlobVolume(t, params)
+			v.MaxGetBytes = b
+			return v
 		})
 	}
 }
 
-func TestReadonlyAzureBlobVolumeWithGeneric(t *testing.T) {
-	defer func(t http.RoundTripper) {
-		http.DefaultTransport = t
-	}(http.DefaultTransport)
-	http.DefaultTransport = &http.Transport{
-		Dial: (&azStubDialer{}).Dial,
-	}
-	azureWriteRaceInterval = time.Millisecond
-	azureWriteRacePollTime = time.Nanosecond
-	DoGenericVolumeTests(t, func(t TB) TestableVolume {
-		return NewTestableAzureBlobVolume(t, true, azureStorageReplication)
+func (s *stubbedAzureBlobSuite) TestReadonlyAzureBlobVolumeWithGeneric(c *check.C) {
+	DoGenericVolumeTests(c, true, func(c TB, params newVolumeParams) TestableVolume {
+		return s.newTestableAzureBlobVolume(c, params)
 	})
 }
 
-func TestAzureBlobVolumeRangeFenceposts(t *testing.T) {
-	defer func(t http.RoundTripper) {
-		http.DefaultTransport = t
-	}(http.DefaultTransport)
-	http.DefaultTransport = &http.Transport{
-		Dial: (&azStubDialer{}).Dial,
-	}
-
-	v := NewTestableAzureBlobVolume(t, false, 3)
+func (s *stubbedAzureBlobSuite) TestAzureBlobVolumeRangeFenceposts(c *check.C) {
+	v := s.newTestableAzureBlobVolume(c, newVolumeParams{
+		Cluster:      testCluster(c),
+		ConfigVolume: arvados.Volume{Replication: 3},
+		MetricsVecs:  newVolumeMetricsVecs(prometheus.NewRegistry()),
+		BufferPool:   newBufferPool(ctxlog.TestLogger(c), 8, prometheus.NewRegistry()),
+	})
 	defer v.Teardown()
 
 	for _, size := range []int{
@@ -484,97 +486,83 @@ func TestAzureBlobVolumeRangeFenceposts(t *testing.T) {
 		data[i] = byte((i + 7) & 0xff)
 	}
 	hash := fmt.Sprintf("%x", md5.Sum(data))
-	err := v.Put(context.Background(), hash, data)
+	err := v.BlockWrite(context.Background(), hash, data)
 	if err != nil {
-		t.Error(err)
+		c.Error(err)
 	}
-	gotData := make([]byte, len(data))
-	gotLen, err := v.Get(context.Background(), hash, gotData)
+	gotData := bytes.NewBuffer(nil)
+	gotLen, err := 
v.BlockRead(context.Background(), hash, gotData) if err != nil { - t.Error(err) + c.Error(err) } - gotHash := fmt.Sprintf("%x", md5.Sum(gotData)) + gotHash := fmt.Sprintf("%x", md5.Sum(gotData.Bytes())) if gotLen != size { - t.Errorf("length mismatch: got %d != %d", gotLen, size) + c.Errorf("length mismatch: got %d != %d", gotLen, size) } if gotHash != hash { - t.Errorf("hash mismatch: got %s != %s", gotHash, hash) + c.Errorf("hash mismatch: got %s != %s", gotHash, hash) } } } -func TestAzureBlobVolumeReplication(t *testing.T) { - for r := 1; r <= 4; r++ { - v := NewTestableAzureBlobVolume(t, false, r) - defer v.Teardown() - if n := v.Replication(); n != r { - t.Errorf("Got replication %d, expected %d", n, r) - } - } -} - -func TestAzureBlobVolumeCreateBlobRace(t *testing.T) { - defer func(t http.RoundTripper) { - http.DefaultTransport = t - }(http.DefaultTransport) - http.DefaultTransport = &http.Transport{ - Dial: (&azStubDialer{}).Dial, - } - - v := NewTestableAzureBlobVolume(t, false, 3) +func (s *stubbedAzureBlobSuite) TestAzureBlobVolumeCreateBlobRace(c *check.C) { + v := s.newTestableAzureBlobVolume(c, newVolumeParams{ + Cluster: testCluster(c), + ConfigVolume: arvados.Volume{Replication: 3}, + MetricsVecs: newVolumeMetricsVecs(prometheus.NewRegistry()), + BufferPool: newBufferPool(ctxlog.TestLogger(c), 8, prometheus.NewRegistry()), + }) defer v.Teardown() - azureWriteRaceInterval = time.Second - azureWriteRacePollTime = time.Millisecond + var wg sync.WaitGroup - allDone := make(chan struct{}) v.azHandler.race = make(chan chan struct{}) + + wg.Add(1) go func() { - err := v.Put(context.Background(), TestHash, TestBlock) + defer wg.Done() + err := v.BlockWrite(context.Background(), TestHash, TestBlock) if err != nil { - t.Error(err) + c.Error(err) } }() - continuePut := make(chan struct{}) - // Wait for the stub's Put to create the empty blob - v.azHandler.race <- continuePut + continueBlockWrite := make(chan struct{}) + // Wait for the stub's BlockWrite to create the empty blob + v.azHandler.race <- continueBlockWrite + wg.Add(1) go func() { - buf := make([]byte, len(TestBlock)) - _, err := v.Get(context.Background(), TestHash, buf) + defer wg.Done() + _, err := v.BlockRead(context.Background(), TestHash, io.Discard) if err != nil { - t.Error(err) + c.Error(err) } - close(allDone) }() - // Wait for the stub's Get to get the empty blob + // Wait for the stub's BlockRead to get the empty blob close(v.azHandler.race) - // Allow stub's Put to continue, so the real data is ready - // when the volume's Get retries - <-continuePut - // Wait for volume's Get to return the real data - <-allDone + // Allow stub's BlockWrite to continue, so the real data is ready + // when the volume's BlockRead retries + <-continueBlockWrite + // Wait for BlockRead() and BlockWrite() to finish + wg.Wait() } -func TestAzureBlobVolumeCreateBlobRaceDeadline(t *testing.T) { - defer func(t http.RoundTripper) { - http.DefaultTransport = t - }(http.DefaultTransport) - http.DefaultTransport = &http.Transport{ - Dial: (&azStubDialer{}).Dial, - } - - v := NewTestableAzureBlobVolume(t, false, 3) +func (s *stubbedAzureBlobSuite) TestAzureBlobVolumeCreateBlobRaceDeadline(c *check.C) { + v := s.newTestableAzureBlobVolume(c, newVolumeParams{ + Cluster: testCluster(c), + ConfigVolume: arvados.Volume{Replication: 3}, + MetricsVecs: newVolumeMetricsVecs(prometheus.NewRegistry()), + BufferPool: newBufferPool(ctxlog.TestLogger(c), 8, prometheus.NewRegistry()), + }) + v.azureBlobVolume.WriteRaceInterval.Set("2s") + 
v.azureBlobVolume.WriteRacePollTime.Set("5ms") defer v.Teardown() - azureWriteRaceInterval = 2 * time.Second - azureWriteRacePollTime = 5 * time.Millisecond - - v.PutRaw(TestHash, nil) + v.BlockWriteRaw(TestHash, nil) buf := new(bytes.Buffer) - v.IndexTo("", buf) + v.Index(context.Background(), "", buf) if buf.Len() != 0 { - t.Errorf("Index %+q should be empty", buf.Bytes()) + c.Errorf("Index %+q should be empty", buf.Bytes()) } v.TouchWithDate(TestHash, time.Now().Add(-1982*time.Millisecond)) @@ -582,59 +570,50 @@ func TestAzureBlobVolumeCreateBlobRaceDeadline(t *testing.T) { allDone := make(chan struct{}) go func() { defer close(allDone) - buf := make([]byte, BlockSize) - n, err := v.Get(context.Background(), TestHash, buf) + buf := bytes.NewBuffer(nil) + n, err := v.BlockRead(context.Background(), TestHash, buf) if err != nil { - t.Error(err) + c.Error(err) return } if n != 0 { - t.Errorf("Got %+q, expected empty buf", buf[:n]) + c.Errorf("Got %+q (n=%d), expected empty buf", buf.Bytes(), n) } }() select { case <-allDone: case <-time.After(time.Second): - t.Error("Get should have stopped waiting for race when block was 2s old") + c.Error("BlockRead should have stopped waiting for race when block was 2s old") } buf.Reset() - v.IndexTo("", buf) + v.Index(context.Background(), "", buf) if !bytes.HasPrefix(buf.Bytes(), []byte(TestHash+"+0")) { - t.Errorf("Index %+q should have %+q", buf.Bytes(), TestHash+"+0") + c.Errorf("Index %+q should have %+q", buf.Bytes(), TestHash+"+0") } } -func TestAzureBlobVolumeContextCancelGet(t *testing.T) { - testAzureBlobVolumeContextCancel(t, func(ctx context.Context, v *TestableAzureBlobVolume) error { - v.PutRaw(TestHash, TestBlock) - _, err := v.Get(ctx, TestHash, make([]byte, BlockSize)) +func (s *stubbedAzureBlobSuite) TestAzureBlobVolumeContextCancelBlockRead(c *check.C) { + s.testAzureBlobVolumeContextCancel(c, func(ctx context.Context, v *testableAzureBlobVolume) error { + v.BlockWriteRaw(TestHash, TestBlock) + _, err := v.BlockRead(ctx, TestHash, io.Discard) return err }) } -func TestAzureBlobVolumeContextCancelPut(t *testing.T) { - testAzureBlobVolumeContextCancel(t, func(ctx context.Context, v *TestableAzureBlobVolume) error { - return v.Put(ctx, TestHash, make([]byte, BlockSize)) +func (s *stubbedAzureBlobSuite) TestAzureBlobVolumeContextCancelBlockWrite(c *check.C) { + s.testAzureBlobVolumeContextCancel(c, func(ctx context.Context, v *testableAzureBlobVolume) error { + return v.BlockWrite(ctx, TestHash, make([]byte, BlockSize)) }) } -func TestAzureBlobVolumeContextCancelCompare(t *testing.T) { - testAzureBlobVolumeContextCancel(t, func(ctx context.Context, v *TestableAzureBlobVolume) error { - v.PutRaw(TestHash, TestBlock) - return v.Compare(ctx, TestHash, TestBlock2) +func (s *stubbedAzureBlobSuite) testAzureBlobVolumeContextCancel(c *check.C, testFunc func(context.Context, *testableAzureBlobVolume) error) { + v := s.newTestableAzureBlobVolume(c, newVolumeParams{ + Cluster: testCluster(c), + ConfigVolume: arvados.Volume{Replication: 3}, + MetricsVecs: newVolumeMetricsVecs(prometheus.NewRegistry()), + BufferPool: newBufferPool(ctxlog.TestLogger(c), 8, prometheus.NewRegistry()), }) -} - -func testAzureBlobVolumeContextCancel(t *testing.T, testFunc func(context.Context, *TestableAzureBlobVolume) error) { - defer func(t http.RoundTripper) { - http.DefaultTransport = t - }(http.DefaultTransport) - http.DefaultTransport = &http.Transport{ - Dial: (&azStubDialer{}).Dial, - } - - v := NewTestableAzureBlobVolume(t, false, 3) defer v.Teardown() 
v.azHandler.race = make(chan chan struct{}) @@ -644,15 +623,15 @@ func testAzureBlobVolumeContextCancel(t *testing.T, testFunc func(context.Contex defer close(allDone) err := testFunc(ctx, v) if err != context.Canceled { - t.Errorf("got %T %q, expected %q", err, err, context.Canceled) + c.Errorf("got %T %q, expected %q", err, err, context.Canceled) } }() releaseHandler := make(chan struct{}) select { case <-allDone: - t.Error("testFunc finished without waiting for v.azHandler.race") + c.Error("testFunc finished without waiting for v.azHandler.race") case <-time.After(10 * time.Second): - t.Error("timed out waiting to enter handler") + c.Error("timed out waiting to enter handler") case v.azHandler.race <- releaseHandler: } @@ -660,7 +639,7 @@ func testAzureBlobVolumeContextCancel(t *testing.T, testFunc func(context.Contex select { case <-time.After(10 * time.Second): - t.Error("timed out waiting to cancel") + c.Error("timed out waiting to cancel") case <-allDone: } @@ -669,9 +648,17 @@ func testAzureBlobVolumeContextCancel(t *testing.T, testFunc func(context.Contex }() } -func (s *StubbedAzureBlobSuite) TestStats(c *check.C) { +func (s *stubbedAzureBlobSuite) TestStats(c *check.C) { + volume := s.newTestableAzureBlobVolume(c, newVolumeParams{ + Cluster: testCluster(c), + ConfigVolume: arvados.Volume{Replication: 3}, + MetricsVecs: newVolumeMetricsVecs(prometheus.NewRegistry()), + BufferPool: newBufferPool(ctxlog.TestLogger(c), 8, prometheus.NewRegistry()), + }) + defer volume.Teardown() + stats := func() string { - buf, err := json.Marshal(s.volume.InternalStats()) + buf, err := json.Marshal(volume.InternalStats()) c.Check(err, check.IsNil) return string(buf) } @@ -680,37 +667,41 @@ func (s *StubbedAzureBlobSuite) TestStats(c *check.C) { c.Check(stats(), check.Matches, `.*"Errors":0,.*`) loc := "acbd18db4cc2f85cedef654fccc4a4d8" - _, err := s.volume.Get(context.Background(), loc, make([]byte, 3)) + _, err := volume.BlockRead(context.Background(), loc, io.Discard) c.Check(err, check.NotNil) c.Check(stats(), check.Matches, `.*"Ops":[^0],.*`) c.Check(stats(), check.Matches, `.*"Errors":[^0],.*`) c.Check(stats(), check.Matches, `.*"storage\.AzureStorageServiceError 404 \(404 Not Found\)":[^0].*`) c.Check(stats(), check.Matches, `.*"InBytes":0,.*`) - err = s.volume.Put(context.Background(), loc, []byte("foo")) + err = volume.BlockWrite(context.Background(), loc, []byte("foo")) c.Check(err, check.IsNil) c.Check(stats(), check.Matches, `.*"OutBytes":3,.*`) c.Check(stats(), check.Matches, `.*"CreateOps":1,.*`) - _, err = s.volume.Get(context.Background(), loc, make([]byte, 3)) + _, err = volume.BlockRead(context.Background(), loc, io.Discard) c.Check(err, check.IsNil) - _, err = s.volume.Get(context.Background(), loc, make([]byte, 3)) + _, err = volume.BlockRead(context.Background(), loc, io.Discard) c.Check(err, check.IsNil) c.Check(stats(), check.Matches, `.*"InBytes":6,.*`) } -func (v *TestableAzureBlobVolume) PutRaw(locator string, data []byte) { - v.azHandler.PutRaw(v.ContainerName, locator, data) +func (v *testableAzureBlobVolume) BlockWriteRaw(locator string, data []byte) { + v.azHandler.BlockWriteRaw(v.ContainerName, locator, data) } -func (v *TestableAzureBlobVolume) TouchWithDate(locator string, lastPut time.Time) { - v.azHandler.TouchWithDate(v.ContainerName, locator, lastPut) +func (v *testableAzureBlobVolume) TouchWithDate(locator string, lastBlockWrite time.Time) { + v.azHandler.TouchWithDate(v.ContainerName, locator, lastBlockWrite) } -func (v *TestableAzureBlobVolume) Teardown() { 
+func (v *testableAzureBlobVolume) Teardown() { v.azStub.Close() } +func (v *testableAzureBlobVolume) ReadWriteOperationLabelValues() (r, w string) { + return "get", "create" +} + func makeEtag() string { return fmt.Sprintf("0x%x", rand.Int63()) }
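Note: the diff converts the former standalone Test* functions into gocheck methods on stubbedAzureBlobSuite, so they only run if the keepstore package hands control from the standard testing framework to gocheck. That bridge is not part of this diff; with gopkg.in/check.v1 it is conventionally a small separate _test.go file along the lines of the sketch below, where the file name and the TestGocheck function name are assumptions for illustration, not taken from this diff.

// Hypothetical bridge file (e.g. gocheck_test.go) in package keepstore;
// the function name TestGocheck is illustrative only.
package keepstore

import (
	"testing"

	check "gopkg.in/check.v1"
)

// Hand control to gocheck so that suites registered with check.Suite
// (such as stubbedAzureBlobSuite above) run under "go test".
func TestGocheck(t *testing.T) {
	check.TestingT(t)
}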