X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/b2bcd45082d2df2b5a17645eb60473cc17c76e88..fbd54468b13466839c24d880a3d041d0a49371af:/services/keepstore/azure_blob_volume_test.go diff --git a/services/keepstore/azure_blob_volume_test.go b/services/keepstore/azure_blob_volume_test.go index 619c013a31..439b402214 100644 --- a/services/keepstore/azure_blob_volume_test.go +++ b/services/keepstore/azure_blob_volume_test.go @@ -1,11 +1,15 @@ package main import ( + "bytes" + "crypto/md5" "encoding/base64" "encoding/xml" "flag" + "fmt" "io/ioutil" "log" + "math/rand" "net" "net/http" "net/http/httptest" @@ -17,13 +21,13 @@ import ( "testing" "time" - "github.com/Azure/azure-sdk-for-go/storage" + "github.com/curoverse/azure-sdk-for-go/storage" ) const ( // The same fake credentials used by Microsoft's Azure emulator emulatorAccountName = "devstoreaccount1" - emulatorAccountKey = "Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==" + emulatorAccountKey = "Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==" ) var azureTestContainer string @@ -36,15 +40,18 @@ func init() { "Name of Azure container to use for testing. Do not use a container with real data! Use -azure-storage-account-name and -azure-storage-key-file arguments to supply credentials.") } -type azBlob struct{ +type azBlob struct { Data []byte + Etag string + Metadata map[string]string Mtime time.Time Uncommitted map[string][]byte } type azStubHandler struct { sync.Mutex - blobs map[string]*azBlob + blobs map[string]*azBlob + race chan chan struct{} } func newAzStubHandler() *azStubHandler { @@ -54,23 +61,40 @@ func newAzStubHandler() *azStubHandler { } func (h *azStubHandler) TouchWithDate(container, hash string, t time.Time) { - if blob, ok := h.blobs[container + "|" + hash]; !ok { + blob, ok := h.blobs[container+"|"+hash] + if !ok { return - } else { - blob.Mtime = t } + blob.Mtime = t } func (h *azStubHandler) PutRaw(container, hash string, data []byte) { h.Lock() defer h.Unlock() - h.blobs[container + "|" + hash] = &azBlob{ - Data: data, - Mtime: time.Now(), + h.blobs[container+"|"+hash] = &azBlob{ + Data: data, + Mtime: time.Now(), Uncommitted: make(map[string][]byte), } } +func (h *azStubHandler) unlockAndRace() { + if h.race == nil { + return + } + h.Unlock() + // Signal caller that race is starting by reading from + // h.race. If we get a channel, block until that channel is + // ready to receive. If we get nil (or h.race is closed) just + // proceed. + if c := <-h.race; c != nil { + c <- struct{}{} + } + h.Lock() +} + +var rangeRegexp = regexp.MustCompile(`^bytes=(\d+)-(\d+)$`) + func (h *azStubHandler) ServeHTTP(rw http.ResponseWriter, r *http.Request) { h.Lock() defer h.Unlock() @@ -99,17 +123,32 @@ func (h *azStubHandler) ServeHTTP(rw http.ResponseWriter, r *http.Request) { Uncommitted []string } - blob, blobExists := h.blobs[container + "|" + hash] + blob, blobExists := h.blobs[container+"|"+hash] switch { - case r.Method == "PUT" && r.Form.Get("comp") == "" && r.Header.Get("Content-Length") == "0": - rw.WriteHeader(http.StatusCreated) - h.blobs[container + "|" + hash] = &azBlob{ - Data: body, - Mtime: time.Now(), + case r.Method == "PUT" && r.Form.Get("comp") == "": + // "Put Blob" API + if _, ok := h.blobs[container+"|"+hash]; !ok { + // Like the real Azure service, we offer a + // race window during which other clients can + // list/get the new blob before any data is + // committed. + h.blobs[container+"|"+hash] = &azBlob{ + Mtime: time.Now(), + Uncommitted: make(map[string][]byte), + Etag: makeEtag(), + } + h.unlockAndRace() + } + h.blobs[container+"|"+hash] = &azBlob{ + Data: body, + Mtime: time.Now(), Uncommitted: make(map[string][]byte), + Etag: makeEtag(), } + rw.WriteHeader(http.StatusCreated) case r.Method == "PUT" && r.Form.Get("comp") == "block": + // "Put Block" API if !blobExists { log.Printf("Got block for nonexistent blob: %+v", r) rw.WriteHeader(http.StatusBadRequest) @@ -124,6 +163,7 @@ func (h *azStubHandler) ServeHTTP(rw http.ResponseWriter, r *http.Request) { blob.Uncommitted[string(blockID)] = body rw.WriteHeader(http.StatusCreated) case r.Method == "PUT" && r.Form.Get("comp") == "blocklist": + // "Put Block List" API bl := &blockListRequestBody{} if err := xml.Unmarshal(body, bl); err != nil { log.Printf("xml Unmarshal: %s", err) @@ -138,29 +178,66 @@ func (h *azStubHandler) ServeHTTP(rw http.ResponseWriter, r *http.Request) { return } blob.Data = blob.Uncommitted[string(blockID)] - log.Printf("body %+q, bl %+v, blockID %+q, data %+q", body, bl, blockID, blob.Data) + blob.Etag = makeEtag() + blob.Mtime = time.Now() + delete(blob.Uncommitted, string(blockID)) } rw.WriteHeader(http.StatusCreated) + case r.Method == "PUT" && r.Form.Get("comp") == "metadata": + // "Set Metadata Headers" API. We don't bother + // stubbing "Get Metadata Headers": AzureBlobVolume + // sets metadata headers only as a way to bump Etag + // and Last-Modified. + if !blobExists { + log.Printf("Got metadata for nonexistent blob: %+v", r) + rw.WriteHeader(http.StatusBadRequest) + return + } + blob.Metadata = make(map[string]string) + for k, v := range r.Header { + if strings.HasPrefix(strings.ToLower(k), "x-ms-meta-") { + blob.Metadata[k] = v[0] + } + } + blob.Mtime = time.Now() + blob.Etag = makeEtag() case (r.Method == "GET" || r.Method == "HEAD") && hash != "": + // "Get Blob" API if !blobExists { rw.WriteHeader(http.StatusNotFound) return } + data := blob.Data + if rangeSpec := rangeRegexp.FindStringSubmatch(r.Header.Get("Range")); rangeSpec != nil { + b0, err0 := strconv.Atoi(rangeSpec[1]) + b1, err1 := strconv.Atoi(rangeSpec[2]) + if err0 != nil || err1 != nil || b0 >= len(data) || b1 >= len(data) || b0 > b1 { + rw.Header().Set("Content-Range", fmt.Sprintf("bytes */%d", len(data))) + rw.WriteHeader(http.StatusRequestedRangeNotSatisfiable) + return + } + rw.Header().Set("Content-Range", fmt.Sprintf("bytes %d-%d/%d", b0, b1, len(data))) + rw.WriteHeader(http.StatusPartialContent) + data = data[b0 : b1+1] + } rw.Header().Set("Last-Modified", blob.Mtime.Format(time.RFC1123)) - rw.Header().Set("Content-Length", strconv.Itoa(len(blob.Data))) + rw.Header().Set("Content-Length", strconv.Itoa(len(data))) if r.Method == "GET" { - if _, err := rw.Write(blob.Data); err != nil { - log.Printf("write %+q: %s", blob.Data, err) + if _, err := rw.Write(data); err != nil { + log.Printf("write %+q: %s", data, err) } } + h.unlockAndRace() case r.Method == "DELETE" && hash != "": + // "Delete Blob" API if !blobExists { rw.WriteHeader(http.StatusNotFound) return } - delete(h.blobs, container + "|" + hash) + delete(h.blobs, container+"|"+hash) rw.WriteHeader(http.StatusAccepted) case r.Method == "GET" && r.Form.Get("comp") == "list" && r.Form.Get("restype") == "container": + // "List Blobs" API prefix := container + "|" + r.Form.Get("prefix") marker := r.Form.Get("marker") @@ -170,7 +247,7 @@ func (h *azStubHandler) ServeHTTP(rw http.ResponseWriter, r *http.Request) { } resp := storage.BlobListResponse{ - Marker: marker, + Marker: marker, NextMarker: "", MaxResults: int64(maxResults), } @@ -187,12 +264,13 @@ func (h *azStubHandler) ServeHTTP(rw http.ResponseWriter, r *http.Request) { break } if len(resp.Blobs) > 0 || marker == "" || marker == hash { - blob := h.blobs[container + "|" + hash] + blob := h.blobs[container+"|"+hash] resp.Blobs = append(resp.Blobs, storage.Blob{ Name: hash, Properties: storage.BlobProperties{ - LastModified: blob.Mtime.Format(time.RFC1123), + LastModified: blob.Mtime.Format(time.RFC1123), ContentLength: int64(len(blob.Data)), + Etag: blob.Etag, }, }) } @@ -217,6 +295,7 @@ type azStubDialer struct { } var localHostPortRe = regexp.MustCompile(`(127\.0\.0\.1|localhost|\[::1\]):\d+`) + func (d *azStubDialer) Dial(network, address string) (net.Conn, error) { if hp := localHostPortRe.FindString(address); hp != "" { log.Println("azStubDialer: dial", hp, "instead of", address) @@ -229,10 +308,10 @@ type TestableAzureBlobVolume struct { *AzureBlobVolume azHandler *azStubHandler azStub *httptest.Server - t *testing.T + t TB } -func NewTestableAzureBlobVolume(t *testing.T, readonly bool) TestableVolume { +func NewTestableAzureBlobVolume(t TB, readonly bool, replication int) *TestableAzureBlobVolume { azHandler := newAzStubHandler() azStub := httptest.NewServer(azHandler) @@ -259,13 +338,13 @@ func NewTestableAzureBlobVolume(t *testing.T, readonly bool) TestableVolume { } } - v := NewAzureBlobVolume(azClient, container, readonly) + v := NewAzureBlobVolume(azClient, container, readonly, replication) return &TestableAzureBlobVolume{ AzureBlobVolume: v, - azHandler: azHandler, - azStub: azStub, - t: t, + azHandler: azHandler, + azStub: azStub, + t: t, } } @@ -276,11 +355,34 @@ func TestAzureBlobVolumeWithGeneric(t *testing.T) { http.DefaultTransport = &http.Transport{ Dial: (&azStubDialer{}).Dial, } - DoGenericVolumeTests(t, func(t *testing.T) TestableVolume { - return NewTestableAzureBlobVolume(t, false) + azureWriteRaceInterval = time.Millisecond + azureWriteRacePollTime = time.Nanosecond + DoGenericVolumeTests(t, func(t TB) TestableVolume { + return NewTestableAzureBlobVolume(t, false, azureStorageReplication) }) } +func TestAzureBlobVolumeConcurrentRanges(t *testing.T) { + defer func(b int) { + azureMaxGetBytes = b + }(azureMaxGetBytes) + + defer func(t http.RoundTripper) { + http.DefaultTransport = t + }(http.DefaultTransport) + http.DefaultTransport = &http.Transport{ + Dial: (&azStubDialer{}).Dial, + } + azureWriteRaceInterval = time.Millisecond + azureWriteRacePollTime = time.Nanosecond + // Test (BlockSize mod azureMaxGetBytes)==0 and !=0 cases + for _, azureMaxGetBytes = range []int{2 << 22, 2<<22 - 1} { + DoGenericVolumeTests(t, func(t TB) TestableVolume { + return NewTestableAzureBlobVolume(t, false, azureStorageReplication) + }) + } +} + func TestReadonlyAzureBlobVolumeWithGeneric(t *testing.T) { defer func(t http.RoundTripper) { http.DefaultTransport = t @@ -288,11 +390,160 @@ func TestReadonlyAzureBlobVolumeWithGeneric(t *testing.T) { http.DefaultTransport = &http.Transport{ Dial: (&azStubDialer{}).Dial, } - DoGenericVolumeTests(t, func(t *testing.T) TestableVolume { - return NewTestableAzureBlobVolume(t, true) + azureWriteRaceInterval = time.Millisecond + azureWriteRacePollTime = time.Nanosecond + DoGenericVolumeTests(t, func(t TB) TestableVolume { + return NewTestableAzureBlobVolume(t, true, azureStorageReplication) }) } +func TestAzureBlobVolumeRangeFenceposts(t *testing.T) { + defer func(t http.RoundTripper) { + http.DefaultTransport = t + }(http.DefaultTransport) + http.DefaultTransport = &http.Transport{ + Dial: (&azStubDialer{}).Dial, + } + + v := NewTestableAzureBlobVolume(t, false, 3) + defer v.Teardown() + + for _, size := range []int{ + 2<<22 - 1, // one