--- /dev/null
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: Apache-2.0
+
+package httpserver
+
+import (
+ "encoding/json"
+ "net/http"
+ "sort"
+ "sync"
+ "sync/atomic"
+ "time"
+
+ "github.com/prometheus/client_golang/prometheus"
+)
+
+// Inspect serves a report of current requests at "GET
+// /_inspect/requests", and passes other requests through to the next
+// handler.
+//
+// If registry is not nil, Inspect registers metrics about current
+// requests.
+func Inspect(registry *prometheus.Registry, authToken string, next http.Handler) http.Handler {
+ type ent struct {
+ startTime time.Time
+ hangupTime atomic.Value
+ }
+ current := map[*http.Request]*ent{}
+ mtx := sync.Mutex{}
+ if registry != nil {
+ registry.MustRegister(prometheus.NewGaugeFunc(
+ prometheus.GaugeOpts{
+ Namespace: "arvados",
+ Name: "max_active_request_age_seconds",
+ Help: "Age of oldest active request",
+ },
+ func() float64 {
+ mtx.Lock()
+ defer mtx.Unlock()
+ earliest := time.Time{}
+ any := false
+ for _, e := range current {
+ if _, ok := e.hangupTime.Load().(time.Time); ok {
+ // Don't count abandoned requests here
+ continue
+ }
+ if !any || e.startTime.Before(earliest) {
+ any = true
+ earliest = e.startTime
+ }
+ }
+ if !any {
+ return 0
+ }
+ return float64(time.Since(earliest).Seconds())
+ },
+ ))
+ registry.MustRegister(prometheus.NewGaugeFunc(
+ prometheus.GaugeOpts{
+ Namespace: "arvados",
+ Name: "max_abandoned_request_age_seconds",
+ Help: "Maximum time since client hung up on a request whose processing thread is still running",
+ },
+ func() float64 {
+ mtx.Lock()
+ defer mtx.Unlock()
+ earliest := time.Time{}
+ any := false
+ for _, e := range current {
+ if hangupTime, ok := e.hangupTime.Load().(time.Time); ok {
+ if !any || hangupTime.Before(earliest) {
+ any = true
+ earliest = hangupTime
+ }
+ }
+ }
+ if !any {
+ return 0
+ }
+ return float64(time.Since(earliest).Seconds())
+ },
+ ))
+ }
+ return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
+ if req.Method == "GET" && req.URL.Path == "/_inspect/requests" {
+ if authToken == "" || req.Header.Get("Authorization") != "Bearer "+authToken {
+ Error(w, "unauthorized", http.StatusUnauthorized)
+ return
+ }
+ mtx.Lock()
+ defer mtx.Unlock()
+ type outrec struct {
+ RequestID string
+ Method string
+ Host string
+ URL string
+ RemoteAddr string
+ Elapsed float64
+ }
+ now := time.Now()
+ outrecs := []outrec{}
+ for req, e := range current {
+ outrecs = append(outrecs, outrec{
+ RequestID: req.Header.Get(HeaderRequestID),
+ Method: req.Method,
+ Host: req.Host,
+ URL: req.URL.String(),
+ RemoteAddr: req.RemoteAddr,
+ Elapsed: now.Sub(e.startTime).Seconds(),
+ })
+ }
+ sort.Slice(outrecs, func(i, j int) bool { return outrecs[i].Elapsed < outrecs[j].Elapsed })
+ w.Header().Set("Content-Type", "application/json")
+ json.NewEncoder(w).Encode(outrecs)
+ } else {
+ e := ent{startTime: time.Now()}
+ mtx.Lock()
+ current[req] = &e
+ mtx.Unlock()
+ go func() {
+ <-req.Context().Done()
+ e.hangupTime.Store(time.Now())
+ }()
+ defer func() {
+ mtx.Lock()
+ defer mtx.Unlock()
+ delete(current, req)
+ }()
+ next.ServeHTTP(w, req)
+ }
+ })
+}