From 4bec5aa50dc40924741221259bfcbb53056cb35c Mon Sep 17 00:00:00 2001 From: Tom Clegg Date: Thu, 23 Jun 2022 23:57:34 -0400 Subject: [PATCH] 19205: Report longest-running active and abandoned reqs in metrics. Arvados-DCO-1.1-Signed-off-by: Tom Clegg --- lib/service/cmd.go | 7 +- sdk/go/httpserver/inspect.go | 133 ++++++++++++++++++++++ sdk/go/httpserver/inspect_test.go | 89 +++++++++++++++ sdk/go/httpserver/request_limiter_test.go | 6 +- 4 files changed, 229 insertions(+), 6 deletions(-) create mode 100644 sdk/go/httpserver/inspect.go create mode 100644 sdk/go/httpserver/inspect_test.go diff --git a/lib/service/cmd.go b/lib/service/cmd.go index 679cbede13..e08af9f614 100644 --- a/lib/service/cmd.go +++ b/lib/service/cmd.go @@ -147,9 +147,10 @@ func (c *command) RunCommand(prog string, args []string, stdin io.Reader, stdout instrumented := httpserver.Instrument(reg, log, httpserver.HandlerWithDeadline(cluster.API.RequestTimeout.Duration(), httpserver.AddRequestIDs( - httpserver.LogRequests( - interceptHealthReqs(cluster.ManagementToken, handler.CheckHealth, - httpserver.NewRequestLimiter(cluster.API.MaxConcurrentRequests, handler, reg)))))) + httpserver.Inspect(reg, cluster.ManagementToken, + httpserver.LogRequests( + interceptHealthReqs(cluster.ManagementToken, handler.CheckHealth, + httpserver.NewRequestLimiter(cluster.API.MaxConcurrentRequests, handler, reg))))))) srv := &httpserver.Server{ Server: http.Server{ Handler: ifCollectionInHost(instrumented, instrumented.ServeAPI(cluster.ManagementToken, instrumented)), diff --git a/sdk/go/httpserver/inspect.go b/sdk/go/httpserver/inspect.go new file mode 100644 index 0000000000..cb08acf962 --- /dev/null +++ b/sdk/go/httpserver/inspect.go @@ -0,0 +1,133 @@ +// Copyright (C) The Arvados Authors. All rights reserved. 
+// +// SPDX-License-Identifier: Apache-2.0 + +package httpserver + +import ( + "encoding/json" + "net/http" + "sort" + "sync" + "sync/atomic" + "time" + + "github.com/prometheus/client_golang/prometheus" +) + +// Inspect serves a report of current requests at "GET +// /_inspect/requests", and passes other requests through to the next +// handler. +// +// If registry is not nil, Inspect registers metrics about current +// requests. +func Inspect(registry *prometheus.Registry, authToken string, next http.Handler) http.Handler { + type ent struct { + startTime time.Time + hangupTime atomic.Value + } + current := map[*http.Request]*ent{} + mtx := sync.Mutex{} + if registry != nil { + registry.MustRegister(prometheus.NewGaugeFunc( + prometheus.GaugeOpts{ + Namespace: "arvados", + Name: "max_active_request_age_seconds", + Help: "Age of oldest active request", + }, + func() float64 { + mtx.Lock() + defer mtx.Unlock() + earliest := time.Time{} + any := false + for _, e := range current { + if _, ok := e.hangupTime.Load().(time.Time); ok { + // Don't count abandoned requests here + continue + } + if !any || e.startTime.Before(earliest) { + any = true + earliest = e.startTime + } + } + if !any { + return 0 + } + return float64(time.Since(earliest).Seconds()) + }, + )) + registry.MustRegister(prometheus.NewGaugeFunc( + prometheus.GaugeOpts{ + Namespace: "arvados", + Name: "max_abandoned_request_age_seconds", + Help: "Maximum time since client hung up on a request whose processing thread is still running", + }, + func() float64 { + mtx.Lock() + defer mtx.Unlock() + earliest := time.Time{} + any := false + for _, e := range current { + if hangupTime, ok := e.hangupTime.Load().(time.Time); ok { + if !any || hangupTime.Before(earliest) { + any = true + earliest = hangupTime + } + } + } + if !any { + return 0 + } + return float64(time.Since(earliest).Seconds()) + }, + )) + } + return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { + if req.Method == "GET" && 
req.URL.Path == "/_inspect/requests" { + if authToken == "" || req.Header.Get("Authorization") != "Bearer "+authToken { + Error(w, "unauthorized", http.StatusUnauthorized) + return + } + mtx.Lock() + defer mtx.Unlock() + type outrec struct { + RequestID string + Method string + Host string + URL string + RemoteAddr string + Elapsed float64 + } + now := time.Now() + outrecs := []outrec{} + for req, e := range current { + outrecs = append(outrecs, outrec{ + RequestID: req.Header.Get(HeaderRequestID), + Method: req.Method, + Host: req.Host, + URL: req.URL.String(), + RemoteAddr: req.RemoteAddr, + Elapsed: now.Sub(e.startTime).Seconds(), + }) + } + sort.Slice(outrecs, func(i, j int) bool { return outrecs[i].Elapsed < outrecs[j].Elapsed }) + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(outrecs) + } else { + e := ent{startTime: time.Now()} + mtx.Lock() + current[req] = &e + mtx.Unlock() + go func() { + <-req.Context().Done() + e.hangupTime.Store(time.Now()) + }() + defer func() { + mtx.Lock() + defer mtx.Unlock() + delete(current, req) + }() + next.ServeHTTP(w, req) + } + }) +} diff --git a/sdk/go/httpserver/inspect_test.go b/sdk/go/httpserver/inspect_test.go new file mode 100644 index 0000000000..cab8a434dd --- /dev/null +++ b/sdk/go/httpserver/inspect_test.go @@ -0,0 +1,89 @@ +// Copyright (C) The Arvados Authors. All rights reserved. 
// TestInspect exercises the Inspect middleware end to end: token
// checks on the /_inspect/requests report, the contents of the
// report while a request is in flight, and the two gauges registered
// on the Prometheus registry before cancellation, after the request
// context is cancelled, and after the handler returns.
func (s *Suite) TestInspect(c *check.C) {
	reg := prometheus.NewRegistry()
	h := newTestHandler()
	mh := Inspect(reg, "abcd", h)
	handlerReturned := make(chan struct{})
	reqctx, reqcancel := context.WithCancel(context.Background())
	longreq := httptest.NewRequest("GET", "/test", nil).WithContext(reqctx)
	// Start a request that stays inside the test handler until
	// okToProceed is signalled further down.
	go func() {
		mh.ServeHTTP(httptest.NewRecorder(), longreq)
		close(handlerReturned)
	}()
	// Presumably the test handler signals on inHandler once the
	// request has reached it -- confirm against testHandler.
	<-h.inHandler

	// No Authorization header: rejected.
	resp := httptest.NewRecorder()
	req := httptest.NewRequest("GET", "/_inspect/requests", nil)
	mh.ServeHTTP(resp, req)
	c.Check(resp.Code, check.Equals, http.StatusUnauthorized)
	c.Check(resp.Body.String(), check.Equals, `{"errors":["unauthorized"]}`+"\n")

	// Wrong token (correct token plus an extra character): rejected.
	resp = httptest.NewRecorder()
	req.Header.Set("Authorization", "Bearer abcde")
	mh.ServeHTTP(resp, req)
	c.Check(resp.Code, check.Equals, http.StatusUnauthorized)

	// Correct token: the report lists exactly the in-flight request.
	resp = httptest.NewRecorder()
	req.Header.Set("Authorization", "Bearer abcd")
	mh.ServeHTTP(resp, req)
	c.Check(resp.Code, check.Equals, http.StatusOK)
	reqs := []map[string]interface{}{}
	err := json.NewDecoder(resp.Body).Decode(&reqs)
	c.Check(err, check.IsNil)
	c.Check(reqs, check.HasLen, 1)
	c.Check(reqs[0]["URL"], check.Equals, "/test")

	// Request is active, so we should see active request age > 0
	resp = httptest.NewRecorder()
	mreq := httptest.NewRequest("GET", "/metrics", nil)
	promhttp.HandlerFor(reg, promhttp.HandlerOpts{}).ServeHTTP(resp, mreq)
	c.Check(resp.Code, check.Equals, http.StatusOK)
	c.Check(resp.Body.String(), check.Matches, `(?ms).*\narvados_max_active_request_age_seconds [0\.]*[1-9][-\d\.e]*\n.*`)
	c.Check(resp.Body.String(), check.Matches, `(?ms).*\narvados_max_abandoned_request_age_seconds 0\n.*`)

	reqcancel()

	// Request context is canceled but handler hasn't returned, so
	// we should see max abandoned request age > 0
	//
	// NOTE(review): Inspect records the hangup time from a
	// separate goroutine, so these checks appear to rely on that
	// goroutine having run before the metrics are gathered --
	// confirm this cannot flake.
	resp = httptest.NewRecorder()
	promhttp.HandlerFor(reg, promhttp.HandlerOpts{}).ServeHTTP(resp, mreq)
	c.Check(resp.Code, check.Equals, http.StatusOK)
	c.Check(resp.Body.String(), check.Matches, `(?ms).*\narvados_max_active_request_age_seconds 0\n.*`)
	c.Check(resp.Body.String(), check.Matches, `(?ms).*\narvados_max_abandoned_request_age_seconds [0\.]*[1-9][-\d\.e]*\n.*`)

	// Let the test handler return, and wait until the middleware
	// has returned too.
	h.okToProceed <- struct{}{}
	<-handlerReturned

	// Handler has returned, so we should see max abandoned
	// request age == max active request age == 0
	resp = httptest.NewRecorder()
	promhttp.HandlerFor(reg, promhttp.HandlerOpts{}).ServeHTTP(resp, mreq)
	c.Check(resp.Code, check.Equals, http.StatusOK)
	c.Check(resp.Body.String(), check.Matches, `(?ms).*\narvados_max_active_request_age_seconds 0\n.*`)
	c.Check(resp.Body.String(), check.Matches, `(?ms).*\narvados_max_abandoned_request_age_seconds 0\n.*`)

	// ...and no active requests at the /_inspect/requests endpoint
	resp = httptest.NewRecorder()
	mh.ServeHTTP(resp, req)
	c.Check(resp.Code, check.Equals, http.StatusOK)
	reqs = nil
	err = json.NewDecoder(resp.Body).Decode(&reqs)
	c.Check(err, check.IsNil)
	c.Assert(reqs, check.HasLen, 0)
}
newTestHandler() l := NewRequestLimiter(1, h, nil) var wg sync.WaitGroup resps := make([]*httptest.ResponseRecorder, 10) @@ -90,7 +90,7 @@ func TestRequestLimiter1(t *testing.T) { } func TestRequestLimiter10(t *testing.T) { - h := newTestHandler(10) + h := newTestHandler() l := NewRequestLimiter(10, h, nil) var wg sync.WaitGroup for i := 0; i < 10; i++ { -- 2.30.2