19205: Report longest-running active and abandoned reqs in metrics.
authorTom Clegg <tom@curii.com>
Fri, 24 Jun 2022 03:57:34 +0000 (23:57 -0400)
committerTom Clegg <tom@curii.com>
Fri, 24 Jun 2022 16:03:49 +0000 (12:03 -0400)
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom@curii.com>

lib/service/cmd.go
sdk/go/httpserver/inspect.go [new file with mode: 0644]
sdk/go/httpserver/inspect_test.go [new file with mode: 0644]
sdk/go/httpserver/request_limiter_test.go

index 679cbede13bc8cf141a34ec40373a458c5195451..e08af9f6142301febbd8951da14f0bb3b3ff13d2 100644 (file)
@@ -147,9 +147,10 @@ func (c *command) RunCommand(prog string, args []string, stdin io.Reader, stdout
        instrumented := httpserver.Instrument(reg, log,
                httpserver.HandlerWithDeadline(cluster.API.RequestTimeout.Duration(),
                        httpserver.AddRequestIDs(
-                               httpserver.LogRequests(
-                                       interceptHealthReqs(cluster.ManagementToken, handler.CheckHealth,
-                                               httpserver.NewRequestLimiter(cluster.API.MaxConcurrentRequests, handler, reg))))))
+                               httpserver.Inspect(reg, cluster.ManagementToken,
+                                       httpserver.LogRequests(
+                                               interceptHealthReqs(cluster.ManagementToken, handler.CheckHealth,
+                                                       httpserver.NewRequestLimiter(cluster.API.MaxConcurrentRequests, handler, reg)))))))
        srv := &httpserver.Server{
                Server: http.Server{
                        Handler:     ifCollectionInHost(instrumented, instrumented.ServeAPI(cluster.ManagementToken, instrumented)),
diff --git a/sdk/go/httpserver/inspect.go b/sdk/go/httpserver/inspect.go
new file mode 100644 (file)
index 0000000..cb08acf
--- /dev/null
@@ -0,0 +1,133 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: Apache-2.0
+
+package httpserver
+
+import (
+       "encoding/json"
+       "net/http"
+       "sort"
+       "sync"
+       "sync/atomic"
+       "time"
+
+       "github.com/prometheus/client_golang/prometheus"
+)
+
+// Inspect serves a report of current requests at "GET
+// /_inspect/requests", and passes other requests through to the next
+// handler.
+//
+// If registry is not nil, Inspect registers metrics about current
+// requests.
+func Inspect(registry *prometheus.Registry, authToken string, next http.Handler) http.Handler {
+       type ent struct {
+               startTime  time.Time
+               hangupTime atomic.Value
+       }
+       current := map[*http.Request]*ent{}
+       mtx := sync.Mutex{}
+       if registry != nil {
+               registry.MustRegister(prometheus.NewGaugeFunc(
+                       prometheus.GaugeOpts{
+                               Namespace: "arvados",
+                               Name:      "max_active_request_age_seconds",
+                               Help:      "Age of oldest active request",
+                       },
+                       func() float64 {
+                               mtx.Lock()
+                               defer mtx.Unlock()
+                               earliest := time.Time{}
+                               any := false
+                               for _, e := range current {
+                                       if _, ok := e.hangupTime.Load().(time.Time); ok {
+                                               // Don't count abandoned requests here
+                                               continue
+                                       }
+                                       if !any || e.startTime.Before(earliest) {
+                                               any = true
+                                               earliest = e.startTime
+                                       }
+                               }
+                               if !any {
+                                       return 0
+                               }
+                               return float64(time.Since(earliest).Seconds())
+                       },
+               ))
+               registry.MustRegister(prometheus.NewGaugeFunc(
+                       prometheus.GaugeOpts{
+                               Namespace: "arvados",
+                               Name:      "max_abandoned_request_age_seconds",
+                               Help:      "Maximum time since client hung up on a request whose processing thread is still running",
+                       },
+                       func() float64 {
+                               mtx.Lock()
+                               defer mtx.Unlock()
+                               earliest := time.Time{}
+                               any := false
+                               for _, e := range current {
+                                       if hangupTime, ok := e.hangupTime.Load().(time.Time); ok {
+                                               if !any || hangupTime.Before(earliest) {
+                                                       any = true
+                                                       earliest = hangupTime
+                                               }
+                                       }
+                               }
+                               if !any {
+                                       return 0
+                               }
+                               return float64(time.Since(earliest).Seconds())
+                       },
+               ))
+       }
+       return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
+               if req.Method == "GET" && req.URL.Path == "/_inspect/requests" {
+                       if authToken == "" || req.Header.Get("Authorization") != "Bearer "+authToken {
+                               Error(w, "unauthorized", http.StatusUnauthorized)
+                               return
+                       }
+                       mtx.Lock()
+                       defer mtx.Unlock()
+                       type outrec struct {
+                               RequestID  string
+                               Method     string
+                               Host       string
+                               URL        string
+                               RemoteAddr string
+                               Elapsed    float64
+                       }
+                       now := time.Now()
+                       outrecs := []outrec{}
+                       for req, e := range current {
+                               outrecs = append(outrecs, outrec{
+                                       RequestID:  req.Header.Get(HeaderRequestID),
+                                       Method:     req.Method,
+                                       Host:       req.Host,
+                                       URL:        req.URL.String(),
+                                       RemoteAddr: req.RemoteAddr,
+                                       Elapsed:    now.Sub(e.startTime).Seconds(),
+                               })
+                       }
+                       sort.Slice(outrecs, func(i, j int) bool { return outrecs[i].Elapsed < outrecs[j].Elapsed })
+                       w.Header().Set("Content-Type", "application/json")
+                       json.NewEncoder(w).Encode(outrecs)
+               } else {
+                       e := ent{startTime: time.Now()}
+                       mtx.Lock()
+                       current[req] = &e
+                       mtx.Unlock()
+                       go func() {
+                               <-req.Context().Done()
+                               e.hangupTime.Store(time.Now())
+                       }()
+                       defer func() {
+                               mtx.Lock()
+                               defer mtx.Unlock()
+                               delete(current, req)
+                       }()
+                       next.ServeHTTP(w, req)
+               }
+       })
+}
diff --git a/sdk/go/httpserver/inspect_test.go b/sdk/go/httpserver/inspect_test.go
new file mode 100644 (file)
index 0000000..cab8a43
--- /dev/null
@@ -0,0 +1,89 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: Apache-2.0
+
+package httpserver
+
+import (
+       "context"
+       "encoding/json"
+       "net/http"
+       "net/http/httptest"
+
+       "github.com/prometheus/client_golang/prometheus"
+       "github.com/prometheus/client_golang/prometheus/promhttp"
+       check "gopkg.in/check.v1"
+)
+
+func (s *Suite) TestInspect(c *check.C) {
+       reg := prometheus.NewRegistry()
+       h := newTestHandler()
+       mh := Inspect(reg, "abcd", h)
+       handlerReturned := make(chan struct{})
+       reqctx, reqcancel := context.WithCancel(context.Background())
+       longreq := httptest.NewRequest("GET", "/test", nil).WithContext(reqctx)
+       go func() {
+               mh.ServeHTTP(httptest.NewRecorder(), longreq)
+               close(handlerReturned)
+       }()
+       <-h.inHandler
+
+       resp := httptest.NewRecorder()
+       req := httptest.NewRequest("GET", "/_inspect/requests", nil)
+       mh.ServeHTTP(resp, req)
+       c.Check(resp.Code, check.Equals, http.StatusUnauthorized)
+       c.Check(resp.Body.String(), check.Equals, `{"errors":["unauthorized"]}`+"\n")
+
+       resp = httptest.NewRecorder()
+       req.Header.Set("Authorization", "Bearer abcde")
+       mh.ServeHTTP(resp, req)
+       c.Check(resp.Code, check.Equals, http.StatusUnauthorized)
+
+       resp = httptest.NewRecorder()
+       req.Header.Set("Authorization", "Bearer abcd")
+       mh.ServeHTTP(resp, req)
+       c.Check(resp.Code, check.Equals, http.StatusOK)
+       reqs := []map[string]interface{}{}
+       err := json.NewDecoder(resp.Body).Decode(&reqs)
+       c.Check(err, check.IsNil)
+       c.Check(reqs, check.HasLen, 1)
+       c.Check(reqs[0]["URL"], check.Equals, "/test")
+
+       // Request is active, so we should see active request age > 0
+       resp = httptest.NewRecorder()
+       mreq := httptest.NewRequest("GET", "/metrics", nil)
+       promhttp.HandlerFor(reg, promhttp.HandlerOpts{}).ServeHTTP(resp, mreq)
+       c.Check(resp.Code, check.Equals, http.StatusOK)
+       c.Check(resp.Body.String(), check.Matches, `(?ms).*\narvados_max_active_request_age_seconds [0\.]*[1-9][-\d\.e]*\n.*`)
+       c.Check(resp.Body.String(), check.Matches, `(?ms).*\narvados_max_abandoned_request_age_seconds 0\n.*`)
+
+       reqcancel()
+
+       // Request context is canceled but handler hasn't returned, so
+       // we should see max abandoned request age > 0
+       resp = httptest.NewRecorder()
+       promhttp.HandlerFor(reg, promhttp.HandlerOpts{}).ServeHTTP(resp, mreq)
+       c.Check(resp.Code, check.Equals, http.StatusOK)
+       c.Check(resp.Body.String(), check.Matches, `(?ms).*\narvados_max_active_request_age_seconds 0\n.*`)
+       c.Check(resp.Body.String(), check.Matches, `(?ms).*\narvados_max_abandoned_request_age_seconds [0\.]*[1-9][-\d\.e]*\n.*`)
+
+       h.okToProceed <- struct{}{}
+       <-handlerReturned
+
+       // Handler has returned, so we should see max abandoned
+       // request age == max active request age == 0
+       resp = httptest.NewRecorder()
+       promhttp.HandlerFor(reg, promhttp.HandlerOpts{}).ServeHTTP(resp, mreq)
+       c.Check(resp.Code, check.Equals, http.StatusOK)
+       c.Check(resp.Body.String(), check.Matches, `(?ms).*\narvados_max_active_request_age_seconds 0\n.*`)
+       c.Check(resp.Body.String(), check.Matches, `(?ms).*\narvados_max_abandoned_request_age_seconds 0\n.*`)
+
+       // ...and no active requests at the /_monitor endpoint
+       resp = httptest.NewRecorder()
+       mh.ServeHTTP(resp, req)
+       c.Check(resp.Code, check.Equals, http.StatusOK)
+       reqs = nil
+       err = json.NewDecoder(resp.Body).Decode(&reqs)
+       c.Check(err, check.IsNil)
+       c.Assert(reqs, check.HasLen, 0)
+}
index 64d1f3d4cfb3fc47930ad1d655ae97366af6efb0..9258fbfa58f4b5a4867651fc15aba4e9b9616dcf 100644 (file)
@@ -22,7 +22,7 @@ func (h *testHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
        <-h.okToProceed
 }
 
-func newTestHandler(maxReqs int) *testHandler {
+func newTestHandler() *testHandler {
        return &testHandler{
                inHandler:   make(chan struct{}),
                okToProceed: make(chan struct{}),
@@ -30,7 +30,7 @@ func newTestHandler(maxReqs int) *testHandler {
 }
 
 func TestRequestLimiter1(t *testing.T) {
-       h := newTestHandler(10)
+       h := newTestHandler()
        l := NewRequestLimiter(1, h, nil)
        var wg sync.WaitGroup
        resps := make([]*httptest.ResponseRecorder, 10)
@@ -90,7 +90,7 @@ func TestRequestLimiter1(t *testing.T) {
 }
 
 func TestRequestLimiter10(t *testing.T) {
-       h := newTestHandler(10)
+       h := newTestHandler()
        l := NewRequestLimiter(10, h, nil)
        var wg sync.WaitGroup
        for i := 0; i < 10; i++ {