X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/e9f437d9e590cc37ada8534401d254bd5e0a5e85..94788532e822ef26b8c9eac7818f03e3a94fb124:/services/keepstore/azure_blob_volume_test.go

diff --git a/services/keepstore/azure_blob_volume_test.go b/services/keepstore/azure_blob_volume_test.go
index 66b0ea0dc3..a240c23e16 100644
--- a/services/keepstore/azure_blob_volume_test.go
+++ b/services/keepstore/azure_blob_volume_test.go
@@ -1,6 +1,7 @@
 package main
 
 import (
+	"bytes"
 	"encoding/base64"
 	"encoding/xml"
 	"flag"
@@ -49,6 +50,7 @@ type azBlob struct {
 type azStubHandler struct {
 	sync.Mutex
 	blobs map[string]*azBlob
+	race  chan chan struct{}
 }
 
 func newAzStubHandler() *azStubHandler {
@@ -58,11 +60,11 @@ func newAzStubHandler() *azStubHandler {
 }
 
 func (h *azStubHandler) TouchWithDate(container, hash string, t time.Time) {
-	if blob, ok := h.blobs[container+"|"+hash]; !ok {
+	blob, ok := h.blobs[container+"|"+hash]
+	if !ok {
 		return
-	} else {
-		blob.Mtime = t
 	}
+	blob.Mtime = t
 }
 
 func (h *azStubHandler) PutRaw(container, hash string, data []byte) {
@@ -75,6 +77,21 @@ func (h *azStubHandler) PutRaw(container, hash string, data []byte) {
 	}
 }
 
+func (h *azStubHandler) unlockAndRace() {
+	if h.race == nil {
+		return
+	}
+	h.Unlock()
+	// Signal caller that race is starting by reading from
+	// h.race. If we get a channel, block until that channel is
+	// ready to receive. If we get nil (or h.race is closed) just
+	// proceed.
+	if c := <-h.race; c != nil {
+		c <- struct{}{}
+	}
+	h.Lock()
+}
+
 func (h *azStubHandler) ServeHTTP(rw http.ResponseWriter, r *http.Request) {
 	h.Lock()
 	defer h.Unlock()
@@ -108,6 +125,18 @@ func (h *azStubHandler) ServeHTTP(rw http.ResponseWriter, r *http.Request) {
 	switch {
 	case r.Method == "PUT" && r.Form.Get("comp") == "":
 		// "Put Blob" API
+		if _, ok := h.blobs[container+"|"+hash]; !ok {
+			// Like the real Azure service, we offer a
+			// race window during which other clients can
+			// list/get the new blob before any data is
+			// committed.
+			h.blobs[container+"|"+hash] = &azBlob{
+				Mtime:       time.Now(),
+				Uncommitted: make(map[string][]byte),
+				Etag:        makeEtag(),
+			}
+			h.unlockAndRace()
+		}
 		h.blobs[container+"|"+hash] = &azBlob{
 			Data:        body,
 			Mtime:       time.Now(),
@@ -182,6 +211,7 @@ func (h *azStubHandler) ServeHTTP(rw http.ResponseWriter, r *http.Request) {
 				log.Printf("write %+q: %s", blob.Data, err)
 			}
 		}
+		h.unlockAndRace()
 	case r.Method == "DELETE" && hash != "":
 		// "Delete Blob" API
 		if !blobExists {
@@ -265,7 +295,7 @@ type TestableAzureBlobVolume struct {
 	t         *testing.T
 }
 
-func NewTestableAzureBlobVolume(t *testing.T, readonly bool, replication int) TestableVolume {
+func NewTestableAzureBlobVolume(t *testing.T, readonly bool, replication int) *TestableAzureBlobVolume {
 	azHandler := newAzStubHandler()
 	azStub := httptest.NewServer(azHandler)
 
@@ -309,6 +339,8 @@ func TestAzureBlobVolumeWithGeneric(t *testing.T) {
 	http.DefaultTransport = &http.Transport{
 		Dial: (&azStubDialer{}).Dial,
 	}
+	azureWriteRaceInterval = time.Millisecond
+	azureWriteRacePollTime = time.Nanosecond
 	DoGenericVolumeTests(t, func(t *testing.T) TestableVolume {
 		return NewTestableAzureBlobVolume(t, false, azureStorageReplication)
 	})
@@ -321,6 +353,8 @@ func TestReadonlyAzureBlobVolumeWithGeneric(t *testing.T) {
 	http.DefaultTransport = &http.Transport{
 		Dial: (&azStubDialer{}).Dial,
 	}
+	azureWriteRaceInterval = time.Millisecond
+	azureWriteRacePollTime = time.Nanosecond
 	DoGenericVolumeTests(t, func(t *testing.T) TestableVolume {
 		return NewTestableAzureBlobVolume(t, true, azureStorageReplication)
 	})
@@ -336,6 +370,99 @@ func TestAzureBlobVolumeReplication(t *testing.T) {
 	}
 }
 
+func TestAzureBlobVolumeCreateBlobRace(t *testing.T) {
+	defer func(t http.RoundTripper) {
+		http.DefaultTransport = t
+	}(http.DefaultTransport)
+	http.DefaultTransport = &http.Transport{
+		Dial: (&azStubDialer{}).Dial,
+	}
+
+	v := NewTestableAzureBlobVolume(t, false, 3)
+	defer v.Teardown()
+
+	azureWriteRaceInterval = time.Second
+	azureWriteRacePollTime = time.Millisecond
+
+	allDone := make(chan struct{})
+	v.azHandler.race = make(chan chan struct{})
+	go func() {
+		err := v.Put(TestHash, TestBlock)
+		if err != nil {
+			t.Error(err)
+		}
+	}()
+	continuePut := make(chan struct{})
+	// Wait for the stub's Put to create the empty blob
+	v.azHandler.race <- continuePut
+	go func() {
+		buf, err := v.Get(TestHash)
+		if err != nil {
+			t.Error(err)
+		} else {
+			bufs.Put(buf)
+		}
+		close(allDone)
+	}()
+	// Wait for the stub's Get to get the empty blob
+	close(v.azHandler.race)
+	// Allow stub's Put to continue, so the real data is ready
+	// when the volume's Get retries
+	<-continuePut
+	// Wait for volume's Get to return the real data
+	<-allDone
+}
+
+func TestAzureBlobVolumeCreateBlobRaceDeadline(t *testing.T) {
+	defer func(t http.RoundTripper) {
+		http.DefaultTransport = t
+	}(http.DefaultTransport)
+	http.DefaultTransport = &http.Transport{
+		Dial: (&azStubDialer{}).Dial,
+	}
+
+	v := NewTestableAzureBlobVolume(t, false, 3)
+	defer v.Teardown()
+
+	azureWriteRaceInterval = 2 * time.Second
+	azureWriteRacePollTime = 5 * time.Millisecond
+
+	v.PutRaw(TestHash, nil)
+
+	buf := new(bytes.Buffer)
+	v.IndexTo("", buf)
+	if buf.Len() != 0 {
+		t.Errorf("Index %+q should be empty", buf.Bytes())
+	}
+
+	v.TouchWithDate(TestHash, time.Now().Add(-1982 * time.Millisecond))
+
+	allDone := make(chan struct{})
+	go func() {
+		defer close(allDone)
+		buf, err := v.Get(TestHash)
+		if err != nil {
+			t.Error(err)
+			return
+		}
+		if len(buf) != 0 {
+			t.Errorf("Got %+q, expected empty buf", buf)
+		}
+		bufs.Put(buf)
+	}()
+	select {
+	case <-allDone:
+	case <-time.After(time.Second):
+		t.Error("Get should have stopped waiting for race when block was 2s old")
+	}
+
+	buf.Reset()
+	v.IndexTo("", buf)
+	if !bytes.HasPrefix(buf.Bytes(), []byte(TestHash+"+0")) {
+		t.Errorf("Index %+q should have %+q", buf.Bytes(), TestHash+"+0")
+	}
+}
+
 func (v *TestableAzureBlobVolume) PutRaw(locator string, data []byte) {
 	v.azHandler.PutRaw(v.containerName, locator, data)
 }