import (
"bytes"
+ "context"
+ "crypto/md5"
"encoding/base64"
"encoding/xml"
"flag"
h.blobs[container+"|"+hash] = &azBlob{
Data: data,
Mtime: time.Now(),
+ Metadata: make(map[string]string),
Uncommitted: make(map[string][]byte),
}
}
h.Lock()
}
+// rangeRegexp matches a Range header value of the form "bytes=N-M", where
+// both N and M are given as decimal digits. The two capture groups are the
+// first and last byte offsets; any other range form does not match.
+var rangeRegexp = regexp.MustCompile(`^bytes=(\d+)-(\d+)$`)
+
func (h *azStubHandler) ServeHTTP(rw http.ResponseWriter, r *http.Request) {
h.Lock()
defer h.Unlock()
h.blobs[container+"|"+hash] = &azBlob{
Mtime: time.Now(),
Uncommitted: make(map[string][]byte),
+ Metadata: make(map[string]string),
Etag: makeEtag(),
}
h.unlockAndRace()
}
+ metadata := make(map[string]string)
+ for k, v := range r.Header {
+ if strings.HasPrefix(strings.ToLower(k), "x-ms-meta-") {
+ name := k[len("x-ms-meta-"):]
+ metadata[strings.ToLower(name)] = v[0]
+ }
+ }
h.blobs[container+"|"+hash] = &azBlob{
Data: body,
Mtime: time.Now(),
Uncommitted: make(map[string][]byte),
+ Metadata: metadata,
Etag: makeEtag(),
}
rw.WriteHeader(http.StatusCreated)
blob.Metadata = make(map[string]string)
for k, v := range r.Header {
if strings.HasPrefix(strings.ToLower(k), "x-ms-meta-") {
- blob.Metadata[k] = v[0]
+ name := k[len("x-ms-meta-"):]
+ blob.Metadata[strings.ToLower(name)] = v[0]
}
}
blob.Mtime = time.Now()
blob.Etag = makeEtag()
+ case (r.Method == "GET" || r.Method == "HEAD") && r.Form.Get("comp") == "metadata" && hash != "":
+ // "Get Blob Metadata" API
+ if !blobExists {
+ rw.WriteHeader(http.StatusNotFound)
+ return
+ }
+ for k, v := range blob.Metadata {
+ rw.Header().Set(fmt.Sprintf("x-ms-meta-%s", k), v)
+ }
+ return
case (r.Method == "GET" || r.Method == "HEAD") && hash != "":
// "Get Blob" API
if !blobExists {
rw.WriteHeader(http.StatusNotFound)
return
}
+ data := blob.Data
+ if rangeSpec := rangeRegexp.FindStringSubmatch(r.Header.Get("Range")); rangeSpec != nil {
+ b0, err0 := strconv.Atoi(rangeSpec[1])
+ b1, err1 := strconv.Atoi(rangeSpec[2])
+ if err0 != nil || err1 != nil || b0 >= len(data) || b1 >= len(data) || b0 > b1 {
+ rw.Header().Set("Content-Range", fmt.Sprintf("bytes */%d", len(data)))
+ rw.WriteHeader(http.StatusRequestedRangeNotSatisfiable)
+ return
+ }
+ rw.Header().Set("Content-Range", fmt.Sprintf("bytes %d-%d/%d", b0, b1, len(data)))
+ rw.WriteHeader(http.StatusPartialContent)
+ data = data[b0 : b1+1]
+ }
rw.Header().Set("Last-Modified", blob.Mtime.Format(time.RFC1123))
- rw.Header().Set("Content-Length", strconv.Itoa(len(blob.Data)))
+ rw.Header().Set("Content-Length", strconv.Itoa(len(data)))
if r.Method == "GET" {
- if _, err := rw.Write(blob.Data); err != nil {
- log.Printf("write %+q: %s", blob.Data, err)
+ if _, err := rw.Write(data); err != nil {
+ log.Printf("write %+q: %s", data, err)
}
}
h.unlockAndRace()
}
if len(resp.Blobs) > 0 || marker == "" || marker == hash {
blob := h.blobs[container+"|"+hash]
- resp.Blobs = append(resp.Blobs, storage.Blob{
+ bmeta := map[string]string(nil)
+ if r.Form.Get("include") == "metadata" {
+ bmeta = blob.Metadata
+ }
+ b := storage.Blob{
Name: hash,
Properties: storage.BlobProperties{
LastModified: blob.Mtime.Format(time.RFC1123),
ContentLength: int64(len(blob.Data)),
Etag: blob.Etag,
},
- })
+ Metadata: bmeta,
+ }
+ resp.Blobs = append(resp.Blobs, b)
}
}
buf, err := xml.Marshal(resp)
*AzureBlobVolume
azHandler *azStubHandler
azStub *httptest.Server
- t *testing.T
+ t TB
}
-func NewTestableAzureBlobVolume(t *testing.T, readonly bool, replication int) *TestableAzureBlobVolume {
+func NewTestableAzureBlobVolume(t TB, readonly bool, replication int) *TestableAzureBlobVolume {
azHandler := newAzStubHandler()
azStub := httptest.NewServer(azHandler)
}
}
- v := NewAzureBlobVolume(azClient, container, readonly, replication)
+ v := &AzureBlobVolume{
+ ContainerName: container,
+ ReadOnly: readonly,
+ AzureReplication: replication,
+ azClient: azClient,
+ bsClient: azClient.GetBlobService(),
+ }
return &TestableAzureBlobVolume{
AzureBlobVolume: v,
}
azureWriteRaceInterval = time.Millisecond
azureWriteRacePollTime = time.Nanosecond
- DoGenericVolumeTests(t, func(t *testing.T) TestableVolume {
+ DoGenericVolumeTests(t, func(t TB) TestableVolume {
return NewTestableAzureBlobVolume(t, false, azureStorageReplication)
})
}
+// TestAzureBlobVolumeConcurrentRanges runs the generic volume test suite
+// once for each azureMaxGetBytes value in the list below (2<<22 and
+// 2<<22 - 1), using a writable stub-backed volume.
+//
+// azureMaxGetBytes and http.DefaultTransport are restored by the deferred
+// functions on return; azureWriteRaceInterval and azureWriteRacePollTime
+// are overwritten and not restored here.
+func TestAzureBlobVolumeConcurrentRanges(t *testing.T) {
+ // Capture the current azureMaxGetBytes now so it can be put back on exit.
+ defer func(b int) {
+ azureMaxGetBytes = b
+ }(azureMaxGetBytes)
+
+ // Swap in a transport that dials via azStubDialer, restoring the
+ // previous default transport on exit.
+ defer func(t http.RoundTripper) {
+ http.DefaultTransport = t
+ }(http.DefaultTransport)
+ http.DefaultTransport = &http.Transport{
+ Dial: (&azStubDialer{}).Dial,
+ }
+ azureWriteRaceInterval = time.Millisecond
+ azureWriteRacePollTime = time.Nanosecond
+ // Test (BlockSize mod azureMaxGetBytes)==0 and !=0 cases
+ // The range clause assigns directly to the package-level variable.
+ for _, azureMaxGetBytes = range []int{2 << 22, 2<<22 - 1} {
+ DoGenericVolumeTests(t, func(t TB) TestableVolume {
+ return NewTestableAzureBlobVolume(t, false, azureStorageReplication)
+ })
+ }
+}
+
func TestReadonlyAzureBlobVolumeWithGeneric(t *testing.T) {
defer func(t http.RoundTripper) {
http.DefaultTransport = t
}
azureWriteRaceInterval = time.Millisecond
azureWriteRacePollTime = time.Nanosecond
- DoGenericVolumeTests(t, func(t *testing.T) TestableVolume {
+ DoGenericVolumeTests(t, func(t TB) TestableVolume {
return NewTestableAzureBlobVolume(t, true, azureStorageReplication)
})
}
+// TestAzureBlobVolumeRangeFenceposts stores and reads back blocks whose
+// sizes sit on and around multiples of 2<<22 bytes, plus BlockSize-1 and
+// BlockSize, and checks that the length reported by Get and the MD5 of the
+// returned bytes match what was written.
+func TestAzureBlobVolumeRangeFenceposts(t *testing.T) {
+	// Swap in a transport that dials via azStubDialer, restoring the
+	// previous default transport on exit.
+	defer func(t http.RoundTripper) {
+		http.DefaultTransport = t
+	}(http.DefaultTransport)
+	http.DefaultTransport = &http.Transport{
+		Dial: (&azStubDialer{}).Dial,
+	}
+
+	v := NewTestableAzureBlobVolume(t, false, 3)
+	defer v.Teardown()
+
+	for _, size := range []int{
+		2<<22 - 1, // one <max read
+		2 << 22,   // one =max read
+		2<<22 + 1, // one =max read, one <max
+		2 << 23,   // two =max reads
+		BlockSize - 1,
+		BlockSize,
+	} {
+		// Fill the block with a repeating, non-zero-based byte pattern so
+		// misplaced or missing segments change the hash.
+		data := make([]byte, size)
+		for i := range data {
+			data[i] = byte((i + 7) & 0xff)
+		}
+		hash := fmt.Sprintf("%x", md5.Sum(data))
+		err := v.Put(context.Background(), hash, data)
+		if err != nil {
+			t.Error(err)
+		}
+		gotData := make([]byte, len(data))
+		gotLen, err := v.Get(context.Background(), hash, gotData)
+		if err != nil {
+			t.Error(err)
+		}
+		gotHash := fmt.Sprintf("%x", md5.Sum(gotData))
+		// Errorf, not Error: these messages contain format verbs.
+		if gotLen != size {
+			t.Errorf("length mismatch: got %d != %d", gotLen, size)
+		}
+		if gotHash != hash {
+			t.Errorf("hash mismatch: got %s != %s", gotHash, hash)
+		}
+	}
+}
+
func TestAzureBlobVolumeReplication(t *testing.T) {
for r := 1; r <= 4; r++ {
v := NewTestableAzureBlobVolume(t, false, r)
allDone := make(chan struct{})
v.azHandler.race = make(chan chan struct{})
go func() {
- err := v.Put(TestHash, TestBlock)
+ err := v.Put(context.Background(), TestHash, TestBlock)
if err != nil {
t.Error(err)
}
// Wait for the stub's Put to create the empty blob
v.azHandler.race <- continuePut
go func() {
- buf, err := v.Get(TestHash)
+ buf := make([]byte, len(TestBlock))
+ _, err := v.Get(context.Background(), TestHash, buf)
if err != nil {
t.Error(err)
- } else {
- bufs.Put(buf)
}
close(allDone)
}()
t.Errorf("Index %+q should be empty", buf.Bytes())
}
- v.TouchWithDate(TestHash, time.Now().Add(-1982 * time.Millisecond))
+ v.TouchWithDate(TestHash, time.Now().Add(-1982*time.Millisecond))
allDone := make(chan struct{})
go func() {
defer close(allDone)
- buf, err := v.Get(TestHash)
+ buf := make([]byte, BlockSize)
+ n, err := v.Get(context.Background(), TestHash, buf)
if err != nil {
t.Error(err)
return
}
- if len(buf) != 0 {
- t.Errorf("Got %+q, expected empty buf", buf)
+ if n != 0 {
+ t.Errorf("Got %+q, expected empty buf", buf[:n])
}
- bufs.Put(buf)
}()
select {
case <-allDone:
}
// PutRaw hands locator and data to the stub handler's PutRaw, using this
// volume's container name as the container.
func (v *TestableAzureBlobVolume) PutRaw(locator string, data []byte) {
- v.azHandler.PutRaw(v.containerName, locator, data)
+ v.azHandler.PutRaw(v.ContainerName, locator, data)
}
// TouchWithDate hands locator and lastPut to the stub handler's
// TouchWithDate, using this volume's container name as the container.
func (v *TestableAzureBlobVolume) TouchWithDate(locator string, lastPut time.Time) {
- v.azHandler.TouchWithDate(v.containerName, locator, lastPut)
+ v.azHandler.TouchWithDate(v.ContainerName, locator, lastPut)
}
func (v *TestableAzureBlobVolume) Teardown() {