Merge branch 'master' into 13937-keepstore-prometheus
authorLucas Di Pentima <ldipentima@veritasgenetics.com>
Wed, 6 Mar 2019 21:09:28 +0000 (18:09 -0300)
committerLucas Di Pentima <ldipentima@veritasgenetics.com>
Wed, 6 Mar 2019 21:09:28 +0000 (18:09 -0300)
Arvados-DCO-1.1-Signed-off-by: Lucas Di Pentima <ldipentima@veritasgenetics.com>

17 files changed:
services/keepstore/azure_blob_volume.go
services/keepstore/config.go
services/keepstore/handler_test.go
services/keepstore/handlers.go
services/keepstore/keepstore.go
services/keepstore/metrics.go [new file with mode: 0644]
services/keepstore/mounts_test.go
services/keepstore/pipe_adapters.go
services/keepstore/proxy_remote_test.go
services/keepstore/pull_worker_test.go
services/keepstore/s3_volume.go
services/keepstore/s3_volume_test.go
services/keepstore/stats_ticker.go
services/keepstore/unix_volume.go [moved from services/keepstore/volume_unix.go with 96% similarity]
services/keepstore/unix_volume_test.go [moved from services/keepstore/volume_unix_test.go with 98% similarity]
services/keepstore/volume.go
services/keepstore/volume_test.go

index 4f7339facf4ace001ac886a5076afc217e040c18..66956b89ee83928261bc67dcedba075c20b78397 100644 (file)
@@ -23,6 +23,7 @@ import (
 
        "git.curoverse.com/arvados.git/sdk/go/arvados"
        "github.com/Azure/azure-sdk-for-go/storage"
+       "github.com/prometheus/client_golang/prometheus"
 )
 
 const azureDefaultRequestTimeout = arvados.Duration(10 * time.Minute)
@@ -147,7 +148,7 @@ func (v *AzureBlobVolume) Type() string {
 }
 
 // Start implements Volume.
-func (v *AzureBlobVolume) Start() error {
+func (v *AzureBlobVolume) Start(opsCounters, errCounters, ioBytes *prometheus.CounterVec) error {
        if v.ContainerName == "" {
                return errors.New("no container name given")
        }
@@ -183,6 +184,12 @@ func (v *AzureBlobVolume) Start() error {
        } else if !ok {
                return fmt.Errorf("Azure container %q does not exist", v.ContainerName)
        }
+       // Set up prometheus metrics
+       lbls := prometheus.Labels{"device_id": v.DeviceID()}
+       v.container.stats.opsCounters = opsCounters.MustCurryWith(lbls)
+       v.container.stats.errCounters = errCounters.MustCurryWith(lbls)
+       v.container.stats.ioBytes = ioBytes.MustCurryWith(lbls)
+
        return nil
 }
 
@@ -727,6 +734,7 @@ type azureContainer struct {
 }
 
 func (c *azureContainer) Exists() (bool, error) {
+       c.stats.TickOps("exists")
        c.stats.Tick(&c.stats.Ops)
        ok, err := c.ctr.Exists()
        c.stats.TickErr(err)
@@ -734,6 +742,7 @@ func (c *azureContainer) Exists() (bool, error) {
 }
 
 func (c *azureContainer) GetBlobMetadata(bname string) (storage.BlobMetadata, error) {
+       c.stats.TickOps("get_metadata")
        c.stats.Tick(&c.stats.Ops, &c.stats.GetMetadataOps)
        b := c.ctr.GetBlobReference(bname)
        err := b.GetMetadata(nil)
@@ -742,6 +751,7 @@ func (c *azureContainer) GetBlobMetadata(bname string) (storage.BlobMetadata, er
 }
 
 func (c *azureContainer) GetBlobProperties(bname string) (*storage.BlobProperties, error) {
+       c.stats.TickOps("get_properties")
        c.stats.Tick(&c.stats.Ops, &c.stats.GetPropertiesOps)
        b := c.ctr.GetBlobReference(bname)
        err := b.GetProperties(nil)
@@ -750,6 +760,7 @@ func (c *azureContainer) GetBlobProperties(bname string) (*storage.BlobPropertie
 }
 
 func (c *azureContainer) GetBlob(bname string) (io.ReadCloser, error) {
+       c.stats.TickOps("get")
        c.stats.Tick(&c.stats.Ops, &c.stats.GetOps)
        b := c.ctr.GetBlobReference(bname)
        rdr, err := b.Get(nil)
@@ -758,6 +769,7 @@ func (c *azureContainer) GetBlob(bname string) (io.ReadCloser, error) {
 }
 
 func (c *azureContainer) GetBlobRange(bname string, start, end int, opts *storage.GetBlobOptions) (io.ReadCloser, error) {
+       c.stats.TickOps("get_range")
        c.stats.Tick(&c.stats.Ops, &c.stats.GetRangeOps)
        b := c.ctr.GetBlobReference(bname)
        rdr, err := b.GetRange(&storage.GetBlobRangeOptions{
@@ -785,6 +797,7 @@ func (r *readerWithAzureLen) Len() int {
 }
 
 func (c *azureContainer) CreateBlockBlobFromReader(bname string, size int, rdr io.Reader, opts *storage.PutBlobOptions) error {
+       c.stats.TickOps("create")
        c.stats.Tick(&c.stats.Ops, &c.stats.CreateOps)
        if size != 0 {
                rdr = &readerWithAzureLen{
@@ -799,6 +812,7 @@ func (c *azureContainer) CreateBlockBlobFromReader(bname string, size int, rdr i
 }
 
 func (c *azureContainer) SetBlobMetadata(bname string, m storage.BlobMetadata, opts *storage.SetBlobMetadataOptions) error {
+       c.stats.TickOps("set_metadata")
        c.stats.Tick(&c.stats.Ops, &c.stats.SetMetadataOps)
        b := c.ctr.GetBlobReference(bname)
        b.Metadata = m
@@ -808,6 +822,7 @@ func (c *azureContainer) SetBlobMetadata(bname string, m storage.BlobMetadata, o
 }
 
 func (c *azureContainer) ListBlobs(params storage.ListBlobsParameters) (storage.BlobListResponse, error) {
+       c.stats.TickOps("list")
        c.stats.Tick(&c.stats.Ops, &c.stats.ListOps)
        resp, err := c.ctr.ListBlobs(params)
        c.stats.TickErr(err)
@@ -815,6 +830,7 @@ func (c *azureContainer) ListBlobs(params storage.ListBlobsParameters) (storage.
 }
 
 func (c *azureContainer) DeleteBlob(bname string, opts *storage.DeleteBlobOptions) error {
+       c.stats.TickOps("delete")
        c.stats.Tick(&c.stats.Ops, &c.stats.DelOps)
        b := c.ctr.GetBlobReference(bname)
        err := b.Delete(opts)
index 2bd989de30c1bffc020777ba1ecbb895591570cf..0902b99eb257df0e88059e38344aaad180f0b6ba 100644 (file)
@@ -13,6 +13,7 @@ import (
        "time"
 
        "git.curoverse.com/arvados.git/sdk/go/arvados"
+       "github.com/prometheus/client_golang/prometheus"
        "github.com/sirupsen/logrus"
 )
 
@@ -81,7 +82,7 @@ func DefaultConfig() *Config {
 
 // Start should be called exactly once: after setting all public
 // fields, and before using the config.
-func (cfg *Config) Start() error {
+func (cfg *Config) Start(reg *prometheus.Registry) error {
        if cfg.Debug {
                log.Level = logrus.DebugLevel
                cfg.debugLogf = log.Printf
@@ -143,8 +144,9 @@ func (cfg *Config) Start() error {
                        return fmt.Errorf("no volumes found")
                }
        }
+       vm := newVolumeMetricsVecs(reg)
        for _, v := range cfg.Volumes {
-               if err := v.Start(); err != nil {
+               if err := v.Start(vm.opsCounters, vm.errCounters, vm.ioBytes); err != nil {
                        return fmt.Errorf("volume %s: %s", v, err)
                }
                log.Printf("Using volume %v (writable=%v)", v, v.Writable())
index 32b360b1276940c9da69bc4b44b02785ffefc97f..ad907ef10138f213e3831223d867fd3c114736d9 100644 (file)
@@ -28,6 +28,7 @@ import (
 
        "git.curoverse.com/arvados.git/sdk/go/arvados"
        "git.curoverse.com/arvados.git/sdk/go/arvadostest"
+       "github.com/prometheus/client_golang/prometheus"
 )
 
 var testCluster = &arvados.Cluster{
@@ -845,7 +846,7 @@ func IssueRequest(rt *RequestTester) *httptest.ResponseRecorder {
        if rt.apiToken != "" {
                req.Header.Set("Authorization", "OAuth2 "+rt.apiToken)
        }
-       loggingRouter := MakeRESTRouter(testCluster)
+       loggingRouter := MakeRESTRouter(testCluster, prometheus.NewRegistry())
        loggingRouter.ServeHTTP(response, req)
        return response
 }
@@ -857,7 +858,7 @@ func IssueHealthCheckRequest(rt *RequestTester) *httptest.ResponseRecorder {
        if rt.apiToken != "" {
                req.Header.Set("Authorization", "Bearer "+rt.apiToken)
        }
-       loggingRouter := MakeRESTRouter(testCluster)
+       loggingRouter := MakeRESTRouter(testCluster, prometheus.NewRegistry())
        loggingRouter.ServeHTTP(response, req)
        return response
 }
@@ -997,7 +998,7 @@ func TestGetHandlerClientDisconnect(t *testing.T) {
        ok := make(chan struct{})
        go func() {
                req, _ := http.NewRequest("GET", fmt.Sprintf("/%s+%d", TestHash, len(TestBlock)), nil)
-               MakeRESTRouter(testCluster).ServeHTTP(resp, req)
+               MakeRESTRouter(testCluster, prometheus.NewRegistry()).ServeHTTP(resp, req)
                ok <- struct{}{}
        }()
 
index 2a1bbc972ffa6e4fe0675291b0c923efc4d4ac8d..51dd73a513c1d4c729a6743aaabe0cefa1202c4b 100644 (file)
@@ -24,6 +24,7 @@ import (
        "git.curoverse.com/arvados.git/sdk/go/health"
        "git.curoverse.com/arvados.git/sdk/go/httpserver"
        "github.com/gorilla/mux"
+       "github.com/prometheus/client_golang/prometheus"
 )
 
 type router struct {
@@ -31,14 +32,16 @@ type router struct {
        limiter     httpserver.RequestCounter
        cluster     *arvados.Cluster
        remoteProxy remoteProxy
+       metrics     *nodeMetrics
 }
 
 // MakeRESTRouter returns a new router that forwards all Keep requests
 // to the appropriate handlers.
-func MakeRESTRouter(cluster *arvados.Cluster) http.Handler {
+func MakeRESTRouter(cluster *arvados.Cluster, reg *prometheus.Registry) http.Handler {
        rtr := &router{
                Router:  mux.NewRouter(),
                cluster: cluster,
+               metrics: &nodeMetrics{reg: reg},
        }
 
        rtr.HandleFunc(
@@ -85,8 +88,12 @@ func MakeRESTRouter(cluster *arvados.Cluster) http.Handler {
        rtr.NotFoundHandler = http.HandlerFunc(BadRequestHandler)
 
        rtr.limiter = httpserver.NewRequestLimiter(theConfig.MaxRequests, rtr)
+       rtr.metrics.setupBufferPoolMetrics(bufs)
+       rtr.metrics.setupWorkQueueMetrics(pullq, "pull")
+       rtr.metrics.setupWorkQueueMetrics(trashq, "trash")
+       rtr.metrics.setupRequestMetrics(rtr.limiter)
 
-       instrumented := httpserver.Instrument(nil, nil,
+       instrumented := httpserver.Instrument(rtr.metrics.reg, nil,
                httpserver.AddRequestIDs(httpserver.LogRequests(nil, rtr.limiter)))
        return instrumented.ServeAPI(theConfig.ManagementToken, instrumented)
 }
index a6c8cd99545c24fdc2a56f6c2ff1866682a6ed6d..fcbdddacb1d585e995c8f23a0528be2ce8c1723c 100644 (file)
@@ -18,6 +18,7 @@ import (
        "git.curoverse.com/arvados.git/sdk/go/config"
        "git.curoverse.com/arvados.git/sdk/go/keepclient"
        "github.com/coreos/go-systemd/daemon"
+       "github.com/prometheus/client_golang/prometheus"
 )
 
 var version = "dev"
@@ -121,7 +122,9 @@ func main() {
 
        log.Printf("keepstore %s started", version)
 
-       err = theConfig.Start()
+       metricsRegistry := prometheus.NewRegistry()
+
+       err = theConfig.Start(metricsRegistry)
        if err != nil {
                log.Fatal(err)
        }
@@ -174,7 +177,7 @@ func main() {
        KeepVM = MakeRRVolumeManager(theConfig.Volumes)
 
        // Middleware/handler stack
-       router := MakeRESTRouter(cluster)
+       router := MakeRESTRouter(cluster, metricsRegistry)
 
        // Set up a TCP listener.
        listener, err := net.Listen("tcp", theConfig.Listen)
diff --git a/services/keepstore/metrics.go b/services/keepstore/metrics.go
new file mode 100644 (file)
index 0000000..4a154bd
--- /dev/null
@@ -0,0 +1,130 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package main
+
+import (
+       "fmt"
+
+       "git.curoverse.com/arvados.git/sdk/go/httpserver"
+       "github.com/prometheus/client_golang/prometheus"
+)
+
+type nodeMetrics struct {
+       reg *prometheus.Registry
+}
+
+func (m *nodeMetrics) setupBufferPoolMetrics(b *bufferPool) {
+       m.reg.MustRegister(prometheus.NewGaugeFunc(
+               prometheus.GaugeOpts{
+                       Namespace: "arvados",
+                       Subsystem: "keepstore",
+                       Name:      "bufferpool_bytes_allocated",
+                       Help:      "Number of bytes allocated to buffers",
+               },
+               func() float64 { return float64(b.Alloc()) },
+       ))
+       m.reg.MustRegister(prometheus.NewGaugeFunc(
+               prometheus.GaugeOpts{
+                       Namespace: "arvados",
+                       Subsystem: "keepstore",
+                       Name:      "bufferpool_buffers_max",
+                       Help:      "Maximum number of buffers allowed",
+               },
+               func() float64 { return float64(b.Cap()) },
+       ))
+       m.reg.MustRegister(prometheus.NewGaugeFunc(
+               prometheus.GaugeOpts{
+                       Namespace: "arvados",
+                       Subsystem: "keepstore",
+                       Name:      "bufferpool_buffers_in_use",
+                       Help:      "Number of buffers in use",
+               },
+               func() float64 { return float64(b.Len()) },
+       ))
+}
+
+func (m *nodeMetrics) setupWorkQueueMetrics(q *WorkQueue, qName string) {
+       m.reg.MustRegister(prometheus.NewGaugeFunc(
+               prometheus.GaugeOpts{
+                       Namespace: "arvados",
+                       Subsystem: "keepstore",
+                       Name:      fmt.Sprintf("%s_queue_in_progress", qName),
+                       Help:      fmt.Sprintf("Number of %s requests in progress", qName),
+               },
+               func() float64 { return float64(getWorkQueueStatus(q).InProgress) },
+       ))
+       m.reg.MustRegister(prometheus.NewGaugeFunc(
+               prometheus.GaugeOpts{
+                       Namespace: "arvados",
+                       Subsystem: "keepstore",
+                       Name:      fmt.Sprintf("%s_queue_queued", qName),
+                       Help:      fmt.Sprintf("Number of queued %s requests", qName),
+               },
+               func() float64 { return float64(getWorkQueueStatus(q).Queued) },
+       ))
+}
+
+func (m *nodeMetrics) setupRequestMetrics(rc httpserver.RequestCounter) {
+       m.reg.MustRegister(prometheus.NewGaugeFunc(
+               prometheus.GaugeOpts{
+                       Namespace: "arvados",
+                       Subsystem: "keepstore",
+                       Name:      "requests_current",
+                       Help:      "Number of requests in progress",
+               },
+               func() float64 { return float64(rc.Current()) },
+       ))
+       m.reg.MustRegister(prometheus.NewGaugeFunc(
+               prometheus.GaugeOpts{
+                       Namespace: "arvados",
+                       Subsystem: "keepstore",
+                       Name:      "requests_max",
+                       Help:      "Maximum number of concurrent requests",
+               },
+               func() float64 { return float64(rc.Max()) },
+       ))
+}
+
+type volumeMetricsVecs struct {
+       ioBytes     *prometheus.CounterVec
+       errCounters *prometheus.CounterVec
+       opsCounters *prometheus.CounterVec
+}
+
+func newVolumeMetricsVecs(reg *prometheus.Registry) *volumeMetricsVecs {
+       m := &volumeMetricsVecs{}
+       m.opsCounters = prometheus.NewCounterVec(
+               prometheus.CounterOpts{
+                       Namespace: "arvados",
+                       Subsystem: "keepstore",
+                       Name:      "volume_operations",
+                       Help:      "Number of volume operations",
+               },
+               []string{"device_id", "operation"},
+       )
+       reg.MustRegister(m.opsCounters)
+       m.errCounters = prometheus.NewCounterVec(
+               prometheus.CounterOpts{
+                       Namespace: "arvados",
+                       Subsystem: "keepstore",
+                       Name:      "volume_errors",
+                       Help:      "Number of volume errors",
+               },
+               []string{"device_id", "error_type"},
+       )
+       reg.MustRegister(m.errCounters)
+       m.ioBytes = prometheus.NewCounterVec(
+               prometheus.CounterOpts{
+                       Namespace: "arvados",
+                       Subsystem: "keepstore",
+                       Name:      "volume_io_bytes",
+                       Help:      "Volume I/O traffic in bytes",
+               },
+               []string{"device_id", "direction"},
+       )
+       reg.MustRegister(m.ioBytes)
+
+       return m
+}
index 31b1a684fe6a077ebbbfebf7bb846f6f508a00b5..ac30c369a551af7304d587c56fbba205c7fee705 100644 (file)
@@ -12,6 +12,7 @@ import (
        "net/http/httptest"
 
        "git.curoverse.com/arvados.git/sdk/go/arvadostest"
+       "github.com/prometheus/client_golang/prometheus"
        check "gopkg.in/check.v1"
 )
 
@@ -28,15 +29,16 @@ func (s *MountsSuite) SetUpTest(c *check.C) {
        theConfig = DefaultConfig()
        theConfig.systemAuthToken = arvadostest.DataManagerToken
        theConfig.ManagementToken = arvadostest.ManagementToken
-       theConfig.Start()
-       s.rtr = MakeRESTRouter(testCluster)
+       r := prometheus.NewRegistry()
+       theConfig.Start(r)
+       s.rtr = MakeRESTRouter(testCluster, r)
 }
 
 func (s *MountsSuite) TearDownTest(c *check.C) {
        s.vm.Close()
        KeepVM = nil
        theConfig = DefaultConfig()
-       theConfig.Start()
+       theConfig.Start(prometheus.NewRegistry())
 }
 
 func (s *MountsSuite) TestMounts(c *check.C) {
@@ -131,7 +133,9 @@ func (s *MountsSuite) TestMetrics(c *check.C) {
        }
        json.NewDecoder(resp.Body).Decode(&j)
        found := make(map[string]bool)
+       names := map[string]bool{}
        for _, g := range j {
+               names[g.Name] = true
                for _, m := range g.Metric {
                        if len(m.Label) == 2 && m.Label[0].Name == "code" && m.Label[0].Value == "200" && m.Label[1].Name == "method" && m.Label[1].Value == "put" {
                                c.Check(m.Summary.SampleCount, check.Equals, "2")
@@ -143,6 +147,24 @@ func (s *MountsSuite) TestMetrics(c *check.C) {
        }
        c.Check(found["request_duration_seconds"], check.Equals, true)
        c.Check(found["time_to_status_seconds"], check.Equals, true)
+
+       metricsNames := []string{
+               "arvados_keepstore_bufferpool_buffers_in_use",
+               "arvados_keepstore_bufferpool_buffers_max",
+               "arvados_keepstore_bufferpool_bytes_allocated",
+               "arvados_keepstore_pull_queue_in_progress",
+               "arvados_keepstore_pull_queue_queued",
+               "arvados_keepstore_requests_current",
+               "arvados_keepstore_requests_max",
+               "arvados_keepstore_trash_queue_in_progress",
+               "arvados_keepstore_trash_queue_queued",
+               "request_duration_seconds",
+               "time_to_status_seconds",
+       }
+       for _, m := range metricsNames {
+               _, ok := names[m]
+               c.Check(ok, check.Equals, true)
+       }
 }
 
 func (s *MountsSuite) call(method, path, tok string, body []byte) *httptest.ResponseRecorder {
index e4a5865a43dd10d20215eb9cebd13bd6fe2271b9..69ed6d2ff5f1f8d80bd6c6e6ebe7d75f7e4ff259 100644 (file)
@@ -39,7 +39,7 @@ func getWithPipe(ctx context.Context, loc string, buf []byte, br BlockReader) (i
        }
 }
 
-// putWithPipe invokes putter with a new pipe, and and copies data
+// putWithPipe invokes putter with a new pipe, and copies data
 // from buf into the pipe. If ctx is done before all data is copied,
 // putWithPipe closes the pipe with an error, and returns early with
 // an error.
index 6e720b8499f366c5d931729047de7a3b1b632faf..6c22d1d32aa2f0745a2cc424cdfeef4d4d76ca75 100644 (file)
@@ -20,6 +20,7 @@ import (
        "git.curoverse.com/arvados.git/sdk/go/arvadostest"
        "git.curoverse.com/arvados.git/sdk/go/auth"
        "git.curoverse.com/arvados.git/sdk/go/keepclient"
+       "github.com/prometheus/client_golang/prometheus"
        check "gopkg.in/check.v1"
 )
 
@@ -100,15 +101,16 @@ func (s *ProxyRemoteSuite) SetUpTest(c *check.C) {
        theConfig = DefaultConfig()
        theConfig.systemAuthToken = arvadostest.DataManagerToken
        theConfig.blobSigningKey = []byte(knownKey)
-       theConfig.Start()
-       s.rtr = MakeRESTRouter(s.cluster)
+       r := prometheus.NewRegistry()
+       theConfig.Start(r)
+       s.rtr = MakeRESTRouter(s.cluster, r)
 }
 
 func (s *ProxyRemoteSuite) TearDownTest(c *check.C) {
        s.vm.Close()
        KeepVM = nil
        theConfig = DefaultConfig()
-       theConfig.Start()
+       theConfig.Start(prometheus.NewRegistry())
        s.remoteAPI.Close()
        s.remoteKeepproxy.Close()
 }
index 7b5077c1a7f70dad794e14fc148ae1da7fc60d04..8e667e048f47ff3f3ac91df65c960ac94511e8b3 100644 (file)
@@ -14,6 +14,7 @@ import (
 
        "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
        "git.curoverse.com/arvados.git/sdk/go/keepclient"
+       "github.com/prometheus/client_golang/prometheus"
        . "gopkg.in/check.v1"
 )
 
@@ -58,7 +59,7 @@ func (s *PullWorkerTestSuite) TearDownTest(c *C) {
        pullq = nil
        teardown()
        theConfig = DefaultConfig()
-       theConfig.Start()
+       theConfig.Start(prometheus.NewRegistry())
 }
 
 var firstPullList = []byte(`[
index fb978fe2ba41fbdf9895c0c718d2ca6c925d5f9c..0bf0d9a10e3241e375f6abc070e27260fb4d698e 100644 (file)
@@ -25,6 +25,7 @@ import (
        "git.curoverse.com/arvados.git/sdk/go/arvados"
        "github.com/AdRoll/goamz/aws"
        "github.com/AdRoll/goamz/s3"
+       "github.com/prometheus/client_golang/prometheus"
 )
 
 const (
@@ -198,7 +199,7 @@ func (*S3Volume) Type() string {
 
 // Start populates private fields and verifies the configuration is
 // valid.
-func (v *S3Volume) Start() error {
+func (v *S3Volume) Start(opsCounters, errCounters, ioBytes *prometheus.CounterVec) error {
        region, ok := aws.Regions[v.Region]
        if v.Endpoint == "" {
                if !ok {
@@ -248,6 +249,12 @@ func (v *S3Volume) Start() error {
                        Name: v.Bucket,
                },
        }
+       // Set up prometheus metrics
+       lbls := prometheus.Labels{"device_id": v.DeviceID()}
+       v.bucket.stats.opsCounters = opsCounters.MustCurryWith(lbls)
+       v.bucket.stats.errCounters = errCounters.MustCurryWith(lbls)
+       v.bucket.stats.ioBytes = ioBytes.MustCurryWith(lbls)
+
        return nil
 }
 
@@ -929,6 +936,7 @@ func (lister *s3Lister) Error() error {
 }
 
 func (lister *s3Lister) getPage() {
+       lister.Stats.TickOps("list")
        lister.Stats.Tick(&lister.Stats.Ops, &lister.Stats.ListOps)
        resp, err := lister.Bucket.List(lister.Prefix, "", lister.nextMarker, lister.PageSize)
        lister.nextMarker = ""
@@ -965,6 +973,7 @@ type s3bucket struct {
 
 func (b *s3bucket) GetReader(path string) (io.ReadCloser, error) {
        rdr, err := b.Bucket.GetReader(path)
+       b.stats.TickOps("get")
        b.stats.Tick(&b.stats.Ops, &b.stats.GetOps)
        b.stats.TickErr(err)
        return NewCountingReader(rdr, b.stats.TickInBytes), err
@@ -972,6 +981,7 @@ func (b *s3bucket) GetReader(path string) (io.ReadCloser, error) {
 
 func (b *s3bucket) Head(path string, headers map[string][]string) (*http.Response, error) {
        resp, err := b.Bucket.Head(path, headers)
+       b.stats.TickOps("head")
        b.stats.Tick(&b.stats.Ops, &b.stats.HeadOps)
        b.stats.TickErr(err)
        return resp, err
@@ -990,6 +1000,7 @@ func (b *s3bucket) PutReader(path string, r io.Reader, length int64, contType st
                r = NewCountingReader(r, b.stats.TickOutBytes)
        }
        err := b.Bucket.PutReader(path, r, length, contType, perm, options)
+       b.stats.TickOps("put")
        b.stats.Tick(&b.stats.Ops, &b.stats.PutOps)
        b.stats.TickErr(err)
        return err
@@ -997,6 +1008,7 @@ func (b *s3bucket) PutReader(path string, r io.Reader, length int64, contType st
 
 func (b *s3bucket) Del(path string) error {
        err := b.Bucket.Del(path)
+       b.stats.TickOps("delete")
        b.stats.Tick(&b.stats.Ops, &b.stats.DelOps)
        b.stats.TickErr(err)
        return err
index 10c71125df39acb3feadc4e69e4d2190d53a10fe..baa9dda9e6619c32b6c00c0a1671b218999ed4f8 100644 (file)
@@ -20,6 +20,7 @@ import (
        "github.com/AdRoll/goamz/s3"
        "github.com/AdRoll/goamz/s3/s3test"
        "github.com/ghodss/yaml"
+       "github.com/prometheus/client_golang/prometheus"
        check "gopkg.in/check.v1"
 )
 
@@ -170,7 +171,8 @@ func (s *StubbedS3Suite) testContextCancel(c *check.C, testFunc func(context.Con
        vol := *v.S3Volume
        vol.Endpoint = srv.URL
        v = &TestableS3Volume{S3Volume: &vol}
-       v.Start()
+       metrics := newVolumeMetricsVecs(prometheus.NewRegistry())
+       v.Start(metrics.opsCounters, metrics.errCounters, metrics.ioBytes)
 
        ctx, cancel := context.WithCancel(context.Background())
 
@@ -430,7 +432,8 @@ func (s *StubbedS3Suite) newTestableVolume(c *check.C, raceWindow time.Duration,
                server:      srv,
                serverClock: clock,
        }
-       v.Start()
+       metrics := newVolumeMetricsVecs(prometheus.NewRegistry())
+       v.Start(metrics.opsCounters, metrics.errCounters, metrics.ioBytes)
        err = v.bucket.PutBucket(s3.ACL("private"))
        c.Assert(err, check.IsNil)
        return v
@@ -448,7 +451,7 @@ Volumes:
        c.Check(cfg.Volumes[0].GetStorageClasses(), check.DeepEquals, []string{"class_a", "class_b"})
 }
 
-func (v *TestableS3Volume) Start() error {
+func (v *TestableS3Volume) Start(opsCounters, errCounters, ioBytes *prometheus.CounterVec) error {
        tmp, err := ioutil.TempFile("", "keepstore")
        v.c.Assert(err, check.IsNil)
        defer os.Remove(tmp.Name())
@@ -459,7 +462,7 @@ func (v *TestableS3Volume) Start() error {
        v.S3Volume.AccessKeyFile = tmp.Name()
        v.S3Volume.SecretKeyFile = tmp.Name()
 
-       v.c.Assert(v.S3Volume.Start(), check.IsNil)
+       v.c.Assert(v.S3Volume.Start(opsCounters, errCounters, ioBytes), check.IsNil)
        return nil
 }
 
index 377a53675783b890fa7863dd98ea50681074697b..342b9e32058e23a1f09fc305d8fdc37caf104198 100644 (file)
@@ -7,6 +7,8 @@ package main
 import (
        "sync"
        "sync/atomic"
+
+       "github.com/prometheus/client_golang/prometheus"
 )
 
 type statsTicker struct {
@@ -16,6 +18,10 @@ type statsTicker struct {
 
        ErrorCodes map[string]uint64 `json:",omitempty"`
        lock       sync.Mutex
+
+       opsCounters *prometheus.CounterVec
+       errCounters *prometheus.CounterVec
+       ioBytes     *prometheus.CounterVec
 }
 
 // Tick increments each of the given counters by 1 using
@@ -41,14 +47,33 @@ func (s *statsTicker) TickErr(err error, errType string) {
        }
        s.ErrorCodes[errType]++
        s.lock.Unlock()
+       if s.errCounters != nil {
+               s.errCounters.With(prometheus.Labels{"error_type": errType}).Inc()
+       }
 }
 
 // TickInBytes increments the incoming byte counter by n.
 func (s *statsTicker) TickInBytes(n uint64) {
+       if s.ioBytes != nil {
+               s.ioBytes.With(prometheus.Labels{"direction": "in"}).Add(float64(n))
+       }
        atomic.AddUint64(&s.InBytes, n)
 }
 
 // TickOutBytes increments the outgoing byte counter by n.
 func (s *statsTicker) TickOutBytes(n uint64) {
+       if s.ioBytes != nil {
+               s.ioBytes.With(prometheus.Labels{"direction": "out"}).Add(float64(n))
+       }
        atomic.AddUint64(&s.OutBytes, n)
 }
+
+// TickOps increments the counter of the listed operations by 1.
+func (s *statsTicker) TickOps(operations ...string) {
+       if s.opsCounters == nil {
+               return
+       }
+       for _, opType := range operations {
+               s.opsCounters.With(prometheus.Labels{"operation": opType}).Inc()
+       }
+}
similarity index 96%
rename from services/keepstore/volume_unix.go
rename to services/keepstore/unix_volume.go
index 23d675359244942097072d88e1bd98daf9d46c6c..8bdbf936293beb41058e7c23897296598c52cf6b 100644 (file)
@@ -21,6 +21,8 @@ import (
        "sync/atomic"
        "syscall"
        "time"
+
+       "github.com/prometheus/client_golang/prometheus"
 )
 
 type unixVolumeAdder struct {
@@ -28,7 +30,7 @@ type unixVolumeAdder struct {
 }
 
 // String implements flag.Value
-func (s *unixVolumeAdder) String() string {
+func (vs *unixVolumeAdder) String() string {
        return "-"
 }
 
@@ -218,7 +220,7 @@ func (v *UnixVolume) Type() string {
 }
 
 // Start implements Volume
-func (v *UnixVolume) Start() error {
+func (v *UnixVolume) Start(opsCounters, errCounters, ioBytes *prometheus.CounterVec) error {
        if v.Serialize {
                v.locker = &sync.Mutex{}
        }
@@ -228,7 +230,14 @@ func (v *UnixVolume) Start() error {
        if v.DirectoryReplication == 0 {
                v.DirectoryReplication = 1
        }
+       // Set up prometheus metrics
+       lbls := prometheus.Labels{"device_id": v.DeviceID()}
+       v.os.stats.opsCounters = opsCounters.MustCurryWith(lbls)
+       v.os.stats.errCounters = errCounters.MustCurryWith(lbls)
+       v.os.stats.ioBytes = ioBytes.MustCurryWith(lbls)
+
        _, err := v.os.Stat(v.Root)
+
        return err
 }
 
@@ -252,6 +261,7 @@ func (v *UnixVolume) Touch(loc string) error {
        }
        defer v.unlockfile(f)
        ts := syscall.NsecToTimespec(time.Now().UnixNano())
+       v.os.stats.TickOps("utimes")
        v.os.stats.Tick(&v.os.stats.UtimesOps)
        err = syscall.UtimesNano(p, []syscall.Timespec{ts, ts})
        v.os.stats.TickErr(err)
@@ -339,7 +349,7 @@ func (v *UnixVolume) Put(ctx context.Context, loc string, block []byte) error {
        return putWithPipe(ctx, loc, block, v)
 }
 
-// ReadBlock implements BlockWriter.
+// WriteBlock implements BlockWriter.
 func (v *UnixVolume) WriteBlock(ctx context.Context, loc string, rdr io.Reader) error {
        if v.ReadOnly {
                return MethodDisabledError
@@ -439,6 +449,7 @@ func (v *UnixVolume) IndexTo(prefix string, w io.Writer) error {
                return err
        }
        defer rootdir.Close()
+       v.os.stats.TickOps("readdir")
        v.os.stats.Tick(&v.os.stats.ReaddirOps)
        for {
                names, err := rootdir.Readdirnames(1)
@@ -461,6 +472,7 @@ func (v *UnixVolume) IndexTo(prefix string, w io.Writer) error {
                        lastErr = err
                        continue
                }
+               v.os.stats.TickOps("readdir")
                v.os.stats.Tick(&v.os.stats.ReaddirOps)
                for {
                        fileInfo, err := blockdir.Readdir(1)
@@ -549,6 +561,7 @@ func (v *UnixVolume) Untrash(loc string) (err error) {
                return MethodDisabledError
        }
 
+       v.os.stats.TickOps("readdir")
        v.os.stats.Tick(&v.os.stats.ReaddirOps)
        files, err := ioutil.ReadDir(v.blockDir(loc))
        if err != nil {
@@ -695,6 +708,7 @@ func (v *UnixVolume) unlock() {
 
 // lockfile and unlockfile use flock(2) to manage kernel file locks.
 func (v *UnixVolume) lockfile(f *os.File) error {
+       v.os.stats.TickOps("flock")
        v.os.stats.Tick(&v.os.stats.FlockOps)
        err := syscall.Flock(int(f.Fd()), syscall.LOCK_EX)
        v.os.stats.TickErr(err)
@@ -813,6 +827,7 @@ type osWithStats struct {
 }
 
 func (o *osWithStats) Open(name string) (*os.File, error) {
+       o.stats.TickOps("open")
        o.stats.Tick(&o.stats.OpenOps)
        f, err := os.Open(name)
        o.stats.TickErr(err)
@@ -820,6 +835,7 @@ func (o *osWithStats) Open(name string) (*os.File, error) {
 }
 
 func (o *osWithStats) OpenFile(name string, flag int, perm os.FileMode) (*os.File, error) {
+       o.stats.TickOps("open")
        o.stats.Tick(&o.stats.OpenOps)
        f, err := os.OpenFile(name, flag, perm)
        o.stats.TickErr(err)
@@ -827,6 +843,7 @@ func (o *osWithStats) OpenFile(name string, flag int, perm os.FileMode) (*os.Fil
 }
 
 func (o *osWithStats) Remove(path string) error {
+       o.stats.TickOps("unlink")
        o.stats.Tick(&o.stats.UnlinkOps)
        err := os.Remove(path)
        o.stats.TickErr(err)
@@ -834,6 +851,7 @@ func (o *osWithStats) Remove(path string) error {
 }
 
 func (o *osWithStats) Rename(a, b string) error {
+       o.stats.TickOps("rename")
        o.stats.Tick(&o.stats.RenameOps)
        err := os.Rename(a, b)
        o.stats.TickErr(err)
@@ -841,6 +859,7 @@ func (o *osWithStats) Rename(a, b string) error {
 }
 
 func (o *osWithStats) Stat(path string) (os.FileInfo, error) {
+       o.stats.TickOps("stat")
        o.stats.Tick(&o.stats.StatOps)
        fi, err := os.Stat(path)
        o.stats.TickErr(err)
@@ -848,6 +867,7 @@ func (o *osWithStats) Stat(path string) (os.FileInfo, error) {
 }
 
 func (o *osWithStats) TempFile(dir, base string) (*os.File, error) {
+       o.stats.TickOps("create")
        o.stats.Tick(&o.stats.CreateOps)
        f, err := ioutil.TempFile(dir, base)
        o.stats.TickErr(err)
similarity index 98%
rename from services/keepstore/volume_unix_test.go
rename to services/keepstore/unix_volume_test.go
index 7f1cd219644ab241f2c0a8a0e2353c8f4c16844f..fe20f33d67dc454980cef5e3c9b9d9836f8e52d6 100644 (file)
@@ -20,6 +20,7 @@ import (
        "time"
 
        "github.com/ghodss/yaml"
+       "github.com/prometheus/client_golang/prometheus"
        check "gopkg.in/check.v1"
 )
 
@@ -115,7 +116,8 @@ func TestReplicationDefault1(t *testing.T) {
                Root:     "/",
                ReadOnly: true,
        }
-       if err := v.Start(); err != nil {
+       metrics := newVolumeMetricsVecs(prometheus.NewRegistry())
+       if err := v.Start(metrics.opsCounters, metrics.errCounters, metrics.ioBytes); err != nil {
                t.Error(err)
        }
        if got := v.Replication(); got != 1 {
index 6bce05bec033fbda6c759b6b4266bcbff0f3e051..39e2d5206702b452784a855fb8fdf2c93ceb485e 100644 (file)
@@ -14,6 +14,7 @@ import (
        "time"
 
        "git.curoverse.com/arvados.git/sdk/go/arvados"
+       "github.com/prometheus/client_golang/prometheus"
 )
 
 type BlockWriter interface {
@@ -39,7 +40,7 @@ type Volume interface {
        // Do whatever private setup tasks and configuration checks
        // are needed. Return non-nil if the volume is unusable (e.g.,
        // invalid config).
-       Start() error
+       Start(opsCounters, errCounters, ioBytes *prometheus.CounterVec) error
 
        // Get a block: copy the block data into buf, and return the
        // number of bytes copied.
index 046f3fac2e0c8c27081c22fea69a0aae7f02acda..df6a09e3ab56fbd80f6776c20cdb881e83df9233 100644 (file)
@@ -15,6 +15,8 @@ import (
        "strings"
        "sync"
        "time"
+
+       "github.com/prometheus/client_golang/prometheus"
 )
 
 // A TestableVolume allows test suites to manipulate the state of an
@@ -212,7 +214,7 @@ func (v *MockVolume) Type() string {
        return "Mock"
 }
 
-func (v *MockVolume) Start() error {
+func (v *MockVolume) Start(opsCounters, errCounters, ioBytes *prometheus.CounterVec) error {
        return nil
 }