Refactor the multi-host salt install page.
[arvados.git] / services / keepstore / azure_blob_volume_test.go
index c5dbc8f5831402aa3e223391c3ad0ece918de0a3..48d58ee9bfc454e5b2972e6d36867a578c29e6bb 100644 (file)
@@ -1,10 +1,15 @@
-package main
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package keepstore
 
 import (
        "bytes"
        "context"
        "crypto/md5"
        "encoding/base64"
+       "encoding/json"
        "encoding/xml"
        "flag"
        "fmt"
@@ -13,25 +18,35 @@ import (
        "net"
        "net/http"
        "net/http/httptest"
+       "os"
        "regexp"
        "sort"
        "strconv"
        "strings"
        "sync"
-       "testing"
        "time"
 
-       log "github.com/Sirupsen/logrus"
-       "github.com/curoverse/azure-sdk-for-go/storage"
+       "git.arvados.org/arvados.git/sdk/go/arvados"
+       "git.arvados.org/arvados.git/sdk/go/ctxlog"
+       "github.com/Azure/azure-sdk-for-go/storage"
+       "github.com/prometheus/client_golang/prometheus"
+       "github.com/sirupsen/logrus"
+       check "gopkg.in/check.v1"
 )
 
 const (
-       // The same fake credentials used by Microsoft's Azure emulator
-       emulatorAccountName = "devstoreaccount1"
-       emulatorAccountKey  = "Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw=="
+       // This cannot be the fake account name "devstoreaccount1"
+       // used by Microsoft's Azure emulator: the Azure SDK
+       // recognizes that magic string and changes its behavior to
+       // cater to the Azure SDK's own test suite.
+       fakeAccountName = "fakeaccountname"
+       fakeAccountKey  = "Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw=="
 )
 
-var azureTestContainer string
+var (
+       azureTestContainer string
+       azureTestDebug     = os.Getenv("ARVADOS_DEBUG") != ""
+)
 
 func init() {
        flag.StringVar(
@@ -51,13 +66,16 @@ type azBlob struct {
 
 type azStubHandler struct {
        sync.Mutex
-       blobs map[string]*azBlob
-       race  chan chan struct{}
+       logger     logrus.FieldLogger
+       blobs      map[string]*azBlob
+       race       chan chan struct{}
+       didlist503 bool
 }
 
-func newAzStubHandler() *azStubHandler {
+func newAzStubHandler(c *check.C) *azStubHandler {
        return &azStubHandler{
-               blobs: make(map[string]*azBlob),
+               blobs:  make(map[string]*azBlob),
+               logger: ctxlog.TestLogger(c),
        }
 }
 
@@ -100,7 +118,9 @@ var rangeRegexp = regexp.MustCompile(`^bytes=(\d+)-(\d+)$`)
 func (h *azStubHandler) ServeHTTP(rw http.ResponseWriter, r *http.Request) {
        h.Lock()
        defer h.Unlock()
-       // defer log.Printf("azStubHandler: %+v", r)
+       if azureTestDebug {
+               defer h.logger.Printf("azStubHandler: %+v", r)
+       }
 
        path := strings.Split(r.URL.Path, "/")
        container := path[1]
@@ -110,11 +130,16 @@ func (h *azStubHandler) ServeHTTP(rw http.ResponseWriter, r *http.Request) {
        }
 
        if err := r.ParseForm(); err != nil {
-               log.Printf("azStubHandler(%+v): %s", r, err)
+               h.logger.Printf("azStubHandler(%+v): %s", r, err)
                rw.WriteHeader(http.StatusBadRequest)
                return
        }
 
+       if (r.Method == "PUT" || r.Method == "POST") && r.Header.Get("Content-Length") == "" {
+               rw.WriteHeader(http.StatusLengthRequired)
+               return
+       }
+
        body, err := ioutil.ReadAll(r.Body)
        if err != nil {
                return
@@ -161,13 +186,13 @@ func (h *azStubHandler) ServeHTTP(rw http.ResponseWriter, r *http.Request) {
        case r.Method == "PUT" && r.Form.Get("comp") == "block":
                // "Put Block" API
                if !blobExists {
-                       log.Printf("Got block for nonexistent blob: %+v", r)
+                       h.logger.Printf("Got block for nonexistent blob: %+v", r)
                        rw.WriteHeader(http.StatusBadRequest)
                        return
                }
                blockID, err := base64.StdEncoding.DecodeString(r.Form.Get("blockid"))
                if err != nil || len(blockID) == 0 {
-                       log.Printf("Invalid blockid: %+q", r.Form.Get("blockid"))
+                       h.logger.Printf("Invalid blockid: %+q", r.Form.Get("blockid"))
                        rw.WriteHeader(http.StatusBadRequest)
                        return
                }
@@ -177,14 +202,14 @@ func (h *azStubHandler) ServeHTTP(rw http.ResponseWriter, r *http.Request) {
                // "Put Block List" API
                bl := &blockListRequestBody{}
                if err := xml.Unmarshal(body, bl); err != nil {
-                       log.Printf("xml Unmarshal: %s", err)
+                       h.logger.Printf("xml Unmarshal: %s", err)
                        rw.WriteHeader(http.StatusBadRequest)
                        return
                }
                for _, encBlockID := range bl.Uncommitted {
                        blockID, err := base64.StdEncoding.DecodeString(encBlockID)
                        if err != nil || len(blockID) == 0 || blob.Uncommitted[string(blockID)] == nil {
-                               log.Printf("Invalid blockid: %+q", encBlockID)
+                               h.logger.Printf("Invalid blockid: %+q", encBlockID)
                                rw.WriteHeader(http.StatusBadRequest)
                                return
                        }
@@ -200,7 +225,7 @@ func (h *azStubHandler) ServeHTTP(rw http.ResponseWriter, r *http.Request) {
                // sets metadata headers only as a way to bump Etag
                // and Last-Modified.
                if !blobExists {
-                       log.Printf("Got metadata for nonexistent blob: %+v", r)
+                       h.logger.Printf("Got metadata for nonexistent blob: %+v", r)
                        rw.WriteHeader(http.StatusBadRequest)
                        return
                }
@@ -246,7 +271,7 @@ func (h *azStubHandler) ServeHTTP(rw http.ResponseWriter, r *http.Request) {
                rw.Header().Set("Content-Length", strconv.Itoa(len(data)))
                if r.Method == "GET" {
                        if _, err := rw.Write(data); err != nil {
-                               log.Printf("write %+q: %s", data, err)
+                               h.logger.Printf("write %+q: %s", data, err)
                        }
                }
                h.unlockAndRace()
@@ -260,6 +285,11 @@ func (h *azStubHandler) ServeHTTP(rw http.ResponseWriter, r *http.Request) {
                rw.WriteHeader(http.StatusAccepted)
        case r.Method == "GET" && r.Form.Get("comp") == "list" && r.Form.Get("restype") == "container":
                // "List Blobs" API
+               if !h.didlist503 {
+                       h.didlist503 = true
+                       rw.WriteHeader(http.StatusServiceUnavailable)
+                       return
+               }
                prefix := container + "|" + r.Form.Get("prefix")
                marker := r.Form.Get("marker")
 
@@ -294,7 +324,7 @@ func (h *azStubHandler) ServeHTTP(rw http.ResponseWriter, r *http.Request) {
                                b := storage.Blob{
                                        Name: hash,
                                        Properties: storage.BlobProperties{
-                                               LastModified:  blob.Mtime.Format(time.RFC1123),
+                                               LastModified:  storage.TimeRFC1123(blob.Mtime),
                                                ContentLength: int64(len(blob.Data)),
                                                Etag:          blob.Etag,
                                        },
@@ -305,12 +335,12 @@ func (h *azStubHandler) ServeHTTP(rw http.ResponseWriter, r *http.Request) {
                }
                buf, err := xml.Marshal(resp)
                if err != nil {
-                       log.Print(err)
+                       h.logger.Error(err)
                        rw.WriteHeader(http.StatusInternalServerError)
                }
                rw.Write(buf)
        default:
-               log.Printf("azStubHandler: not implemented: %+v Body:%+q", r, body)
+               h.logger.Printf("azStubHandler: not implemented: %+v Body:%+q", r, body)
                rw.WriteHeader(http.StatusNotImplemented)
        }
 }
@@ -319,6 +349,7 @@ func (h *azStubHandler) ServeHTTP(rw http.ResponseWriter, r *http.Request) {
 // tries to connect to "devstoreaccount1.blob.127.0.0.1:46067", and
 // in such cases transparently dials "127.0.0.1:46067" instead.
 type azStubDialer struct {
+       logger logrus.FieldLogger
        net.Dialer
 }
 
@@ -326,7 +357,9 @@ var localHostPortRe = regexp.MustCompile(`(127\.0\.0\.1|localhost|\[::1\]):\d+`)
 
 func (d *azStubDialer) Dial(network, address string) (net.Conn, error) {
        if hp := localHostPortRe.FindString(address); hp != "" {
-               log.Println("azStubDialer: dial", hp, "instead of", address)
+               if azureTestDebug {
+                       d.logger.Debug("azStubDialer: dial", hp, "instead of", address)
+               }
                address = hp
        }
        return d.Dialer.Dial(network, address)
@@ -339,39 +372,45 @@ type TestableAzureBlobVolume struct {
        t         TB
 }
 
-func NewTestableAzureBlobVolume(t TB, readonly bool, replication int) *TestableAzureBlobVolume {
-       azHandler := newAzStubHandler()
+func (s *StubbedAzureBlobSuite) newTestableAzureBlobVolume(t TB, cluster *arvados.Cluster, volume arvados.Volume, metrics *volumeMetricsVecs) *TestableAzureBlobVolume {
+       azHandler := newAzStubHandler(t.(*check.C))
        azStub := httptest.NewServer(azHandler)
 
        var azClient storage.Client
+       var err error
 
        container := azureTestContainer
        if container == "" {
                // Connect to stub instead of real Azure storage service
                stubURLBase := strings.Split(azStub.URL, "://")[1]
-               var err error
-               if azClient, err = storage.NewClient(emulatorAccountName, emulatorAccountKey, stubURLBase, storage.DefaultAPIVersion, false); err != nil {
+               if azClient, err = storage.NewClient(fakeAccountName, fakeAccountKey, stubURLBase, storage.DefaultAPIVersion, false); err != nil {
                        t.Fatal(err)
                }
                container = "fakecontainername"
        } else {
                // Connect to real Azure storage service
-               accountKey, err := readKeyFromFile(azureStorageAccountKeyFile)
-               if err != nil {
-                       t.Fatal(err)
-               }
-               azClient, err = storage.NewBasicClient(azureStorageAccountName, accountKey)
-               if err != nil {
+               if azClient, err = storage.NewBasicClient(os.Getenv("ARVADOS_TEST_AZURE_ACCOUNT_NAME"), os.Getenv("ARVADOS_TEST_AZURE_ACCOUNT_KEY")); err != nil {
                        t.Fatal(err)
                }
        }
+       azClient.Sender = &singleSender{}
 
+       bs := azClient.GetBlobService()
        v := &AzureBlobVolume{
-               ContainerName:    container,
-               ReadOnly:         readonly,
-               AzureReplication: replication,
-               azClient:         azClient,
-               bsClient:         azClient.GetBlobService(),
+               ContainerName:        container,
+               WriteRaceInterval:    arvados.Duration(time.Millisecond),
+               WriteRacePollTime:    arvados.Duration(time.Nanosecond),
+               ListBlobsMaxAttempts: 2,
+               ListBlobsRetryDelay:  arvados.Duration(time.Millisecond),
+               azClient:             azClient,
+               container:            &azureContainer{ctr: bs.GetContainerReference(container)},
+               cluster:              cluster,
+               volume:               volume,
+               logger:               ctxlog.TestLogger(t),
+               metrics:              metrics,
+       }
+       if err = v.check(); err != nil {
+               t.Fatal(err)
        }
 
        return &TestableAzureBlobVolume{
@@ -382,64 +421,48 @@ func NewTestableAzureBlobVolume(t TB, readonly bool, replication int) *TestableA
        }
 }
 
-func TestAzureBlobVolumeWithGeneric(t *testing.T) {
-       defer func(t http.RoundTripper) {
-               http.DefaultTransport = t
-       }(http.DefaultTransport)
+var _ = check.Suite(&StubbedAzureBlobSuite{})
+
+type StubbedAzureBlobSuite struct {
+       origHTTPTransport http.RoundTripper
+}
+
+func (s *StubbedAzureBlobSuite) SetUpTest(c *check.C) {
+       s.origHTTPTransport = http.DefaultTransport
        http.DefaultTransport = &http.Transport{
-               Dial: (&azStubDialer{}).Dial,
+               Dial: (&azStubDialer{logger: ctxlog.TestLogger(c)}).Dial,
        }
-       azureWriteRaceInterval = time.Millisecond
-       azureWriteRacePollTime = time.Nanosecond
-       DoGenericVolumeTests(t, func(t TB) TestableVolume {
-               return NewTestableAzureBlobVolume(t, false, azureStorageReplication)
-       })
 }
 
-func TestAzureBlobVolumeConcurrentRanges(t *testing.T) {
-       defer func(b int) {
-               azureMaxGetBytes = b
-       }(azureMaxGetBytes)
+func (s *StubbedAzureBlobSuite) TearDownTest(c *check.C) {
+       http.DefaultTransport = s.origHTTPTransport
+}
 
-       defer func(t http.RoundTripper) {
-               http.DefaultTransport = t
-       }(http.DefaultTransport)
-       http.DefaultTransport = &http.Transport{
-               Dial: (&azStubDialer{}).Dial,
-       }
-       azureWriteRaceInterval = time.Millisecond
-       azureWriteRacePollTime = time.Nanosecond
+func (s *StubbedAzureBlobSuite) TestAzureBlobVolumeWithGeneric(c *check.C) {
+       DoGenericVolumeTests(c, false, func(t TB, cluster *arvados.Cluster, volume arvados.Volume, logger logrus.FieldLogger, metrics *volumeMetricsVecs) TestableVolume {
+               return s.newTestableAzureBlobVolume(t, cluster, volume, metrics)
+       })
+}
+
+func (s *StubbedAzureBlobSuite) TestAzureBlobVolumeConcurrentRanges(c *check.C) {
        // Test (BlockSize mod azureMaxGetBytes)==0 and !=0 cases
-       for _, azureMaxGetBytes = range []int{2 << 22, 2<<22 - 1} {
-               DoGenericVolumeTests(t, func(t TB) TestableVolume {
-                       return NewTestableAzureBlobVolume(t, false, azureStorageReplication)
+       for _, b := range []int{2 << 22, 2<<22 - 1} {
+               DoGenericVolumeTests(c, false, func(t TB, cluster *arvados.Cluster, volume arvados.Volume, logger logrus.FieldLogger, metrics *volumeMetricsVecs) TestableVolume {
+                       v := s.newTestableAzureBlobVolume(t, cluster, volume, metrics)
+                       v.MaxGetBytes = b
+                       return v
                })
        }
 }
 
-func TestReadonlyAzureBlobVolumeWithGeneric(t *testing.T) {
-       defer func(t http.RoundTripper) {
-               http.DefaultTransport = t
-       }(http.DefaultTransport)
-       http.DefaultTransport = &http.Transport{
-               Dial: (&azStubDialer{}).Dial,
-       }
-       azureWriteRaceInterval = time.Millisecond
-       azureWriteRacePollTime = time.Nanosecond
-       DoGenericVolumeTests(t, func(t TB) TestableVolume {
-               return NewTestableAzureBlobVolume(t, true, azureStorageReplication)
+func (s *StubbedAzureBlobSuite) TestReadonlyAzureBlobVolumeWithGeneric(c *check.C) {
+       DoGenericVolumeTests(c, false, func(c TB, cluster *arvados.Cluster, volume arvados.Volume, logger logrus.FieldLogger, metrics *volumeMetricsVecs) TestableVolume {
+               return s.newTestableAzureBlobVolume(c, cluster, volume, metrics)
        })
 }
 
-func TestAzureBlobVolumeRangeFenceposts(t *testing.T) {
-       defer func(t http.RoundTripper) {
-               http.DefaultTransport = t
-       }(http.DefaultTransport)
-       http.DefaultTransport = &http.Transport{
-               Dial: (&azStubDialer{}).Dial,
-       }
-
-       v := NewTestableAzureBlobVolume(t, false, 3)
+func (s *StubbedAzureBlobSuite) TestAzureBlobVolumeRangeFenceposts(c *check.C) {
+       v := s.newTestableAzureBlobVolume(c, testCluster(c), arvados.Volume{Replication: 3}, newVolumeMetricsVecs(prometheus.NewRegistry()))
        defer v.Teardown()
 
        for _, size := range []int{
@@ -457,95 +480,72 @@ func TestAzureBlobVolumeRangeFenceposts(t *testing.T) {
                hash := fmt.Sprintf("%x", md5.Sum(data))
                err := v.Put(context.Background(), hash, data)
                if err != nil {
-                       t.Error(err)
+                       c.Error(err)
                }
                gotData := make([]byte, len(data))
                gotLen, err := v.Get(context.Background(), hash, gotData)
                if err != nil {
-                       t.Error(err)
+                       c.Error(err)
                }
                gotHash := fmt.Sprintf("%x", md5.Sum(gotData))
                if gotLen != size {
-                       t.Error("length mismatch: got %d != %d", gotLen, size)
+                       c.Errorf("length mismatch: got %d != %d", gotLen, size)
                }
                if gotHash != hash {
-                       t.Error("hash mismatch: got %s != %s", gotHash, hash)
-               }
-       }
-}
-
-func TestAzureBlobVolumeReplication(t *testing.T) {
-       for r := 1; r <= 4; r++ {
-               v := NewTestableAzureBlobVolume(t, false, r)
-               defer v.Teardown()
-               if n := v.Replication(); n != r {
-                       t.Errorf("Got replication %d, expected %d", n, r)
+                       c.Errorf("hash mismatch: got %s != %s", gotHash, hash)
                }
        }
 }
 
-func TestAzureBlobVolumeCreateBlobRace(t *testing.T) {
-       defer func(t http.RoundTripper) {
-               http.DefaultTransport = t
-       }(http.DefaultTransport)
-       http.DefaultTransport = &http.Transport{
-               Dial: (&azStubDialer{}).Dial,
-       }
-
-       v := NewTestableAzureBlobVolume(t, false, 3)
+func (s *StubbedAzureBlobSuite) TestAzureBlobVolumeCreateBlobRace(c *check.C) {
+       v := s.newTestableAzureBlobVolume(c, testCluster(c), arvados.Volume{Replication: 3}, newVolumeMetricsVecs(prometheus.NewRegistry()))
        defer v.Teardown()
 
-       azureWriteRaceInterval = time.Second
-       azureWriteRacePollTime = time.Millisecond
+       var wg sync.WaitGroup
 
-       allDone := make(chan struct{})
        v.azHandler.race = make(chan chan struct{})
+
+       wg.Add(1)
        go func() {
+               defer wg.Done()
                err := v.Put(context.Background(), TestHash, TestBlock)
                if err != nil {
-                       t.Error(err)
+                       c.Error(err)
                }
        }()
        continuePut := make(chan struct{})
        // Wait for the stub's Put to create the empty blob
        v.azHandler.race <- continuePut
+       wg.Add(1)
        go func() {
+               defer wg.Done()
                buf := make([]byte, len(TestBlock))
                _, err := v.Get(context.Background(), TestHash, buf)
                if err != nil {
-                       t.Error(err)
+                       c.Error(err)
                }
-               close(allDone)
        }()
        // Wait for the stub's Get to get the empty blob
        close(v.azHandler.race)
        // Allow stub's Put to continue, so the real data is ready
        // when the volume's Get retries
        <-continuePut
-       // Wait for volume's Get to return the real data
-       <-allDone
+       // Wait for Get() and Put() to finish
+       wg.Wait()
 }
 
-func TestAzureBlobVolumeCreateBlobRaceDeadline(t *testing.T) {
-       defer func(t http.RoundTripper) {
-               http.DefaultTransport = t
-       }(http.DefaultTransport)
-       http.DefaultTransport = &http.Transport{
-               Dial: (&azStubDialer{}).Dial,
-       }
-
-       v := NewTestableAzureBlobVolume(t, false, 3)
+func (s *StubbedAzureBlobSuite) TestAzureBlobVolumeCreateBlobRaceDeadline(c *check.C) {
+       v := s.newTestableAzureBlobVolume(c, testCluster(c), arvados.Volume{Replication: 3}, newVolumeMetricsVecs(prometheus.NewRegistry()))
+       v.AzureBlobVolume.WriteRaceInterval.Set("2s")
+       v.AzureBlobVolume.WriteRacePollTime.Set("5ms")
        defer v.Teardown()
 
-       azureWriteRaceInterval = 2 * time.Second
-       azureWriteRacePollTime = 5 * time.Millisecond
-
        v.PutRaw(TestHash, nil)
 
        buf := new(bytes.Buffer)
        v.IndexTo("", buf)
        if buf.Len() != 0 {
-               t.Errorf("Index %+q should be empty", buf.Bytes())
+               c.Errorf("Index %+q should be empty", buf.Bytes())
        }
 
        v.TouchWithDate(TestHash, time.Now().Add(-1982*time.Millisecond))
@@ -556,26 +556,116 @@ func TestAzureBlobVolumeCreateBlobRaceDeadline(t *testing.T) {
                buf := make([]byte, BlockSize)
                n, err := v.Get(context.Background(), TestHash, buf)
                if err != nil {
-                       t.Error(err)
+                       c.Error(err)
                        return
                }
                if n != 0 {
-                       t.Errorf("Got %+q, expected empty buf", buf[:n])
+                       c.Errorf("Got %+q, expected empty buf", buf[:n])
                }
        }()
        select {
        case <-allDone:
        case <-time.After(time.Second):
-               t.Error("Get should have stopped waiting for race when block was 2s old")
+               c.Error("Get should have stopped waiting for race when block was 2s old")
        }
 
        buf.Reset()
        v.IndexTo("", buf)
        if !bytes.HasPrefix(buf.Bytes(), []byte(TestHash+"+0")) {
-               t.Errorf("Index %+q should have %+q", buf.Bytes(), TestHash+"+0")
+               c.Errorf("Index %+q should have %+q", buf.Bytes(), TestHash+"+0")
        }
 }
 
+func (s *StubbedAzureBlobSuite) TestAzureBlobVolumeContextCancelGet(c *check.C) {
+       s.testAzureBlobVolumeContextCancel(c, func(ctx context.Context, v *TestableAzureBlobVolume) error {
+               v.PutRaw(TestHash, TestBlock)
+               _, err := v.Get(ctx, TestHash, make([]byte, BlockSize))
+               return err
+       })
+}
+
+func (s *StubbedAzureBlobSuite) TestAzureBlobVolumeContextCancelPut(c *check.C) {
+       s.testAzureBlobVolumeContextCancel(c, func(ctx context.Context, v *TestableAzureBlobVolume) error {
+               return v.Put(ctx, TestHash, make([]byte, BlockSize))
+       })
+}
+
+func (s *StubbedAzureBlobSuite) TestAzureBlobVolumeContextCancelCompare(c *check.C) {
+       s.testAzureBlobVolumeContextCancel(c, func(ctx context.Context, v *TestableAzureBlobVolume) error {
+               v.PutRaw(TestHash, TestBlock)
+               return v.Compare(ctx, TestHash, TestBlock2)
+       })
+}
+
+func (s *StubbedAzureBlobSuite) testAzureBlobVolumeContextCancel(c *check.C, testFunc func(context.Context, *TestableAzureBlobVolume) error) {
+       v := s.newTestableAzureBlobVolume(c, testCluster(c), arvados.Volume{Replication: 3}, newVolumeMetricsVecs(prometheus.NewRegistry()))
+       defer v.Teardown()
+       v.azHandler.race = make(chan chan struct{})
+
+       ctx, cancel := context.WithCancel(context.Background())
+       allDone := make(chan struct{})
+       go func() {
+               defer close(allDone)
+               err := testFunc(ctx, v)
+               if err != context.Canceled {
+                       c.Errorf("got %T %q, expected %q", err, err, context.Canceled)
+               }
+       }()
+       releaseHandler := make(chan struct{})
+       select {
+       case <-allDone:
+               c.Error("testFunc finished without waiting for v.azHandler.race")
+       case <-time.After(10 * time.Second):
+               c.Error("timed out waiting to enter handler")
+       case v.azHandler.race <- releaseHandler:
+       }
+
+       cancel()
+
+       select {
+       case <-time.After(10 * time.Second):
+               c.Error("timed out waiting to cancel")
+       case <-allDone:
+       }
+
+       go func() {
+               <-releaseHandler
+       }()
+}
+
+func (s *StubbedAzureBlobSuite) TestStats(c *check.C) {
+       volume := s.newTestableAzureBlobVolume(c, testCluster(c), arvados.Volume{Replication: 3}, newVolumeMetricsVecs(prometheus.NewRegistry()))
+       defer volume.Teardown()
+
+       stats := func() string {
+               buf, err := json.Marshal(volume.InternalStats())
+               c.Check(err, check.IsNil)
+               return string(buf)
+       }
+
+       c.Check(stats(), check.Matches, `.*"Ops":0,.*`)
+       c.Check(stats(), check.Matches, `.*"Errors":0,.*`)
+
+       loc := "acbd18db4cc2f85cedef654fccc4a4d8"
+       _, err := volume.Get(context.Background(), loc, make([]byte, 3))
+       c.Check(err, check.NotNil)
+       c.Check(stats(), check.Matches, `.*"Ops":[^0],.*`)
+       c.Check(stats(), check.Matches, `.*"Errors":[^0],.*`)
+       c.Check(stats(), check.Matches, `.*"storage\.AzureStorageServiceError 404 \(404 Not Found\)":[^0].*`)
+       c.Check(stats(), check.Matches, `.*"InBytes":0,.*`)
+
+       err = volume.Put(context.Background(), loc, []byte("foo"))
+       c.Check(err, check.IsNil)
+       c.Check(stats(), check.Matches, `.*"OutBytes":3,.*`)
+       c.Check(stats(), check.Matches, `.*"CreateOps":1,.*`)
+
+       _, err = volume.Get(context.Background(), loc, make([]byte, 3))
+       c.Check(err, check.IsNil)
+       _, err = volume.Get(context.Background(), loc, make([]byte, 3))
+       c.Check(err, check.IsNil)
+       c.Check(stats(), check.Matches, `.*"InBytes":6,.*`)
+}
+
 func (v *TestableAzureBlobVolume) PutRaw(locator string, data []byte) {
        v.azHandler.PutRaw(v.ContainerName, locator, data)
 }
@@ -588,6 +678,10 @@ func (v *TestableAzureBlobVolume) Teardown() {
        v.azStub.Close()
 }
 
+func (v *TestableAzureBlobVolume) ReadWriteOperationLabelValues() (r, w string) {
+       return "get", "create"
+}
+
 func makeEtag() string {
        return fmt.Sprintf("0x%x", rand.Int63())
 }