13063: Update azure storage client library.
[arvados.git] / services / keepstore / azure_blob_volume_test.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package main
6
7 import (
8         "bytes"
9         "context"
10         "crypto/md5"
11         "encoding/base64"
12         "encoding/json"
13         "encoding/xml"
14         "flag"
15         "fmt"
16         "io/ioutil"
17         "math/rand"
18         "net"
19         "net/http"
20         "net/http/httptest"
21         "regexp"
22         "sort"
23         "strconv"
24         "strings"
25         "sync"
26         "testing"
27         "time"
28
29         "github.com/curoverse/azure-sdk-for-go/storage"
30         check "gopkg.in/check.v1"
31 )
32
33 const (
34         // This cannot be the fake account name "devstoreaccount1"
35         // used by Microsoft's Azure emulator: the Azure SDK
36         // recognizes that magic string and changes its behavior to
37         // cater to the Azure SDK's own test suite.
38         fakeAccountName = "fakeaccountname"
39         fakeAccountKey  = "Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw=="
40 )
41
42 var azureTestContainer string
43
44 func init() {
45         flag.StringVar(
46                 &azureTestContainer,
47                 "test.azure-storage-container-volume",
48                 "",
49                 "Name of Azure container to use for testing. Do not use a container with real data! Use -azure-storage-account-name and -azure-storage-key-file arguments to supply credentials.")
50 }
51
52 type azBlob struct {
53         Data        []byte
54         Etag        string
55         Metadata    map[string]string
56         Mtime       time.Time
57         Uncommitted map[string][]byte
58 }
59
60 type azStubHandler struct {
61         sync.Mutex
62         blobs map[string]*azBlob
63         race  chan chan struct{}
64 }
65
66 func newAzStubHandler() *azStubHandler {
67         return &azStubHandler{
68                 blobs: make(map[string]*azBlob),
69         }
70 }
71
72 func (h *azStubHandler) TouchWithDate(container, hash string, t time.Time) {
73         blob, ok := h.blobs[container+"|"+hash]
74         if !ok {
75                 return
76         }
77         blob.Mtime = t
78 }
79
80 func (h *azStubHandler) PutRaw(container, hash string, data []byte) {
81         h.Lock()
82         defer h.Unlock()
83         h.blobs[container+"|"+hash] = &azBlob{
84                 Data:        data,
85                 Mtime:       time.Now(),
86                 Metadata:    make(map[string]string),
87                 Uncommitted: make(map[string][]byte),
88         }
89 }
90
91 func (h *azStubHandler) unlockAndRace() {
92         if h.race == nil {
93                 return
94         }
95         h.Unlock()
96         // Signal caller that race is starting by reading from
97         // h.race. If we get a channel, block until that channel is
98         // ready to receive. If we get nil (or h.race is closed) just
99         // proceed.
100         if c := <-h.race; c != nil {
101                 c <- struct{}{}
102         }
103         h.Lock()
104 }
105
106 var rangeRegexp = regexp.MustCompile(`^bytes=(\d+)-(\d+)$`)
107
108 func (h *azStubHandler) ServeHTTP(rw http.ResponseWriter, r *http.Request) {
109         h.Lock()
110         defer h.Unlock()
111         // defer log.Printf("azStubHandler: %+v", r)
112
113         path := strings.Split(r.URL.Path, "/")
114         container := path[1]
115         hash := ""
116         if len(path) > 2 {
117                 hash = path[2]
118         }
119
120         if err := r.ParseForm(); err != nil {
121                 log.Printf("azStubHandler(%+v): %s", r, err)
122                 rw.WriteHeader(http.StatusBadRequest)
123                 return
124         }
125
126         if (r.Method == "PUT" || r.Method == "POST") && r.Header.Get("Content-Length") == "" {
127                 rw.WriteHeader(http.StatusLengthRequired)
128                 return
129         }
130
131         body, err := ioutil.ReadAll(r.Body)
132         if err != nil {
133                 return
134         }
135
136         type blockListRequestBody struct {
137                 XMLName     xml.Name `xml:"BlockList"`
138                 Uncommitted []string
139         }
140
141         blob, blobExists := h.blobs[container+"|"+hash]
142
143         switch {
144         case r.Method == "PUT" && r.Form.Get("comp") == "":
145                 // "Put Blob" API
146                 if _, ok := h.blobs[container+"|"+hash]; !ok {
147                         // Like the real Azure service, we offer a
148                         // race window during which other clients can
149                         // list/get the new blob before any data is
150                         // committed.
151                         h.blobs[container+"|"+hash] = &azBlob{
152                                 Mtime:       time.Now(),
153                                 Uncommitted: make(map[string][]byte),
154                                 Metadata:    make(map[string]string),
155                                 Etag:        makeEtag(),
156                         }
157                         h.unlockAndRace()
158                 }
159                 metadata := make(map[string]string)
160                 for k, v := range r.Header {
161                         if strings.HasPrefix(strings.ToLower(k), "x-ms-meta-") {
162                                 name := k[len("x-ms-meta-"):]
163                                 metadata[strings.ToLower(name)] = v[0]
164                         }
165                 }
166                 h.blobs[container+"|"+hash] = &azBlob{
167                         Data:        body,
168                         Mtime:       time.Now(),
169                         Uncommitted: make(map[string][]byte),
170                         Metadata:    metadata,
171                         Etag:        makeEtag(),
172                 }
173                 rw.WriteHeader(http.StatusCreated)
174         case r.Method == "PUT" && r.Form.Get("comp") == "block":
175                 // "Put Block" API
176                 if !blobExists {
177                         log.Printf("Got block for nonexistent blob: %+v", r)
178                         rw.WriteHeader(http.StatusBadRequest)
179                         return
180                 }
181                 blockID, err := base64.StdEncoding.DecodeString(r.Form.Get("blockid"))
182                 if err != nil || len(blockID) == 0 {
183                         log.Printf("Invalid blockid: %+q", r.Form.Get("blockid"))
184                         rw.WriteHeader(http.StatusBadRequest)
185                         return
186                 }
187                 blob.Uncommitted[string(blockID)] = body
188                 rw.WriteHeader(http.StatusCreated)
189         case r.Method == "PUT" && r.Form.Get("comp") == "blocklist":
190                 // "Put Block List" API
191                 bl := &blockListRequestBody{}
192                 if err := xml.Unmarshal(body, bl); err != nil {
193                         log.Printf("xml Unmarshal: %s", err)
194                         rw.WriteHeader(http.StatusBadRequest)
195                         return
196                 }
197                 for _, encBlockID := range bl.Uncommitted {
198                         blockID, err := base64.StdEncoding.DecodeString(encBlockID)
199                         if err != nil || len(blockID) == 0 || blob.Uncommitted[string(blockID)] == nil {
200                                 log.Printf("Invalid blockid: %+q", encBlockID)
201                                 rw.WriteHeader(http.StatusBadRequest)
202                                 return
203                         }
204                         blob.Data = blob.Uncommitted[string(blockID)]
205                         blob.Etag = makeEtag()
206                         blob.Mtime = time.Now()
207                         delete(blob.Uncommitted, string(blockID))
208                 }
209                 rw.WriteHeader(http.StatusCreated)
210         case r.Method == "PUT" && r.Form.Get("comp") == "metadata":
211                 // "Set Metadata Headers" API. We don't bother
212                 // stubbing "Get Metadata Headers": AzureBlobVolume
213                 // sets metadata headers only as a way to bump Etag
214                 // and Last-Modified.
215                 if !blobExists {
216                         log.Printf("Got metadata for nonexistent blob: %+v", r)
217                         rw.WriteHeader(http.StatusBadRequest)
218                         return
219                 }
220                 blob.Metadata = make(map[string]string)
221                 for k, v := range r.Header {
222                         if strings.HasPrefix(strings.ToLower(k), "x-ms-meta-") {
223                                 name := k[len("x-ms-meta-"):]
224                                 blob.Metadata[strings.ToLower(name)] = v[0]
225                         }
226                 }
227                 blob.Mtime = time.Now()
228                 blob.Etag = makeEtag()
229         case (r.Method == "GET" || r.Method == "HEAD") && r.Form.Get("comp") == "metadata" && hash != "":
230                 // "Get Blob Metadata" API
231                 if !blobExists {
232                         rw.WriteHeader(http.StatusNotFound)
233                         return
234                 }
235                 for k, v := range blob.Metadata {
236                         rw.Header().Set(fmt.Sprintf("x-ms-meta-%s", k), v)
237                 }
238                 return
239         case (r.Method == "GET" || r.Method == "HEAD") && hash != "":
240                 // "Get Blob" API
241                 if !blobExists {
242                         rw.WriteHeader(http.StatusNotFound)
243                         return
244                 }
245                 data := blob.Data
246                 if rangeSpec := rangeRegexp.FindStringSubmatch(r.Header.Get("Range")); rangeSpec != nil {
247                         b0, err0 := strconv.Atoi(rangeSpec[1])
248                         b1, err1 := strconv.Atoi(rangeSpec[2])
249                         if err0 != nil || err1 != nil || b0 >= len(data) || b1 >= len(data) || b0 > b1 {
250                                 rw.Header().Set("Content-Range", fmt.Sprintf("bytes */%d", len(data)))
251                                 rw.WriteHeader(http.StatusRequestedRangeNotSatisfiable)
252                                 return
253                         }
254                         rw.Header().Set("Content-Range", fmt.Sprintf("bytes %d-%d/%d", b0, b1, len(data)))
255                         rw.WriteHeader(http.StatusPartialContent)
256                         data = data[b0 : b1+1]
257                 }
258                 rw.Header().Set("Last-Modified", blob.Mtime.Format(time.RFC1123))
259                 rw.Header().Set("Content-Length", strconv.Itoa(len(data)))
260                 if r.Method == "GET" {
261                         if _, err := rw.Write(data); err != nil {
262                                 log.Printf("write %+q: %s", data, err)
263                         }
264                 }
265                 h.unlockAndRace()
266         case r.Method == "DELETE" && hash != "":
267                 // "Delete Blob" API
268                 if !blobExists {
269                         rw.WriteHeader(http.StatusNotFound)
270                         return
271                 }
272                 delete(h.blobs, container+"|"+hash)
273                 rw.WriteHeader(http.StatusAccepted)
274         case r.Method == "GET" && r.Form.Get("comp") == "list" && r.Form.Get("restype") == "container":
275                 // "List Blobs" API
276                 prefix := container + "|" + r.Form.Get("prefix")
277                 marker := r.Form.Get("marker")
278
279                 maxResults := 2
280                 if n, err := strconv.Atoi(r.Form.Get("maxresults")); err == nil && n >= 1 && n <= 5000 {
281                         maxResults = n
282                 }
283
284                 resp := storage.BlobListResponse{
285                         Marker:     marker,
286                         NextMarker: "",
287                         MaxResults: int64(maxResults),
288                 }
289                 var hashes sort.StringSlice
290                 for k := range h.blobs {
291                         if strings.HasPrefix(k, prefix) {
292                                 hashes = append(hashes, k[len(container)+1:])
293                         }
294                 }
295                 hashes.Sort()
296                 for _, hash := range hashes {
297                         if len(resp.Blobs) == maxResults {
298                                 resp.NextMarker = hash
299                                 break
300                         }
301                         if len(resp.Blobs) > 0 || marker == "" || marker == hash {
302                                 blob := h.blobs[container+"|"+hash]
303                                 bmeta := map[string]string(nil)
304                                 if r.Form.Get("include") == "metadata" {
305                                         bmeta = blob.Metadata
306                                 }
307                                 b := storage.Blob{
308                                         Name: hash,
309                                         Properties: storage.BlobProperties{
310                                                 LastModified:  storage.TimeRFC1123(blob.Mtime),
311                                                 ContentLength: int64(len(blob.Data)),
312                                                 Etag:          blob.Etag,
313                                         },
314                                         Metadata: bmeta,
315                                 }
316                                 resp.Blobs = append(resp.Blobs, b)
317                         }
318                 }
319                 buf, err := xml.Marshal(resp)
320                 if err != nil {
321                         log.Print(err)
322                         rw.WriteHeader(http.StatusInternalServerError)
323                 }
324                 rw.Write(buf)
325         default:
326                 log.Printf("azStubHandler: not implemented: %+v Body:%+q", r, body)
327                 rw.WriteHeader(http.StatusNotImplemented)
328         }
329 }
330
331 // azStubDialer is a net.Dialer that notices when the Azure driver
332 // tries to connect to "devstoreaccount1.blob.127.0.0.1:46067", and
333 // in such cases transparently dials "127.0.0.1:46067" instead.
334 type azStubDialer struct {
335         net.Dialer
336 }
337
338 var localHostPortRe = regexp.MustCompile(`(127\.0\.0\.1|localhost|\[::1\]):\d+`)
339
340 func (d *azStubDialer) Dial(network, address string) (net.Conn, error) {
341         if hp := localHostPortRe.FindString(address); hp != "" {
342                 log.Println("azStubDialer: dial", hp, "instead of", address)
343                 address = hp
344         }
345         return d.Dialer.Dial(network, address)
346 }
347
348 type TestableAzureBlobVolume struct {
349         *AzureBlobVolume
350         azHandler *azStubHandler
351         azStub    *httptest.Server
352         t         TB
353 }
354
355 func NewTestableAzureBlobVolume(t TB, readonly bool, replication int) *TestableAzureBlobVolume {
356         azHandler := newAzStubHandler()
357         azStub := httptest.NewServer(azHandler)
358
359         var azClient storage.Client
360
361         container := azureTestContainer
362         if container == "" {
363                 // Connect to stub instead of real Azure storage service
364                 stubURLBase := strings.Split(azStub.URL, "://")[1]
365                 var err error
366                 if azClient, err = storage.NewClient(fakeAccountName, fakeAccountKey, stubURLBase, storage.DefaultAPIVersion, false); err != nil {
367                         t.Fatal(err)
368                 }
369                 container = "fakecontainername"
370         } else {
371                 // Connect to real Azure storage service
372                 accountKey, err := readKeyFromFile(azureStorageAccountKeyFile)
373                 if err != nil {
374                         t.Fatal(err)
375                 }
376                 azClient, err = storage.NewBasicClient(azureStorageAccountName, accountKey)
377                 if err != nil {
378                         t.Fatal(err)
379                 }
380         }
381
382         bs := azClient.GetBlobService()
383         v := &AzureBlobVolume{
384                 ContainerName:    container,
385                 ReadOnly:         readonly,
386                 AzureReplication: replication,
387                 azClient:         azClient,
388                 container:        &azureContainer{ctr: bs.GetContainerReference(container)},
389         }
390
391         return &TestableAzureBlobVolume{
392                 AzureBlobVolume: v,
393                 azHandler:       azHandler,
394                 azStub:          azStub,
395                 t:               t,
396         }
397 }
398
399 var _ = check.Suite(&StubbedAzureBlobSuite{})
400
401 type StubbedAzureBlobSuite struct {
402         volume            *TestableAzureBlobVolume
403         origHTTPTransport http.RoundTripper
404 }
405
406 func (s *StubbedAzureBlobSuite) SetUpTest(c *check.C) {
407         s.origHTTPTransport = http.DefaultTransport
408         http.DefaultTransport = &http.Transport{
409                 Dial: (&azStubDialer{}).Dial,
410         }
411         azureWriteRaceInterval = time.Millisecond
412         azureWriteRacePollTime = time.Nanosecond
413
414         s.volume = NewTestableAzureBlobVolume(c, false, 3)
415 }
416
417 func (s *StubbedAzureBlobSuite) TearDownTest(c *check.C) {
418         s.volume.Teardown()
419         http.DefaultTransport = s.origHTTPTransport
420 }
421
422 func TestAzureBlobVolumeWithGeneric(t *testing.T) {
423         defer func(t http.RoundTripper) {
424                 http.DefaultTransport = t
425         }(http.DefaultTransport)
426         http.DefaultTransport = &http.Transport{
427                 Dial: (&azStubDialer{}).Dial,
428         }
429         azureWriteRaceInterval = time.Millisecond
430         azureWriteRacePollTime = time.Nanosecond
431         DoGenericVolumeTests(t, func(t TB) TestableVolume {
432                 return NewTestableAzureBlobVolume(t, false, azureStorageReplication)
433         })
434 }
435
436 func TestAzureBlobVolumeConcurrentRanges(t *testing.T) {
437         defer func(b int) {
438                 azureMaxGetBytes = b
439         }(azureMaxGetBytes)
440
441         defer func(t http.RoundTripper) {
442                 http.DefaultTransport = t
443         }(http.DefaultTransport)
444         http.DefaultTransport = &http.Transport{
445                 Dial: (&azStubDialer{}).Dial,
446         }
447         azureWriteRaceInterval = time.Millisecond
448         azureWriteRacePollTime = time.Nanosecond
449         // Test (BlockSize mod azureMaxGetBytes)==0 and !=0 cases
450         for _, azureMaxGetBytes = range []int{2 << 22, 2<<22 - 1} {
451                 DoGenericVolumeTests(t, func(t TB) TestableVolume {
452                         return NewTestableAzureBlobVolume(t, false, azureStorageReplication)
453                 })
454         }
455 }
456
457 func TestReadonlyAzureBlobVolumeWithGeneric(t *testing.T) {
458         defer func(t http.RoundTripper) {
459                 http.DefaultTransport = t
460         }(http.DefaultTransport)
461         http.DefaultTransport = &http.Transport{
462                 Dial: (&azStubDialer{}).Dial,
463         }
464         azureWriteRaceInterval = time.Millisecond
465         azureWriteRacePollTime = time.Nanosecond
466         DoGenericVolumeTests(t, func(t TB) TestableVolume {
467                 return NewTestableAzureBlobVolume(t, true, azureStorageReplication)
468         })
469 }
470
471 func TestAzureBlobVolumeRangeFenceposts(t *testing.T) {
472         defer func(t http.RoundTripper) {
473                 http.DefaultTransport = t
474         }(http.DefaultTransport)
475         http.DefaultTransport = &http.Transport{
476                 Dial: (&azStubDialer{}).Dial,
477         }
478
479         v := NewTestableAzureBlobVolume(t, false, 3)
480         defer v.Teardown()
481
482         for _, size := range []int{
483                 2<<22 - 1, // one <max read
484                 2 << 22,   // one =max read
485                 2<<22 + 1, // one =max read, one <max
486                 2 << 23,   // two =max reads
487                 BlockSize - 1,
488                 BlockSize,
489         } {
490                 data := make([]byte, size)
491                 for i := range data {
492                         data[i] = byte((i + 7) & 0xff)
493                 }
494                 hash := fmt.Sprintf("%x", md5.Sum(data))
495                 err := v.Put(context.Background(), hash, data)
496                 if err != nil {
497                         t.Error(err)
498                 }
499                 gotData := make([]byte, len(data))
500                 gotLen, err := v.Get(context.Background(), hash, gotData)
501                 if err != nil {
502                         t.Error(err)
503                 }
504                 gotHash := fmt.Sprintf("%x", md5.Sum(gotData))
505                 if gotLen != size {
506                         t.Errorf("length mismatch: got %d != %d", gotLen, size)
507                 }
508                 if gotHash != hash {
509                         t.Errorf("hash mismatch: got %s != %s", gotHash, hash)
510                 }
511         }
512 }
513
514 func TestAzureBlobVolumeReplication(t *testing.T) {
515         for r := 1; r <= 4; r++ {
516                 v := NewTestableAzureBlobVolume(t, false, r)
517                 defer v.Teardown()
518                 if n := v.Replication(); n != r {
519                         t.Errorf("Got replication %d, expected %d", n, r)
520                 }
521         }
522 }
523
524 func TestAzureBlobVolumeCreateBlobRace(t *testing.T) {
525         defer func(t http.RoundTripper) {
526                 http.DefaultTransport = t
527         }(http.DefaultTransport)
528         http.DefaultTransport = &http.Transport{
529                 Dial: (&azStubDialer{}).Dial,
530         }
531
532         v := NewTestableAzureBlobVolume(t, false, 3)
533         defer v.Teardown()
534
535         azureWriteRaceInterval = time.Second
536         azureWriteRacePollTime = time.Millisecond
537
538         allDone := make(chan struct{})
539         v.azHandler.race = make(chan chan struct{})
540         go func() {
541                 err := v.Put(context.Background(), TestHash, TestBlock)
542                 if err != nil {
543                         t.Error(err)
544                 }
545         }()
546         continuePut := make(chan struct{})
547         // Wait for the stub's Put to create the empty blob
548         v.azHandler.race <- continuePut
549         go func() {
550                 buf := make([]byte, len(TestBlock))
551                 _, err := v.Get(context.Background(), TestHash, buf)
552                 if err != nil {
553                         t.Error(err)
554                 }
555                 close(allDone)
556         }()
557         // Wait for the stub's Get to get the empty blob
558         close(v.azHandler.race)
559         // Allow stub's Put to continue, so the real data is ready
560         // when the volume's Get retries
561         <-continuePut
562         // Wait for volume's Get to return the real data
563         <-allDone
564 }
565
566 func TestAzureBlobVolumeCreateBlobRaceDeadline(t *testing.T) {
567         defer func(t http.RoundTripper) {
568                 http.DefaultTransport = t
569         }(http.DefaultTransport)
570         http.DefaultTransport = &http.Transport{
571                 Dial: (&azStubDialer{}).Dial,
572         }
573
574         v := NewTestableAzureBlobVolume(t, false, 3)
575         defer v.Teardown()
576
577         azureWriteRaceInterval = 2 * time.Second
578         azureWriteRacePollTime = 5 * time.Millisecond
579
580         v.PutRaw(TestHash, nil)
581
582         buf := new(bytes.Buffer)
583         v.IndexTo("", buf)
584         if buf.Len() != 0 {
585                 t.Errorf("Index %+q should be empty", buf.Bytes())
586         }
587
588         v.TouchWithDate(TestHash, time.Now().Add(-1982*time.Millisecond))
589
590         allDone := make(chan struct{})
591         go func() {
592                 defer close(allDone)
593                 buf := make([]byte, BlockSize)
594                 n, err := v.Get(context.Background(), TestHash, buf)
595                 if err != nil {
596                         t.Error(err)
597                         return
598                 }
599                 if n != 0 {
600                         t.Errorf("Got %+q, expected empty buf", buf[:n])
601                 }
602         }()
603         select {
604         case <-allDone:
605         case <-time.After(time.Second):
606                 t.Error("Get should have stopped waiting for race when block was 2s old")
607         }
608
609         buf.Reset()
610         v.IndexTo("", buf)
611         if !bytes.HasPrefix(buf.Bytes(), []byte(TestHash+"+0")) {
612                 t.Errorf("Index %+q should have %+q", buf.Bytes(), TestHash+"+0")
613         }
614 }
615
616 func TestAzureBlobVolumeContextCancelGet(t *testing.T) {
617         testAzureBlobVolumeContextCancel(t, func(ctx context.Context, v *TestableAzureBlobVolume) error {
618                 v.PutRaw(TestHash, TestBlock)
619                 _, err := v.Get(ctx, TestHash, make([]byte, BlockSize))
620                 return err
621         })
622 }
623
624 func TestAzureBlobVolumeContextCancelPut(t *testing.T) {
625         testAzureBlobVolumeContextCancel(t, func(ctx context.Context, v *TestableAzureBlobVolume) error {
626                 return v.Put(ctx, TestHash, make([]byte, BlockSize))
627         })
628 }
629
630 func TestAzureBlobVolumeContextCancelCompare(t *testing.T) {
631         testAzureBlobVolumeContextCancel(t, func(ctx context.Context, v *TestableAzureBlobVolume) error {
632                 v.PutRaw(TestHash, TestBlock)
633                 return v.Compare(ctx, TestHash, TestBlock2)
634         })
635 }
636
637 func testAzureBlobVolumeContextCancel(t *testing.T, testFunc func(context.Context, *TestableAzureBlobVolume) error) {
638         defer func(t http.RoundTripper) {
639                 http.DefaultTransport = t
640         }(http.DefaultTransport)
641         http.DefaultTransport = &http.Transport{
642                 Dial: (&azStubDialer{}).Dial,
643         }
644
645         v := NewTestableAzureBlobVolume(t, false, 3)
646         defer v.Teardown()
647         v.azHandler.race = make(chan chan struct{})
648
649         ctx, cancel := context.WithCancel(context.Background())
650         allDone := make(chan struct{})
651         go func() {
652                 defer close(allDone)
653                 err := testFunc(ctx, v)
654                 if err != context.Canceled {
655                         t.Errorf("got %T %q, expected %q", err, err, context.Canceled)
656                 }
657         }()
658         releaseHandler := make(chan struct{})
659         select {
660         case <-allDone:
661                 t.Error("testFunc finished without waiting for v.azHandler.race")
662         case <-time.After(10 * time.Second):
663                 t.Error("timed out waiting to enter handler")
664         case v.azHandler.race <- releaseHandler:
665         }
666
667         cancel()
668
669         select {
670         case <-time.After(10 * time.Second):
671                 t.Error("timed out waiting to cancel")
672         case <-allDone:
673         }
674
675         go func() {
676                 <-releaseHandler
677         }()
678 }
679
680 func (s *StubbedAzureBlobSuite) TestStats(c *check.C) {
681         stats := func() string {
682                 buf, err := json.Marshal(s.volume.InternalStats())
683                 c.Check(err, check.IsNil)
684                 return string(buf)
685         }
686
687         c.Check(stats(), check.Matches, `.*"Ops":0,.*`)
688         c.Check(stats(), check.Matches, `.*"Errors":0,.*`)
689
690         loc := "acbd18db4cc2f85cedef654fccc4a4d8"
691         _, err := s.volume.Get(context.Background(), loc, make([]byte, 3))
692         c.Check(err, check.NotNil)
693         c.Check(stats(), check.Matches, `.*"Ops":[^0],.*`)
694         c.Check(stats(), check.Matches, `.*"Errors":[^0],.*`)
695         c.Check(stats(), check.Matches, `.*"storage\.AzureStorageServiceError 404 \(404 Not Found\)":[^0].*`)
696         c.Check(stats(), check.Matches, `.*"InBytes":0,.*`)
697
698         err = s.volume.Put(context.Background(), loc, []byte("foo"))
699         c.Check(err, check.IsNil)
700         c.Check(stats(), check.Matches, `.*"OutBytes":3,.*`)
701         c.Check(stats(), check.Matches, `.*"CreateOps":1,.*`)
702
703         _, err = s.volume.Get(context.Background(), loc, make([]byte, 3))
704         c.Check(err, check.IsNil)
705         _, err = s.volume.Get(context.Background(), loc, make([]byte, 3))
706         c.Check(err, check.IsNil)
707         c.Check(stats(), check.Matches, `.*"InBytes":6,.*`)
708 }
709
710 func (v *TestableAzureBlobVolume) PutRaw(locator string, data []byte) {
711         v.azHandler.PutRaw(v.ContainerName, locator, data)
712 }
713
714 func (v *TestableAzureBlobVolume) TouchWithDate(locator string, lastPut time.Time) {
715         v.azHandler.TouchWithDate(v.ContainerName, locator, lastPut)
716 }
717
718 func (v *TestableAzureBlobVolume) Teardown() {
719         v.azStub.Close()
720 }
721
722 func makeEtag() string {
723         return fmt.Sprintf("0x%x", rand.Int63())
724 }