21842: added keep typing
[arvados.git] / services / keepstore / azure_blob_volume_test.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package keepstore
6
7 import (
8         "bytes"
9         "context"
10         "crypto/md5"
11         "encoding/base64"
12         "encoding/json"
13         "encoding/xml"
14         "flag"
15         "fmt"
16         "io/ioutil"
17         "math/rand"
18         "net"
19         "net/http"
20         "net/http/httptest"
21         "os"
22         "regexp"
23         "sort"
24         "strconv"
25         "strings"
26         "sync"
27         "time"
28
29         "git.arvados.org/arvados.git/sdk/go/arvados"
30         "git.arvados.org/arvados.git/sdk/go/ctxlog"
31         "github.com/Azure/azure-sdk-for-go/storage"
32         "github.com/prometheus/client_golang/prometheus"
33         "github.com/sirupsen/logrus"
34         check "gopkg.in/check.v1"
35 )
36
37 const (
38         // This cannot be the fake account name "devstoreaccount1"
39         // used by Microsoft's Azure emulator: the Azure SDK
40         // recognizes that magic string and changes its behavior to
41         // cater to the Azure SDK's own test suite.
42         fakeAccountName = "fakeaccountname"
43         fakeAccountKey  = "Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw=="
44 )
45
46 var (
47         azureTestContainer string
48         azureTestDebug     = os.Getenv("ARVADOS_DEBUG") != ""
49 )
50
51 func init() {
52         flag.StringVar(
53                 &azureTestContainer,
54                 "test.azure-storage-container-volume",
55                 "",
56                 "Name of Azure container to use for testing. Do not use a container with real data! Use -azure-storage-account-name and -azure-storage-key-file arguments to supply credentials.")
57 }
58
59 type azBlob struct {
60         Data        []byte
61         Etag        string
62         Metadata    map[string]string
63         Mtime       time.Time
64         Uncommitted map[string][]byte
65 }
66
67 type azStubHandler struct {
68         sync.Mutex
69         logger     logrus.FieldLogger
70         blobs      map[string]*azBlob
71         race       chan chan struct{}
72         didlist503 bool
73 }
74
75 func newAzStubHandler(c *check.C) *azStubHandler {
76         return &azStubHandler{
77                 blobs:  make(map[string]*azBlob),
78                 logger: ctxlog.TestLogger(c),
79         }
80 }
81
82 func (h *azStubHandler) TouchWithDate(container, hash string, t time.Time) {
83         blob, ok := h.blobs[container+"|"+hash]
84         if !ok {
85                 return
86         }
87         blob.Mtime = t
88 }
89
90 func (h *azStubHandler) BlockWriteRaw(container, hash string, data []byte) {
91         h.Lock()
92         defer h.Unlock()
93         h.blobs[container+"|"+hash] = &azBlob{
94                 Data:        data,
95                 Mtime:       time.Now(),
96                 Metadata:    make(map[string]string),
97                 Uncommitted: make(map[string][]byte),
98         }
99 }
100
101 func (h *azStubHandler) unlockAndRace() {
102         if h.race == nil {
103                 return
104         }
105         h.Unlock()
106         // Signal caller that race is starting by reading from
107         // h.race. If we get a channel, block until that channel is
108         // ready to receive. If we get nil (or h.race is closed) just
109         // proceed.
110         if c := <-h.race; c != nil {
111                 c <- struct{}{}
112         }
113         h.Lock()
114 }
115
116 var rangeRegexp = regexp.MustCompile(`^bytes=(\d+)-(\d+)$`)
117
118 func (h *azStubHandler) ServeHTTP(rw http.ResponseWriter, r *http.Request) {
119         h.Lock()
120         defer h.Unlock()
121         if azureTestDebug {
122                 defer h.logger.Printf("azStubHandler: %+v", r)
123         }
124
125         path := strings.Split(r.URL.Path, "/")
126         container := path[1]
127         hash := ""
128         if len(path) > 2 {
129                 hash = path[2]
130         }
131
132         if err := r.ParseForm(); err != nil {
133                 h.logger.Printf("azStubHandler(%+v): %s", r, err)
134                 rw.WriteHeader(http.StatusBadRequest)
135                 return
136         }
137
138         if (r.Method == "PUT" || r.Method == "POST") && r.Header.Get("Content-Length") == "" {
139                 rw.WriteHeader(http.StatusLengthRequired)
140                 return
141         }
142
143         body, err := ioutil.ReadAll(r.Body)
144         if err != nil {
145                 return
146         }
147
148         type blockListRequestBody struct {
149                 XMLName     xml.Name `xml:"BlockList"`
150                 Uncommitted []string
151         }
152
153         blob, blobExists := h.blobs[container+"|"+hash]
154
155         switch {
156         case r.Method == "PUT" && r.Form.Get("comp") == "":
157                 // "Put Blob" API
158                 if _, ok := h.blobs[container+"|"+hash]; !ok {
159                         // Like the real Azure service, we offer a
160                         // race window during which other clients can
161                         // list/get the new blob before any data is
162                         // committed.
163                         h.blobs[container+"|"+hash] = &azBlob{
164                                 Mtime:       time.Now(),
165                                 Uncommitted: make(map[string][]byte),
166                                 Metadata:    make(map[string]string),
167                                 Etag:        makeEtag(),
168                         }
169                         h.unlockAndRace()
170                 }
171                 metadata := make(map[string]string)
172                 for k, v := range r.Header {
173                         if strings.HasPrefix(strings.ToLower(k), "x-ms-meta-") {
174                                 name := k[len("x-ms-meta-"):]
175                                 metadata[strings.ToLower(name)] = v[0]
176                         }
177                 }
178                 h.blobs[container+"|"+hash] = &azBlob{
179                         Data:        body,
180                         Mtime:       time.Now(),
181                         Uncommitted: make(map[string][]byte),
182                         Metadata:    metadata,
183                         Etag:        makeEtag(),
184                 }
185                 rw.WriteHeader(http.StatusCreated)
186         case r.Method == "PUT" && r.Form.Get("comp") == "block":
187                 // "Put Block" API
188                 if !blobExists {
189                         h.logger.Printf("Got block for nonexistent blob: %+v", r)
190                         rw.WriteHeader(http.StatusBadRequest)
191                         return
192                 }
193                 blockID, err := base64.StdEncoding.DecodeString(r.Form.Get("blockid"))
194                 if err != nil || len(blockID) == 0 {
195                         h.logger.Printf("Invalid blockid: %+q", r.Form.Get("blockid"))
196                         rw.WriteHeader(http.StatusBadRequest)
197                         return
198                 }
199                 blob.Uncommitted[string(blockID)] = body
200                 rw.WriteHeader(http.StatusCreated)
201         case r.Method == "PUT" && r.Form.Get("comp") == "blocklist":
202                 // "Put Block List" API
203                 bl := &blockListRequestBody{}
204                 if err := xml.Unmarshal(body, bl); err != nil {
205                         h.logger.Printf("xml Unmarshal: %s", err)
206                         rw.WriteHeader(http.StatusBadRequest)
207                         return
208                 }
209                 for _, encBlockID := range bl.Uncommitted {
210                         blockID, err := base64.StdEncoding.DecodeString(encBlockID)
211                         if err != nil || len(blockID) == 0 || blob.Uncommitted[string(blockID)] == nil {
212                                 h.logger.Printf("Invalid blockid: %+q", encBlockID)
213                                 rw.WriteHeader(http.StatusBadRequest)
214                                 return
215                         }
216                         blob.Data = blob.Uncommitted[string(blockID)]
217                         blob.Etag = makeEtag()
218                         blob.Mtime = time.Now()
219                         delete(blob.Uncommitted, string(blockID))
220                 }
221                 rw.WriteHeader(http.StatusCreated)
222         case r.Method == "PUT" && r.Form.Get("comp") == "metadata":
223                 // "Set Metadata Headers" API. We don't bother
224                 // stubbing "Get Metadata Headers": azureBlobVolume
225                 // sets metadata headers only as a way to bump Etag
226                 // and Last-Modified.
227                 if !blobExists {
228                         h.logger.Printf("Got metadata for nonexistent blob: %+v", r)
229                         rw.WriteHeader(http.StatusBadRequest)
230                         return
231                 }
232                 blob.Metadata = make(map[string]string)
233                 for k, v := range r.Header {
234                         if strings.HasPrefix(strings.ToLower(k), "x-ms-meta-") {
235                                 name := k[len("x-ms-meta-"):]
236                                 blob.Metadata[strings.ToLower(name)] = v[0]
237                         }
238                 }
239                 blob.Mtime = time.Now()
240                 blob.Etag = makeEtag()
241         case (r.Method == "GET" || r.Method == "HEAD") && r.Form.Get("comp") == "metadata" && hash != "":
242                 // "Get Blob Metadata" API
243                 if !blobExists {
244                         rw.WriteHeader(http.StatusNotFound)
245                         return
246                 }
247                 for k, v := range blob.Metadata {
248                         rw.Header().Set(fmt.Sprintf("x-ms-meta-%s", k), v)
249                 }
250                 return
251         case (r.Method == "GET" || r.Method == "HEAD") && hash != "":
252                 // "Get Blob" API
253                 if !blobExists {
254                         rw.WriteHeader(http.StatusNotFound)
255                         return
256                 }
257                 data := blob.Data
258                 if rangeSpec := rangeRegexp.FindStringSubmatch(r.Header.Get("Range")); rangeSpec != nil {
259                         b0, err0 := strconv.Atoi(rangeSpec[1])
260                         b1, err1 := strconv.Atoi(rangeSpec[2])
261                         if err0 != nil || err1 != nil || b0 >= len(data) || b1 >= len(data) || b0 > b1 {
262                                 rw.Header().Set("Content-Range", fmt.Sprintf("bytes */%d", len(data)))
263                                 rw.WriteHeader(http.StatusRequestedRangeNotSatisfiable)
264                                 return
265                         }
266                         rw.Header().Set("Content-Range", fmt.Sprintf("bytes %d-%d/%d", b0, b1, len(data)))
267                         rw.WriteHeader(http.StatusPartialContent)
268                         data = data[b0 : b1+1]
269                 }
270                 rw.Header().Set("Last-Modified", blob.Mtime.Format(time.RFC1123))
271                 rw.Header().Set("Content-Length", strconv.Itoa(len(data)))
272                 if r.Method == "GET" {
273                         if _, err := rw.Write(data); err != nil {
274                                 h.logger.Printf("write %+q: %s", data, err)
275                         }
276                 }
277                 h.unlockAndRace()
278         case r.Method == "DELETE" && hash != "":
279                 // "Delete Blob" API
280                 if !blobExists {
281                         rw.WriteHeader(http.StatusNotFound)
282                         return
283                 }
284                 delete(h.blobs, container+"|"+hash)
285                 rw.WriteHeader(http.StatusAccepted)
286         case r.Method == "GET" && r.Form.Get("comp") == "list" && r.Form.Get("restype") == "container":
287                 // "List Blobs" API
288                 if !h.didlist503 {
289                         h.didlist503 = true
290                         rw.WriteHeader(http.StatusServiceUnavailable)
291                         return
292                 }
293                 prefix := container + "|" + r.Form.Get("prefix")
294                 marker := r.Form.Get("marker")
295
296                 maxResults := 2
297                 if n, err := strconv.Atoi(r.Form.Get("maxresults")); err == nil && n >= 1 && n <= 5000 {
298                         maxResults = n
299                 }
300
301                 resp := storage.BlobListResponse{
302                         Marker:     marker,
303                         NextMarker: "",
304                         MaxResults: int64(maxResults),
305                 }
306                 var hashes sort.StringSlice
307                 for k := range h.blobs {
308                         if strings.HasPrefix(k, prefix) {
309                                 hashes = append(hashes, k[len(container)+1:])
310                         }
311                 }
312                 hashes.Sort()
313                 for _, hash := range hashes {
314                         if len(resp.Blobs) == maxResults {
315                                 resp.NextMarker = hash
316                                 break
317                         }
318                         if len(resp.Blobs) > 0 || marker == "" || marker == hash {
319                                 blob := h.blobs[container+"|"+hash]
320                                 bmeta := map[string]string(nil)
321                                 if r.Form.Get("include") == "metadata" {
322                                         bmeta = blob.Metadata
323                                 }
324                                 b := storage.Blob{
325                                         Name: hash,
326                                         Properties: storage.BlobProperties{
327                                                 LastModified:  storage.TimeRFC1123(blob.Mtime),
328                                                 ContentLength: int64(len(blob.Data)),
329                                                 Etag:          blob.Etag,
330                                         },
331                                         Metadata: bmeta,
332                                 }
333                                 resp.Blobs = append(resp.Blobs, b)
334                         }
335                 }
336                 buf, err := xml.Marshal(resp)
337                 if err != nil {
338                         h.logger.Error(err)
339                         rw.WriteHeader(http.StatusInternalServerError)
340                 }
341                 rw.Write(buf)
342         default:
343                 h.logger.Printf("azStubHandler: not implemented: %+v Body:%+q", r, body)
344                 rw.WriteHeader(http.StatusNotImplemented)
345         }
346 }
347
348 // azStubDialer is a net.Dialer that notices when the Azure driver
349 // tries to connect to "devstoreaccount1.blob.127.0.0.1:46067", and
350 // in such cases transparently dials "127.0.0.1:46067" instead.
351 type azStubDialer struct {
352         logger logrus.FieldLogger
353         net.Dialer
354 }
355
356 var localHostPortRe = regexp.MustCompile(`(127\.0\.0\.1|localhost|\[::1\]):\d+`)
357
358 func (d *azStubDialer) Dial(network, address string) (net.Conn, error) {
359         if hp := localHostPortRe.FindString(address); hp != "" {
360                 if azureTestDebug {
361                         d.logger.Debug("azStubDialer: dial", hp, "instead of", address)
362                 }
363                 address = hp
364         }
365         return d.Dialer.Dial(network, address)
366 }
367
368 type testableAzureBlobVolume struct {
369         *azureBlobVolume
370         azHandler *azStubHandler
371         azStub    *httptest.Server
372         t         TB
373 }
374
375 func (s *stubbedAzureBlobSuite) newTestableAzureBlobVolume(t TB, params newVolumeParams) *testableAzureBlobVolume {
376         azHandler := newAzStubHandler(t.(*check.C))
377         azStub := httptest.NewServer(azHandler)
378
379         var azClient storage.Client
380         var err error
381
382         container := azureTestContainer
383         if container == "" {
384                 // Connect to stub instead of real Azure storage service
385                 stubURLBase := strings.Split(azStub.URL, "://")[1]
386                 if azClient, err = storage.NewClient(fakeAccountName, fakeAccountKey, stubURLBase, storage.DefaultAPIVersion, false); err != nil {
387                         t.Fatal(err)
388                 }
389                 container = "fakecontainername"
390         } else {
391                 // Connect to real Azure storage service
392                 if azClient, err = storage.NewBasicClient(os.Getenv("ARVADOS_TEST_AZURE_ACCOUNT_NAME"), os.Getenv("ARVADOS_TEST_AZURE_ACCOUNT_KEY")); err != nil {
393                         t.Fatal(err)
394                 }
395         }
396         azClient.Sender = &singleSender{}
397
398         bs := azClient.GetBlobService()
399         v := &azureBlobVolume{
400                 ContainerName:        container,
401                 WriteRaceInterval:    arvados.Duration(time.Millisecond),
402                 WriteRacePollTime:    arvados.Duration(time.Nanosecond),
403                 ListBlobsMaxAttempts: 2,
404                 ListBlobsRetryDelay:  arvados.Duration(time.Millisecond),
405                 azClient:             azClient,
406                 container:            &azureContainer{ctr: bs.GetContainerReference(container)},
407                 cluster:              params.Cluster,
408                 volume:               params.ConfigVolume,
409                 logger:               ctxlog.TestLogger(t),
410                 metrics:              params.MetricsVecs,
411                 bufferPool:           params.BufferPool,
412         }
413         if err = v.check(); err != nil {
414                 t.Fatal(err)
415         }
416
417         return &testableAzureBlobVolume{
418                 azureBlobVolume: v,
419                 azHandler:       azHandler,
420                 azStub:          azStub,
421                 t:               t,
422         }
423 }
424
425 var _ = check.Suite(&stubbedAzureBlobSuite{})
426
427 type stubbedAzureBlobSuite struct {
428         origHTTPTransport http.RoundTripper
429 }
430
431 func (s *stubbedAzureBlobSuite) SetUpSuite(c *check.C) {
432         s.origHTTPTransport = http.DefaultTransport
433         http.DefaultTransport = &http.Transport{
434                 Dial: (&azStubDialer{logger: ctxlog.TestLogger(c)}).Dial,
435         }
436 }
437
438 func (s *stubbedAzureBlobSuite) TearDownSuite(c *check.C) {
439         http.DefaultTransport = s.origHTTPTransport
440 }
441
442 func (s *stubbedAzureBlobSuite) TestAzureBlobVolumeWithGeneric(c *check.C) {
443         DoGenericVolumeTests(c, false, func(t TB, params newVolumeParams) TestableVolume {
444                 return s.newTestableAzureBlobVolume(t, params)
445         })
446 }
447
448 func (s *stubbedAzureBlobSuite) TestAzureBlobVolumeConcurrentRanges(c *check.C) {
449         // Test (BlockSize mod azureMaxGetBytes)==0 and !=0 cases
450         for _, b := range []int{2<<22 - 1, 2<<22 - 1} {
451                 c.Logf("=== MaxGetBytes=%d", b)
452                 DoGenericVolumeTests(c, false, func(t TB, params newVolumeParams) TestableVolume {
453                         v := s.newTestableAzureBlobVolume(t, params)
454                         v.MaxGetBytes = b
455                         return v
456                 })
457         }
458 }
459
460 func (s *stubbedAzureBlobSuite) TestReadonlyAzureBlobVolumeWithGeneric(c *check.C) {
461         DoGenericVolumeTests(c, false, func(c TB, params newVolumeParams) TestableVolume {
462                 return s.newTestableAzureBlobVolume(c, params)
463         })
464 }
465
466 func (s *stubbedAzureBlobSuite) TestAzureBlobVolumeRangeFenceposts(c *check.C) {
467         v := s.newTestableAzureBlobVolume(c, newVolumeParams{
468                 Cluster:      testCluster(c),
469                 ConfigVolume: arvados.Volume{Replication: 3},
470                 MetricsVecs:  newVolumeMetricsVecs(prometheus.NewRegistry()),
471                 BufferPool:   newBufferPool(ctxlog.TestLogger(c), 8, prometheus.NewRegistry()),
472         })
473         defer v.Teardown()
474
475         for _, size := range []int{
476                 2<<22 - 1, // one <max read
477                 2 << 22,   // one =max read
478                 2<<22 + 1, // one =max read, one <max
479                 2 << 23,   // two =max reads
480                 BlockSize - 1,
481                 BlockSize,
482         } {
483                 data := make([]byte, size)
484                 for i := range data {
485                         data[i] = byte((i + 7) & 0xff)
486                 }
487                 hash := fmt.Sprintf("%x", md5.Sum(data))
488                 err := v.BlockWrite(context.Background(), hash, data)
489                 if err != nil {
490                         c.Error(err)
491                 }
492                 gotData := &brbuffer{}
493                 err = v.BlockRead(context.Background(), hash, gotData)
494                 if err != nil {
495                         c.Error(err)
496                 }
497                 gotHash := fmt.Sprintf("%x", md5.Sum(gotData.Bytes()))
498                 c.Check(gotData.Len(), check.Equals, size)
499                 if gotHash != hash {
500                         c.Errorf("hash mismatch: got %s != %s", gotHash, hash)
501                 }
502         }
503 }
504
505 func (s *stubbedAzureBlobSuite) TestAzureBlobVolumeCreateBlobRace(c *check.C) {
506         v := s.newTestableAzureBlobVolume(c, newVolumeParams{
507                 Cluster:      testCluster(c),
508                 ConfigVolume: arvados.Volume{Replication: 3},
509                 MetricsVecs:  newVolumeMetricsVecs(prometheus.NewRegistry()),
510                 BufferPool:   newBufferPool(ctxlog.TestLogger(c), 8, prometheus.NewRegistry()),
511         })
512         defer v.Teardown()
513
514         var wg sync.WaitGroup
515
516         v.azHandler.race = make(chan chan struct{})
517
518         wg.Add(1)
519         go func() {
520                 defer wg.Done()
521                 err := v.BlockWrite(context.Background(), TestHash, TestBlock)
522                 if err != nil {
523                         c.Error(err)
524                 }
525         }()
526         continueBlockWrite := make(chan struct{})
527         // Wait for the stub's BlockWrite to create the empty blob
528         v.azHandler.race <- continueBlockWrite
529         wg.Add(1)
530         go func() {
531                 defer wg.Done()
532                 err := v.BlockRead(context.Background(), TestHash, brdiscard)
533                 if err != nil {
534                         c.Error(err)
535                 }
536         }()
537         // Wait for the stub's BlockRead to get the empty blob
538         close(v.azHandler.race)
539         // Allow stub's BlockWrite to continue, so the real data is ready
540         // when the volume's BlockRead retries
541         <-continueBlockWrite
542         // Wait for BlockRead() and BlockWrite() to finish
543         wg.Wait()
544 }
545
546 func (s *stubbedAzureBlobSuite) TestAzureBlobVolumeCreateBlobRaceDeadline(c *check.C) {
547         v := s.newTestableAzureBlobVolume(c, newVolumeParams{
548                 Cluster:      testCluster(c),
549                 ConfigVolume: arvados.Volume{Replication: 3},
550                 MetricsVecs:  newVolumeMetricsVecs(prometheus.NewRegistry()),
551                 BufferPool:   newBufferPool(ctxlog.TestLogger(c), 8, prometheus.NewRegistry()),
552         })
553         v.azureBlobVolume.WriteRaceInterval.Set("2s")
554         v.azureBlobVolume.WriteRacePollTime.Set("5ms")
555         defer v.Teardown()
556
557         v.BlockWriteRaw(TestHash, nil)
558
559         buf := new(bytes.Buffer)
560         v.Index(context.Background(), "", buf)
561         if buf.Len() != 0 {
562                 c.Errorf("Index %+q should be empty", buf.Bytes())
563         }
564
565         v.TouchWithDate(TestHash, time.Now().Add(-1982*time.Millisecond))
566
567         allDone := make(chan struct{})
568         go func() {
569                 defer close(allDone)
570                 buf := &brbuffer{}
571                 err := v.BlockRead(context.Background(), TestHash, buf)
572                 if err != nil {
573                         c.Error(err)
574                         return
575                 }
576                 c.Check(buf.String(), check.Equals, "")
577         }()
578         select {
579         case <-allDone:
580         case <-time.After(time.Second):
581                 c.Error("BlockRead should have stopped waiting for race when block was 2s old")
582         }
583
584         buf.Reset()
585         v.Index(context.Background(), "", buf)
586         if !bytes.HasPrefix(buf.Bytes(), []byte(TestHash+"+0")) {
587                 c.Errorf("Index %+q should have %+q", buf.Bytes(), TestHash+"+0")
588         }
589 }
590
591 func (s *stubbedAzureBlobSuite) TestAzureBlobVolumeContextCancelBlockRead(c *check.C) {
592         s.testAzureBlobVolumeContextCancel(c, func(ctx context.Context, v *testableAzureBlobVolume) error {
593                 v.BlockWriteRaw(TestHash, TestBlock)
594                 return v.BlockRead(ctx, TestHash, brdiscard)
595         })
596 }
597
598 func (s *stubbedAzureBlobSuite) TestAzureBlobVolumeContextCancelBlockWrite(c *check.C) {
599         s.testAzureBlobVolumeContextCancel(c, func(ctx context.Context, v *testableAzureBlobVolume) error {
600                 return v.BlockWrite(ctx, TestHash, make([]byte, BlockSize))
601         })
602 }
603
604 func (s *stubbedAzureBlobSuite) testAzureBlobVolumeContextCancel(c *check.C, testFunc func(context.Context, *testableAzureBlobVolume) error) {
605         v := s.newTestableAzureBlobVolume(c, newVolumeParams{
606                 Cluster:      testCluster(c),
607                 ConfigVolume: arvados.Volume{Replication: 3},
608                 MetricsVecs:  newVolumeMetricsVecs(prometheus.NewRegistry()),
609                 BufferPool:   newBufferPool(ctxlog.TestLogger(c), 8, prometheus.NewRegistry()),
610         })
611         defer v.Teardown()
612         v.azHandler.race = make(chan chan struct{})
613
614         ctx, cancel := context.WithCancel(context.Background())
615         allDone := make(chan struct{})
616         go func() {
617                 defer close(allDone)
618                 err := testFunc(ctx, v)
619                 if err != context.Canceled {
620                         c.Errorf("got %T %q, expected %q", err, err, context.Canceled)
621                 }
622         }()
623         releaseHandler := make(chan struct{})
624         select {
625         case <-allDone:
626                 c.Error("testFunc finished without waiting for v.azHandler.race")
627         case <-time.After(10 * time.Second):
628                 c.Error("timed out waiting to enter handler")
629         case v.azHandler.race <- releaseHandler:
630         }
631
632         cancel()
633
634         select {
635         case <-time.After(10 * time.Second):
636                 c.Error("timed out waiting to cancel")
637         case <-allDone:
638         }
639
640         go func() {
641                 <-releaseHandler
642         }()
643 }
644
645 func (s *stubbedAzureBlobSuite) TestStats(c *check.C) {
646         volume := s.newTestableAzureBlobVolume(c, newVolumeParams{
647                 Cluster:      testCluster(c),
648                 ConfigVolume: arvados.Volume{Replication: 3},
649                 MetricsVecs:  newVolumeMetricsVecs(prometheus.NewRegistry()),
650                 BufferPool:   newBufferPool(ctxlog.TestLogger(c), 8, prometheus.NewRegistry()),
651         })
652         defer volume.Teardown()
653
654         stats := func() string {
655                 buf, err := json.Marshal(volume.InternalStats())
656                 c.Check(err, check.IsNil)
657                 return string(buf)
658         }
659
660         c.Check(stats(), check.Matches, `.*"Ops":0,.*`)
661         c.Check(stats(), check.Matches, `.*"Errors":0,.*`)
662
663         loc := "acbd18db4cc2f85cedef654fccc4a4d8"
664         err := volume.BlockRead(context.Background(), loc, brdiscard)
665         c.Check(err, check.NotNil)
666         c.Check(stats(), check.Matches, `.*"Ops":[^0],.*`)
667         c.Check(stats(), check.Matches, `.*"Errors":[^0],.*`)
668         c.Check(stats(), check.Matches, `.*"storage\.AzureStorageServiceError 404 \(404 Not Found\)":[^0].*`)
669         c.Check(stats(), check.Matches, `.*"InBytes":0,.*`)
670
671         err = volume.BlockWrite(context.Background(), loc, []byte("foo"))
672         c.Check(err, check.IsNil)
673         c.Check(stats(), check.Matches, `.*"OutBytes":3,.*`)
674         c.Check(stats(), check.Matches, `.*"CreateOps":1,.*`)
675
676         err = volume.BlockRead(context.Background(), loc, brdiscard)
677         c.Check(err, check.IsNil)
678         err = volume.BlockRead(context.Background(), loc, brdiscard)
679         c.Check(err, check.IsNil)
680         c.Check(stats(), check.Matches, `.*"InBytes":6,.*`)
681 }
682
683 func (v *testableAzureBlobVolume) BlockWriteRaw(locator string, data []byte) {
684         v.azHandler.BlockWriteRaw(v.ContainerName, locator, data)
685 }
686
687 func (v *testableAzureBlobVolume) TouchWithDate(locator string, lastBlockWrite time.Time) {
688         v.azHandler.TouchWithDate(v.ContainerName, locator, lastBlockWrite)
689 }
690
691 func (v *testableAzureBlobVolume) Teardown() {
692         v.azStub.Close()
693 }
694
695 func (v *testableAzureBlobVolume) ReadWriteOperationLabelValues() (r, w string) {
696         return "get", "create"
697 }
698
699 func makeEtag() string {
700         return fmt.Sprintf("0x%x", rand.Int63())
701 }