Merge branch 'thehyve/fix-crunch-documentation' Fix a typo in Crunch Dispatch install...
[arvados.git] / services / keepstore / azure_blob_volume_test.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package main
6
7 import (
8         "bytes"
9         "context"
10         "crypto/md5"
11         "encoding/base64"
12         "encoding/json"
13         "encoding/xml"
14         "flag"
15         "fmt"
16         "io/ioutil"
17         "math/rand"
18         "net"
19         "net/http"
20         "net/http/httptest"
21         "regexp"
22         "sort"
23         "strconv"
24         "strings"
25         "sync"
26         "testing"
27         "time"
28
29         "github.com/Azure/azure-sdk-for-go/storage"
30         "github.com/ghodss/yaml"
31         check "gopkg.in/check.v1"
32 )
33
34 const (
35         // This cannot be the fake account name "devstoreaccount1"
36         // used by Microsoft's Azure emulator: the Azure SDK
37         // recognizes that magic string and changes its behavior to
38         // cater to the Azure SDK's own test suite.
39         fakeAccountName = "fakeaccountname"
40         fakeAccountKey  = "Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw=="
41 )
42
43 var azureTestContainer string
44
45 func init() {
46         flag.StringVar(
47                 &azureTestContainer,
48                 "test.azure-storage-container-volume",
49                 "",
50                 "Name of Azure container to use for testing. Do not use a container with real data! Use -azure-storage-account-name and -azure-storage-key-file arguments to supply credentials.")
51 }
52
53 type azBlob struct {
54         Data        []byte
55         Etag        string
56         Metadata    map[string]string
57         Mtime       time.Time
58         Uncommitted map[string][]byte
59 }
60
61 type azStubHandler struct {
62         sync.Mutex
63         blobs map[string]*azBlob
64         race  chan chan struct{}
65 }
66
67 func newAzStubHandler() *azStubHandler {
68         return &azStubHandler{
69                 blobs: make(map[string]*azBlob),
70         }
71 }
72
73 func (h *azStubHandler) TouchWithDate(container, hash string, t time.Time) {
74         blob, ok := h.blobs[container+"|"+hash]
75         if !ok {
76                 return
77         }
78         blob.Mtime = t
79 }
80
81 func (h *azStubHandler) PutRaw(container, hash string, data []byte) {
82         h.Lock()
83         defer h.Unlock()
84         h.blobs[container+"|"+hash] = &azBlob{
85                 Data:        data,
86                 Mtime:       time.Now(),
87                 Metadata:    make(map[string]string),
88                 Uncommitted: make(map[string][]byte),
89         }
90 }
91
92 func (h *azStubHandler) unlockAndRace() {
93         if h.race == nil {
94                 return
95         }
96         h.Unlock()
97         // Signal caller that race is starting by reading from
98         // h.race. If we get a channel, block until that channel is
99         // ready to receive. If we get nil (or h.race is closed) just
100         // proceed.
101         if c := <-h.race; c != nil {
102                 c <- struct{}{}
103         }
104         h.Lock()
105 }
106
107 var rangeRegexp = regexp.MustCompile(`^bytes=(\d+)-(\d+)$`)
108
109 func (h *azStubHandler) ServeHTTP(rw http.ResponseWriter, r *http.Request) {
110         h.Lock()
111         defer h.Unlock()
112         // defer log.Printf("azStubHandler: %+v", r)
113
114         path := strings.Split(r.URL.Path, "/")
115         container := path[1]
116         hash := ""
117         if len(path) > 2 {
118                 hash = path[2]
119         }
120
121         if err := r.ParseForm(); err != nil {
122                 log.Printf("azStubHandler(%+v): %s", r, err)
123                 rw.WriteHeader(http.StatusBadRequest)
124                 return
125         }
126
127         if (r.Method == "PUT" || r.Method == "POST") && r.Header.Get("Content-Length") == "" {
128                 rw.WriteHeader(http.StatusLengthRequired)
129                 return
130         }
131
132         body, err := ioutil.ReadAll(r.Body)
133         if err != nil {
134                 return
135         }
136
137         type blockListRequestBody struct {
138                 XMLName     xml.Name `xml:"BlockList"`
139                 Uncommitted []string
140         }
141
142         blob, blobExists := h.blobs[container+"|"+hash]
143
144         switch {
145         case r.Method == "PUT" && r.Form.Get("comp") == "":
146                 // "Put Blob" API
147                 if _, ok := h.blobs[container+"|"+hash]; !ok {
148                         // Like the real Azure service, we offer a
149                         // race window during which other clients can
150                         // list/get the new blob before any data is
151                         // committed.
152                         h.blobs[container+"|"+hash] = &azBlob{
153                                 Mtime:       time.Now(),
154                                 Uncommitted: make(map[string][]byte),
155                                 Metadata:    make(map[string]string),
156                                 Etag:        makeEtag(),
157                         }
158                         h.unlockAndRace()
159                 }
160                 metadata := make(map[string]string)
161                 for k, v := range r.Header {
162                         if strings.HasPrefix(strings.ToLower(k), "x-ms-meta-") {
163                                 name := k[len("x-ms-meta-"):]
164                                 metadata[strings.ToLower(name)] = v[0]
165                         }
166                 }
167                 h.blobs[container+"|"+hash] = &azBlob{
168                         Data:        body,
169                         Mtime:       time.Now(),
170                         Uncommitted: make(map[string][]byte),
171                         Metadata:    metadata,
172                         Etag:        makeEtag(),
173                 }
174                 rw.WriteHeader(http.StatusCreated)
175         case r.Method == "PUT" && r.Form.Get("comp") == "block":
176                 // "Put Block" API
177                 if !blobExists {
178                         log.Printf("Got block for nonexistent blob: %+v", r)
179                         rw.WriteHeader(http.StatusBadRequest)
180                         return
181                 }
182                 blockID, err := base64.StdEncoding.DecodeString(r.Form.Get("blockid"))
183                 if err != nil || len(blockID) == 0 {
184                         log.Printf("Invalid blockid: %+q", r.Form.Get("blockid"))
185                         rw.WriteHeader(http.StatusBadRequest)
186                         return
187                 }
188                 blob.Uncommitted[string(blockID)] = body
189                 rw.WriteHeader(http.StatusCreated)
190         case r.Method == "PUT" && r.Form.Get("comp") == "blocklist":
191                 // "Put Block List" API
192                 bl := &blockListRequestBody{}
193                 if err := xml.Unmarshal(body, bl); err != nil {
194                         log.Printf("xml Unmarshal: %s", err)
195                         rw.WriteHeader(http.StatusBadRequest)
196                         return
197                 }
198                 for _, encBlockID := range bl.Uncommitted {
199                         blockID, err := base64.StdEncoding.DecodeString(encBlockID)
200                         if err != nil || len(blockID) == 0 || blob.Uncommitted[string(blockID)] == nil {
201                                 log.Printf("Invalid blockid: %+q", encBlockID)
202                                 rw.WriteHeader(http.StatusBadRequest)
203                                 return
204                         }
205                         blob.Data = blob.Uncommitted[string(blockID)]
206                         blob.Etag = makeEtag()
207                         blob.Mtime = time.Now()
208                         delete(blob.Uncommitted, string(blockID))
209                 }
210                 rw.WriteHeader(http.StatusCreated)
211         case r.Method == "PUT" && r.Form.Get("comp") == "metadata":
212                 // "Set Metadata Headers" API. We don't bother
213                 // stubbing "Get Metadata Headers": AzureBlobVolume
214                 // sets metadata headers only as a way to bump Etag
215                 // and Last-Modified.
216                 if !blobExists {
217                         log.Printf("Got metadata for nonexistent blob: %+v", r)
218                         rw.WriteHeader(http.StatusBadRequest)
219                         return
220                 }
221                 blob.Metadata = make(map[string]string)
222                 for k, v := range r.Header {
223                         if strings.HasPrefix(strings.ToLower(k), "x-ms-meta-") {
224                                 name := k[len("x-ms-meta-"):]
225                                 blob.Metadata[strings.ToLower(name)] = v[0]
226                         }
227                 }
228                 blob.Mtime = time.Now()
229                 blob.Etag = makeEtag()
230         case (r.Method == "GET" || r.Method == "HEAD") && r.Form.Get("comp") == "metadata" && hash != "":
231                 // "Get Blob Metadata" API
232                 if !blobExists {
233                         rw.WriteHeader(http.StatusNotFound)
234                         return
235                 }
236                 for k, v := range blob.Metadata {
237                         rw.Header().Set(fmt.Sprintf("x-ms-meta-%s", k), v)
238                 }
239                 return
240         case (r.Method == "GET" || r.Method == "HEAD") && hash != "":
241                 // "Get Blob" API
242                 if !blobExists {
243                         rw.WriteHeader(http.StatusNotFound)
244                         return
245                 }
246                 data := blob.Data
247                 if rangeSpec := rangeRegexp.FindStringSubmatch(r.Header.Get("Range")); rangeSpec != nil {
248                         b0, err0 := strconv.Atoi(rangeSpec[1])
249                         b1, err1 := strconv.Atoi(rangeSpec[2])
250                         if err0 != nil || err1 != nil || b0 >= len(data) || b1 >= len(data) || b0 > b1 {
251                                 rw.Header().Set("Content-Range", fmt.Sprintf("bytes */%d", len(data)))
252                                 rw.WriteHeader(http.StatusRequestedRangeNotSatisfiable)
253                                 return
254                         }
255                         rw.Header().Set("Content-Range", fmt.Sprintf("bytes %d-%d/%d", b0, b1, len(data)))
256                         rw.WriteHeader(http.StatusPartialContent)
257                         data = data[b0 : b1+1]
258                 }
259                 rw.Header().Set("Last-Modified", blob.Mtime.Format(time.RFC1123))
260                 rw.Header().Set("Content-Length", strconv.Itoa(len(data)))
261                 if r.Method == "GET" {
262                         if _, err := rw.Write(data); err != nil {
263                                 log.Printf("write %+q: %s", data, err)
264                         }
265                 }
266                 h.unlockAndRace()
267         case r.Method == "DELETE" && hash != "":
268                 // "Delete Blob" API
269                 if !blobExists {
270                         rw.WriteHeader(http.StatusNotFound)
271                         return
272                 }
273                 delete(h.blobs, container+"|"+hash)
274                 rw.WriteHeader(http.StatusAccepted)
275         case r.Method == "GET" && r.Form.Get("comp") == "list" && r.Form.Get("restype") == "container":
276                 // "List Blobs" API
277                 prefix := container + "|" + r.Form.Get("prefix")
278                 marker := r.Form.Get("marker")
279
280                 maxResults := 2
281                 if n, err := strconv.Atoi(r.Form.Get("maxresults")); err == nil && n >= 1 && n <= 5000 {
282                         maxResults = n
283                 }
284
285                 resp := storage.BlobListResponse{
286                         Marker:     marker,
287                         NextMarker: "",
288                         MaxResults: int64(maxResults),
289                 }
290                 var hashes sort.StringSlice
291                 for k := range h.blobs {
292                         if strings.HasPrefix(k, prefix) {
293                                 hashes = append(hashes, k[len(container)+1:])
294                         }
295                 }
296                 hashes.Sort()
297                 for _, hash := range hashes {
298                         if len(resp.Blobs) == maxResults {
299                                 resp.NextMarker = hash
300                                 break
301                         }
302                         if len(resp.Blobs) > 0 || marker == "" || marker == hash {
303                                 blob := h.blobs[container+"|"+hash]
304                                 bmeta := map[string]string(nil)
305                                 if r.Form.Get("include") == "metadata" {
306                                         bmeta = blob.Metadata
307                                 }
308                                 b := storage.Blob{
309                                         Name: hash,
310                                         Properties: storage.BlobProperties{
311                                                 LastModified:  storage.TimeRFC1123(blob.Mtime),
312                                                 ContentLength: int64(len(blob.Data)),
313                                                 Etag:          blob.Etag,
314                                         },
315                                         Metadata: bmeta,
316                                 }
317                                 resp.Blobs = append(resp.Blobs, b)
318                         }
319                 }
320                 buf, err := xml.Marshal(resp)
321                 if err != nil {
322                         log.Print(err)
323                         rw.WriteHeader(http.StatusInternalServerError)
324                 }
325                 rw.Write(buf)
326         default:
327                 log.Printf("azStubHandler: not implemented: %+v Body:%+q", r, body)
328                 rw.WriteHeader(http.StatusNotImplemented)
329         }
330 }
331
332 // azStubDialer is a net.Dialer that notices when the Azure driver
333 // tries to connect to "devstoreaccount1.blob.127.0.0.1:46067", and
334 // in such cases transparently dials "127.0.0.1:46067" instead.
335 type azStubDialer struct {
336         net.Dialer
337 }
338
339 var localHostPortRe = regexp.MustCompile(`(127\.0\.0\.1|localhost|\[::1\]):\d+`)
340
341 func (d *azStubDialer) Dial(network, address string) (net.Conn, error) {
342         if hp := localHostPortRe.FindString(address); hp != "" {
343                 log.Println("azStubDialer: dial", hp, "instead of", address)
344                 address = hp
345         }
346         return d.Dialer.Dial(network, address)
347 }
348
349 type TestableAzureBlobVolume struct {
350         *AzureBlobVolume
351         azHandler *azStubHandler
352         azStub    *httptest.Server
353         t         TB
354 }
355
356 func NewTestableAzureBlobVolume(t TB, readonly bool, replication int) *TestableAzureBlobVolume {
357         azHandler := newAzStubHandler()
358         azStub := httptest.NewServer(azHandler)
359
360         var azClient storage.Client
361
362         container := azureTestContainer
363         if container == "" {
364                 // Connect to stub instead of real Azure storage service
365                 stubURLBase := strings.Split(azStub.URL, "://")[1]
366                 var err error
367                 if azClient, err = storage.NewClient(fakeAccountName, fakeAccountKey, stubURLBase, storage.DefaultAPIVersion, false); err != nil {
368                         t.Fatal(err)
369                 }
370                 container = "fakecontainername"
371         } else {
372                 // Connect to real Azure storage service
373                 accountKey, err := readKeyFromFile(azureStorageAccountKeyFile)
374                 if err != nil {
375                         t.Fatal(err)
376                 }
377                 azClient, err = storage.NewBasicClient(azureStorageAccountName, accountKey)
378                 if err != nil {
379                         t.Fatal(err)
380                 }
381         }
382
383         bs := azClient.GetBlobService()
384         v := &AzureBlobVolume{
385                 ContainerName:    container,
386                 ReadOnly:         readonly,
387                 AzureReplication: replication,
388                 azClient:         azClient,
389                 container:        &azureContainer{ctr: bs.GetContainerReference(container)},
390         }
391
392         return &TestableAzureBlobVolume{
393                 AzureBlobVolume: v,
394                 azHandler:       azHandler,
395                 azStub:          azStub,
396                 t:               t,
397         }
398 }
399
400 var _ = check.Suite(&StubbedAzureBlobSuite{})
401
402 type StubbedAzureBlobSuite struct {
403         volume            *TestableAzureBlobVolume
404         origHTTPTransport http.RoundTripper
405 }
406
407 func (s *StubbedAzureBlobSuite) SetUpTest(c *check.C) {
408         s.origHTTPTransport = http.DefaultTransport
409         http.DefaultTransport = &http.Transport{
410                 Dial: (&azStubDialer{}).Dial,
411         }
412         azureWriteRaceInterval = time.Millisecond
413         azureWriteRacePollTime = time.Nanosecond
414
415         s.volume = NewTestableAzureBlobVolume(c, false, 3)
416 }
417
418 func (s *StubbedAzureBlobSuite) TearDownTest(c *check.C) {
419         s.volume.Teardown()
420         http.DefaultTransport = s.origHTTPTransport
421 }
422
423 func TestAzureBlobVolumeWithGeneric(t *testing.T) {
424         defer func(t http.RoundTripper) {
425                 http.DefaultTransport = t
426         }(http.DefaultTransport)
427         http.DefaultTransport = &http.Transport{
428                 Dial: (&azStubDialer{}).Dial,
429         }
430         azureWriteRaceInterval = time.Millisecond
431         azureWriteRacePollTime = time.Nanosecond
432         DoGenericVolumeTests(t, func(t TB) TestableVolume {
433                 return NewTestableAzureBlobVolume(t, false, azureStorageReplication)
434         })
435 }
436
437 func TestAzureBlobVolumeConcurrentRanges(t *testing.T) {
438         defer func(b int) {
439                 azureMaxGetBytes = b
440         }(azureMaxGetBytes)
441
442         defer func(t http.RoundTripper) {
443                 http.DefaultTransport = t
444         }(http.DefaultTransport)
445         http.DefaultTransport = &http.Transport{
446                 Dial: (&azStubDialer{}).Dial,
447         }
448         azureWriteRaceInterval = time.Millisecond
449         azureWriteRacePollTime = time.Nanosecond
450         // Test (BlockSize mod azureMaxGetBytes)==0 and !=0 cases
451         for _, azureMaxGetBytes = range []int{2 << 22, 2<<22 - 1} {
452                 DoGenericVolumeTests(t, func(t TB) TestableVolume {
453                         return NewTestableAzureBlobVolume(t, false, azureStorageReplication)
454                 })
455         }
456 }
457
458 func TestReadonlyAzureBlobVolumeWithGeneric(t *testing.T) {
459         defer func(t http.RoundTripper) {
460                 http.DefaultTransport = t
461         }(http.DefaultTransport)
462         http.DefaultTransport = &http.Transport{
463                 Dial: (&azStubDialer{}).Dial,
464         }
465         azureWriteRaceInterval = time.Millisecond
466         azureWriteRacePollTime = time.Nanosecond
467         DoGenericVolumeTests(t, func(t TB) TestableVolume {
468                 return NewTestableAzureBlobVolume(t, true, azureStorageReplication)
469         })
470 }
471
472 func TestAzureBlobVolumeRangeFenceposts(t *testing.T) {
473         defer func(t http.RoundTripper) {
474                 http.DefaultTransport = t
475         }(http.DefaultTransport)
476         http.DefaultTransport = &http.Transport{
477                 Dial: (&azStubDialer{}).Dial,
478         }
479
480         v := NewTestableAzureBlobVolume(t, false, 3)
481         defer v.Teardown()
482
483         for _, size := range []int{
484                 2<<22 - 1, // one <max read
485                 2 << 22,   // one =max read
486                 2<<22 + 1, // one =max read, one <max
487                 2 << 23,   // two =max reads
488                 BlockSize - 1,
489                 BlockSize,
490         } {
491                 data := make([]byte, size)
492                 for i := range data {
493                         data[i] = byte((i + 7) & 0xff)
494                 }
495                 hash := fmt.Sprintf("%x", md5.Sum(data))
496                 err := v.Put(context.Background(), hash, data)
497                 if err != nil {
498                         t.Error(err)
499                 }
500                 gotData := make([]byte, len(data))
501                 gotLen, err := v.Get(context.Background(), hash, gotData)
502                 if err != nil {
503                         t.Error(err)
504                 }
505                 gotHash := fmt.Sprintf("%x", md5.Sum(gotData))
506                 if gotLen != size {
507                         t.Errorf("length mismatch: got %d != %d", gotLen, size)
508                 }
509                 if gotHash != hash {
510                         t.Errorf("hash mismatch: got %s != %s", gotHash, hash)
511                 }
512         }
513 }
514
515 func TestAzureBlobVolumeReplication(t *testing.T) {
516         for r := 1; r <= 4; r++ {
517                 v := NewTestableAzureBlobVolume(t, false, r)
518                 defer v.Teardown()
519                 if n := v.Replication(); n != r {
520                         t.Errorf("Got replication %d, expected %d", n, r)
521                 }
522         }
523 }
524
525 func TestAzureBlobVolumeCreateBlobRace(t *testing.T) {
526         defer func(t http.RoundTripper) {
527                 http.DefaultTransport = t
528         }(http.DefaultTransport)
529         http.DefaultTransport = &http.Transport{
530                 Dial: (&azStubDialer{}).Dial,
531         }
532
533         v := NewTestableAzureBlobVolume(t, false, 3)
534         defer v.Teardown()
535
536         azureWriteRaceInterval = time.Second
537         azureWriteRacePollTime = time.Millisecond
538
539         var wg sync.WaitGroup
540
541         v.azHandler.race = make(chan chan struct{})
542
543         wg.Add(1)
544         go func() {
545                 defer wg.Done()
546                 err := v.Put(context.Background(), TestHash, TestBlock)
547                 if err != nil {
548                         t.Error(err)
549                 }
550         }()
551         continuePut := make(chan struct{})
552         // Wait for the stub's Put to create the empty blob
553         v.azHandler.race <- continuePut
554         wg.Add(1)
555         go func() {
556                 defer wg.Done()
557                 buf := make([]byte, len(TestBlock))
558                 _, err := v.Get(context.Background(), TestHash, buf)
559                 if err != nil {
560                         t.Error(err)
561                 }
562         }()
563         // Wait for the stub's Get to get the empty blob
564         close(v.azHandler.race)
565         // Allow stub's Put to continue, so the real data is ready
566         // when the volume's Get retries
567         <-continuePut
568         // Wait for Get() and Put() to finish
569         wg.Wait()
570 }
571
572 func TestAzureBlobVolumeCreateBlobRaceDeadline(t *testing.T) {
573         defer func(t http.RoundTripper) {
574                 http.DefaultTransport = t
575         }(http.DefaultTransport)
576         http.DefaultTransport = &http.Transport{
577                 Dial: (&azStubDialer{}).Dial,
578         }
579
580         v := NewTestableAzureBlobVolume(t, false, 3)
581         defer v.Teardown()
582
583         azureWriteRaceInterval = 2 * time.Second
584         azureWriteRacePollTime = 5 * time.Millisecond
585
586         v.PutRaw(TestHash, nil)
587
588         buf := new(bytes.Buffer)
589         v.IndexTo("", buf)
590         if buf.Len() != 0 {
591                 t.Errorf("Index %+q should be empty", buf.Bytes())
592         }
593
594         v.TouchWithDate(TestHash, time.Now().Add(-1982*time.Millisecond))
595
596         allDone := make(chan struct{})
597         go func() {
598                 defer close(allDone)
599                 buf := make([]byte, BlockSize)
600                 n, err := v.Get(context.Background(), TestHash, buf)
601                 if err != nil {
602                         t.Error(err)
603                         return
604                 }
605                 if n != 0 {
606                         t.Errorf("Got %+q, expected empty buf", buf[:n])
607                 }
608         }()
609         select {
610         case <-allDone:
611         case <-time.After(time.Second):
612                 t.Error("Get should have stopped waiting for race when block was 2s old")
613         }
614
615         buf.Reset()
616         v.IndexTo("", buf)
617         if !bytes.HasPrefix(buf.Bytes(), []byte(TestHash+"+0")) {
618                 t.Errorf("Index %+q should have %+q", buf.Bytes(), TestHash+"+0")
619         }
620 }
621
622 func TestAzureBlobVolumeContextCancelGet(t *testing.T) {
623         testAzureBlobVolumeContextCancel(t, func(ctx context.Context, v *TestableAzureBlobVolume) error {
624                 v.PutRaw(TestHash, TestBlock)
625                 _, err := v.Get(ctx, TestHash, make([]byte, BlockSize))
626                 return err
627         })
628 }
629
630 func TestAzureBlobVolumeContextCancelPut(t *testing.T) {
631         testAzureBlobVolumeContextCancel(t, func(ctx context.Context, v *TestableAzureBlobVolume) error {
632                 return v.Put(ctx, TestHash, make([]byte, BlockSize))
633         })
634 }
635
636 func TestAzureBlobVolumeContextCancelCompare(t *testing.T) {
637         testAzureBlobVolumeContextCancel(t, func(ctx context.Context, v *TestableAzureBlobVolume) error {
638                 v.PutRaw(TestHash, TestBlock)
639                 return v.Compare(ctx, TestHash, TestBlock2)
640         })
641 }
642
643 func testAzureBlobVolumeContextCancel(t *testing.T, testFunc func(context.Context, *TestableAzureBlobVolume) error) {
644         defer func(t http.RoundTripper) {
645                 http.DefaultTransport = t
646         }(http.DefaultTransport)
647         http.DefaultTransport = &http.Transport{
648                 Dial: (&azStubDialer{}).Dial,
649         }
650
651         v := NewTestableAzureBlobVolume(t, false, 3)
652         defer v.Teardown()
653         v.azHandler.race = make(chan chan struct{})
654
655         ctx, cancel := context.WithCancel(context.Background())
656         allDone := make(chan struct{})
657         go func() {
658                 defer close(allDone)
659                 err := testFunc(ctx, v)
660                 if err != context.Canceled {
661                         t.Errorf("got %T %q, expected %q", err, err, context.Canceled)
662                 }
663         }()
664         releaseHandler := make(chan struct{})
665         select {
666         case <-allDone:
667                 t.Error("testFunc finished without waiting for v.azHandler.race")
668         case <-time.After(10 * time.Second):
669                 t.Error("timed out waiting to enter handler")
670         case v.azHandler.race <- releaseHandler:
671         }
672
673         cancel()
674
675         select {
676         case <-time.After(10 * time.Second):
677                 t.Error("timed out waiting to cancel")
678         case <-allDone:
679         }
680
681         go func() {
682                 <-releaseHandler
683         }()
684 }
685
686 func (s *StubbedAzureBlobSuite) TestStats(c *check.C) {
687         stats := func() string {
688                 buf, err := json.Marshal(s.volume.InternalStats())
689                 c.Check(err, check.IsNil)
690                 return string(buf)
691         }
692
693         c.Check(stats(), check.Matches, `.*"Ops":0,.*`)
694         c.Check(stats(), check.Matches, `.*"Errors":0,.*`)
695
696         loc := "acbd18db4cc2f85cedef654fccc4a4d8"
697         _, err := s.volume.Get(context.Background(), loc, make([]byte, 3))
698         c.Check(err, check.NotNil)
699         c.Check(stats(), check.Matches, `.*"Ops":[^0],.*`)
700         c.Check(stats(), check.Matches, `.*"Errors":[^0],.*`)
701         c.Check(stats(), check.Matches, `.*"storage\.AzureStorageServiceError 404 \(404 Not Found\)":[^0].*`)
702         c.Check(stats(), check.Matches, `.*"InBytes":0,.*`)
703
704         err = s.volume.Put(context.Background(), loc, []byte("foo"))
705         c.Check(err, check.IsNil)
706         c.Check(stats(), check.Matches, `.*"OutBytes":3,.*`)
707         c.Check(stats(), check.Matches, `.*"CreateOps":1,.*`)
708
709         _, err = s.volume.Get(context.Background(), loc, make([]byte, 3))
710         c.Check(err, check.IsNil)
711         _, err = s.volume.Get(context.Background(), loc, make([]byte, 3))
712         c.Check(err, check.IsNil)
713         c.Check(stats(), check.Matches, `.*"InBytes":6,.*`)
714 }
715
716 func (s *StubbedAzureBlobSuite) TestConfig(c *check.C) {
717         var cfg Config
718         err := yaml.Unmarshal([]byte(`
719 Volumes:
720   - Type: Azure
721     StorageClasses: ["class_a", "class_b"]
722 `), &cfg)
723
724         c.Check(err, check.IsNil)
725         c.Check(cfg.Volumes[0].GetStorageClasses(), check.DeepEquals, []string{"class_a", "class_b"})
726 }
727
728 func (v *TestableAzureBlobVolume) PutRaw(locator string, data []byte) {
729         v.azHandler.PutRaw(v.ContainerName, locator, data)
730 }
731
732 func (v *TestableAzureBlobVolume) TouchWithDate(locator string, lastPut time.Time) {
733         v.azHandler.TouchWithDate(v.ContainerName, locator, lastPut)
734 }
735
736 func (v *TestableAzureBlobVolume) Teardown() {
737         v.azStub.Close()
738 }
739
740 func makeEtag() string {
741         return fmt.Sprintf("0x%x", rand.Int63())
742 }