type nodeMetrics struct {
reg *prometheus.Registry
- rc httpserver.RequestCounter
}
-func (m *nodeMetrics) setup() {
+func (m *nodeMetrics) setupBufferPoolMetrics(b *bufferPool) { // registers three gauges on m.reg; each one reads its value from b
m.reg.MustRegister(prometheus.NewGaugeFunc(
prometheus.GaugeOpts{
Namespace: "arvados",
Name: "bufferpool_bytes_allocated",
Help: "Number of bytes allocated to buffers",
},
- func() float64 { return float64(bufs.Alloc()) },
+ func() float64 { return float64(b.Alloc()) }, // closure captures b, so the gauge reports b.Alloc() whenever it is evaluated
))
m.reg.MustRegister(prometheus.NewGaugeFunc(
prometheus.GaugeOpts{
Name: "bufferpool_buffers_max",
Help: "Maximum number of buffers allowed",
},
- func() float64 { return float64(bufs.Cap()) },
+ func() float64 { return float64(b.Cap()) }, // reports b.Cap()
))
m.reg.MustRegister(prometheus.NewGaugeFunc(
prometheus.GaugeOpts{
Name: "bufferpool_buffers_in_use",
Help: "Number of buffers in use",
},
- func() float64 { return float64(bufs.Len()) },
+ func() float64 { return float64(b.Len()) }, // reports b.Len()
))
+}
+
+func (m *nodeMetrics) setupWorkQueueMetrics(q *WorkQueue, qName string) { // registers in-progress and queued gauges for q; qName is the metric-name prefix
m.reg.MustRegister(prometheus.NewGaugeFunc(
prometheus.GaugeOpts{
Namespace: "arvados",
Subsystem: "keepstore",
- Name: "pull_queue_in_progress",
- Help: "Number of pull requests in progress",
+ Name: fmt.Sprintf("%s_queue_in_progress", qName), // e.g. qName "pull" gives "pull_queue_in_progress"; NOTE(review): qName ends up in a metric name — confirm callers pass only fixed, valid name fragments
+ Help: fmt.Sprintf("Number of %s requests in progress", qName),
},
- func() float64 { return float64(getWorkQueueStatus(pullq).InProgress) },
+ func() float64 { return float64(getWorkQueueStatus(q).InProgress) }, // status is re-read from q each time the gauge is evaluated
))
m.reg.MustRegister(prometheus.NewGaugeFunc(
prometheus.GaugeOpts{
Namespace: "arvados",
Subsystem: "keepstore",
- Name: "pull_queue_queued",
- Help: "Number of queued pull requests",
+ Name: fmt.Sprintf("%s_queue_queued", qName), // e.g. qName "pull" gives "pull_queue_queued"
+ Help: fmt.Sprintf("Number of queued %s requests", qName),
},
- func() float64 { return float64(getWorkQueueStatus(pullq).Queued) },
+ func() float64 { return float64(getWorkQueueStatus(q).Queued) }, // separate getWorkQueueStatus call from the in-progress gauge above
))
+}
+
+func (m *nodeMetrics) setupRequestMetrics(rc httpserver.RequestCounter) { // registers gauges on m.reg for rc.Current() and rc.Max()
m.reg.MustRegister(prometheus.NewGaugeFunc(
prometheus.GaugeOpts{
Namespace: "arvados",
Subsystem: "keepstore",
- Name: "trash_queue_in_progress",
- Help: "Number of trash requests in progress",
+ Name: "requests_current",
+ Help: "Number of requests in progress",
},
- func() float64 { return float64(getWorkQueueStatus(trashq).InProgress) },
+ func() float64 { return float64(rc.Current()) }, // rc is the parameter captured by this closure, not a field of m
))
m.reg.MustRegister(prometheus.NewGaugeFunc(
prometheus.GaugeOpts{
Namespace: "arvados",
Subsystem: "keepstore",
- Name: "trash_queue_queued",
- Help: "Number of queued trash requests",
+ Name: "requests_max",
+ Help: "Maximum number of concurrent requests",
},
- func() float64 { return float64(getWorkQueueStatus(trashq).Queued) },
+ func() float64 { return float64(rc.Max()) }, // reports rc.Max()
))
- m.reg.MustRegister(prometheus.NewGaugeFunc(
+}
+
+type volumeMetricsVecs struct { // labeled metric vectors shared by all volumes; built and registered by newVolumeMetricsVecs
+ BytesFree *prometheus.GaugeVec // Name "volume_bytes_free"
+ BytesUsed *prometheus.GaugeVec // Name "volume_bytes_used"
+ Errors *prometheus.CounterVec // Name "volume_io_errors"
+ Ops *prometheus.CounterVec // Name "volume_io_ops"
+ CompareOps *prometheus.CounterVec // Name "volume_io_compare_ops"
+ GetOps *prometheus.CounterVec // Name "volume_io_get_ops"
+ PutOps *prometheus.CounterVec // Name "volume_io_put_ops"
+ TouchOps *prometheus.CounterVec // Name "volume_io_touch_ops"
+ InBytes *prometheus.CounterVec // Name "volume_io_in_bytes"
+ OutBytes *prometheus.CounterVec // Name "volume_io_out_bytes"
+ ErrorCodes *prometheus.CounterVec // Name "volume_io_error_codes"; the only vector with a fourth label, "error_code"
+}
+
+type volumeMetrics struct { // one volume's metrics: the volumeMetricsVecs vectors with label, mount_point and device_number bound by curryWith
+ BytesFree prometheus.Gauge // set by the periodic updater started in UnixVolume.Start
+ BytesUsed prometheus.Gauge // set by the periodic updater started in UnixVolume.Start
+ Errors prometheus.Counter // handed to v.os.stats.PromErrors in UnixVolume.Start
+ Ops prometheus.Counter // incremented by UnixVolume Touch, Get, Compare and Put
+ CompareOps prometheus.Counter // incremented by UnixVolume.Compare
+ GetOps prometheus.Counter // incremented by UnixVolume.Get
+ PutOps prometheus.Counter // incremented by UnixVolume.Put
+ TouchOps prometheus.Counter // incremented by UnixVolume.Touch
+ InBytes prometheus.Counter // handed to v.os.stats.PromInBytes in UnixVolume.Start
+ OutBytes prometheus.Counter // handed to v.os.stats.PromOutBytes in UnixVolume.Start
+ ErrorCodes *prometheus.CounterVec // still a vector: curryWith leaves the "error_code" label unbound
+}
+
+func newVolumeMetricsVecs(reg *prometheus.Registry) *volumeMetricsVecs { // creates every per-volume metric vector, registers each with reg, and returns them
+ m := &volumeMetricsVecs{}
+ m.BytesFree = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "arvados",
Subsystem: "keepstore",
- Name: "requests_current",
- Help: "Number of requests in progress",
+ Name: "volume_bytes_free",
+ Help: "Number of free bytes on the volume",
},
- func() float64 { return float64(m.rc.Current()) },
- ))
- m.reg.MustRegister(prometheus.NewGaugeFunc(
+ []string{"label", "mount_point", "device_number"}, // label names; curryWith passes the values positionally in this same order
+ )
+ reg.MustRegister(m.BytesFree)
+ m.BytesUsed = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "arvados",
Subsystem: "keepstore",
- Name: "requests_max",
- Help: "Maximum number of concurrent requests",
+ Name: "volume_bytes_used",
+ Help: "Number of used bytes on the volume",
},
- func() float64 { return float64(m.rc.Max()) },
- ))
- // Register individual volume's metrics
- vols := KeepVM.AllReadable()
- for _, vol := range vols {
- labels := prometheus.Labels{
- "label": vol.String(),
- "mount_point": vol.Status().MountPoint,
- "device_number": fmt.Sprintf("%d", vol.Status().DeviceNum),
- }
- if vol, ok := vol.(InternalMetricser); ok {
- // Per-driver internal metrics
- vol.SetupInternalMetrics(m.reg, labels)
- }
- m.reg.Register(prometheus.NewGaugeFunc(
- prometheus.GaugeOpts{
- Namespace: "arvados",
- Subsystem: "keepstore",
- Name: "volume_bytes_free",
- Help: "Number of free bytes on the volume",
- ConstLabels: labels,
- },
- func() float64 { return float64(vol.Status().BytesFree) },
- ))
- m.reg.Register(prometheus.NewGaugeFunc(
- prometheus.GaugeOpts{
- Namespace: "arvados",
- Subsystem: "keepstore",
- Name: "volume_bytes_used",
- Help: "Number of used bytes on the volume",
- ConstLabels: labels,
- },
- func() float64 { return float64(vol.Status().BytesUsed) },
- ))
- m.reg.Register(prometheus.NewGaugeFunc(
- prometheus.GaugeOpts{
- Namespace: "arvados",
- Subsystem: "keepstore",
- Name: "volume_io_errors",
- Help: "Number of I/O errors",
- ConstLabels: labels,
- },
- func() float64 { return float64(KeepVM.VolumeStats(vol).Errors) },
- ))
- m.reg.Register(prometheus.NewGaugeFunc(
- prometheus.GaugeOpts{
- Namespace: "arvados",
- Subsystem: "keepstore",
- Name: "volume_io_ops",
- Help: "Number of I/O operations",
- ConstLabels: labels,
- },
- func() float64 { return float64(KeepVM.VolumeStats(vol).Ops) },
- ))
- m.reg.Register(prometheus.NewGaugeFunc(
- prometheus.GaugeOpts{
- Namespace: "arvados",
- Subsystem: "keepstore",
- Name: "volume_io_compare_ops",
- Help: "Number of I/O compare operations",
- ConstLabels: labels,
- },
- func() float64 { return float64(KeepVM.VolumeStats(vol).CompareOps) },
- ))
- m.reg.Register(prometheus.NewGaugeFunc(
- prometheus.GaugeOpts{
- Namespace: "arvados",
- Subsystem: "keepstore",
- Name: "volume_io_get_ops",
- Help: "Number of I/O get operations",
- ConstLabels: labels,
- },
- func() float64 { return float64(KeepVM.VolumeStats(vol).GetOps) },
- ))
- m.reg.Register(prometheus.NewGaugeFunc(
- prometheus.GaugeOpts{
- Namespace: "arvados",
- Subsystem: "keepstore",
- Name: "volume_io_put_ops",
- Help: "Number of I/O put operations",
- ConstLabels: labels,
- },
- func() float64 { return float64(KeepVM.VolumeStats(vol).PutOps) },
- ))
- m.reg.Register(prometheus.NewGaugeFunc(
- prometheus.GaugeOpts{
- Namespace: "arvados",
- Subsystem: "keepstore",
- Name: "volume_io_touch_ops",
- Help: "Number of I/O touch operations",
- ConstLabels: labels,
- },
- func() float64 { return float64(KeepVM.VolumeStats(vol).TouchOps) },
- ))
- m.reg.Register(prometheus.NewGaugeFunc(
- prometheus.GaugeOpts{
- Namespace: "arvados",
- Subsystem: "keepstore",
- Name: "volume_io_input_bytes",
- Help: "Number of input bytes",
- ConstLabels: labels,
- },
- func() float64 { return float64(KeepVM.VolumeStats(vol).InBytes) },
- ))
- m.reg.Register(prometheus.NewGaugeFunc(
- prometheus.GaugeOpts{
- Namespace: "arvados",
- Subsystem: "keepstore",
- Name: "volume_io_output_bytes",
- Help: "Number of output bytes",
- ConstLabels: labels,
- },
- func() float64 { return float64(KeepVM.VolumeStats(vol).OutBytes) },
- ))
+ []string{"label", "mount_point", "device_number"},
+ )
+ reg.MustRegister(m.BytesUsed)
+ m.Errors = prometheus.NewCounterVec( // from here on the I/O totals are counter vectors rather than gauges
+ prometheus.CounterOpts{
+ Namespace: "arvados",
+ Subsystem: "keepstore",
+ Name: "volume_io_errors",
+ Help: "Number of volume I/O errors",
+ },
+ []string{"label", "mount_point", "device_number"},
+ )
+ reg.MustRegister(m.Errors)
+ m.Ops = prometheus.NewCounterVec(
+ prometheus.CounterOpts{
+ Namespace: "arvados",
+ Subsystem: "keepstore",
+ Name: "volume_io_ops",
+ Help: "Number of volume I/O operations",
+ },
+ []string{"label", "mount_point", "device_number"},
+ )
+ reg.MustRegister(m.Ops)
+ m.CompareOps = prometheus.NewCounterVec(
+ prometheus.CounterOpts{
+ Namespace: "arvados",
+ Subsystem: "keepstore",
+ Name: "volume_io_compare_ops",
+ Help: "Number of volume I/O compare operations",
+ },
+ []string{"label", "mount_point", "device_number"},
+ )
+ reg.MustRegister(m.CompareOps)
+ m.GetOps = prometheus.NewCounterVec(
+ prometheus.CounterOpts{
+ Namespace: "arvados",
+ Subsystem: "keepstore",
+ Name: "volume_io_get_ops",
+ Help: "Number of volume I/O get operations",
+ },
+ []string{"label", "mount_point", "device_number"},
+ )
+ reg.MustRegister(m.GetOps)
+ m.PutOps = prometheus.NewCounterVec(
+ prometheus.CounterOpts{
+ Namespace: "arvados",
+ Subsystem: "keepstore",
+ Name: "volume_io_put_ops",
+ Help: "Number of volume I/O put operations",
+ },
+ []string{"label", "mount_point", "device_number"},
+ )
+ reg.MustRegister(m.PutOps)
+ m.TouchOps = prometheus.NewCounterVec(
+ prometheus.CounterOpts{
+ Namespace: "arvados",
+ Subsystem: "keepstore",
+ Name: "volume_io_touch_ops",
+ Help: "Number of volume I/O touch operations",
+ },
+ []string{"label", "mount_point", "device_number"},
+ )
+ reg.MustRegister(m.TouchOps)
+ m.InBytes = prometheus.NewCounterVec(
+ prometheus.CounterOpts{
+ Namespace: "arvados",
+ Subsystem: "keepstore",
+ Name: "volume_io_in_bytes", // NOTE(review): differs from the "volume_io_input_bytes" name being removed — confirm the rename is intended
+ Help: "Number of input bytes",
+ },
+ []string{"label", "mount_point", "device_number"},
+ )
+ reg.MustRegister(m.InBytes)
+ m.OutBytes = prometheus.NewCounterVec(
+ prometheus.CounterOpts{
+ Namespace: "arvados",
+ Subsystem: "keepstore",
+ Name: "volume_io_out_bytes", // NOTE(review): differs from the "volume_io_output_bytes" name being removed — confirm the rename is intended
+ Help: "Number of output bytes",
+ },
+ []string{"label", "mount_point", "device_number"},
+ )
+ reg.MustRegister(m.OutBytes)
+ m.ErrorCodes = prometheus.NewCounterVec(
+ prometheus.CounterOpts{
+ Namespace: "arvados",
+ Subsystem: "keepstore",
+ Name: "volume_io_error_codes",
+ Help: "Number of I/O errors by error code",
+ },
+ []string{"label", "mount_point", "device_number", "error_code"}, // extra "error_code" label; curryWith binds only the first three
+ )
+ reg.MustRegister(m.ErrorCodes)
+
+ return m
+}
+
+func (m *volumeMetricsVecs) curryWith(lbl string, mnt string, dev string) *volumeMetrics { // binds one volume's label values to every vector in m and returns the result
+ lbls := []string{lbl, mnt, dev} // positional values for "label", "mount_point", "device_number" — must stay in the order declared in newVolumeMetricsVecs
+ curried := &volumeMetrics{
+ BytesFree: m.BytesFree.WithLabelValues(lbls...),
+ BytesUsed: m.BytesUsed.WithLabelValues(lbls...),
+ Errors: m.Errors.WithLabelValues(lbls...),
+ Ops: m.Ops.WithLabelValues(lbls...),
+ CompareOps: m.CompareOps.WithLabelValues(lbls...),
+ GetOps: m.GetOps.WithLabelValues(lbls...),
+ PutOps: m.PutOps.WithLabelValues(lbls...),
+ TouchOps: m.TouchOps.WithLabelValues(lbls...),
+ InBytes: m.InBytes.WithLabelValues(lbls...),
+ OutBytes: m.OutBytes.WithLabelValues(lbls...),
+ ErrorCodes: m.ErrorCodes.MustCurryWith(prometheus.Labels{ // stays a vector: "error_code" is not bound here and is left for the user of ErrorCodes
+ "label": lbl,
+ "mount_point": mnt,
+ "device_number": dev,
+ }),
}
+ return curried
}
"sync/atomic"
"syscall"
"time"
-
- "github.com/prometheus/client_golang/prometheus"
)
type unixVolumeAdder struct {
locker sync.Locker
os osWithStats
+
+ metrics *volumeMetrics
}
// DeviceID returns a globally unique ID for the volume's root
}
// Start implements Volume
-func (v *UnixVolume) Start() error {
+func (v *UnixVolume) Start(m *volumeMetrics) error { // m: this volume's metrics; stored in v.metrics only if Stat(v.Root) succeeds
if v.Serialize {
v.locker = &sync.Mutex{}
}
v.DirectoryReplication = 1
}
_, err := v.os.Stat(v.Root)
+ if err == nil { // NOTE(review): on Stat failure v.metrics is never assigned here, while Touch/Get/Compare/Put dereference it unconditionally
+ // Set up prometheus metrics
+ v.metrics = m // NOTE(review): m is dereferenced on the next lines; a nil m would panic — confirm callers always pass one
+ v.os.stats.PromErrors = v.metrics.Errors
+ v.os.stats.PromErrorCodes = v.metrics.ErrorCodes
+ v.os.stats.PromInBytes = v.metrics.InBytes
+ v.os.stats.PromOutBytes = v.metrics.OutBytes
+ // Periodically update free/used volume space
+ go func() { // NOTE(review): the loop below has no exit, so this goroutine runs for the life of the process
+ for {
+ v.metrics.BytesFree.Set(float64(v.Status().BytesFree))
+ v.metrics.BytesUsed.Set(float64(v.Status().BytesUsed)) // NOTE(review): second Status() call per tick; a single call could feed both gauges
+ time.Sleep(10 * time.Second) // pause between refreshes
+ }
+ }()
+ }
return err
}
// Touch sets the timestamp for the given locator to the current time
func (v *UnixVolume) Touch(loc string) error {
+ v.metrics.Ops.Inc()
+ v.metrics.TouchOps.Inc()
if v.ReadOnly {
return MethodDisabledError
}
// Get retrieves a block, copies it to the given slice, and returns
// the number of bytes copied.
func (v *UnixVolume) Get(ctx context.Context, loc string, buf []byte) (int, error) {
+ v.metrics.Ops.Inc() // counted before the read is attempted, so failed reads are included
+ v.metrics.GetOps.Inc() // assumes Start has already set v.metrics — TODO confirm Get is never reached otherwise
return getWithPipe(ctx, loc, buf, v)
}
// expect. It is functionally equivalent to Get() followed by
// bytes.Compare(), but uses less memory.
func (v *UnixVolume) Compare(ctx context.Context, loc string, expect []byte) error {
+ v.metrics.Ops.Inc()
+ v.metrics.CompareOps.Inc()
path := v.blockPath(loc)
if _, err := v.stat(path); err != nil {
return v.translateError(err)
// returns a FullError. If the write fails due to some other error,
// that error is returned.
func (v *UnixVolume) Put(ctx context.Context, loc string, block []byte) error {
+ v.metrics.Ops.Inc() // counted before the write is attempted, so failed writes are included
+ v.metrics.PutOps.Inc() // assumes Start has already set v.metrics — TODO confirm Put is never reached otherwise
return putWithPipe(ctx, loc, block, v)
}
log.Printf("EmptyTrash stats for %v: Deleted %v bytes in %v blocks. Remaining in trash: %v bytes in %v blocks.", v.String(), bytesDeleted, blocksDeleted, bytesInTrash-bytesDeleted, blocksInTrash-blocksDeleted)
}
-// SetupInternalMetrics registers driver stats to Prometheus.
-// Implements InternalMetricser interface.
-func (v *UnixVolume) SetupInternalMetrics(reg *prometheus.Registry, lbl prometheus.Labels) {
- v.os.stats.setupPrometheus(reg, lbl)
-}
-
-func (s *unixStats) setupPrometheus(reg *prometheus.Registry, lbl prometheus.Labels) {
- // Common backend metrics
- s.statsTicker.setupPrometheus("unix", reg, lbl)
- // Driver-specific backend metrics
- metrics := map[string][]interface{}{
- "open_ops": []interface{}{string("open operations"), s.OpenOps},
- "stat_ops": []interface{}{string("stat operations"), s.StatOps},
- "flock_ops": []interface{}{string("flock operations"), s.FlockOps},
- "utimes_ops": []interface{}{string("utimes operations"), s.UtimesOps},
- "create_ops": []interface{}{string("create operations"), s.CreateOps},
- "rename_ops": []interface{}{string("rename operations"), s.RenameOps},
- "unlink_ops": []interface{}{string("unlink operations"), s.UnlinkOps},
- "readdir_ops": []interface{}{string("readdir operations"), s.ReaddirOps},
- }
- for mName, data := range metrics {
- mHelp := data[0].(string)
- mVal := data[1].(uint64)
- reg.Register(prometheus.NewGaugeFunc(
- prometheus.GaugeOpts{
- Namespace: "arvados",
- Subsystem: "keepstore",
- Name: fmt.Sprintf("unix_%s", mName),
- Help: fmt.Sprintf("Number of unix backend %s", mHelp),
- ConstLabels: lbl,
- },
- func() float64 { return float64(mVal) },
- ))
- }
-}
-
type unixStats struct {
statsTicker
OpenOps uint64