Merge branch '19205-monitor-handler-threads'
[arvados.git] / sdk / go / httpserver / inspect.go
diff --git a/sdk/go/httpserver/inspect.go b/sdk/go/httpserver/inspect.go
new file mode 100644 (file)
index 0000000..cb08acf
--- /dev/null
@@ -0,0 +1,133 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: Apache-2.0
+
+package httpserver
+
+import (
+       "crypto/subtle"
+       "encoding/json"
+       "net/http"
+       "sort"
+       "sync"
+       "sync/atomic"
+       "time"
+
+       "github.com/prometheus/client_golang/prometheus"
+)
+
+// Inspect serves a report of current requests at "GET
+// /_inspect/requests", and passes other requests through to the next
+// handler.
+//
+// The report is a JSON array with one entry per request currently
+// being handled by next, sorted by ascending elapsed time. It is only
+// served when the request carries an "Authorization: Bearer
+// <authToken>" header; if authToken is empty, every report request
+// is rejected as unauthorized.
+//
+// If registry is not nil, Inspect registers metrics about current
+// requests: the age of the oldest active request, and the longest
+// time since a request's context ended while its handler was still
+// running.
+func Inspect(registry *prometheus.Registry, authToken string, next http.Handler) http.Handler {
+       type ent struct {
+               startTime time.Time
+               // hangupTime holds a time.Time once the request
+               // context is done; until then Load() returns nil.
+               hangupTime atomic.Value
+       }
+       // current tracks every request that is inside next.ServeHTTP.
+       // All access is guarded by mtx.
+       current := map[*http.Request]*ent{}
+       var mtx sync.Mutex
+       if registry != nil {
+               // maxAge returns the number of seconds since the
+               // earliest timestamp that pick selects among current
+               // requests, or 0 if pick selects none.
+               maxAge := func(pick func(*ent) (time.Time, bool)) float64 {
+                       mtx.Lock()
+                       defer mtx.Unlock()
+                       var earliest time.Time
+                       found := false
+                       for _, e := range current {
+                               t, ok := pick(e)
+                               if !ok {
+                                       continue
+                               }
+                               if !found || t.Before(earliest) {
+                                       found = true
+                                       earliest = t
+                               }
+                       }
+                       if !found {
+                               return 0
+                       }
+                       return time.Since(earliest).Seconds()
+               }
+               registry.MustRegister(prometheus.NewGaugeFunc(
+                       prometheus.GaugeOpts{
+                               Namespace: "arvados",
+                               Name:      "max_active_request_age_seconds",
+                               Help:      "Age of oldest active request",
+                       },
+                       func() float64 {
+                               return maxAge(func(e *ent) (time.Time, bool) {
+                                       if _, abandoned := e.hangupTime.Load().(time.Time); abandoned {
+                                               // Don't count abandoned requests here
+                                               return time.Time{}, false
+                                       }
+                                       return e.startTime, true
+                               })
+                       },
+               ))
+               registry.MustRegister(prometheus.NewGaugeFunc(
+                       prometheus.GaugeOpts{
+                               Namespace: "arvados",
+                               Name:      "max_abandoned_request_age_seconds",
+                               Help:      "Maximum time since client hung up on a request whose processing thread is still running",
+                       },
+                       func() float64 {
+                               return maxAge(func(e *ent) (time.Time, bool) {
+                                       t, ok := e.hangupTime.Load().(time.Time)
+                                       return t, ok
+                               })
+                       },
+               ))
+       }
+       return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
+               if req.Method == http.MethodGet && req.URL.Path == "/_inspect/requests" {
+                       // Compare in constant time so response timing
+                       // does not reveal how much of a guessed token
+                       // matched.
+                       got := []byte(req.Header.Get("Authorization"))
+                       want := []byte("Bearer " + authToken)
+                       if authToken == "" || subtle.ConstantTimeCompare(got, want) != 1 {
+                               Error(w, "unauthorized", http.StatusUnauthorized)
+                               return
+                       }
+                       type outrec struct {
+                               RequestID  string
+                               Method     string
+                               Host       string
+                               URL        string
+                               RemoteAddr string
+                               Elapsed    float64
+                       }
+                       // Take the snapshot under the lock, but release
+                       // it before writing the response: every proxied
+                       // request needs mtx to register/deregister, so
+                       // a slow reader of this report must not be able
+                       // to stall them.
+                       outrecs := func() []outrec {
+                               mtx.Lock()
+                               defer mtx.Unlock()
+                               now := time.Now()
+                               recs := make([]outrec, 0, len(current))
+                               for r, e := range current {
+                                       recs = append(recs, outrec{
+                                               RequestID:  r.Header.Get(HeaderRequestID),
+                                               Method:     r.Method,
+                                               Host:       r.Host,
+                                               URL:        r.URL.String(),
+                                               RemoteAddr: r.RemoteAddr,
+                                               Elapsed:    now.Sub(e.startTime).Seconds(),
+                                       })
+                               }
+                               return recs
+                       }()
+                       sort.Slice(outrecs, func(i, j int) bool { return outrecs[i].Elapsed < outrecs[j].Elapsed })
+                       w.Header().Set("Content-Type", "application/json")
+                       // The response has already been started, so an
+                       // encoding/write error (e.g. the client went
+                       // away) cannot be reported to the client.
+                       _ = json.NewEncoder(w).Encode(outrecs)
+                       return
+               }
+               e := &ent{startTime: time.Now()}
+               mtx.Lock()
+               current[req] = e
+               mtx.Unlock()
+               // done is closed when the handler returns, so the
+               // watcher goroutine below exits even if the request
+               // context is never cancelled.
+               done := make(chan struct{})
+               defer func() {
+                       close(done)
+                       mtx.Lock()
+                       defer mtx.Unlock()
+                       delete(current, req)
+               }()
+               go func() {
+                       select {
+                       case <-req.Context().Done():
+                               // Context ended while the handler was
+                               // still running: record it as a hangup.
+                               e.hangupTime.Store(time.Now())
+                       case <-done:
+                       }
+               }()
+               next.ServeHTTP(w, req)
+       })
+}