23 "github.com/curoverse/azure-sdk-for-go/storage"
27 // The same fake credentials used by Microsoft's Azure emulator
28 emulatorAccountName = "devstoreaccount1"
29 emulatorAccountKey = "Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw=="
32 var azureTestContainer string
37 "test.azure-storage-container-volume",
39 "Name of Azure container to use for testing. Do not use a container with real data! Use -azure-storage-account-name and -azure-storage-key-file arguments to supply credentials.")
45 Metadata map[string]string
47 Uncommitted map[string][]byte
50 type azStubHandler struct {
52 blobs map[string]*azBlob
53 race chan chan struct{}
56 func newAzStubHandler() *azStubHandler {
57 return &azStubHandler{
58 blobs: make(map[string]*azBlob),
62 func (h *azStubHandler) TouchWithDate(container, hash string, t time.Time) {
63 blob, ok := h.blobs[container+"|"+hash]
70 func (h *azStubHandler) PutRaw(container, hash string, data []byte) {
73 h.blobs[container+"|"+hash] = &azBlob{
76 Uncommitted: make(map[string][]byte),
80 func (h *azStubHandler) unlockAndRace() {
85 // Signal caller that race is starting by reading from
86 // h.race. If we get a channel, block until that channel is
87 // ready to receive. If we get nil (or h.race is closed) just
89 if c := <-h.race; c != nil {
95 func (h *azStubHandler) ServeHTTP(rw http.ResponseWriter, r *http.Request) {
98 // defer log.Printf("azStubHandler: %+v", r)
100 path := strings.Split(r.URL.Path, "/")
107 if err := r.ParseForm(); err != nil {
108 log.Printf("azStubHandler(%+v): %s", r, err)
109 rw.WriteHeader(http.StatusBadRequest)
113 body, err := ioutil.ReadAll(r.Body)
118 type blockListRequestBody struct {
119 XMLName xml.Name `xml:"BlockList"`
123 blob, blobExists := h.blobs[container+"|"+hash]
126 case r.Method == "PUT" && r.Form.Get("comp") == "":
128 if _, ok := h.blobs[container+"|"+hash]; !ok {
129 // Like the real Azure service, we offer a
130 // race window during which other clients can
131 // list/get the new blob before any data is
133 h.blobs[container+"|"+hash] = &azBlob{
135 Uncommitted: make(map[string][]byte),
140 h.blobs[container+"|"+hash] = &azBlob{
143 Uncommitted: make(map[string][]byte),
146 rw.WriteHeader(http.StatusCreated)
147 case r.Method == "PUT" && r.Form.Get("comp") == "block":
150 log.Printf("Got block for nonexistent blob: %+v", r)
151 rw.WriteHeader(http.StatusBadRequest)
154 blockID, err := base64.StdEncoding.DecodeString(r.Form.Get("blockid"))
155 if err != nil || len(blockID) == 0 {
156 log.Printf("Invalid blockid: %+q", r.Form.Get("blockid"))
157 rw.WriteHeader(http.StatusBadRequest)
160 blob.Uncommitted[string(blockID)] = body
161 rw.WriteHeader(http.StatusCreated)
162 case r.Method == "PUT" && r.Form.Get("comp") == "blocklist":
163 // "Put Block List" API
164 bl := &blockListRequestBody{}
165 if err := xml.Unmarshal(body, bl); err != nil {
166 log.Printf("xml Unmarshal: %s", err)
167 rw.WriteHeader(http.StatusBadRequest)
170 for _, encBlockID := range bl.Uncommitted {
171 blockID, err := base64.StdEncoding.DecodeString(encBlockID)
172 if err != nil || len(blockID) == 0 || blob.Uncommitted[string(blockID)] == nil {
173 log.Printf("Invalid blockid: %+q", encBlockID)
174 rw.WriteHeader(http.StatusBadRequest)
177 blob.Data = blob.Uncommitted[string(blockID)]
178 blob.Etag = makeEtag()
179 blob.Mtime = time.Now()
180 delete(blob.Uncommitted, string(blockID))
182 rw.WriteHeader(http.StatusCreated)
183 case r.Method == "PUT" && r.Form.Get("comp") == "metadata":
184 // "Set Metadata Headers" API. We don't bother
185 // stubbing "Get Metadata Headers": AzureBlobVolume
186 // sets metadata headers only as a way to bump Etag
187 // and Last-Modified.
189 log.Printf("Got metadata for nonexistent blob: %+v", r)
190 rw.WriteHeader(http.StatusBadRequest)
193 blob.Metadata = make(map[string]string)
194 for k, v := range r.Header {
195 if strings.HasPrefix(strings.ToLower(k), "x-ms-meta-") {
196 blob.Metadata[k] = v[0]
199 blob.Mtime = time.Now()
200 blob.Etag = makeEtag()
201 case (r.Method == "GET" || r.Method == "HEAD") && hash != "":
204 rw.WriteHeader(http.StatusNotFound)
207 rw.Header().Set("Last-Modified", blob.Mtime.Format(time.RFC1123))
208 rw.Header().Set("Content-Length", strconv.Itoa(len(blob.Data)))
209 if r.Method == "GET" {
210 if _, err := rw.Write(blob.Data); err != nil {
211 log.Printf("write %+q: %s", blob.Data, err)
215 case r.Method == "DELETE" && hash != "":
218 rw.WriteHeader(http.StatusNotFound)
221 delete(h.blobs, container+"|"+hash)
222 rw.WriteHeader(http.StatusAccepted)
223 case r.Method == "GET" && r.Form.Get("comp") == "list" && r.Form.Get("restype") == "container":
225 prefix := container + "|" + r.Form.Get("prefix")
226 marker := r.Form.Get("marker")
229 if n, err := strconv.Atoi(r.Form.Get("maxresults")); err == nil && n >= 1 && n <= 5000 {
233 resp := storage.BlobListResponse{
236 MaxResults: int64(maxResults),
238 var hashes sort.StringSlice
239 for k := range h.blobs {
240 if strings.HasPrefix(k, prefix) {
241 hashes = append(hashes, k[len(container)+1:])
245 for _, hash := range hashes {
246 if len(resp.Blobs) == maxResults {
247 resp.NextMarker = hash
250 if len(resp.Blobs) > 0 || marker == "" || marker == hash {
251 blob := h.blobs[container+"|"+hash]
252 resp.Blobs = append(resp.Blobs, storage.Blob{
254 Properties: storage.BlobProperties{
255 LastModified: blob.Mtime.Format(time.RFC1123),
256 ContentLength: int64(len(blob.Data)),
262 buf, err := xml.Marshal(resp)
265 rw.WriteHeader(http.StatusInternalServerError)
269 log.Printf("azStubHandler: not implemented: %+v Body:%+q", r, body)
270 rw.WriteHeader(http.StatusNotImplemented)
274 // azStubDialer is a net.Dialer that notices when the Azure driver
275 // tries to connect to "devstoreaccount1.blob.127.0.0.1:46067", and
276 // in such cases transparently dials "127.0.0.1:46067" instead.
277 type azStubDialer struct {
281 var localHostPortRe = regexp.MustCompile(`(127\.0\.0\.1|localhost|\[::1\]):\d+`)
283 func (d *azStubDialer) Dial(network, address string) (net.Conn, error) {
284 if hp := localHostPortRe.FindString(address); hp != "" {
285 log.Println("azStubDialer: dial", hp, "instead of", address)
288 return d.Dialer.Dial(network, address)
291 type TestableAzureBlobVolume struct {
293 azHandler *azStubHandler
294 azStub *httptest.Server
298 func NewTestableAzureBlobVolume(t TB, readonly bool, replication int) *TestableAzureBlobVolume {
299 azHandler := newAzStubHandler()
300 azStub := httptest.NewServer(azHandler)
302 var azClient storage.Client
304 container := azureTestContainer
306 // Connect to stub instead of real Azure storage service
307 stubURLBase := strings.Split(azStub.URL, "://")[1]
309 if azClient, err = storage.NewClient(emulatorAccountName, emulatorAccountKey, stubURLBase, storage.DefaultAPIVersion, false); err != nil {
312 container = "fakecontainername"
314 // Connect to real Azure storage service
315 accountKey, err := readKeyFromFile(azureStorageAccountKeyFile)
319 azClient, err = storage.NewBasicClient(azureStorageAccountName, accountKey)
325 v := NewAzureBlobVolume(azClient, container, readonly, replication)
327 return &TestableAzureBlobVolume{
329 azHandler: azHandler,
335 func TestAzureBlobVolumeWithGeneric(t *testing.T) {
336 defer func(t http.RoundTripper) {
337 http.DefaultTransport = t
338 }(http.DefaultTransport)
339 http.DefaultTransport = &http.Transport{
340 Dial: (&azStubDialer{}).Dial,
342 azureWriteRaceInterval = time.Millisecond
343 azureWriteRacePollTime = time.Nanosecond
344 DoGenericVolumeTests(t, func(t TB) TestableVolume {
345 return NewTestableAzureBlobVolume(t, false, azureStorageReplication)
349 func TestReadonlyAzureBlobVolumeWithGeneric(t *testing.T) {
350 defer func(t http.RoundTripper) {
351 http.DefaultTransport = t
352 }(http.DefaultTransport)
353 http.DefaultTransport = &http.Transport{
354 Dial: (&azStubDialer{}).Dial,
356 azureWriteRaceInterval = time.Millisecond
357 azureWriteRacePollTime = time.Nanosecond
358 DoGenericVolumeTests(t, func(t TB) TestableVolume {
359 return NewTestableAzureBlobVolume(t, true, azureStorageReplication)
363 func TestAzureBlobVolumeReplication(t *testing.T) {
364 for r := 1; r <= 4; r++ {
365 v := NewTestableAzureBlobVolume(t, false, r)
367 if n := v.Replication(); n != r {
368 t.Errorf("Got replication %d, expected %d", n, r)
373 func TestAzureBlobVolumeCreateBlobRace(t *testing.T) {
374 defer func(t http.RoundTripper) {
375 http.DefaultTransport = t
376 }(http.DefaultTransport)
377 http.DefaultTransport = &http.Transport{
378 Dial: (&azStubDialer{}).Dial,
381 v := NewTestableAzureBlobVolume(t, false, 3)
384 azureWriteRaceInterval = time.Second
385 azureWriteRacePollTime = time.Millisecond
387 allDone := make(chan struct{})
388 v.azHandler.race = make(chan chan struct{})
390 err := v.Put(TestHash, TestBlock)
395 continuePut := make(chan struct{})
396 // Wait for the stub's Put to create the empty blob
397 v.azHandler.race <- continuePut
399 buf, err := v.Get(TestHash)
407 // Wait for the stub's Get to get the empty blob
408 close(v.azHandler.race)
409 // Allow stub's Put to continue, so the real data is ready
410 // when the volume's Get retries
412 // Wait for volume's Get to return the real data
416 func TestAzureBlobVolumeCreateBlobRaceDeadline(t *testing.T) {
417 defer func(t http.RoundTripper) {
418 http.DefaultTransport = t
419 }(http.DefaultTransport)
420 http.DefaultTransport = &http.Transport{
421 Dial: (&azStubDialer{}).Dial,
424 v := NewTestableAzureBlobVolume(t, false, 3)
427 azureWriteRaceInterval = 2 * time.Second
428 azureWriteRacePollTime = 5 * time.Millisecond
430 v.PutRaw(TestHash, nil)
432 buf := new(bytes.Buffer)
435 t.Errorf("Index %+q should be empty", buf.Bytes())
438 v.TouchWithDate(TestHash, time.Now().Add(-1982 * time.Millisecond))
440 allDone := make(chan struct{})
443 buf, err := v.Get(TestHash)
449 t.Errorf("Got %+q, expected empty buf", buf)
455 case <-time.After(time.Second):
456 t.Error("Get should have stopped waiting for race when block was 2s old")
461 if !bytes.HasPrefix(buf.Bytes(), []byte(TestHash+"+0")) {
462 t.Errorf("Index %+q should have %+q", buf.Bytes(), TestHash+"+0")
466 func (v *TestableAzureBlobVolume) PutRaw(locator string, data []byte) {
467 v.azHandler.PutRaw(v.containerName, locator, data)
470 func (v *TestableAzureBlobVolume) TouchWithDate(locator string, lastPut time.Time) {
471 v.azHandler.TouchWithDate(v.containerName, locator, lastPut)
474 func (v *TestableAzureBlobVolume) Teardown() {
478 func makeEtag() string {
479 return fmt.Sprintf("0x%x", rand.Int63())