13937: Refactors approach to pass volume metrics as curried vecs (WIP)
author Lucas Di Pentima <ldipentima@veritasgenetics.com>
Wed, 13 Feb 2019 19:33:53 +0000 (16:33 -0300)
committer Lucas Di Pentima <ldipentima@veritasgenetics.com>
Wed, 13 Feb 2019 19:33:53 +0000 (16:33 -0300)
Pending: driver-specific metrics

Arvados-DCO-1.1-Signed-off-by: Lucas Di Pentima <ldipentima@veritasgenetics.com>

16 files changed:
services/keepstore/azure_blob_volume.go
services/keepstore/config.go
services/keepstore/handler_test.go
services/keepstore/handlers.go
services/keepstore/keepstore.go
services/keepstore/metrics.go
services/keepstore/mounts_test.go
services/keepstore/proxy_remote_test.go
services/keepstore/pull_worker_test.go
services/keepstore/s3_volume.go
services/keepstore/s3_volume_test.go
services/keepstore/stats_ticker.go
services/keepstore/volume.go
services/keepstore/volume_test.go
services/keepstore/volume_unix.go
services/keepstore/volume_unix_test.go

index 5da2055b7736d117f6a7015a8486a948ee80a4d7..ab199d9914ce0aaa52a3e9e406dfb6968b398454 100644 (file)
@@ -147,7 +147,7 @@ func (v *AzureBlobVolume) Type() string {
 }
 
 // Start implements Volume.
-func (v *AzureBlobVolume) Start() error {
+func (v *AzureBlobVolume) Start(m *volumeMetrics) error {
        if v.ContainerName == "" {
                return errors.New("no container name given")
        }
index 2bd989de30c1bffc020777ba1ecbb895591570cf..43b309916ea36b885941df58155ec1cc7736701d 100644 (file)
@@ -13,6 +13,7 @@ import (
        "time"
 
        "git.curoverse.com/arvados.git/sdk/go/arvados"
+       "github.com/prometheus/client_golang/prometheus"
        "github.com/sirupsen/logrus"
 )
 
@@ -81,7 +82,7 @@ func DefaultConfig() *Config {
 
 // Start should be called exactly once: after setting all public
 // fields, and before using the config.
-func (cfg *Config) Start() error {
+func (cfg *Config) Start(reg *prometheus.Registry) error {
        if cfg.Debug {
                log.Level = logrus.DebugLevel
                cfg.debugLogf = log.Printf
@@ -143,8 +144,13 @@ func (cfg *Config) Start() error {
                        return fmt.Errorf("no volumes found")
                }
        }
+       vm := newVolumeMetricsVecs(reg)
        for _, v := range cfg.Volumes {
-               if err := v.Start(); err != nil {
+               metrics := vm.curryWith(
+                       v.String(),
+                       v.Status().MountPoint,
+                       fmt.Sprintf("%d", v.Status().DeviceNum))
+               if err := v.Start(metrics); err != nil {
                        return fmt.Errorf("volume %s: %s", v, err)
                }
                log.Printf("Using volume %v (writable=%v)", v, v.Writable())
index c37a4d112fb8b86aaa076431f08524930ce83d0b..cbfc0bcdab992cf4d729cc7c1a1d2f03effe538c 100644 (file)
@@ -28,6 +28,7 @@ import (
 
        "git.curoverse.com/arvados.git/sdk/go/arvados"
        "git.curoverse.com/arvados.git/sdk/go/arvadostest"
+       "github.com/prometheus/client_golang/prometheus"
 )
 
 var testCluster = &arvados.Cluster{
@@ -827,7 +828,7 @@ func IssueRequest(rt *RequestTester) *httptest.ResponseRecorder {
        if rt.apiToken != "" {
                req.Header.Set("Authorization", "OAuth2 "+rt.apiToken)
        }
-       loggingRouter := MakeRESTRouter(testCluster)
+       loggingRouter := MakeRESTRouter(testCluster, prometheus.NewRegistry())
        loggingRouter.ServeHTTP(response, req)
        return response
 }
@@ -839,7 +840,7 @@ func IssueHealthCheckRequest(rt *RequestTester) *httptest.ResponseRecorder {
        if rt.apiToken != "" {
                req.Header.Set("Authorization", "Bearer "+rt.apiToken)
        }
-       loggingRouter := MakeRESTRouter(testCluster)
+       loggingRouter := MakeRESTRouter(testCluster, prometheus.NewRegistry())
        loggingRouter.ServeHTTP(response, req)
        return response
 }
@@ -979,7 +980,7 @@ func TestGetHandlerClientDisconnect(t *testing.T) {
        ok := make(chan struct{})
        go func() {
                req, _ := http.NewRequest("GET", fmt.Sprintf("/%s+%d", TestHash, len(TestBlock)), nil)
-               MakeRESTRouter(testCluster).ServeHTTP(resp, req)
+               MakeRESTRouter(testCluster, prometheus.NewRegistry()).ServeHTTP(resp, req)
                ok <- struct{}{}
        }()
 
index e4f025d6b1d71cd0e490da6f5c6525b97273874a..7da9f69adbae4c5fa2f21fe24128d66355bd7838 100644 (file)
@@ -32,17 +32,16 @@ type router struct {
        limiter     httpserver.RequestCounter
        cluster     *arvados.Cluster
        remoteProxy remoteProxy
-       registry    *prometheus.Registry
-       metrics     nodeMetrics
+       metrics     *nodeMetrics
 }
 
 // MakeRESTRouter returns a new router that forwards all Keep requests
 // to the appropriate handlers.
-func MakeRESTRouter(cluster *arvados.Cluster) http.Handler {
+func MakeRESTRouter(cluster *arvados.Cluster, reg *prometheus.Registry) http.Handler {
        rtr := &router{
-               Router:   mux.NewRouter(),
-               cluster:  cluster,
-               registry: prometheus.NewRegistry(),
+               Router:  mux.NewRouter(),
+               cluster: cluster,
+               metrics: &nodeMetrics{reg: reg},
        }
 
        rtr.HandleFunc(
@@ -89,13 +88,12 @@ func MakeRESTRouter(cluster *arvados.Cluster) http.Handler {
        rtr.NotFoundHandler = http.HandlerFunc(BadRequestHandler)
 
        rtr.limiter = httpserver.NewRequestLimiter(theConfig.MaxRequests, rtr)
-       rtr.metrics = nodeMetrics{
-               reg: rtr.registry,
-               rc:  rtr.limiter,
-       }
-       rtr.metrics.setup()
+       rtr.metrics.setupBufferPoolMetrics(bufs)
+       rtr.metrics.setupWorkQueueMetrics(pullq, "pull")
+       rtr.metrics.setupWorkQueueMetrics(trashq, "trash")
+       rtr.metrics.setupRequestMetrics(rtr.limiter)
 
-       instrumented := httpserver.Instrument(rtr.registry, nil,
+       instrumented := httpserver.Instrument(rtr.metrics.reg, nil,
                httpserver.AddRequestIDs(httpserver.LogRequests(nil, rtr.limiter)))
        return instrumented.ServeAPI(theConfig.ManagementToken, instrumented)
 }
index 6ae414bf931ce9164f7beefcc0d9be294da6e9c5..fb1e1ea54516ef9375a0ea8aad91938ea84f3e7f 100644 (file)
@@ -18,6 +18,7 @@ import (
        "git.curoverse.com/arvados.git/sdk/go/config"
        "git.curoverse.com/arvados.git/sdk/go/keepclient"
        "github.com/coreos/go-systemd/daemon"
+       "github.com/prometheus/client_golang/prometheus"
 )
 
 var version = "dev"
@@ -120,7 +121,9 @@ func main() {
 
        log.Printf("keepstore %s started", version)
 
-       err = theConfig.Start()
+       metricsRegistry := prometheus.NewRegistry()
+
+       err = theConfig.Start(metricsRegistry)
        if err != nil {
                log.Fatal(err)
        }
@@ -173,7 +176,7 @@ func main() {
        KeepVM = MakeRRVolumeManager(theConfig.Volumes)
 
        // Middleware/handler stack
-       router := MakeRESTRouter(cluster)
+       router := MakeRESTRouter(cluster, metricsRegistry)
 
        // Set up a TCP listener.
        listener, err := net.Listen("tcp", theConfig.Listen)
index f0815ae4ab3f6171f6aaa99e5bc6d70170830967..63e406c9302a8b6deaf84aee992d8baffa25d1b8 100644 (file)
@@ -13,10 +13,9 @@ import (
 
 type nodeMetrics struct {
        reg *prometheus.Registry
-       rc  httpserver.RequestCounter
 }
 
-func (m *nodeMetrics) setup() {
+func (m *nodeMetrics) setupBufferPoolMetrics(b *bufferPool) {
        m.reg.MustRegister(prometheus.NewGaugeFunc(
                prometheus.GaugeOpts{
                        Namespace: "arvados",
@@ -24,7 +23,7 @@ func (m *nodeMetrics) setup() {
                        Name:      "bufferpool_bytes_allocated",
                        Help:      "Number of bytes allocated to buffers",
                },
-               func() float64 { return float64(bufs.Alloc()) },
+               func() float64 { return float64(b.Alloc()) },
        ))
        m.reg.MustRegister(prometheus.NewGaugeFunc(
                prometheus.GaugeOpts{
@@ -33,7 +32,7 @@ func (m *nodeMetrics) setup() {
                        Name:      "bufferpool_buffers_max",
                        Help:      "Maximum number of buffers allowed",
                },
-               func() float64 { return float64(bufs.Cap()) },
+               func() float64 { return float64(b.Cap()) },
        ))
        m.reg.MustRegister(prometheus.NewGaugeFunc(
                prometheus.GaugeOpts{
@@ -42,173 +41,214 @@ func (m *nodeMetrics) setup() {
                        Name:      "bufferpool_buffers_in_use",
                        Help:      "Number of buffers in use",
                },
-               func() float64 { return float64(bufs.Len()) },
+               func() float64 { return float64(b.Len()) },
        ))
+}
+
+func (m *nodeMetrics) setupWorkQueueMetrics(q *WorkQueue, qName string) {
        m.reg.MustRegister(prometheus.NewGaugeFunc(
                prometheus.GaugeOpts{
                        Namespace: "arvados",
                        Subsystem: "keepstore",
-                       Name:      "pull_queue_in_progress",
-                       Help:      "Number of pull requests in progress",
+                       Name:      fmt.Sprintf("%s_queue_in_progress", qName),
+                       Help:      fmt.Sprintf("Number of %s requests in progress", qName),
                },
-               func() float64 { return float64(getWorkQueueStatus(pullq).InProgress) },
+               func() float64 { return float64(getWorkQueueStatus(q).InProgress) },
        ))
        m.reg.MustRegister(prometheus.NewGaugeFunc(
                prometheus.GaugeOpts{
                        Namespace: "arvados",
                        Subsystem: "keepstore",
-                       Name:      "pull_queue_queued",
-                       Help:      "Number of queued pull requests",
+                       Name:      fmt.Sprintf("%s_queue_queued", qName),
+                       Help:      fmt.Sprintf("Number of queued %s requests", qName),
                },
-               func() float64 { return float64(getWorkQueueStatus(pullq).Queued) },
+               func() float64 { return float64(getWorkQueueStatus(q).Queued) },
        ))
+}
+
+func (m *nodeMetrics) setupRequestMetrics(rc httpserver.RequestCounter) {
        m.reg.MustRegister(prometheus.NewGaugeFunc(
                prometheus.GaugeOpts{
                        Namespace: "arvados",
                        Subsystem: "keepstore",
-                       Name:      "trash_queue_in_progress",
-                       Help:      "Number of trash requests in progress",
+                       Name:      "requests_current",
+                       Help:      "Number of requests in progress",
                },
-               func() float64 { return float64(getWorkQueueStatus(trashq).InProgress) },
+               func() float64 { return float64(rc.Current()) },
        ))
        m.reg.MustRegister(prometheus.NewGaugeFunc(
                prometheus.GaugeOpts{
                        Namespace: "arvados",
                        Subsystem: "keepstore",
-                       Name:      "trash_queue_queued",
-                       Help:      "Number of queued trash requests",
+                       Name:      "requests_max",
+                       Help:      "Maximum number of concurrent requests",
                },
-               func() float64 { return float64(getWorkQueueStatus(trashq).Queued) },
+               func() float64 { return float64(rc.Max()) },
        ))
-       m.reg.MustRegister(prometheus.NewGaugeFunc(
+}
+
+type volumeMetricsVecs struct {
+       BytesFree  *prometheus.GaugeVec
+       BytesUsed  *prometheus.GaugeVec
+       Errors     *prometheus.CounterVec
+       Ops        *prometheus.CounterVec
+       CompareOps *prometheus.CounterVec
+       GetOps     *prometheus.CounterVec
+       PutOps     *prometheus.CounterVec
+       TouchOps   *prometheus.CounterVec
+       InBytes    *prometheus.CounterVec
+       OutBytes   *prometheus.CounterVec
+       ErrorCodes *prometheus.CounterVec
+}
+
+type volumeMetrics struct {
+       BytesFree  prometheus.Gauge
+       BytesUsed  prometheus.Gauge
+       Errors     prometheus.Counter
+       Ops        prometheus.Counter
+       CompareOps prometheus.Counter
+       GetOps     prometheus.Counter
+       PutOps     prometheus.Counter
+       TouchOps   prometheus.Counter
+       InBytes    prometheus.Counter
+       OutBytes   prometheus.Counter
+       ErrorCodes *prometheus.CounterVec
+}
+
+func newVolumeMetricsVecs(reg *prometheus.Registry) *volumeMetricsVecs {
+       m := &volumeMetricsVecs{}
+       m.BytesFree = prometheus.NewGaugeVec(
                prometheus.GaugeOpts{
                        Namespace: "arvados",
                        Subsystem: "keepstore",
-                       Name:      "requests_current",
-                       Help:      "Number of requests in progress",
+                       Name:      "volume_bytes_free",
+                       Help:      "Number of free bytes on the volume",
                },
-               func() float64 { return float64(m.rc.Current()) },
-       ))
-       m.reg.MustRegister(prometheus.NewGaugeFunc(
+               []string{"label", "mount_point", "device_number"},
+       )
+       reg.MustRegister(m.BytesFree)
+       m.BytesUsed = prometheus.NewGaugeVec(
                prometheus.GaugeOpts{
                        Namespace: "arvados",
                        Subsystem: "keepstore",
-                       Name:      "requests_max",
-                       Help:      "Maximum number of concurrent requests",
+                       Name:      "volume_bytes_used",
+                       Help:      "Number of used bytes on the volume",
                },
-               func() float64 { return float64(m.rc.Max()) },
-       ))
-       // Register individual volume's metrics
-       vols := KeepVM.AllReadable()
-       for _, vol := range vols {
-               labels := prometheus.Labels{
-                       "label":         vol.String(),
-                       "mount_point":   vol.Status().MountPoint,
-                       "device_number": fmt.Sprintf("%d", vol.Status().DeviceNum),
-               }
-               if vol, ok := vol.(InternalMetricser); ok {
-                       // Per-driver internal metrics
-                       vol.SetupInternalMetrics(m.reg, labels)
-               }
-               m.reg.Register(prometheus.NewGaugeFunc(
-                       prometheus.GaugeOpts{
-                               Namespace:   "arvados",
-                               Subsystem:   "keepstore",
-                               Name:        "volume_bytes_free",
-                               Help:        "Number of free bytes on the volume",
-                               ConstLabels: labels,
-                       },
-                       func() float64 { return float64(vol.Status().BytesFree) },
-               ))
-               m.reg.Register(prometheus.NewGaugeFunc(
-                       prometheus.GaugeOpts{
-                               Namespace:   "arvados",
-                               Subsystem:   "keepstore",
-                               Name:        "volume_bytes_used",
-                               Help:        "Number of used bytes on the volume",
-                               ConstLabels: labels,
-                       },
-                       func() float64 { return float64(vol.Status().BytesUsed) },
-               ))
-               m.reg.Register(prometheus.NewGaugeFunc(
-                       prometheus.GaugeOpts{
-                               Namespace:   "arvados",
-                               Subsystem:   "keepstore",
-                               Name:        "volume_io_errors",
-                               Help:        "Number of I/O errors",
-                               ConstLabels: labels,
-                       },
-                       func() float64 { return float64(KeepVM.VolumeStats(vol).Errors) },
-               ))
-               m.reg.Register(prometheus.NewGaugeFunc(
-                       prometheus.GaugeOpts{
-                               Namespace:   "arvados",
-                               Subsystem:   "keepstore",
-                               Name:        "volume_io_ops",
-                               Help:        "Number of I/O operations",
-                               ConstLabels: labels,
-                       },
-                       func() float64 { return float64(KeepVM.VolumeStats(vol).Ops) },
-               ))
-               m.reg.Register(prometheus.NewGaugeFunc(
-                       prometheus.GaugeOpts{
-                               Namespace:   "arvados",
-                               Subsystem:   "keepstore",
-                               Name:        "volume_io_compare_ops",
-                               Help:        "Number of I/O compare operations",
-                               ConstLabels: labels,
-                       },
-                       func() float64 { return float64(KeepVM.VolumeStats(vol).CompareOps) },
-               ))
-               m.reg.Register(prometheus.NewGaugeFunc(
-                       prometheus.GaugeOpts{
-                               Namespace:   "arvados",
-                               Subsystem:   "keepstore",
-                               Name:        "volume_io_get_ops",
-                               Help:        "Number of I/O get operations",
-                               ConstLabels: labels,
-                       },
-                       func() float64 { return float64(KeepVM.VolumeStats(vol).GetOps) },
-               ))
-               m.reg.Register(prometheus.NewGaugeFunc(
-                       prometheus.GaugeOpts{
-                               Namespace:   "arvados",
-                               Subsystem:   "keepstore",
-                               Name:        "volume_io_put_ops",
-                               Help:        "Number of I/O put operations",
-                               ConstLabels: labels,
-                       },
-                       func() float64 { return float64(KeepVM.VolumeStats(vol).PutOps) },
-               ))
-               m.reg.Register(prometheus.NewGaugeFunc(
-                       prometheus.GaugeOpts{
-                               Namespace:   "arvados",
-                               Subsystem:   "keepstore",
-                               Name:        "volume_io_touch_ops",
-                               Help:        "Number of I/O touch operations",
-                               ConstLabels: labels,
-                       },
-                       func() float64 { return float64(KeepVM.VolumeStats(vol).TouchOps) },
-               ))
-               m.reg.Register(prometheus.NewGaugeFunc(
-                       prometheus.GaugeOpts{
-                               Namespace:   "arvados",
-                               Subsystem:   "keepstore",
-                               Name:        "volume_io_input_bytes",
-                               Help:        "Number of input bytes",
-                               ConstLabels: labels,
-                       },
-                       func() float64 { return float64(KeepVM.VolumeStats(vol).InBytes) },
-               ))
-               m.reg.Register(prometheus.NewGaugeFunc(
-                       prometheus.GaugeOpts{
-                               Namespace:   "arvados",
-                               Subsystem:   "keepstore",
-                               Name:        "volume_io_output_bytes",
-                               Help:        "Number of output bytes",
-                               ConstLabels: labels,
-                       },
-                       func() float64 { return float64(KeepVM.VolumeStats(vol).OutBytes) },
-               ))
+               []string{"label", "mount_point", "device_number"},
+       )
+       reg.MustRegister(m.BytesUsed)
+       m.Errors = prometheus.NewCounterVec(
+               prometheus.CounterOpts{
+                       Namespace: "arvados",
+                       Subsystem: "keepstore",
+                       Name:      "volume_io_errors",
+                       Help:      "Number of volume I/O errors",
+               },
+               []string{"label", "mount_point", "device_number"},
+       )
+       reg.MustRegister(m.Errors)
+       m.Ops = prometheus.NewCounterVec(
+               prometheus.CounterOpts{
+                       Namespace: "arvados",
+                       Subsystem: "keepstore",
+                       Name:      "volume_io_ops",
+                       Help:      "Number of volume I/O operations",
+               },
+               []string{"label", "mount_point", "device_number"},
+       )
+       reg.MustRegister(m.Ops)
+       m.CompareOps = prometheus.NewCounterVec(
+               prometheus.CounterOpts{
+                       Namespace: "arvados",
+                       Subsystem: "keepstore",
+                       Name:      "volume_io_compare_ops",
+                       Help:      "Number of volume I/O compare operations",
+               },
+               []string{"label", "mount_point", "device_number"},
+       )
+       reg.MustRegister(m.CompareOps)
+       m.GetOps = prometheus.NewCounterVec(
+               prometheus.CounterOpts{
+                       Namespace: "arvados",
+                       Subsystem: "keepstore",
+                       Name:      "volume_io_get_ops",
+                       Help:      "Number of volume I/O get operations",
+               },
+               []string{"label", "mount_point", "device_number"},
+       )
+       reg.MustRegister(m.GetOps)
+       m.PutOps = prometheus.NewCounterVec(
+               prometheus.CounterOpts{
+                       Namespace: "arvados",
+                       Subsystem: "keepstore",
+                       Name:      "volume_io_put_ops",
+                       Help:      "Number of volume I/O put operations",
+               },
+               []string{"label", "mount_point", "device_number"},
+       )
+       reg.MustRegister(m.PutOps)
+       m.TouchOps = prometheus.NewCounterVec(
+               prometheus.CounterOpts{
+                       Namespace: "arvados",
+                       Subsystem: "keepstore",
+                       Name:      "volume_io_touch_ops",
+                       Help:      "Number of volume I/O touch operations",
+               },
+               []string{"label", "mount_point", "device_number"},
+       )
+       reg.MustRegister(m.TouchOps)
+       m.InBytes = prometheus.NewCounterVec(
+               prometheus.CounterOpts{
+                       Namespace: "arvados",
+                       Subsystem: "keepstore",
+                       Name:      "volume_io_in_bytes",
+                       Help:      "Number of input bytes",
+               },
+               []string{"label", "mount_point", "device_number"},
+       )
+       reg.MustRegister(m.InBytes)
+       m.OutBytes = prometheus.NewCounterVec(
+               prometheus.CounterOpts{
+                       Namespace: "arvados",
+                       Subsystem: "keepstore",
+                       Name:      "volume_io_out_bytes",
+                       Help:      "Number of output bytes",
+               },
+               []string{"label", "mount_point", "device_number"},
+       )
+       reg.MustRegister(m.OutBytes)
+       m.ErrorCodes = prometheus.NewCounterVec(
+               prometheus.CounterOpts{
+                       Namespace: "arvados",
+                       Subsystem: "keepstore",
+                       Name:      "volume_io_error_codes",
+                       Help:      "Number of I/O errors by error code",
+               },
+               []string{"label", "mount_point", "device_number", "error_code"},
+       )
+       reg.MustRegister(m.ErrorCodes)
+
+       return m
+}
+
+func (m *volumeMetricsVecs) curryWith(lbl string, mnt string, dev string) *volumeMetrics {
+       lbls := []string{lbl, mnt, dev}
+       curried := &volumeMetrics{
+               BytesFree:  m.BytesFree.WithLabelValues(lbls...),
+               BytesUsed:  m.BytesUsed.WithLabelValues(lbls...),
+               Errors:     m.Errors.WithLabelValues(lbls...),
+               Ops:        m.Ops.WithLabelValues(lbls...),
+               CompareOps: m.CompareOps.WithLabelValues(lbls...),
+               GetOps:     m.GetOps.WithLabelValues(lbls...),
+               PutOps:     m.PutOps.WithLabelValues(lbls...),
+               TouchOps:   m.TouchOps.WithLabelValues(lbls...),
+               InBytes:    m.InBytes.WithLabelValues(lbls...),
+               OutBytes:   m.OutBytes.WithLabelValues(lbls...),
+               ErrorCodes: m.ErrorCodes.MustCurryWith(prometheus.Labels{
+                       "label":         lbl,
+                       "mount_point":   mnt,
+                       "device_number": dev,
+               }),
        }
+       return curried
 }
index 588bb4299c531ff72d6633b95464790c7e9bb0b6..ac30c369a551af7304d587c56fbba205c7fee705 100644 (file)
@@ -12,6 +12,7 @@ import (
        "net/http/httptest"
 
        "git.curoverse.com/arvados.git/sdk/go/arvadostest"
+       "github.com/prometheus/client_golang/prometheus"
        check "gopkg.in/check.v1"
 )
 
@@ -28,15 +29,16 @@ func (s *MountsSuite) SetUpTest(c *check.C) {
        theConfig = DefaultConfig()
        theConfig.systemAuthToken = arvadostest.DataManagerToken
        theConfig.ManagementToken = arvadostest.ManagementToken
-       theConfig.Start()
-       s.rtr = MakeRESTRouter(testCluster)
+       r := prometheus.NewRegistry()
+       theConfig.Start(r)
+       s.rtr = MakeRESTRouter(testCluster, r)
 }
 
 func (s *MountsSuite) TearDownTest(c *check.C) {
        s.vm.Close()
        KeepVM = nil
        theConfig = DefaultConfig()
-       theConfig.Start()
+       theConfig.Start(prometheus.NewRegistry())
 }
 
 func (s *MountsSuite) TestMounts(c *check.C) {
index 6e720b8499f366c5d931729047de7a3b1b632faf..6c22d1d32aa2f0745a2cc424cdfeef4d4d76ca75 100644 (file)
@@ -20,6 +20,7 @@ import (
        "git.curoverse.com/arvados.git/sdk/go/arvadostest"
        "git.curoverse.com/arvados.git/sdk/go/auth"
        "git.curoverse.com/arvados.git/sdk/go/keepclient"
+       "github.com/prometheus/client_golang/prometheus"
        check "gopkg.in/check.v1"
 )
 
@@ -100,15 +101,16 @@ func (s *ProxyRemoteSuite) SetUpTest(c *check.C) {
        theConfig = DefaultConfig()
        theConfig.systemAuthToken = arvadostest.DataManagerToken
        theConfig.blobSigningKey = []byte(knownKey)
-       theConfig.Start()
-       s.rtr = MakeRESTRouter(s.cluster)
+       r := prometheus.NewRegistry()
+       theConfig.Start(r)
+       s.rtr = MakeRESTRouter(s.cluster, r)
 }
 
 func (s *ProxyRemoteSuite) TearDownTest(c *check.C) {
        s.vm.Close()
        KeepVM = nil
        theConfig = DefaultConfig()
-       theConfig.Start()
+       theConfig.Start(prometheus.NewRegistry())
        s.remoteAPI.Close()
        s.remoteKeepproxy.Close()
 }
index 7b5077c1a7f70dad794e14fc148ae1da7fc60d04..8e667e048f47ff3f3ac91df65c960ac94511e8b3 100644 (file)
@@ -14,6 +14,7 @@ import (
 
        "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
        "git.curoverse.com/arvados.git/sdk/go/keepclient"
+       "github.com/prometheus/client_golang/prometheus"
        . "gopkg.in/check.v1"
 )
 
@@ -58,7 +59,7 @@ func (s *PullWorkerTestSuite) TearDownTest(c *C) {
        pullq = nil
        teardown()
        theConfig = DefaultConfig()
-       theConfig.Start()
+       theConfig.Start(prometheus.NewRegistry())
 }
 
 var firstPullList = []byte(`[
index fb978fe2ba41fbdf9895c0c718d2ca6c925d5f9c..f281b363e8e996174adf14e5da63db34d48478f7 100644 (file)
@@ -198,7 +198,7 @@ func (*S3Volume) Type() string {
 
 // Start populates private fields and verifies the configuration is
 // valid.
-func (v *S3Volume) Start() error {
+func (v *S3Volume) Start(m *volumeMetrics) error {
        region, ok := aws.Regions[v.Region]
        if v.Endpoint == "" {
                if !ok {
index 10c71125df39acb3feadc4e69e4d2190d53a10fe..e88efffe41aa52d72ddd19553ea0b378d8da3e83 100644 (file)
@@ -20,6 +20,7 @@ import (
        "github.com/AdRoll/goamz/s3"
        "github.com/AdRoll/goamz/s3/s3test"
        "github.com/ghodss/yaml"
+       "github.com/prometheus/client_golang/prometheus"
        check "gopkg.in/check.v1"
 )
 
@@ -170,7 +171,9 @@ func (s *StubbedS3Suite) testContextCancel(c *check.C, testFunc func(context.Con
        vol := *v.S3Volume
        vol.Endpoint = srv.URL
        v = &TestableS3Volume{S3Volume: &vol}
-       v.Start()
+       metrics := newVolumeMetricsVecs(prometheus.NewRegistry()).curryWith(
+               v.String(), v.Status().MountPoint, fmt.Sprintf("%d", v.Status().DeviceNum))
+       v.Start(metrics)
 
        ctx, cancel := context.WithCancel(context.Background())
 
@@ -430,7 +433,9 @@ func (s *StubbedS3Suite) newTestableVolume(c *check.C, raceWindow time.Duration,
                server:      srv,
                serverClock: clock,
        }
-       v.Start()
+       metrics := newVolumeMetricsVecs(prometheus.NewRegistry()).curryWith(
+               v.String(), v.Status().MountPoint, fmt.Sprintf("%d", v.Status().DeviceNum))
+       v.Start(metrics)
        err = v.bucket.PutBucket(s3.ACL("private"))
        c.Assert(err, check.IsNil)
        return v
@@ -448,7 +453,7 @@ Volumes:
        c.Check(cfg.Volumes[0].GetStorageClasses(), check.DeepEquals, []string{"class_a", "class_b"})
 }
 
-func (v *TestableS3Volume) Start() error {
+func (v *TestableS3Volume) Start(m *volumeMetrics) error {
        tmp, err := ioutil.TempFile("", "keepstore")
        v.c.Assert(err, check.IsNil)
        defer os.Remove(tmp.Name())
@@ -459,7 +464,7 @@ func (v *TestableS3Volume) Start() error {
        v.S3Volume.AccessKeyFile = tmp.Name()
        v.S3Volume.SecretKeyFile = tmp.Name()
 
-       v.c.Assert(v.S3Volume.Start(), check.IsNil)
+       v.c.Assert(v.S3Volume.Start(m), check.IsNil)
        return nil
 }
 
index 36fbcf98af216183afc0169b7643c5a1afcfafb2..6742dbaa9aa1a8669c875266c9cac09dce7a8a48 100644 (file)
@@ -5,7 +5,6 @@
 package main
 
 import (
-       "fmt"
        "sync"
        "sync/atomic"
 
@@ -17,32 +16,16 @@ type statsTicker struct {
        InBytes  uint64
        OutBytes uint64
 
+       // Prometheus metrics
+       PromErrors     prometheus.Counter
+       PromInBytes    prometheus.Counter
+       PromOutBytes   prometheus.Counter
+       PromErrorCodes *prometheus.CounterVec
+
        ErrorCodes map[string]uint64 `json:",omitempty"`
        lock       sync.Mutex
 }
 
-func (s *statsTicker) setupPrometheus(drv string, reg *prometheus.Registry, lbl prometheus.Labels) {
-       metrics := map[string][]interface{}{
-               "errors":    []interface{}{string("errors"), s.Errors},
-               "in_bytes":  []interface{}{string("input bytes"), s.InBytes},
-               "out_bytes": []interface{}{string("output bytes"), s.OutBytes},
-       }
-       for mName, data := range metrics {
-               mHelp := data[0].(string)
-               mVal := data[1].(uint64)
-               reg.Register(prometheus.NewGaugeFunc(
-                       prometheus.GaugeOpts{
-                               Namespace:   "arvados",
-                               Subsystem:   "keepstore",
-                               Name:        fmt.Sprintf("%s_%s", drv, mName),
-                               Help:        fmt.Sprintf("Number of %s backend %s", drv, mHelp),
-                               ConstLabels: lbl,
-                       },
-                       func() float64 { return float64(mVal) },
-               ))
-       }
-}
-
 // Tick increments each of the given counters by 1 using
 // atomic.AddUint64.
 func (s *statsTicker) Tick(counters ...*uint64) {
@@ -58,6 +41,7 @@ func (s *statsTicker) TickErr(err error, errType string) {
        if err == nil {
                return
        }
+       s.PromErrors.Inc()
        s.Tick(&s.Errors)
 
        s.lock.Lock()
@@ -66,14 +50,17 @@ func (s *statsTicker) TickErr(err error, errType string) {
        }
        s.ErrorCodes[errType]++
        s.lock.Unlock()
+       s.PromErrorCodes.WithLabelValues(errType).Inc()
 }
 
 // TickInBytes increments the incoming byte counter by n.
 func (s *statsTicker) TickInBytes(n uint64) {
+       s.PromInBytes.Add(float64(n))
        atomic.AddUint64(&s.InBytes, n)
 }
 
 // TickOutBytes increments the outgoing byte counter by n.
 func (s *statsTicker) TickOutBytes(n uint64) {
+       s.PromOutBytes.Add(float64(n))
        atomic.AddUint64(&s.OutBytes, n)
 }
index 9638046391db29ff60b075327f0f9cb274345c91..5c6d1f51aa48af273f7950b39fe06215f6a59c5c 100644 (file)
@@ -14,7 +14,6 @@ import (
        "time"
 
        "git.curoverse.com/arvados.git/sdk/go/arvados"
-       "github.com/prometheus/client_golang/prometheus"
 )
 
 type BlockWriter interface {
@@ -40,7 +39,7 @@ type Volume interface {
        // Do whatever private setup tasks and configuration checks
        // are needed. Return non-nil if the volume is unusable (e.g.,
        // invalid config).
-       Start() error
+       Start(m *volumeMetrics) error
 
        // Get a block: copy the block data into buf, and return the
        // number of bytes copied.
@@ -416,9 +415,3 @@ type ioStats struct {
 type InternalStatser interface {
        InternalStats() interface{}
 }
-
-// InternalMetricser provides an interface for volume drivers to register their
-// own specific metrics.
-type InternalMetricser interface {
-       SetupInternalMetrics(*prometheus.Registry, prometheus.Labels)
-}
index 43ddd090cc1cfd22419e80aa86f1e838ffebd479..72666638dec494a3695be402169f190b721eb54e 100644 (file)
@@ -211,7 +211,7 @@ func (v *MockVolume) Type() string {
        return "Mock"
 }
 
-func (v *MockVolume) Start() error {
+func (v *MockVolume) Start(m *volumeMetrics) error {
        return nil
 }
 
index a80bb7bf4af606248cde60deb989d574d703f389..10cd6398c01e79cf1c99b97da3523ff8dbbd70a8 100644 (file)
@@ -21,8 +21,6 @@ import (
        "sync/atomic"
        "syscall"
        "time"
-
-       "github.com/prometheus/client_golang/prometheus"
 )
 
 type unixVolumeAdder struct {
@@ -120,6 +118,8 @@ type UnixVolume struct {
        locker sync.Locker
 
        os osWithStats
+
+       metrics *volumeMetrics
 }
 
 // DeviceID returns a globally unique ID for the volume's root
@@ -220,7 +220,7 @@ func (v *UnixVolume) Type() string {
 }
 
 // Start implements Volume
-func (v *UnixVolume) Start() error {
+func (v *UnixVolume) Start(m *volumeMetrics) error {
        if v.Serialize {
                v.locker = &sync.Mutex{}
        }
@@ -231,11 +231,29 @@ func (v *UnixVolume) Start() error {
                v.DirectoryReplication = 1
        }
        _, err := v.os.Stat(v.Root)
+       if err == nil {
+               // Set up prometheus metrics
+               v.metrics = m
+               v.os.stats.PromErrors = v.metrics.Errors
+               v.os.stats.PromErrorCodes = v.metrics.ErrorCodes
+               v.os.stats.PromInBytes = v.metrics.InBytes
+               v.os.stats.PromOutBytes = v.metrics.OutBytes
+               // Periodically update free/used volume space
+               go func() {
+                       for {
+                               v.metrics.BytesFree.Set(float64(v.Status().BytesFree))
+                               v.metrics.BytesUsed.Set(float64(v.Status().BytesUsed))
+                               time.Sleep(10 * time.Second)
+                       }
+               }()
+       }
        return err
 }
 
 // Touch sets the timestamp for the given locator to the current time
 func (v *UnixVolume) Touch(loc string) error {
+       v.metrics.Ops.Inc()
+       v.metrics.TouchOps.Inc()
        if v.ReadOnly {
                return MethodDisabledError
        }
@@ -301,6 +319,8 @@ func (v *UnixVolume) stat(path string) (os.FileInfo, error) {
 // Get retrieves a block, copies it to the given slice, and returns
 // the number of bytes copied.
 func (v *UnixVolume) Get(ctx context.Context, loc string, buf []byte) (int, error) {
+       v.metrics.Ops.Inc()
+       v.metrics.GetOps.Inc()
        return getWithPipe(ctx, loc, buf, v)
 }
 
@@ -324,6 +344,8 @@ func (v *UnixVolume) ReadBlock(ctx context.Context, loc string, w io.Writer) err
 // expect. It is functionally equivalent to Get() followed by
 // bytes.Compare(), but uses less memory.
 func (v *UnixVolume) Compare(ctx context.Context, loc string, expect []byte) error {
+       v.metrics.Ops.Inc()
+       v.metrics.CompareOps.Inc()
        path := v.blockPath(loc)
        if _, err := v.stat(path); err != nil {
                return v.translateError(err)
@@ -338,6 +360,8 @@ func (v *UnixVolume) Compare(ctx context.Context, loc string, expect []byte) err
 // returns a FullError.  If the write fails due to some other error,
 // that error is returned.
 func (v *UnixVolume) Put(ctx context.Context, loc string, block []byte) error {
+       v.metrics.Ops.Inc()
+       v.metrics.PutOps.Inc()
        return putWithPipe(ctx, loc, block, v)
 }
 
@@ -791,42 +815,6 @@ func (v *UnixVolume) EmptyTrash() {
        log.Printf("EmptyTrash stats for %v: Deleted %v bytes in %v blocks. Remaining in trash: %v bytes in %v blocks.", v.String(), bytesDeleted, blocksDeleted, bytesInTrash-bytesDeleted, blocksInTrash-blocksDeleted)
 }
 
-// SetupInternalMetrics registers driver stats to Prometheus.
-// Implements InternalMetricser interface.
-func (v *UnixVolume) SetupInternalMetrics(reg *prometheus.Registry, lbl prometheus.Labels) {
-       v.os.stats.setupPrometheus(reg, lbl)
-}
-
-func (s *unixStats) setupPrometheus(reg *prometheus.Registry, lbl prometheus.Labels) {
-       // Common backend metrics
-       s.statsTicker.setupPrometheus("unix", reg, lbl)
-       // Driver-specific backend metrics
-       metrics := map[string][]interface{}{
-               "open_ops":    []interface{}{string("open operations"), s.OpenOps},
-               "stat_ops":    []interface{}{string("stat operations"), s.StatOps},
-               "flock_ops":   []interface{}{string("flock operations"), s.FlockOps},
-               "utimes_ops":  []interface{}{string("utimes operations"), s.UtimesOps},
-               "create_ops":  []interface{}{string("create operations"), s.CreateOps},
-               "rename_ops":  []interface{}{string("rename operations"), s.RenameOps},
-               "unlink_ops":  []interface{}{string("unlink operations"), s.UnlinkOps},
-               "readdir_ops": []interface{}{string("readdir operations"), s.ReaddirOps},
-       }
-       for mName, data := range metrics {
-               mHelp := data[0].(string)
-               mVal := data[1].(uint64)
-               reg.Register(prometheus.NewGaugeFunc(
-                       prometheus.GaugeOpts{
-                               Namespace:   "arvados",
-                               Subsystem:   "keepstore",
-                               Name:        fmt.Sprintf("unix_%s", mName),
-                               Help:        fmt.Sprintf("Number of unix backend %s", mHelp),
-                               ConstLabels: lbl,
-                       },
-                       func() float64 { return float64(mVal) },
-               ))
-       }
-}
-
 type unixStats struct {
        statsTicker
        OpenOps    uint64
index 7f1cd219644ab241f2c0a8a0e2353c8f4c16844f..05c7a93ae4b94f447188661a4e4eb807b4144eb1 100644 (file)
@@ -20,6 +20,7 @@ import (
        "time"
 
        "github.com/ghodss/yaml"
+       "github.com/prometheus/client_golang/prometheus"
        check "gopkg.in/check.v1"
 )
 
@@ -115,7 +116,9 @@ func TestReplicationDefault1(t *testing.T) {
                Root:     "/",
                ReadOnly: true,
        }
-       if err := v.Start(); err != nil {
+       metrics := newVolumeMetricsVecs(prometheus.NewRegistry()).curryWith(
+               v.String(), v.Status().MountPoint, fmt.Sprintf("%d", v.Status().DeviceNum))
+       if err := v.Start(metrics); err != nil {
                t.Error(err)
        }
        if got := v.Replication(); got != 1 {