+func TestAzureBlobVolumeCreateBlobRace(t *testing.T) {
+ defer func(t http.RoundTripper) {
+ http.DefaultTransport = t
+ }(http.DefaultTransport)
+ http.DefaultTransport = &http.Transport{
+ Dial: (&azStubDialer{}).Dial,
+ }
+
+ v := NewTestableAzureBlobVolume(t, false, 3)
+ defer v.Teardown()
+
+ azureWriteRaceInterval = time.Second
+ azureWriteRacePollTime = time.Millisecond
+
+ allDone := make(chan struct{})
+ v.azHandler.race = make(chan chan struct{})
+ go func() {
+ err := v.Put(context.Background(), TestHash, TestBlock)
+ if err != nil {
+ t.Error(err)
+ }
+ }()
+ continuePut := make(chan struct{})
+ // Wait for the stub's Put to create the empty blob
+ v.azHandler.race <- continuePut
+ go func() {
+ buf := make([]byte, len(TestBlock))
+ _, err := v.Get(context.Background(), TestHash, buf)
+ if err != nil {
+ t.Error(err)
+ }
+ close(allDone)
+ }()
+ // Wait for the stub's Get to get the empty blob
+ close(v.azHandler.race)
+ // Allow stub's Put to continue, so the real data is ready
+ // when the volume's Get retries
+ <-continuePut
+ // Wait for volume's Get to return the real data
+ <-allDone
+}
+
+func TestAzureBlobVolumeCreateBlobRaceDeadline(t *testing.T) {
+ defer func(t http.RoundTripper) {
+ http.DefaultTransport = t
+ }(http.DefaultTransport)
+ http.DefaultTransport = &http.Transport{
+ Dial: (&azStubDialer{}).Dial,
+ }
+
+ v := NewTestableAzureBlobVolume(t, false, 3)
+ defer v.Teardown()
+
+ azureWriteRaceInterval = 2 * time.Second
+ azureWriteRacePollTime = 5 * time.Millisecond
+
+ v.PutRaw(TestHash, nil)
+
+ buf := new(bytes.Buffer)
+ v.IndexTo("", buf)
+ if buf.Len() != 0 {
+ t.Errorf("Index %+q should be empty", buf.Bytes())
+ }
+
+ v.TouchWithDate(TestHash, time.Now().Add(-1982*time.Millisecond))
+
+ allDone := make(chan struct{})
+ go func() {
+ defer close(allDone)
+ buf := make([]byte, BlockSize)
+ n, err := v.Get(context.Background(), TestHash, buf)
+ if err != nil {
+ t.Error(err)
+ return
+ }
+ if n != 0 {
+ t.Errorf("Got %+q, expected empty buf", buf[:n])
+ }
+ }()
+ select {
+ case <-allDone:
+ case <-time.After(time.Second):
+ t.Error("Get should have stopped waiting for race when block was 2s old")
+ }
+
+ buf.Reset()
+ v.IndexTo("", buf)
+ if !bytes.HasPrefix(buf.Bytes(), []byte(TestHash+"+0")) {
+ t.Errorf("Index %+q should have %+q", buf.Bytes(), TestHash+"+0")
+ }
+}
+
+func TestAzureBlobVolumeContextCancelGet(t *testing.T) {
+ testAzureBlobVolumeContextCancel(t, func(ctx context.Context, v *TestableAzureBlobVolume) error {
+ v.PutRaw(TestHash, TestBlock)
+ _, err := v.Get(ctx, TestHash, make([]byte, BlockSize))
+ return err
+ })
+}
+
+func TestAzureBlobVolumeContextCancelPut(t *testing.T) {
+ testAzureBlobVolumeContextCancel(t, func(ctx context.Context, v *TestableAzureBlobVolume) error {
+ return v.Put(ctx, TestHash, make([]byte, BlockSize))
+ })
+}
+
+func TestAzureBlobVolumeContextCancelCompare(t *testing.T) {
+ testAzureBlobVolumeContextCancel(t, func(ctx context.Context, v *TestableAzureBlobVolume) error {
+ v.PutRaw(TestHash, TestBlock)
+ return v.Compare(ctx, TestHash, TestBlock2)
+ })
+}
+
+func testAzureBlobVolumeContextCancel(t *testing.T, testFunc func(context.Context, *TestableAzureBlobVolume) error) {
+ defer func(t http.RoundTripper) {
+ http.DefaultTransport = t
+ }(http.DefaultTransport)
+ http.DefaultTransport = &http.Transport{
+ Dial: (&azStubDialer{}).Dial,
+ }
+
+ v := NewTestableAzureBlobVolume(t, false, 3)
+ defer v.Teardown()
+ v.azHandler.race = make(chan chan struct{})
+
+ ctx, cancel := context.WithCancel(context.Background())
+ allDone := make(chan struct{})
+ go func() {
+ defer close(allDone)
+ err := testFunc(ctx, v)
+ if err != context.Canceled {
+ t.Errorf("got %T %q, expected %q", err, err, context.Canceled)
+ }
+ }()
+ releaseHandler := make(chan struct{})
+ select {
+ case <-allDone:
+ t.Error("testFunc finished without waiting for v.azHandler.race")
+ case <-time.After(10 * time.Second):
+ t.Error("timed out waiting to enter handler")
+ case v.azHandler.race <- releaseHandler:
+ }
+
+ cancel()
+
+ select {
+ case <-time.After(10 * time.Second):
+ t.Error("timed out waiting to cancel")
+ case <-allDone:
+ }
+
+ go func() {
+ <-releaseHandler
+ }()
+}
+
+func (s *StubbedAzureBlobSuite) TestStats(c *check.C) {
+ stats := func() string {
+ buf, err := json.Marshal(s.volume.InternalStats())
+ c.Check(err, check.IsNil)
+ return string(buf)
+ }
+
+ c.Check(stats(), check.Matches, `.*"Ops":0,.*`)
+ c.Check(stats(), check.Matches, `.*"Errors":0,.*`)
+
+ loc := "acbd18db4cc2f85cedef654fccc4a4d8"
+ _, err := s.volume.Get(context.Background(), loc, make([]byte, 3))
+ c.Check(err, check.NotNil)
+ c.Check(stats(), check.Matches, `.*"Ops":[^0],.*`)
+ c.Check(stats(), check.Matches, `.*"Errors":[^0],.*`)
+ c.Check(stats(), check.Matches, `.*"storage\.AzureStorageServiceError 404 \(404 Not Found\)":[^0].*`)
+ c.Check(stats(), check.Matches, `.*"InBytes":0,.*`)
+
+ err = s.volume.Put(context.Background(), loc, []byte("foo"))
+ c.Check(err, check.IsNil)
+ c.Check(stats(), check.Matches, `.*"OutBytes":3,.*`)
+ c.Check(stats(), check.Matches, `.*"CreateOps":1,.*`)
+
+ _, err = s.volume.Get(context.Background(), loc, make([]byte, 3))
+ c.Check(err, check.IsNil)
+ _, err = s.volume.Get(context.Background(), loc, make([]byte, 3))
+ c.Check(err, check.IsNil)
+ c.Check(stats(), check.Matches, `.*"InBytes":6,.*`)
+}
+