11901: Add /_health/ping and /_health/db health checks.
[arvados.git] / services / ws / router.go
1 package main
2
3 import (
4         "encoding/json"
5         "io"
6         "net/http"
7         "strconv"
8         "sync"
9         "sync/atomic"
10         "time"
11
12         "git.curoverse.com/arvados.git/sdk/go/ctxlog"
13         "github.com/Sirupsen/logrus"
14         "golang.org/x/net/websocket"
15 )
16
// wsConn is the set of connection operations this package needs from
// a websocket: reading and writing bytes, access to the HTTP request
// that initiated the connection, and read/write deadlines.
//
// NOTE(review): presumably satisfied by *websocket.Conn; confirm
// against handler.Handle.
type wsConn interface {
	io.Reader
	io.Writer

	// Request returns the HTTP request associated with the connection.
	Request() *http.Request

	SetReadDeadline(time.Time) error
	SetWriteDeadline(time.Time) error
}
23
24 type router struct {
25         Config         *wsConfig
26         eventSource    eventSource
27         newPermChecker func() permChecker
28
29         handler   *handler
30         mux       *http.ServeMux
31         setupOnce sync.Once
32
33         lastReqID  int64
34         lastReqMtx sync.Mutex
35
36         status routerDebugStatus
37 }
38
// routerDebugStatus holds the router's HTTP request counters:
// ReqsReceived is the total number of requests accepted so far, and
// ReqsActive is the number currently being served. Both are modified
// with sync/atomic in ServeHTTP.
type routerDebugStatus struct {
	ReqsReceived, ReqsActive int64
}
43
// debugStatuser is implemented by components that can report their
// internal state as a JSON-encodable value. router.DebugStatus uses
// it to detect whether the event source offers such a report.
type debugStatuser interface{ DebugStatus() interface{} }
47
48 func (rtr *router) setup() {
49         rtr.handler = &handler{
50                 PingTimeout: rtr.Config.PingTimeout.Duration(),
51                 QueueSize:   rtr.Config.ClientEventQueue,
52         }
53         rtr.mux = http.NewServeMux()
54         rtr.mux.Handle("/websocket", rtr.makeServer(newSessionV0))
55         rtr.mux.Handle("/arvados/v1/events.ws", rtr.makeServer(newSessionV1))
56         rtr.mux.HandleFunc("/debug.json", jsonHandler(rtr.DebugStatus))
57         rtr.mux.HandleFunc("/status.json", jsonHandler(rtr.Status))
58         rtr.mux.HandleFunc("/_health/ping", jsonHandler(rtr.HealthFunc(func() error { return nil })))
59         rtr.mux.HandleFunc("/_health/db", jsonHandler(rtr.HealthFunc(rtr.eventSource.DBHealth)))
60 }
61
62 func (rtr *router) makeServer(newSession sessionFactory) *websocket.Server {
63         return &websocket.Server{
64                 Handshake: func(c *websocket.Config, r *http.Request) error {
65                         return nil
66                 },
67                 Handler: websocket.Handler(func(ws *websocket.Conn) {
68                         t0 := time.Now()
69                         log := logger(ws.Request().Context())
70                         log.Info("connected")
71
72                         stats := rtr.handler.Handle(ws, rtr.eventSource,
73                                 func(ws wsConn, sendq chan<- interface{}) (session, error) {
74                                         return newSession(ws, sendq, rtr.eventSource.DB(), rtr.newPermChecker(), &rtr.Config.Client)
75                                 })
76
77                         log.WithFields(logrus.Fields{
78                                 "elapsed": time.Now().Sub(t0).Seconds(),
79                                 "stats":   stats,
80                         }).Info("disconnect")
81                         ws.Close()
82                 }),
83         }
84 }
85
86 func (rtr *router) newReqID() string {
87         rtr.lastReqMtx.Lock()
88         defer rtr.lastReqMtx.Unlock()
89         id := time.Now().UnixNano()
90         if id <= rtr.lastReqID {
91                 id = rtr.lastReqID + 1
92         }
93         return strconv.FormatInt(id, 36)
94 }
95
96 func (rtr *router) DebugStatus() interface{} {
97         s := map[string]interface{}{
98                 "HTTP":     rtr.status,
99                 "Outgoing": rtr.handler.DebugStatus(),
100         }
101         if es, ok := rtr.eventSource.(debugStatuser); ok {
102                 s["EventSource"] = es.DebugStatus()
103         }
104         return s
105 }
106
107 var pingResponseOK = map[string]string{"health": "OK"}
108
109 func (rtr *router) HealthFunc(f func() error) func() interface{} {
110         return func() interface{} {
111                 err := f()
112                 if err == nil {
113                         return pingResponseOK
114                 }
115                 return map[string]string{
116                         "health": "ERROR",
117                         "error":  err.Error(),
118                 }
119         }
120 }
121
122 func (rtr *router) Status() interface{} {
123         return map[string]interface{}{
124                 "Clients": atomic.LoadInt64(&rtr.status.ReqsActive),
125         }
126 }
127
128 func (rtr *router) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
129         rtr.setupOnce.Do(rtr.setup)
130         atomic.AddInt64(&rtr.status.ReqsReceived, 1)
131         atomic.AddInt64(&rtr.status.ReqsActive, 1)
132         defer atomic.AddInt64(&rtr.status.ReqsActive, -1)
133
134         logger := logger(req.Context()).
135                 WithField("RequestID", rtr.newReqID())
136         ctx := ctxlog.Context(req.Context(), logger)
137         req = req.WithContext(ctx)
138         logger.WithFields(logrus.Fields{
139                 "remoteAddr":      req.RemoteAddr,
140                 "reqForwardedFor": req.Header.Get("X-Forwarded-For"),
141         }).Info("accept request")
142         rtr.mux.ServeHTTP(resp, req)
143 }
144
145 func jsonHandler(fn func() interface{}) http.HandlerFunc {
146         return func(resp http.ResponseWriter, req *http.Request) {
147                 logger := logger(req.Context())
148                 resp.Header().Set("Content-Type", "application/json")
149                 enc := json.NewEncoder(resp)
150                 err := enc.Encode(fn())
151                 if err != nil {
152                         msg := "encode failed"
153                         logger.WithError(err).Error(msg)
154                         http.Error(resp, msg, http.StatusInternalServerError)
155                 }
156         }
157 }