7159: Omit non-Keep blobs from index
[arvados.git] / services / keepstore / azure_blob_volume_test.go
index 66b0ea0dc3077009d4005b227ca8f3da4c0a7acd..24790c25d2956cf204fc2bf8cd4de3e9d6e61fa5 100644 (file)
@@ -1,6 +1,7 @@
 package main
 
 import (
+       "bytes"
        "encoding/base64"
        "encoding/xml"
        "flag"
@@ -49,6 +50,7 @@ type azBlob struct {
 type azStubHandler struct {
         sync.Mutex
         blobs map[string]*azBlob
+       race  chan chan struct{} // test hook: when non-nil, unlockAndRace receives from it at each race point
 }
 
 func newAzStubHandler() *azStubHandler {
@@ -75,6 +77,21 @@ func (h *azStubHandler) PutRaw(container, hash string, data []byte) {
        }
 }
 
+func (h *azStubHandler) unlockAndRace() {
+       if h.race == nil {
+               return // no race hook installed: return with the lock still held
+       }
+       h.Unlock()
+       // Race point, entered with h's lock released: receive from
+       // h.race to signal the test that the race is starting. A non-nil
+       // channel means block until a send on it completes; nil (sent
+       // explicitly, or read from a closed h.race) means just proceed.
+       if c := <-h.race; c != nil {
+               c <- struct{}{}
+       }
+       h.Lock() // reacquire so the caller resumes holding the lock
+}
+
 func (h *azStubHandler) ServeHTTP(rw http.ResponseWriter, r *http.Request) {
        h.Lock()
        defer h.Unlock()
@@ -108,6 +125,18 @@ func (h *azStubHandler) ServeHTTP(rw http.ResponseWriter, r *http.Request) {
        switch {
        case r.Method == "PUT" && r.Form.Get("comp") == "":
                // "Put Blob" API
+               if _, ok := h.blobs[container+"|"+hash]; !ok {
+                       // Like the real Azure service, we offer a
+                       // race window during which other clients can
+                       // list/get the new blob before any data is
+                       // committed.
+                       h.blobs[container+"|"+hash] = &azBlob{
+                               Mtime:       time.Now(),
+                               Uncommitted: make(map[string][]byte),
+                               Etag:        makeEtag(),
+                       }
+                       h.unlockAndRace()
+               }
                h.blobs[container+"|"+hash] = &azBlob{
                        Data:        body,
                        Mtime:       time.Now(),
@@ -182,6 +211,7 @@ func (h *azStubHandler) ServeHTTP(rw http.ResponseWriter, r *http.Request) {
                                log.Printf("write %+q: %s", blob.Data, err)
                        }
                }
+               h.unlockAndRace()
        case r.Method == "DELETE" && hash != "":
                // "Delete Blob" API
                if !blobExists {
@@ -265,7 +295,7 @@ type TestableAzureBlobVolume struct {
        t         *testing.T
 }
 
-func NewTestableAzureBlobVolume(t *testing.T, readonly bool, replication int) TestableVolume {
+func NewTestableAzureBlobVolume(t *testing.T, readonly bool, replication int) *TestableAzureBlobVolume {
        azHandler := newAzStubHandler()
        azStub := httptest.NewServer(azHandler)
 
@@ -309,6 +339,8 @@ func TestAzureBlobVolumeWithGeneric(t *testing.T) {
        http.DefaultTransport = &http.Transport{
                Dial: (&azStubDialer{}).Dial,
        }
+       azureWriteRaceInterval = time.Millisecond
+       azureWriteRacePollTime = time.Nanosecond
        DoGenericVolumeTests(t, func(t *testing.T) TestableVolume {
                return NewTestableAzureBlobVolume(t, false, azureStorageReplication)
        })
@@ -321,6 +353,8 @@ func TestReadonlyAzureBlobVolumeWithGeneric(t *testing.T) {
        http.DefaultTransport = &http.Transport{
                Dial: (&azStubDialer{}).Dial,
        }
+       azureWriteRaceInterval = time.Millisecond
+       azureWriteRacePollTime = time.Nanosecond
        DoGenericVolumeTests(t, func(t *testing.T) TestableVolume {
                return NewTestableAzureBlobVolume(t, true, azureStorageReplication)
        })
@@ -336,6 +370,99 @@ func TestAzureBlobVolumeReplication(t *testing.T) {
        }
 }
 
+func TestAzureBlobVolumeCreateBlobRace(t *testing.T) {
+       defer func(t http.RoundTripper) {
+               http.DefaultTransport = t
+       }(http.DefaultTransport) // put the original default transport back on exit
+       http.DefaultTransport = &http.Transport{
+               Dial: (&azStubDialer{}).Dial,
+       }
+
+       v := NewTestableAzureBlobVolume(t, false, 3)
+       defer v.Teardown()
+
+       azureWriteRaceInterval = time.Second
+       azureWriteRacePollTime = time.Millisecond // NOTE(review): neither setting is restored when the test ends
+
+       allDone := make(chan struct{})
+       v.azHandler.race = make(chan chan struct{}) // unbuffered: a send completes only when unlockAndRace receives
+       go func() { // NOTE(review): never joined; nothing waits for Put to return before the test ends
+               err := v.Put(TestHash, TestBlock)
+               if err != nil {
+                       t.Error(err)
+               }
+       }()
+       continuePut := make(chan struct{})
+       // Wait for the stub's Put to create the empty blob; it then stalls sending on continuePut
+       v.azHandler.race <- continuePut
+       go func() {
+               buf, err := v.Get(TestHash)
+               if err != nil {
+                       t.Error(err)
+               } else {
+                       bufs.Put(buf)
+               }
+               close(allDone)
+       }()
+       // Close race so the stub's Get (and every later unlockAndRace call) receives nil and proceeds
+       close(v.azHandler.race) // NOTE(review): close returns immediately; it does not wait for Get
+       // Receive from continuePut to let the stub's Put continue, so the
+       // real data is ready when the volume's Get retries
+       <-continuePut
+       // Wait for the volume's Get to return (only its error is checked above)
+       <-allDone
+}
+
+func TestAzureBlobVolumeCreateBlobRaceDeadline(t *testing.T) {
+       defer func(t http.RoundTripper) {
+               http.DefaultTransport = t
+       }(http.DefaultTransport) // put the original default transport back on exit
+       http.DefaultTransport = &http.Transport{
+               Dial: (&azStubDialer{}).Dial,
+       }
+
+       v := NewTestableAzureBlobVolume(t, false, 3)
+       defer v.Teardown()
+
+       azureWriteRaceInterval = 2 * time.Second      // the "2s old" deadline named in the failure message below
+       azureWriteRacePollTime = 5 * time.Millisecond // NOTE(review): neither setting is restored when the test ends
+
+       v.PutRaw(TestHash, nil) // hand nil data for TestHash straight to the stub handler
+
+       buf := new(bytes.Buffer)
+       v.IndexTo("", buf) // the blob just written with no data must not be listed yet
+       if buf.Len() != 0 {
+               t.Errorf("Index %+q should be empty", buf.Bytes())
+       }
+
+       v.TouchWithDate(TestHash, time.Now().Add(-1982 * time.Millisecond)) // presumably backdates the blob: ~18ms short of 2s old
+
+       allDone := make(chan struct{})
+       go func() {
+               defer close(allDone)
+               buf, err := v.Get(TestHash)
+               if err != nil {
+                       t.Error(err)
+                       return
+               }
+               if len(buf) != 0 {
+                       t.Errorf("Got %+q, expected empty buf", buf)
+               }
+               bufs.Put(buf)
+       }()
+       select {
+       case <-allDone:
+       case <-time.After(time.Second): // generous next to the ~18ms left before the blob is 2s old
+               t.Error("Get should have stopped waiting for race when block was 2s old")
+       }
+
+       buf.Reset()
+       v.IndexTo("", buf) // at this point the empty blob is expected to be listed as TestHash+"+0"
+       if !bytes.HasPrefix(buf.Bytes(), []byte(TestHash+"+0")) {
+               t.Errorf("Index %+q should have %+q", buf.Bytes(), TestHash+"+0")
+       }
+}
+
 func (v *TestableAzureBlobVolume) PutRaw(locator string, data []byte) {
        v.azHandler.PutRaw(v.containerName, locator, data)
 }