package main
import (
+ "bytes"
"encoding/base64"
"encoding/xml"
"flag"
// azStubHandler is the http.Handler that backs the stub Azure blob
// service used by the tests in this file.
type azStubHandler struct {
// Embedded lock: ServeHTTP takes it for the duration of each request.
sync.Mutex
// blobs is indexed by container+"|"+hash.
blobs map[string]*azBlob
// race is an optional test hook read by unlockAndRace. When nil, the
// hook is disabled. Each value received from it is either nil or a
// channel on which the handler then sends before carrying on.
+ race chan chan struct{}
}
func newAzStubHandler() *azStubHandler {
}
}
// unlockAndRace temporarily releases h's lock so that a test can
// interleave other activity with the request currently being handled.
//
// It does nothing when h.race is nil. Otherwise the caller must already
// hold h's lock: the lock is released, one value is received from
// h.race, and the lock is re-acquired before returning. Both the
// receive and the optional send below block until the other side is
// ready.
+func (h *azStubHandler) unlockAndRace() {
+ if h.race == nil {
+ return
+ }
+ h.Unlock()
+ // Signal caller that race is starting by reading from
+ // h.race. If we get a channel, block until that channel is
+ // ready to receive. If we get nil (or h.race is closed) just
+ // proceed.
+ if c := <-h.race; c != nil {
+ c <- struct{}{}
+ }
+ h.Lock()
+}
+
func (h *azStubHandler) ServeHTTP(rw http.ResponseWriter, r *http.Request) {
h.Lock()
defer h.Unlock()
switch {
case r.Method == "PUT" && r.Form.Get("comp") == "":
// "Put Blob" API
+ if _, ok := h.blobs[container+"|"+hash]; !ok {
+ // Like the real Azure service, we offer a
+ // race window during which other clients can
+ // list/get the new blob before any data is
+ // committed.
+ h.blobs[container+"|"+hash] = &azBlob{
+ Mtime: time.Now(),
+ Uncommitted: make(map[string][]byte),
+ Etag: makeEtag(),
+ }
+ h.unlockAndRace()
+ }
h.blobs[container+"|"+hash] = &azBlob{
Data: body,
Mtime: time.Now(),
log.Printf("write %+q: %s", blob.Data, err)
}
}
+ h.unlockAndRace()
case r.Method == "DELETE" && hash != "":
// "Delete Blob" API
if !blobExists {
t *testing.T
}
-func NewTestableAzureBlobVolume(t *testing.T, readonly bool, replication int) TestableVolume {
+func NewTestableAzureBlobVolume(t *testing.T, readonly bool, replication int) *TestableAzureBlobVolume {
azHandler := newAzStubHandler()
azStub := httptest.NewServer(azHandler)
http.DefaultTransport = &http.Transport{
Dial: (&azStubDialer{}).Dial,
}
+ azureWriteRaceInterval = time.Millisecond
+ azureWriteRacePollTime = time.Nanosecond
DoGenericVolumeTests(t, func(t *testing.T) TestableVolume {
return NewTestableAzureBlobVolume(t, false, azureStorageReplication)
})
http.DefaultTransport = &http.Transport{
Dial: (&azStubDialer{}).Dial,
}
+ azureWriteRaceInterval = time.Millisecond
+ azureWriteRacePollTime = time.Nanosecond
DoGenericVolumeTests(t, func(t *testing.T) TestableVolume {
return NewTestableAzureBlobVolume(t, true, azureStorageReplication)
})
}
}
+func TestAzureBlobVolumeCreateBlobRace(t *testing.T) {
+ defer func(t http.RoundTripper) {
+ http.DefaultTransport = t
+ }(http.DefaultTransport)
+ http.DefaultTransport = &http.Transport{
+ Dial: (&azStubDialer{}).Dial,
+ }
+
+ v := NewTestableAzureBlobVolume(t, false, 3)
+ defer v.Teardown()
+
+ azureWriteRaceInterval = time.Second
+ azureWriteRacePollTime = time.Millisecond
+
+ allDone := make(chan struct{})
+ v.azHandler.race = make(chan chan struct{})
+ go func() {
+ err := v.Put(TestHash, TestBlock)
+ if err != nil {
+ t.Error(err)
+ }
+ }()
+ continuePut := make(chan struct{})
+ // Wait for the stub's Put to create the empty blob
+ v.azHandler.race <- continuePut
+ go func() {
+ buf, err := v.Get(TestHash)
+ if err != nil {
+ t.Error(err)
+ } else {
+ bufs.Put(buf)
+ }
+ close(allDone)
+ }()
+ // Wait for the stub's Get to get the empty blob
+ close(v.azHandler.race)
+ // Allow stub's Put to continue, so the real data is ready
+ // when the volume's Get retries
+ <-continuePut
+ // Wait for volume's Get to return the real data
+ <-allDone
+}
+
// TestAzureBlobVolumeCreateBlobRaceDeadline covers an empty blob whose
// age is just short of azureWriteRaceInterval. Get is expected to stop
// waiting for data once the blob is older than that interval and to
// return an empty buffer without error; afterwards the index is
// expected to list the block with a "+0" suffix.
+func TestAzureBlobVolumeCreateBlobRaceDeadline(t *testing.T) {
// Route HTTP through the stub dialer for this test only; the deferred
// func puts the previous DefaultTransport back on return.
+ defer func(t http.RoundTripper) {
+ http.DefaultTransport = t
+ }(http.DefaultTransport)
+ http.DefaultTransport = &http.Transport{
+ Dial: (&azStubDialer{}).Dial,
+ }
+
+ v := NewTestableAzureBlobVolume(t, false, 3)
+ defer v.Teardown()
+
+ azureWriteRaceInterval = 2 * time.Second
+ azureWriteRacePollTime = 5 * time.Millisecond
+
// Store a blob with nil data directly in the stub (presumably standing
// in for a write that has not committed any data yet).
+ v.PutRaw(TestHash, nil)
+
// Immediately after the raw put the index is expected to be empty.
+ buf := new(bytes.Buffer)
+ v.IndexTo("", buf)
+ if buf.Len() != 0 {
+ t.Errorf("Index %+q should be empty", buf.Bytes())
+ }
+
// Backdate the blob to 1982ms ago, i.e. 18ms short of the 2s race
// interval set above.
+ v.TouchWithDate(TestHash, time.Now().Add(-1982 * time.Millisecond))
+
// Run Get in the background; allDone is closed when it returns,
// whether or not it succeeded.
+ allDone := make(chan struct{})
+ go func() {
+ defer close(allDone)
+ buf, err := v.Get(TestHash)
+ if err != nil {
+ t.Error(err)
+ return
+ }
+ if len(buf) != 0 {
+ t.Errorf("Got %+q, expected empty buf", buf)
+ }
+ bufs.Put(buf)
+ }()
// Give Get up to one second to return before reporting a failure.
+ select {
+ case <-allDone:
+ case <-time.After(time.Second):
+ t.Error("Get should have stopped waiting for race when block was 2s old")
+ }
+
// The index output is now expected to start with the block's entry.
+ buf.Reset()
+ v.IndexTo("", buf)
+ if !bytes.HasPrefix(buf.Bytes(), []byte(TestHash+"+0")) {
+ t.Errorf("Index %+q should have %+q", buf.Bytes(), TestHash+"+0")
+ }
+}
+
// PutRaw hands data to the stub handler's PutRaw, filed under locator in
// this volume's container.
func (v *TestableAzureBlobVolume) PutRaw(locator string, data []byte) {
	stub := v.azHandler
	stub.PutRaw(v.containerName, locator, data)
}