13937: Adds counters to azure driver.
authorLucas Di Pentima <ldipentima@veritasgenetics.com>
Thu, 28 Feb 2019 18:58:35 +0000 (15:58 -0300)
committerLucas Di Pentima <ldipentima@veritasgenetics.com>
Thu, 28 Feb 2019 18:58:35 +0000 (15:58 -0300)
Also, generalizes counters usage and updates unix & s3 volume drivers.

Arvados-DCO-1.1-Signed-off-by: Lucas Di Pentima <ldipentima@veritasgenetics.com>

services/keepstore/azure_blob_volume.go
services/keepstore/s3_volume.go
services/keepstore/stats_ticker.go
services/keepstore/unix_volume.go

index ce7063aa79cdafbd46f52fc5231c739a7e277881..9b957815c84bfe7d3c41425646cf147fa38c4098 100644 (file)
@@ -184,6 +184,12 @@ func (v *AzureBlobVolume) Start(opsCounters, errCounters, ioBytes *prometheus.Co
        } else if !ok {
                return fmt.Errorf("Azure container %q does not exist", v.ContainerName)
        }
+       // Set up prometheus metrics
+       lbls := prometheus.Labels{"device_id": v.DeviceID()}
+       v.container.stats.opsCounters = opsCounters.MustCurryWith(lbls)
+       v.container.stats.errCounters = errCounters.MustCurryWith(lbls)
+       v.container.stats.ioBytes = ioBytes.MustCurryWith(lbls)
+
        return nil
 }
 
@@ -725,6 +731,7 @@ type azureContainer struct {
 }
 
 func (c *azureContainer) Exists() (bool, error) {
+       c.stats.TickOps("exists")
        c.stats.Tick(&c.stats.Ops)
        ok, err := c.ctr.Exists()
        c.stats.TickErr(err)
@@ -732,6 +739,7 @@ func (c *azureContainer) Exists() (bool, error) {
 }
 
 func (c *azureContainer) GetBlobMetadata(bname string) (storage.BlobMetadata, error) {
+       c.stats.TickOps("get_metadata")
        c.stats.Tick(&c.stats.Ops, &c.stats.GetMetadataOps)
        b := c.ctr.GetBlobReference(bname)
        err := b.GetMetadata(nil)
@@ -740,6 +748,7 @@ func (c *azureContainer) GetBlobMetadata(bname string) (storage.BlobMetadata, er
 }
 
 func (c *azureContainer) GetBlobProperties(bname string) (*storage.BlobProperties, error) {
+       c.stats.TickOps("get_properties")
        c.stats.Tick(&c.stats.Ops, &c.stats.GetPropertiesOps)
        b := c.ctr.GetBlobReference(bname)
        err := b.GetProperties(nil)
@@ -748,6 +757,7 @@ func (c *azureContainer) GetBlobProperties(bname string) (*storage.BlobPropertie
 }
 
 func (c *azureContainer) GetBlob(bname string) (io.ReadCloser, error) {
+       c.stats.TickOps("get")
        c.stats.Tick(&c.stats.Ops, &c.stats.GetOps)
        b := c.ctr.GetBlobReference(bname)
        rdr, err := b.Get(nil)
@@ -756,6 +766,7 @@ func (c *azureContainer) GetBlob(bname string) (io.ReadCloser, error) {
 }
 
 func (c *azureContainer) GetBlobRange(bname string, start, end int, opts *storage.GetBlobOptions) (io.ReadCloser, error) {
+       c.stats.TickOps("get_range")
        c.stats.Tick(&c.stats.Ops, &c.stats.GetRangeOps)
        b := c.ctr.GetBlobReference(bname)
        rdr, err := b.GetRange(&storage.GetBlobRangeOptions{
@@ -783,6 +794,7 @@ func (r *readerWithAzureLen) Len() int {
 }
 
 func (c *azureContainer) CreateBlockBlobFromReader(bname string, size int, rdr io.Reader, opts *storage.PutBlobOptions) error {
+       c.stats.TickOps("create")
        c.stats.Tick(&c.stats.Ops, &c.stats.CreateOps)
        if size != 0 {
                rdr = &readerWithAzureLen{
@@ -797,6 +809,7 @@ func (c *azureContainer) CreateBlockBlobFromReader(bname string, size int, rdr i
 }
 
 func (c *azureContainer) SetBlobMetadata(bname string, m storage.BlobMetadata, opts *storage.SetBlobMetadataOptions) error {
+       c.stats.TickOps("set_metadata")
        c.stats.Tick(&c.stats.Ops, &c.stats.SetMetadataOps)
        b := c.ctr.GetBlobReference(bname)
        b.Metadata = m
@@ -806,6 +819,7 @@ func (c *azureContainer) SetBlobMetadata(bname string, m storage.BlobMetadata, o
 }
 
 func (c *azureContainer) ListBlobs(params storage.ListBlobsParameters) (storage.BlobListResponse, error) {
+       c.stats.TickOps("list")
        c.stats.Tick(&c.stats.Ops, &c.stats.ListOps)
        resp, err := c.ctr.ListBlobs(params)
        c.stats.TickErr(err)
@@ -813,6 +827,7 @@ func (c *azureContainer) ListBlobs(params storage.ListBlobsParameters) (storage.
 }
 
 func (c *azureContainer) DeleteBlob(bname string, opts *storage.DeleteBlobOptions) error {
+       c.stats.TickOps("delete")
        c.stats.Tick(&c.stats.Ops, &c.stats.DelOps)
        b := c.ctr.GetBlobReference(bname)
        err := b.Delete(opts)
index 5d7332ff62e653e285470b989559670443b75357..0bf0d9a10e3241e375f6abc070e27260fb4d698e 100644 (file)
@@ -976,12 +976,7 @@ func (b *s3bucket) GetReader(path string) (io.ReadCloser, error) {
        b.stats.TickOps("get")
        b.stats.Tick(&b.stats.Ops, &b.stats.GetOps)
        b.stats.TickErr(err)
-       return NewCountingReader(
-               rdr,
-               func(c uint64) {
-                       b.stats.CountBytesIn(c)
-                       b.stats.TickInBytes(c)
-               }), err
+       return NewCountingReader(rdr, b.stats.TickInBytes), err
 }
 
 func (b *s3bucket) Head(path string, headers map[string][]string) (*http.Response, error) {
@@ -1002,12 +997,7 @@ func (b *s3bucket) PutReader(path string, r io.Reader, length int64, contType st
                // empty objects.
                r = nil
        } else {
-               r = NewCountingReader(
-                       r,
-                       func(c uint64) {
-                               b.stats.CountBytesOut(c)
-                               b.stats.TickOutBytes(c)
-                       })
+               r = NewCountingReader(r, b.stats.TickOutBytes)
        }
        err := b.Bucket.PutReader(path, r, length, contType, perm, options)
        b.stats.TickOps("put")
@@ -1032,10 +1022,6 @@ type s3bucketStats struct {
        HeadOps uint64
        DelOps  uint64
        ListOps uint64
-
-       opsCounters *prometheus.CounterVec
-       errCounters *prometheus.CounterVec
-       ioBytes     *prometheus.CounterVec
 }
 
 func (s *s3bucketStats) TickErr(err error) {
@@ -1046,32 +1032,5 @@ func (s *s3bucketStats) TickErr(err error) {
        if err, ok := err.(*s3.Error); ok {
                errType = errType + fmt.Sprintf(" %d %s", err.StatusCode, err.Code)
        }
-       if s.errCounters != nil {
-               s.errCounters.With(prometheus.Labels{"error_type": errType}).Inc()
-       }
        s.statsTicker.TickErr(err, errType)
 }
-
-func (s *s3bucketStats) TickOps(operations ...string) {
-       if s.opsCounters == nil {
-               return
-       }
-       for _, opType := range operations {
-               s.opsCounters.With(prometheus.Labels{"operation": opType}).Inc()
-       }
-}
-
-func (s *s3bucketStats) CountBytesIn(b uint64) {
-       s.countBytes("in", float64(b))
-}
-
-func (s *s3bucketStats) CountBytesOut(b uint64) {
-       s.countBytes("out", float64(b))
-}
-
-func (s *s3bucketStats) countBytes(direction string, bytes float64) {
-       if s.ioBytes == nil {
-               return
-       }
-       s.ioBytes.With(prometheus.Labels{"direction": direction}).Add(bytes)
-}
index 377a53675783b890fa7863dd98ea50681074697b..342b9e32058e23a1f09fc305d8fdc37caf104198 100644 (file)
@@ -7,6 +7,8 @@ package main
 import (
        "sync"
        "sync/atomic"
+
+       "github.com/prometheus/client_golang/prometheus"
 )
 
 type statsTicker struct {
@@ -16,6 +18,10 @@ type statsTicker struct {
 
        ErrorCodes map[string]uint64 `json:",omitempty"`
        lock       sync.Mutex
+
+       opsCounters *prometheus.CounterVec
+       errCounters *prometheus.CounterVec
+       ioBytes     *prometheus.CounterVec
 }
 
 // Tick increments each of the given counters by 1 using
@@ -41,14 +47,33 @@ func (s *statsTicker) TickErr(err error, errType string) {
        }
        s.ErrorCodes[errType]++
        s.lock.Unlock()
+       if s.errCounters != nil {
+               s.errCounters.With(prometheus.Labels{"error_type": errType}).Inc()
+       }
 }
 
 // TickInBytes increments the incoming byte counter by n.
 func (s *statsTicker) TickInBytes(n uint64) {
+       if s.ioBytes != nil {
+               s.ioBytes.With(prometheus.Labels{"direction": "in"}).Add(float64(n))
+       }
        atomic.AddUint64(&s.InBytes, n)
 }
 
 // TickOutBytes increments the outgoing byte counter by n.
 func (s *statsTicker) TickOutBytes(n uint64) {
+       if s.ioBytes != nil {
+               s.ioBytes.With(prometheus.Labels{"direction": "out"}).Add(float64(n))
+       }
        atomic.AddUint64(&s.OutBytes, n)
 }
+
+// TickOps increments the counter of the listed operations by 1.
+func (s *statsTicker) TickOps(operations ...string) {
+       if s.opsCounters == nil {
+               return
+       }
+       for _, opType := range operations {
+               s.opsCounters.With(prometheus.Labels{"operation": opType}).Inc()
+       }
+}
index db8ad68a7abd369283f07c17bde6864bdb303960..8bdbf936293beb41058e7c23897296598c52cf6b 100644 (file)
@@ -120,11 +120,6 @@ type UnixVolume struct {
        locker sync.Locker
 
        os osWithStats
-
-       // Volume metrics
-       opsCounters *prometheus.CounterVec
-       errCounters *prometheus.CounterVec
-       ioBytes     *prometheus.CounterVec
 }
 
 // DeviceID returns a globally unique ID for the volume's root
@@ -237,10 +232,9 @@ func (v *UnixVolume) Start(opsCounters, errCounters, ioBytes *prometheus.Counter
        }
        // Set up prometheus metrics
        lbls := prometheus.Labels{"device_id": v.DeviceID()}
-       v.opsCounters = opsCounters.MustCurryWith(lbls)
-       v.errCounters = errCounters.MustCurryWith(lbls)
-       v.ioBytes = ioBytes.MustCurryWith(lbls)
-       v.os.promSetup(v.opsCounters, v.errCounters, v.ioBytes)
+       v.os.stats.opsCounters = opsCounters.MustCurryWith(lbls)
+       v.os.stats.errCounters = errCounters.MustCurryWith(lbls)
+       v.os.stats.ioBytes = ioBytes.MustCurryWith(lbls)
 
        _, err := v.os.Stat(v.Root)
 
@@ -267,12 +261,9 @@ func (v *UnixVolume) Touch(loc string) error {
        }
        defer v.unlockfile(f)
        ts := syscall.NsecToTimespec(time.Now().UnixNano())
-       if v.os.opsCounters != nil {
-               v.os.opsCounters.With(prometheus.Labels{"operation": "utimes"}).Inc()
-       }
+       v.os.stats.TickOps("utimes")
        v.os.stats.Tick(&v.os.stats.UtimesOps)
        err = syscall.UtimesNano(p, []syscall.Timespec{ts, ts})
-       v.os.tickErr(err)
        v.os.stats.TickErr(err)
        return err
 }
@@ -299,14 +290,7 @@ func (v *UnixVolume) getFunc(ctx context.Context, path string, fn func(io.Reader
                return err
        }
        defer f.Close()
-       return fn(NewCountingReader(
-               ioutil.NopCloser(f),
-               func(c uint64) {
-                       v.os.stats.TickInBytes(c)
-                       if v.ioBytes != nil {
-                               v.ioBytes.With(prometheus.Labels{"direction": "in"}).Add(float64(c))
-                       }
-               }))
+       return fn(NewCountingReader(ioutil.NopCloser(f), v.os.stats.TickInBytes))
 }
 
 // stat is os.Stat() with some extra sanity checks.
@@ -393,9 +377,6 @@ func (v *UnixVolume) WriteBlock(ctx context.Context, loc string, rdr io.Reader)
        }
        defer v.unlock()
        n, err := io.Copy(tmpfile, rdr)
-       if v.ioBytes != nil {
-               v.ioBytes.With(prometheus.Labels{"direction": "out"}).Add(float64(n))
-       }
        v.os.stats.TickOutBytes(uint64(n))
        if err != nil {
                log.Printf("%s: writing to %s: %s\n", v, bpath, err)
@@ -468,9 +449,7 @@ func (v *UnixVolume) IndexTo(prefix string, w io.Writer) error {
                return err
        }
        defer rootdir.Close()
-       if v.opsCounters != nil {
-               v.opsCounters.With(prometheus.Labels{"operation": "readdir"}).Inc()
-       }
+       v.os.stats.TickOps("readdir")
        v.os.stats.Tick(&v.os.stats.ReaddirOps)
        for {
                names, err := rootdir.Readdirnames(1)
@@ -493,9 +472,7 @@ func (v *UnixVolume) IndexTo(prefix string, w io.Writer) error {
                        lastErr = err
                        continue
                }
-               if v.opsCounters != nil {
-                       v.opsCounters.With(prometheus.Labels{"operation": "readdir"}).Inc()
-               }
+               v.os.stats.TickOps("readdir")
                v.os.stats.Tick(&v.os.stats.ReaddirOps)
                for {
                        fileInfo, err := blockdir.Readdir(1)
@@ -584,9 +561,7 @@ func (v *UnixVolume) Untrash(loc string) (err error) {
                return MethodDisabledError
        }
 
-       if v.opsCounters != nil {
-               v.opsCounters.With(prometheus.Labels{"operation": "readdir"}).Inc()
-       }
+       v.os.stats.TickOps("readdir")
        v.os.stats.Tick(&v.os.stats.ReaddirOps)
        files, err := ioutil.ReadDir(v.blockDir(loc))
        if err != nil {
@@ -733,19 +708,15 @@ func (v *UnixVolume) unlock() {
 
 // lockfile and unlockfile use flock(2) to manage kernel file locks.
 func (v *UnixVolume) lockfile(f *os.File) error {
-       if v.opsCounters != nil {
-               v.opsCounters.With(prometheus.Labels{"operation": "flock"}).Inc()
-       }
+       v.os.stats.TickOps("flock")
        v.os.stats.Tick(&v.os.stats.FlockOps)
        err := syscall.Flock(int(f.Fd()), syscall.LOCK_EX)
-       v.os.tickErr(err)
        v.os.stats.TickErr(err)
        return err
 }
 
 func (v *UnixVolume) unlockfile(f *os.File) error {
        err := syscall.Flock(int(f.Fd()), syscall.LOCK_UN)
-       v.os.tickErr(err)
        v.os.stats.TickErr(err)
        return err
 }
@@ -852,87 +823,53 @@ func (s *unixStats) TickErr(err error) {
 }
 
 type osWithStats struct {
-       stats       unixStats
-       opsCounters *prometheus.CounterVec
-       errCounters *prometheus.CounterVec
-       ioBytes     *prometheus.CounterVec
-}
-
-func (o *osWithStats) tickErr(err error) {
-       if err == nil || o.errCounters == nil {
-               return
-       }
-       o.errCounters.With(prometheus.Labels{"error_type": fmt.Sprintf("%T", err)}).Inc()
-}
-
-func (o *osWithStats) promSetup(opsC, errC, ioB *prometheus.CounterVec) {
-       o.opsCounters = opsC
-       o.errCounters = errC
-       o.ioBytes = ioB
+       stats unixStats
 }
 
 func (o *osWithStats) Open(name string) (*os.File, error) {
-       if o.opsCounters != nil {
-               o.opsCounters.With(prometheus.Labels{"operation": "open"}).Inc()
-       }
+       o.stats.TickOps("open")
        o.stats.Tick(&o.stats.OpenOps)
        f, err := os.Open(name)
-       o.tickErr(err)
        o.stats.TickErr(err)
        return f, err
 }
 
 func (o *osWithStats) OpenFile(name string, flag int, perm os.FileMode) (*os.File, error) {
-       if o.opsCounters != nil {
-               o.opsCounters.With(prometheus.Labels{"operation": "open"}).Inc()
-       }
+       o.stats.TickOps("open")
        o.stats.Tick(&o.stats.OpenOps)
        f, err := os.OpenFile(name, flag, perm)
-       o.tickErr(err)
        o.stats.TickErr(err)
        return f, err
 }
 
 func (o *osWithStats) Remove(path string) error {
-       if o.opsCounters != nil {
-               o.opsCounters.With(prometheus.Labels{"operation": "unlink"}).Inc()
-       }
+       o.stats.TickOps("unlink")
        o.stats.Tick(&o.stats.UnlinkOps)
        err := os.Remove(path)
-       o.tickErr(err)
        o.stats.TickErr(err)
        return err
 }
 
 func (o *osWithStats) Rename(a, b string) error {
-       if o.opsCounters != nil {
-               o.opsCounters.With(prometheus.Labels{"operation": "rename"}).Inc()
-       }
+       o.stats.TickOps("rename")
        o.stats.Tick(&o.stats.RenameOps)
        err := os.Rename(a, b)
-       o.tickErr(err)
        o.stats.TickErr(err)
        return err
 }
 
 func (o *osWithStats) Stat(path string) (os.FileInfo, error) {
-       if o.opsCounters != nil {
-               o.opsCounters.With(prometheus.Labels{"operation": "stat"}).Inc()
-       }
+       o.stats.TickOps("stat")
        o.stats.Tick(&o.stats.StatOps)
        fi, err := os.Stat(path)
-       o.tickErr(err)
        o.stats.TickErr(err)
        return fi, err
 }
 
 func (o *osWithStats) TempFile(dir, base string) (*os.File, error) {
-       if o.opsCounters != nil {
-               o.opsCounters.With(prometheus.Labels{"operation": "create"}).Inc()
-       }
+       o.stats.TickOps("create")
        o.stats.Tick(&o.stats.CreateOps)
        f, err := ioutil.TempFile(dir, base)
-       o.tickErr(err)
        o.stats.TickErr(err)
        return f, err
 }