Merge branch '11065-rotate-logs'
[arvados.git] / services / keepstore / azure_blob_volume_test.go
1 package main
2
3 import (
4         "bytes"
5         "context"
6         "crypto/md5"
7         "encoding/base64"
8         "encoding/json"
9         "encoding/xml"
10         "flag"
11         "fmt"
12         "io/ioutil"
13         "math/rand"
14         "net"
15         "net/http"
16         "net/http/httptest"
17         "regexp"
18         "sort"
19         "strconv"
20         "strings"
21         "sync"
22         "testing"
23         "time"
24
25         log "github.com/Sirupsen/logrus"
26         "github.com/curoverse/azure-sdk-for-go/storage"
27         check "gopkg.in/check.v1"
28 )
29
30 const (
31         // This cannot be the fake account name "devstoreaccount1"
32         // used by Microsoft's Azure emulator: the Azure SDK
33         // recognizes that magic string and changes its behavior to
34         // cater to the Azure SDK's own test suite.
35         fakeAccountName = "fakeAccountName"
36         fakeAccountKey  = "Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw=="
37 )
38
39 var azureTestContainer string
40
41 func init() {
42         flag.StringVar(
43                 &azureTestContainer,
44                 "test.azure-storage-container-volume",
45                 "",
46                 "Name of Azure container to use for testing. Do not use a container with real data! Use -azure-storage-account-name and -azure-storage-key-file arguments to supply credentials.")
47 }
48
49 type azBlob struct {
50         Data        []byte
51         Etag        string
52         Metadata    map[string]string
53         Mtime       time.Time
54         Uncommitted map[string][]byte
55 }
56
57 type azStubHandler struct {
58         sync.Mutex
59         blobs map[string]*azBlob
60         race  chan chan struct{}
61 }
62
63 func newAzStubHandler() *azStubHandler {
64         return &azStubHandler{
65                 blobs: make(map[string]*azBlob),
66         }
67 }
68
69 func (h *azStubHandler) TouchWithDate(container, hash string, t time.Time) {
70         blob, ok := h.blobs[container+"|"+hash]
71         if !ok {
72                 return
73         }
74         blob.Mtime = t
75 }
76
77 func (h *azStubHandler) PutRaw(container, hash string, data []byte) {
78         h.Lock()
79         defer h.Unlock()
80         h.blobs[container+"|"+hash] = &azBlob{
81                 Data:        data,
82                 Mtime:       time.Now(),
83                 Metadata:    make(map[string]string),
84                 Uncommitted: make(map[string][]byte),
85         }
86 }
87
88 func (h *azStubHandler) unlockAndRace() {
89         if h.race == nil {
90                 return
91         }
92         h.Unlock()
93         // Signal caller that race is starting by reading from
94         // h.race. If we get a channel, block until that channel is
95         // ready to receive. If we get nil (or h.race is closed) just
96         // proceed.
97         if c := <-h.race; c != nil {
98                 c <- struct{}{}
99         }
100         h.Lock()
101 }
102
103 var rangeRegexp = regexp.MustCompile(`^bytes=(\d+)-(\d+)$`)
104
105 func (h *azStubHandler) ServeHTTP(rw http.ResponseWriter, r *http.Request) {
106         h.Lock()
107         defer h.Unlock()
108         // defer log.Printf("azStubHandler: %+v", r)
109
110         path := strings.Split(r.URL.Path, "/")
111         container := path[1]
112         hash := ""
113         if len(path) > 2 {
114                 hash = path[2]
115         }
116
117         if err := r.ParseForm(); err != nil {
118                 log.Printf("azStubHandler(%+v): %s", r, err)
119                 rw.WriteHeader(http.StatusBadRequest)
120                 return
121         }
122
123         body, err := ioutil.ReadAll(r.Body)
124         if err != nil {
125                 return
126         }
127
128         type blockListRequestBody struct {
129                 XMLName     xml.Name `xml:"BlockList"`
130                 Uncommitted []string
131         }
132
133         blob, blobExists := h.blobs[container+"|"+hash]
134
135         switch {
136         case r.Method == "PUT" && r.Form.Get("comp") == "":
137                 // "Put Blob" API
138                 if _, ok := h.blobs[container+"|"+hash]; !ok {
139                         // Like the real Azure service, we offer a
140                         // race window during which other clients can
141                         // list/get the new blob before any data is
142                         // committed.
143                         h.blobs[container+"|"+hash] = &azBlob{
144                                 Mtime:       time.Now(),
145                                 Uncommitted: make(map[string][]byte),
146                                 Metadata:    make(map[string]string),
147                                 Etag:        makeEtag(),
148                         }
149                         h.unlockAndRace()
150                 }
151                 metadata := make(map[string]string)
152                 for k, v := range r.Header {
153                         if strings.HasPrefix(strings.ToLower(k), "x-ms-meta-") {
154                                 name := k[len("x-ms-meta-"):]
155                                 metadata[strings.ToLower(name)] = v[0]
156                         }
157                 }
158                 h.blobs[container+"|"+hash] = &azBlob{
159                         Data:        body,
160                         Mtime:       time.Now(),
161                         Uncommitted: make(map[string][]byte),
162                         Metadata:    metadata,
163                         Etag:        makeEtag(),
164                 }
165                 rw.WriteHeader(http.StatusCreated)
166         case r.Method == "PUT" && r.Form.Get("comp") == "block":
167                 // "Put Block" API
168                 if !blobExists {
169                         log.Printf("Got block for nonexistent blob: %+v", r)
170                         rw.WriteHeader(http.StatusBadRequest)
171                         return
172                 }
173                 blockID, err := base64.StdEncoding.DecodeString(r.Form.Get("blockid"))
174                 if err != nil || len(blockID) == 0 {
175                         log.Printf("Invalid blockid: %+q", r.Form.Get("blockid"))
176                         rw.WriteHeader(http.StatusBadRequest)
177                         return
178                 }
179                 blob.Uncommitted[string(blockID)] = body
180                 rw.WriteHeader(http.StatusCreated)
181         case r.Method == "PUT" && r.Form.Get("comp") == "blocklist":
182                 // "Put Block List" API
183                 bl := &blockListRequestBody{}
184                 if err := xml.Unmarshal(body, bl); err != nil {
185                         log.Printf("xml Unmarshal: %s", err)
186                         rw.WriteHeader(http.StatusBadRequest)
187                         return
188                 }
189                 for _, encBlockID := range bl.Uncommitted {
190                         blockID, err := base64.StdEncoding.DecodeString(encBlockID)
191                         if err != nil || len(blockID) == 0 || blob.Uncommitted[string(blockID)] == nil {
192                                 log.Printf("Invalid blockid: %+q", encBlockID)
193                                 rw.WriteHeader(http.StatusBadRequest)
194                                 return
195                         }
196                         blob.Data = blob.Uncommitted[string(blockID)]
197                         blob.Etag = makeEtag()
198                         blob.Mtime = time.Now()
199                         delete(blob.Uncommitted, string(blockID))
200                 }
201                 rw.WriteHeader(http.StatusCreated)
202         case r.Method == "PUT" && r.Form.Get("comp") == "metadata":
203                 // "Set Metadata Headers" API. We don't bother
204                 // stubbing "Get Metadata Headers": AzureBlobVolume
205                 // sets metadata headers only as a way to bump Etag
206                 // and Last-Modified.
207                 if !blobExists {
208                         log.Printf("Got metadata for nonexistent blob: %+v", r)
209                         rw.WriteHeader(http.StatusBadRequest)
210                         return
211                 }
212                 blob.Metadata = make(map[string]string)
213                 for k, v := range r.Header {
214                         if strings.HasPrefix(strings.ToLower(k), "x-ms-meta-") {
215                                 name := k[len("x-ms-meta-"):]
216                                 blob.Metadata[strings.ToLower(name)] = v[0]
217                         }
218                 }
219                 blob.Mtime = time.Now()
220                 blob.Etag = makeEtag()
221         case (r.Method == "GET" || r.Method == "HEAD") && r.Form.Get("comp") == "metadata" && hash != "":
222                 // "Get Blob Metadata" API
223                 if !blobExists {
224                         rw.WriteHeader(http.StatusNotFound)
225                         return
226                 }
227                 for k, v := range blob.Metadata {
228                         rw.Header().Set(fmt.Sprintf("x-ms-meta-%s", k), v)
229                 }
230                 return
231         case (r.Method == "GET" || r.Method == "HEAD") && hash != "":
232                 // "Get Blob" API
233                 if !blobExists {
234                         rw.WriteHeader(http.StatusNotFound)
235                         return
236                 }
237                 data := blob.Data
238                 if rangeSpec := rangeRegexp.FindStringSubmatch(r.Header.Get("Range")); rangeSpec != nil {
239                         b0, err0 := strconv.Atoi(rangeSpec[1])
240                         b1, err1 := strconv.Atoi(rangeSpec[2])
241                         if err0 != nil || err1 != nil || b0 >= len(data) || b1 >= len(data) || b0 > b1 {
242                                 rw.Header().Set("Content-Range", fmt.Sprintf("bytes */%d", len(data)))
243                                 rw.WriteHeader(http.StatusRequestedRangeNotSatisfiable)
244                                 return
245                         }
246                         rw.Header().Set("Content-Range", fmt.Sprintf("bytes %d-%d/%d", b0, b1, len(data)))
247                         rw.WriteHeader(http.StatusPartialContent)
248                         data = data[b0 : b1+1]
249                 }
250                 rw.Header().Set("Last-Modified", blob.Mtime.Format(time.RFC1123))
251                 rw.Header().Set("Content-Length", strconv.Itoa(len(data)))
252                 if r.Method == "GET" {
253                         if _, err := rw.Write(data); err != nil {
254                                 log.Printf("write %+q: %s", data, err)
255                         }
256                 }
257                 h.unlockAndRace()
258         case r.Method == "DELETE" && hash != "":
259                 // "Delete Blob" API
260                 if !blobExists {
261                         rw.WriteHeader(http.StatusNotFound)
262                         return
263                 }
264                 delete(h.blobs, container+"|"+hash)
265                 rw.WriteHeader(http.StatusAccepted)
266         case r.Method == "GET" && r.Form.Get("comp") == "list" && r.Form.Get("restype") == "container":
267                 // "List Blobs" API
268                 prefix := container + "|" + r.Form.Get("prefix")
269                 marker := r.Form.Get("marker")
270
271                 maxResults := 2
272                 if n, err := strconv.Atoi(r.Form.Get("maxresults")); err == nil && n >= 1 && n <= 5000 {
273                         maxResults = n
274                 }
275
276                 resp := storage.BlobListResponse{
277                         Marker:     marker,
278                         NextMarker: "",
279                         MaxResults: int64(maxResults),
280                 }
281                 var hashes sort.StringSlice
282                 for k := range h.blobs {
283                         if strings.HasPrefix(k, prefix) {
284                                 hashes = append(hashes, k[len(container)+1:])
285                         }
286                 }
287                 hashes.Sort()
288                 for _, hash := range hashes {
289                         if len(resp.Blobs) == maxResults {
290                                 resp.NextMarker = hash
291                                 break
292                         }
293                         if len(resp.Blobs) > 0 || marker == "" || marker == hash {
294                                 blob := h.blobs[container+"|"+hash]
295                                 bmeta := map[string]string(nil)
296                                 if r.Form.Get("include") == "metadata" {
297                                         bmeta = blob.Metadata
298                                 }
299                                 b := storage.Blob{
300                                         Name: hash,
301                                         Properties: storage.BlobProperties{
302                                                 LastModified:  blob.Mtime.Format(time.RFC1123),
303                                                 ContentLength: int64(len(blob.Data)),
304                                                 Etag:          blob.Etag,
305                                         },
306                                         Metadata: bmeta,
307                                 }
308                                 resp.Blobs = append(resp.Blobs, b)
309                         }
310                 }
311                 buf, err := xml.Marshal(resp)
312                 if err != nil {
313                         log.Print(err)
314                         rw.WriteHeader(http.StatusInternalServerError)
315                 }
316                 rw.Write(buf)
317         default:
318                 log.Printf("azStubHandler: not implemented: %+v Body:%+q", r, body)
319                 rw.WriteHeader(http.StatusNotImplemented)
320         }
321 }
322
323 // azStubDialer is a net.Dialer that notices when the Azure driver
324 // tries to connect to "devstoreaccount1.blob.127.0.0.1:46067", and
325 // in such cases transparently dials "127.0.0.1:46067" instead.
326 type azStubDialer struct {
327         net.Dialer
328 }
329
330 var localHostPortRe = regexp.MustCompile(`(127\.0\.0\.1|localhost|\[::1\]):\d+`)
331
332 func (d *azStubDialer) Dial(network, address string) (net.Conn, error) {
333         if hp := localHostPortRe.FindString(address); hp != "" {
334                 log.Println("azStubDialer: dial", hp, "instead of", address)
335                 address = hp
336         }
337         return d.Dialer.Dial(network, address)
338 }
339
340 type TestableAzureBlobVolume struct {
341         *AzureBlobVolume
342         azHandler *azStubHandler
343         azStub    *httptest.Server
344         t         TB
345 }
346
347 func NewTestableAzureBlobVolume(t TB, readonly bool, replication int) *TestableAzureBlobVolume {
348         azHandler := newAzStubHandler()
349         azStub := httptest.NewServer(azHandler)
350
351         var azClient storage.Client
352
353         container := azureTestContainer
354         if container == "" {
355                 // Connect to stub instead of real Azure storage service
356                 stubURLBase := strings.Split(azStub.URL, "://")[1]
357                 var err error
358                 if azClient, err = storage.NewClient(fakeAccountName, fakeAccountKey, stubURLBase, storage.DefaultAPIVersion, false); err != nil {
359                         t.Fatal(err)
360                 }
361                 container = "fakecontainername"
362         } else {
363                 // Connect to real Azure storage service
364                 accountKey, err := readKeyFromFile(azureStorageAccountKeyFile)
365                 if err != nil {
366                         t.Fatal(err)
367                 }
368                 azClient, err = storage.NewBasicClient(azureStorageAccountName, accountKey)
369                 if err != nil {
370                         t.Fatal(err)
371                 }
372         }
373
374         bs := azClient.GetBlobService()
375         v := &AzureBlobVolume{
376                 ContainerName:    container,
377                 ReadOnly:         readonly,
378                 AzureReplication: replication,
379                 azClient:         azClient,
380                 bsClient:         &azureBlobClient{client: &bs},
381         }
382
383         return &TestableAzureBlobVolume{
384                 AzureBlobVolume: v,
385                 azHandler:       azHandler,
386                 azStub:          azStub,
387                 t:               t,
388         }
389 }
390
391 var _ = check.Suite(&StubbedAzureBlobSuite{})
392
393 type StubbedAzureBlobSuite struct {
394         volume            *TestableAzureBlobVolume
395         origHTTPTransport http.RoundTripper
396 }
397
398 func (s *StubbedAzureBlobSuite) SetUpTest(c *check.C) {
399         s.origHTTPTransport = http.DefaultTransport
400         http.DefaultTransport = &http.Transport{
401                 Dial: (&azStubDialer{}).Dial,
402         }
403         azureWriteRaceInterval = time.Millisecond
404         azureWriteRacePollTime = time.Nanosecond
405
406         s.volume = NewTestableAzureBlobVolume(c, false, 3)
407 }
408
409 func (s *StubbedAzureBlobSuite) TearDownTest(c *check.C) {
410         s.volume.Teardown()
411         http.DefaultTransport = s.origHTTPTransport
412 }
413
414 func TestAzureBlobVolumeWithGeneric(t *testing.T) {
415         defer func(t http.RoundTripper) {
416                 http.DefaultTransport = t
417         }(http.DefaultTransport)
418         http.DefaultTransport = &http.Transport{
419                 Dial: (&azStubDialer{}).Dial,
420         }
421         azureWriteRaceInterval = time.Millisecond
422         azureWriteRacePollTime = time.Nanosecond
423         DoGenericVolumeTests(t, func(t TB) TestableVolume {
424                 return NewTestableAzureBlobVolume(t, false, azureStorageReplication)
425         })
426 }
427
428 func TestAzureBlobVolumeConcurrentRanges(t *testing.T) {
429         defer func(b int) {
430                 azureMaxGetBytes = b
431         }(azureMaxGetBytes)
432
433         defer func(t http.RoundTripper) {
434                 http.DefaultTransport = t
435         }(http.DefaultTransport)
436         http.DefaultTransport = &http.Transport{
437                 Dial: (&azStubDialer{}).Dial,
438         }
439         azureWriteRaceInterval = time.Millisecond
440         azureWriteRacePollTime = time.Nanosecond
441         // Test (BlockSize mod azureMaxGetBytes)==0 and !=0 cases
442         for _, azureMaxGetBytes = range []int{2 << 22, 2<<22 - 1} {
443                 DoGenericVolumeTests(t, func(t TB) TestableVolume {
444                         return NewTestableAzureBlobVolume(t, false, azureStorageReplication)
445                 })
446         }
447 }
448
449 func TestReadonlyAzureBlobVolumeWithGeneric(t *testing.T) {
450         defer func(t http.RoundTripper) {
451                 http.DefaultTransport = t
452         }(http.DefaultTransport)
453         http.DefaultTransport = &http.Transport{
454                 Dial: (&azStubDialer{}).Dial,
455         }
456         azureWriteRaceInterval = time.Millisecond
457         azureWriteRacePollTime = time.Nanosecond
458         DoGenericVolumeTests(t, func(t TB) TestableVolume {
459                 return NewTestableAzureBlobVolume(t, true, azureStorageReplication)
460         })
461 }
462
463 func TestAzureBlobVolumeRangeFenceposts(t *testing.T) {
464         defer func(t http.RoundTripper) {
465                 http.DefaultTransport = t
466         }(http.DefaultTransport)
467         http.DefaultTransport = &http.Transport{
468                 Dial: (&azStubDialer{}).Dial,
469         }
470
471         v := NewTestableAzureBlobVolume(t, false, 3)
472         defer v.Teardown()
473
474         for _, size := range []int{
475                 2<<22 - 1, // one <max read
476                 2 << 22,   // one =max read
477                 2<<22 + 1, // one =max read, one <max
478                 2 << 23,   // two =max reads
479                 BlockSize - 1,
480                 BlockSize,
481         } {
482                 data := make([]byte, size)
483                 for i := range data {
484                         data[i] = byte((i + 7) & 0xff)
485                 }
486                 hash := fmt.Sprintf("%x", md5.Sum(data))
487                 err := v.Put(context.Background(), hash, data)
488                 if err != nil {
489                         t.Error(err)
490                 }
491                 gotData := make([]byte, len(data))
492                 gotLen, err := v.Get(context.Background(), hash, gotData)
493                 if err != nil {
494                         t.Error(err)
495                 }
496                 gotHash := fmt.Sprintf("%x", md5.Sum(gotData))
497                 if gotLen != size {
498                         t.Errorf("length mismatch: got %d != %d", gotLen, size)
499                 }
500                 if gotHash != hash {
501                         t.Errorf("hash mismatch: got %s != %s", gotHash, hash)
502                 }
503         }
504 }
505
506 func TestAzureBlobVolumeReplication(t *testing.T) {
507         for r := 1; r <= 4; r++ {
508                 v := NewTestableAzureBlobVolume(t, false, r)
509                 defer v.Teardown()
510                 if n := v.Replication(); n != r {
511                         t.Errorf("Got replication %d, expected %d", n, r)
512                 }
513         }
514 }
515
516 func TestAzureBlobVolumeCreateBlobRace(t *testing.T) {
517         defer func(t http.RoundTripper) {
518                 http.DefaultTransport = t
519         }(http.DefaultTransport)
520         http.DefaultTransport = &http.Transport{
521                 Dial: (&azStubDialer{}).Dial,
522         }
523
524         v := NewTestableAzureBlobVolume(t, false, 3)
525         defer v.Teardown()
526
527         azureWriteRaceInterval = time.Second
528         azureWriteRacePollTime = time.Millisecond
529
530         allDone := make(chan struct{})
531         v.azHandler.race = make(chan chan struct{})
532         go func() {
533                 err := v.Put(context.Background(), TestHash, TestBlock)
534                 if err != nil {
535                         t.Error(err)
536                 }
537         }()
538         continuePut := make(chan struct{})
539         // Wait for the stub's Put to create the empty blob
540         v.azHandler.race <- continuePut
541         go func() {
542                 buf := make([]byte, len(TestBlock))
543                 _, err := v.Get(context.Background(), TestHash, buf)
544                 if err != nil {
545                         t.Error(err)
546                 }
547                 close(allDone)
548         }()
549         // Wait for the stub's Get to get the empty blob
550         close(v.azHandler.race)
551         // Allow stub's Put to continue, so the real data is ready
552         // when the volume's Get retries
553         <-continuePut
554         // Wait for volume's Get to return the real data
555         <-allDone
556 }
557
558 func TestAzureBlobVolumeCreateBlobRaceDeadline(t *testing.T) {
559         defer func(t http.RoundTripper) {
560                 http.DefaultTransport = t
561         }(http.DefaultTransport)
562         http.DefaultTransport = &http.Transport{
563                 Dial: (&azStubDialer{}).Dial,
564         }
565
566         v := NewTestableAzureBlobVolume(t, false, 3)
567         defer v.Teardown()
568
569         azureWriteRaceInterval = 2 * time.Second
570         azureWriteRacePollTime = 5 * time.Millisecond
571
572         v.PutRaw(TestHash, nil)
573
574         buf := new(bytes.Buffer)
575         v.IndexTo("", buf)
576         if buf.Len() != 0 {
577                 t.Errorf("Index %+q should be empty", buf.Bytes())
578         }
579
580         v.TouchWithDate(TestHash, time.Now().Add(-1982*time.Millisecond))
581
582         allDone := make(chan struct{})
583         go func() {
584                 defer close(allDone)
585                 buf := make([]byte, BlockSize)
586                 n, err := v.Get(context.Background(), TestHash, buf)
587                 if err != nil {
588                         t.Error(err)
589                         return
590                 }
591                 if n != 0 {
592                         t.Errorf("Got %+q, expected empty buf", buf[:n])
593                 }
594         }()
595         select {
596         case <-allDone:
597         case <-time.After(time.Second):
598                 t.Error("Get should have stopped waiting for race when block was 2s old")
599         }
600
601         buf.Reset()
602         v.IndexTo("", buf)
603         if !bytes.HasPrefix(buf.Bytes(), []byte(TestHash+"+0")) {
604                 t.Errorf("Index %+q should have %+q", buf.Bytes(), TestHash+"+0")
605         }
606 }
607
608 func TestAzureBlobVolumeContextCancelGet(t *testing.T) {
609         testAzureBlobVolumeContextCancel(t, func(ctx context.Context, v *TestableAzureBlobVolume) error {
610                 v.PutRaw(TestHash, TestBlock)
611                 _, err := v.Get(ctx, TestHash, make([]byte, BlockSize))
612                 return err
613         })
614 }
615
616 func TestAzureBlobVolumeContextCancelPut(t *testing.T) {
617         testAzureBlobVolumeContextCancel(t, func(ctx context.Context, v *TestableAzureBlobVolume) error {
618                 return v.Put(ctx, TestHash, make([]byte, BlockSize))
619         })
620 }
621
622 func TestAzureBlobVolumeContextCancelCompare(t *testing.T) {
623         testAzureBlobVolumeContextCancel(t, func(ctx context.Context, v *TestableAzureBlobVolume) error {
624                 v.PutRaw(TestHash, TestBlock)
625                 return v.Compare(ctx, TestHash, TestBlock2)
626         })
627 }
628
629 func testAzureBlobVolumeContextCancel(t *testing.T, testFunc func(context.Context, *TestableAzureBlobVolume) error) {
630         defer func(t http.RoundTripper) {
631                 http.DefaultTransport = t
632         }(http.DefaultTransport)
633         http.DefaultTransport = &http.Transport{
634                 Dial: (&azStubDialer{}).Dial,
635         }
636
637         v := NewTestableAzureBlobVolume(t, false, 3)
638         defer v.Teardown()
639         v.azHandler.race = make(chan chan struct{})
640
641         ctx, cancel := context.WithCancel(context.Background())
642         allDone := make(chan struct{})
643         go func() {
644                 defer close(allDone)
645                 err := testFunc(ctx, v)
646                 if err != context.Canceled {
647                         t.Errorf("got %T %q, expected %q", err, err, context.Canceled)
648                 }
649         }()
650         releaseHandler := make(chan struct{})
651         select {
652         case <-allDone:
653                 t.Error("testFunc finished without waiting for v.azHandler.race")
654         case <-time.After(10 * time.Second):
655                 t.Error("timed out waiting to enter handler")
656         case v.azHandler.race <- releaseHandler:
657         }
658
659         cancel()
660
661         select {
662         case <-time.After(10 * time.Second):
663                 t.Error("timed out waiting to cancel")
664         case <-allDone:
665         }
666
667         go func() {
668                 <-releaseHandler
669         }()
670 }
671
672 func (s *StubbedAzureBlobSuite) TestStats(c *check.C) {
673         stats := func() string {
674                 buf, err := json.Marshal(s.volume.InternalStats())
675                 c.Check(err, check.IsNil)
676                 return string(buf)
677         }
678
679         c.Check(stats(), check.Matches, `.*"Ops":0,.*`)
680         c.Check(stats(), check.Matches, `.*"Errors":0,.*`)
681
682         loc := "acbd18db4cc2f85cedef654fccc4a4d8"
683         _, err := s.volume.Get(context.Background(), loc, make([]byte, 3))
684         c.Check(err, check.NotNil)
685         c.Check(stats(), check.Matches, `.*"Ops":[^0],.*`)
686         c.Check(stats(), check.Matches, `.*"Errors":[^0],.*`)
687         c.Check(stats(), check.Matches, `.*"storage\.AzureStorageServiceError 404 \(404 Not Found\)":[^0].*`)
688         c.Check(stats(), check.Matches, `.*"InBytes":0,.*`)
689
690         err = s.volume.Put(context.Background(), loc, []byte("foo"))
691         c.Check(err, check.IsNil)
692         c.Check(stats(), check.Matches, `.*"OutBytes":3,.*`)
693         c.Check(stats(), check.Matches, `.*"CreateOps":1,.*`)
694
695         _, err = s.volume.Get(context.Background(), loc, make([]byte, 3))
696         c.Check(err, check.IsNil)
697         _, err = s.volume.Get(context.Background(), loc, make([]byte, 3))
698         c.Check(err, check.IsNil)
699         c.Check(stats(), check.Matches, `.*"InBytes":6,.*`)
700 }
701
702 func (v *TestableAzureBlobVolume) PutRaw(locator string, data []byte) {
703         v.azHandler.PutRaw(v.ContainerName, locator, data)
704 }
705
706 func (v *TestableAzureBlobVolume) TouchWithDate(locator string, lastPut time.Time) {
707         v.azHandler.TouchWithDate(v.ContainerName, locator, lastPut)
708 }
709
710 func (v *TestableAzureBlobVolume) Teardown() {
711         v.azStub.Close()
712 }
713
714 func makeEtag() string {
715         return fmt.Sprintf("0x%x", rand.Int63())
716 }