10682: Add backend stats for Azure volumes.
authorTom Clegg <tom@curoverse.com>
Mon, 2 Jan 2017 18:57:31 +0000 (13:57 -0500)
committerTom Clegg <tom@curoverse.com>
Tue, 3 Jan 2017 08:17:03 +0000 (03:17 -0500)
services/keepstore/azure_blob_volume.go
services/keepstore/azure_blob_volume_test.go
services/keepstore/s3_volume.go
services/keepstore/stats.go [new file with mode: 0644]

index 75344890ab082ba0ef837741b747ba0889e44f67..383759803ccdbe55169f63737651aadfcb11f7c7 100644 (file)
@@ -103,7 +103,27 @@ type AzureBlobVolume struct {
        RequestTimeout        arvados.Duration
 
        azClient storage.Client
-       bsClient storage.BlobStorageClient
+       bsClient *azureBlobClient
+}
+
+// azureBlobClient wraps storage.BlobStorageClient in order to count
+// I/O and API usage stats.
+type azureBlobClient struct {
+       client *storage.BlobStorageClient
+       stats  azureBlobStats
+}
+
+type azureBlobStats struct {
+       statsTicker
+       Ops            uint64
+       GetOps         uint64
+       GetRangeOps    uint64
+       CreateOps      uint64
+       SetMetadataOps uint64
+       DelOps         uint64
+       ListOps        uint64
+
+       lock sync.Mutex
 }
 
 // Examples implements VolumeWithExamples.
@@ -147,7 +167,10 @@ func (v *AzureBlobVolume) Start() error {
        v.azClient.HTTPClient = &http.Client{
                Timeout: time.Duration(v.RequestTimeout),
        }
-       v.bsClient = v.azClient.GetBlobService()
+       bs := v.azClient.GetBlobService()
+       v.bsClient = &azureBlobClient{
+               client: &bs,
+       }
 
        ok, err := v.bsClient.ContainerExists(v.ContainerName)
        if err != nil {
@@ -623,3 +646,72 @@ func (v *AzureBlobVolume) EmptyTrash() {
 
        log.Printf("EmptyTrash stats for %v: Deleted %v bytes in %v blocks. Remaining in trash: %v bytes in %v blocks.", v.String(), bytesDeleted, blocksDeleted, bytesInTrash-bytesDeleted, blocksInTrash-blocksDeleted)
 }
+
+// InternalStats returns bucket I/O and API call counters.
+func (v *AzureBlobVolume) InternalStats() interface{} {
+       return &v.bsClient.stats
+}
+
+func (c *azureBlobClient) ContainerExists(cname string) (bool, error) {
+       c.stats.Tick(&c.stats.Ops)
+       ok, err := c.client.ContainerExists(cname)
+       c.stats.TickErr(err)
+       return ok, err
+}
+
+func (c *azureBlobClient) GetBlobMetadata(cname, bname string) (map[string]string, error) {
+       c.stats.Tick(&c.stats.Ops)
+       m, err := c.client.GetBlobMetadata(cname, bname)
+       c.stats.TickErr(err)
+       return m, err
+}
+
+func (c *azureBlobClient) GetBlobProperties(cname, bname string) (*storage.BlobProperties, error) {
+       c.stats.Tick(&c.stats.Ops)
+       p, err := c.client.GetBlobProperties(cname, bname)
+       c.stats.TickErr(err)
+       return p, err
+}
+
+func (c *azureBlobClient) GetBlob(cname, bname string) (io.ReadCloser, error) {
+       c.stats.Tick(&c.stats.Ops, &c.stats.GetOps)
+       rdr, err := c.client.GetBlob(cname, bname)
+       c.stats.TickErr(err)
+       return NewCountingReader(rdr, c.stats.TickInBytes), err
+}
+
+func (c *azureBlobClient) GetBlobRange(cname, bname, byterange string, hdrs map[string]string) (io.ReadCloser, error) {
+       c.stats.Tick(&c.stats.Ops, &c.stats.GetRangeOps)
+       rdr, err := c.client.GetBlobRange(cname, bname, byterange, hdrs)
+       c.stats.TickErr(err)
+       return NewCountingReader(rdr, c.stats.TickInBytes), err
+}
+
+func (c *azureBlobClient) CreateBlockBlobFromReader(cname, bname string, size uint64, rdr io.Reader, hdrs map[string]string) error {
+       c.stats.Tick(&c.stats.Ops, &c.stats.CreateOps)
+       rdr = NewCountingReader(rdr, c.stats.TickOutBytes)
+       err := c.client.CreateBlockBlobFromReader(cname, bname, size, rdr, hdrs)
+       c.stats.TickErr(err)
+       return err
+}
+
+func (c *azureBlobClient) SetBlobMetadata(cname, bname string, m, hdrs map[string]string) error {
+       c.stats.Tick(&c.stats.Ops, &c.stats.SetMetadataOps)
+       err := c.client.SetBlobMetadata(cname, bname, m, hdrs)
+       c.stats.TickErr(err)
+       return err
+}
+
+func (c *azureBlobClient) ListBlobs(cname string, params storage.ListBlobsParameters) (storage.BlobListResponse, error) {
+       c.stats.Tick(&c.stats.Ops, &c.stats.ListOps)
+       resp, err := c.client.ListBlobs(cname, params)
+       c.stats.TickErr(err)
+       return resp, err
+}
+
+func (c *azureBlobClient) DeleteBlob(cname, bname string, hdrs map[string]string) error {
+       c.stats.Tick(&c.stats.Ops, &c.stats.DelOps)
+       err := c.client.DeleteBlob(cname, bname, hdrs)
+       c.stats.TickErr(err)
+       return err
+}
index f4498f4c943c6c7e6c01deadf6686203687d1b3f..7bccafcaa1963fd2f5e2bd5921da1e05b49f1143 100644 (file)
@@ -5,6 +5,7 @@ import (
        "context"
        "crypto/md5"
        "encoding/base64"
+       "encoding/json"
        "encoding/xml"
        "flag"
        "fmt"
@@ -23,6 +24,7 @@ import (
 
        log "github.com/Sirupsen/logrus"
        "github.com/curoverse/azure-sdk-for-go/storage"
+       check "gopkg.in/check.v1"
 )
 
 const (
@@ -369,12 +371,13 @@ func NewTestableAzureBlobVolume(t TB, readonly bool, replication int) *TestableA
                }
        }
 
+       bs := azClient.GetBlobService()
        v := &AzureBlobVolume{
                ContainerName:    container,
                ReadOnly:         readonly,
                AzureReplication: replication,
                azClient:         azClient,
-               bsClient:         azClient.GetBlobService(),
+               bsClient:         &azureBlobClient{client: &bs},
        }
 
        return &TestableAzureBlobVolume{
@@ -385,6 +388,29 @@ func NewTestableAzureBlobVolume(t TB, readonly bool, replication int) *TestableA
        }
 }
 
+var _ = check.Suite(&StubbedAzureBlobSuite{})
+
+type StubbedAzureBlobSuite struct {
+       volume            *TestableAzureBlobVolume
+       origHTTPTransport http.RoundTripper
+}
+
+func (s *StubbedAzureBlobSuite) SetUpTest(c *check.C) {
+       s.origHTTPTransport = http.DefaultTransport
+       http.DefaultTransport = &http.Transport{
+               Dial: (&azStubDialer{}).Dial,
+       }
+       azureWriteRaceInterval = time.Millisecond
+       azureWriteRacePollTime = time.Nanosecond
+
+       s.volume = NewTestableAzureBlobVolume(c, false, 3)
+}
+
+func (s *StubbedAzureBlobSuite) TearDownTest(c *check.C) {
+       s.volume.Teardown()
+       http.DefaultTransport = s.origHTTPTransport
+}
+
 func TestAzureBlobVolumeWithGeneric(t *testing.T) {
        defer func(t http.RoundTripper) {
                http.DefaultTransport = t
@@ -643,6 +669,35 @@ func testAzureBlobVolumeContextCancel(t *testing.T, testFunc func(context.Contex
        }()
 }
 
+func (s *StubbedAzureBlobSuite) TestStats(c *check.C) {
+       stats := func() string {
+               buf, err := json.Marshal(s.volume.InternalStats())
+               c.Check(err, check.IsNil)
+               return string(buf)
+       }
+
+       c.Check(stats(), check.Matches, `.*"Ops":0,.*`)
+       c.Check(stats(), check.Matches, `.*"Errors":0,.*`)
+
+       loc := "acbd18db4cc2f85cedef654fccc4a4d8"
+       _, err := s.volume.Get(context.Background(), loc, make([]byte, 3))
+       c.Check(err, check.NotNil)
+       c.Check(stats(), check.Matches, `.*"Ops":[^0],.*`)
+       c.Check(stats(), check.Matches, `.*"Errors":[^0],.*`)
+       c.Check(stats(), check.Matches, `.*"InBytes":0,.*`)
+
+       err = s.volume.Put(context.Background(), loc, []byte("foo"))
+       c.Check(err, check.IsNil)
+       c.Check(stats(), check.Matches, `.*"OutBytes":3,.*`)
+       c.Check(stats(), check.Matches, `.*"CreateOps":1,.*`)
+
+       _, err = s.volume.Get(context.Background(), loc, make([]byte, 3))
+       c.Check(err, check.IsNil)
+       _, err = s.volume.Get(context.Background(), loc, make([]byte, 3))
+       c.Check(err, check.IsNil)
+       c.Check(stats(), check.Matches, `.*"InBytes":6,.*`)
+}
+
 func (v *TestableAzureBlobVolume) PutRaw(locator string, data []byte) {
        v.azHandler.PutRaw(v.ContainerName, locator, data)
 }
index ca5b1a2eb945cb2ae940c9599c955fb59d9e489a..c1d21051a8ebd9676481229d22e6a3670a4550a9 100644 (file)
@@ -14,7 +14,6 @@ import (
        "regexp"
        "strings"
        "sync"
-       "sync/atomic"
        "time"
 
        "git.curoverse.com/arvados.git/sdk/go/arvados"
@@ -456,10 +455,10 @@ func (v *S3Volume) IndexTo(prefix string, writer io.Writer) error {
                Prefix:   "recent/" + prefix,
                PageSize: v.IndexPageSize,
        }
-       v.bucket.stats.tick(&v.bucket.stats.Ops, &v.bucket.stats.ListOps)
-       v.bucket.stats.tick(&v.bucket.stats.Ops, &v.bucket.stats.ListOps)
+       v.bucket.stats.Tick(&v.bucket.stats.Ops, &v.bucket.stats.ListOps)
+       v.bucket.stats.Tick(&v.bucket.stats.Ops, &v.bucket.stats.ListOps)
        for data, recent := dataL.First(), recentL.First(); data != nil; data = dataL.Next() {
-               v.bucket.stats.tick(&v.bucket.stats.Ops, &v.bucket.stats.ListOps)
+               v.bucket.stats.Tick(&v.bucket.stats.Ops, &v.bucket.stats.ListOps)
                if data.Key >= "g" {
                        // Conveniently, "recent/*" and "trash/*" are
                        // lexically greater than all hex-encoded data
@@ -481,12 +480,12 @@ func (v *S3Volume) IndexTo(prefix string, writer io.Writer) error {
                for recent != nil {
                        if cmp := strings.Compare(recent.Key[7:], data.Key); cmp < 0 {
                                recent = recentL.Next()
-                               v.bucket.stats.tick(&v.bucket.stats.Ops, &v.bucket.stats.ListOps)
+                               v.bucket.stats.Tick(&v.bucket.stats.Ops, &v.bucket.stats.ListOps)
                                continue
                        } else if cmp == 0 {
                                stamp = recent
                                recent = recentL.Next()
-                               v.bucket.stats.tick(&v.bucket.stats.Ops, &v.bucket.stats.ListOps)
+                               v.bucket.stats.Tick(&v.bucket.stats.Ops, &v.bucket.stats.ListOps)
                                break
                        } else {
                                // recent/X marker is missing: we'll
@@ -872,74 +871,58 @@ type s3bucket struct {
 
 func (b *s3bucket) GetReader(path string) (io.ReadCloser, error) {
        rdr, err := b.Bucket.GetReader(path)
-       b.stats.tick(&b.stats.Ops, &b.stats.GetOps)
+       b.stats.Tick(&b.stats.Ops, &b.stats.GetOps)
        b.stats.tickErr(err)
-       return NewCountingReader(rdr, b.stats.tickInBytes), err
+       return NewCountingReader(rdr, b.stats.TickInBytes), err
 }
 
 func (b *s3bucket) Head(path string, headers map[string][]string) (*http.Response, error) {
        resp, err := b.Bucket.Head(path, headers)
-       b.stats.tick(&b.stats.Ops, &b.stats.HeadOps)
+       b.stats.Tick(&b.stats.Ops, &b.stats.HeadOps)
        b.stats.tickErr(err)
        return resp, err
 }
 
 func (b *s3bucket) PutReader(path string, r io.Reader, length int64, contType string, perm s3.ACL, options s3.Options) error {
-       err := b.Bucket.PutReader(path, NewCountingReader(r, b.stats.tickOutBytes), length, contType, perm, options)
-       b.stats.tick(&b.stats.Ops, &b.stats.PutOps)
+       err := b.Bucket.PutReader(path, NewCountingReader(r, b.stats.TickOutBytes), length, contType, perm, options)
+       b.stats.Tick(&b.stats.Ops, &b.stats.PutOps)
        b.stats.tickErr(err)
        return err
 }
 
 func (b *s3bucket) Put(path string, data []byte, contType string, perm s3.ACL, options s3.Options) error {
-       err := b.Bucket.PutReader(path, NewCountingReader(bytes.NewBuffer(data), b.stats.tickOutBytes), int64(len(data)), contType, perm, options)
-       b.stats.tick(&b.stats.Ops, &b.stats.PutOps)
+       err := b.Bucket.PutReader(path, NewCountingReader(bytes.NewBuffer(data), b.stats.TickOutBytes), int64(len(data)), contType, perm, options)
+       b.stats.Tick(&b.stats.Ops, &b.stats.PutOps)
        b.stats.tickErr(err)
        return err
 }
 
 func (b *s3bucket) Del(path string) error {
        err := b.Bucket.Del(path)
-       b.stats.tick(&b.stats.Ops, &b.stats.DelOps)
+       b.stats.Tick(&b.stats.Ops, &b.stats.DelOps)
        b.stats.tickErr(err)
        return err
 }
 
 type s3bucketStats struct {
-       Errors   uint64
-       Ops      uint64
-       GetOps   uint64
-       PutOps   uint64
-       HeadOps  uint64
-       DelOps   uint64
-       ListOps  uint64
-       InBytes  uint64
-       OutBytes uint64
+       statsTicker
+       Ops     uint64
+       GetOps  uint64
+       PutOps  uint64
+       HeadOps uint64
+       DelOps  uint64
+       ListOps uint64
 
        ErrorCodes map[string]uint64 `json:",omitempty"`
 
        lock sync.Mutex
 }
 
-func (s *s3bucketStats) tickInBytes(n uint64) {
-       atomic.AddUint64(&s.InBytes, n)
-}
-
-func (s *s3bucketStats) tickOutBytes(n uint64) {
-       atomic.AddUint64(&s.OutBytes, n)
-}
-
-func (s *s3bucketStats) tick(counters ...*uint64) {
-       for _, counter := range counters {
-               atomic.AddUint64(counter, 1)
-       }
-}
-
 func (s *s3bucketStats) tickErr(err error) {
        if err == nil {
                return
        }
-       atomic.AddUint64(&s.Errors, 1)
+       s.TickErr(err)
        errStr := fmt.Sprintf("%T", err)
        if err, ok := err.(*s3.Error); ok {
                errStr = errStr + fmt.Sprintf(" %d %s", err.StatusCode, err.Code)
diff --git a/services/keepstore/stats.go b/services/keepstore/stats.go
new file mode 100644 (file)
index 0000000..02d260c
--- /dev/null
@@ -0,0 +1,34 @@
+package main
+
+import (
+       "sync/atomic"
+)
+
+type statsTicker struct {
+       Errors   uint64
+       InBytes  uint64
+       OutBytes uint64
+}
+
+// Tick increments each of the given counters by 1 using
+// atomic.AddUint64.
+func (s *statsTicker) Tick(counters ...*uint64) {
+       for _, counter := range counters {
+               atomic.AddUint64(counter, 1)
+       }
+}
+
+func (s *statsTicker) TickErr(err error) {
+       if err == nil {
+               return
+       }
+       s.Tick(&s.Errors)
+}
+
+func (s *statsTicker) TickInBytes(n uint64) {
+       atomic.AddUint64(&s.InBytes, n)
+}
+
+func (s *statsTicker) TickOutBytes(n uint64) {
+       atomic.AddUint64(&s.OutBytes, n)
+}