25 log "github.com/Sirupsen/logrus"
26 "github.com/curoverse/azure-sdk-for-go/storage"
27 check "gopkg.in/check.v1"
31 // This cannot be the fake account name "devstoreaccount1"
32 // used by Microsoft's Azure emulator: the Azure SDK
33 // recognizes that magic string and changes its behavior to
34 // cater to the Azure SDK's own test suite.
35 fakeAccountName = "fakeAccountName"
36 fakeAccountKey = "Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw=="
39 var azureTestContainer string // container name NewTestableAzureBlobVolume starts from; presumably set via the test.azure-storage-container-volume flag (TODO confirm: flag registration is not visible here)
44 "test.azure-storage-container-volume",
46 "Name of Azure container to use for testing. Do not use a container with real data! Use -azure-storage-account-name and -azure-storage-key-file arguments to supply credentials.")
52 Metadata map[string]string
54 Uncommitted map[string][]byte
57 type azStubHandler struct {
59 blobs map[string]*azBlob
60 race chan chan struct{}
63 func newAzStubHandler() *azStubHandler {
64 return &azStubHandler{
65 blobs: make(map[string]*azBlob),
69 func (h *azStubHandler) TouchWithDate(container, hash string, t time.Time) {
70 blob, ok := h.blobs[container+"|"+hash]
77 func (h *azStubHandler) PutRaw(container, hash string, data []byte) {
80 h.blobs[container+"|"+hash] = &azBlob{
83 Metadata: make(map[string]string),
84 Uncommitted: make(map[string][]byte),
88 func (h *azStubHandler) unlockAndRace() {
93 // Signal caller that race is starting by reading from
94 // h.race. If we get a channel, block until that channel is
95 // ready to receive. If we get nil (or h.race is closed) just
97 if c := <-h.race; c != nil {
103 var rangeRegexp = regexp.MustCompile(`^bytes=(\d+)-(\d+)$`) // matches a single "bytes=<first>-<last>" Range header value, capturing both offsets; ServeHTTP uses it to serve partial reads
105 func (h *azStubHandler) ServeHTTP(rw http.ResponseWriter, r *http.Request) {
108 // defer log.Printf("azStubHandler: %+v", r)
110 path := strings.Split(r.URL.Path, "/")
117 if err := r.ParseForm(); err != nil {
118 log.Printf("azStubHandler(%+v): %s", r, err)
119 rw.WriteHeader(http.StatusBadRequest)
123 body, err := ioutil.ReadAll(r.Body)
128 type blockListRequestBody struct {
129 XMLName xml.Name `xml:"BlockList"`
133 blob, blobExists := h.blobs[container+"|"+hash]
136 case r.Method == "PUT" && r.Form.Get("comp") == "":
138 if _, ok := h.blobs[container+"|"+hash]; !ok {
139 // Like the real Azure service, we offer a
140 // race window during which other clients can
141 // list/get the new blob before any data is
143 h.blobs[container+"|"+hash] = &azBlob{
145 Uncommitted: make(map[string][]byte),
146 Metadata: make(map[string]string),
151 metadata := make(map[string]string)
152 for k, v := range r.Header {
153 if strings.HasPrefix(strings.ToLower(k), "x-ms-meta-") {
154 name := k[len("x-ms-meta-"):]
155 metadata[strings.ToLower(name)] = v[0]
158 h.blobs[container+"|"+hash] = &azBlob{
161 Uncommitted: make(map[string][]byte),
165 rw.WriteHeader(http.StatusCreated)
166 case r.Method == "PUT" && r.Form.Get("comp") == "block":
169 log.Printf("Got block for nonexistent blob: %+v", r)
170 rw.WriteHeader(http.StatusBadRequest)
173 blockID, err := base64.StdEncoding.DecodeString(r.Form.Get("blockid"))
174 if err != nil || len(blockID) == 0 {
175 log.Printf("Invalid blockid: %+q", r.Form.Get("blockid"))
176 rw.WriteHeader(http.StatusBadRequest)
179 blob.Uncommitted[string(blockID)] = body
180 rw.WriteHeader(http.StatusCreated)
181 case r.Method == "PUT" && r.Form.Get("comp") == "blocklist":
182 // "Put Block List" API
183 bl := &blockListRequestBody{}
184 if err := xml.Unmarshal(body, bl); err != nil {
185 log.Printf("xml Unmarshal: %s", err)
186 rw.WriteHeader(http.StatusBadRequest)
189 for _, encBlockID := range bl.Uncommitted {
190 blockID, err := base64.StdEncoding.DecodeString(encBlockID)
191 if err != nil || len(blockID) == 0 || blob.Uncommitted[string(blockID)] == nil {
192 log.Printf("Invalid blockid: %+q", encBlockID)
193 rw.WriteHeader(http.StatusBadRequest)
196 blob.Data = blob.Uncommitted[string(blockID)]
197 blob.Etag = makeEtag()
198 blob.Mtime = time.Now()
199 delete(blob.Uncommitted, string(blockID))
201 rw.WriteHeader(http.StatusCreated)
202 case r.Method == "PUT" && r.Form.Get("comp") == "metadata":
203 // "Set Metadata Headers" API. (The read side, "Get Blob
204 // Metadata", is stubbed in a separate case.) AzureBlobVolume
205 // sets metadata headers only as a way to bump Etag
206 // and Last-Modified.
208 log.Printf("Got metadata for nonexistent blob: %+v", r)
209 rw.WriteHeader(http.StatusBadRequest)
212 blob.Metadata = make(map[string]string)
213 for k, v := range r.Header {
214 if strings.HasPrefix(strings.ToLower(k), "x-ms-meta-") {
215 name := k[len("x-ms-meta-"):]
216 blob.Metadata[strings.ToLower(name)] = v[0]
219 blob.Mtime = time.Now()
220 blob.Etag = makeEtag()
221 case (r.Method == "GET" || r.Method == "HEAD") && r.Form.Get("comp") == "metadata" && hash != "":
222 // "Get Blob Metadata" API
224 rw.WriteHeader(http.StatusNotFound)
227 for k, v := range blob.Metadata {
228 rw.Header().Set(fmt.Sprintf("x-ms-meta-%s", k), v)
231 case (r.Method == "GET" || r.Method == "HEAD") && hash != "":
234 rw.WriteHeader(http.StatusNotFound)
238 if rangeSpec := rangeRegexp.FindStringSubmatch(r.Header.Get("Range")); rangeSpec != nil {
239 b0, err0 := strconv.Atoi(rangeSpec[1])
240 b1, err1 := strconv.Atoi(rangeSpec[2])
241 if err0 != nil || err1 != nil || b0 >= len(data) || b1 >= len(data) || b0 > b1 {
242 rw.Header().Set("Content-Range", fmt.Sprintf("bytes */%d", len(data)))
243 rw.WriteHeader(http.StatusRequestedRangeNotSatisfiable)
246 rw.Header().Set("Content-Range", fmt.Sprintf("bytes %d-%d/%d", b0, b1, len(data)))
247 rw.WriteHeader(http.StatusPartialContent)
248 data = data[b0 : b1+1]
250 rw.Header().Set("Last-Modified", blob.Mtime.Format(time.RFC1123))
251 rw.Header().Set("Content-Length", strconv.Itoa(len(data)))
252 if r.Method == "GET" {
253 if _, err := rw.Write(data); err != nil {
254 log.Printf("write %+q: %s", data, err)
258 case r.Method == "DELETE" && hash != "":
261 rw.WriteHeader(http.StatusNotFound)
264 delete(h.blobs, container+"|"+hash)
265 rw.WriteHeader(http.StatusAccepted)
266 case r.Method == "GET" && r.Form.Get("comp") == "list" && r.Form.Get("restype") == "container":
268 prefix := container + "|" + r.Form.Get("prefix")
269 marker := r.Form.Get("marker")
272 if n, err := strconv.Atoi(r.Form.Get("maxresults")); err == nil && n >= 1 && n <= 5000 {
276 resp := storage.BlobListResponse{
279 MaxResults: int64(maxResults),
281 var hashes sort.StringSlice
282 for k := range h.blobs {
283 if strings.HasPrefix(k, prefix) {
284 hashes = append(hashes, k[len(container)+1:])
288 for _, hash := range hashes {
289 if len(resp.Blobs) == maxResults {
290 resp.NextMarker = hash
293 if len(resp.Blobs) > 0 || marker == "" || marker == hash {
294 blob := h.blobs[container+"|"+hash]
295 bmeta := map[string]string(nil)
296 if r.Form.Get("include") == "metadata" {
297 bmeta = blob.Metadata
301 Properties: storage.BlobProperties{
302 LastModified: blob.Mtime.Format(time.RFC1123),
303 ContentLength: int64(len(blob.Data)),
308 resp.Blobs = append(resp.Blobs, b)
311 buf, err := xml.Marshal(resp)
314 rw.WriteHeader(http.StatusInternalServerError)
318 log.Printf("azStubHandler: not implemented: %+v Body:%+q", r, body)
319 rw.WriteHeader(http.StatusNotImplemented)
323 // azStubDialer is a net.Dialer that notices when the Azure driver
324 // tries to connect to e.g. "fakeAccountName.blob.127.0.0.1:46067", and
325 // in such cases transparently dials "127.0.0.1:46067" instead.
326 type azStubDialer struct {
330 var localHostPortRe = regexp.MustCompile(`(127\.0\.0\.1|localhost|\[::1\]):\d+`) // unanchored: finds a loopback host:port (127.0.0.1, localhost, or [::1]) anywhere inside a dial address
332 func (d *azStubDialer) Dial(network, address string) (net.Conn, error) {
333 if hp := localHostPortRe.FindString(address); hp != "" {
334 log.Println("azStubDialer: dial", hp, "instead of", address)
337 return d.Dialer.Dial(network, address)
340 type TestableAzureBlobVolume struct {
342 azHandler *azStubHandler
343 azStub *httptest.Server
347 func NewTestableAzureBlobVolume(t TB, readonly bool, replication int) *TestableAzureBlobVolume {
348 azHandler := newAzStubHandler()
349 azStub := httptest.NewServer(azHandler)
351 var azClient storage.Client
353 container := azureTestContainer
355 // Connect to stub instead of real Azure storage service
356 stubURLBase := strings.Split(azStub.URL, "://")[1]
358 if azClient, err = storage.NewClient(fakeAccountName, fakeAccountKey, stubURLBase, storage.DefaultAPIVersion, false); err != nil {
361 container = "fakecontainername"
363 // Connect to real Azure storage service
364 accountKey, err := readKeyFromFile(azureStorageAccountKeyFile)
368 azClient, err = storage.NewBasicClient(azureStorageAccountName, accountKey)
374 bs := azClient.GetBlobService()
375 v := &AzureBlobVolume{
376 ContainerName: container,
378 AzureReplication: replication,
380 bsClient: &azureBlobClient{client: &bs},
383 return &TestableAzureBlobVolume{
385 azHandler: azHandler,
391 var _ = check.Suite(&StubbedAzureBlobSuite{}) // registers StubbedAzureBlobSuite with the gocheck (check.v1) runner
393 type StubbedAzureBlobSuite struct {
394 volume *TestableAzureBlobVolume
395 origHTTPTransport http.RoundTripper
398 func (s *StubbedAzureBlobSuite) SetUpTest(c *check.C) {
399 s.origHTTPTransport = http.DefaultTransport
400 http.DefaultTransport = &http.Transport{
401 Dial: (&azStubDialer{}).Dial,
403 azureWriteRaceInterval = time.Millisecond
404 azureWriteRacePollTime = time.Nanosecond
406 s.volume = NewTestableAzureBlobVolume(c, false, 3)
409 func (s *StubbedAzureBlobSuite) TearDownTest(c *check.C) {
411 http.DefaultTransport = s.origHTTPTransport
414 func TestAzureBlobVolumeWithGeneric(t *testing.T) {
415 defer func(t http.RoundTripper) {
416 http.DefaultTransport = t
417 }(http.DefaultTransport)
418 http.DefaultTransport = &http.Transport{
419 Dial: (&azStubDialer{}).Dial,
421 azureWriteRaceInterval = time.Millisecond
422 azureWriteRacePollTime = time.Nanosecond
423 DoGenericVolumeTests(t, func(t TB) TestableVolume {
424 return NewTestableAzureBlobVolume(t, false, azureStorageReplication)
428 func TestAzureBlobVolumeConcurrentRanges(t *testing.T) {
433 defer func(t http.RoundTripper) {
434 http.DefaultTransport = t
435 }(http.DefaultTransport)
436 http.DefaultTransport = &http.Transport{
437 Dial: (&azStubDialer{}).Dial,
439 azureWriteRaceInterval = time.Millisecond
440 azureWriteRacePollTime = time.Nanosecond
441 // Test (BlockSize mod azureMaxGetBytes)==0 and !=0 cases
442 for _, azureMaxGetBytes = range []int{2 << 22, 2<<22 - 1} {
443 DoGenericVolumeTests(t, func(t TB) TestableVolume {
444 return NewTestableAzureBlobVolume(t, false, azureStorageReplication)
449 func TestReadonlyAzureBlobVolumeWithGeneric(t *testing.T) {
450 defer func(t http.RoundTripper) {
451 http.DefaultTransport = t
452 }(http.DefaultTransport)
453 http.DefaultTransport = &http.Transport{
454 Dial: (&azStubDialer{}).Dial,
456 azureWriteRaceInterval = time.Millisecond
457 azureWriteRacePollTime = time.Nanosecond
458 DoGenericVolumeTests(t, func(t TB) TestableVolume {
459 return NewTestableAzureBlobVolume(t, true, azureStorageReplication)
463 func TestAzureBlobVolumeRangeFenceposts(t *testing.T) {
464 defer func(t http.RoundTripper) {
465 http.DefaultTransport = t
466 }(http.DefaultTransport)
467 http.DefaultTransport = &http.Transport{
468 Dial: (&azStubDialer{}).Dial,
471 v := NewTestableAzureBlobVolume(t, false, 3)
474 for _, size := range []int{
475 2<<22 - 1, // one <max read
476 2 << 22, // one =max read
477 2<<22 + 1, // one =max read, one <max
478 2 << 23, // two =max reads
482 data := make([]byte, size)
483 for i := range data {
484 data[i] = byte((i + 7) & 0xff)
486 hash := fmt.Sprintf("%x", md5.Sum(data))
487 err := v.Put(context.Background(), hash, data)
491 gotData := make([]byte, len(data))
492 gotLen, err := v.Get(context.Background(), hash, gotData)
496 gotHash := fmt.Sprintf("%x", md5.Sum(gotData))
498 t.Errorf("length mismatch: got %d != %d", gotLen, size)
501 t.Errorf("hash mismatch: got %s != %s", gotHash, hash)
506 func TestAzureBlobVolumeReplication(t *testing.T) {
507 for r := 1; r <= 4; r++ {
508 v := NewTestableAzureBlobVolume(t, false, r)
510 if n := v.Replication(); n != r {
511 t.Errorf("Got replication %d, expected %d", n, r)
516 func TestAzureBlobVolumeCreateBlobRace(t *testing.T) {
517 defer func(t http.RoundTripper) {
518 http.DefaultTransport = t
519 }(http.DefaultTransport)
520 http.DefaultTransport = &http.Transport{
521 Dial: (&azStubDialer{}).Dial,
524 v := NewTestableAzureBlobVolume(t, false, 3)
527 azureWriteRaceInterval = time.Second
528 azureWriteRacePollTime = time.Millisecond
530 allDone := make(chan struct{})
531 v.azHandler.race = make(chan chan struct{})
533 err := v.Put(context.Background(), TestHash, TestBlock)
538 continuePut := make(chan struct{})
539 // Wait for the stub's Put to create the empty blob
540 v.azHandler.race <- continuePut
542 buf := make([]byte, len(TestBlock))
543 _, err := v.Get(context.Background(), TestHash, buf)
549 // Wait for the stub's Get to get the empty blob
550 close(v.azHandler.race)
551 // Allow stub's Put to continue, so the real data is ready
552 // when the volume's Get retries
554 // Wait for volume's Get to return the real data
558 func TestAzureBlobVolumeCreateBlobRaceDeadline(t *testing.T) {
559 defer func(t http.RoundTripper) {
560 http.DefaultTransport = t
561 }(http.DefaultTransport)
562 http.DefaultTransport = &http.Transport{
563 Dial: (&azStubDialer{}).Dial,
566 v := NewTestableAzureBlobVolume(t, false, 3)
569 azureWriteRaceInterval = 2 * time.Second
570 azureWriteRacePollTime = 5 * time.Millisecond
572 v.PutRaw(TestHash, nil)
574 buf := new(bytes.Buffer)
577 t.Errorf("Index %+q should be empty", buf.Bytes())
580 v.TouchWithDate(TestHash, time.Now().Add(-1982*time.Millisecond))
582 allDone := make(chan struct{})
585 buf := make([]byte, BlockSize)
586 n, err := v.Get(context.Background(), TestHash, buf)
592 t.Errorf("Got %+q, expected empty buf", buf[:n])
597 case <-time.After(time.Second):
598 t.Error("Get should have stopped waiting for race when block was 2s old")
603 if !bytes.HasPrefix(buf.Bytes(), []byte(TestHash+"+0")) {
604 t.Errorf("Index %+q should have %+q", buf.Bytes(), TestHash+"+0")
608 func TestAzureBlobVolumeContextCancelGet(t *testing.T) {
609 testAzureBlobVolumeContextCancel(t, func(ctx context.Context, v *TestableAzureBlobVolume) error {
610 v.PutRaw(TestHash, TestBlock)
611 _, err := v.Get(ctx, TestHash, make([]byte, BlockSize))
616 func TestAzureBlobVolumeContextCancelPut(t *testing.T) {
617 testAzureBlobVolumeContextCancel(t, func(ctx context.Context, v *TestableAzureBlobVolume) error {
618 return v.Put(ctx, TestHash, make([]byte, BlockSize))
622 func TestAzureBlobVolumeContextCancelCompare(t *testing.T) {
623 testAzureBlobVolumeContextCancel(t, func(ctx context.Context, v *TestableAzureBlobVolume) error {
624 v.PutRaw(TestHash, TestBlock)
625 return v.Compare(ctx, TestHash, TestBlock2)
629 func testAzureBlobVolumeContextCancel(t *testing.T, testFunc func(context.Context, *TestableAzureBlobVolume) error) {
630 defer func(t http.RoundTripper) {
631 http.DefaultTransport = t
632 }(http.DefaultTransport)
633 http.DefaultTransport = &http.Transport{
634 Dial: (&azStubDialer{}).Dial,
637 v := NewTestableAzureBlobVolume(t, false, 3)
639 v.azHandler.race = make(chan chan struct{})
641 ctx, cancel := context.WithCancel(context.Background())
642 allDone := make(chan struct{})
645 err := testFunc(ctx, v)
646 if err != context.Canceled {
647 t.Errorf("got %T %q, expected %q", err, err, context.Canceled)
650 releaseHandler := make(chan struct{})
653 t.Error("testFunc finished without waiting for v.azHandler.race")
654 case <-time.After(10 * time.Second):
655 t.Error("timed out waiting to enter handler")
656 case v.azHandler.race <- releaseHandler:
662 case <-time.After(10 * time.Second):
663 t.Error("timed out waiting to cancel")
672 func (s *StubbedAzureBlobSuite) TestStats(c *check.C) {
673 stats := func() string {
674 buf, err := json.Marshal(s.volume.InternalStats())
675 c.Check(err, check.IsNil)
679 c.Check(stats(), check.Matches, `.*"Ops":0,.*`)
680 c.Check(stats(), check.Matches, `.*"Errors":0,.*`)
682 loc := "acbd18db4cc2f85cedef654fccc4a4d8"
683 _, err := s.volume.Get(context.Background(), loc, make([]byte, 3))
684 c.Check(err, check.NotNil)
685 c.Check(stats(), check.Matches, `.*"Ops":[^0],.*`)
686 c.Check(stats(), check.Matches, `.*"Errors":[^0],.*`)
687 c.Check(stats(), check.Matches, `.*"storage\.AzureStorageServiceError 404 \(404 Not Found\)":[^0].*`)
688 c.Check(stats(), check.Matches, `.*"InBytes":0,.*`)
690 err = s.volume.Put(context.Background(), loc, []byte("foo"))
691 c.Check(err, check.IsNil)
692 c.Check(stats(), check.Matches, `.*"OutBytes":3,.*`)
693 c.Check(stats(), check.Matches, `.*"CreateOps":1,.*`)
695 _, err = s.volume.Get(context.Background(), loc, make([]byte, 3))
696 c.Check(err, check.IsNil)
697 _, err = s.volume.Get(context.Background(), loc, make([]byte, 3))
698 c.Check(err, check.IsNil)
699 c.Check(stats(), check.Matches, `.*"InBytes":6,.*`)
702 func (v *TestableAzureBlobVolume) PutRaw(locator string, data []byte) {
703 v.azHandler.PutRaw(v.ContainerName, locator, data)
706 func (v *TestableAzureBlobVolume) TouchWithDate(locator string, lastPut time.Time) {
707 v.azHandler.TouchWithDate(v.ContainerName, locator, lastPut)
710 func (v *TestableAzureBlobVolume) Teardown() {
714 func makeEtag() string {
715 return fmt.Sprintf("0x%x", rand.Int63())