From 92d1b6e05e042a0781070e7287b1ceb3e094e852 Mon Sep 17 00:00:00 2001 From: Lucas Di Pentima Date: Thu, 28 Feb 2019 15:58:35 -0300 Subject: [PATCH] 13937: Adds counters to azure driver. Also, generalizes counters usage and updates unix & s3 volume drivers. Arvados-DCO-1.1-Signed-off-by: Lucas Di Pentima --- services/keepstore/azure_blob_volume.go | 15 ++++ services/keepstore/s3_volume.go | 45 +----------- services/keepstore/stats_ticker.go | 25 +++++++ services/keepstore/unix_volume.go | 95 +++++-------------------- 4 files changed, 58 insertions(+), 122 deletions(-) diff --git a/services/keepstore/azure_blob_volume.go b/services/keepstore/azure_blob_volume.go index ce7063aa79..9b957815c8 100644 --- a/services/keepstore/azure_blob_volume.go +++ b/services/keepstore/azure_blob_volume.go @@ -184,6 +184,12 @@ func (v *AzureBlobVolume) Start(opsCounters, errCounters, ioBytes *prometheus.Co } else if !ok { return fmt.Errorf("Azure container %q does not exist", v.ContainerName) } + // Set up prometheus metrics + lbls := prometheus.Labels{"device_id": v.DeviceID()} + v.container.stats.opsCounters = opsCounters.MustCurryWith(lbls) + v.container.stats.errCounters = errCounters.MustCurryWith(lbls) + v.container.stats.ioBytes = ioBytes.MustCurryWith(lbls) + return nil } @@ -725,6 +731,7 @@ type azureContainer struct { } func (c *azureContainer) Exists() (bool, error) { + c.stats.TickOps("exists") c.stats.Tick(&c.stats.Ops) ok, err := c.ctr.Exists() c.stats.TickErr(err) @@ -732,6 +739,7 @@ func (c *azureContainer) Exists() (bool, error) { } func (c *azureContainer) GetBlobMetadata(bname string) (storage.BlobMetadata, error) { + c.stats.TickOps("get_metadata") c.stats.Tick(&c.stats.Ops, &c.stats.GetMetadataOps) b := c.ctr.GetBlobReference(bname) err := b.GetMetadata(nil) @@ -740,6 +748,7 @@ func (c *azureContainer) GetBlobMetadata(bname string) (storage.BlobMetadata, er } func (c *azureContainer) GetBlobProperties(bname string) (*storage.BlobProperties, error) { + c.stats.TickOps("get_properties") c.stats.Tick(&c.stats.Ops, &c.stats.GetPropertiesOps) b := c.ctr.GetBlobReference(bname) err := b.GetProperties(nil) @@ -748,6 +757,7 @@ func (c *azureContainer) GetBlobProperties(bname string) (*storage.BlobPropertie } func (c *azureContainer) GetBlob(bname string) (io.ReadCloser, error) { + c.stats.TickOps("get") c.stats.Tick(&c.stats.Ops, &c.stats.GetOps) b := c.ctr.GetBlobReference(bname) rdr, err := b.Get(nil) @@ -756,6 +766,7 @@ func (c *azureContainer) GetBlob(bname string) (io.ReadCloser, error) { } func (c *azureContainer) GetBlobRange(bname string, start, end int, opts *storage.GetBlobOptions) (io.ReadCloser, error) { + c.stats.TickOps("get_range") c.stats.Tick(&c.stats.Ops, &c.stats.GetRangeOps) b := c.ctr.GetBlobReference(bname) rdr, err := b.GetRange(&storage.GetBlobRangeOptions{ @@ -783,6 +794,7 @@ func (r *readerWithAzureLen) Len() int { } func (c *azureContainer) CreateBlockBlobFromReader(bname string, size int, rdr io.Reader, opts *storage.PutBlobOptions) error { + c.stats.TickOps("create") c.stats.Tick(&c.stats.Ops, &c.stats.CreateOps) if size != 0 { rdr = &readerWithAzureLen{ @@ -797,6 +809,7 @@ func (c *azureContainer) CreateBlockBlobFromReader(bname string, size int, rdr i } func (c *azureContainer) SetBlobMetadata(bname string, m storage.BlobMetadata, opts *storage.SetBlobMetadataOptions) error { + c.stats.TickOps("set_metadata") c.stats.Tick(&c.stats.Ops, &c.stats.SetMetadataOps) b := c.ctr.GetBlobReference(bname) b.Metadata = m @@ -806,6 +819,7 @@ func (c *azureContainer) SetBlobMetadata(bname string, m storage.BlobMetadata, o } func (c *azureContainer) ListBlobs(params storage.ListBlobsParameters) (storage.BlobListResponse, error) { + c.stats.TickOps("list") c.stats.Tick(&c.stats.Ops, &c.stats.ListOps) resp, err := c.ctr.ListBlobs(params) c.stats.TickErr(err) @@ -813,6 +827,7 @@ func (c *azureContainer) ListBlobs(params storage.ListBlobsParameters) (storage. } func (c *azureContainer) DeleteBlob(bname string, opts *storage.DeleteBlobOptions) error { + c.stats.TickOps("delete") c.stats.Tick(&c.stats.Ops, &c.stats.DelOps) b := c.ctr.GetBlobReference(bname) err := b.Delete(opts) diff --git a/services/keepstore/s3_volume.go b/services/keepstore/s3_volume.go index 5d7332ff62..0bf0d9a10e 100644 --- a/services/keepstore/s3_volume.go +++ b/services/keepstore/s3_volume.go @@ -976,12 +976,7 @@ func (b *s3bucket) GetReader(path string) (io.ReadCloser, error) { b.stats.TickOps("get") b.stats.Tick(&b.stats.Ops, &b.stats.GetOps) b.stats.TickErr(err) - return NewCountingReader( - rdr, - func(c uint64) { - b.stats.CountBytesIn(c) - b.stats.TickInBytes(c) - }), err + return NewCountingReader(rdr, b.stats.TickInBytes), err } func (b *s3bucket) Head(path string, headers map[string][]string) (*http.Response, error) { @@ -1002,12 +997,7 @@ func (b *s3bucket) PutReader(path string, r io.Reader, length int64, contType st // empty objects. r = nil } else { - r = NewCountingReader( - r, - func(c uint64) { - b.stats.CountBytesOut(c) - b.stats.TickOutBytes(c) - }) + r = NewCountingReader(r, b.stats.TickOutBytes) } err := b.Bucket.PutReader(path, r, length, contType, perm, options) b.stats.TickOps("put") @@ -1032,10 +1022,6 @@ type s3bucketStats struct { HeadOps uint64 DelOps uint64 ListOps uint64 - - opsCounters *prometheus.CounterVec - errCounters *prometheus.CounterVec - ioBytes *prometheus.CounterVec } func (s *s3bucketStats) TickErr(err error) { @@ -1046,32 +1032,5 @@ func (s *s3bucketStats) TickErr(err error) { if err, ok := err.(*s3.Error); ok { errType = errType + fmt.Sprintf(" %d %s", err.StatusCode, err.Code) } - if s.errCounters != nil { - s.errCounters.With(prometheus.Labels{"error_type": errType}).Inc() - } s.statsTicker.TickErr(err, errType) } - -func (s *s3bucketStats) TickOps(operations ...string) { - if s.opsCounters == nil { - return - } - for _, opType := range operations { - s.opsCounters.With(prometheus.Labels{"operation": opType}).Inc() - } -} - -func (s *s3bucketStats) CountBytesIn(b uint64) { - s.countBytes("in", float64(b)) -} - -func (s *s3bucketStats) CountBytesOut(b uint64) { - s.countBytes("out", float64(b)) -} - -func (s *s3bucketStats) countBytes(direction string, bytes float64) { - if s.ioBytes == nil { - return - } - s.ioBytes.With(prometheus.Labels{"direction": direction}).Add(bytes) -} diff --git a/services/keepstore/stats_ticker.go b/services/keepstore/stats_ticker.go index 377a536757..342b9e3205 100644 --- a/services/keepstore/stats_ticker.go +++ b/services/keepstore/stats_ticker.go @@ -7,6 +7,8 @@ package main import ( "sync" "sync/atomic" + + "github.com/prometheus/client_golang/prometheus" ) type statsTicker struct { @@ -16,6 +18,10 @@ type statsTicker struct { ErrorCodes map[string]uint64 `json:",omitempty"` lock sync.Mutex + + opsCounters *prometheus.CounterVec + errCounters *prometheus.CounterVec + ioBytes *prometheus.CounterVec } // Tick increments each of the given counters by 1 using @@ -41,14 +47,33 @@ func (s *statsTicker) TickErr(err error, errType string) { } s.ErrorCodes[errType]++ s.lock.Unlock() + if s.errCounters != nil { + s.errCounters.With(prometheus.Labels{"error_type": errType}).Inc() + } } // TickInBytes increments the incoming byte counter by n. func (s *statsTicker) TickInBytes(n uint64) { + if s.ioBytes != nil { + s.ioBytes.With(prometheus.Labels{"direction": "in"}).Add(float64(n)) + } atomic.AddUint64(&s.InBytes, n) } // TickOutBytes increments the outgoing byte counter by n. func (s *statsTicker) TickOutBytes(n uint64) { + if s.ioBytes != nil { + s.ioBytes.With(prometheus.Labels{"direction": "out"}).Add(float64(n)) + } atomic.AddUint64(&s.OutBytes, n) } + +// TickOps increments the counter of the listed operations by 1. +func (s *statsTicker) TickOps(operations ...string) { + if s.opsCounters == nil { + return + } + for _, opType := range operations { + s.opsCounters.With(prometheus.Labels{"operation": opType}).Inc() + } +} diff --git a/services/keepstore/unix_volume.go b/services/keepstore/unix_volume.go index db8ad68a7a..8bdbf93629 100644 --- a/services/keepstore/unix_volume.go +++ b/services/keepstore/unix_volume.go @@ -120,11 +120,6 @@ type UnixVolume struct { locker sync.Locker os osWithStats - - // Volume metrics - opsCounters *prometheus.CounterVec - errCounters *prometheus.CounterVec - ioBytes *prometheus.CounterVec } // DeviceID returns a globally unique ID for the volume's root @@ -237,10 +232,9 @@ func (v *UnixVolume) Start(opsCounters, errCounters, ioBytes *prometheus.Counter } // Set up prometheus metrics lbls := prometheus.Labels{"device_id": v.DeviceID()} - v.opsCounters = opsCounters.MustCurryWith(lbls) - v.errCounters = errCounters.MustCurryWith(lbls) - v.ioBytes = ioBytes.MustCurryWith(lbls) - v.os.promSetup(v.opsCounters, v.errCounters, v.ioBytes) + v.os.stats.opsCounters = opsCounters.MustCurryWith(lbls) + v.os.stats.errCounters = errCounters.MustCurryWith(lbls) + v.os.stats.ioBytes = ioBytes.MustCurryWith(lbls) _, err := v.os.Stat(v.Root) @@ -267,12 +261,9 @@ func (v *UnixVolume) Touch(loc string) error { } defer v.unlockfile(f) ts := syscall.NsecToTimespec(time.Now().UnixNano()) - if v.os.opsCounters != nil { - v.os.opsCounters.With(prometheus.Labels{"operation": "utimes"}).Inc() - } + v.os.stats.TickOps("utimes") v.os.stats.Tick(&v.os.stats.UtimesOps) err = syscall.UtimesNano(p, []syscall.Timespec{ts, ts}) - v.os.tickErr(err) v.os.stats.TickErr(err) return err } @@ -299,14 +290,7 @@ func (v *UnixVolume) getFunc(ctx context.Context, path string, fn func(io.Reader return err } defer f.Close() - return fn(NewCountingReader( - ioutil.NopCloser(f), - func(c uint64) { - v.os.stats.TickInBytes(c) - if v.ioBytes != nil { - v.ioBytes.With(prometheus.Labels{"direction": "in"}).Add(float64(c)) - } - })) + return fn(NewCountingReader(ioutil.NopCloser(f), v.os.stats.TickInBytes)) } // stat is os.Stat() with some extra sanity checks. @@ -393,9 +377,6 @@ func (v *UnixVolume) WriteBlock(ctx context.Context, loc string, rdr io.Reader) } defer v.unlock() n, err := io.Copy(tmpfile, rdr) - if v.ioBytes != nil { - v.ioBytes.With(prometheus.Labels{"direction": "out"}).Add(float64(n)) - } v.os.stats.TickOutBytes(uint64(n)) if err != nil { log.Printf("%s: writing to %s: %s\n", v, bpath, err) @@ -468,9 +449,7 @@ func (v *UnixVolume) IndexTo(prefix string, w io.Writer) error { return err } defer rootdir.Close() - if v.opsCounters != nil { - v.opsCounters.With(prometheus.Labels{"operation": "readdir"}).Inc() - } + v.os.stats.TickOps("readdir") v.os.stats.Tick(&v.os.stats.ReaddirOps) for { names, err := rootdir.Readdirnames(1) @@ -493,9 +472,7 @@ func (v *UnixVolume) IndexTo(prefix string, w io.Writer) error { lastErr = err continue } - if v.opsCounters != nil { - v.opsCounters.With(prometheus.Labels{"operation": "readdir"}).Inc() - } + v.os.stats.TickOps("readdir") v.os.stats.Tick(&v.os.stats.ReaddirOps) for { fileInfo, err := blockdir.Readdir(1) @@ -584,9 +561,7 @@ func (v *UnixVolume) Untrash(loc string) (err error) { return MethodDisabledError } - if v.opsCounters != nil { - v.opsCounters.With(prometheus.Labels{"operation": "readdir"}).Inc() - } + v.os.stats.TickOps("readdir") v.os.stats.Tick(&v.os.stats.ReaddirOps) files, err := ioutil.ReadDir(v.blockDir(loc)) if err != nil { @@ -733,19 +708,15 @@ func (v *UnixVolume) unlock() { // lockfile and unlockfile use flock(2) to manage kernel file locks. func (v *UnixVolume) lockfile(f *os.File) error { - if v.opsCounters != nil { - v.opsCounters.With(prometheus.Labels{"operation": "flock"}).Inc() - } + v.os.stats.TickOps("flock") v.os.stats.Tick(&v.os.stats.FlockOps) err := syscall.Flock(int(f.Fd()), syscall.LOCK_EX) - v.os.tickErr(err) v.os.stats.TickErr(err) return err } func (v *UnixVolume) unlockfile(f *os.File) error { err := syscall.Flock(int(f.Fd()), syscall.LOCK_UN) - v.os.tickErr(err) v.os.stats.TickErr(err) return err } @@ -852,87 +823,53 @@ func (s *unixStats) TickErr(err error) { } type osWithStats struct { - stats unixStats - opsCounters *prometheus.CounterVec - errCounters *prometheus.CounterVec - ioBytes *prometheus.CounterVec -} - -func (o *osWithStats) tickErr(err error) { - if err == nil || o.errCounters == nil { - return - } - o.errCounters.With(prometheus.Labels{"error_type": fmt.Sprintf("%T", err)}).Inc() -} - -func (o *osWithStats) promSetup(opsC, errC, ioB *prometheus.CounterVec) { - o.opsCounters = opsC - o.errCounters = errC - o.ioBytes = ioB + stats unixStats } func (o *osWithStats) Open(name string) (*os.File, error) { - if o.opsCounters != nil { - o.opsCounters.With(prometheus.Labels{"operation": "open"}).Inc() - } + o.stats.TickOps("open") o.stats.Tick(&o.stats.OpenOps) f, err := os.Open(name) - o.tickErr(err) o.stats.TickErr(err) return f, err } func (o *osWithStats) OpenFile(name string, flag int, perm os.FileMode) (*os.File, error) { - if o.opsCounters != nil { - o.opsCounters.With(prometheus.Labels{"operation": "open"}).Inc() - } + o.stats.TickOps("open") o.stats.Tick(&o.stats.OpenOps) f, err := os.OpenFile(name, flag, perm) - o.tickErr(err) o.stats.TickErr(err) return f, err } func (o *osWithStats) Remove(path string) error { - if o.opsCounters != nil { - o.opsCounters.With(prometheus.Labels{"operation": "unlink"}).Inc() - } + o.stats.TickOps("unlink") o.stats.Tick(&o.stats.UnlinkOps) err := os.Remove(path) - o.tickErr(err) o.stats.TickErr(err) return err } func (o *osWithStats) Rename(a, b string) error { - if o.opsCounters != nil { - o.opsCounters.With(prometheus.Labels{"operation": "rename"}).Inc() - } + o.stats.TickOps("rename") o.stats.Tick(&o.stats.RenameOps) err := os.Rename(a, b) - o.tickErr(err) o.stats.TickErr(err) return err } func (o *osWithStats) Stat(path string) (os.FileInfo, error) { - if o.opsCounters != nil { - o.opsCounters.With(prometheus.Labels{"operation": "stat"}).Inc() - } + o.stats.TickOps("stat") o.stats.Tick(&o.stats.StatOps) fi, err := os.Stat(path) - o.tickErr(err) o.stats.TickErr(err) return fi, err } func (o *osWithStats) TempFile(dir, base string) (*os.File, error) { - if o.opsCounters != nil { - o.opsCounters.With(prometheus.Labels{"operation": "create"}).Inc() - } + o.stats.TickOps("create") o.stats.Tick(&o.stats.CreateOps) f, err := ioutil.TempFile(dir, base) - o.tickErr(err) o.stats.TickErr(err) return f, err } -- 2.30.2