24 log "github.com/Sirupsen/logrus"
25 "github.com/curoverse/azure-sdk-for-go/storage"
29 // This cannot be the fake account name "devstoreaccount1"
30 // used by Microsoft's Azure emulator: the Azure SDK
31 // recognizes that magic string and changes its behavior to
32 // cater to the Azure SDK's own test suite.
33 fakeAccountName = "fakeAccountName"
34 fakeAccountKey = "Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw=="
37 var azureTestContainer string
42 "test.azure-storage-container-volume",
44 "Name of Azure container to use for testing. Do not use a container with real data! Use -azure-storage-account-name and -azure-storage-key-file arguments to supply credentials.")
50 Metadata map[string]string
52 Uncommitted map[string][]byte
55 type azStubHandler struct {
57 blobs map[string]*azBlob
58 race chan chan struct{}
61 func newAzStubHandler() *azStubHandler {
62 return &azStubHandler{
63 blobs: make(map[string]*azBlob),
67 func (h *azStubHandler) TouchWithDate(container, hash string, t time.Time) {
68 blob, ok := h.blobs[container+"|"+hash]
75 func (h *azStubHandler) PutRaw(container, hash string, data []byte) {
78 h.blobs[container+"|"+hash] = &azBlob{
81 Metadata: make(map[string]string),
82 Uncommitted: make(map[string][]byte),
86 func (h *azStubHandler) unlockAndRace() {
91 // Signal caller that race is starting by reading from
92 // h.race. If we get a channel, block until that channel is
93 // ready to receive. If we get nil (or h.race is closed) just
95 if c := <-h.race; c != nil {
101 var rangeRegexp = regexp.MustCompile(`^bytes=(\d+)-(\d+)$`)
103 func (h *azStubHandler) ServeHTTP(rw http.ResponseWriter, r *http.Request) {
106 // defer log.Printf("azStubHandler: %+v", r)
108 path := strings.Split(r.URL.Path, "/")
115 if err := r.ParseForm(); err != nil {
116 log.Printf("azStubHandler(%+v): %s", r, err)
117 rw.WriteHeader(http.StatusBadRequest)
121 body, err := ioutil.ReadAll(r.Body)
126 type blockListRequestBody struct {
127 XMLName xml.Name `xml:"BlockList"`
131 blob, blobExists := h.blobs[container+"|"+hash]
134 case r.Method == "PUT" && r.Form.Get("comp") == "":
136 if _, ok := h.blobs[container+"|"+hash]; !ok {
137 // Like the real Azure service, we offer a
138 // race window during which other clients can
139 // list/get the new blob before any data is
141 h.blobs[container+"|"+hash] = &azBlob{
143 Uncommitted: make(map[string][]byte),
144 Metadata: make(map[string]string),
149 metadata := make(map[string]string)
150 for k, v := range r.Header {
151 if strings.HasPrefix(strings.ToLower(k), "x-ms-meta-") {
152 name := k[len("x-ms-meta-"):]
153 metadata[strings.ToLower(name)] = v[0]
156 h.blobs[container+"|"+hash] = &azBlob{
159 Uncommitted: make(map[string][]byte),
163 rw.WriteHeader(http.StatusCreated)
164 case r.Method == "PUT" && r.Form.Get("comp") == "block":
167 log.Printf("Got block for nonexistent blob: %+v", r)
168 rw.WriteHeader(http.StatusBadRequest)
171 blockID, err := base64.StdEncoding.DecodeString(r.Form.Get("blockid"))
172 if err != nil || len(blockID) == 0 {
173 log.Printf("Invalid blockid: %+q", r.Form.Get("blockid"))
174 rw.WriteHeader(http.StatusBadRequest)
177 blob.Uncommitted[string(blockID)] = body
178 rw.WriteHeader(http.StatusCreated)
179 case r.Method == "PUT" && r.Form.Get("comp") == "blocklist":
180 // "Put Block List" API
181 bl := &blockListRequestBody{}
182 if err := xml.Unmarshal(body, bl); err != nil {
183 log.Printf("xml Unmarshal: %s", err)
184 rw.WriteHeader(http.StatusBadRequest)
187 for _, encBlockID := range bl.Uncommitted {
188 blockID, err := base64.StdEncoding.DecodeString(encBlockID)
189 if err != nil || len(blockID) == 0 || blob.Uncommitted[string(blockID)] == nil {
190 log.Printf("Invalid blockid: %+q", encBlockID)
191 rw.WriteHeader(http.StatusBadRequest)
194 blob.Data = blob.Uncommitted[string(blockID)]
195 blob.Etag = makeEtag()
196 blob.Mtime = time.Now()
197 delete(blob.Uncommitted, string(blockID))
199 rw.WriteHeader(http.StatusCreated)
200 case r.Method == "PUT" && r.Form.Get("comp") == "metadata":
201 // "Set Metadata Headers" API. We don't bother
202 // stubbing "Get Metadata Headers": AzureBlobVolume
203 // sets metadata headers only as a way to bump Etag
204 // and Last-Modified.
206 log.Printf("Got metadata for nonexistent blob: %+v", r)
207 rw.WriteHeader(http.StatusBadRequest)
210 blob.Metadata = make(map[string]string)
211 for k, v := range r.Header {
212 if strings.HasPrefix(strings.ToLower(k), "x-ms-meta-") {
213 name := k[len("x-ms-meta-"):]
214 blob.Metadata[strings.ToLower(name)] = v[0]
217 blob.Mtime = time.Now()
218 blob.Etag = makeEtag()
219 case (r.Method == "GET" || r.Method == "HEAD") && r.Form.Get("comp") == "metadata" && hash != "":
220 // "Get Blob Metadata" API
222 rw.WriteHeader(http.StatusNotFound)
225 for k, v := range blob.Metadata {
226 rw.Header().Set(fmt.Sprintf("x-ms-meta-%s", k), v)
229 case (r.Method == "GET" || r.Method == "HEAD") && hash != "":
232 rw.WriteHeader(http.StatusNotFound)
236 if rangeSpec := rangeRegexp.FindStringSubmatch(r.Header.Get("Range")); rangeSpec != nil {
237 b0, err0 := strconv.Atoi(rangeSpec[1])
238 b1, err1 := strconv.Atoi(rangeSpec[2])
239 if err0 != nil || err1 != nil || b0 >= len(data) || b1 >= len(data) || b0 > b1 {
240 rw.Header().Set("Content-Range", fmt.Sprintf("bytes */%d", len(data)))
241 rw.WriteHeader(http.StatusRequestedRangeNotSatisfiable)
244 rw.Header().Set("Content-Range", fmt.Sprintf("bytes %d-%d/%d", b0, b1, len(data)))
245 rw.WriteHeader(http.StatusPartialContent)
246 data = data[b0 : b1+1]
248 rw.Header().Set("Last-Modified", blob.Mtime.Format(time.RFC1123))
249 rw.Header().Set("Content-Length", strconv.Itoa(len(data)))
250 if r.Method == "GET" {
251 if _, err := rw.Write(data); err != nil {
252 log.Printf("write %+q: %s", data, err)
256 case r.Method == "DELETE" && hash != "":
259 rw.WriteHeader(http.StatusNotFound)
262 delete(h.blobs, container+"|"+hash)
263 rw.WriteHeader(http.StatusAccepted)
264 case r.Method == "GET" && r.Form.Get("comp") == "list" && r.Form.Get("restype") == "container":
266 prefix := container + "|" + r.Form.Get("prefix")
267 marker := r.Form.Get("marker")
270 if n, err := strconv.Atoi(r.Form.Get("maxresults")); err == nil && n >= 1 && n <= 5000 {
274 resp := storage.BlobListResponse{
277 MaxResults: int64(maxResults),
279 var hashes sort.StringSlice
280 for k := range h.blobs {
281 if strings.HasPrefix(k, prefix) {
282 hashes = append(hashes, k[len(container)+1:])
286 for _, hash := range hashes {
287 if len(resp.Blobs) == maxResults {
288 resp.NextMarker = hash
291 if len(resp.Blobs) > 0 || marker == "" || marker == hash {
292 blob := h.blobs[container+"|"+hash]
293 bmeta := map[string]string(nil)
294 if r.Form.Get("include") == "metadata" {
295 bmeta = blob.Metadata
299 Properties: storage.BlobProperties{
300 LastModified: blob.Mtime.Format(time.RFC1123),
301 ContentLength: int64(len(blob.Data)),
306 resp.Blobs = append(resp.Blobs, b)
309 buf, err := xml.Marshal(resp)
312 rw.WriteHeader(http.StatusInternalServerError)
316 log.Printf("azStubHandler: not implemented: %+v Body:%+q", r, body)
317 rw.WriteHeader(http.StatusNotImplemented)
321 // azStubDialer is a net.Dialer that notices when the Azure driver
322 // tries to connect to "devstoreaccount1.blob.127.0.0.1:46067", and
323 // in such cases transparently dials "127.0.0.1:46067" instead.
324 type azStubDialer struct {
328 var localHostPortRe = regexp.MustCompile(`(127\.0\.0\.1|localhost|\[::1\]):\d+`)
330 func (d *azStubDialer) Dial(network, address string) (net.Conn, error) {
331 if hp := localHostPortRe.FindString(address); hp != "" {
332 log.Println("azStubDialer: dial", hp, "instead of", address)
335 return d.Dialer.Dial(network, address)
338 type TestableAzureBlobVolume struct {
340 azHandler *azStubHandler
341 azStub *httptest.Server
345 func NewTestableAzureBlobVolume(t TB, readonly bool, replication int) *TestableAzureBlobVolume {
346 azHandler := newAzStubHandler()
347 azStub := httptest.NewServer(azHandler)
349 var azClient storage.Client
351 container := azureTestContainer
353 // Connect to stub instead of real Azure storage service
354 stubURLBase := strings.Split(azStub.URL, "://")[1]
356 if azClient, err = storage.NewClient(fakeAccountName, fakeAccountKey, stubURLBase, storage.DefaultAPIVersion, false); err != nil {
359 container = "fakecontainername"
361 // Connect to real Azure storage service
362 accountKey, err := readKeyFromFile(azureStorageAccountKeyFile)
366 azClient, err = storage.NewBasicClient(azureStorageAccountName, accountKey)
372 v := &AzureBlobVolume{
373 ContainerName: container,
375 AzureReplication: replication,
377 bsClient: azClient.GetBlobService(),
380 return &TestableAzureBlobVolume{
382 azHandler: azHandler,
388 func TestAzureBlobVolumeWithGeneric(t *testing.T) {
389 defer func(t http.RoundTripper) {
390 http.DefaultTransport = t
391 }(http.DefaultTransport)
392 http.DefaultTransport = &http.Transport{
393 Dial: (&azStubDialer{}).Dial,
395 azureWriteRaceInterval = time.Millisecond
396 azureWriteRacePollTime = time.Nanosecond
397 DoGenericVolumeTests(t, func(t TB) TestableVolume {
398 return NewTestableAzureBlobVolume(t, false, azureStorageReplication)
402 func TestAzureBlobVolumeConcurrentRanges(t *testing.T) {
407 defer func(t http.RoundTripper) {
408 http.DefaultTransport = t
409 }(http.DefaultTransport)
410 http.DefaultTransport = &http.Transport{
411 Dial: (&azStubDialer{}).Dial,
413 azureWriteRaceInterval = time.Millisecond
414 azureWriteRacePollTime = time.Nanosecond
415 // Test (BlockSize mod azureMaxGetBytes)==0 and !=0 cases
416 for _, azureMaxGetBytes = range []int{2 << 22, 2<<22 - 1} {
417 DoGenericVolumeTests(t, func(t TB) TestableVolume {
418 return NewTestableAzureBlobVolume(t, false, azureStorageReplication)
423 func TestReadonlyAzureBlobVolumeWithGeneric(t *testing.T) {
424 defer func(t http.RoundTripper) {
425 http.DefaultTransport = t
426 }(http.DefaultTransport)
427 http.DefaultTransport = &http.Transport{
428 Dial: (&azStubDialer{}).Dial,
430 azureWriteRaceInterval = time.Millisecond
431 azureWriteRacePollTime = time.Nanosecond
432 DoGenericVolumeTests(t, func(t TB) TestableVolume {
433 return NewTestableAzureBlobVolume(t, true, azureStorageReplication)
437 func TestAzureBlobVolumeRangeFenceposts(t *testing.T) {
438 defer func(t http.RoundTripper) {
439 http.DefaultTransport = t
440 }(http.DefaultTransport)
441 http.DefaultTransport = &http.Transport{
442 Dial: (&azStubDialer{}).Dial,
445 v := NewTestableAzureBlobVolume(t, false, 3)
448 for _, size := range []int{
449 2<<22 - 1, // one <max read
450 2 << 22, // one =max read
451 2<<22 + 1, // one =max read, one <max
452 2 << 23, // two =max reads
456 data := make([]byte, size)
457 for i := range data {
458 data[i] = byte((i + 7) & 0xff)
460 hash := fmt.Sprintf("%x", md5.Sum(data))
461 err := v.Put(context.Background(), hash, data)
465 gotData := make([]byte, len(data))
466 gotLen, err := v.Get(context.Background(), hash, gotData)
470 gotHash := fmt.Sprintf("%x", md5.Sum(gotData))
472 t.Error("length mismatch: got %d != %d", gotLen, size)
475 t.Error("hash mismatch: got %s != %s", gotHash, hash)
480 func TestAzureBlobVolumeReplication(t *testing.T) {
481 for r := 1; r <= 4; r++ {
482 v := NewTestableAzureBlobVolume(t, false, r)
484 if n := v.Replication(); n != r {
485 t.Errorf("Got replication %d, expected %d", n, r)
490 func TestAzureBlobVolumeCreateBlobRace(t *testing.T) {
491 defer func(t http.RoundTripper) {
492 http.DefaultTransport = t
493 }(http.DefaultTransport)
494 http.DefaultTransport = &http.Transport{
495 Dial: (&azStubDialer{}).Dial,
498 v := NewTestableAzureBlobVolume(t, false, 3)
501 azureWriteRaceInterval = time.Second
502 azureWriteRacePollTime = time.Millisecond
504 allDone := make(chan struct{})
505 v.azHandler.race = make(chan chan struct{})
507 err := v.Put(context.Background(), TestHash, TestBlock)
512 continuePut := make(chan struct{})
513 // Wait for the stub's Put to create the empty blob
514 v.azHandler.race <- continuePut
516 buf := make([]byte, len(TestBlock))
517 _, err := v.Get(context.Background(), TestHash, buf)
523 // Wait for the stub's Get to get the empty blob
524 close(v.azHandler.race)
525 // Allow stub's Put to continue, so the real data is ready
526 // when the volume's Get retries
528 // Wait for volume's Get to return the real data
532 func TestAzureBlobVolumeCreateBlobRaceDeadline(t *testing.T) {
533 defer func(t http.RoundTripper) {
534 http.DefaultTransport = t
535 }(http.DefaultTransport)
536 http.DefaultTransport = &http.Transport{
537 Dial: (&azStubDialer{}).Dial,
540 v := NewTestableAzureBlobVolume(t, false, 3)
543 azureWriteRaceInterval = 2 * time.Second
544 azureWriteRacePollTime = 5 * time.Millisecond
546 v.PutRaw(TestHash, nil)
548 buf := new(bytes.Buffer)
551 t.Errorf("Index %+q should be empty", buf.Bytes())
554 v.TouchWithDate(TestHash, time.Now().Add(-1982*time.Millisecond))
556 allDone := make(chan struct{})
559 buf := make([]byte, BlockSize)
560 n, err := v.Get(context.Background(), TestHash, buf)
566 t.Errorf("Got %+q, expected empty buf", buf[:n])
571 case <-time.After(time.Second):
572 t.Error("Get should have stopped waiting for race when block was 2s old")
577 if !bytes.HasPrefix(buf.Bytes(), []byte(TestHash+"+0")) {
578 t.Errorf("Index %+q should have %+q", buf.Bytes(), TestHash+"+0")
582 func TestAzureBlobVolumeContextCancelGet(t *testing.T) {
583 testAzureBlobVolumeContextCancel(t, func(ctx context.Context, v *TestableAzureBlobVolume) error {
584 v.PutRaw(TestHash, TestBlock)
585 _, err := v.Get(ctx, TestHash, make([]byte, BlockSize))
590 func TestAzureBlobVolumeContextCancelPut(t *testing.T) {
591 testAzureBlobVolumeContextCancel(t, func(ctx context.Context, v *TestableAzureBlobVolume) error {
592 return v.Put(ctx, TestHash, make([]byte, BlockSize))
596 func TestAzureBlobVolumeContextCancelCompare(t *testing.T) {
597 testAzureBlobVolumeContextCancel(t, func(ctx context.Context, v *TestableAzureBlobVolume) error {
598 v.PutRaw(TestHash, TestBlock)
599 return v.Compare(ctx, TestHash, TestBlock2)
603 func testAzureBlobVolumeContextCancel(t *testing.T, testFunc func(context.Context, *TestableAzureBlobVolume) error) {
604 defer func(t http.RoundTripper) {
605 http.DefaultTransport = t
606 }(http.DefaultTransport)
607 http.DefaultTransport = &http.Transport{
608 Dial: (&azStubDialer{}).Dial,
611 v := NewTestableAzureBlobVolume(t, false, 3)
613 v.azHandler.race = make(chan chan struct{})
615 ctx, cancel := context.WithCancel(context.Background())
616 allDone := make(chan struct{})
619 err := testFunc(ctx, v)
620 if err != context.Canceled {
621 t.Errorf("got %T %q, expected %q", err, err, context.Canceled)
624 releaseHandler := make(chan struct{})
627 t.Error("testFunc finished without waiting for v.azHandler.race")
628 case <-time.After(10 * time.Second):
629 t.Error("timed out waiting to enter handler")
630 case v.azHandler.race <- releaseHandler:
636 case <-time.After(10 * time.Second):
637 t.Error("timed out waiting to cancel")
646 func (v *TestableAzureBlobVolume) PutRaw(locator string, data []byte) {
647 v.azHandler.PutRaw(v.ContainerName, locator, data)
650 func (v *TestableAzureBlobVolume) TouchWithDate(locator string, lastPut time.Time) {
651 v.azHandler.TouchWithDate(v.ContainerName, locator, lastPut)
654 func (v *TestableAzureBlobVolume) Teardown() {
658 func makeEtag() string {
659 return fmt.Sprintf("0x%x", rand.Int63())