X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/634d2e25d36b20961d660df0e69628e60099d893..3c4bcfbe8611878bff64778180f84ad93829e6b2:/services/keepstore/azure_blob_volume_test.go diff --git a/services/keepstore/azure_blob_volume_test.go b/services/keepstore/azure_blob_volume_test.go index 74c94ec79a..5d556b3e8c 100644 --- a/services/keepstore/azure_blob_volume_test.go +++ b/services/keepstore/azure_blob_volume_test.go @@ -1,6 +1,8 @@ package main import ( + "bytes" + "crypto/md5" "encoding/base64" "encoding/xml" "flag" @@ -59,11 +61,11 @@ func newAzStubHandler() *azStubHandler { } func (h *azStubHandler) TouchWithDate(container, hash string, t time.Time) { - if blob, ok := h.blobs[container+"|"+hash]; !ok { + blob, ok := h.blobs[container+"|"+hash] + if !ok { return - } else { - blob.Mtime = t } + blob.Mtime = t } func (h *azStubHandler) PutRaw(container, hash string, data []byte) { @@ -72,6 +74,7 @@ func (h *azStubHandler) PutRaw(container, hash string, data []byte) { h.blobs[container+"|"+hash] = &azBlob{ Data: data, Mtime: time.Now(), + Metadata: make(map[string]string), Uncommitted: make(map[string][]byte), } } @@ -91,6 +94,8 @@ func (h *azStubHandler) unlockAndRace() { h.Lock() } +var rangeRegexp = regexp.MustCompile(`^bytes=(\d+)-(\d+)$`) + func (h *azStubHandler) ServeHTTP(rw http.ResponseWriter, r *http.Request) { h.Lock() defer h.Unlock() @@ -132,14 +137,23 @@ func (h *azStubHandler) ServeHTTP(rw http.ResponseWriter, r *http.Request) { h.blobs[container+"|"+hash] = &azBlob{ Mtime: time.Now(), Uncommitted: make(map[string][]byte), + Metadata: make(map[string]string), Etag: makeEtag(), } h.unlockAndRace() } + metadata := make(map[string]string) + for k, v := range r.Header { + if strings.HasPrefix(strings.ToLower(k), "x-ms-meta-") { + name := k[len("x-ms-meta-"):] + metadata[strings.ToLower(name)] = v[0] + } + } h.blobs[container+"|"+hash] = &azBlob{ Data: body, Mtime: time.Now(), Uncommitted: make(map[string][]byte), + Metadata: metadata, Etag: makeEtag(), } rw.WriteHeader(http.StatusCreated) @@ -192,22 +206,46 @@ func (h *azStubHandler) ServeHTTP(rw http.ResponseWriter, r *http.Request) { blob.Metadata = make(map[string]string) for k, v := range r.Header { if strings.HasPrefix(strings.ToLower(k), "x-ms-meta-") { - blob.Metadata[k] = v[0] + name := k[len("x-ms-meta-"):] + blob.Metadata[strings.ToLower(name)] = v[0] } } blob.Mtime = time.Now() blob.Etag = makeEtag() + case (r.Method == "GET" || r.Method == "HEAD") && r.Form.Get("comp") == "metadata" && hash != "": + // "Get Blob Metadata" API + if !blobExists { + rw.WriteHeader(http.StatusNotFound) + return + } + for k, v := range blob.Metadata { + rw.Header().Set(fmt.Sprintf("x-ms-meta-%s", k), v) + } + return case (r.Method == "GET" || r.Method == "HEAD") && hash != "": // "Get Blob" API if !blobExists { rw.WriteHeader(http.StatusNotFound) return } + data := blob.Data + if rangeSpec := rangeRegexp.FindStringSubmatch(r.Header.Get("Range")); rangeSpec != nil { + b0, err0 := strconv.Atoi(rangeSpec[1]) + b1, err1 := strconv.Atoi(rangeSpec[2]) + if err0 != nil || err1 != nil || b0 >= len(data) || b1 >= len(data) || b0 > b1 { + rw.Header().Set("Content-Range", fmt.Sprintf("bytes */%d", len(data))) + rw.WriteHeader(http.StatusRequestedRangeNotSatisfiable) + return + } + rw.Header().Set("Content-Range", fmt.Sprintf("bytes %d-%d/%d", b0, b1, len(data))) + rw.WriteHeader(http.StatusPartialContent) + data = data[b0 : b1+1] + } rw.Header().Set("Last-Modified", blob.Mtime.Format(time.RFC1123)) - rw.Header().Set("Content-Length", strconv.Itoa(len(blob.Data))) + rw.Header().Set("Content-Length", strconv.Itoa(len(data))) if r.Method == "GET" { - if _, err := rw.Write(blob.Data); err != nil { - log.Printf("write %+q: %s", blob.Data, err) + if _, err := rw.Write(data); err != nil { + log.Printf("write %+q: %s", data, err) } } h.unlockAndRace() @@ -248,14 +286,20 @@ func (h *azStubHandler) ServeHTTP(rw http.ResponseWriter, r *http.Request) { } if len(resp.Blobs) > 0 || marker == "" || marker == hash { blob := h.blobs[container+"|"+hash] - resp.Blobs = append(resp.Blobs, storage.Blob{ + bmeta := map[string]string(nil) + if r.Form.Get("include") == "metadata" { + bmeta = blob.Metadata + } + b := storage.Blob{ Name: hash, Properties: storage.BlobProperties{ LastModified: blob.Mtime.Format(time.RFC1123), ContentLength: int64(len(blob.Data)), Etag: blob.Etag, }, - }) + Metadata: bmeta, + } + resp.Blobs = append(resp.Blobs, b) } } buf, err := xml.Marshal(resp) @@ -291,10 +335,10 @@ type TestableAzureBlobVolume struct { *AzureBlobVolume azHandler *azStubHandler azStub *httptest.Server - t *testing.T + t TB } -func NewTestableAzureBlobVolume(t *testing.T, readonly bool, replication int) *TestableAzureBlobVolume { +func NewTestableAzureBlobVolume(t TB, readonly bool, replication int) *TestableAzureBlobVolume { azHandler := newAzStubHandler() azStub := httptest.NewServer(azHandler) @@ -338,11 +382,34 @@ func TestAzureBlobVolumeWithGeneric(t *testing.T) { http.DefaultTransport = &http.Transport{ Dial: (&azStubDialer{}).Dial, } - DoGenericVolumeTests(t, func(t *testing.T) TestableVolume { + azureWriteRaceInterval = time.Millisecond + azureWriteRacePollTime = time.Nanosecond + DoGenericVolumeTests(t, func(t TB) TestableVolume { return NewTestableAzureBlobVolume(t, false, azureStorageReplication) }) } +func TestAzureBlobVolumeConcurrentRanges(t *testing.T) { + defer func(b int) { + azureMaxGetBytes = b + }(azureMaxGetBytes) + + defer func(t http.RoundTripper) { + http.DefaultTransport = t + }(http.DefaultTransport) + http.DefaultTransport = &http.Transport{ + Dial: (&azStubDialer{}).Dial, + } + azureWriteRaceInterval = time.Millisecond + azureWriteRacePollTime = time.Nanosecond + // Test (BlockSize mod azureMaxGetBytes)==0 and !=0 cases + for _, azureMaxGetBytes = range []int{2 << 22, 2<<22 - 1} { + DoGenericVolumeTests(t, func(t TB) TestableVolume { + return NewTestableAzureBlobVolume(t, false, azureStorageReplication) + }) + } +} + func TestReadonlyAzureBlobVolumeWithGeneric(t *testing.T) { defer func(t http.RoundTripper) { http.DefaultTransport = t @@ -350,11 +417,56 @@ func TestReadonlyAzureBlobVolumeWithGeneric(t *testing.T) { http.DefaultTransport = &http.Transport{ Dial: (&azStubDialer{}).Dial, } - DoGenericVolumeTests(t, func(t *testing.T) TestableVolume { + azureWriteRaceInterval = time.Millisecond + azureWriteRacePollTime = time.Nanosecond + DoGenericVolumeTests(t, func(t TB) TestableVolume { return NewTestableAzureBlobVolume(t, true, azureStorageReplication) }) } +func TestAzureBlobVolumeRangeFenceposts(t *testing.T) { + defer func(t http.RoundTripper) { + http.DefaultTransport = t + }(http.DefaultTransport) + http.DefaultTransport = &http.Transport{ + Dial: (&azStubDialer{}).Dial, + } + + v := NewTestableAzureBlobVolume(t, false, 3) + defer v.Teardown() + + for _, size := range []int{ + 2<<22 - 1, // one