13937: Adds counters to S3 driver.
authorLucas Di Pentima <ldipentima@veritasgenetics.com>
Thu, 28 Feb 2019 17:36:39 +0000 (14:36 -0300)
committerLucas Di Pentima <ldipentima@veritasgenetics.com>
Thu, 28 Feb 2019 17:36:39 +0000 (14:36 -0300)
Arvados-DCO-1.1-Signed-off-by: Lucas Di Pentima <ldipentima@veritasgenetics.com>

services/keepstore/s3_volume.go
services/keepstore/unix_volume.go

index 25bfa4ef0962d9347b52ffc2f14acb923a7ff6be..5d7332ff62e653e285470b989559670443b75357 100644 (file)
@@ -249,6 +249,12 @@ func (v *S3Volume) Start(opsCounters, errCounters, ioBytes *prometheus.CounterVe
                        Name: v.Bucket,
                },
        }
+       // Set up prometheus metrics
+       lbls := prometheus.Labels{"device_id": v.DeviceID()}
+       v.bucket.stats.opsCounters = opsCounters.MustCurryWith(lbls)
+       v.bucket.stats.errCounters = errCounters.MustCurryWith(lbls)
+       v.bucket.stats.ioBytes = ioBytes.MustCurryWith(lbls)
+
        return nil
 }
 
@@ -930,6 +936,7 @@ func (lister *s3Lister) Error() error {
 }
 
 func (lister *s3Lister) getPage() {
+       lister.Stats.TickOps("list")
        lister.Stats.Tick(&lister.Stats.Ops, &lister.Stats.ListOps)
        resp, err := lister.Bucket.List(lister.Prefix, "", lister.nextMarker, lister.PageSize)
        lister.nextMarker = ""
@@ -966,13 +973,20 @@ type s3bucket struct {
 
 func (b *s3bucket) GetReader(path string) (io.ReadCloser, error) {
        rdr, err := b.Bucket.GetReader(path)
+       b.stats.TickOps("get")
        b.stats.Tick(&b.stats.Ops, &b.stats.GetOps)
        b.stats.TickErr(err)
-       return NewCountingReader(rdr, b.stats.TickInBytes), err
+       return NewCountingReader(
+               rdr,
+               func(c uint64) {
+                       b.stats.CountBytesIn(c)
+                       b.stats.TickInBytes(c)
+               }), err
 }
 
 func (b *s3bucket) Head(path string, headers map[string][]string) (*http.Response, error) {
        resp, err := b.Bucket.Head(path, headers)
+       b.stats.TickOps("head")
        b.stats.Tick(&b.stats.Ops, &b.stats.HeadOps)
        b.stats.TickErr(err)
        return resp, err
@@ -988,9 +1002,15 @@ func (b *s3bucket) PutReader(path string, r io.Reader, length int64, contType st
                // empty objects.
                r = nil
        } else {
-               r = NewCountingReader(r, b.stats.TickOutBytes)
+               r = NewCountingReader(
+                       r,
+                       func(c uint64) {
+                               b.stats.CountBytesOut(c)
+                               b.stats.TickOutBytes(c)
+                       })
        }
        err := b.Bucket.PutReader(path, r, length, contType, perm, options)
+       b.stats.TickOps("put")
        b.stats.Tick(&b.stats.Ops, &b.stats.PutOps)
        b.stats.TickErr(err)
        return err
@@ -998,6 +1018,7 @@ func (b *s3bucket) PutReader(path string, r io.Reader, length int64, contType st
 
 func (b *s3bucket) Del(path string) error {
        err := b.Bucket.Del(path)
+       b.stats.TickOps("delete")
        b.stats.Tick(&b.stats.Ops, &b.stats.DelOps)
        b.stats.TickErr(err)
        return err
@@ -1011,6 +1032,10 @@ type s3bucketStats struct {
        HeadOps uint64
        DelOps  uint64
        ListOps uint64
+
+       opsCounters *prometheus.CounterVec
+       errCounters *prometheus.CounterVec
+       ioBytes     *prometheus.CounterVec
 }
 
 func (s *s3bucketStats) TickErr(err error) {
@@ -1021,5 +1046,32 @@ func (s *s3bucketStats) TickErr(err error) {
        if err, ok := err.(*s3.Error); ok {
                errType = errType + fmt.Sprintf(" %d %s", err.StatusCode, err.Code)
        }
+       if s.errCounters != nil {
+               s.errCounters.With(prometheus.Labels{"error_type": errType}).Inc()
+       }
        s.statsTicker.TickErr(err, errType)
 }
+
+func (s *s3bucketStats) TickOps(operations ...string) {
+       if s.opsCounters == nil {
+               return
+       }
+       for _, opType := range operations {
+               s.opsCounters.With(prometheus.Labels{"operation": opType}).Inc()
+       }
+}
+
+func (s *s3bucketStats) CountBytesIn(b uint64) {
+       s.countBytes("in", float64(b))
+}
+
+func (s *s3bucketStats) CountBytesOut(b uint64) {
+       s.countBytes("out", float64(b))
+}
+
+func (s *s3bucketStats) countBytes(direction string, bytes float64) {
+       if s.ioBytes == nil {
+               return
+       }
+       s.ioBytes.With(prometheus.Labels{"direction": direction}).Add(bytes)
+}
index ee74d96c05148649ccc34d40c2c208e6afa4b58c..db8ad68a7abd369283f07c17bde6864bdb303960 100644 (file)
@@ -272,6 +272,7 @@ func (v *UnixVolume) Touch(loc string) error {
        }
        v.os.stats.Tick(&v.os.stats.UtimesOps)
        err = syscall.UtimesNano(p, []syscall.Timespec{ts, ts})
+       v.os.tickErr(err)
        v.os.stats.TickErr(err)
        return err
 }
@@ -737,12 +738,14 @@ func (v *UnixVolume) lockfile(f *os.File) error {
        }
        v.os.stats.Tick(&v.os.stats.FlockOps)
        err := syscall.Flock(int(f.Fd()), syscall.LOCK_EX)
+       v.os.tickErr(err)
        v.os.stats.TickErr(err)
        return err
 }
 
 func (v *UnixVolume) unlockfile(f *os.File) error {
        err := syscall.Flock(int(f.Fd()), syscall.LOCK_UN)
+       v.os.tickErr(err)
        v.os.stats.TickErr(err)
        return err
 }