2960: Refactor keepstore into a streaming server.
[arvados.git] / services / keepstore / azure_blob_volume_test.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package keepstore
6
7 import (
8         "bytes"
9         "context"
10         "crypto/md5"
11         "encoding/base64"
12         "encoding/json"
13         "encoding/xml"
14         "flag"
15         "fmt"
16         "io"
17         "io/ioutil"
18         "math/rand"
19         "net"
20         "net/http"
21         "net/http/httptest"
22         "os"
23         "regexp"
24         "sort"
25         "strconv"
26         "strings"
27         "sync"
28         "time"
29
30         "git.arvados.org/arvados.git/sdk/go/arvados"
31         "git.arvados.org/arvados.git/sdk/go/ctxlog"
32         "github.com/Azure/azure-sdk-for-go/storage"
33         "github.com/prometheus/client_golang/prometheus"
34         "github.com/sirupsen/logrus"
35         check "gopkg.in/check.v1"
36 )
37
38 const (
39         // This cannot be the fake account name "devstoreaccount1"
40         // used by Microsoft's Azure emulator: the Azure SDK
41         // recognizes that magic string and changes its behavior to
42         // cater to the Azure SDK's own test suite.
43         fakeAccountName = "fakeaccountname"
44         fakeAccountKey  = "Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw=="
45 )
46
47 var (
48         azureTestContainer string
49         azureTestDebug     = os.Getenv("ARVADOS_DEBUG") != ""
50 )
51
52 func init() {
53         flag.StringVar(
54                 &azureTestContainer,
55                 "test.azure-storage-container-volume",
56                 "",
57                 "Name of Azure container to use for testing. Do not use a container with real data! Use -azure-storage-account-name and -azure-storage-key-file arguments to supply credentials.")
58 }
59
60 type azBlob struct {
61         Data        []byte
62         Etag        string
63         Metadata    map[string]string
64         Mtime       time.Time
65         Uncommitted map[string][]byte
66 }
67
68 type azStubHandler struct {
69         sync.Mutex
70         logger     logrus.FieldLogger
71         blobs      map[string]*azBlob
72         race       chan chan struct{}
73         didlist503 bool
74 }
75
76 func newAzStubHandler(c *check.C) *azStubHandler {
77         return &azStubHandler{
78                 blobs:  make(map[string]*azBlob),
79                 logger: ctxlog.TestLogger(c),
80         }
81 }
82
83 func (h *azStubHandler) TouchWithDate(container, hash string, t time.Time) {
84         blob, ok := h.blobs[container+"|"+hash]
85         if !ok {
86                 return
87         }
88         blob.Mtime = t
89 }
90
91 func (h *azStubHandler) BlockWriteRaw(container, hash string, data []byte) {
92         h.Lock()
93         defer h.Unlock()
94         h.blobs[container+"|"+hash] = &azBlob{
95                 Data:        data,
96                 Mtime:       time.Now(),
97                 Metadata:    make(map[string]string),
98                 Uncommitted: make(map[string][]byte),
99         }
100 }
101
102 func (h *azStubHandler) unlockAndRace() {
103         if h.race == nil {
104                 return
105         }
106         h.Unlock()
107         // Signal caller that race is starting by reading from
108         // h.race. If we get a channel, block until that channel is
109         // ready to receive. If we get nil (or h.race is closed) just
110         // proceed.
111         if c := <-h.race; c != nil {
112                 c <- struct{}{}
113         }
114         h.Lock()
115 }
116
117 var rangeRegexp = regexp.MustCompile(`^bytes=(\d+)-(\d+)$`)
118
119 func (h *azStubHandler) ServeHTTP(rw http.ResponseWriter, r *http.Request) {
120         h.Lock()
121         defer h.Unlock()
122         if azureTestDebug {
123                 defer h.logger.Printf("azStubHandler: %+v", r)
124         }
125
126         path := strings.Split(r.URL.Path, "/")
127         container := path[1]
128         hash := ""
129         if len(path) > 2 {
130                 hash = path[2]
131         }
132
133         if err := r.ParseForm(); err != nil {
134                 h.logger.Printf("azStubHandler(%+v): %s", r, err)
135                 rw.WriteHeader(http.StatusBadRequest)
136                 return
137         }
138
139         if (r.Method == "PUT" || r.Method == "POST") && r.Header.Get("Content-Length") == "" {
140                 rw.WriteHeader(http.StatusLengthRequired)
141                 return
142         }
143
144         body, err := ioutil.ReadAll(r.Body)
145         if err != nil {
146                 return
147         }
148
149         type blockListRequestBody struct {
150                 XMLName     xml.Name `xml:"BlockList"`
151                 Uncommitted []string
152         }
153
154         blob, blobExists := h.blobs[container+"|"+hash]
155
156         switch {
157         case r.Method == "PUT" && r.Form.Get("comp") == "":
158                 // "Put Blob" API
159                 if _, ok := h.blobs[container+"|"+hash]; !ok {
160                         // Like the real Azure service, we offer a
161                         // race window during which other clients can
162                         // list/get the new blob before any data is
163                         // committed.
164                         h.blobs[container+"|"+hash] = &azBlob{
165                                 Mtime:       time.Now(),
166                                 Uncommitted: make(map[string][]byte),
167                                 Metadata:    make(map[string]string),
168                                 Etag:        makeEtag(),
169                         }
170                         h.unlockAndRace()
171                 }
172                 metadata := make(map[string]string)
173                 for k, v := range r.Header {
174                         if strings.HasPrefix(strings.ToLower(k), "x-ms-meta-") {
175                                 name := k[len("x-ms-meta-"):]
176                                 metadata[strings.ToLower(name)] = v[0]
177                         }
178                 }
179                 h.blobs[container+"|"+hash] = &azBlob{
180                         Data:        body,
181                         Mtime:       time.Now(),
182                         Uncommitted: make(map[string][]byte),
183                         Metadata:    metadata,
184                         Etag:        makeEtag(),
185                 }
186                 rw.WriteHeader(http.StatusCreated)
187         case r.Method == "PUT" && r.Form.Get("comp") == "block":
188                 // "Put Block" API
189                 if !blobExists {
190                         h.logger.Printf("Got block for nonexistent blob: %+v", r)
191                         rw.WriteHeader(http.StatusBadRequest)
192                         return
193                 }
194                 blockID, err := base64.StdEncoding.DecodeString(r.Form.Get("blockid"))
195                 if err != nil || len(blockID) == 0 {
196                         h.logger.Printf("Invalid blockid: %+q", r.Form.Get("blockid"))
197                         rw.WriteHeader(http.StatusBadRequest)
198                         return
199                 }
200                 blob.Uncommitted[string(blockID)] = body
201                 rw.WriteHeader(http.StatusCreated)
202         case r.Method == "PUT" && r.Form.Get("comp") == "blocklist":
203                 // "Put Block List" API
204                 bl := &blockListRequestBody{}
205                 if err := xml.Unmarshal(body, bl); err != nil {
206                         h.logger.Printf("xml Unmarshal: %s", err)
207                         rw.WriteHeader(http.StatusBadRequest)
208                         return
209                 }
210                 for _, encBlockID := range bl.Uncommitted {
211                         blockID, err := base64.StdEncoding.DecodeString(encBlockID)
212                         if err != nil || len(blockID) == 0 || blob.Uncommitted[string(blockID)] == nil {
213                                 h.logger.Printf("Invalid blockid: %+q", encBlockID)
214                                 rw.WriteHeader(http.StatusBadRequest)
215                                 return
216                         }
217                         blob.Data = blob.Uncommitted[string(blockID)]
218                         blob.Etag = makeEtag()
219                         blob.Mtime = time.Now()
220                         delete(blob.Uncommitted, string(blockID))
221                 }
222                 rw.WriteHeader(http.StatusCreated)
223         case r.Method == "PUT" && r.Form.Get("comp") == "metadata":
224                 // "Set Metadata Headers" API. We don't bother
225                 // stubbing "Get Metadata Headers": AzureBlobVolume
226                 // sets metadata headers only as a way to bump Etag
227                 // and Last-Modified.
228                 if !blobExists {
229                         h.logger.Printf("Got metadata for nonexistent blob: %+v", r)
230                         rw.WriteHeader(http.StatusBadRequest)
231                         return
232                 }
233                 blob.Metadata = make(map[string]string)
234                 for k, v := range r.Header {
235                         if strings.HasPrefix(strings.ToLower(k), "x-ms-meta-") {
236                                 name := k[len("x-ms-meta-"):]
237                                 blob.Metadata[strings.ToLower(name)] = v[0]
238                         }
239                 }
240                 blob.Mtime = time.Now()
241                 blob.Etag = makeEtag()
242         case (r.Method == "GET" || r.Method == "HEAD") && r.Form.Get("comp") == "metadata" && hash != "":
243                 // "Get Blob Metadata" API
244                 if !blobExists {
245                         rw.WriteHeader(http.StatusNotFound)
246                         return
247                 }
248                 for k, v := range blob.Metadata {
249                         rw.Header().Set(fmt.Sprintf("x-ms-meta-%s", k), v)
250                 }
251                 return
252         case (r.Method == "GET" || r.Method == "HEAD") && hash != "":
253                 // "Get Blob" API
254                 if !blobExists {
255                         rw.WriteHeader(http.StatusNotFound)
256                         return
257                 }
258                 data := blob.Data
259                 if rangeSpec := rangeRegexp.FindStringSubmatch(r.Header.Get("Range")); rangeSpec != nil {
260                         b0, err0 := strconv.Atoi(rangeSpec[1])
261                         b1, err1 := strconv.Atoi(rangeSpec[2])
262                         if err0 != nil || err1 != nil || b0 >= len(data) || b1 >= len(data) || b0 > b1 {
263                                 rw.Header().Set("Content-Range", fmt.Sprintf("bytes */%d", len(data)))
264                                 rw.WriteHeader(http.StatusRequestedRangeNotSatisfiable)
265                                 return
266                         }
267                         rw.Header().Set("Content-Range", fmt.Sprintf("bytes %d-%d/%d", b0, b1, len(data)))
268                         rw.WriteHeader(http.StatusPartialContent)
269                         data = data[b0 : b1+1]
270                 }
271                 rw.Header().Set("Last-Modified", blob.Mtime.Format(time.RFC1123))
272                 rw.Header().Set("Content-Length", strconv.Itoa(len(data)))
273                 if r.Method == "GET" {
274                         if _, err := rw.Write(data); err != nil {
275                                 h.logger.Printf("write %+q: %s", data, err)
276                         }
277                 }
278                 h.unlockAndRace()
279         case r.Method == "DELETE" && hash != "":
280                 // "Delete Blob" API
281                 if !blobExists {
282                         rw.WriteHeader(http.StatusNotFound)
283                         return
284                 }
285                 delete(h.blobs, container+"|"+hash)
286                 rw.WriteHeader(http.StatusAccepted)
287         case r.Method == "GET" && r.Form.Get("comp") == "list" && r.Form.Get("restype") == "container":
288                 // "List Blobs" API
289                 if !h.didlist503 {
290                         h.didlist503 = true
291                         rw.WriteHeader(http.StatusServiceUnavailable)
292                         return
293                 }
294                 prefix := container + "|" + r.Form.Get("prefix")
295                 marker := r.Form.Get("marker")
296
297                 maxResults := 2
298                 if n, err := strconv.Atoi(r.Form.Get("maxresults")); err == nil && n >= 1 && n <= 5000 {
299                         maxResults = n
300                 }
301
302                 resp := storage.BlobListResponse{
303                         Marker:     marker,
304                         NextMarker: "",
305                         MaxResults: int64(maxResults),
306                 }
307                 var hashes sort.StringSlice
308                 for k := range h.blobs {
309                         if strings.HasPrefix(k, prefix) {
310                                 hashes = append(hashes, k[len(container)+1:])
311                         }
312                 }
313                 hashes.Sort()
314                 for _, hash := range hashes {
315                         if len(resp.Blobs) == maxResults {
316                                 resp.NextMarker = hash
317                                 break
318                         }
319                         if len(resp.Blobs) > 0 || marker == "" || marker == hash {
320                                 blob := h.blobs[container+"|"+hash]
321                                 bmeta := map[string]string(nil)
322                                 if r.Form.Get("include") == "metadata" {
323                                         bmeta = blob.Metadata
324                                 }
325                                 b := storage.Blob{
326                                         Name: hash,
327                                         Properties: storage.BlobProperties{
328                                                 LastModified:  storage.TimeRFC1123(blob.Mtime),
329                                                 ContentLength: int64(len(blob.Data)),
330                                                 Etag:          blob.Etag,
331                                         },
332                                         Metadata: bmeta,
333                                 }
334                                 resp.Blobs = append(resp.Blobs, b)
335                         }
336                 }
337                 buf, err := xml.Marshal(resp)
338                 if err != nil {
339                         h.logger.Error(err)
340                         rw.WriteHeader(http.StatusInternalServerError)
341                 }
342                 rw.Write(buf)
343         default:
344                 h.logger.Printf("azStubHandler: not implemented: %+v Body:%+q", r, body)
345                 rw.WriteHeader(http.StatusNotImplemented)
346         }
347 }
348
349 // azStubDialer is a net.Dialer that notices when the Azure driver
350 // tries to connect to "devstoreaccount1.blob.127.0.0.1:46067", and
351 // in such cases transparently dials "127.0.0.1:46067" instead.
352 type azStubDialer struct {
353         logger logrus.FieldLogger
354         net.Dialer
355 }
356
357 var localHostPortRe = regexp.MustCompile(`(127\.0\.0\.1|localhost|\[::1\]):\d+`)
358
359 func (d *azStubDialer) Dial(network, address string) (net.Conn, error) {
360         if hp := localHostPortRe.FindString(address); hp != "" {
361                 if azureTestDebug {
362                         d.logger.Debug("azStubDialer: dial", hp, "instead of", address)
363                 }
364                 address = hp
365         }
366         return d.Dialer.Dial(network, address)
367 }
368
369 type testableAzureBlobVolume struct {
370         *AzureBlobVolume
371         azHandler *azStubHandler
372         azStub    *httptest.Server
373         t         TB
374 }
375
376 func (s *stubbedAzureBlobSuite) newTestableAzureBlobVolume(t TB, params newVolumeParams) *testableAzureBlobVolume {
377         azHandler := newAzStubHandler(t.(*check.C))
378         azStub := httptest.NewServer(azHandler)
379
380         var azClient storage.Client
381         var err error
382
383         container := azureTestContainer
384         if container == "" {
385                 // Connect to stub instead of real Azure storage service
386                 stubURLBase := strings.Split(azStub.URL, "://")[1]
387                 if azClient, err = storage.NewClient(fakeAccountName, fakeAccountKey, stubURLBase, storage.DefaultAPIVersion, false); err != nil {
388                         t.Fatal(err)
389                 }
390                 container = "fakecontainername"
391         } else {
392                 // Connect to real Azure storage service
393                 if azClient, err = storage.NewBasicClient(os.Getenv("ARVADOS_TEST_AZURE_ACCOUNT_NAME"), os.Getenv("ARVADOS_TEST_AZURE_ACCOUNT_KEY")); err != nil {
394                         t.Fatal(err)
395                 }
396         }
397         azClient.Sender = &singleSender{}
398
399         bs := azClient.GetBlobService()
400         v := &AzureBlobVolume{
401                 ContainerName:        container,
402                 WriteRaceInterval:    arvados.Duration(time.Millisecond),
403                 WriteRacePollTime:    arvados.Duration(time.Nanosecond),
404                 ListBlobsMaxAttempts: 2,
405                 ListBlobsRetryDelay:  arvados.Duration(time.Millisecond),
406                 azClient:             azClient,
407                 container:            &azureContainer{ctr: bs.GetContainerReference(container)},
408                 cluster:              params.Cluster,
409                 volume:               params.ConfigVolume,
410                 logger:               ctxlog.TestLogger(t),
411                 metrics:              params.MetricsVecs,
412                 bufferPool:           params.BufferPool,
413         }
414         if err = v.check(); err != nil {
415                 t.Fatal(err)
416         }
417
418         return &testableAzureBlobVolume{
419                 AzureBlobVolume: v,
420                 azHandler:       azHandler,
421                 azStub:          azStub,
422                 t:               t,
423         }
424 }
425
426 var _ = check.Suite(&stubbedAzureBlobSuite{})
427
428 type stubbedAzureBlobSuite struct {
429         origHTTPTransport http.RoundTripper
430 }
431
432 func (s *stubbedAzureBlobSuite) SetUpSuite(c *check.C) {
433         s.origHTTPTransport = http.DefaultTransport
434         http.DefaultTransport = &http.Transport{
435                 Dial: (&azStubDialer{logger: ctxlog.TestLogger(c)}).Dial,
436         }
437 }
438
439 func (s *stubbedAzureBlobSuite) TearDownSuite(c *check.C) {
440         http.DefaultTransport = s.origHTTPTransport
441 }
442
443 func (s *stubbedAzureBlobSuite) TestAzureBlobVolumeWithGeneric(c *check.C) {
444         DoGenericVolumeTests(c, false, func(t TB, params newVolumeParams) TestableVolume {
445                 return s.newTestableAzureBlobVolume(t, params)
446         })
447 }
448
449 func (s *stubbedAzureBlobSuite) TestAzureBlobVolumeConcurrentRanges(c *check.C) {
450         // Test (BlockSize mod azureMaxGetBytes)==0 and !=0 cases
451         for _, b := range []int{2<<22 - 1, 2<<22 - 1} {
452                 c.Logf("=== MaxGetBytes=%d", b)
453                 DoGenericVolumeTests(c, false, func(t TB, params newVolumeParams) TestableVolume {
454                         v := s.newTestableAzureBlobVolume(t, params)
455                         v.MaxGetBytes = b
456                         return v
457                 })
458         }
459 }
460
461 func (s *stubbedAzureBlobSuite) TestReadonlyAzureBlobVolumeWithGeneric(c *check.C) {
462         DoGenericVolumeTests(c, false, func(c TB, params newVolumeParams) TestableVolume {
463                 return s.newTestableAzureBlobVolume(c, params)
464         })
465 }
466
467 func (s *stubbedAzureBlobSuite) TestAzureBlobVolumeRangeFenceposts(c *check.C) {
468         v := s.newTestableAzureBlobVolume(c, newVolumeParams{
469                 Cluster:      testCluster(c),
470                 ConfigVolume: arvados.Volume{Replication: 3},
471                 MetricsVecs:  newVolumeMetricsVecs(prometheus.NewRegistry()),
472                 BufferPool:   newBufferPool(ctxlog.TestLogger(c), 8, prometheus.NewRegistry()),
473         })
474         defer v.Teardown()
475
476         for _, size := range []int{
477                 2<<22 - 1, // one <max read
478                 2 << 22,   // one =max read
479                 2<<22 + 1, // one =max read, one <max
480                 2 << 23,   // two =max reads
481                 BlockSize - 1,
482                 BlockSize,
483         } {
484                 data := make([]byte, size)
485                 for i := range data {
486                         data[i] = byte((i + 7) & 0xff)
487                 }
488                 hash := fmt.Sprintf("%x", md5.Sum(data))
489                 err := v.BlockWrite(context.Background(), hash, data)
490                 if err != nil {
491                         c.Error(err)
492                 }
493                 gotData := bytes.NewBuffer(nil)
494                 gotLen, err := v.BlockRead(context.Background(), hash, gotData)
495                 if err != nil {
496                         c.Error(err)
497                 }
498                 gotHash := fmt.Sprintf("%x", md5.Sum(gotData.Bytes()))
499                 if gotLen != size {
500                         c.Errorf("length mismatch: got %d != %d", gotLen, size)
501                 }
502                 if gotHash != hash {
503                         c.Errorf("hash mismatch: got %s != %s", gotHash, hash)
504                 }
505         }
506 }
507
508 func (s *stubbedAzureBlobSuite) TestAzureBlobVolumeCreateBlobRace(c *check.C) {
509         v := s.newTestableAzureBlobVolume(c, newVolumeParams{
510                 Cluster:      testCluster(c),
511                 ConfigVolume: arvados.Volume{Replication: 3},
512                 MetricsVecs:  newVolumeMetricsVecs(prometheus.NewRegistry()),
513                 BufferPool:   newBufferPool(ctxlog.TestLogger(c), 8, prometheus.NewRegistry()),
514         })
515         defer v.Teardown()
516
517         var wg sync.WaitGroup
518
519         v.azHandler.race = make(chan chan struct{})
520
521         wg.Add(1)
522         go func() {
523                 defer wg.Done()
524                 err := v.BlockWrite(context.Background(), TestHash, TestBlock)
525                 if err != nil {
526                         c.Error(err)
527                 }
528         }()
529         continueBlockWrite := make(chan struct{})
530         // Wait for the stub's BlockWrite to create the empty blob
531         v.azHandler.race <- continueBlockWrite
532         wg.Add(1)
533         go func() {
534                 defer wg.Done()
535                 _, err := v.BlockRead(context.Background(), TestHash, io.Discard)
536                 if err != nil {
537                         c.Error(err)
538                 }
539         }()
540         // Wait for the stub's BlockRead to get the empty blob
541         close(v.azHandler.race)
542         // Allow stub's BlockWrite to continue, so the real data is ready
543         // when the volume's BlockRead retries
544         <-continueBlockWrite
545         // Wait for BlockRead() and BlockWrite() to finish
546         wg.Wait()
547 }
548
549 func (s *stubbedAzureBlobSuite) TestAzureBlobVolumeCreateBlobRaceDeadline(c *check.C) {
550         v := s.newTestableAzureBlobVolume(c, newVolumeParams{
551                 Cluster:      testCluster(c),
552                 ConfigVolume: arvados.Volume{Replication: 3},
553                 MetricsVecs:  newVolumeMetricsVecs(prometheus.NewRegistry()),
554                 BufferPool:   newBufferPool(ctxlog.TestLogger(c), 8, prometheus.NewRegistry()),
555         })
556         v.AzureBlobVolume.WriteRaceInterval.Set("2s")
557         v.AzureBlobVolume.WriteRacePollTime.Set("5ms")
558         defer v.Teardown()
559
560         v.BlockWriteRaw(TestHash, nil)
561
562         buf := new(bytes.Buffer)
563         v.Index(context.Background(), "", buf)
564         if buf.Len() != 0 {
565                 c.Errorf("Index %+q should be empty", buf.Bytes())
566         }
567
568         v.TouchWithDate(TestHash, time.Now().Add(-1982*time.Millisecond))
569
570         allDone := make(chan struct{})
571         go func() {
572                 defer close(allDone)
573                 buf := bytes.NewBuffer(nil)
574                 n, err := v.BlockRead(context.Background(), TestHash, buf)
575                 if err != nil {
576                         c.Error(err)
577                         return
578                 }
579                 if n != 0 {
580                         c.Errorf("Got %+q (n=%d), expected empty buf", buf.Bytes(), n)
581                 }
582         }()
583         select {
584         case <-allDone:
585         case <-time.After(time.Second):
586                 c.Error("BlockRead should have stopped waiting for race when block was 2s old")
587         }
588
589         buf.Reset()
590         v.Index(context.Background(), "", buf)
591         if !bytes.HasPrefix(buf.Bytes(), []byte(TestHash+"+0")) {
592                 c.Errorf("Index %+q should have %+q", buf.Bytes(), TestHash+"+0")
593         }
594 }
595
596 func (s *stubbedAzureBlobSuite) TestAzureBlobVolumeContextCancelBlockRead(c *check.C) {
597         s.testAzureBlobVolumeContextCancel(c, func(ctx context.Context, v *testableAzureBlobVolume) error {
598                 v.BlockWriteRaw(TestHash, TestBlock)
599                 _, err := v.BlockRead(ctx, TestHash, io.Discard)
600                 return err
601         })
602 }
603
604 func (s *stubbedAzureBlobSuite) TestAzureBlobVolumeContextCancelBlockWrite(c *check.C) {
605         s.testAzureBlobVolumeContextCancel(c, func(ctx context.Context, v *testableAzureBlobVolume) error {
606                 return v.BlockWrite(ctx, TestHash, make([]byte, BlockSize))
607         })
608 }
609
610 func (s *stubbedAzureBlobSuite) testAzureBlobVolumeContextCancel(c *check.C, testFunc func(context.Context, *testableAzureBlobVolume) error) {
611         v := s.newTestableAzureBlobVolume(c, newVolumeParams{
612                 Cluster:      testCluster(c),
613                 ConfigVolume: arvados.Volume{Replication: 3},
614                 MetricsVecs:  newVolumeMetricsVecs(prometheus.NewRegistry()),
615                 BufferPool:   newBufferPool(ctxlog.TestLogger(c), 8, prometheus.NewRegistry()),
616         })
617         defer v.Teardown()
618         v.azHandler.race = make(chan chan struct{})
619
620         ctx, cancel := context.WithCancel(context.Background())
621         allDone := make(chan struct{})
622         go func() {
623                 defer close(allDone)
624                 err := testFunc(ctx, v)
625                 if err != context.Canceled {
626                         c.Errorf("got %T %q, expected %q", err, err, context.Canceled)
627                 }
628         }()
629         releaseHandler := make(chan struct{})
630         select {
631         case <-allDone:
632                 c.Error("testFunc finished without waiting for v.azHandler.race")
633         case <-time.After(10 * time.Second):
634                 c.Error("timed out waiting to enter handler")
635         case v.azHandler.race <- releaseHandler:
636         }
637
638         cancel()
639
640         select {
641         case <-time.After(10 * time.Second):
642                 c.Error("timed out waiting to cancel")
643         case <-allDone:
644         }
645
646         go func() {
647                 <-releaseHandler
648         }()
649 }
650
651 func (s *stubbedAzureBlobSuite) TestStats(c *check.C) {
652         volume := s.newTestableAzureBlobVolume(c, newVolumeParams{
653                 Cluster:      testCluster(c),
654                 ConfigVolume: arvados.Volume{Replication: 3},
655                 MetricsVecs:  newVolumeMetricsVecs(prometheus.NewRegistry()),
656                 BufferPool:   newBufferPool(ctxlog.TestLogger(c), 8, prometheus.NewRegistry()),
657         })
658         defer volume.Teardown()
659
660         stats := func() string {
661                 buf, err := json.Marshal(volume.InternalStats())
662                 c.Check(err, check.IsNil)
663                 return string(buf)
664         }
665
666         c.Check(stats(), check.Matches, `.*"Ops":0,.*`)
667         c.Check(stats(), check.Matches, `.*"Errors":0,.*`)
668
669         loc := "acbd18db4cc2f85cedef654fccc4a4d8"
670         _, err := volume.BlockRead(context.Background(), loc, io.Discard)
671         c.Check(err, check.NotNil)
672         c.Check(stats(), check.Matches, `.*"Ops":[^0],.*`)
673         c.Check(stats(), check.Matches, `.*"Errors":[^0],.*`)
674         c.Check(stats(), check.Matches, `.*"storage\.AzureStorageServiceError 404 \(404 Not Found\)":[^0].*`)
675         c.Check(stats(), check.Matches, `.*"InBytes":0,.*`)
676
677         err = volume.BlockWrite(context.Background(), loc, []byte("foo"))
678         c.Check(err, check.IsNil)
679         c.Check(stats(), check.Matches, `.*"OutBytes":3,.*`)
680         c.Check(stats(), check.Matches, `.*"CreateOps":1,.*`)
681
682         _, err = volume.BlockRead(context.Background(), loc, io.Discard)
683         c.Check(err, check.IsNil)
684         _, err = volume.BlockRead(context.Background(), loc, io.Discard)
685         c.Check(err, check.IsNil)
686         c.Check(stats(), check.Matches, `.*"InBytes":6,.*`)
687 }
688
689 func (v *testableAzureBlobVolume) BlockWriteRaw(locator string, data []byte) {
690         v.azHandler.BlockWriteRaw(v.ContainerName, locator, data)
691 }
692
693 func (v *testableAzureBlobVolume) TouchWithDate(locator string, lastBlockWrite time.Time) {
694         v.azHandler.TouchWithDate(v.ContainerName, locator, lastBlockWrite)
695 }
696
697 func (v *testableAzureBlobVolume) Teardown() {
698         v.azStub.Close()
699 }
700
701 func (v *testableAzureBlobVolume) ReadWriteOperationLabelValues() (r, w string) {
702         return "get", "create"
703 }
704
705 func makeEtag() string {
706         return fmt.Sprintf("0x%x", rand.Int63())
707 }