Merge branch '13937-keepstore-prometheus'
authorLucas Di Pentima <ldipentima@veritasgenetics.com>
Tue, 12 Mar 2019 20:32:43 +0000 (17:32 -0300)
committerLucas Di Pentima <ldipentima@veritasgenetics.com>
Tue, 12 Mar 2019 20:32:43 +0000 (17:32 -0300)
Closes #13937

Arvados-DCO-1.1-Signed-off-by: Lucas Di Pentima <ldipentima@veritasgenetics.com>

19 files changed:
services/keepstore/azure_blob_volume.go
services/keepstore/azure_blob_volume_test.go
services/keepstore/config.go
services/keepstore/handler_test.go
services/keepstore/handlers.go
services/keepstore/keepstore.go
services/keepstore/metrics.go [new file with mode: 0644]
services/keepstore/mounts_test.go
services/keepstore/pipe_adapters.go
services/keepstore/proxy_remote_test.go
services/keepstore/pull_worker_test.go
services/keepstore/s3_volume.go
services/keepstore/s3_volume_test.go
services/keepstore/stats_ticker.go
services/keepstore/unix_volume.go [moved from services/keepstore/volume_unix.go with 96% similarity]
services/keepstore/unix_volume_test.go [moved from services/keepstore/volume_unix_test.go with 97% similarity]
services/keepstore/volume.go
services/keepstore/volume_generic_test.go
services/keepstore/volume_test.go

index 4f7339facf4ace001ac886a5076afc217e040c18..6b5b233c2a6701912ce06b1356fdb864778d0cf8 100644 (file)
@@ -23,6 +23,7 @@ import (
 
        "git.curoverse.com/arvados.git/sdk/go/arvados"
        "github.com/Azure/azure-sdk-for-go/storage"
+       "github.com/prometheus/client_golang/prometheus"
 )
 
 const azureDefaultRequestTimeout = arvados.Duration(10 * time.Minute)
@@ -147,7 +148,7 @@ func (v *AzureBlobVolume) Type() string {
 }
 
 // Start implements Volume.
-func (v *AzureBlobVolume) Start() error {
+func (v *AzureBlobVolume) Start(vm *volumeMetricsVecs) error {
        if v.ContainerName == "" {
                return errors.New("no container name given")
        }
@@ -183,6 +184,10 @@ func (v *AzureBlobVolume) Start() error {
        } else if !ok {
                return fmt.Errorf("Azure container %q does not exist", v.ContainerName)
        }
+       // Set up prometheus metrics
+       lbls := prometheus.Labels{"device_id": v.DeviceID()}
+       v.container.stats.opsCounters, v.container.stats.errCounters, v.container.stats.ioBytes = vm.getCounterVecsFor(lbls)
+
        return nil
 }
 
@@ -727,6 +732,7 @@ type azureContainer struct {
 }
 
 func (c *azureContainer) Exists() (bool, error) {
+       c.stats.TickOps("exists")
        c.stats.Tick(&c.stats.Ops)
        ok, err := c.ctr.Exists()
        c.stats.TickErr(err)
@@ -734,6 +740,7 @@ func (c *azureContainer) Exists() (bool, error) {
 }
 
 func (c *azureContainer) GetBlobMetadata(bname string) (storage.BlobMetadata, error) {
+       c.stats.TickOps("get_metadata")
        c.stats.Tick(&c.stats.Ops, &c.stats.GetMetadataOps)
        b := c.ctr.GetBlobReference(bname)
        err := b.GetMetadata(nil)
@@ -742,6 +749,7 @@ func (c *azureContainer) GetBlobMetadata(bname string) (storage.BlobMetadata, er
 }
 
 func (c *azureContainer) GetBlobProperties(bname string) (*storage.BlobProperties, error) {
+       c.stats.TickOps("get_properties")
        c.stats.Tick(&c.stats.Ops, &c.stats.GetPropertiesOps)
        b := c.ctr.GetBlobReference(bname)
        err := b.GetProperties(nil)
@@ -750,6 +758,7 @@ func (c *azureContainer) GetBlobProperties(bname string) (*storage.BlobPropertie
 }
 
 func (c *azureContainer) GetBlob(bname string) (io.ReadCloser, error) {
+       c.stats.TickOps("get")
        c.stats.Tick(&c.stats.Ops, &c.stats.GetOps)
        b := c.ctr.GetBlobReference(bname)
        rdr, err := b.Get(nil)
@@ -758,6 +767,7 @@ func (c *azureContainer) GetBlob(bname string) (io.ReadCloser, error) {
 }
 
 func (c *azureContainer) GetBlobRange(bname string, start, end int, opts *storage.GetBlobOptions) (io.ReadCloser, error) {
+       c.stats.TickOps("get_range")
        c.stats.Tick(&c.stats.Ops, &c.stats.GetRangeOps)
        b := c.ctr.GetBlobReference(bname)
        rdr, err := b.GetRange(&storage.GetBlobRangeOptions{
@@ -785,6 +795,7 @@ func (r *readerWithAzureLen) Len() int {
 }
 
 func (c *azureContainer) CreateBlockBlobFromReader(bname string, size int, rdr io.Reader, opts *storage.PutBlobOptions) error {
+       c.stats.TickOps("create")
        c.stats.Tick(&c.stats.Ops, &c.stats.CreateOps)
        if size != 0 {
                rdr = &readerWithAzureLen{
@@ -799,6 +810,7 @@ func (c *azureContainer) CreateBlockBlobFromReader(bname string, size int, rdr i
 }
 
 func (c *azureContainer) SetBlobMetadata(bname string, m storage.BlobMetadata, opts *storage.SetBlobMetadataOptions) error {
+       c.stats.TickOps("set_metadata")
        c.stats.Tick(&c.stats.Ops, &c.stats.SetMetadataOps)
        b := c.ctr.GetBlobReference(bname)
        b.Metadata = m
@@ -808,6 +820,7 @@ func (c *azureContainer) SetBlobMetadata(bname string, m storage.BlobMetadata, o
 }
 
 func (c *azureContainer) ListBlobs(params storage.ListBlobsParameters) (storage.BlobListResponse, error) {
+       c.stats.TickOps("list")
        c.stats.Tick(&c.stats.Ops, &c.stats.ListOps)
        resp, err := c.ctr.ListBlobs(params)
        c.stats.TickErr(err)
@@ -815,6 +828,7 @@ func (c *azureContainer) ListBlobs(params storage.ListBlobsParameters) (storage.
 }
 
 func (c *azureContainer) DeleteBlob(bname string, opts *storage.DeleteBlobOptions) error {
+       c.stats.TickOps("delete")
        c.stats.Tick(&c.stats.Ops, &c.stats.DelOps)
        b := c.ctr.GetBlobReference(bname)
        err := b.Delete(opts)
index 85d0a1eea4ee7136668debc0fcbbdcd86aed30a5..cfad7577c59d850d25e9f2281a4ad374a60295af 100644 (file)
@@ -29,6 +29,7 @@ import (
 
        "github.com/Azure/azure-sdk-for-go/storage"
        "github.com/ghodss/yaml"
+       "github.com/prometheus/client_golang/prometheus"
        check "gopkg.in/check.v1"
 )
 
@@ -745,6 +746,21 @@ func (v *TestableAzureBlobVolume) Teardown() {
        v.azStub.Close()
 }
 
+func (v *TestableAzureBlobVolume) ReadWriteOperationLabelValues() (r, w string) {
+       return "get", "create"
+}
+
+func (v *TestableAzureBlobVolume) DeviceID() string {
+       // Dummy device id for testing purposes
+       return "azure://azure_blob_volume_test"
+}
+
+func (v *TestableAzureBlobVolume) Start(vm *volumeMetricsVecs) error {
+       // Override original Start() to be able to assign CounterVecs with a dummy DeviceID
+       v.container.stats.opsCounters, v.container.stats.errCounters, v.container.stats.ioBytes = vm.getCounterVecsFor(prometheus.Labels{"device_id": v.DeviceID()})
+       return nil
+}
+
 func makeEtag() string {
        return fmt.Sprintf("0x%x", rand.Int63())
 }
index d00bf0125fea25441955bf0e42aada62819d67ff..43a2191111376fbf86c6943ffffff6c22668aa38 100644 (file)
@@ -13,6 +13,7 @@ import (
        "time"
 
        "git.curoverse.com/arvados.git/sdk/go/arvados"
+       "github.com/prometheus/client_golang/prometheus"
        "github.com/sirupsen/logrus"
 )
 
@@ -81,7 +82,7 @@ func DefaultConfig() *Config {
 
 // Start should be called exactly once: after setting all public
 // fields, and before using the config.
-func (cfg *Config) Start() error {
+func (cfg *Config) Start(reg *prometheus.Registry) error {
        if cfg.Debug {
                log.Level = logrus.DebugLevel
                cfg.debugLogf = log.Printf
@@ -143,8 +144,9 @@ func (cfg *Config) Start() error {
                        return fmt.Errorf("no volumes found")
                }
        }
+       vm := newVolumeMetricsVecs(reg)
        for _, v := range cfg.Volumes {
-               if err := v.Start(); err != nil {
+               if err := v.Start(vm); err != nil {
                        return fmt.Errorf("volume %s: %s", v, err)
                }
                log.Printf("Using volume %v (writable=%v)", v, v.Writable())
index 32b360b1276940c9da69bc4b44b02785ffefc97f..ad907ef10138f213e3831223d867fd3c114736d9 100644 (file)
@@ -28,6 +28,7 @@ import (
 
        "git.curoverse.com/arvados.git/sdk/go/arvados"
        "git.curoverse.com/arvados.git/sdk/go/arvadostest"
+       "github.com/prometheus/client_golang/prometheus"
 )
 
 var testCluster = &arvados.Cluster{
@@ -845,7 +846,7 @@ func IssueRequest(rt *RequestTester) *httptest.ResponseRecorder {
        if rt.apiToken != "" {
                req.Header.Set("Authorization", "OAuth2 "+rt.apiToken)
        }
-       loggingRouter := MakeRESTRouter(testCluster)
+       loggingRouter := MakeRESTRouter(testCluster, prometheus.NewRegistry())
        loggingRouter.ServeHTTP(response, req)
        return response
 }
@@ -857,7 +858,7 @@ func IssueHealthCheckRequest(rt *RequestTester) *httptest.ResponseRecorder {
        if rt.apiToken != "" {
                req.Header.Set("Authorization", "Bearer "+rt.apiToken)
        }
-       loggingRouter := MakeRESTRouter(testCluster)
+       loggingRouter := MakeRESTRouter(testCluster, prometheus.NewRegistry())
        loggingRouter.ServeHTTP(response, req)
        return response
 }
@@ -997,7 +998,7 @@ func TestGetHandlerClientDisconnect(t *testing.T) {
        ok := make(chan struct{})
        go func() {
                req, _ := http.NewRequest("GET", fmt.Sprintf("/%s+%d", TestHash, len(TestBlock)), nil)
-               MakeRESTRouter(testCluster).ServeHTTP(resp, req)
+               MakeRESTRouter(testCluster, prometheus.NewRegistry()).ServeHTTP(resp, req)
                ok <- struct{}{}
        }()
 
index 2a1bbc972ffa6e4fe0675291b0c923efc4d4ac8d..51dd73a513c1d4c729a6743aaabe0cefa1202c4b 100644 (file)
@@ -24,6 +24,7 @@ import (
        "git.curoverse.com/arvados.git/sdk/go/health"
        "git.curoverse.com/arvados.git/sdk/go/httpserver"
        "github.com/gorilla/mux"
+       "github.com/prometheus/client_golang/prometheus"
 )
 
 type router struct {
@@ -31,14 +32,16 @@ type router struct {
        limiter     httpserver.RequestCounter
        cluster     *arvados.Cluster
        remoteProxy remoteProxy
+       metrics     *nodeMetrics
 }
 
 // MakeRESTRouter returns a new router that forwards all Keep requests
 // to the appropriate handlers.
-func MakeRESTRouter(cluster *arvados.Cluster) http.Handler {
+func MakeRESTRouter(cluster *arvados.Cluster, reg *prometheus.Registry) http.Handler {
        rtr := &router{
                Router:  mux.NewRouter(),
                cluster: cluster,
+               metrics: &nodeMetrics{reg: reg},
        }
 
        rtr.HandleFunc(
@@ -85,8 +88,12 @@ func MakeRESTRouter(cluster *arvados.Cluster) http.Handler {
        rtr.NotFoundHandler = http.HandlerFunc(BadRequestHandler)
 
        rtr.limiter = httpserver.NewRequestLimiter(theConfig.MaxRequests, rtr)
+       rtr.metrics.setupBufferPoolMetrics(bufs)
+       rtr.metrics.setupWorkQueueMetrics(pullq, "pull")
+       rtr.metrics.setupWorkQueueMetrics(trashq, "trash")
+       rtr.metrics.setupRequestMetrics(rtr.limiter)
 
-       instrumented := httpserver.Instrument(nil, nil,
+       instrumented := httpserver.Instrument(rtr.metrics.reg, nil,
                httpserver.AddRequestIDs(httpserver.LogRequests(nil, rtr.limiter)))
        return instrumented.ServeAPI(theConfig.ManagementToken, instrumented)
 }
index a6c8cd99545c24fdc2a56f6c2ff1866682a6ed6d..fcbdddacb1d585e995c8f23a0528be2ce8c1723c 100644 (file)
@@ -18,6 +18,7 @@ import (
        "git.curoverse.com/arvados.git/sdk/go/config"
        "git.curoverse.com/arvados.git/sdk/go/keepclient"
        "github.com/coreos/go-systemd/daemon"
+       "github.com/prometheus/client_golang/prometheus"
 )
 
 var version = "dev"
@@ -121,7 +122,9 @@ func main() {
 
        log.Printf("keepstore %s started", version)
 
-       err = theConfig.Start()
+       metricsRegistry := prometheus.NewRegistry()
+
+       err = theConfig.Start(metricsRegistry)
        if err != nil {
                log.Fatal(err)
        }
@@ -174,7 +177,7 @@ func main() {
        KeepVM = MakeRRVolumeManager(theConfig.Volumes)
 
        // Middleware/handler stack
-       router := MakeRESTRouter(cluster)
+       router := MakeRESTRouter(cluster, metricsRegistry)
 
        // Set up a TCP listener.
        listener, err := net.Listen("tcp", theConfig.Listen)
diff --git a/services/keepstore/metrics.go b/services/keepstore/metrics.go
new file mode 100644 (file)
index 0000000..235c418
--- /dev/null
@@ -0,0 +1,137 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package main
+
+import (
+       "fmt"
+
+       "git.curoverse.com/arvados.git/sdk/go/httpserver"
+       "github.com/prometheus/client_golang/prometheus"
+)
+
+type nodeMetrics struct {
+       reg *prometheus.Registry
+}
+
+func (m *nodeMetrics) setupBufferPoolMetrics(b *bufferPool) {
+       m.reg.MustRegister(prometheus.NewGaugeFunc(
+               prometheus.GaugeOpts{
+                       Namespace: "arvados",
+                       Subsystem: "keepstore",
+                       Name:      "bufferpool_allocated_bytes",
+                       Help:      "Number of bytes allocated to buffers",
+               },
+               func() float64 { return float64(b.Alloc()) },
+       ))
+       m.reg.MustRegister(prometheus.NewGaugeFunc(
+               prometheus.GaugeOpts{
+                       Namespace: "arvados",
+                       Subsystem: "keepstore",
+                       Name:      "bufferpool_max_buffers",
+                       Help:      "Maximum number of buffers allowed",
+               },
+               func() float64 { return float64(b.Cap()) },
+       ))
+       m.reg.MustRegister(prometheus.NewGaugeFunc(
+               prometheus.GaugeOpts{
+                       Namespace: "arvados",
+                       Subsystem: "keepstore",
+                       Name:      "bufferpool_inuse_buffers",
+                       Help:      "Number of buffers in use",
+               },
+               func() float64 { return float64(b.Len()) },
+       ))
+}
+
+func (m *nodeMetrics) setupWorkQueueMetrics(q *WorkQueue, qName string) {
+       m.reg.MustRegister(prometheus.NewGaugeFunc(
+               prometheus.GaugeOpts{
+                       Namespace: "arvados",
+                       Subsystem: "keepstore",
+                       Name:      fmt.Sprintf("%s_queue_inprogress_entries", qName),
+                       Help:      fmt.Sprintf("Number of %s requests in progress", qName),
+               },
+               func() float64 { return float64(getWorkQueueStatus(q).InProgress) },
+       ))
+       m.reg.MustRegister(prometheus.NewGaugeFunc(
+               prometheus.GaugeOpts{
+                       Namespace: "arvados",
+                       Subsystem: "keepstore",
+                       Name:      fmt.Sprintf("%s_queue_pending_entries", qName),
+                       Help:      fmt.Sprintf("Number of queued %s requests", qName),
+               },
+               func() float64 { return float64(getWorkQueueStatus(q).Queued) },
+       ))
+}
+
+func (m *nodeMetrics) setupRequestMetrics(rc httpserver.RequestCounter) {
+       m.reg.MustRegister(prometheus.NewGaugeFunc(
+               prometheus.GaugeOpts{
+                       Namespace: "arvados",
+                       Subsystem: "keepstore",
+                       Name:      "concurrent_requests",
+                       Help:      "Number of requests in progress",
+               },
+               func() float64 { return float64(rc.Current()) },
+       ))
+       m.reg.MustRegister(prometheus.NewGaugeFunc(
+               prometheus.GaugeOpts{
+                       Namespace: "arvados",
+                       Subsystem: "keepstore",
+                       Name:      "max_concurrent_requests",
+                       Help:      "Maximum number of concurrent requests",
+               },
+               func() float64 { return float64(rc.Max()) },
+       ))
+}
+
+type volumeMetricsVecs struct {
+       ioBytes     *prometheus.CounterVec
+       errCounters *prometheus.CounterVec
+       opsCounters *prometheus.CounterVec
+}
+
+func newVolumeMetricsVecs(reg *prometheus.Registry) *volumeMetricsVecs {
+       m := &volumeMetricsVecs{}
+       m.opsCounters = prometheus.NewCounterVec(
+               prometheus.CounterOpts{
+                       Namespace: "arvados",
+                       Subsystem: "keepstore",
+                       Name:      "volume_operations",
+                       Help:      "Number of volume operations",
+               },
+               []string{"device_id", "operation"},
+       )
+       reg.MustRegister(m.opsCounters)
+       m.errCounters = prometheus.NewCounterVec(
+               prometheus.CounterOpts{
+                       Namespace: "arvados",
+                       Subsystem: "keepstore",
+                       Name:      "volume_errors",
+                       Help:      "Number of volume errors",
+               },
+               []string{"device_id", "error_type"},
+       )
+       reg.MustRegister(m.errCounters)
+       m.ioBytes = prometheus.NewCounterVec(
+               prometheus.CounterOpts{
+                       Namespace: "arvados",
+                       Subsystem: "keepstore",
+                       Name:      "volume_io_bytes",
+                       Help:      "Volume I/O traffic in bytes",
+               },
+               []string{"device_id", "direction"},
+       )
+       reg.MustRegister(m.ioBytes)
+
+       return m
+}
+
+func (vm *volumeMetricsVecs) getCounterVecsFor(lbls prometheus.Labels) (opsCV, errCV, ioCV *prometheus.CounterVec) {
+       opsCV = vm.opsCounters.MustCurryWith(lbls)
+       errCV = vm.errCounters.MustCurryWith(lbls)
+       ioCV = vm.ioBytes.MustCurryWith(lbls)
+       return
+}
index 31b1a684fe6a077ebbbfebf7bb846f6f508a00b5..7c932ee023b2a188433e34bbf773cc0eb8b64b08 100644 (file)
@@ -12,6 +12,7 @@ import (
        "net/http/httptest"
 
        "git.curoverse.com/arvados.git/sdk/go/arvadostest"
+       "github.com/prometheus/client_golang/prometheus"
        check "gopkg.in/check.v1"
 )
 
@@ -28,15 +29,16 @@ func (s *MountsSuite) SetUpTest(c *check.C) {
        theConfig = DefaultConfig()
        theConfig.systemAuthToken = arvadostest.DataManagerToken
        theConfig.ManagementToken = arvadostest.ManagementToken
-       theConfig.Start()
-       s.rtr = MakeRESTRouter(testCluster)
+       r := prometheus.NewRegistry()
+       theConfig.Start(r)
+       s.rtr = MakeRESTRouter(testCluster, r)
 }
 
 func (s *MountsSuite) TearDownTest(c *check.C) {
        s.vm.Close()
        KeepVM = nil
        theConfig = DefaultConfig()
-       theConfig.Start()
+       theConfig.Start(prometheus.NewRegistry())
 }
 
 func (s *MountsSuite) TestMounts(c *check.C) {
@@ -131,7 +133,9 @@ func (s *MountsSuite) TestMetrics(c *check.C) {
        }
        json.NewDecoder(resp.Body).Decode(&j)
        found := make(map[string]bool)
+       names := map[string]bool{}
        for _, g := range j {
+               names[g.Name] = true
                for _, m := range g.Metric {
                        if len(m.Label) == 2 && m.Label[0].Name == "code" && m.Label[0].Value == "200" && m.Label[1].Name == "method" && m.Label[1].Value == "put" {
                                c.Check(m.Summary.SampleCount, check.Equals, "2")
@@ -143,6 +147,24 @@ func (s *MountsSuite) TestMetrics(c *check.C) {
        }
        c.Check(found["request_duration_seconds"], check.Equals, true)
        c.Check(found["time_to_status_seconds"], check.Equals, true)
+
+       metricsNames := []string{
+               "arvados_keepstore_bufferpool_inuse_buffers",
+               "arvados_keepstore_bufferpool_max_buffers",
+               "arvados_keepstore_bufferpool_allocated_bytes",
+               "arvados_keepstore_pull_queue_inprogress_entries",
+               "arvados_keepstore_pull_queue_pending_entries",
+               "arvados_keepstore_concurrent_requests",
+               "arvados_keepstore_max_concurrent_requests",
+               "arvados_keepstore_trash_queue_inprogress_entries",
+               "arvados_keepstore_trash_queue_pending_entries",
+               "request_duration_seconds",
+               "time_to_status_seconds",
+       }
+       for _, m := range metricsNames {
+               _, ok := names[m]
+               c.Check(ok, check.Equals, true)
+       }
 }
 
 func (s *MountsSuite) call(method, path, tok string, body []byte) *httptest.ResponseRecorder {
index e4a5865a43dd10d20215eb9cebd13bd6fe2271b9..69ed6d2ff5f1f8d80bd6c6e6ebe7d75f7e4ff259 100644 (file)
@@ -39,7 +39,7 @@ func getWithPipe(ctx context.Context, loc string, buf []byte, br BlockReader) (i
        }
 }
 
-// putWithPipe invokes putter with a new pipe, and and copies data
+// putWithPipe invokes putter with a new pipe, and copies data
 // from buf into the pipe. If ctx is done before all data is copied,
 // putWithPipe closes the pipe with an error, and returns early with
 // an error.
index 6e720b8499f366c5d931729047de7a3b1b632faf..6c22d1d32aa2f0745a2cc424cdfeef4d4d76ca75 100644 (file)
@@ -20,6 +20,7 @@ import (
        "git.curoverse.com/arvados.git/sdk/go/arvadostest"
        "git.curoverse.com/arvados.git/sdk/go/auth"
        "git.curoverse.com/arvados.git/sdk/go/keepclient"
+       "github.com/prometheus/client_golang/prometheus"
        check "gopkg.in/check.v1"
 )
 
@@ -100,15 +101,16 @@ func (s *ProxyRemoteSuite) SetUpTest(c *check.C) {
        theConfig = DefaultConfig()
        theConfig.systemAuthToken = arvadostest.DataManagerToken
        theConfig.blobSigningKey = []byte(knownKey)
-       theConfig.Start()
-       s.rtr = MakeRESTRouter(s.cluster)
+       r := prometheus.NewRegistry()
+       theConfig.Start(r)
+       s.rtr = MakeRESTRouter(s.cluster, r)
 }
 
 func (s *ProxyRemoteSuite) TearDownTest(c *check.C) {
        s.vm.Close()
        KeepVM = nil
        theConfig = DefaultConfig()
-       theConfig.Start()
+       theConfig.Start(prometheus.NewRegistry())
        s.remoteAPI.Close()
        s.remoteKeepproxy.Close()
 }
index 7b5077c1a7f70dad794e14fc148ae1da7fc60d04..8e667e048f47ff3f3ac91df65c960ac94511e8b3 100644 (file)
@@ -14,6 +14,7 @@ import (
 
        "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
        "git.curoverse.com/arvados.git/sdk/go/keepclient"
+       "github.com/prometheus/client_golang/prometheus"
        . "gopkg.in/check.v1"
 )
 
@@ -58,7 +59,7 @@ func (s *PullWorkerTestSuite) TearDownTest(c *C) {
        pullq = nil
        teardown()
        theConfig = DefaultConfig()
-       theConfig.Start()
+       theConfig.Start(prometheus.NewRegistry())
 }
 
 var firstPullList = []byte(`[
index fb978fe2ba41fbdf9895c0c718d2ca6c925d5f9c..4c39dcd5c4f12fc9a8b8ad36d165545af952fb7a 100644 (file)
@@ -25,6 +25,7 @@ import (
        "git.curoverse.com/arvados.git/sdk/go/arvados"
        "github.com/AdRoll/goamz/aws"
        "github.com/AdRoll/goamz/s3"
+       "github.com/prometheus/client_golang/prometheus"
 )
 
 const (
@@ -198,7 +199,7 @@ func (*S3Volume) Type() string {
 
 // Start populates private fields and verifies the configuration is
 // valid.
-func (v *S3Volume) Start() error {
+func (v *S3Volume) Start(vm *volumeMetricsVecs) error {
        region, ok := aws.Regions[v.Region]
        if v.Endpoint == "" {
                if !ok {
@@ -248,6 +249,10 @@ func (v *S3Volume) Start() error {
                        Name: v.Bucket,
                },
        }
+       // Set up prometheus metrics
+       lbls := prometheus.Labels{"device_id": v.DeviceID()}
+       v.bucket.stats.opsCounters, v.bucket.stats.errCounters, v.bucket.stats.ioBytes = vm.getCounterVecsFor(lbls)
+
        return nil
 }
 
@@ -929,6 +934,7 @@ func (lister *s3Lister) Error() error {
 }
 
 func (lister *s3Lister) getPage() {
+       lister.Stats.TickOps("list")
        lister.Stats.Tick(&lister.Stats.Ops, &lister.Stats.ListOps)
        resp, err := lister.Bucket.List(lister.Prefix, "", lister.nextMarker, lister.PageSize)
        lister.nextMarker = ""
@@ -965,6 +971,7 @@ type s3bucket struct {
 
 func (b *s3bucket) GetReader(path string) (io.ReadCloser, error) {
        rdr, err := b.Bucket.GetReader(path)
+       b.stats.TickOps("get")
        b.stats.Tick(&b.stats.Ops, &b.stats.GetOps)
        b.stats.TickErr(err)
        return NewCountingReader(rdr, b.stats.TickInBytes), err
@@ -972,6 +979,7 @@ func (b *s3bucket) GetReader(path string) (io.ReadCloser, error) {
 
 func (b *s3bucket) Head(path string, headers map[string][]string) (*http.Response, error) {
        resp, err := b.Bucket.Head(path, headers)
+       b.stats.TickOps("head")
        b.stats.Tick(&b.stats.Ops, &b.stats.HeadOps)
        b.stats.TickErr(err)
        return resp, err
@@ -990,6 +998,7 @@ func (b *s3bucket) PutReader(path string, r io.Reader, length int64, contType st
                r = NewCountingReader(r, b.stats.TickOutBytes)
        }
        err := b.Bucket.PutReader(path, r, length, contType, perm, options)
+       b.stats.TickOps("put")
        b.stats.Tick(&b.stats.Ops, &b.stats.PutOps)
        b.stats.TickErr(err)
        return err
@@ -997,6 +1006,7 @@ func (b *s3bucket) PutReader(path string, r io.Reader, length int64, contType st
 
 func (b *s3bucket) Del(path string) error {
        err := b.Bucket.Del(path)
+       b.stats.TickOps("delete")
        b.stats.Tick(&b.stats.Ops, &b.stats.DelOps)
        b.stats.TickErr(err)
        return err
index 10c71125df39acb3feadc4e69e4d2190d53a10fe..6377420ff4b381cba49b07d2813fb4803f03aa62 100644 (file)
@@ -20,6 +20,7 @@ import (
        "github.com/AdRoll/goamz/s3"
        "github.com/AdRoll/goamz/s3/s3test"
        "github.com/ghodss/yaml"
+       "github.com/prometheus/client_golang/prometheus"
        check "gopkg.in/check.v1"
 )
 
@@ -170,7 +171,8 @@ func (s *StubbedS3Suite) testContextCancel(c *check.C, testFunc func(context.Con
        vol := *v.S3Volume
        vol.Endpoint = srv.URL
        v = &TestableS3Volume{S3Volume: &vol}
-       v.Start()
+       metrics := newVolumeMetricsVecs(prometheus.NewRegistry())
+       v.Start(metrics)
 
        ctx, cancel := context.WithCancel(context.Background())
 
@@ -430,7 +432,8 @@ func (s *StubbedS3Suite) newTestableVolume(c *check.C, raceWindow time.Duration,
                server:      srv,
                serverClock: clock,
        }
-       v.Start()
+       metrics := newVolumeMetricsVecs(prometheus.NewRegistry())
+       v.Start(metrics)
        err = v.bucket.PutBucket(s3.ACL("private"))
        c.Assert(err, check.IsNil)
        return v
@@ -448,7 +451,7 @@ Volumes:
        c.Check(cfg.Volumes[0].GetStorageClasses(), check.DeepEquals, []string{"class_a", "class_b"})
 }
 
-func (v *TestableS3Volume) Start() error {
+func (v *TestableS3Volume) Start(vm *volumeMetricsVecs) error {
        tmp, err := ioutil.TempFile("", "keepstore")
        v.c.Assert(err, check.IsNil)
        defer os.Remove(tmp.Name())
@@ -459,7 +462,7 @@ func (v *TestableS3Volume) Start() error {
        v.S3Volume.AccessKeyFile = tmp.Name()
        v.S3Volume.SecretKeyFile = tmp.Name()
 
-       v.c.Assert(v.S3Volume.Start(), check.IsNil)
+       v.c.Assert(v.S3Volume.Start(vm), check.IsNil)
        return nil
 }
 
@@ -490,3 +493,7 @@ func (v *TestableS3Volume) TouchWithDate(locator string, lastPut time.Time) {
 func (v *TestableS3Volume) Teardown() {
        v.server.Quit()
 }
+
+func (v *TestableS3Volume) ReadWriteOperationLabelValues() (r, w string) {
+       return "get", "put"
+}
index 377a53675783b890fa7863dd98ea50681074697b..342b9e32058e23a1f09fc305d8fdc37caf104198 100644 (file)
@@ -7,6 +7,8 @@ package main
 import (
        "sync"
        "sync/atomic"
+
+       "github.com/prometheus/client_golang/prometheus"
 )
 
 type statsTicker struct {
@@ -16,6 +18,10 @@ type statsTicker struct {
 
        ErrorCodes map[string]uint64 `json:",omitempty"`
        lock       sync.Mutex
+
+       opsCounters *prometheus.CounterVec
+       errCounters *prometheus.CounterVec
+       ioBytes     *prometheus.CounterVec
 }
 
 // Tick increments each of the given counters by 1 using
@@ -41,14 +47,33 @@ func (s *statsTicker) TickErr(err error, errType string) {
        }
        s.ErrorCodes[errType]++
        s.lock.Unlock()
+       if s.errCounters != nil {
+               s.errCounters.With(prometheus.Labels{"error_type": errType}).Inc()
+       }
 }
 
 // TickInBytes increments the incoming byte counter by n.
 func (s *statsTicker) TickInBytes(n uint64) {
+       if s.ioBytes != nil {
+               s.ioBytes.With(prometheus.Labels{"direction": "in"}).Add(float64(n))
+       }
        atomic.AddUint64(&s.InBytes, n)
 }
 
 // TickOutBytes increments the outgoing byte counter by n.
 func (s *statsTicker) TickOutBytes(n uint64) {
+       if s.ioBytes != nil {
+               s.ioBytes.With(prometheus.Labels{"direction": "out"}).Add(float64(n))
+       }
        atomic.AddUint64(&s.OutBytes, n)
 }
+
+// TickOps increments the counter of the listed operations by 1.
+func (s *statsTicker) TickOps(operations ...string) {
+       if s.opsCounters == nil {
+               return
+       }
+       for _, opType := range operations {
+               s.opsCounters.With(prometheus.Labels{"operation": opType}).Inc()
+       }
+}
similarity index 96%
rename from services/keepstore/volume_unix.go
rename to services/keepstore/unix_volume.go
index 23d675359244942097072d88e1bd98daf9d46c6c..96f458720d38b56b97fa51fd63e76faa798987bf 100644 (file)
@@ -21,6 +21,8 @@ import (
        "sync/atomic"
        "syscall"
        "time"
+
+       "github.com/prometheus/client_golang/prometheus"
 )
 
 type unixVolumeAdder struct {
@@ -28,7 +30,7 @@ type unixVolumeAdder struct {
 }
 
 // String implements flag.Value
-func (s *unixVolumeAdder) String() string {
+func (vs *unixVolumeAdder) String() string {
        return "-"
 }
 
@@ -218,7 +220,7 @@ func (v *UnixVolume) Type() string {
 }
 
 // Start implements Volume
-func (v *UnixVolume) Start() error {
+func (v *UnixVolume) Start(vm *volumeMetricsVecs) error {
        if v.Serialize {
                v.locker = &sync.Mutex{}
        }
@@ -228,7 +230,12 @@ func (v *UnixVolume) Start() error {
        if v.DirectoryReplication == 0 {
                v.DirectoryReplication = 1
        }
+       // Set up prometheus metrics
+       lbls := prometheus.Labels{"device_id": v.DeviceID()}
+       v.os.stats.opsCounters, v.os.stats.errCounters, v.os.stats.ioBytes = vm.getCounterVecsFor(lbls)
+
        _, err := v.os.Stat(v.Root)
+
        return err
 }
 
@@ -252,6 +259,7 @@ func (v *UnixVolume) Touch(loc string) error {
        }
        defer v.unlockfile(f)
        ts := syscall.NsecToTimespec(time.Now().UnixNano())
+       v.os.stats.TickOps("utimes")
        v.os.stats.Tick(&v.os.stats.UtimesOps)
        err = syscall.UtimesNano(p, []syscall.Timespec{ts, ts})
        v.os.stats.TickErr(err)
@@ -339,7 +347,7 @@ func (v *UnixVolume) Put(ctx context.Context, loc string, block []byte) error {
        return putWithPipe(ctx, loc, block, v)
 }
 
-// ReadBlock implements BlockWriter.
+// WriteBlock implements BlockWriter.
 func (v *UnixVolume) WriteBlock(ctx context.Context, loc string, rdr io.Reader) error {
        if v.ReadOnly {
                return MethodDisabledError
@@ -439,6 +447,7 @@ func (v *UnixVolume) IndexTo(prefix string, w io.Writer) error {
                return err
        }
        defer rootdir.Close()
+       v.os.stats.TickOps("readdir")
        v.os.stats.Tick(&v.os.stats.ReaddirOps)
        for {
                names, err := rootdir.Readdirnames(1)
@@ -461,6 +470,7 @@ func (v *UnixVolume) IndexTo(prefix string, w io.Writer) error {
                        lastErr = err
                        continue
                }
+               v.os.stats.TickOps("readdir")
                v.os.stats.Tick(&v.os.stats.ReaddirOps)
                for {
                        fileInfo, err := blockdir.Readdir(1)
@@ -549,6 +559,7 @@ func (v *UnixVolume) Untrash(loc string) (err error) {
                return MethodDisabledError
        }
 
+       v.os.stats.TickOps("readdir")
        v.os.stats.Tick(&v.os.stats.ReaddirOps)
        files, err := ioutil.ReadDir(v.blockDir(loc))
        if err != nil {
@@ -695,6 +706,7 @@ func (v *UnixVolume) unlock() {
 
 // lockfile and unlockfile use flock(2) to manage kernel file locks.
 func (v *UnixVolume) lockfile(f *os.File) error {
+       v.os.stats.TickOps("flock")
        v.os.stats.Tick(&v.os.stats.FlockOps)
        err := syscall.Flock(int(f.Fd()), syscall.LOCK_EX)
        v.os.stats.TickErr(err)
@@ -813,6 +825,7 @@ type osWithStats struct {
 }
 
 func (o *osWithStats) Open(name string) (*os.File, error) {
+       o.stats.TickOps("open")
        o.stats.Tick(&o.stats.OpenOps)
        f, err := os.Open(name)
        o.stats.TickErr(err)
@@ -820,6 +833,7 @@ func (o *osWithStats) Open(name string) (*os.File, error) {
 }
 
 func (o *osWithStats) OpenFile(name string, flag int, perm os.FileMode) (*os.File, error) {
+       o.stats.TickOps("open")
        o.stats.Tick(&o.stats.OpenOps)
        f, err := os.OpenFile(name, flag, perm)
        o.stats.TickErr(err)
@@ -827,6 +841,7 @@ func (o *osWithStats) OpenFile(name string, flag int, perm os.FileMode) (*os.Fil
 }
 
 func (o *osWithStats) Remove(path string) error {
+       o.stats.TickOps("unlink")
        o.stats.Tick(&o.stats.UnlinkOps)
        err := os.Remove(path)
        o.stats.TickErr(err)
@@ -834,6 +849,7 @@ func (o *osWithStats) Remove(path string) error {
 }
 
 func (o *osWithStats) Rename(a, b string) error {
+       o.stats.TickOps("rename")
        o.stats.Tick(&o.stats.RenameOps)
        err := os.Rename(a, b)
        o.stats.TickErr(err)
@@ -841,6 +857,7 @@ func (o *osWithStats) Rename(a, b string) error {
 }
 
 func (o *osWithStats) Stat(path string) (os.FileInfo, error) {
+       o.stats.TickOps("stat")
        o.stats.Tick(&o.stats.StatOps)
        fi, err := os.Stat(path)
        o.stats.TickErr(err)
@@ -848,6 +865,7 @@ func (o *osWithStats) Stat(path string) (os.FileInfo, error) {
 }
 
 func (o *osWithStats) TempFile(dir, base string) (*os.File, error) {
+       o.stats.TickOps("create")
        o.stats.Tick(&o.stats.CreateOps)
        f, err := ioutil.TempFile(dir, base)
        o.stats.TickErr(err)
similarity index 97%
rename from services/keepstore/volume_unix_test.go
rename to services/keepstore/unix_volume_test.go
index 7f1cd219644ab241f2c0a8a0e2353c8f4c16844f..872f408cf8cd68571705d240cb6b6184fce70a1d 100644 (file)
@@ -20,6 +20,7 @@ import (
        "time"
 
        "github.com/ghodss/yaml"
+       "github.com/prometheus/client_golang/prometheus"
        check "gopkg.in/check.v1"
 )
 
@@ -73,6 +74,10 @@ func (v *TestableUnixVolume) Teardown() {
        }
 }
 
+func (v *TestableUnixVolume) ReadWriteOperationLabelValues() (r, w string) {
+       return "open", "create"
+}
+
 // serialize = false; readonly = false
 func TestUnixVolumeWithGenericTests(t *testing.T) {
        DoGenericVolumeTests(t, func(t TB) TestableVolume {
@@ -115,7 +120,8 @@ func TestReplicationDefault1(t *testing.T) {
                Root:     "/",
                ReadOnly: true,
        }
-       if err := v.Start(); err != nil {
+       metrics := newVolumeMetricsVecs(prometheus.NewRegistry())
+       if err := v.Start(metrics); err != nil {
                t.Error(err)
        }
        if got := v.Replication(); got != 1 {
index 6bce05bec033fbda6c759b6b4266bcbff0f3e051..52b9b1b244c0a7032c66a2ea12b8d867c2384940 100644 (file)
@@ -39,7 +39,7 @@ type Volume interface {
        // Do whatever private setup tasks and configuration checks
        // are needed. Return non-nil if the volume is unusable (e.g.,
        // invalid config).
-       Start() error
+       Start(vm *volumeMetricsVecs) error
 
        // Get a block: copy the block data into buf, and return the
        // number of bytes copied.
index 23a17fd0998ebff4b8b3781dcf590534e88bc8a0..d5a413693f6c46c1d8241838a1ea87581191f9a4 100644 (file)
@@ -18,6 +18,8 @@ import (
 
        "git.curoverse.com/arvados.git/sdk/go/arvados"
        "git.curoverse.com/arvados.git/sdk/go/arvadostest"
+       "github.com/prometheus/client_golang/prometheus"
+       dto "github.com/prometheus/client_model/go"
 )
 
 type TB interface {
@@ -75,6 +77,8 @@ func DoGenericVolumeTests(t TB, factory TestableVolumeFactory) {
 
        testStatus(t, factory)
 
+       testMetrics(t, factory)
+
        testString(t, factory)
 
        testUpdateReadOnly(t, factory)
@@ -533,6 +537,84 @@ func testStatus(t TB, factory TestableVolumeFactory) {
        }
 }
 
+func getValueFrom(cv *prometheus.CounterVec, lbls prometheus.Labels) float64 {
+       c, _ := cv.GetMetricWith(lbls)
+       pb := &dto.Metric{}
+       c.Write(pb)
+       return pb.GetCounter().GetValue()
+}
+
+func testMetrics(t TB, factory TestableVolumeFactory) {
+       var err error
+
+       v := factory(t)
+       defer v.Teardown()
+       reg := prometheus.NewRegistry()
+       vm := newVolumeMetricsVecs(reg)
+
+       err = v.Start(vm)
+       if err != nil {
+               t.Error("Failed Start(): ", err)
+       }
+       opsC, _, ioC := vm.getCounterVecsFor(prometheus.Labels{"device_id": v.DeviceID()})
+
+       if ioC == nil {
+               t.Error("ioBytes CounterVec is nil")
+               return
+       }
+
+       if getValueFrom(ioC, prometheus.Labels{"direction": "out"})+
+               getValueFrom(ioC, prometheus.Labels{"direction": "in"}) > 0 {
+               t.Error("ioBytes counter should be zero")
+       }
+
+       if opsC == nil {
+               t.Error("opsCounter CounterVec is nil")
+               return
+       }
+
+       var c, writeOpCounter, readOpCounter float64
+
+       readOpType, writeOpType := v.ReadWriteOperationLabelValues()
+       writeOpCounter = getValueFrom(opsC, prometheus.Labels{"operation": writeOpType})
+       readOpCounter = getValueFrom(opsC, prometheus.Labels{"operation": readOpType})
+
+       // Test Put if volume is writable
+       if v.Writable() {
+               err = v.Put(context.Background(), TestHash, TestBlock)
+               if err != nil {
+                       t.Errorf("Got err putting block %q: %q, expected nil", TestBlock, err)
+               }
+               // Check that the write operations counter increased
+               c = getValueFrom(opsC, prometheus.Labels{"operation": writeOpType})
+               if c <= writeOpCounter {
+                       t.Error("Operation(s) not counted on Put")
+               }
+               // Check that bytes counter is > 0
+               if getValueFrom(ioC, prometheus.Labels{"direction": "out"}) == 0 {
+                       t.Error("ioBytes{direction=out} counter shouldn't be zero")
+               }
+       } else {
+               v.PutRaw(TestHash, TestBlock)
+       }
+
+       buf := make([]byte, BlockSize)
+       _, err = v.Get(context.Background(), TestHash, buf)
+       if err != nil {
+               t.Fatal(err)
+       }
+
+       // Check that the operations counter increased
+       c = getValueFrom(opsC, prometheus.Labels{"operation": readOpType})
+       if c <= readOpCounter {
+               t.Error("Operation(s) not counted on Get")
+       }
+       // Check that the bytes "in" counter is > 0
+       if getValueFrom(ioC, prometheus.Labels{"direction": "in"}) == 0 {
+               t.Error("ioBytes{direction=in} counter shouldn't be zero")
+       }
+}
+
 // Invoke String for the volume; expect non-empty result
 // Test should pass for both writable and read-only volumes
 func testString(t TB, factory TestableVolumeFactory) {
index 046f3fac2e0c8c27081c22fea69a0aae7f02acda..0b8af330fb2d86f771926f07f5f38a34cf09b8ef 100644 (file)
@@ -22,10 +22,14 @@ import (
 // impractical to achieve with a sequence of normal Volume operations.
 type TestableVolume interface {
        Volume
+
        // [Over]write content for a locator with the given data,
        // bypassing all constraints like readonly and serialize.
        PutRaw(locator string, data []byte)
 
+       // Returns the strings that a driver uses to record read/write operations.
+       ReadWriteOperationLabelValues() (r, w string)
+
        // Specify the value Mtime() should return, until the next
        // call to Touch, TouchWithDate, or Put.
        TouchWithDate(locator string, lastPut time.Time)
@@ -212,7 +216,7 @@ func (v *MockVolume) Type() string {
        return "Mock"
 }
 
-func (v *MockVolume) Start() error {
+func (v *MockVolume) Start(vm *volumeMetricsVecs) error {
        return nil
 }