Also, generalizes counters usage and updates unix & s3 volume drivers.
Arvados-DCO-1.1-Signed-off-by: Lucas Di Pentima <ldipentima@veritasgenetics.com>
} else if !ok {
return fmt.Errorf("Azure container %q does not exist", v.ContainerName)
}
+ // Set up prometheus metrics
+ lbls := prometheus.Labels{"device_id": v.DeviceID()}
+ v.container.stats.opsCounters = opsCounters.MustCurryWith(lbls)
+ v.container.stats.errCounters = errCounters.MustCurryWith(lbls)
+ v.container.stats.ioBytes = ioBytes.MustCurryWith(lbls)
+
return nil
}
}
func (c *azureContainer) Exists() (bool, error) {
+ c.stats.TickOps("exists")
c.stats.Tick(&c.stats.Ops)
ok, err := c.ctr.Exists()
c.stats.TickErr(err)
}
func (c *azureContainer) GetBlobMetadata(bname string) (storage.BlobMetadata, error) {
+ c.stats.TickOps("get_metadata")
c.stats.Tick(&c.stats.Ops, &c.stats.GetMetadataOps)
b := c.ctr.GetBlobReference(bname)
err := b.GetMetadata(nil)
}
func (c *azureContainer) GetBlobProperties(bname string) (*storage.BlobProperties, error) {
+ c.stats.TickOps("get_properties")
c.stats.Tick(&c.stats.Ops, &c.stats.GetPropertiesOps)
b := c.ctr.GetBlobReference(bname)
err := b.GetProperties(nil)
}
func (c *azureContainer) GetBlob(bname string) (io.ReadCloser, error) {
+ c.stats.TickOps("get")
c.stats.Tick(&c.stats.Ops, &c.stats.GetOps)
b := c.ctr.GetBlobReference(bname)
rdr, err := b.Get(nil)
}
func (c *azureContainer) GetBlobRange(bname string, start, end int, opts *storage.GetBlobOptions) (io.ReadCloser, error) {
+ c.stats.TickOps("get_range")
c.stats.Tick(&c.stats.Ops, &c.stats.GetRangeOps)
b := c.ctr.GetBlobReference(bname)
rdr, err := b.GetRange(&storage.GetBlobRangeOptions{
}
func (c *azureContainer) CreateBlockBlobFromReader(bname string, size int, rdr io.Reader, opts *storage.PutBlobOptions) error {
+ c.stats.TickOps("create")
c.stats.Tick(&c.stats.Ops, &c.stats.CreateOps)
if size != 0 {
rdr = &readerWithAzureLen{
}
func (c *azureContainer) SetBlobMetadata(bname string, m storage.BlobMetadata, opts *storage.SetBlobMetadataOptions) error {
+ c.stats.TickOps("set_metadata")
c.stats.Tick(&c.stats.Ops, &c.stats.SetMetadataOps)
b := c.ctr.GetBlobReference(bname)
b.Metadata = m
}
func (c *azureContainer) ListBlobs(params storage.ListBlobsParameters) (storage.BlobListResponse, error) {
+ c.stats.TickOps("list")
c.stats.Tick(&c.stats.Ops, &c.stats.ListOps)
resp, err := c.ctr.ListBlobs(params)
c.stats.TickErr(err)
}
func (c *azureContainer) DeleteBlob(bname string, opts *storage.DeleteBlobOptions) error {
+ c.stats.TickOps("delete")
c.stats.Tick(&c.stats.Ops, &c.stats.DelOps)
b := c.ctr.GetBlobReference(bname)
err := b.Delete(opts)
b.stats.TickOps("get")
b.stats.Tick(&b.stats.Ops, &b.stats.GetOps)
b.stats.TickErr(err)
- return NewCountingReader(
- rdr,
- func(c uint64) {
- b.stats.CountBytesIn(c)
- b.stats.TickInBytes(c)
- }), err
+ return NewCountingReader(rdr, b.stats.TickInBytes), err
}
func (b *s3bucket) Head(path string, headers map[string][]string) (*http.Response, error) {
// empty objects.
r = nil
} else {
- r = NewCountingReader(
- r,
- func(c uint64) {
- b.stats.CountBytesOut(c)
- b.stats.TickOutBytes(c)
- })
+ r = NewCountingReader(r, b.stats.TickOutBytes)
}
err := b.Bucket.PutReader(path, r, length, contType, perm, options)
b.stats.TickOps("put")
HeadOps uint64
DelOps uint64
ListOps uint64
-
- opsCounters *prometheus.CounterVec
- errCounters *prometheus.CounterVec
- ioBytes *prometheus.CounterVec
}
func (s *s3bucketStats) TickErr(err error) {
if err, ok := err.(*s3.Error); ok {
errType = errType + fmt.Sprintf(" %d %s", err.StatusCode, err.Code)
}
- if s.errCounters != nil {
- s.errCounters.With(prometheus.Labels{"error_type": errType}).Inc()
- }
s.statsTicker.TickErr(err, errType)
}
-
-func (s *s3bucketStats) TickOps(operations ...string) {
- if s.opsCounters == nil {
- return
- }
- for _, opType := range operations {
- s.opsCounters.With(prometheus.Labels{"operation": opType}).Inc()
- }
-}
-
-func (s *s3bucketStats) CountBytesIn(b uint64) {
- s.countBytes("in", float64(b))
-}
-
-func (s *s3bucketStats) CountBytesOut(b uint64) {
- s.countBytes("out", float64(b))
-}
-
-func (s *s3bucketStats) countBytes(direction string, bytes float64) {
- if s.ioBytes == nil {
- return
- }
- s.ioBytes.With(prometheus.Labels{"direction": direction}).Add(bytes)
-}
import (
"sync"
"sync/atomic"
+
+ "github.com/prometheus/client_golang/prometheus"
)
type statsTicker struct {
ErrorCodes map[string]uint64 `json:",omitempty"`
lock sync.Mutex
+
+ opsCounters *prometheus.CounterVec
+ errCounters *prometheus.CounterVec
+ ioBytes *prometheus.CounterVec
}
// Tick increments each of the given counters by 1 using
}
s.ErrorCodes[errType]++
s.lock.Unlock()
+ if s.errCounters != nil {
+ s.errCounters.With(prometheus.Labels{"error_type": errType}).Inc()
+ }
}
// TickInBytes increments the incoming byte counter by n.
func (s *statsTicker) TickInBytes(n uint64) {
+ if s.ioBytes != nil {
+ s.ioBytes.With(prometheus.Labels{"direction": "in"}).Add(float64(n))
+ }
atomic.AddUint64(&s.InBytes, n)
}
// TickOutBytes increments the outgoing byte counter by n.
func (s *statsTicker) TickOutBytes(n uint64) {
+ if s.ioBytes != nil {
+ s.ioBytes.With(prometheus.Labels{"direction": "out"}).Add(float64(n))
+ }
atomic.AddUint64(&s.OutBytes, n)
}
+
+// TickOps increments the counter of the listed operations by 1.
+func (s *statsTicker) TickOps(operations ...string) {
+ if s.opsCounters == nil {
+ return
+ }
+ for _, opType := range operations {
+ s.opsCounters.With(prometheus.Labels{"operation": opType}).Inc()
+ }
+}
locker sync.Locker
os osWithStats
-
- // Volume metrics
- opsCounters *prometheus.CounterVec
- errCounters *prometheus.CounterVec
- ioBytes *prometheus.CounterVec
}
// DeviceID returns a globally unique ID for the volume's root
}
// Set up prometheus metrics
lbls := prometheus.Labels{"device_id": v.DeviceID()}
- v.opsCounters = opsCounters.MustCurryWith(lbls)
- v.errCounters = errCounters.MustCurryWith(lbls)
- v.ioBytes = ioBytes.MustCurryWith(lbls)
- v.os.promSetup(v.opsCounters, v.errCounters, v.ioBytes)
+ v.os.stats.opsCounters = opsCounters.MustCurryWith(lbls)
+ v.os.stats.errCounters = errCounters.MustCurryWith(lbls)
+ v.os.stats.ioBytes = ioBytes.MustCurryWith(lbls)
_, err := v.os.Stat(v.Root)
}
defer v.unlockfile(f)
ts := syscall.NsecToTimespec(time.Now().UnixNano())
- if v.os.opsCounters != nil {
- v.os.opsCounters.With(prometheus.Labels{"operation": "utimes"}).Inc()
- }
+ v.os.stats.TickOps("utimes")
v.os.stats.Tick(&v.os.stats.UtimesOps)
err = syscall.UtimesNano(p, []syscall.Timespec{ts, ts})
- v.os.tickErr(err)
v.os.stats.TickErr(err)
return err
}
return err
}
defer f.Close()
- return fn(NewCountingReader(
- ioutil.NopCloser(f),
- func(c uint64) {
- v.os.stats.TickInBytes(c)
- if v.ioBytes != nil {
- v.ioBytes.With(prometheus.Labels{"direction": "in"}).Add(float64(c))
- }
- }))
+ return fn(NewCountingReader(ioutil.NopCloser(f), v.os.stats.TickInBytes))
}
// stat is os.Stat() with some extra sanity checks.
}
defer v.unlock()
n, err := io.Copy(tmpfile, rdr)
- if v.ioBytes != nil {
- v.ioBytes.With(prometheus.Labels{"direction": "out"}).Add(float64(n))
- }
v.os.stats.TickOutBytes(uint64(n))
if err != nil {
log.Printf("%s: writing to %s: %s\n", v, bpath, err)
return err
}
defer rootdir.Close()
- if v.opsCounters != nil {
- v.opsCounters.With(prometheus.Labels{"operation": "readdir"}).Inc()
- }
+ v.os.stats.TickOps("readdir")
v.os.stats.Tick(&v.os.stats.ReaddirOps)
for {
names, err := rootdir.Readdirnames(1)
lastErr = err
continue
}
- if v.opsCounters != nil {
- v.opsCounters.With(prometheus.Labels{"operation": "readdir"}).Inc()
- }
+ v.os.stats.TickOps("readdir")
v.os.stats.Tick(&v.os.stats.ReaddirOps)
for {
fileInfo, err := blockdir.Readdir(1)
return MethodDisabledError
}
- if v.opsCounters != nil {
- v.opsCounters.With(prometheus.Labels{"operation": "readdir"}).Inc()
- }
+ v.os.stats.TickOps("readdir")
v.os.stats.Tick(&v.os.stats.ReaddirOps)
files, err := ioutil.ReadDir(v.blockDir(loc))
if err != nil {
// lockfile and unlockfile use flock(2) to manage kernel file locks.
func (v *UnixVolume) lockfile(f *os.File) error {
- if v.opsCounters != nil {
- v.opsCounters.With(prometheus.Labels{"operation": "flock"}).Inc()
- }
+ v.os.stats.TickOps("flock")
v.os.stats.Tick(&v.os.stats.FlockOps)
err := syscall.Flock(int(f.Fd()), syscall.LOCK_EX)
- v.os.tickErr(err)
v.os.stats.TickErr(err)
return err
}
func (v *UnixVolume) unlockfile(f *os.File) error {
err := syscall.Flock(int(f.Fd()), syscall.LOCK_UN)
- v.os.tickErr(err)
v.os.stats.TickErr(err)
return err
}
}
type osWithStats struct {
- stats unixStats
- opsCounters *prometheus.CounterVec
- errCounters *prometheus.CounterVec
- ioBytes *prometheus.CounterVec
-}
-
-func (o *osWithStats) tickErr(err error) {
- if err == nil || o.errCounters == nil {
- return
- }
- o.errCounters.With(prometheus.Labels{"error_type": fmt.Sprintf("%T", err)}).Inc()
-}
-
-func (o *osWithStats) promSetup(opsC, errC, ioB *prometheus.CounterVec) {
- o.opsCounters = opsC
- o.errCounters = errC
- o.ioBytes = ioB
+ stats unixStats
}
func (o *osWithStats) Open(name string) (*os.File, error) {
- if o.opsCounters != nil {
- o.opsCounters.With(prometheus.Labels{"operation": "open"}).Inc()
- }
+ o.stats.TickOps("open")
o.stats.Tick(&o.stats.OpenOps)
f, err := os.Open(name)
- o.tickErr(err)
o.stats.TickErr(err)
return f, err
}
func (o *osWithStats) OpenFile(name string, flag int, perm os.FileMode) (*os.File, error) {
- if o.opsCounters != nil {
- o.opsCounters.With(prometheus.Labels{"operation": "open"}).Inc()
- }
+ o.stats.TickOps("open")
o.stats.Tick(&o.stats.OpenOps)
f, err := os.OpenFile(name, flag, perm)
- o.tickErr(err)
o.stats.TickErr(err)
return f, err
}
func (o *osWithStats) Remove(path string) error {
- if o.opsCounters != nil {
- o.opsCounters.With(prometheus.Labels{"operation": "unlink"}).Inc()
- }
+ o.stats.TickOps("unlink")
o.stats.Tick(&o.stats.UnlinkOps)
err := os.Remove(path)
- o.tickErr(err)
o.stats.TickErr(err)
return err
}
func (o *osWithStats) Rename(a, b string) error {
- if o.opsCounters != nil {
- o.opsCounters.With(prometheus.Labels{"operation": "rename"}).Inc()
- }
+ o.stats.TickOps("rename")
o.stats.Tick(&o.stats.RenameOps)
err := os.Rename(a, b)
- o.tickErr(err)
o.stats.TickErr(err)
return err
}
func (o *osWithStats) Stat(path string) (os.FileInfo, error) {
- if o.opsCounters != nil {
- o.opsCounters.With(prometheus.Labels{"operation": "stat"}).Inc()
- }
+ o.stats.TickOps("stat")
o.stats.Tick(&o.stats.StatOps)
fi, err := os.Stat(path)
- o.tickErr(err)
o.stats.TickErr(err)
return fi, err
}
func (o *osWithStats) TempFile(dir, base string) (*os.File, error) {
- if o.opsCounters != nil {
- o.opsCounters.With(prometheus.Labels{"operation": "create"}).Inc()
- }
+ o.stats.TickOps("create")
o.stats.Tick(&o.stats.CreateOps)
f, err := ioutil.TempFile(dir, base)
- o.tickErr(err)
o.stats.TickErr(err)
return f, err
}