X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/ba86d71949972435ac3ac0ad54cb34a237970fda..469521bb7ea65a68f04a7595d3b6b10264026db6:/sdk/go/httpserver/inspect.go diff --git a/sdk/go/httpserver/inspect.go b/sdk/go/httpserver/inspect.go new file mode 100644 index 0000000000..cb08acf962 --- /dev/null +++ b/sdk/go/httpserver/inspect.go @@ -0,0 +1,133 @@ +// Copyright (C) The Arvados Authors. All rights reserved. +// +// SPDX-License-Identifier: Apache-2.0 + +package httpserver + +import ( + "encoding/json" + "net/http" + "sort" + "sync" + "sync/atomic" + "time" + + "github.com/prometheus/client_golang/prometheus" +) + +// Inspect serves a report of current requests at "GET +// /_inspect/requests", and passes other requests through to the next +// handler. +// +// If registry is not nil, Inspect registers metrics about current +// requests. +func Inspect(registry *prometheus.Registry, authToken string, next http.Handler) http.Handler { + type ent struct { + startTime time.Time + hangupTime atomic.Value + } + current := map[*http.Request]*ent{} + mtx := sync.Mutex{} + if registry != nil { + registry.MustRegister(prometheus.NewGaugeFunc( + prometheus.GaugeOpts{ + Namespace: "arvados", + Name: "max_active_request_age_seconds", + Help: "Age of oldest active request", + }, + func() float64 { + mtx.Lock() + defer mtx.Unlock() + earliest := time.Time{} + any := false + for _, e := range current { + if _, ok := e.hangupTime.Load().(time.Time); ok { + // Don't count abandoned requests here + continue + } + if !any || e.startTime.Before(earliest) { + any = true + earliest = e.startTime + } + } + if !any { + return 0 + } + return float64(time.Since(earliest).Seconds()) + }, + )) + registry.MustRegister(prometheus.NewGaugeFunc( + prometheus.GaugeOpts{ + Namespace: "arvados", + Name: "max_abandoned_request_age_seconds", + Help: "Maximum time since client hung up on a request whose processing thread is still running", + }, + func() float64 { + mtx.Lock() + defer mtx.Unlock() + earliest := time.Time{} + any := false + for _, e := range current { + if hangupTime, ok := e.hangupTime.Load().(time.Time); ok { + if !any || hangupTime.Before(earliest) { + any = true + earliest = hangupTime + } + } + } + if !any { + return 0 + } + return float64(time.Since(earliest).Seconds()) + }, + )) + } + return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { + if req.Method == "GET" && req.URL.Path == "/_inspect/requests" { + if authToken == "" || req.Header.Get("Authorization") != "Bearer "+authToken { + Error(w, "unauthorized", http.StatusUnauthorized) + return + } + mtx.Lock() + defer mtx.Unlock() + type outrec struct { + RequestID string + Method string + Host string + URL string + RemoteAddr string + Elapsed float64 + } + now := time.Now() + outrecs := []outrec{} + for req, e := range current { + outrecs = append(outrecs, outrec{ + RequestID: req.Header.Get(HeaderRequestID), + Method: req.Method, + Host: req.Host, + URL: req.URL.String(), + RemoteAddr: req.RemoteAddr, + Elapsed: now.Sub(e.startTime).Seconds(), + }) + } + sort.Slice(outrecs, func(i, j int) bool { return outrecs[i].Elapsed < outrecs[j].Elapsed }) + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(outrecs) + } else { + e := ent{startTime: time.Now()} + mtx.Lock() + current[req] = &e + mtx.Unlock() + go func() { + <-req.Context().Done() + e.hangupTime.Store(time.Now()) + }() + defer func() { + mtx.Lock() + defer mtx.Unlock() + delete(current, req) + }() + next.ServeHTTP(w, req) + } + }) +}