Merge branch '20601-big-ctr-queue'
[arvados.git] / sdk / go / httpserver / inspect.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: Apache-2.0
4
5 package httpserver
6
7 import (
8         "encoding/json"
9         "net/http"
10         "sort"
11         "sync"
12         "sync/atomic"
13         "time"
14
15         "github.com/prometheus/client_golang/prometheus"
16 )
17
18 // Inspect serves a report of current requests at "GET
19 // /_inspect/requests", and passes other requests through to the next
20 // handler.
21 //
22 // If registry is not nil, Inspect registers metrics about current
23 // requests.
24 func Inspect(registry *prometheus.Registry, authToken string, next http.Handler) http.Handler {
25         type ent struct {
26                 startTime  time.Time
27                 hangupTime atomic.Value
28         }
29         current := map[*http.Request]*ent{}
30         mtx := sync.Mutex{}
31         if registry != nil {
32                 registry.MustRegister(prometheus.NewGaugeFunc(
33                         prometheus.GaugeOpts{
34                                 Namespace: "arvados",
35                                 Name:      "max_active_request_age_seconds",
36                                 Help:      "Age of oldest active request",
37                         },
38                         func() float64 {
39                                 mtx.Lock()
40                                 defer mtx.Unlock()
41                                 earliest := time.Time{}
42                                 any := false
43                                 for _, e := range current {
44                                         if _, ok := e.hangupTime.Load().(time.Time); ok {
45                                                 // Don't count abandoned requests here
46                                                 continue
47                                         }
48                                         if !any || e.startTime.Before(earliest) {
49                                                 any = true
50                                                 earliest = e.startTime
51                                         }
52                                 }
53                                 if !any {
54                                         return 0
55                                 }
56                                 return float64(time.Since(earliest).Seconds())
57                         },
58                 ))
59                 registry.MustRegister(prometheus.NewGaugeFunc(
60                         prometheus.GaugeOpts{
61                                 Namespace: "arvados",
62                                 Name:      "max_abandoned_request_age_seconds",
63                                 Help:      "Maximum time since client hung up on a request whose processing thread is still running",
64                         },
65                         func() float64 {
66                                 mtx.Lock()
67                                 defer mtx.Unlock()
68                                 earliest := time.Time{}
69                                 any := false
70                                 for _, e := range current {
71                                         if hangupTime, ok := e.hangupTime.Load().(time.Time); ok {
72                                                 if !any || hangupTime.Before(earliest) {
73                                                         any = true
74                                                         earliest = hangupTime
75                                                 }
76                                         }
77                                 }
78                                 if !any {
79                                         return 0
80                                 }
81                                 return float64(time.Since(earliest).Seconds())
82                         },
83                 ))
84         }
85         return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
86                 if req.Method == "GET" && req.URL.Path == "/_inspect/requests" {
87                         if authToken == "" || req.Header.Get("Authorization") != "Bearer "+authToken {
88                                 Error(w, "unauthorized", http.StatusUnauthorized)
89                                 return
90                         }
91                         mtx.Lock()
92                         defer mtx.Unlock()
93                         type outrec struct {
94                                 RequestID  string
95                                 Method     string
96                                 Host       string
97                                 URL        string
98                                 RemoteAddr string
99                                 Elapsed    float64
100                         }
101                         now := time.Now()
102                         outrecs := []outrec{}
103                         for req, e := range current {
104                                 outrecs = append(outrecs, outrec{
105                                         RequestID:  req.Header.Get(HeaderRequestID),
106                                         Method:     req.Method,
107                                         Host:       req.Host,
108                                         URL:        req.URL.String(),
109                                         RemoteAddr: req.RemoteAddr,
110                                         Elapsed:    now.Sub(e.startTime).Seconds(),
111                                 })
112                         }
113                         sort.Slice(outrecs, func(i, j int) bool { return outrecs[i].Elapsed < outrecs[j].Elapsed })
114                         w.Header().Set("Content-Type", "application/json")
115                         json.NewEncoder(w).Encode(outrecs)
116                 } else {
117                         e := ent{startTime: time.Now()}
118                         mtx.Lock()
119                         current[req] = &e
120                         mtx.Unlock()
121                         go func() {
122                                 <-req.Context().Done()
123                                 e.hangupTime.Store(time.Now())
124                         }()
125                         defer func() {
126                                 mtx.Lock()
127                                 defer mtx.Unlock()
128                                 delete(current, req)
129                         }()
130                         next.ServeHTTP(w, req)
131                 }
132         })
133 }