15317: Add transfer size bucketed latency/speed metrics.
authorTom Clegg <tom@curii.com>
Thu, 18 Jan 2024 15:51:41 +0000 (10:51 -0500)
committerTom Clegg <tom@curii.com>
Thu, 18 Jan 2024 15:51:41 +0000 (10:51 -0500)
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom@curii.com>

services/keep-web/handler.go
services/keep-web/handler_test.go
services/keep-web/main.go
services/keep-web/metrics.go [new file with mode: 0644]
services/keep-web/server_test.go

index 12c2839f8ca78fab56e8111efc2d3111e4bd02b0..e0da14e774525d9b860e6c92c62a010653e25d06 100644 (file)
@@ -34,6 +34,7 @@ import (
 type handler struct {
        Cache   cache
        Cluster *arvados.Cluster
+       metrics *metrics
 
        lockMtx    sync.Mutex
        lock       map[string]*sync.RWMutex
@@ -592,7 +593,7 @@ func (h *handler) ServeHTTP(wOrig http.ResponseWriter, r *http.Request) {
        if webdavPrefix == "" {
                webdavPrefix = "/" + strings.Join(pathParts[:stripParts], "/")
        }
-       wh := webdav.Handler{
+       wh := &webdav.Handler{
                Prefix: webdavPrefix,
                FileSystem: &webdavfs.FS{
                        FileSystem:    sessionFS,
@@ -607,7 +608,7 @@ func (h *handler) ServeHTTP(wOrig http.ResponseWriter, r *http.Request) {
                        }
                },
        }
-       wh.ServeHTTP(w, r)
+       h.metrics.track(wh, w, r)
        if r.Method == http.MethodGet && w.WroteStatus() == http.StatusOK {
                wrote := int64(w.WroteBodyBytes())
                fnm := strings.Join(pathParts[stripParts:], "/")
index eefab36e69bac17ce7ca975836e444a7de9a141b..07c7016d3a8e485e8b1267d73fd0b547b14662bc 100644 (file)
@@ -60,6 +60,7 @@ func (s *UnitSuite) SetUpTest(c *check.C) {
                        logger:   logger,
                        registry: prometheus.NewRegistry(),
                },
+               metrics: newMetrics(prometheus.NewRegistry()),
        }
 }
 
index cd379dc6bd667df887b410edce26abd6eff209e7..690e75a2514b15bb0c644f8d085c2a57b068f1fd 100644 (file)
@@ -41,5 +41,6 @@ func newHandler(ctx context.Context, cluster *arvados.Cluster, token string, reg
                        logger:   logger,
                        registry: reg,
                },
+               metrics: newMetrics(reg),
        }, nil
 }
diff --git a/services/keep-web/metrics.go b/services/keep-web/metrics.go
new file mode 100644 (file)
index 0000000..fe27ba5
--- /dev/null
@@ -0,0 +1,154 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package keepweb
+
+import (
+       "io"
+       "math"
+       "net/http"
+       "time"
+
+       "github.com/prometheus/client_golang/prometheus"
+)
+
+type metrics struct {
+       mDownloadSpeed        *prometheus.HistogramVec
+       mDownloadBackendSpeed *prometheus.HistogramVec
+       mUploadSpeed          *prometheus.HistogramVec
+       mUploadSyncDelay      *prometheus.HistogramVec
+}
+
+func newMetrics(reg *prometheus.Registry) *metrics {
+       m := &metrics{
+               mDownloadSpeed: prometheus.NewHistogramVec(prometheus.HistogramOpts{
+                       Namespace: "arvados",
+                       Subsystem: "keepweb",
+                       Name:      "download_speed",
+                       Help:      "Download speed (bytes per second) bucketed by transfer size range",
+                       Buckets:   []float64{10_000, 1_000_000, 10_000_000, 100_000_000, 1_000_000_000, math.Inf(+1)},
+               }, []string{"size_range"}),
+               mDownloadBackendSpeed: prometheus.NewHistogramVec(prometheus.HistogramOpts{
+                       Namespace: "arvados",
+                       Subsystem: "keepweb",
+                       Name:      "download_limiting_backend_speed",
+                       Help:      "Limiting backend speed (bytes per second) when serving file downloads, bucketed by transfer size range",
+                       Buckets:   []float64{10_000, 1_000_000, 10_000_000, 100_000_000, 1_000_000_000, math.Inf(+1)},
+               }, []string{"size_range"}),
+               mUploadSpeed: prometheus.NewHistogramVec(prometheus.HistogramOpts{
+                       Namespace: "arvados",
+                       Subsystem: "keepweb",
+                       Name:      "upload_speed",
+                       Help:      "Upload speed (bytes per second) bucketed by transfer size range",
+                       Buckets:   []float64{10_000, 1_000_000, 10_000_000, 100_000_000, 1_000_000_000, math.Inf(+1)},
+               }, []string{"size_range"}),
+               mUploadSyncDelay: prometheus.NewHistogramVec(prometheus.HistogramOpts{
+                       Namespace: "arvados",
+                       Subsystem: "keepweb",
+                       Name:      "upload_sync_delay_seconds",
+                       Help:      "Upload sync delay (time from last byte received to HTTP response)",
+               }, []string{"size_range"}),
+       }
+       reg.MustRegister(m.mDownloadSpeed)
+       reg.MustRegister(m.mDownloadBackendSpeed)
+       reg.MustRegister(m.mUploadSpeed)
+       reg.MustRegister(m.mUploadSyncDelay)
+       return m
+}
+
+// track runs handler(w, r) and records upload/download metrics as applicable.
+func (m *metrics) track(handler http.Handler, w http.ResponseWriter, r *http.Request) {
+       switch r.Method {
+       case http.MethodGet:
+               dt := newDownloadTracker(w)
+               handler.ServeHTTP(dt, r)
+               size := dt.bytesOut
+               if size == 0 {
+                       return
+               }
+               bucket := sizeRange(size)
+               m.mDownloadSpeed.WithLabelValues(bucket).Observe(float64(dt.bytesOut) / time.Since(dt.t0).Seconds())
+               m.mDownloadBackendSpeed.WithLabelValues(bucket).Observe(float64(size) / (dt.backendWait + time.Since(dt.lastByte)).Seconds())
+       case http.MethodPut:
+               ut := newUploadTracker(r)
+               handler.ServeHTTP(w, r)
+               d := ut.lastByte.Sub(ut.t0)
+               if d <= 0 {
+                       // Read() was not called, or did not return
+                       // any data
+                       return
+               }
+               size := ut.bytesIn
+               bucket := sizeRange(size)
+               m.mUploadSpeed.WithLabelValues(bucket).Observe(float64(ut.bytesIn) / d.Seconds())
+               m.mUploadSyncDelay.WithLabelValues(bucket).Observe(time.Since(ut.lastByte).Seconds())
+       default:
+               handler.ServeHTTP(w, r)
+       }
+}
+
+// sizeRange returns a bucket label based on the number of bytes
+// transferred (not the same as file size in the case of a Range
+// request or interrupted transfer).
+func sizeRange(size int64) string {
+       switch {
+       case size <= 1_000_000:
+               return "0"
+       case size <= 10_000_000:
+               return "1M"
+       case size <= 100_000_000:
+               return "10M"
+       default:
+               return "100M"
+       }
+}
+
+type downloadTracker struct {
+       http.ResponseWriter
+       t0 time.Time
+
+       firstByte   time.Time     // time of first call to Write
+       lastByte    time.Time     // time of most recent call to Write
+       bytesOut    int64         // bytes sent to client so far
+       backendWait time.Duration // total of intervals between Write calls
+}
+
+func newDownloadTracker(w http.ResponseWriter) *downloadTracker {
+       return &downloadTracker{ResponseWriter: w, t0: time.Now()}
+}
+func (dt *downloadTracker) Write(p []byte) (int, error) {
+       if dt.lastByte.IsZero() {
+               dt.backendWait += time.Since(dt.t0)
+       } else {
+               dt.backendWait += time.Since(dt.lastByte)
+       }
+       if dt.firstByte.IsZero() {
+               dt.firstByte = time.Now()
+       }
+       n, err := dt.ResponseWriter.Write(p)
+       dt.bytesOut += int64(n)
+       dt.lastByte = time.Now()
+       return n, err
+}
+
+type uploadTracker struct {
+       io.ReadCloser
+       t0       time.Time
+       lastByte time.Time
+       bytesIn  int64
+}
+
+func newUploadTracker(r *http.Request) *uploadTracker {
+       now := time.Now()
+       ut := &uploadTracker{ReadCloser: r.Body, t0: now}
+       r.Body = ut
+       return ut
+}
+
+func (ut *uploadTracker) Read(p []byte) (int, error) {
+       n, err := ut.ReadCloser.Read(p)
+       ut.lastByte = time.Now()
+       ut.bytesIn += int64(n)
+       return n, err
+}
index dd29c40082cb7910da661801c610ba3c753a4271..a418e84d5a0d4bb40aef22badd8746ced4bb1e6c 100644 (file)
@@ -412,6 +412,24 @@ func (s *IntegrationSuite) TestMetrics(c *check.C) {
                resp.Body.Close()
        }
 
+       var coll arvados.Collection
+       arv, err := arvadosclient.MakeArvadosClient()
+       c.Assert(err, check.IsNil)
+       arv.ApiToken = arvadostest.ActiveTokenV2
+       err = arv.Create("collections", map[string]interface{}{"ensure_unique_name": true}, &coll)
+       c.Assert(err, check.IsNil)
+       defer arv.Delete("collections", coll.UUID, nil, nil)
+       for i := 0; i < 2; i++ {
+               size := 1 << (i * 12)
+               req, _ = http.NewRequest("PUT", srvaddr+"/zero-"+fmt.Sprintf("%d", size), bytes.NewReader(make([]byte, size)))
+               req.Host = coll.UUID + ".example.com"
+               req.Header.Set("Authorization", "Bearer "+arvadostest.ActiveToken)
+               resp, err = http.DefaultClient.Do(req)
+               c.Assert(err, check.IsNil)
+               c.Check(resp.StatusCode, check.Equals, http.StatusCreated)
+               resp.Body.Close()
+       }
+
        time.Sleep(metricsUpdateInterval * 2)
 
        req, _ = http.NewRequest("GET", srvaddr+"/metrics.json", nil)
@@ -476,7 +494,7 @@ func (s *IntegrationSuite) TestMetrics(c *check.C) {
        c.Check(summaries["request_duration_seconds/get/200"].SampleCount, check.Equals, "3")
        c.Check(summaries["request_duration_seconds/get/404"].SampleCount, check.Equals, "1")
        c.Check(summaries["time_to_status_seconds/get/404"].SampleCount, check.Equals, "1")
-       c.Check(gauges["arvados_keepweb_sessions_cached_session_bytes//"].Value, check.Equals, float64(624))
+       c.Check(gauges["arvados_keepweb_sessions_cached_session_bytes//"].Value, check.Equals, float64(1208))
 
        // If the Host header indicates a collection, /metrics.json
        // refers to a file in the collection -- the metrics handler
@@ -490,6 +508,24 @@ func (s *IntegrationSuite) TestMetrics(c *check.C) {
                c.Assert(err, check.IsNil)
                c.Check(resp.StatusCode, check.Equals, http.StatusNotFound)
        }
+
+       // Fetch the full metrics output and check the new speed/delay histograms
+       req, _ = http.NewRequest("GET", srvaddr+"/metrics", nil)
+       req.Host = cluster.Services.WebDAVDownload.ExternalURL.Host
+       req.Header.Set("Authorization", "Bearer "+arvadostest.ManagementToken)
+       resp, err = http.DefaultClient.Do(req)
+       c.Assert(err, check.IsNil)
+       c.Check(resp.StatusCode, check.Equals, http.StatusOK)
+       buf, err := ioutil.ReadAll(resp.Body)
+       c.Check(err, check.IsNil)
+
+       c.Check(string(buf), check.Matches, `(?ms).*\narvados_keepweb_download_limiting_backend_speed_bucket{size_range="0",le="1e\+06"} 4\n.*`)
+       c.Check(string(buf), check.Matches, `(?ms).*\narvados_keepweb_download_speed_bucket{size_range="0",le="\+Inf"} 4\n.*`)
+       c.Check(string(buf), check.Matches, `(?ms).*\narvados_keepweb_upload_speed_bucket{size_range="0",le="\+Inf"} 2\n.*`)
+       c.Check(string(buf), check.Matches, `(?ms).*\narvados_keepweb_upload_sync_delay_seconds_bucket{size_range="0",le="10"} 2\n.*`)
+
+       // Dump entire metrics output in test logs
+       c.Logf("%s", buf)
 }
 
 func (s *IntegrationSuite) SetUpSuite(c *check.C) {