"git.curoverse.com/arvados.git/sdk/go/arvados"
"github.com/Azure/azure-sdk-for-go/storage"
+ "github.com/prometheus/client_golang/prometheus"
)
const azureDefaultRequestTimeout = arvados.Duration(10 * time.Minute)
}
// Start implements Volume.
-func (v *AzureBlobVolume) Start(m *volumeMetrics) error {
+func (v *AzureBlobVolume) Start(opsCounters, errCounters, ioBytes *prometheus.CounterVec) error {
if v.ContainerName == "" {
return errors.New("no container name given")
}
}
vm := newVolumeMetricsVecs(reg)
for _, v := range cfg.Volumes {
- metrics := vm.curryWith(
- v.String(),
- v.Status().MountPoint,
- fmt.Sprintf("%d", v.Status().DeviceNum))
- if err := v.Start(metrics); err != nil {
+ if err := v.Start(vm.opsCounters, vm.errCounters, vm.ioBytes); err != nil {
return fmt.Errorf("volume %s: %s", v, err)
}
log.Printf("Using volume %v (writable=%v)", v, v.Writable())
}
type volumeMetricsVecs struct {
- reg *prometheus.Registry
- BytesFree *prometheus.GaugeVec
- BytesUsed *prometheus.GaugeVec
- Errors *prometheus.CounterVec
- Ops *prometheus.CounterVec
- CompareOps *prometheus.CounterVec
- GetOps *prometheus.CounterVec
- PutOps *prometheus.CounterVec
- TouchOps *prometheus.CounterVec
- InBytes *prometheus.CounterVec
- OutBytes *prometheus.CounterVec
- ErrorCodes *prometheus.CounterVec
-}
-
-type volumeMetrics struct {
- reg *prometheus.Registry
- lbls []string
- internalCounters map[string]*prometheus.CounterVec
- BytesFree prometheus.Gauge
- BytesUsed prometheus.Gauge
- Errors prometheus.Counter
- Ops prometheus.Counter
- CompareOps prometheus.Counter
- GetOps prometheus.Counter
- PutOps prometheus.Counter
- TouchOps prometheus.Counter
- InBytes prometheus.Counter
- OutBytes prometheus.Counter
- ErrorCodes *prometheus.CounterVec
+ ioBytes *prometheus.CounterVec
+ errCounters *prometheus.CounterVec
+ opsCounters *prometheus.CounterVec
}
func newVolumeMetricsVecs(reg *prometheus.Registry) *volumeMetricsVecs {
- m := &volumeMetricsVecs{
- reg: reg,
- }
- m.BytesFree = prometheus.NewGaugeVec(
- prometheus.GaugeOpts{
- Namespace: "arvados",
- Subsystem: "keepstore",
- Name: "volume_bytes_free",
- Help: "Number of free bytes on the volume",
- },
- []string{"label", "mount_point", "device_number"},
- )
- reg.MustRegister(m.BytesFree)
- m.BytesUsed = prometheus.NewGaugeVec(
- prometheus.GaugeOpts{
- Namespace: "arvados",
- Subsystem: "keepstore",
- Name: "volume_bytes_used",
- Help: "Number of used bytes on the volume",
- },
- []string{"label", "mount_point", "device_number"},
- )
- reg.MustRegister(m.BytesUsed)
- m.Errors = prometheus.NewCounterVec(
- prometheus.CounterOpts{
- Namespace: "arvados",
- Subsystem: "keepstore",
- Name: "volume_io_errors",
- Help: "Number of volume I/O errors",
- },
- []string{"label", "mount_point", "device_number"},
- )
- reg.MustRegister(m.Errors)
- m.Ops = prometheus.NewCounterVec(
- prometheus.CounterOpts{
- Namespace: "arvados",
- Subsystem: "keepstore",
- Name: "volume_io_ops",
- Help: "Number of volume I/O operations",
- },
- []string{"label", "mount_point", "device_number"},
- )
- reg.MustRegister(m.Ops)
- m.CompareOps = prometheus.NewCounterVec(
- prometheus.CounterOpts{
- Namespace: "arvados",
- Subsystem: "keepstore",
- Name: "volume_io_compare_ops",
- Help: "Number of volume I/O compare operations",
- },
- []string{"label", "mount_point", "device_number"},
- )
- reg.MustRegister(m.CompareOps)
- m.GetOps = prometheus.NewCounterVec(
- prometheus.CounterOpts{
- Namespace: "arvados",
- Subsystem: "keepstore",
- Name: "volume_io_get_ops",
- Help: "Number of volume I/O get operations",
- },
- []string{"label", "mount_point", "device_number"},
- )
- reg.MustRegister(m.GetOps)
- m.PutOps = prometheus.NewCounterVec(
+ m := &volumeMetricsVecs{}
+ m.opsCounters = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: "arvados",
Subsystem: "keepstore",
- Name: "volume_io_put_ops",
- Help: "Number of volume I/O put operations",
+ Name: "volume_operations",
+ Help: "Number of volume operations",
},
- []string{"label", "mount_point", "device_number"},
+ []string{"device_id", "operation"},
)
- reg.MustRegister(m.PutOps)
- m.TouchOps = prometheus.NewCounterVec(
+ reg.MustRegister(m.opsCounters)
+ m.errCounters = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: "arvados",
Subsystem: "keepstore",
- Name: "volume_io_touch_ops",
- Help: "Number of volume I/O touch operations",
+ Name: "volume_errors",
+ Help: "Number of volume errors",
},
- []string{"label", "mount_point", "device_number"},
+ []string{"device_id", "error_type"},
)
- reg.MustRegister(m.TouchOps)
- m.InBytes = prometheus.NewCounterVec(
+ reg.MustRegister(m.errCounters)
+ m.ioBytes = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: "arvados",
Subsystem: "keepstore",
- Name: "volume_io_in_bytes",
- Help: "Number of input bytes",
+ Name: "volume_io_bytes",
+ Help: "Volume I/O traffic in bytes",
},
- []string{"label", "mount_point", "device_number"},
+ []string{"device_id", "direction"},
)
- reg.MustRegister(m.InBytes)
- m.OutBytes = prometheus.NewCounterVec(
- prometheus.CounterOpts{
- Namespace: "arvados",
- Subsystem: "keepstore",
- Name: "volume_io_out_bytes",
- Help: "Number of output bytes",
- },
- []string{"label", "mount_point", "device_number"},
- )
- reg.MustRegister(m.OutBytes)
- m.ErrorCodes = prometheus.NewCounterVec(
- prometheus.CounterOpts{
- Namespace: "arvados",
- Subsystem: "keepstore",
- Name: "volume_io_error_codes",
- Help: "Number of I/O errors by error code",
- },
- []string{"label", "mount_point", "device_number", "error_code"},
- )
- reg.MustRegister(m.ErrorCodes)
+ reg.MustRegister(m.ioBytes)
return m
}
-
-func (m *volumeMetricsVecs) curryWith(lbl string, mnt string, dev string) *volumeMetrics {
- lbls := []string{lbl, mnt, dev}
- curried := &volumeMetrics{
- reg: m.reg,
- lbls: lbls,
- internalCounters: make(map[string]*prometheus.CounterVec),
- BytesFree: m.BytesFree.WithLabelValues(lbls...),
- BytesUsed: m.BytesUsed.WithLabelValues(lbls...),
- Errors: m.Errors.WithLabelValues(lbls...),
- Ops: m.Ops.WithLabelValues(lbls...),
- CompareOps: m.CompareOps.WithLabelValues(lbls...),
- GetOps: m.GetOps.WithLabelValues(lbls...),
- PutOps: m.PutOps.WithLabelValues(lbls...),
- TouchOps: m.TouchOps.WithLabelValues(lbls...),
- InBytes: m.InBytes.WithLabelValues(lbls...),
- OutBytes: m.OutBytes.WithLabelValues(lbls...),
- ErrorCodes: m.ErrorCodes.MustCurryWith(prometheus.Labels{
- "label": lbl,
- "mount_point": mnt,
- "device_number": dev,
- }),
- }
- return curried
-}
-
-// Returns a driver specific counter, creating it when needed. The 'name' argument
-// should include the driver prefix.
-func (m *volumeMetrics) getInternalCounter(name string, help string) prometheus.Counter {
- counterVec, ok := m.internalCounters[name]
- if !ok {
- counterVec = prometheus.NewCounterVec(
- prometheus.CounterOpts{
- Namespace: "arvados",
- Subsystem: "keepstore",
- Name: name,
- Help: help,
- },
- []string{"label", "mount_point", "device_number"},
- )
- m.reg.MustRegister(counterVec)
- m.internalCounters[name] = counterVec
- }
- return counterVec.WithLabelValues(m.lbls...)
-}
"git.curoverse.com/arvados.git/sdk/go/arvados"
"github.com/AdRoll/goamz/aws"
"github.com/AdRoll/goamz/s3"
+ "github.com/prometheus/client_golang/prometheus"
)
const (
// Start populates private fields and verifies the configuration is
// valid.
-func (v *S3Volume) Start(m *volumeMetrics) error {
+func (v *S3Volume) Start(opsCounters, errCounters, ioBytes *prometheus.CounterVec) error {
region, ok := aws.Regions[v.Region]
if v.Endpoint == "" {
if !ok {
vol := *v.S3Volume
vol.Endpoint = srv.URL
v = &TestableS3Volume{S3Volume: &vol}
- metrics := newVolumeMetricsVecs(prometheus.NewRegistry()).curryWith(
- v.String(), v.Status().MountPoint, fmt.Sprintf("%d", v.Status().DeviceNum))
- v.Start(metrics)
+ metrics := newVolumeMetricsVecs(prometheus.NewRegistry())
+ v.Start(metrics.opsCounters, metrics.errCounters, metrics.ioBytes)
ctx, cancel := context.WithCancel(context.Background())
server: srv,
serverClock: clock,
}
- metrics := newVolumeMetricsVecs(prometheus.NewRegistry()).curryWith(
- v.String(), v.Status().MountPoint, fmt.Sprintf("%d", v.Status().DeviceNum))
- v.Start(metrics)
+ metrics := newVolumeMetricsVecs(prometheus.NewRegistry())
+ v.Start(metrics.opsCounters, metrics.errCounters, metrics.ioBytes)
err = v.bucket.PutBucket(s3.ACL("private"))
c.Assert(err, check.IsNil)
return v
c.Check(cfg.Volumes[0].GetStorageClasses(), check.DeepEquals, []string{"class_a", "class_b"})
}
-func (v *TestableS3Volume) Start(m *volumeMetrics) error {
+func (v *TestableS3Volume) Start(opsCounters, errCounters, ioBytes *prometheus.CounterVec) error {
tmp, err := ioutil.TempFile("", "keepstore")
v.c.Assert(err, check.IsNil)
defer os.Remove(tmp.Name())
v.S3Volume.AccessKeyFile = tmp.Name()
v.S3Volume.SecretKeyFile = tmp.Name()
- v.c.Assert(v.S3Volume.Start(m), check.IsNil)
+ v.c.Assert(v.S3Volume.Start(opsCounters, errCounters, ioBytes), check.IsNil)
return nil
}
import (
"sync"
"sync/atomic"
-
- "github.com/prometheus/client_golang/prometheus"
)
type statsTicker struct {
InBytes uint64
OutBytes uint64
- // Prometheus metrics
- errors prometheus.Counter
- inBytes prometheus.Counter
- outBytes prometheus.Counter
- errCounters *prometheus.CounterVec
-
ErrorCodes map[string]uint64 `json:",omitempty"`
lock sync.Mutex
}
-func (s *statsTicker) setup(m *volumeMetrics) {
- s.errors = m.Errors
- s.errCounters = m.ErrorCodes
- s.inBytes = m.InBytes
- s.outBytes = m.OutBytes
-}
-
// Tick increments each of the given counters by 1 using
// atomic.AddUint64.
func (s *statsTicker) Tick(counters ...*uint64) {
if err == nil {
return
}
- if s.errors != nil {
- s.errors.Inc()
- }
s.Tick(&s.Errors)
s.lock.Lock()
}
s.ErrorCodes[errType]++
s.lock.Unlock()
- if s.errCounters != nil {
- s.errCounters.WithLabelValues(errType).Inc()
- }
}
// TickInBytes increments the incoming byte counter by n.
func (s *statsTicker) TickInBytes(n uint64) {
- if s.inBytes != nil {
- s.inBytes.Add(float64(n))
- }
atomic.AddUint64(&s.InBytes, n)
}
// TickOutBytes increments the outgoing byte counter by n.
func (s *statsTicker) TickOutBytes(n uint64) {
- if s.outBytes != nil {
- s.outBytes.Add(float64(n))
- }
atomic.AddUint64(&s.OutBytes, n)
}
"time"
"git.curoverse.com/arvados.git/sdk/go/arvados"
+ "github.com/prometheus/client_golang/prometheus"
)
type BlockWriter interface {
// Do whatever private setup tasks and configuration checks
// are needed. Return non-nil if the volume is unusable (e.g.,
// invalid config).
- Start(m *volumeMetrics) error
+ Start(opsCounters, errCounters, ioBytes *prometheus.CounterVec) error
// Get a block: copy the block data into buf, and return the
// number of bytes copied.
"strings"
"sync"
"time"
+
+ "github.com/prometheus/client_golang/prometheus"
)
// A TestableVolume allows test suites to manipulate the state of an
return "Mock"
}
-func (v *MockVolume) Start(m *volumeMetrics) error {
+func (v *MockVolume) Start(opsCounters, errCounters, ioBytes *prometheus.CounterVec) error {
return nil
}
os osWithStats
- metrics *volumeMetrics
+ // Volume metrics
+ opsCounters *prometheus.CounterVec
+ errCounters *prometheus.CounterVec
+ ioBytes *prometheus.CounterVec
}
// DeviceID returns a globally unique ID for the volume's root
}
// Start implements Volume
-func (v *UnixVolume) Start(m *volumeMetrics) error {
+//
+// The shared counter vectors are curried with this volume's device_id
+// and handed to v.os, then v.Root is statted. The stat error (nil when
+// the root is reachable) is returned.
+func (v *UnixVolume) Start(opsCounters, errCounters, ioBytes *prometheus.CounterVec) error {
if v.Serialize {
v.locker = &sync.Mutex{}
}
+ // Install the prometheus counters before the first v.os call:
+ // osWithStats.Stat uses opsCounters unconditionally, so statting
+ // first would dereference a nil *prometheus.CounterVec.
+ lbls := prometheus.Labels{"device_id": v.DeviceID()}
+ v.opsCounters = opsCounters.MustCurryWith(lbls)
+ v.errCounters = errCounters.MustCurryWith(lbls)
+ v.ioBytes = ioBytes.MustCurryWith(lbls)
+ v.os.promSetup(v.opsCounters, v.errCounters, v.ioBytes)
_, err := v.os.Stat(v.Root)
- if err == nil {
- // Set up prometheus metrics
- v.metrics = m
- v.os.stats.setup(v.metrics)
- // Periodically update free/used volume space
- go func() {
- for {
- v.metrics.BytesFree.Set(float64(v.Status().BytesFree))
- v.metrics.BytesUsed.Set(float64(v.Status().BytesUsed))
- time.Sleep(10 * time.Second)
- }
- }()
- }
return err
}
// Touch sets the timestamp for the given locator to the current time
func (v *UnixVolume) Touch(loc string) error {
- v.metrics.Ops.Inc()
- v.metrics.TouchOps.Inc()
if v.ReadOnly {
return MethodDisabledError
}
}
defer v.unlockfile(f)
ts := syscall.NsecToTimespec(time.Now().UnixNano())
- v.os.stats.utimesOps.Inc()
+ v.os.opsCounters.With(prometheus.Labels{"operation": "utimes"}).Inc()
v.os.stats.Tick(&v.os.stats.UtimesOps)
err = syscall.UtimesNano(p, []syscall.Timespec{ts, ts})
v.os.stats.TickErr(err)
return err
}
defer f.Close()
- return fn(NewCountingReader(ioutil.NopCloser(f), v.os.stats.TickInBytes))
+ return fn(NewCountingReader(
+ ioutil.NopCloser(f),
+ func(c uint64) {
+ v.os.stats.TickInBytes(c)
+ v.ioBytes.With(prometheus.Labels{"direction": "in"}).Add(float64(c))
+ }))
}
// stat is os.Stat() with some extra sanity checks.
// Get retrieves a block, copies it to the given slice, and returns
// the number of bytes copied.
func (v *UnixVolume) Get(ctx context.Context, loc string, buf []byte) (int, error) {
- v.metrics.Ops.Inc()
- v.metrics.GetOps.Inc()
return getWithPipe(ctx, loc, buf, v)
}
// expect. It is functionally equivalent to Get() followed by
// bytes.Compare(), but uses less memory.
func (v *UnixVolume) Compare(ctx context.Context, loc string, expect []byte) error {
- v.metrics.Ops.Inc()
- v.metrics.CompareOps.Inc()
path := v.blockPath(loc)
if _, err := v.stat(path); err != nil {
return v.translateError(err)
// returns a FullError. If the write fails due to some other error,
// that error is returned.
func (v *UnixVolume) Put(ctx context.Context, loc string, block []byte) error {
- v.metrics.Ops.Inc()
- v.metrics.PutOps.Inc()
return putWithPipe(ctx, loc, block, v)
}
}
defer v.unlock()
n, err := io.Copy(tmpfile, rdr)
+ v.os.ioBytes.With(prometheus.Labels{"direction": "out"}).Add(float64(n))
v.os.stats.TickOutBytes(uint64(n))
if err != nil {
log.Printf("%s: writing to %s: %s\n", v, bpath, err)
return err
}
defer rootdir.Close()
- v.os.stats.readdirOps.Inc()
+ v.os.opsCounters.With(prometheus.Labels{"operation": "readdir"}).Inc()
v.os.stats.Tick(&v.os.stats.ReaddirOps)
for {
names, err := rootdir.Readdirnames(1)
lastErr = err
continue
}
- v.os.stats.readdirOps.Inc()
+ v.os.opsCounters.With(prometheus.Labels{"operation": "readdir"}).Inc()
v.os.stats.Tick(&v.os.stats.ReaddirOps)
for {
fileInfo, err := blockdir.Readdir(1)
return MethodDisabledError
}
- v.os.stats.readdirOps.Inc()
+ v.os.opsCounters.With(prometheus.Labels{"operation": "readdir"}).Inc()
v.os.stats.Tick(&v.os.stats.ReaddirOps)
files, err := ioutil.ReadDir(v.blockDir(loc))
if err != nil {
// lockfile and unlockfile use flock(2) to manage kernel file locks.
func (v *UnixVolume) lockfile(f *os.File) error {
- v.os.stats.flockOps.Inc()
+ v.os.opsCounters.With(prometheus.Labels{"operation": "flock"}).Inc()
v.os.stats.Tick(&v.os.stats.FlockOps)
err := syscall.Flock(int(f.Fd()), syscall.LOCK_EX)
v.os.stats.TickErr(err)
RenameOps uint64
UnlinkOps uint64
ReaddirOps uint64
- // Prometheus metrics -- Above ad-hoc counters will be eventually removed
- openOps prometheus.Counter
- statOps prometheus.Counter
- flockOps prometheus.Counter
- utimesOps prometheus.Counter
- createOps prometheus.Counter
- renameOps prometheus.Counter
- unlinkOps prometheus.Counter
- readdirOps prometheus.Counter
-}
-
-func (s *unixStats) setup(m *volumeMetrics) {
- s.statsTicker.setup(m)
- s.openOps = m.getInternalCounter("unix_open_ops", "Number of backend open operations")
- s.statOps = m.getInternalCounter("unix_stat_ops", "Number of backend stat operations")
- s.flockOps = m.getInternalCounter("unix_flock_ops", "Number of backend flock operations")
- s.utimesOps = m.getInternalCounter("unix_utimes_ops", "Number of backend utimes operations")
- s.createOps = m.getInternalCounter("unix_create_ops", "Number of backend create operations")
- s.renameOps = m.getInternalCounter("unix_rename_ops", "Number of backend rename operations")
- s.unlinkOps = m.getInternalCounter("unix_unlink_ops", "Number of backend unlink operations")
- s.readdirOps = m.getInternalCounter("unix_readdir_ops", "Number of backend readdir operations")
}
func (s *unixStats) TickErr(err error) {
}
type osWithStats struct {
- stats unixStats
+ stats unixStats
+ opsCounters *prometheus.CounterVec
+ errCounters *prometheus.CounterVec
+ ioBytes *prometheus.CounterVec
+}
+
+// tickErr counts a failed OS call in errCounters, using the error's
+// dynamic Go type (%T) as the error_type label value. A nil err is
+// ignored, as is a call made before promSetup has supplied counters.
+func (o *osWithStats) tickErr(err error) {
+ if err == nil || o.errCounters == nil {
+ return
+ }
+ // The label name must match the one errCounters was declared with
+ // ("error_type"); CounterVec.With panics on an unknown label name.
+ o.errCounters.With(prometheus.Labels{"error_type": fmt.Sprintf("%T", err)}).Inc()
+}
+
+func (o *osWithStats) promSetup(opsC, errC, ioB *prometheus.CounterVec) {
+ o.opsCounters = opsC
+ o.errCounters = errC
+ o.ioBytes = ioB
}
func (o *osWithStats) Open(name string) (*os.File, error) {
- o.stats.openOps.Inc()
+ o.opsCounters.With(prometheus.Labels{"operation": "open"}).Inc()
o.stats.Tick(&o.stats.OpenOps)
f, err := os.Open(name)
+ o.tickErr(err)
o.stats.TickErr(err)
return f, err
}
func (o *osWithStats) OpenFile(name string, flag int, perm os.FileMode) (*os.File, error) {
- o.stats.openOps.Inc()
+ o.opsCounters.With(prometheus.Labels{"operation": "open"}).Inc()
o.stats.Tick(&o.stats.OpenOps)
f, err := os.OpenFile(name, flag, perm)
+ o.tickErr(err)
o.stats.TickErr(err)
return f, err
}
func (o *osWithStats) Remove(path string) error {
- o.stats.unlinkOps.Inc()
+ o.opsCounters.With(prometheus.Labels{"operation": "unlink"}).Inc()
o.stats.Tick(&o.stats.UnlinkOps)
err := os.Remove(path)
+ o.tickErr(err)
o.stats.TickErr(err)
return err
}
func (o *osWithStats) Rename(a, b string) error {
- o.stats.renameOps.Inc()
+ o.opsCounters.With(prometheus.Labels{"operation": "rename"}).Inc()
o.stats.Tick(&o.stats.RenameOps)
err := os.Rename(a, b)
+ o.tickErr(err)
o.stats.TickErr(err)
return err
}
func (o *osWithStats) Stat(path string) (os.FileInfo, error) {
- // Avoid segfaulting when called from vol.Status() on theConfig.Start()
- if o.stats.statOps != nil {
- o.stats.statOps.Inc()
- }
+ // Stat may run before promSetup has installed the counters (the
+ // code this replaces guarded against the same early call), so skip
+ // the prometheus bookkeeping while they are still nil.
+ if o.opsCounters != nil {
+ o.opsCounters.With(prometheus.Labels{"operation": "stat"}).Inc()
+ }
o.stats.Tick(&o.stats.StatOps)
fi, err := os.Stat(path)
+ if o.errCounters != nil {
+ o.tickErr(err)
+ }
o.stats.TickErr(err)
return fi, err
}
func (o *osWithStats) TempFile(dir, base string) (*os.File, error) {
- o.stats.createOps.Inc()
+ o.opsCounters.With(prometheus.Labels{"operation": "create"}).Inc()
o.stats.Tick(&o.stats.CreateOps)
f, err := ioutil.TempFile(dir, base)
+ o.tickErr(err)
o.stats.TickErr(err)
return f, err
}
Root: "/",
ReadOnly: true,
}
- metrics := newVolumeMetricsVecs(prometheus.NewRegistry()).curryWith(
- v.String(), v.Status().MountPoint, fmt.Sprintf("%d", v.Status().DeviceNum))
- if err := v.Start(metrics); err != nil {
+ metrics := newVolumeMetricsVecs(prometheus.NewRegistry())
+ if err := v.Start(metrics.opsCounters, metrics.errCounters, metrics.ioBytes); err != nil {
t.Error(err)
}
if got := v.Replication(); got != 1 {