18005: Fixes the bug.
[arvados.git] / services / keepstore / azure_blob_volume_test.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package main
6
7 import (
8         "bytes"
9         "context"
10         "crypto/md5"
11         "encoding/base64"
12         "encoding/json"
13         "encoding/xml"
14         "flag"
15         "fmt"
16         "io/ioutil"
17         "math/rand"
18         "net"
19         "net/http"
20         "net/http/httptest"
21         "os"
22         "regexp"
23         "sort"
24         "strconv"
25         "strings"
26         "sync"
27         "time"
28
29         "git.arvados.org/arvados.git/sdk/go/arvados"
30         "git.arvados.org/arvados.git/sdk/go/ctxlog"
31         "github.com/Azure/azure-sdk-for-go/storage"
32         "github.com/prometheus/client_golang/prometheus"
33         "github.com/sirupsen/logrus"
34         check "gopkg.in/check.v1"
35 )
36
37 const (
38         // This cannot be the fake account name "devstoreaccount1"
39         // used by Microsoft's Azure emulator: the Azure SDK
40         // recognizes that magic string and changes its behavior to
41         // cater to the Azure SDK's own test suite.
42         fakeAccountName = "fakeaccountname"
43         fakeAccountKey  = "Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw=="
44 )
45
46 var (
47         azureTestContainer string
48         azureTestDebug     = os.Getenv("ARVADOS_DEBUG") != ""
49 )
50
51 func init() {
52         flag.StringVar(
53                 &azureTestContainer,
54                 "test.azure-storage-container-volume",
55                 "",
56                 "Name of Azure container to use for testing. Do not use a container with real data! Use -azure-storage-account-name and -azure-storage-key-file arguments to supply credentials.")
57 }
58
59 type azBlob struct {
60         Data        []byte
61         Etag        string
62         Metadata    map[string]string
63         Mtime       time.Time
64         Uncommitted map[string][]byte
65 }
66
67 type azStubHandler struct {
68         sync.Mutex
69         logger     logrus.FieldLogger
70         blobs      map[string]*azBlob
71         race       chan chan struct{}
72         didlist503 bool
73 }
74
75 func newAzStubHandler(c *check.C) *azStubHandler {
76         return &azStubHandler{
77                 blobs:  make(map[string]*azBlob),
78                 logger: ctxlog.TestLogger(c),
79         }
80 }
81
82 func (h *azStubHandler) TouchWithDate(container, hash string, t time.Time) {
83         blob, ok := h.blobs[container+"|"+hash]
84         if !ok {
85                 return
86         }
87         blob.Mtime = t
88 }
89
90 func (h *azStubHandler) PutRaw(container, hash string, data []byte) {
91         h.Lock()
92         defer h.Unlock()
93         h.blobs[container+"|"+hash] = &azBlob{
94                 Data:        data,
95                 Mtime:       time.Now(),
96                 Metadata:    make(map[string]string),
97                 Uncommitted: make(map[string][]byte),
98         }
99 }
100
101 func (h *azStubHandler) unlockAndRace() {
102         if h.race == nil {
103                 return
104         }
105         h.Unlock()
106         // Signal caller that race is starting by reading from
107         // h.race. If we get a channel, block until that channel is
108         // ready to receive. If we get nil (or h.race is closed) just
109         // proceed.
110         if c := <-h.race; c != nil {
111                 c <- struct{}{}
112         }
113         h.Lock()
114 }
115
116 var rangeRegexp = regexp.MustCompile(`^bytes=(\d+)-(\d+)$`)
117
118 func (h *azStubHandler) ServeHTTP(rw http.ResponseWriter, r *http.Request) {
119         h.Lock()
120         defer h.Unlock()
121         if azureTestDebug {
122                 defer h.logger.Printf("azStubHandler: %+v", r)
123         }
124
125         path := strings.Split(r.URL.Path, "/")
126         container := path[1]
127         hash := ""
128         if len(path) > 2 {
129                 hash = path[2]
130         }
131
132         if err := r.ParseForm(); err != nil {
133                 h.logger.Printf("azStubHandler(%+v): %s", r, err)
134                 rw.WriteHeader(http.StatusBadRequest)
135                 return
136         }
137
138         if (r.Method == "PUT" || r.Method == "POST") && r.Header.Get("Content-Length") == "" {
139                 rw.WriteHeader(http.StatusLengthRequired)
140                 return
141         }
142
143         body, err := ioutil.ReadAll(r.Body)
144         if err != nil {
145                 return
146         }
147
148         type blockListRequestBody struct {
149                 XMLName     xml.Name `xml:"BlockList"`
150                 Uncommitted []string
151         }
152
153         blob, blobExists := h.blobs[container+"|"+hash]
154
155         switch {
156         case r.Method == "PUT" && r.Form.Get("comp") == "":
157                 // "Put Blob" API
158                 if _, ok := h.blobs[container+"|"+hash]; !ok {
159                         // Like the real Azure service, we offer a
160                         // race window during which other clients can
161                         // list/get the new blob before any data is
162                         // committed.
163                         h.blobs[container+"|"+hash] = &azBlob{
164                                 Mtime:       time.Now(),
165                                 Uncommitted: make(map[string][]byte),
166                                 Metadata:    make(map[string]string),
167                                 Etag:        makeEtag(),
168                         }
169                         h.unlockAndRace()
170                 }
171                 metadata := make(map[string]string)
172                 for k, v := range r.Header {
173                         if strings.HasPrefix(strings.ToLower(k), "x-ms-meta-") {
174                                 name := k[len("x-ms-meta-"):]
175                                 metadata[strings.ToLower(name)] = v[0]
176                         }
177                 }
178                 h.blobs[container+"|"+hash] = &azBlob{
179                         Data:        body,
180                         Mtime:       time.Now(),
181                         Uncommitted: make(map[string][]byte),
182                         Metadata:    metadata,
183                         Etag:        makeEtag(),
184                 }
185                 rw.WriteHeader(http.StatusCreated)
186         case r.Method == "PUT" && r.Form.Get("comp") == "block":
187                 // "Put Block" API
188                 if !blobExists {
189                         h.logger.Printf("Got block for nonexistent blob: %+v", r)
190                         rw.WriteHeader(http.StatusBadRequest)
191                         return
192                 }
193                 blockID, err := base64.StdEncoding.DecodeString(r.Form.Get("blockid"))
194                 if err != nil || len(blockID) == 0 {
195                         h.logger.Printf("Invalid blockid: %+q", r.Form.Get("blockid"))
196                         rw.WriteHeader(http.StatusBadRequest)
197                         return
198                 }
199                 blob.Uncommitted[string(blockID)] = body
200                 rw.WriteHeader(http.StatusCreated)
201         case r.Method == "PUT" && r.Form.Get("comp") == "blocklist":
202                 // "Put Block List" API
203                 bl := &blockListRequestBody{}
204                 if err := xml.Unmarshal(body, bl); err != nil {
205                         h.logger.Printf("xml Unmarshal: %s", err)
206                         rw.WriteHeader(http.StatusBadRequest)
207                         return
208                 }
209                 for _, encBlockID := range bl.Uncommitted {
210                         blockID, err := base64.StdEncoding.DecodeString(encBlockID)
211                         if err != nil || len(blockID) == 0 || blob.Uncommitted[string(blockID)] == nil {
212                                 h.logger.Printf("Invalid blockid: %+q", encBlockID)
213                                 rw.WriteHeader(http.StatusBadRequest)
214                                 return
215                         }
216                         blob.Data = blob.Uncommitted[string(blockID)]
217                         blob.Etag = makeEtag()
218                         blob.Mtime = time.Now()
219                         delete(blob.Uncommitted, string(blockID))
220                 }
221                 rw.WriteHeader(http.StatusCreated)
222         case r.Method == "PUT" && r.Form.Get("comp") == "metadata":
223                 // "Set Metadata Headers" API. We don't bother
224                 // stubbing "Get Metadata Headers": AzureBlobVolume
225                 // sets metadata headers only as a way to bump Etag
226                 // and Last-Modified.
227                 if !blobExists {
228                         h.logger.Printf("Got metadata for nonexistent blob: %+v", r)
229                         rw.WriteHeader(http.StatusBadRequest)
230                         return
231                 }
232                 blob.Metadata = make(map[string]string)
233                 for k, v := range r.Header {
234                         if strings.HasPrefix(strings.ToLower(k), "x-ms-meta-") {
235                                 name := k[len("x-ms-meta-"):]
236                                 blob.Metadata[strings.ToLower(name)] = v[0]
237                         }
238                 }
239                 blob.Mtime = time.Now()
240                 blob.Etag = makeEtag()
241         case (r.Method == "GET" || r.Method == "HEAD") && r.Form.Get("comp") == "metadata" && hash != "":
242                 // "Get Blob Metadata" API
243                 if !blobExists {
244                         rw.WriteHeader(http.StatusNotFound)
245                         return
246                 }
247                 for k, v := range blob.Metadata {
248                         rw.Header().Set(fmt.Sprintf("x-ms-meta-%s", k), v)
249                 }
250                 return
251         case (r.Method == "GET" || r.Method == "HEAD") && hash != "":
252                 // "Get Blob" API
253                 if !blobExists {
254                         rw.WriteHeader(http.StatusNotFound)
255                         return
256                 }
257                 data := blob.Data
258                 if rangeSpec := rangeRegexp.FindStringSubmatch(r.Header.Get("Range")); rangeSpec != nil {
259                         b0, err0 := strconv.Atoi(rangeSpec[1])
260                         b1, err1 := strconv.Atoi(rangeSpec[2])
261                         if err0 != nil || err1 != nil || b0 >= len(data) || b1 >= len(data) || b0 > b1 {
262                                 rw.Header().Set("Content-Range", fmt.Sprintf("bytes */%d", len(data)))
263                                 rw.WriteHeader(http.StatusRequestedRangeNotSatisfiable)
264                                 return
265                         }
266                         rw.Header().Set("Content-Range", fmt.Sprintf("bytes %d-%d/%d", b0, b1, len(data)))
267                         rw.WriteHeader(http.StatusPartialContent)
268                         data = data[b0 : b1+1]
269                 }
270                 rw.Header().Set("Last-Modified", blob.Mtime.Format(time.RFC1123))
271                 rw.Header().Set("Content-Length", strconv.Itoa(len(data)))
272                 if r.Method == "GET" {
273                         if _, err := rw.Write(data); err != nil {
274                                 h.logger.Printf("write %+q: %s", data, err)
275                         }
276                 }
277                 h.unlockAndRace()
278         case r.Method == "DELETE" && hash != "":
279                 // "Delete Blob" API
280                 if !blobExists {
281                         rw.WriteHeader(http.StatusNotFound)
282                         return
283                 }
284                 delete(h.blobs, container+"|"+hash)
285                 rw.WriteHeader(http.StatusAccepted)
286         case r.Method == "GET" && r.Form.Get("comp") == "list" && r.Form.Get("restype") == "container":
287                 // "List Blobs" API
288                 if !h.didlist503 {
289                         h.didlist503 = true
290                         rw.WriteHeader(http.StatusServiceUnavailable)
291                         return
292                 }
293                 prefix := container + "|" + r.Form.Get("prefix")
294                 marker := r.Form.Get("marker")
295
296                 maxResults := 2
297                 if n, err := strconv.Atoi(r.Form.Get("maxresults")); err == nil && n >= 1 && n <= 5000 {
298                         maxResults = n
299                 }
300
301                 resp := storage.BlobListResponse{
302                         Marker:     marker,
303                         NextMarker: "",
304                         MaxResults: int64(maxResults),
305                 }
306                 var hashes sort.StringSlice
307                 for k := range h.blobs {
308                         if strings.HasPrefix(k, prefix) {
309                                 hashes = append(hashes, k[len(container)+1:])
310                         }
311                 }
312                 hashes.Sort()
313                 for _, hash := range hashes {
314                         if len(resp.Blobs) == maxResults {
315                                 resp.NextMarker = hash
316                                 break
317                         }
318                         if len(resp.Blobs) > 0 || marker == "" || marker == hash {
319                                 blob := h.blobs[container+"|"+hash]
320                                 bmeta := map[string]string(nil)
321                                 if r.Form.Get("include") == "metadata" {
322                                         bmeta = blob.Metadata
323                                 }
324                                 b := storage.Blob{
325                                         Name: hash,
326                                         Properties: storage.BlobProperties{
327                                                 LastModified:  storage.TimeRFC1123(blob.Mtime),
328                                                 ContentLength: int64(len(blob.Data)),
329                                                 Etag:          blob.Etag,
330                                         },
331                                         Metadata: bmeta,
332                                 }
333                                 resp.Blobs = append(resp.Blobs, b)
334                         }
335                 }
336                 buf, err := xml.Marshal(resp)
337                 if err != nil {
338                         h.logger.Error(err)
339                         rw.WriteHeader(http.StatusInternalServerError)
340                 }
341                 rw.Write(buf)
342         default:
343                 h.logger.Printf("azStubHandler: not implemented: %+v Body:%+q", r, body)
344                 rw.WriteHeader(http.StatusNotImplemented)
345         }
346 }
347
348 // azStubDialer is a net.Dialer that notices when the Azure driver
349 // tries to connect to "devstoreaccount1.blob.127.0.0.1:46067", and
350 // in such cases transparently dials "127.0.0.1:46067" instead.
351 type azStubDialer struct {
352         logger logrus.FieldLogger
353         net.Dialer
354 }
355
356 var localHostPortRe = regexp.MustCompile(`(127\.0\.0\.1|localhost|\[::1\]):\d+`)
357
358 func (d *azStubDialer) Dial(network, address string) (net.Conn, error) {
359         if hp := localHostPortRe.FindString(address); hp != "" {
360                 if azureTestDebug {
361                         d.logger.Debug("azStubDialer: dial", hp, "instead of", address)
362                 }
363                 address = hp
364         }
365         return d.Dialer.Dial(network, address)
366 }
367
368 type TestableAzureBlobVolume struct {
369         *AzureBlobVolume
370         azHandler *azStubHandler
371         azStub    *httptest.Server
372         t         TB
373 }
374
375 func (s *StubbedAzureBlobSuite) newTestableAzureBlobVolume(t TB, cluster *arvados.Cluster, volume arvados.Volume, metrics *volumeMetricsVecs) *TestableAzureBlobVolume {
376         azHandler := newAzStubHandler(t.(*check.C))
377         azStub := httptest.NewServer(azHandler)
378
379         var azClient storage.Client
380         var err error
381
382         container := azureTestContainer
383         if container == "" {
384                 // Connect to stub instead of real Azure storage service
385                 stubURLBase := strings.Split(azStub.URL, "://")[1]
386                 if azClient, err = storage.NewClient(fakeAccountName, fakeAccountKey, stubURLBase, storage.DefaultAPIVersion, false); err != nil {
387                         t.Fatal(err)
388                 }
389                 container = "fakecontainername"
390         } else {
391                 // Connect to real Azure storage service
392                 if azClient, err = storage.NewBasicClient(os.Getenv("ARVADOS_TEST_AZURE_ACCOUNT_NAME"), os.Getenv("ARVADOS_TEST_AZURE_ACCOUNT_KEY")); err != nil {
393                         t.Fatal(err)
394                 }
395         }
396         azClient.Sender = &singleSender{}
397
398         bs := azClient.GetBlobService()
399         v := &AzureBlobVolume{
400                 ContainerName:        container,
401                 WriteRaceInterval:    arvados.Duration(time.Millisecond),
402                 WriteRacePollTime:    arvados.Duration(time.Nanosecond),
403                 ListBlobsMaxAttempts: 2,
404                 ListBlobsRetryDelay:  arvados.Duration(time.Millisecond),
405                 azClient:             azClient,
406                 container:            &azureContainer{ctr: bs.GetContainerReference(container)},
407                 cluster:              cluster,
408                 volume:               volume,
409                 logger:               ctxlog.TestLogger(t),
410                 metrics:              metrics,
411         }
412         if err = v.check(); err != nil {
413                 t.Fatal(err)
414         }
415
416         return &TestableAzureBlobVolume{
417                 AzureBlobVolume: v,
418                 azHandler:       azHandler,
419                 azStub:          azStub,
420                 t:               t,
421         }
422 }
423
424 var _ = check.Suite(&StubbedAzureBlobSuite{})
425
426 type StubbedAzureBlobSuite struct {
427         origHTTPTransport http.RoundTripper
428 }
429
430 func (s *StubbedAzureBlobSuite) SetUpTest(c *check.C) {
431         s.origHTTPTransport = http.DefaultTransport
432         http.DefaultTransport = &http.Transport{
433                 Dial: (&azStubDialer{logger: ctxlog.TestLogger(c)}).Dial,
434         }
435 }
436
437 func (s *StubbedAzureBlobSuite) TearDownTest(c *check.C) {
438         http.DefaultTransport = s.origHTTPTransport
439 }
440
441 func (s *StubbedAzureBlobSuite) TestAzureBlobVolumeWithGeneric(c *check.C) {
442         DoGenericVolumeTests(c, false, func(t TB, cluster *arvados.Cluster, volume arvados.Volume, logger logrus.FieldLogger, metrics *volumeMetricsVecs) TestableVolume {
443                 return s.newTestableAzureBlobVolume(t, cluster, volume, metrics)
444         })
445 }
446
447 func (s *StubbedAzureBlobSuite) TestAzureBlobVolumeConcurrentRanges(c *check.C) {
448         // Test (BlockSize mod azureMaxGetBytes)==0 and !=0 cases
449         for _, b := range []int{2 << 22, 2<<22 - 1} {
450                 DoGenericVolumeTests(c, false, func(t TB, cluster *arvados.Cluster, volume arvados.Volume, logger logrus.FieldLogger, metrics *volumeMetricsVecs) TestableVolume {
451                         v := s.newTestableAzureBlobVolume(t, cluster, volume, metrics)
452                         v.MaxGetBytes = b
453                         return v
454                 })
455         }
456 }
457
458 func (s *StubbedAzureBlobSuite) TestReadonlyAzureBlobVolumeWithGeneric(c *check.C) {
459         DoGenericVolumeTests(c, false, func(c TB, cluster *arvados.Cluster, volume arvados.Volume, logger logrus.FieldLogger, metrics *volumeMetricsVecs) TestableVolume {
460                 return s.newTestableAzureBlobVolume(c, cluster, volume, metrics)
461         })
462 }
463
464 func (s *StubbedAzureBlobSuite) TestAzureBlobVolumeRangeFenceposts(c *check.C) {
465         v := s.newTestableAzureBlobVolume(c, testCluster(c), arvados.Volume{Replication: 3}, newVolumeMetricsVecs(prometheus.NewRegistry()))
466         defer v.Teardown()
467
468         for _, size := range []int{
469                 2<<22 - 1, // one <max read
470                 2 << 22,   // one =max read
471                 2<<22 + 1, // one =max read, one <max
472                 2 << 23,   // two =max reads
473                 BlockSize - 1,
474                 BlockSize,
475         } {
476                 data := make([]byte, size)
477                 for i := range data {
478                         data[i] = byte((i + 7) & 0xff)
479                 }
480                 hash := fmt.Sprintf("%x", md5.Sum(data))
481                 err := v.Put(context.Background(), hash, data)
482                 if err != nil {
483                         c.Error(err)
484                 }
485                 gotData := make([]byte, len(data))
486                 gotLen, err := v.Get(context.Background(), hash, gotData)
487                 if err != nil {
488                         c.Error(err)
489                 }
490                 gotHash := fmt.Sprintf("%x", md5.Sum(gotData))
491                 if gotLen != size {
492                         c.Errorf("length mismatch: got %d != %d", gotLen, size)
493                 }
494                 if gotHash != hash {
495                         c.Errorf("hash mismatch: got %s != %s", gotHash, hash)
496                 }
497         }
498 }
499
500 func (s *StubbedAzureBlobSuite) TestAzureBlobVolumeCreateBlobRace(c *check.C) {
501         v := s.newTestableAzureBlobVolume(c, testCluster(c), arvados.Volume{Replication: 3}, newVolumeMetricsVecs(prometheus.NewRegistry()))
502         defer v.Teardown()
503
504         var wg sync.WaitGroup
505
506         v.azHandler.race = make(chan chan struct{})
507
508         wg.Add(1)
509         go func() {
510                 defer wg.Done()
511                 err := v.Put(context.Background(), TestHash, TestBlock)
512                 if err != nil {
513                         c.Error(err)
514                 }
515         }()
516         continuePut := make(chan struct{})
517         // Wait for the stub's Put to create the empty blob
518         v.azHandler.race <- continuePut
519         wg.Add(1)
520         go func() {
521                 defer wg.Done()
522                 buf := make([]byte, len(TestBlock))
523                 _, err := v.Get(context.Background(), TestHash, buf)
524                 if err != nil {
525                         c.Error(err)
526                 }
527         }()
528         // Wait for the stub's Get to get the empty blob
529         close(v.azHandler.race)
530         // Allow stub's Put to continue, so the real data is ready
531         // when the volume's Get retries
532         <-continuePut
533         // Wait for Get() and Put() to finish
534         wg.Wait()
535 }
536
537 func (s *StubbedAzureBlobSuite) TestAzureBlobVolumeCreateBlobRaceDeadline(c *check.C) {
538         v := s.newTestableAzureBlobVolume(c, testCluster(c), arvados.Volume{Replication: 3}, newVolumeMetricsVecs(prometheus.NewRegistry()))
539         v.AzureBlobVolume.WriteRaceInterval.Set("2s")
540         v.AzureBlobVolume.WriteRacePollTime.Set("5ms")
541         defer v.Teardown()
542
543         v.PutRaw(TestHash, nil)
544
545         buf := new(bytes.Buffer)
546         v.IndexTo("", buf)
547         if buf.Len() != 0 {
548                 c.Errorf("Index %+q should be empty", buf.Bytes())
549         }
550
551         v.TouchWithDate(TestHash, time.Now().Add(-1982*time.Millisecond))
552
553         allDone := make(chan struct{})
554         go func() {
555                 defer close(allDone)
556                 buf := make([]byte, BlockSize)
557                 n, err := v.Get(context.Background(), TestHash, buf)
558                 if err != nil {
559                         c.Error(err)
560                         return
561                 }
562                 if n != 0 {
563                         c.Errorf("Got %+q, expected empty buf", buf[:n])
564                 }
565         }()
566         select {
567         case <-allDone:
568         case <-time.After(time.Second):
569                 c.Error("Get should have stopped waiting for race when block was 2s old")
570         }
571
572         buf.Reset()
573         v.IndexTo("", buf)
574         if !bytes.HasPrefix(buf.Bytes(), []byte(TestHash+"+0")) {
575                 c.Errorf("Index %+q should have %+q", buf.Bytes(), TestHash+"+0")
576         }
577 }
578
579 func (s *StubbedAzureBlobSuite) TestAzureBlobVolumeContextCancelGet(c *check.C) {
580         s.testAzureBlobVolumeContextCancel(c, func(ctx context.Context, v *TestableAzureBlobVolume) error {
581                 v.PutRaw(TestHash, TestBlock)
582                 _, err := v.Get(ctx, TestHash, make([]byte, BlockSize))
583                 return err
584         })
585 }
586
587 func (s *StubbedAzureBlobSuite) TestAzureBlobVolumeContextCancelPut(c *check.C) {
588         s.testAzureBlobVolumeContextCancel(c, func(ctx context.Context, v *TestableAzureBlobVolume) error {
589                 return v.Put(ctx, TestHash, make([]byte, BlockSize))
590         })
591 }
592
593 func (s *StubbedAzureBlobSuite) TestAzureBlobVolumeContextCancelCompare(c *check.C) {
594         s.testAzureBlobVolumeContextCancel(c, func(ctx context.Context, v *TestableAzureBlobVolume) error {
595                 v.PutRaw(TestHash, TestBlock)
596                 return v.Compare(ctx, TestHash, TestBlock2)
597         })
598 }
599
600 func (s *StubbedAzureBlobSuite) testAzureBlobVolumeContextCancel(c *check.C, testFunc func(context.Context, *TestableAzureBlobVolume) error) {
601         v := s.newTestableAzureBlobVolume(c, testCluster(c), arvados.Volume{Replication: 3}, newVolumeMetricsVecs(prometheus.NewRegistry()))
602         defer v.Teardown()
603         v.azHandler.race = make(chan chan struct{})
604
605         ctx, cancel := context.WithCancel(context.Background())
606         allDone := make(chan struct{})
607         go func() {
608                 defer close(allDone)
609                 err := testFunc(ctx, v)
610                 if err != context.Canceled {
611                         c.Errorf("got %T %q, expected %q", err, err, context.Canceled)
612                 }
613         }()
614         releaseHandler := make(chan struct{})
615         select {
616         case <-allDone:
617                 c.Error("testFunc finished without waiting for v.azHandler.race")
618         case <-time.After(10 * time.Second):
619                 c.Error("timed out waiting to enter handler")
620         case v.azHandler.race <- releaseHandler:
621         }
622
623         cancel()
624
625         select {
626         case <-time.After(10 * time.Second):
627                 c.Error("timed out waiting to cancel")
628         case <-allDone:
629         }
630
631         go func() {
632                 <-releaseHandler
633         }()
634 }
635
636 func (s *StubbedAzureBlobSuite) TestStats(c *check.C) {
637         volume := s.newTestableAzureBlobVolume(c, testCluster(c), arvados.Volume{Replication: 3}, newVolumeMetricsVecs(prometheus.NewRegistry()))
638         defer volume.Teardown()
639
640         stats := func() string {
641                 buf, err := json.Marshal(volume.InternalStats())
642                 c.Check(err, check.IsNil)
643                 return string(buf)
644         }
645
646         c.Check(stats(), check.Matches, `.*"Ops":0,.*`)
647         c.Check(stats(), check.Matches, `.*"Errors":0,.*`)
648
649         loc := "acbd18db4cc2f85cedef654fccc4a4d8"
650         _, err := volume.Get(context.Background(), loc, make([]byte, 3))
651         c.Check(err, check.NotNil)
652         c.Check(stats(), check.Matches, `.*"Ops":[^0],.*`)
653         c.Check(stats(), check.Matches, `.*"Errors":[^0],.*`)
654         c.Check(stats(), check.Matches, `.*"storage\.AzureStorageServiceError 404 \(404 Not Found\)":[^0].*`)
655         c.Check(stats(), check.Matches, `.*"InBytes":0,.*`)
656
657         err = volume.Put(context.Background(), loc, []byte("foo"))
658         c.Check(err, check.IsNil)
659         c.Check(stats(), check.Matches, `.*"OutBytes":3,.*`)
660         c.Check(stats(), check.Matches, `.*"CreateOps":1,.*`)
661
662         _, err = volume.Get(context.Background(), loc, make([]byte, 3))
663         c.Check(err, check.IsNil)
664         _, err = volume.Get(context.Background(), loc, make([]byte, 3))
665         c.Check(err, check.IsNil)
666         c.Check(stats(), check.Matches, `.*"InBytes":6,.*`)
667 }
668
669 func (v *TestableAzureBlobVolume) PutRaw(locator string, data []byte) {
670         v.azHandler.PutRaw(v.ContainerName, locator, data)
671 }
672
673 func (v *TestableAzureBlobVolume) TouchWithDate(locator string, lastPut time.Time) {
674         v.azHandler.TouchWithDate(v.ContainerName, locator, lastPut)
675 }
676
677 func (v *TestableAzureBlobVolume) Teardown() {
678         v.azStub.Close()
679 }
680
681 func (v *TestableAzureBlobVolume) ReadWriteOperationLabelValues() (r, w string) {
682         return "get", "create"
683 }
684
685 func makeEtag() string {
686         return fmt.Sprintf("0x%x", rand.Int63())
687 }