Arvados-DCO-1.1-Signed-off-by: Lucas Di Pentima <ldipentima@veritasgenetics.com>
"git.curoverse.com/arvados.git/sdk/go/arvados"
"github.com/Azure/azure-sdk-for-go/storage"
+ "github.com/prometheus/client_golang/prometheus"
)
const azureDefaultRequestTimeout = arvados.Duration(10 * time.Minute)
}
// Start implements Volume.
-func (v *AzureBlobVolume) Start() error {
+func (v *AzureBlobVolume) Start(opsCounters, errCounters, ioBytes *prometheus.CounterVec) error {
if v.ContainerName == "" {
return errors.New("no container name given")
}
} else if !ok {
return fmt.Errorf("Azure container %q does not exist", v.ContainerName)
}
+ // Set up prometheus metrics
+ lbls := prometheus.Labels{"device_id": v.DeviceID()}
+ v.container.stats.opsCounters = opsCounters.MustCurryWith(lbls)
+ v.container.stats.errCounters = errCounters.MustCurryWith(lbls)
+ v.container.stats.ioBytes = ioBytes.MustCurryWith(lbls)
+
return nil
}
}
func (c *azureContainer) Exists() (bool, error) {
+ c.stats.TickOps("exists")
c.stats.Tick(&c.stats.Ops)
ok, err := c.ctr.Exists()
c.stats.TickErr(err)
}
func (c *azureContainer) GetBlobMetadata(bname string) (storage.BlobMetadata, error) {
+ c.stats.TickOps("get_metadata")
c.stats.Tick(&c.stats.Ops, &c.stats.GetMetadataOps)
b := c.ctr.GetBlobReference(bname)
err := b.GetMetadata(nil)
}
func (c *azureContainer) GetBlobProperties(bname string) (*storage.BlobProperties, error) {
+ c.stats.TickOps("get_properties")
c.stats.Tick(&c.stats.Ops, &c.stats.GetPropertiesOps)
b := c.ctr.GetBlobReference(bname)
err := b.GetProperties(nil)
}
func (c *azureContainer) GetBlob(bname string) (io.ReadCloser, error) {
+ c.stats.TickOps("get")
c.stats.Tick(&c.stats.Ops, &c.stats.GetOps)
b := c.ctr.GetBlobReference(bname)
rdr, err := b.Get(nil)
}
func (c *azureContainer) GetBlobRange(bname string, start, end int, opts *storage.GetBlobOptions) (io.ReadCloser, error) {
+ c.stats.TickOps("get_range")
c.stats.Tick(&c.stats.Ops, &c.stats.GetRangeOps)
b := c.ctr.GetBlobReference(bname)
rdr, err := b.GetRange(&storage.GetBlobRangeOptions{
}
func (c *azureContainer) CreateBlockBlobFromReader(bname string, size int, rdr io.Reader, opts *storage.PutBlobOptions) error {
+ c.stats.TickOps("create")
c.stats.Tick(&c.stats.Ops, &c.stats.CreateOps)
if size != 0 {
rdr = &readerWithAzureLen{
}
func (c *azureContainer) SetBlobMetadata(bname string, m storage.BlobMetadata, opts *storage.SetBlobMetadataOptions) error {
+ c.stats.TickOps("set_metadata")
c.stats.Tick(&c.stats.Ops, &c.stats.SetMetadataOps)
b := c.ctr.GetBlobReference(bname)
b.Metadata = m
}
func (c *azureContainer) ListBlobs(params storage.ListBlobsParameters) (storage.BlobListResponse, error) {
+ c.stats.TickOps("list")
c.stats.Tick(&c.stats.Ops, &c.stats.ListOps)
resp, err := c.ctr.ListBlobs(params)
c.stats.TickErr(err)
}
func (c *azureContainer) DeleteBlob(bname string, opts *storage.DeleteBlobOptions) error {
+ c.stats.TickOps("delete")
c.stats.Tick(&c.stats.Ops, &c.stats.DelOps)
b := c.ctr.GetBlobReference(bname)
err := b.Delete(opts)
"time"
"git.curoverse.com/arvados.git/sdk/go/arvados"
+ "github.com/prometheus/client_golang/prometheus"
"github.com/sirupsen/logrus"
)
// Start should be called exactly once: after setting all public
// fields, and before using the config.
-func (cfg *Config) Start() error {
+func (cfg *Config) Start(reg *prometheus.Registry) error {
if cfg.Debug {
log.Level = logrus.DebugLevel
cfg.debugLogf = log.Printf
return fmt.Errorf("no volumes found")
}
}
+ vm := newVolumeMetricsVecs(reg)
for _, v := range cfg.Volumes {
- if err := v.Start(); err != nil {
+ if err := v.Start(vm.opsCounters, vm.errCounters, vm.ioBytes); err != nil {
return fmt.Errorf("volume %s: %s", v, err)
}
log.Printf("Using volume %v (writable=%v)", v, v.Writable())
"git.curoverse.com/arvados.git/sdk/go/arvados"
"git.curoverse.com/arvados.git/sdk/go/arvadostest"
+ "github.com/prometheus/client_golang/prometheus"
)
var testCluster = &arvados.Cluster{
if rt.apiToken != "" {
req.Header.Set("Authorization", "OAuth2 "+rt.apiToken)
}
- loggingRouter := MakeRESTRouter(testCluster)
+ loggingRouter := MakeRESTRouter(testCluster, prometheus.NewRegistry())
loggingRouter.ServeHTTP(response, req)
return response
}
if rt.apiToken != "" {
req.Header.Set("Authorization", "Bearer "+rt.apiToken)
}
- loggingRouter := MakeRESTRouter(testCluster)
+ loggingRouter := MakeRESTRouter(testCluster, prometheus.NewRegistry())
loggingRouter.ServeHTTP(response, req)
return response
}
ok := make(chan struct{})
go func() {
req, _ := http.NewRequest("GET", fmt.Sprintf("/%s+%d", TestHash, len(TestBlock)), nil)
- MakeRESTRouter(testCluster).ServeHTTP(resp, req)
+ MakeRESTRouter(testCluster, prometheus.NewRegistry()).ServeHTTP(resp, req)
ok <- struct{}{}
}()
"git.curoverse.com/arvados.git/sdk/go/health"
"git.curoverse.com/arvados.git/sdk/go/httpserver"
"github.com/gorilla/mux"
+ "github.com/prometheus/client_golang/prometheus"
)
type router struct {
limiter httpserver.RequestCounter
cluster *arvados.Cluster
remoteProxy remoteProxy
+ metrics *nodeMetrics
}
// MakeRESTRouter returns a new router that forwards all Keep requests
// to the appropriate handlers.
-func MakeRESTRouter(cluster *arvados.Cluster) http.Handler {
+func MakeRESTRouter(cluster *arvados.Cluster, reg *prometheus.Registry) http.Handler {
rtr := &router{
Router: mux.NewRouter(),
cluster: cluster,
+ metrics: &nodeMetrics{reg: reg},
}
rtr.HandleFunc(
rtr.NotFoundHandler = http.HandlerFunc(BadRequestHandler)
rtr.limiter = httpserver.NewRequestLimiter(theConfig.MaxRequests, rtr)
+ rtr.metrics.setupBufferPoolMetrics(bufs)
+ rtr.metrics.setupWorkQueueMetrics(pullq, "pull")
+ rtr.metrics.setupWorkQueueMetrics(trashq, "trash")
+ rtr.metrics.setupRequestMetrics(rtr.limiter)
- instrumented := httpserver.Instrument(nil, nil,
+ instrumented := httpserver.Instrument(rtr.metrics.reg, nil,
httpserver.AddRequestIDs(httpserver.LogRequests(nil, rtr.limiter)))
return instrumented.ServeAPI(theConfig.ManagementToken, instrumented)
}
"git.curoverse.com/arvados.git/sdk/go/config"
"git.curoverse.com/arvados.git/sdk/go/keepclient"
"github.com/coreos/go-systemd/daemon"
+ "github.com/prometheus/client_golang/prometheus"
)
var version = "dev"
log.Printf("keepstore %s started", version)
- err = theConfig.Start()
+ metricsRegistry := prometheus.NewRegistry()
+
+ err = theConfig.Start(metricsRegistry)
if err != nil {
log.Fatal(err)
}
KeepVM = MakeRRVolumeManager(theConfig.Volumes)
// Middleware/handler stack
- router := MakeRESTRouter(cluster)
+ router := MakeRESTRouter(cluster, metricsRegistry)
// Set up a TCP listener.
listener, err := net.Listen("tcp", theConfig.Listen)
--- /dev/null
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package main
+
+import (
+ "fmt"
+
+ "git.curoverse.com/arvados.git/sdk/go/httpserver"
+ "github.com/prometheus/client_golang/prometheus"
+)
+
+// nodeMetrics registers node-level (non-volume) prometheus metrics —
+// buffer pool, work queue, and request gauges — on a single registry.
+type nodeMetrics struct {
+	reg *prometheus.Registry
+}
+
+// setupBufferPoolMetrics registers gauges exposing the buffer pool's
+// allocated bytes, maximum buffer count, and buffers currently in use.
+// Each value is read lazily from the pool at scrape time via GaugeFunc.
+func (m *nodeMetrics) setupBufferPoolMetrics(b *bufferPool) {
+	// gauge registers one arvados_keepstore_* GaugeFunc on m.reg.
+	gauge := func(name, help string, value func() float64) {
+		m.reg.MustRegister(prometheus.NewGaugeFunc(
+			prometheus.GaugeOpts{
+				Namespace: "arvados",
+				Subsystem: "keepstore",
+				Name:      name,
+				Help:      help,
+			},
+			value,
+		))
+	}
+	gauge("bufferpool_bytes_allocated", "Number of bytes allocated to buffers", func() float64 {
+		return float64(b.Alloc())
+	})
+	gauge("bufferpool_buffers_max", "Maximum number of buffers allowed", func() float64 {
+		return float64(b.Cap())
+	})
+	gauge("bufferpool_buffers_in_use", "Number of buffers in use", func() float64 {
+		return float64(b.Len())
+	})
+}
+
+// setupWorkQueueMetrics registers gauges for the named work queue
+// (e.g., "pull" or "trash"): requests currently in progress and
+// requests still queued. Values are obtained from getWorkQueueStatus
+// at scrape time.
+func (m *nodeMetrics) setupWorkQueueMetrics(q *WorkQueue, qName string) {
+	// gauge registers one GaugeFunc whose name/help embed qName.
+	gauge := func(nameFmt, helpFmt string, value func() float64) {
+		m.reg.MustRegister(prometheus.NewGaugeFunc(
+			prometheus.GaugeOpts{
+				Namespace: "arvados",
+				Subsystem: "keepstore",
+				Name:      fmt.Sprintf(nameFmt, qName),
+				Help:      fmt.Sprintf(helpFmt, qName),
+			},
+			value,
+		))
+	}
+	gauge("%s_queue_in_progress", "Number of %s requests in progress", func() float64 {
+		return float64(getWorkQueueStatus(q).InProgress)
+	})
+	gauge("%s_queue_queued", "Number of queued %s requests", func() float64 {
+		return float64(getWorkQueueStatus(q).Queued)
+	})
+}
+
+// setupRequestMetrics registers gauges tracking in-flight HTTP
+// requests: the current count and the configured maximum concurrent
+// requests, both read from the request counter at scrape time.
+func (m *nodeMetrics) setupRequestMetrics(rc httpserver.RequestCounter) {
+	// gauge registers one arvados_keepstore_* GaugeFunc on m.reg.
+	gauge := func(name, help string, value func() float64) {
+		m.reg.MustRegister(prometheus.NewGaugeFunc(
+			prometheus.GaugeOpts{
+				Namespace: "arvados",
+				Subsystem: "keepstore",
+				Name:      name,
+				Help:      help,
+			},
+			value,
+		))
+	}
+	gauge("requests_current", "Number of requests in progress", func() float64 {
+		return float64(rc.Current())
+	})
+	gauge("requests_max", "Maximum number of concurrent requests", func() float64 {
+		return float64(rc.Max())
+	})
+}
+
+// volumeMetricsVecs holds the per-volume prometheus counter vectors.
+// Each vector is labeled by device_id plus one metric-specific label
+// (operation, error_type, or direction); volumes curry in their
+// device_id in Start() and tick the counters through statsTicker.
+type volumeMetricsVecs struct {
+	ioBytes     *prometheus.CounterVec
+	errCounters *prometheus.CounterVec
+	opsCounters *prometheus.CounterVec
+}
+
+// newVolumeMetricsVecs creates the three per-volume counter vectors
+// (operations, errors, I/O bytes) and registers each one on reg.
+func newVolumeMetricsVecs(reg *prometheus.Registry) *volumeMetricsVecs {
+	// newVec builds and registers one arvados_keepstore_* CounterVec.
+	newVec := func(name, help string, labels []string) *prometheus.CounterVec {
+		vec := prometheus.NewCounterVec(
+			prometheus.CounterOpts{
+				Namespace: "arvados",
+				Subsystem: "keepstore",
+				Name:      name,
+				Help:      help,
+			},
+			labels,
+		)
+		reg.MustRegister(vec)
+		return vec
+	}
+	// Fields are evaluated left-to-right, preserving the original
+	// registration order: operations, errors, io_bytes.
+	return &volumeMetricsVecs{
+		opsCounters: newVec("volume_operations", "Number of volume operations",
+			[]string{"device_id", "operation"}),
+		errCounters: newVec("volume_errors", "Number of volume errors",
+			[]string{"device_id", "error_type"}),
+		ioBytes: newVec("volume_io_bytes", "Volume I/O traffic in bytes",
+			[]string{"device_id", "direction"}),
+	}
+}
"net/http/httptest"
"git.curoverse.com/arvados.git/sdk/go/arvadostest"
+ "github.com/prometheus/client_golang/prometheus"
check "gopkg.in/check.v1"
)
theConfig = DefaultConfig()
theConfig.systemAuthToken = arvadostest.DataManagerToken
theConfig.ManagementToken = arvadostest.ManagementToken
- theConfig.Start()
- s.rtr = MakeRESTRouter(testCluster)
+ r := prometheus.NewRegistry()
+ theConfig.Start(r)
+ s.rtr = MakeRESTRouter(testCluster, r)
}
func (s *MountsSuite) TearDownTest(c *check.C) {
s.vm.Close()
KeepVM = nil
theConfig = DefaultConfig()
- theConfig.Start()
+ theConfig.Start(prometheus.NewRegistry())
}
func (s *MountsSuite) TestMounts(c *check.C) {
}
json.NewDecoder(resp.Body).Decode(&j)
found := make(map[string]bool)
+ names := map[string]bool{}
for _, g := range j {
+ names[g.Name] = true
for _, m := range g.Metric {
if len(m.Label) == 2 && m.Label[0].Name == "code" && m.Label[0].Value == "200" && m.Label[1].Name == "method" && m.Label[1].Value == "put" {
c.Check(m.Summary.SampleCount, check.Equals, "2")
}
c.Check(found["request_duration_seconds"], check.Equals, true)
c.Check(found["time_to_status_seconds"], check.Equals, true)
+
+ metricsNames := []string{
+ "arvados_keepstore_bufferpool_buffers_in_use",
+ "arvados_keepstore_bufferpool_buffers_max",
+ "arvados_keepstore_bufferpool_bytes_allocated",
+ "arvados_keepstore_pull_queue_in_progress",
+ "arvados_keepstore_pull_queue_queued",
+ "arvados_keepstore_requests_current",
+ "arvados_keepstore_requests_max",
+ "arvados_keepstore_trash_queue_in_progress",
+ "arvados_keepstore_trash_queue_queued",
+ "request_duration_seconds",
+ "time_to_status_seconds",
+ }
+ for _, m := range metricsNames {
+ _, ok := names[m]
+ c.Check(ok, check.Equals, true)
+ }
}
func (s *MountsSuite) call(method, path, tok string, body []byte) *httptest.ResponseRecorder {
}
}
-// putWithPipe invokes putter with a new pipe, and and copies data
+// putWithPipe invokes putter with a new pipe, and copies data
// from buf into the pipe. If ctx is done before all data is copied,
// putWithPipe closes the pipe with an error, and returns early with
// an error.
"git.curoverse.com/arvados.git/sdk/go/arvadostest"
"git.curoverse.com/arvados.git/sdk/go/auth"
"git.curoverse.com/arvados.git/sdk/go/keepclient"
+ "github.com/prometheus/client_golang/prometheus"
check "gopkg.in/check.v1"
)
theConfig = DefaultConfig()
theConfig.systemAuthToken = arvadostest.DataManagerToken
theConfig.blobSigningKey = []byte(knownKey)
- theConfig.Start()
- s.rtr = MakeRESTRouter(s.cluster)
+ r := prometheus.NewRegistry()
+ theConfig.Start(r)
+ s.rtr = MakeRESTRouter(s.cluster, r)
}
func (s *ProxyRemoteSuite) TearDownTest(c *check.C) {
s.vm.Close()
KeepVM = nil
theConfig = DefaultConfig()
- theConfig.Start()
+ theConfig.Start(prometheus.NewRegistry())
s.remoteAPI.Close()
s.remoteKeepproxy.Close()
}
"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
"git.curoverse.com/arvados.git/sdk/go/keepclient"
+ "github.com/prometheus/client_golang/prometheus"
. "gopkg.in/check.v1"
)
pullq = nil
teardown()
theConfig = DefaultConfig()
- theConfig.Start()
+ theConfig.Start(prometheus.NewRegistry())
}
var firstPullList = []byte(`[
"git.curoverse.com/arvados.git/sdk/go/arvados"
"github.com/AdRoll/goamz/aws"
"github.com/AdRoll/goamz/s3"
+ "github.com/prometheus/client_golang/prometheus"
)
const (
// Start populates private fields and verifies the configuration is
// valid.
-func (v *S3Volume) Start() error {
+func (v *S3Volume) Start(opsCounters, errCounters, ioBytes *prometheus.CounterVec) error {
region, ok := aws.Regions[v.Region]
if v.Endpoint == "" {
if !ok {
Name: v.Bucket,
},
}
+ // Set up prometheus metrics
+ lbls := prometheus.Labels{"device_id": v.DeviceID()}
+ v.bucket.stats.opsCounters = opsCounters.MustCurryWith(lbls)
+ v.bucket.stats.errCounters = errCounters.MustCurryWith(lbls)
+ v.bucket.stats.ioBytes = ioBytes.MustCurryWith(lbls)
+
return nil
}
}
func (lister *s3Lister) getPage() {
+ lister.Stats.TickOps("list")
lister.Stats.Tick(&lister.Stats.Ops, &lister.Stats.ListOps)
resp, err := lister.Bucket.List(lister.Prefix, "", lister.nextMarker, lister.PageSize)
lister.nextMarker = ""
func (b *s3bucket) GetReader(path string) (io.ReadCloser, error) {
rdr, err := b.Bucket.GetReader(path)
+ b.stats.TickOps("get")
b.stats.Tick(&b.stats.Ops, &b.stats.GetOps)
b.stats.TickErr(err)
return NewCountingReader(rdr, b.stats.TickInBytes), err
func (b *s3bucket) Head(path string, headers map[string][]string) (*http.Response, error) {
resp, err := b.Bucket.Head(path, headers)
+ b.stats.TickOps("head")
b.stats.Tick(&b.stats.Ops, &b.stats.HeadOps)
b.stats.TickErr(err)
return resp, err
r = NewCountingReader(r, b.stats.TickOutBytes)
}
err := b.Bucket.PutReader(path, r, length, contType, perm, options)
+ b.stats.TickOps("put")
b.stats.Tick(&b.stats.Ops, &b.stats.PutOps)
b.stats.TickErr(err)
return err
func (b *s3bucket) Del(path string) error {
err := b.Bucket.Del(path)
+ b.stats.TickOps("delete")
b.stats.Tick(&b.stats.Ops, &b.stats.DelOps)
b.stats.TickErr(err)
return err
"github.com/AdRoll/goamz/s3"
"github.com/AdRoll/goamz/s3/s3test"
"github.com/ghodss/yaml"
+ "github.com/prometheus/client_golang/prometheus"
check "gopkg.in/check.v1"
)
vol := *v.S3Volume
vol.Endpoint = srv.URL
v = &TestableS3Volume{S3Volume: &vol}
- v.Start()
+ metrics := newVolumeMetricsVecs(prometheus.NewRegistry())
+ v.Start(metrics.opsCounters, metrics.errCounters, metrics.ioBytes)
ctx, cancel := context.WithCancel(context.Background())
server: srv,
serverClock: clock,
}
- v.Start()
+ metrics := newVolumeMetricsVecs(prometheus.NewRegistry())
+ v.Start(metrics.opsCounters, metrics.errCounters, metrics.ioBytes)
err = v.bucket.PutBucket(s3.ACL("private"))
c.Assert(err, check.IsNil)
return v
c.Check(cfg.Volumes[0].GetStorageClasses(), check.DeepEquals, []string{"class_a", "class_b"})
}
-func (v *TestableS3Volume) Start() error {
+func (v *TestableS3Volume) Start(opsCounters, errCounters, ioBytes *prometheus.CounterVec) error {
tmp, err := ioutil.TempFile("", "keepstore")
v.c.Assert(err, check.IsNil)
defer os.Remove(tmp.Name())
v.S3Volume.AccessKeyFile = tmp.Name()
v.S3Volume.SecretKeyFile = tmp.Name()
- v.c.Assert(v.S3Volume.Start(), check.IsNil)
+ v.c.Assert(v.S3Volume.Start(opsCounters, errCounters, ioBytes), check.IsNil)
return nil
}
import (
"sync"
"sync/atomic"
+
+ "github.com/prometheus/client_golang/prometheus"
)
type statsTicker struct {
ErrorCodes map[string]uint64 `json:",omitempty"`
lock sync.Mutex
+
+ opsCounters *prometheus.CounterVec
+ errCounters *prometheus.CounterVec
+ ioBytes *prometheus.CounterVec
}
// Tick increments each of the given counters by 1 using
}
s.ErrorCodes[errType]++
s.lock.Unlock()
+ if s.errCounters != nil {
+ s.errCounters.With(prometheus.Labels{"error_type": errType}).Inc()
+ }
}
// TickInBytes increments the incoming byte counter by n.
func (s *statsTicker) TickInBytes(n uint64) {
+ if s.ioBytes != nil {
+ s.ioBytes.With(prometheus.Labels{"direction": "in"}).Add(float64(n))
+ }
atomic.AddUint64(&s.InBytes, n)
}
// TickOutBytes increments the outgoing byte counter by n.
func (s *statsTicker) TickOutBytes(n uint64) {
+ if s.ioBytes != nil {
+ s.ioBytes.With(prometheus.Labels{"direction": "out"}).Add(float64(n))
+ }
atomic.AddUint64(&s.OutBytes, n)
}
+
+// TickOps increments the counter of each listed operation by 1. It is
+// a no-op when the prometheus counters have not been set up (e.g., the
+// volume was started without metrics, as in some tests).
+func (s *statsTicker) TickOps(operations ...string) {
+	if s.opsCounters != nil {
+		for _, opType := range operations {
+			s.opsCounters.With(prometheus.Labels{"operation": opType}).Inc()
+		}
+	}
+}
"sync/atomic"
"syscall"
"time"
+
+ "github.com/prometheus/client_golang/prometheus"
)
type unixVolumeAdder struct {
}
// String implements flag.Value
-func (s *unixVolumeAdder) String() string {
+func (vs *unixVolumeAdder) String() string {
return "-"
}
}
// Start implements Volume
-func (v *UnixVolume) Start() error {
+func (v *UnixVolume) Start(opsCounters, errCounters, ioBytes *prometheus.CounterVec) error {
if v.Serialize {
v.locker = &sync.Mutex{}
}
if v.DirectoryReplication == 0 {
v.DirectoryReplication = 1
}
+ // Set up prometheus metrics
+ lbls := prometheus.Labels{"device_id": v.DeviceID()}
+ v.os.stats.opsCounters = opsCounters.MustCurryWith(lbls)
+ v.os.stats.errCounters = errCounters.MustCurryWith(lbls)
+ v.os.stats.ioBytes = ioBytes.MustCurryWith(lbls)
+
_, err := v.os.Stat(v.Root)
+
return err
}
}
defer v.unlockfile(f)
ts := syscall.NsecToTimespec(time.Now().UnixNano())
+ v.os.stats.TickOps("utimes")
v.os.stats.Tick(&v.os.stats.UtimesOps)
err = syscall.UtimesNano(p, []syscall.Timespec{ts, ts})
v.os.stats.TickErr(err)
return putWithPipe(ctx, loc, block, v)
}
-// ReadBlock implements BlockWriter.
+// WriteBlock implements BlockWriter.
func (v *UnixVolume) WriteBlock(ctx context.Context, loc string, rdr io.Reader) error {
if v.ReadOnly {
return MethodDisabledError
return err
}
defer rootdir.Close()
+ v.os.stats.TickOps("readdir")
v.os.stats.Tick(&v.os.stats.ReaddirOps)
for {
names, err := rootdir.Readdirnames(1)
lastErr = err
continue
}
+ v.os.stats.TickOps("readdir")
v.os.stats.Tick(&v.os.stats.ReaddirOps)
for {
fileInfo, err := blockdir.Readdir(1)
return MethodDisabledError
}
+ v.os.stats.TickOps("readdir")
v.os.stats.Tick(&v.os.stats.ReaddirOps)
files, err := ioutil.ReadDir(v.blockDir(loc))
if err != nil {
// lockfile and unlockfile use flock(2) to manage kernel file locks.
func (v *UnixVolume) lockfile(f *os.File) error {
+ v.os.stats.TickOps("flock")
v.os.stats.Tick(&v.os.stats.FlockOps)
err := syscall.Flock(int(f.Fd()), syscall.LOCK_EX)
v.os.stats.TickErr(err)
}
func (o *osWithStats) Open(name string) (*os.File, error) {
+ o.stats.TickOps("open")
o.stats.Tick(&o.stats.OpenOps)
f, err := os.Open(name)
o.stats.TickErr(err)
}
func (o *osWithStats) OpenFile(name string, flag int, perm os.FileMode) (*os.File, error) {
+ o.stats.TickOps("open")
o.stats.Tick(&o.stats.OpenOps)
f, err := os.OpenFile(name, flag, perm)
o.stats.TickErr(err)
}
func (o *osWithStats) Remove(path string) error {
+ o.stats.TickOps("unlink")
o.stats.Tick(&o.stats.UnlinkOps)
err := os.Remove(path)
o.stats.TickErr(err)
}
func (o *osWithStats) Rename(a, b string) error {
+ o.stats.TickOps("rename")
o.stats.Tick(&o.stats.RenameOps)
err := os.Rename(a, b)
o.stats.TickErr(err)
}
func (o *osWithStats) Stat(path string) (os.FileInfo, error) {
+ o.stats.TickOps("stat")
o.stats.Tick(&o.stats.StatOps)
fi, err := os.Stat(path)
o.stats.TickErr(err)
}
func (o *osWithStats) TempFile(dir, base string) (*os.File, error) {
+ o.stats.TickOps("create")
o.stats.Tick(&o.stats.CreateOps)
f, err := ioutil.TempFile(dir, base)
o.stats.TickErr(err)
"time"
"github.com/ghodss/yaml"
+ "github.com/prometheus/client_golang/prometheus"
check "gopkg.in/check.v1"
)
Root: "/",
ReadOnly: true,
}
- if err := v.Start(); err != nil {
+ metrics := newVolumeMetricsVecs(prometheus.NewRegistry())
+ if err := v.Start(metrics.opsCounters, metrics.errCounters, metrics.ioBytes); err != nil {
t.Error(err)
}
if got := v.Replication(); got != 1 {
"time"
"git.curoverse.com/arvados.git/sdk/go/arvados"
+ "github.com/prometheus/client_golang/prometheus"
)
type BlockWriter interface {
// Do whatever private setup tasks and configuration checks
// are needed. Return non-nil if the volume is unusable (e.g.,
// invalid config).
- Start() error
+ Start(opsCounters, errCounters, ioBytes *prometheus.CounterVec) error
// Get a block: copy the block data into buf, and return the
// number of bytes copied.
"strings"
"sync"
"time"
+
+ "github.com/prometheus/client_golang/prometheus"
)
// A TestableVolume allows test suites to manipulate the state of an
return "Mock"
}
-func (v *MockVolume) Start() error {
+func (v *MockVolume) Start(opsCounters, errCounters, ioBytes *prometheus.CounterVec) error {
return nil
}