From 3121f0dfb9262ccd50d0637c9f7cedf9191f69bf Mon Sep 17 00:00:00 2001 From: Tom Clegg Date: Mon, 2 Jan 2017 13:57:31 -0500 Subject: [PATCH] 10682: Add backend stats for Azure volumes. --- services/keepstore/azure_blob_volume.go | 96 +++++++++++++++++++- services/keepstore/azure_blob_volume_test.go | 57 +++++++++++- services/keepstore/s3_volume.go | 59 +++++------- services/keepstore/stats.go | 34 +++++++ 4 files changed, 205 insertions(+), 41 deletions(-) create mode 100644 services/keepstore/stats.go diff --git a/services/keepstore/azure_blob_volume.go b/services/keepstore/azure_blob_volume.go index 75344890ab..383759803c 100644 --- a/services/keepstore/azure_blob_volume.go +++ b/services/keepstore/azure_blob_volume.go @@ -103,7 +103,27 @@ type AzureBlobVolume struct { RequestTimeout arvados.Duration azClient storage.Client - bsClient storage.BlobStorageClient + bsClient *azureBlobClient +} + +// azureBlobClient wraps storage.BlobStorageClient in order to count +// I/O and API usage stats. +type azureBlobClient struct { + client *storage.BlobStorageClient + stats azureBlobStats +} + +type azureBlobStats struct { + statsTicker + Ops uint64 + GetOps uint64 + GetRangeOps uint64 + CreateOps uint64 + SetMetadataOps uint64 + DelOps uint64 + ListOps uint64 + + lock sync.Mutex } // Examples implements VolumeWithExamples. @@ -147,7 +167,10 @@ func (v *AzureBlobVolume) Start() error { v.azClient.HTTPClient = &http.Client{ Timeout: time.Duration(v.RequestTimeout), } - v.bsClient = v.azClient.GetBlobService() + bs := v.azClient.GetBlobService() + v.bsClient = &azureBlobClient{ + client: &bs, + } ok, err := v.bsClient.ContainerExists(v.ContainerName) if err != nil { @@ -623,3 +646,72 @@ func (v *AzureBlobVolume) EmptyTrash() { log.Printf("EmptyTrash stats for %v: Deleted %v bytes in %v blocks. Remaining in trash: %v bytes in %v blocks.", v.String(), bytesDeleted, blocksDeleted, bytesInTrash-bytesDeleted, blocksInTrash-blocksDeleted) } + +// InternalStats returns bucket I/O and API call counters. +func (v *AzureBlobVolume) InternalStats() interface{} { + return &v.bsClient.stats +} + +func (c *azureBlobClient) ContainerExists(cname string) (bool, error) { + c.stats.Tick(&c.stats.Ops) + ok, err := c.client.ContainerExists(cname) + c.stats.TickErr(err) + return ok, err +} + +func (c *azureBlobClient) GetBlobMetadata(cname, bname string) (map[string]string, error) { + c.stats.Tick(&c.stats.Ops) + m, err := c.client.GetBlobMetadata(cname, bname) + c.stats.TickErr(err) + return m, err +} + +func (c *azureBlobClient) GetBlobProperties(cname, bname string) (*storage.BlobProperties, error) { + c.stats.Tick(&c.stats.Ops) + p, err := c.client.GetBlobProperties(cname, bname) + c.stats.TickErr(err) + return p, err +} + +func (c *azureBlobClient) GetBlob(cname, bname string) (io.ReadCloser, error) { + c.stats.Tick(&c.stats.Ops, &c.stats.GetOps) + rdr, err := c.client.GetBlob(cname, bname) + c.stats.TickErr(err) + return NewCountingReader(rdr, c.stats.TickInBytes), err +} + +func (c *azureBlobClient) GetBlobRange(cname, bname, byterange string, hdrs map[string]string) (io.ReadCloser, error) { + c.stats.Tick(&c.stats.Ops, &c.stats.GetRangeOps) + rdr, err := c.client.GetBlobRange(cname, bname, byterange, hdrs) + c.stats.TickErr(err) + return NewCountingReader(rdr, c.stats.TickInBytes), err +} + +func (c *azureBlobClient) CreateBlockBlobFromReader(cname, bname string, size uint64, rdr io.Reader, hdrs map[string]string) error { + c.stats.Tick(&c.stats.Ops, &c.stats.CreateOps) + rdr = NewCountingReader(rdr, c.stats.TickOutBytes) + err := c.client.CreateBlockBlobFromReader(cname, bname, size, rdr, hdrs) + c.stats.TickErr(err) + return err +} + +func (c *azureBlobClient) SetBlobMetadata(cname, bname string, m, hdrs map[string]string) error { + c.stats.Tick(&c.stats.Ops, &c.stats.SetMetadataOps) + err := c.client.SetBlobMetadata(cname, bname, m, hdrs) + c.stats.TickErr(err) + return err +} + +func (c *azureBlobClient) ListBlobs(cname string, params storage.ListBlobsParameters) (storage.BlobListResponse, error) { + c.stats.Tick(&c.stats.Ops, &c.stats.ListOps) + resp, err := c.client.ListBlobs(cname, params) + c.stats.TickErr(err) + return resp, err +} + +func (c *azureBlobClient) DeleteBlob(cname, bname string, hdrs map[string]string) error { + c.stats.Tick(&c.stats.Ops, &c.stats.DelOps) + err := c.client.DeleteBlob(cname, bname, hdrs) + c.stats.TickErr(err) + return err +} diff --git a/services/keepstore/azure_blob_volume_test.go b/services/keepstore/azure_blob_volume_test.go index f4498f4c94..7bccafcaa1 100644 --- a/services/keepstore/azure_blob_volume_test.go +++ b/services/keepstore/azure_blob_volume_test.go @@ -5,6 +5,7 @@ import ( "context" "crypto/md5" "encoding/base64" + "encoding/json" "encoding/xml" "flag" "fmt" @@ -23,6 +24,7 @@ import ( log "github.com/Sirupsen/logrus" "github.com/curoverse/azure-sdk-for-go/storage" + check "gopkg.in/check.v1" ) const ( @@ -369,12 +371,13 @@ func NewTestableAzureBlobVolume(t TB, readonly bool, replication int) *TestableA } } + bs := azClient.GetBlobService() v := &AzureBlobVolume{ ContainerName: container, ReadOnly: readonly, AzureReplication: replication, azClient: azClient, - bsClient: azClient.GetBlobService(), + bsClient: &azureBlobClient{client: &bs}, } return &TestableAzureBlobVolume{ @@ -385,6 +388,29 @@ func NewTestableAzureBlobVolume(t TB, readonly bool, replication int) *TestableA } } +var _ = check.Suite(&StubbedAzureBlobSuite{}) + +type StubbedAzureBlobSuite struct { + volume *TestableAzureBlobVolume + origHTTPTransport http.RoundTripper +} + +func (s *StubbedAzureBlobSuite) SetUpTest(c *check.C) { + s.origHTTPTransport = http.DefaultTransport + http.DefaultTransport = &http.Transport{ + Dial: (&azStubDialer{}).Dial, + } + azureWriteRaceInterval = time.Millisecond + azureWriteRacePollTime = time.Nanosecond + + s.volume = NewTestableAzureBlobVolume(c, false, 3) +} + +func (s *StubbedAzureBlobSuite) TearDownTest(c *check.C) { + s.volume.Teardown() + http.DefaultTransport = s.origHTTPTransport +} + func TestAzureBlobVolumeWithGeneric(t *testing.T) { defer func(t http.RoundTripper) { http.DefaultTransport = t @@ -643,6 +669,35 @@ func testAzureBlobVolumeContextCancel(t *testing.T, testFunc func(context.Contex }() } +func (s *StubbedAzureBlobSuite) TestStats(c *check.C) { + stats := func() string { + buf, err := json.Marshal(s.volume.InternalStats()) + c.Check(err, check.IsNil) + return string(buf) + } + + c.Check(stats(), check.Matches, `.*"Ops":0,.*`) + c.Check(stats(), check.Matches, `.*"Errors":0,.*`) + + loc := "acbd18db4cc2f85cedef654fccc4a4d8" + _, err := s.volume.Get(context.Background(), loc, make([]byte, 3)) + c.Check(err, check.NotNil) + c.Check(stats(), check.Matches, `.*"Ops":[^0],.*`) + c.Check(stats(), check.Matches, `.*"Errors":[^0],.*`) + c.Check(stats(), check.Matches, `.*"InBytes":0,.*`) + + err = s.volume.Put(context.Background(), loc, []byte("foo")) + c.Check(err, check.IsNil) + c.Check(stats(), check.Matches, `.*"OutBytes":3,.*`) + c.Check(stats(), check.Matches, `.*"CreateOps":1,.*`) + + _, err = s.volume.Get(context.Background(), loc, make([]byte, 3)) + c.Check(err, check.IsNil) + _, err = s.volume.Get(context.Background(), loc, make([]byte, 3)) + c.Check(err, check.IsNil) + c.Check(stats(), check.Matches, `.*"InBytes":6,.*`) +} + func (v *TestableAzureBlobVolume) PutRaw(locator string, data []byte) { v.azHandler.PutRaw(v.ContainerName, locator, data) } diff --git a/services/keepstore/s3_volume.go b/services/keepstore/s3_volume.go index ca5b1a2eb9..c1d21051a8 100644 --- a/services/keepstore/s3_volume.go +++ b/services/keepstore/s3_volume.go @@ -14,7 +14,6 @@ import ( "regexp" "strings" "sync" - "sync/atomic" "time" "git.curoverse.com/arvados.git/sdk/go/arvados" @@ -456,10 +455,10 @@ func (v *S3Volume) IndexTo(prefix string, writer io.Writer) error { Prefix: "recent/" + prefix, PageSize: v.IndexPageSize, } - v.bucket.stats.tick(&v.bucket.stats.Ops, &v.bucket.stats.ListOps) - v.bucket.stats.tick(&v.bucket.stats.Ops, &v.bucket.stats.ListOps) + v.bucket.stats.Tick(&v.bucket.stats.Ops, &v.bucket.stats.ListOps) + v.bucket.stats.Tick(&v.bucket.stats.Ops, &v.bucket.stats.ListOps) for data, recent := dataL.First(), recentL.First(); data != nil; data = dataL.Next() { - v.bucket.stats.tick(&v.bucket.stats.Ops, &v.bucket.stats.ListOps) + v.bucket.stats.Tick(&v.bucket.stats.Ops, &v.bucket.stats.ListOps) if data.Key >= "g" { // Conveniently, "recent/*" and "trash/*" are // lexically greater than all hex-encoded data @@ -481,12 +480,12 @@ func (v *S3Volume) IndexTo(prefix string, writer io.Writer) error { for recent != nil { if cmp := strings.Compare(recent.Key[7:], data.Key); cmp < 0 { recent = recentL.Next() - v.bucket.stats.tick(&v.bucket.stats.Ops, &v.bucket.stats.ListOps) + v.bucket.stats.Tick(&v.bucket.stats.Ops, &v.bucket.stats.ListOps) continue } else if cmp == 0 { stamp = recent recent = recentL.Next() - v.bucket.stats.tick(&v.bucket.stats.Ops, &v.bucket.stats.ListOps) + v.bucket.stats.Tick(&v.bucket.stats.Ops, &v.bucket.stats.ListOps) break } else { // recent/X marker is missing: we'll @@ -872,74 +871,58 @@ type s3bucket struct { func (b *s3bucket) GetReader(path string) (io.ReadCloser, error) { rdr, err := b.Bucket.GetReader(path) - b.stats.tick(&b.stats.Ops, &b.stats.GetOps) + b.stats.Tick(&b.stats.Ops, &b.stats.GetOps) b.stats.tickErr(err) - return NewCountingReader(rdr, b.stats.tickInBytes), err + return NewCountingReader(rdr, b.stats.TickInBytes), err } func (b *s3bucket) Head(path string, headers map[string][]string) (*http.Response, error) { resp, err := b.Bucket.Head(path, headers) - b.stats.tick(&b.stats.Ops, &b.stats.HeadOps) + b.stats.Tick(&b.stats.Ops, &b.stats.HeadOps) b.stats.tickErr(err) return resp, err } func (b *s3bucket) PutReader(path string, r io.Reader, length int64, contType string, perm s3.ACL, options s3.Options) error { - err := b.Bucket.PutReader(path, NewCountingReader(r, b.stats.tickOutBytes), length, contType, perm, options) - b.stats.tick(&b.stats.Ops, &b.stats.PutOps) + err := b.Bucket.PutReader(path, NewCountingReader(r, b.stats.TickOutBytes), length, contType, perm, options) + b.stats.Tick(&b.stats.Ops, &b.stats.PutOps) b.stats.tickErr(err) return err } func (b *s3bucket) Put(path string, data []byte, contType string, perm s3.ACL, options s3.Options) error { - err := b.Bucket.PutReader(path, NewCountingReader(bytes.NewBuffer(data), b.stats.tickOutBytes), int64(len(data)), contType, perm, options) - b.stats.tick(&b.stats.Ops, &b.stats.PutOps) + err := b.Bucket.PutReader(path, NewCountingReader(bytes.NewBuffer(data), b.stats.TickOutBytes), int64(len(data)), contType, perm, options) + b.stats.Tick(&b.stats.Ops, &b.stats.PutOps) b.stats.tickErr(err) return err } func (b *s3bucket) Del(path string) error { err := b.Bucket.Del(path) - b.stats.tick(&b.stats.Ops, &b.stats.DelOps) + b.stats.Tick(&b.stats.Ops, &b.stats.DelOps) b.stats.tickErr(err) return err } type s3bucketStats struct { - Errors uint64 - Ops uint64 - GetOps uint64 - PutOps uint64 - HeadOps uint64 - DelOps uint64 - ListOps uint64 - InBytes uint64 - OutBytes uint64 + statsTicker + Ops uint64 + GetOps uint64 + PutOps uint64 + HeadOps uint64 + DelOps uint64 + ListOps uint64 ErrorCodes map[string]uint64 `json:",omitempty"` lock sync.Mutex } -func (s *s3bucketStats) tickInBytes(n uint64) { - atomic.AddUint64(&s.InBytes, n) -} - -func (s *s3bucketStats) tickOutBytes(n uint64) { - atomic.AddUint64(&s.OutBytes, n) -} - -func (s *s3bucketStats) tick(counters ...*uint64) { - for _, counter := range counters { - atomic.AddUint64(counter, 1) - } -} - func (s *s3bucketStats) tickErr(err error) { if err == nil { return } - atomic.AddUint64(&s.Errors, 1) + s.TickErr(err) errStr := fmt.Sprintf("%T", err) if err, ok := err.(*s3.Error); ok { errStr = errStr + fmt.Sprintf(" %d %s", err.StatusCode, err.Code) diff --git a/services/keepstore/stats.go b/services/keepstore/stats.go new file mode 100644 index 0000000000..02d260c00c --- /dev/null +++ b/services/keepstore/stats.go @@ -0,0 +1,34 @@ +package main + +import ( + "sync/atomic" +) + +type statsTicker struct { + Errors uint64 + InBytes uint64 + OutBytes uint64 +} + +// Tick increments each of the given counters by 1 using +// atomic.AddUint64. +func (s *statsTicker) Tick(counters ...*uint64) { + for _, counter := range counters { + atomic.AddUint64(counter, 1) + } +} + +func (s *statsTicker) TickErr(err error) { + if err == nil { + return + } + s.Tick(&s.Errors) +} + +func (s *statsTicker) TickInBytes(n uint64) { + atomic.AddUint64(&s.InBytes, n) +} + +func (s *statsTicker) TickOutBytes(n uint64) { + atomic.AddUint64(&s.OutBytes, n) +} -- 2.30.2