8460: Add simple /status.json
[arvados.git] / services / ws / router.go
1 package main
2
3 import (
4         "database/sql"
5         "encoding/json"
6         "io"
7         "net/http"
8         "strconv"
9         "sync"
10         "sync/atomic"
11         "time"
12
13         "github.com/Sirupsen/logrus"
14         "golang.org/x/net/websocket"
15 )
16
17 type wsConn interface {
18         io.ReadWriter
19         Request() *http.Request
20         SetReadDeadline(time.Time) error
21         SetWriteDeadline(time.Time) error
22 }
23
24 type router struct {
25         Config         *Config
26         eventSource    eventSource
27         newPermChecker func() permChecker
28
29         handler   *handler
30         mux       *http.ServeMux
31         setupOnce sync.Once
32
33         lastReqID  int64
34         lastReqMtx sync.Mutex
35
36         status routerDebugStatus
37 }
38
39 type routerDebugStatus struct {
40         ReqsReceived int64
41         ReqsActive   int64
42 }
43
44 type DebugStatuser interface {
45         DebugStatus() interface{}
46 }
47
48 type sessionFactory func(wsConn, chan<- interface{}, *sql.DB, permChecker) (session, error)
49
50 func (rtr *router) setup() {
51         rtr.handler = &handler{
52                 PingTimeout: rtr.Config.PingTimeout.Duration(),
53                 QueueSize:   rtr.Config.ClientEventQueue,
54         }
55         rtr.mux = http.NewServeMux()
56         rtr.mux.Handle("/websocket", rtr.makeServer(NewSessionV0))
57         rtr.mux.Handle("/arvados/v1/events.ws", rtr.makeServer(NewSessionV1))
58         rtr.mux.HandleFunc("/debug.json", jsonHandler(rtr.DebugStatus))
59         rtr.mux.HandleFunc("/status.json", jsonHandler(rtr.Status))
60 }
61
62 func (rtr *router) makeServer(newSession sessionFactory) *websocket.Server {
63         return &websocket.Server{
64                 Handshake: func(c *websocket.Config, r *http.Request) error {
65                         return nil
66                 },
67                 Handler: websocket.Handler(func(ws *websocket.Conn) {
68                         t0 := time.Now()
69                         log := logger(ws.Request().Context())
70                         log.Info("connected")
71
72                         stats := rtr.handler.Handle(ws, rtr.eventSource,
73                                 func(ws wsConn, sendq chan<- interface{}) (session, error) {
74                                         return newSession(ws, sendq, rtr.eventSource.DB(), rtr.newPermChecker())
75                                 })
76
77                         log.WithFields(logrus.Fields{
78                                 "Elapsed": time.Now().Sub(t0).Seconds(),
79                                 "Stats":   stats,
80                         }).Info("disconnect")
81                         ws.Close()
82                 }),
83         }
84 }
85
86 func (rtr *router) newReqID() string {
87         rtr.lastReqMtx.Lock()
88         defer rtr.lastReqMtx.Unlock()
89         id := time.Now().UnixNano()
90         if id <= rtr.lastReqID {
91                 id = rtr.lastReqID + 1
92         }
93         return strconv.FormatInt(id, 36)
94 }
95
96 func (rtr *router) DebugStatus() interface{} {
97         s := map[string]interface{}{
98                 "HTTP":     rtr.status,
99                 "Outgoing": rtr.handler.DebugStatus(),
100         }
101         if es, ok := rtr.eventSource.(DebugStatuser); ok {
102                 s["EventSource"] = es.DebugStatus()
103         }
104         return s
105 }
106
107 func (rtr *router) Status() interface{} {
108         return map[string]interface{}{
109                 "Clients": atomic.LoadInt64(&rtr.status.ReqsActive),
110         }
111 }
112
113 func (rtr *router) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
114         rtr.setupOnce.Do(rtr.setup)
115         atomic.AddInt64(&rtr.status.ReqsReceived, 1)
116         atomic.AddInt64(&rtr.status.ReqsActive, 1)
117         defer atomic.AddInt64(&rtr.status.ReqsActive, -1)
118
119         logger := logger(req.Context()).
120                 WithField("RequestID", rtr.newReqID())
121         ctx := contextWithLogger(req.Context(), logger)
122         req = req.WithContext(ctx)
123         logger.WithFields(logrus.Fields{
124                 "RemoteAddr":      req.RemoteAddr,
125                 "X-Forwarded-For": req.Header.Get("X-Forwarded-For"),
126         }).Info("accept request")
127         rtr.mux.ServeHTTP(resp, req)
128 }
129
130 func jsonHandler(fn func() interface{}) http.HandlerFunc {
131         return func(resp http.ResponseWriter, req *http.Request) {
132                 logger := logger(req.Context())
133                 enc := json.NewEncoder(resp)
134                 err := enc.Encode(fn())
135                 if err != nil {
136                         msg := "encode failed"
137                         logger.WithError(err).Error(msg)
138                         http.Error(resp, msg, http.StatusInternalServerError)
139                 }
140         }
141 }